file_processor_analysis.md
Mohamed edited this page Jun 10, 2025
·
1 revision
- Location: `example/core_io/file_processor/`
- Objective: This example demonstrates a robust Manager-Worker pattern for handling potentially blocking file I/O operations (reads and writes) in an asynchronous manner within the QB Actor Framework. It showcases how to prevent `VirtualCore`s from stalling by offloading blocking tasks to dedicated worker actors.
By studying this example, you will understand:
- How to structure a Manager-Worker pattern using actors.
- The technique of wrapping blocking system calls (like file operations) within `qb::io::async::callback` to maintain system responsiveness.
- Managing a pool of worker actors and distributing tasks based on availability.
- A request/response flow where workers can reply directly to the original client actor.
The file_processor example uses three main actor types to achieve its goal:
- Actor: `FileManager`
- Core Assignment (Typical): Core 0 (acts as a central dispatcher).
- Role: The central coordinator and dispatcher. It receives file operation requests from `ClientActor`(s), maintains a queue of pending tasks, and assigns these tasks to available `FileWorker` actors.
- QB Integration: Standard `qb::Actor`.
- State Management:
  - `_available_workers`: A `std::unordered_set<qb::ActorId>` storing the IDs of `FileWorker` actors that are currently idle and ready for tasks.
  - `_read_requests`, `_write_requests`: `std::queue`s holding pending `ReadFileRequest` and `WriteFileRequest` events respectively, when no workers are immediately available.
- Key Event Handling & Logic:
  - `on(ReadFileRequest&)` and `on(WriteFileRequest&)`:
    - Receives a request. Assigns a unique `request_id` to track it (though in this example, the original request event is forwarded, already containing necessary IDs).
    - Checks `_available_workers`. If a worker is free:
      - It removes the worker's ID from `_available_workers`.
      - It `push`es the original request event (which includes the `requestor` ID) directly to the chosen `FileWorker`.

      ```cpp
      // Simplified from FileManager::on(ReadFileRequest& request)
      if (!_available_workers.empty()) {
          qb::ActorId worker_id = *_available_workers.begin();
          _available_workers.erase(_available_workers.begin());
          qb::io::cout() << "FileManager: Assigning Read Request for \"" << request.filepath.c_str()
                         << "\" to Worker " << worker_id << ".\n";
          push<ReadFileRequest>(worker_id, request.filepath.c_str(), request.requestor, request.request_id);
      } else {
          _read_requests.push(request); // Forwarding the whole event implies copying or careful event design
          qb::io::cout() << "FileManager: No worker available for Read Request. Queued.\n";
      }
      ```
    - If no worker is available, the request is enqueued in `_read_requests` or `_write_requests` (with reads typically prioritized).
  - `on(WorkerAvailable&)`:
    - Called by a `FileWorker` when it finishes a task.
    - Adds the `worker_id` from the event back to `_available_workers`.
    - Immediately checks the task queues (reads first, then writes). If a task is pending, it dequeues it and assigns it to the now-available worker (similar logic to handling a new request).
  - `on(ReadFileResponse&)` and `on(WriteFileResponse&)`:
    - Receives the response event from a `FileWorker`.
    - Crucially, it forwards the entire response event (which contains the result, original `request_id`, and the original `requestor` ID) to the `requestor` actor that initiated the operation.

    ```cpp
    // Inside FileManager::on(ReadFileResponse& response)
    qb::io::cout() << "FileManager: Forwarding ReadFileResponse for \"" << response.filepath.c_str()
                   << "\" to original requestor " << response.requestor << ".\n";
    push<ReadFileResponse>(response.requestor, response.filepath.c_str(), response.data, response.success,
                           response.error_msg.c_str(), response.request_id);
    ```
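The dispatch rules described for the manager (hand a task to an idle worker if one exists, otherwise queue it, and drain reads before writes when a worker reports back) can be sketched in plain C++17 without any QB types. This is a minimal illustration under stated assumptions, not the example's actual code: `Dispatcher`, `Task`, `Assignment`, and `WorkerId` are hypothetical names, and an integer stands in for `qb::ActorId`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <queue>
#include <string>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for qb::ActorId (assumption for this sketch).
using WorkerId = std::uint32_t;

struct Task {
    bool is_read;              // true = read request, false = write request
    std::string filepath;
    std::uint32_t request_id;
};

struct Assignment {
    WorkerId worker;
    Task task;
};

class Dispatcher {
public:
    // A new request arrives: give it to an idle worker, or queue it.
    std::optional<Assignment> submit(Task task) {
        if (!_available_workers.empty()) {
            return assign(std::move(task));
        }
        (task.is_read ? _read_requests : _write_requests).push(std::move(task));
        return std::nullopt;
    }

    // A worker reports that it is idle: mark it free, then hand it one
    // queued task if any is pending (reads first, then writes).
    std::optional<Assignment> worker_available(WorkerId worker) {
        _available_workers.insert(worker);
        auto& queue = !_read_requests.empty() ? _read_requests : _write_requests;
        if (queue.empty()) {
            return std::nullopt;
        }
        Task next = std::move(queue.front());
        queue.pop();
        return assign(std::move(next));
    }

    std::size_t idle_workers() const { return _available_workers.size(); }
    std::size_t queued() const { return _read_requests.size() + _write_requests.size(); }

private:
    // Removes one worker from the idle set and pairs it with the task.
    Assignment assign(Task task) {
        assert(!_available_workers.empty());
        auto it = _available_workers.begin();
        WorkerId worker = *it;
        _available_workers.erase(it);
        return Assignment{worker, std::move(task)};
    }

    std::unordered_set<WorkerId> _available_workers;
    std::queue<Task> _read_requests;
    std::queue<Task> _write_requests;
};
```

In an actor, a non-empty `Assignment` would correspond to `push`ing the request to the chosen worker, and an empty result to the "Queued" branch shown earlier.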
- Actor: `FileWorker`
- Core Assignment (Typical): Cores 1+ (can have multiple instances distributed across cores for parallelism).
- Role: Executes one file read or write operation at a time. Designed to handle potentially blocking I/O without stalling its `VirtualCore`.
- QB Integration: Standard `qb::Actor`.
- Initialization (`onInit()`):
  - Stores its manager's ID (`_manager_id`).
  - Sends an initial `WorkerAvailable` message to the `FileManager` to signal its readiness.
- Key Event Handling & Logic (`on(ReadFileRequest&)` and `on(WriteFileRequest&)`):
  - Receives a task request from `FileManager`.
  - Sets an internal `_is_busy = true;` flag (though not explicitly used to prevent new tasks in this example, as `FileManager` manages availability).
  - Asynchronous I/O Offload: This is the core of its non-blocking nature. It captures the necessary request details (filepath, data for write, original requestor ID, request ID) into a lambda and schedules this lambda for execution using `qb::io::async::callback([this, captured_request, ...]() { ... });`.

    ```cpp
    // Simplified from FileWorker::on(ReadFileRequest& request)
    auto captured_request = request; // Capture needed data
    auto file_content = std::make_shared<std::vector<char>>(); // Buffer for read

    qb::io::async::callback([this, captured_request, file_content]() {
        if (!this->is_alive()) return; // Actor might have been killed

        qb::io::sys::file file_op;
        bool success = false;
        qb::string<256> error_message;

        // --- PERFORM BLOCKING FILE I/O ---
        if (file_op.open(captured_request.filepath.c_str(), O_RDONLY) >= 0) {
            // ... read file into file_content using file_op.read() ...
            // ... handle potential read errors ...
            success = true;
            file_op.close();
        } else {
            error_message = "Failed to open file for reading";
        }
        // --- END OF BLOCKING I/O ---

        // Send response DIRECTLY to the original client actor
        this->push<ReadFileResponse>(captured_request.requestor, captured_request.filepath.c_str(), file_content,
                                     success, error_message.c_str(), captured_request.request_id);
        notifyAvailable(); // Tell FileManager this worker is free
    });
    ```
  - Inside the `async::callback` Lambda:
    - Performs the actual, potentially blocking, file operation using `qb::io::sys::file` (`open`, `read`/`write`, `close`).
    - Constructs a response event (`ReadFileResponse` or `WriteFileResponse`) containing the result (data read, bytes written, success status, error message), the original `request_id`, and importantly, the `requestor` ID from the captured request.
    - Direct Reply to Client: `push`es the response event directly to the original `requestor` (`captured_request.requestor`). This bypasses the `FileManager` for the response path, reducing load on the manager.
    - Calls `notifyAvailable()`, which `push`es a `WorkerAvailable` event (with its own `id()`) back to the `FileManager`.
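The shape of the worker's handler (copy what the job needs, schedule the job, return immediately, reply from inside the job) can be modelled without QB. The sketch below is a stand-in under stated assumptions: `DeferredQueue` is a hypothetical single-threaded job queue used in place of the framework's event loop, `ReadRequest` and `ReadResponse` are plain structs rather than QB events, and `std::ifstream` replaces `qb::io::sys::file`. It is not `qb::io::async::callback` and makes no claim about how QB schedules callbacks internally.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <fstream>
#include <functional>
#include <iterator>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical "run this later" facility (assumption for this sketch).
class DeferredQueue {
public:
    void post(std::function<void()> job) { _jobs.push_back(std::move(job)); }

    // Runs every pending job once and returns how many were executed.
    std::size_t run_pending() {
        std::size_t ran = 0;
        while (!_jobs.empty()) {
            auto job = std::move(_jobs.front());
            _jobs.pop_front();
            job();
            ++ran;
        }
        return ran;
    }

private:
    std::deque<std::function<void()>> _jobs;
};

struct ReadRequest {
    std::string filepath;
    std::uint32_t requestor;   // who should receive the reply
    std::uint32_t request_id;
};

struct ReadResponse {
    std::string filepath;
    std::shared_ptr<std::vector<char>> data;
    bool success;
    std::string error_msg;
    std::uint32_t requestor;
    std::uint32_t request_id;
};

// Handler: captures the request by value, schedules the file read, and
// returns without touching the file. The reply is sent from the job.
void handle_read(DeferredQueue& loop, const ReadRequest& request,
                 std::function<void(ReadResponse)> reply) {
    assert(reply && "a reply callback is required");
    auto captured = request;                                // outlives the handler
    auto content = std::make_shared<std::vector<char>>();   // shared read buffer
    loop.post([captured, content, reply = std::move(reply)]() {
        ReadResponse response{captured.filepath, content, false, "",
                              captured.requestor, captured.request_id};
        std::ifstream in(captured.filepath, std::ios::binary);
        if (in) {
            content->assign(std::istreambuf_iterator<char>(in),
                            std::istreambuf_iterator<char>());
            response.success = true;
        } else {
            response.error_msg = "Failed to open file for reading";
        }
        reply(std::move(response));   // carries requestor and request_id back
    });
}
```

Calling `handle_read(...)` returns before any file is opened; the reply callback fires only once `run_pending()` executes the job. Capturing the request by value matters because the original event may no longer exist by then.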
- Actor: `ClientActor`
- Core Assignment (Typical): Core 0 (can co-exist with `FileManager`).
- Role: Simulates one or more clients that make requests for file operations.
- QB Integration: Standard `qb::Actor`.
- Interaction Flow:
  - In `onInit()` or other handlers, sends `ReadFileRequest` or `WriteFileRequest` events to the `FileManager`. It includes its own `id()` as the `requestor` field in these requests.
  - Receives `ReadFileResponse` or `WriteFileResponse` events directly from the `FileWorker` that processed its specific request.
  - Processes the response (e.g., prints content to console, checks status).
  - Tracks pending requests and can initiate a system shutdown when all its simulated operations are complete.
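The client's bookkeeping (remember each outstanding request, and notice when the last response has arrived) might look roughly like the following. This is a sketch only: `PendingTracker` and its methods are hypothetical names, and how the real `ClientActor` stores its pending requests is not shown in this analysis.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical helper a client could use to track in-flight requests.
class PendingTracker {
public:
    // Records a request before it is sent and returns the request_id to
    // place in the outgoing event.
    std::uint32_t track(const std::string& filepath) {
        assert(!filepath.empty() && "a request needs a target path");
        std::uint32_t id = _next_id++;
        _pending.emplace(id, filepath);
        return id;
    }

    // Called when a response arrives. Returns true only when this response
    // matched a pending request and nothing else is outstanding, which is
    // the point where a client could initiate shutdown. Unknown or
    // duplicate ids are ignored and return false.
    bool complete(std::uint32_t request_id) {
        if (_pending.erase(request_id) == 0) {
            return false;
        }
        return _pending.empty();
    }

    std::size_t pending() const { return _pending.size(); }

private:
    std::uint32_t _next_id = 1;
    std::unordered_map<std::uint32_t, std::string> _pending;
};
```

A response handler would call `complete(response.request_id)` and trigger the shutdown path only when it returns `true`.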
Key concepts illustrated by this example:
- Manager-Worker Pattern: A common and effective pattern for distributing tasks and managing a pool of worker components.
- Asynchronicity for Blocking Operations: Demonstrates the crucial technique of wrapping potentially blocking system calls (like synchronous file I/O) inside `qb::io::async::callback`. This prevents the `FileWorker` actor from stalling its `VirtualCore`, maintaining overall system responsiveness.
- Task Queuing & Basic Load Balancing: The `FileManager` acts as a simple load balancer by assigning tasks to the next free worker or by queueing requests if all workers are busy.
- Decoupled Request/Response Flow (Direct Reply): The pattern where a client sends a request to a manager, the manager delegates to a worker, and the worker sends the response directly back to the original client is a powerful way to reduce load on central manager actors and improve response latency. The original `requestor` ID is passed along through the events to enable this.
- State Management for Workers: `FileWorker`s manage their busy/available state implicitly by only notifying the `FileManager` when they complete a task.
- Multi-Core Parallelism: Multiple `FileWorker` actors can be instantiated on different `VirtualCore`s, allowing for concurrent file I/O operations (though actual parallelism will be limited by the underlying disk subsystem's performance).
- Custom Event Design: Shows practical examples of events for requests (`ReadFileRequest`), responses (`ReadFileResponse`), and internal coordination (`WorkerAvailable`).
This example provides a clear blueprint for how to integrate blocking tasks into a QB actor system without sacrificing the benefits of asynchronous processing.
(Next Example Analysis: message_broker Example Analysis)