-
-
Couldn't load subscription status.
- Fork 12
message_broker_analysis.md
-
Location:
example/core_io/message_broker/ - Objective: This example implements a fully functional, topic-based Publish/Subscribe (Pub/Sub) message broker. It demonstrates how QB actors can be used to build a robust messaging system with TCP-based client-server communication, custom protocol handling, and efficient message distribution to multiple subscribers.
Key learning points from this example include:
- Implementing a classic Pub/Sub architecture with actors.
- Efficiently fanning out messages to numerous subscribers using shared message payloads (
MessageContainer). - Designing a custom binary protocol for diverse message types (
BrokerProtocol). - Managing distributed state for topics and subscriptions in a central actor.
- Leveraging
qb::string_viewfor zero-copy string processing during internal event forwarding.
The message_broker system comprises a server (the broker) and multiple clients that can publish messages to topics or subscribe to receive messages from topics.
The server architecture is similar in structure to the chat_tcp example, with distinct roles for accepting connections, managing client sessions, and handling the core broker logic.
-
AcceptActor(server/AcceptActor.h/.cpp)- Core (Typical): Core 0.
- Role: Listens for incoming TCP connections on a configured port (e.g., 12345).
-
QB Integration:
qb::Actor,qb::io::use<AcceptActor>::tcp::acceptor. -
Functionality: When a new client connects,
on(accepted_socket_type&& new_io)is called. It then round-robins the newqb::io::tcp::socket(wrapped in aNewSessionEvent) to one of the availableServerActorinstances for session management.
-
ServerActor(server/ServerActor.h/.cpp)- Core (Typical): Core 1 (or a pool across multiple cores, e.g., 1 & 2).
-
Role: Manages a group of active client connections, represented by
BrokerSessioninstances. It acts as an intermediary, forwarding client commands to theTopicManagerActorand relaying messages from theTopicManagerActorback to the appropriate clients. -
QB Integration:
qb::Actor,qb::io::use<ServerActor>::tcp::server<BrokerSession>(which providesio_handlercapabilities). -
Session Creation:
on(NewSessionEvent& evt)callsregisterSession(std::move(evt.socket))to create, start, and manage a newBrokerSession. -
Efficient Command Forwarding: When a
BrokerSessioncalls methods likeserver().handleSubscribe(id(), std::move(msg)), theServerActoroften usesstd::movefor the incomingbroker::Message. It then creates specialized events for theTopicManagerActor(e.g.,SubscribeEvent,PublishEvent) that can utilizebroker::MessageContainer(for shared ownership of message payloads) andstd::string_view(for topic/content if parsed locally before forwarding). This minimizes unnecessary string copying, especially for message content being published.// Simplified from ServerActor::handlePublish // The BrokerSession has already parsed the topic and content into string_views // and put the original message into a MessageContainer. void ServerActor::handlePublish(qb::uuid session_id, broker::MessageContainer&& container, std::string_view topic, std::string_view content) { if (_topic_manager_id.is_valid()) { push<PublishEvent>(_topic_manager_id, session_id, id(), std::move(container), topic, content); } }
-
Delivering Messages to Clients:
on(SendMessageEvent& evt)receives messages from theTopicManagerActor. It looks up the targetBrokerSessionusingevt.target_session_idand sends the actual message payload (accessed viaevt.message_container.message().payload) to the client using the session'soperator<<.
-
BrokerSession(server/BrokerSession.h/.cpp)-
Context: Managed by a
ServerActor, runs on the sameVirtualCore. - Role: Handles I/O and protocol parsing for a single connected client.
-
QB Integration:
qb::io::use<BrokerSession>::tcp::client<ServerActor>,qb::io::use<BrokerSession>::timeout. -
Protocol:
using Protocol = broker::BrokerProtocol<BrokerSession>;(defined inshared/Protocol.h). - **Command Processing (
on(broker::Message msg)):- Receives a fully parsed
broker::Messagefrom itsBrokerProtocol. - Crucially, it takes
broker::Message msgby value to allowstd::movefor efficient forwarding. - Based on
msg.type(SUBSCRIBE,UNSUBSCRIBE,PUBLISH):- For
PUBLISH, it first parses themsg.payloadto extracttopicandcontentasstd::string_views. - It then calls the appropriate handler on its parent
ServerActor(e.g.,server().handleSubscribe(id(), std::move(msg)), or for publish:server().handlePublish(id(), broker::MessageContainer(std::move(msg)), topic_view, content_view);). This passes ownership of the message (or a shared container of it) efficiently.
- For
- Receives a fully parsed
-
Lifecycle: Notifies its
ServerActorviahandleDisconnect()on disconnection or inactivity timeout.
-
Context: Managed by a
-
TopicManagerActor(server/TopicManagerActor.h/.cpp)- Core (Typical): Core 2 (dedicated to core broker logic).
- Role: The heart of the broker. Manages topic subscriptions and efficiently routes published messages to all relevant subscribers.
-
State Management:
-
_sessions: Mapsqb::uuid(client session ID) toSessionInfo { qb::ActorId server_id }(to know whichServerActormanages that session). -
_subscriptions: Mapsqb::string topic_nametostd::set<qb::uuid> subscriber_session_ids. -
_session_topics: Mapsqb::uuid session_idtostd::set<qb::string topic_name>(for cleanup on disconnect).
-
-
Event Handling:
-
on(SubscribeEvent&): Addsevt.session_idto the subscriber set forevt.topic_sv.data(). Sends aRESPONSEmessage back to the client via itsServerActor. -
on(UnsubscribeEvent&): Removes the session from the topic's subscriber set. -
on(PublishEvent&): This is where efficient fan-out happens.- Formats the message to be broadcast (e.g., "topic:publisher_id:content").
-
Creates a single
broker::MessageContainer shared_message(...). This container likely holds astd::shared_ptrto the actual formatted message string/payload. - Looks up all
subscriber_session_ids for the givenevt.topic_sv.data(). - For each subscriber, it retrieves their managing
ServerActor's ID from_sessions. -
pushes aSendMessageEvent(subscriber_session_id, managing_server_id, shared_message)to the managingServerActor. Becauseshared_messageis passed (likely by const-ref or by copying theMessageContainerwhich shares the underlying payload), the actual message data is not copied for each subscriber.
// Simplified from TopicManagerActor::on(PublishEvent& evt) if (_subscriptions.count(topic_key)) { qb::string<512> formatted_msg_content; // Or std::string if payload is large // ... format message content using evt.publisher_id and evt.content_sv ... broker::MessageContainer shared_payload_container( broker::MessageType::MESSAGE, std::string(formatted_msg_content.c_str()) // Convert qb::string to std::string for MessageContainer ); for (qb::uuid subscriber_session_id : _subscriptions.at(topic_key)) { if (_sessions.count(subscriber_session_id)) { qb::ActorId target_server_id = _sessions.at(subscriber_session_id).server_id; push<SendMessageEvent>(target_server_id, subscriber_session_id, shared_payload_container); } } }
-
on(DisconnectEvent&): Removes the disconnected session from all its topic subscriptions and from the_sessionsmap.
-
Similar to the chat_tcp client:
-
InputActor(client/InputActor.h/.cpp): Handles console input, parses basic commands (SUB, UNSUB, PUB, QUIT, HELP), andpushes aBrokerInputEvent(containing the raw command string) to theClientActor. -
ClientActor(client/ClientActor.h/.cpp):- Manages the TCP connection to the broker server using
qb::io::use<ClientActor>::tcp::client<>. - Uses
BrokerProtocolfor message framing:using Protocol = broker::BrokerProtocol<ClientActor>;. -
onInit(): Connects to the server usingqb::io::async::tcp::connect. -
on(BrokerInputEvent&): Further parses the raw command fromInputActor. For example, for "PUB topicname message content", it extracts "topicname" and "message content". - Sends
SUBSCRIBE,UNSUBSCRIBE, orPUBLISHmessages (asbroker::Messageobjects) to the server using*this << broker_message << Protocol::end;. -
on(broker::Message&): HandlesRESPONSEandMESSAGEtypes from the server, printing information to the console. -
on(qb::io::async::event::disconnected const&): Handles disconnection and schedules reconnection attempts usingqb::io::async::callback.
- Manages the TCP connection to the broker server using
-
Publish/Subscribe Architecture: A full implementation of the Pub/Sub pattern using a central
TopicManagerActor. -
Efficient Message Fan-Out (Zero/Minimal Copy for Payloads):
- The
TopicManagerActordemonstrates a crucial optimization: when broadcasting a published message to multiple subscribers, it creates the actual payload data (or a container for it likebroker::MessageContainerwhich likely usesstd::shared_ptr) once. Then, it sends events (SendMessageEvent) to variousServerActors, each containing a reference or a shared pointer to this single payload. This avoids repeatedly copying potentially large message contents for every subscriber, which is critical for performance in systems with many subscribers to a topic. - The use of
std::moveinBrokerSessionwhen callingserver().handlePublish(...)and inServerActorwhen creatingPublishEventhelps transfer ownership of message data efficiently towards theTopicManagerActor.
- The
-
Custom Binary Protocol (
BrokerProtocol): The example definesMessageHeaderandMessageTypeto structure communication, showing how to build more complex protocols than simple delimiter-based ones. -
Use of
std::string_viewfor Intermediate Processing:BrokerSessionandServerActorusestd::string_viewto refer to parts of the message payload (like topic and content) without copying them before the data is packaged into theMessageContaineror specific events for theTopicManagerActor. -
Multi-Core Scalability: Distributing the
AcceptActor,ServerActors (potentially multiple instances on different cores), and theTopicManagerActoracross differentVirtualCores allows the system to handle high connection rates and message throughput by parallelizing work. - Layered Responsibilities: Clear separation of concerns: connection acceptance, session I/O and protocol parsing, and topic/subscription management.
This message_broker example is significantly more advanced than the chat_tcp example in its message handling and provides excellent insights into building high-performance, scalable messaging systems with QB.
(Next Example Analysis: We have covered the main core_io examples. Consider revisiting Developer Guides for broader patterns or the Reference Documentation section.**)