diff --git a/CLAUDE.md b/CLAUDE.md index 1bfba81..3404707 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -157,7 +157,7 @@ void processInputs() { #include "asio.hpp" // Third-party -#include "actor-core/actor.hpp" // Local headers +#include - Unified Input Abstraction - -All input to actors follows the **InputSource** interface for unified processing: - -```cpp -template -class InputSource { - virtual std::optional tryTake() = 0; // Pull next item - virtual bool hasInputItems() const = 0; // Check availability -}; -``` - -**Standard implementations:** -- `Subscription` - Messages from topics -- `Timer` - Timer events -- Custom sources - stdin, files, sockets, etc. - -This unified interface enables: -- ✅ Generic processing helpers that work with ANY input source -- ✅ Consistent API across all input types -- ✅ Easy to add new input sources -- ✅ Polymorphic input handling - -### Topic -A typed pub-sub channel that delivers messages from publishers to subscribers. - -```cpp -auto tick_topic = std::make_shared>(); -``` - -### Subscription -An actor's message queue for a specific topic. Implements `InputSource`. - -```cpp -// In actor constructor -tick_sub_ = create_sub(tick_topic); - -// In processInputs() - unified interface -while (auto msg = tick_sub_->tryTake()) { - handleTick(*msg); -} - -// Legacy method still works -while (auto msg = tick_sub_->tryTakeMessage()) { - handleTick(*msg); -} -``` - -### Publisher -A lightweight wrapper for publishing messages to a topic. - -```cpp -// In actor constructor -tick_pub_ = create_pub(tick_topic); - -// Publish a message -tick_pub_->publish(TickMsg{timestamp}); -``` - -### Timer -Periodic or single-shot timers that implement `InputSource`. - -```cpp -// In actor constructor -timer_ = create_timer(timer_factory); -timer_->execute_command(make_periodic_command(std::chrono::seconds(1))); - -// In processInputs() - unified interface -while (auto event = timer_->tryTake()) { - handleTimerEvent(*event); -} - -// Legacy method still works -while (auto event = timer_->tryTakeElapsedEvent()) { - handleTimerEvent(*event); -} -``` - -## Creating an Actor - -Actors inherit from `Actor` using the CRTP pattern and must use the factory method: - -```cpp -#include "actor-core/actor.hpp" - -// Define your message types -struct InputMsg { - std::string data; -}; - -struct OutputMsg { - int result; -}; - -// Create actor class -class MyActor : public Actor { -public: - // Constructor - takes ActorContext as first parameter - MyActor(ActorContext ctx, - TopicPtr input_topic, - TopicPtr output_topic) - : Actor(ctx), - input_sub_(create_sub(input_topic)), - output_pub_(create_pub(output_topic)) {} - - // Required: implement message processing - void processInputs() override { - // Pull and process all pending messages - while (auto msg = input_sub_->tryTakeMessage()) { - handleInput(*msg); - } - } - -private: - void handleInput(const InputMsg& msg) { - // Process the message - int result = msg.data.length(); - - // Publish result - output_pub_->publish(OutputMsg{result}); - } - - SubscriptionPtr input_sub_; - PublisherPtr output_pub_; -}; - -// Factory method is inherited from Actor -// Use: auto actor = MyActor::create(io, input_topic, output_topic); -``` - -## Message Flow - -1. **Publishing**: Any actor publishes a message via `Publisher::publish()` -2. **Topic Distribution**: Topic enqueues message in all subscriber queues -3. **Notification**: Topic posts `processInputs()` call to subscriber's strand (only if queue was empty) -4. **Pull Processing**: Actor wakes up, pulls messages via `Subscription::tryTakeMessage()` in desired order -5. **Processing**: Actor handles messages, may publish new messages in response - -## Complete Example - -```cpp -#include -#include "actor-core/actor.hpp" -#include "actor-core/topic.hpp" -#include "actor-core/timer/timer.hpp" - -// Message definitions -struct Request { int id; }; -struct Response { int id; std::string result; }; - -// Timer types for producer -struct ProducerTag {}; -using ProducerCommand = TimerCommand; -using ProducerElapsed = TimerElapsedEvent; -using ProducerTimer = Timer; - -// Producer actor (timer-driven) -class Producer : public Actor { -public: - Producer(ActorContext ctx, - TopicPtr request_topic, - TimerFactoryPtr timer_factory) - : Actor(ctx), - request_pub_(create_pub(request_topic)), - timer_(create_timer(timer_factory)) { - // Start periodic timer (1 second intervals) - timer_->execute_command(make_periodic_command( - std::chrono::seconds(1) - )); - } - - void processInputs() override { - // Process timer ticks - while (auto event = timer_->tryTakeElapsedEvent()) { - produce(); - } - } - -private: - void produce() { - request_pub_->publish(Request{next_id_++}); - } - - PublisherPtr request_pub_; - std::shared_ptr timer_; - int next_id_ = 0; -}; - -// Worker actor -class Worker : public Actor { -public: - Worker(ActorContext ctx, - TopicPtr request_topic, - TopicPtr response_topic) - : Actor(ctx), - request_sub_(create_sub(request_topic)), - response_pub_(create_pub(response_topic)) {} - - void processInputs() override { - while (auto msg = request_sub_->tryTakeMessage()) { - // Process request - std::string result = "Processed " + std::to_string(msg->id); - response_pub_->publish(Response{msg->id, result}); - } - } - -private: - SubscriptionPtr request_sub_; - PublisherPtr response_pub_; -}; - -// Main -int main() { - asio::io_context io; - - // Create shared resources - auto request_topic = std::make_shared>(); - auto response_topic = std::make_shared>(); - auto timer_factory = std::make_shared(io); - - // Create actors - auto producer = Producer::create(io, request_topic, timer_factory); - auto worker = Worker::create(io, request_topic, response_topic); - - // Run event loop - io.run(); - - return 0; -} -``` - -## Usage Pattern - -Actors must be created using the factory method and follow this structure: - -```cpp -class MyActor : public Actor { -public: - MyActor(ActorContext ctx, /* your dependencies */) - : Actor(ctx) { /* initialize subscriptions/publishers */ } - - void processInputs() override { /* handle messages */ } - -private: - // Your private members -}; - -// Create with factory method -auto actor = MyActor::create(io, dependencies...); -``` - -## Thread Safety - -Actor code runs serialized on its strand - no manual locking needed. Topics are thread-safe for publishing from any thread. Never call actor methods directly - always communicate via topics. - -## Best Practices - -- **Drain message queues**: Use `while (auto msg = sub->tryTakeMessage())` to process all pending messages -- **Keep handlers fast**: Avoid blocking operations in `processInputs()` -- **Inject dependencies**: Pass `TopicPtr` to constructors for better testability -- **Let actors die naturally**: Just drop the `shared_ptr` - cleanup happens automatically - -## Architecture Notes - -### Why Pull-Based? - -- Actors control message processing order -- Simpler backpressure handling (bounded queues) -- No callback spaghetti -- Easy to prioritize different message types - -### Why Strands? - -- Eliminates data races within actors -- No need for actor-level mutexes -- Asio handles scheduling automatically -- Clean separation between actors - -### Why Automatic Memory Management? - -- Topics hold weak references to subscribers -- Actors are automatically removed from topics when destroyed -- No manual cleanup or unsubscribe needed -- Simple actor lifecycle - just let shared_ptr go out of scope - -## Requirements - -- C++17 or later -- Standalone Asio 1.30.2+ (header-only) -- CMake 3.12+ (for building examples) - -## License - -See project root for license information. diff --git a/include/actor-core/actor.hpp b/include/actor-core/actor.hpp deleted file mode 100644 index d0f1175..0000000 --- a/include/actor-core/actor.hpp +++ /dev/null @@ -1,110 +0,0 @@ -#ifndef ACTOR_CORE_ACTOR_HPP -#define ACTOR_CORE_ACTOR_HPP - -#include -#include -#include -#include - -#include "actor-core/processor_interface.hpp" -#include "actor-core/publisher.hpp" -#include "actor-core/subscription.hpp" -#include "actor-core/timer/timer_factory.hpp" -#include "actor-core/topic.hpp" -#include "asio.hpp" - -namespace actor_core { - -// CRTP base class for all actors -// Provides factory pattern, strand management, and subscription lifecycle -template -class Actor : public ProcessorInterface, public std::enable_shared_from_this { - public: - // Execution context wrapper - can only be constructed by factory - // Abstracts away asio implementation details from actor constructors - class ActorContext { - public: - asio::io_context& io_context() { return io_; } - - private: - asio::io_context& io_; - explicit ActorContext(asio::io_context& io) : io_(io) {} - friend class Actor; // Only Actor can construct - }; - - // Factory method - creates actor and finalizes subscriptions - template - static std::shared_ptr create(asio::io_context& io, Args&&... args) { - // Wrap io_context in ActorContext (only factory can do this) - ActorContext ctx{io}; - auto actor = std::make_shared(ctx, std::forward(args)...); - - // Finalize deferred subscriptions (now shared_from_this() works) - actor->finalize(); - - return actor; - } - - protected: - explicit Actor(ActorContext ctx) : strand_(asio::make_strand(ctx.io_context())) {} - virtual ~Actor() = default; - - // Helper for derived classes to create subscriptions - // Returns shared_ptr that can be initialized in initializer list - // Defers actual registration until finalize() is called by factory - template - SubscriptionPtr create_sub(TopicPtr topic) { - auto sub = std::make_shared>(); - - // Defer registration - will be completed in finalize() - deferred_.push_back([this, topic, sub]() { - // Register subscription with topic (now shared_from_this() works) - // Topic holds weak_ptr to actor - auto-cleanup on actor destruction - topic->subscribe(this->shared_from_this(), sub.get(), strand_); - }); - - return sub; - } - - // Helper for derived classes to create publishers - // Returns shared_ptr for symmetric API with create_sub - template - PublisherPtr create_pub(TopicPtr topic) { - return std::make_shared>(topic); - } - - // Helper for derived classes to create timers - // Creates timer immediately but defers subscription until finalize() - template - std::shared_ptr create_timer(TimerFactoryPtr factory) { - auto timer = factory->create(strand_); - - // Defer subscription - will be completed in finalize() - deferred_.push_back([this, timer]() { - // Subscribe this actor to timer events (now shared_from_this() works) - timer->subscribe(this->shared_from_this()); - }); - - return timer; - } - - // Helper for async callbacks - use weak_ptr to avoid keeping actor alive - std::weak_ptr weak_from_this() { return this->shared_from_this(); } - - asio::strand strand_; - - private: - // Finalize deferred subscriptions (called by factory after construction) - void finalize() { - for (auto& fn : deferred_) { - fn(); - } - deferred_.clear(); - } - - std::vector> deferred_; // Deferred subscription registrations -}; - -} // namespace actor_core - -#endif // ACTOR_CORE_ACTOR_HPP diff --git a/include/actor-core/effect_handler.hpp b/include/actor-core/effect_handler.hpp deleted file mode 100644 index 1c1bfc9..0000000 --- a/include/actor-core/effect_handler.hpp +++ /dev/null @@ -1,35 +0,0 @@ -#pragma once - -#include -#include - -namespace actor_core { - -/** - * @brief Dispatches effects from a tuple to an effect handler - * - * Applies handler.handle(element) for each element in the tuple. - * The handler must provide overloads for all tuple element types. - * Compile-time error if handler lacks an overload for any effect type. - * - * This pattern models effects in a functional-programming sense: - * - Tuple of values → batch of pure results or operations - * - EffectHandler → concrete interpreter of what those values mean - * - dispatch_effect() → executes the "effect" by sending each value to handler - * - * @tparam TEffectHandler Type with handle() overloads for each effect type - * @tparam TTuple Tuple type (forwarding reference for flexibility) - * @param handler Effect handler instance - * @param effects Tuple of effects to dispatch - */ -template -void dispatch_effect(TEffectHandler& handler, TTuple&& effects) { - std::apply( - [&](auto&&... elems) { - // C++17 fold expression: expands to handler.handle(elem1), handler.handle(elem2), ... - (handler.handle(std::forward(elems)), ...); - }, - std::forward(effects)); -} - -} // namespace actor_core diff --git a/include/actor-core/input_source.hpp b/include/actor-core/input_source.hpp deleted file mode 100644 index 84ab482..0000000 --- a/include/actor-core/input_source.hpp +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef ACTOR_CORE_INPUT_SOURCE_HPP -#define ACTOR_CORE_INPUT_SOURCE_HPP - -#include - -namespace actor_core { - -/** - * @brief Abstract interface for pull-based input sources - * - * Provides a unified interface for any source of input to an actor. - * All input sources (Subscription, Timer, StdinReader, etc.) implement this contract. - * - * Pattern: - * - Pull-based: actor calls tryTake() to retrieve items - * - Non-blocking: returns std::nullopt if no items available - * - Notification: source notifies processor when items become available - * - * @tparam T The type of items this source provides - */ -template -class InputSource { - public: - virtual ~InputSource() = default; - - /** - * @brief Try to take the next item (non-blocking) - * @return The next item if available, std::nullopt otherwise - * - * This is the unified pull interface - all sources use the same method name. - */ - virtual std::optional tryTake() = 0; - - /** - * @brief Check if input items are available - * @return true if tryTake() would return an item, false otherwise - */ - virtual bool hasInputItems() const = 0; -}; - -} // namespace actor_core - -#endif // ACTOR_CORE_INPUT_SOURCE_HPP diff --git a/include/actor-core/processor_interface.hpp b/include/actor-core/processor_interface.hpp deleted file mode 100644 index d6f5da4..0000000 --- a/include/actor-core/processor_interface.hpp +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef ACTOR_CORE_PROCESSOR_INTERFACE_HPP -#define ACTOR_CORE_PROCESSOR_INTERFACE_HPP - -namespace actor_core { - -// Interface for actors that process events from event sources (topics, timers, etc.) -// Event sources use weak_ptr to this interface to notify actors when events occur -class ProcessorInterface { - public: - virtual ~ProcessorInterface() = default; - - // Called when subscribed event sources have events available - // Actor should pull events from its subscriptions/timers in desired order - virtual void processInputs() = 0; -}; - -} // namespace actor_core - -#endif // ACTOR_CORE_PROCESSOR_INTERFACE_HPP diff --git a/include/actor-core/publisher.hpp b/include/actor-core/publisher.hpp deleted file mode 100644 index 7f9d869..0000000 --- a/include/actor-core/publisher.hpp +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef ACTOR_CORE_PUBLISHER_HPP -#define ACTOR_CORE_PUBLISHER_HPP - -#include - -namespace actor_core { - -template -class Topic; - -template -using TopicPtr = std::shared_ptr>; - -// Publisher interface for sending messages to a topic -// Provides symmetric API to Subscription -template -class Publisher { - public: - explicit Publisher(TopicPtr topic) : topic_(topic) {} - - // Publish a message to the topic - // Takes by value to support both copy (lvalue) and move (rvalue) semantics - void publish(TMessage msg) { topic_->publish(std::move(msg)); } - - private: - TopicPtr topic_; -}; - -// Convenience type alias for shared_ptr> -template -using PublisherPtr = std::shared_ptr>; - -} // namespace actor_core - -#endif // ACTOR_CORE_PUBLISHER_HPP diff --git a/include/actor-core/subscription.hpp b/include/actor-core/subscription.hpp deleted file mode 100644 index f4045d6..0000000 --- a/include/actor-core/subscription.hpp +++ /dev/null @@ -1,94 +0,0 @@ -#ifndef ACTOR_CORE_SUBSCRIPTION_HPP -#define ACTOR_CORE_SUBSCRIPTION_HPP - -#include -#include -#include -#include - -#include "actor-core/input_source.hpp" - -namespace actor_core { - -// Represents an actor's subscription to a specific topic -// Holds a bounded queue of pending messages -// Owned by the actor as a value type -// Implements InputSource interface for unified processing -template -class Subscription : public InputSource { - public: - Subscription() = default; - explicit Subscription(size_t max_queue_size) : max_queue_size_(max_queue_size) {} - - // Non-copyable, movable - Subscription(const Subscription&) = delete; - Subscription& operator=(const Subscription&) = delete; - Subscription(Subscription&&) = default; - Subscription& operator=(Subscription&&) = default; - - // InputSource interface implementation - std::optional tryTake() override { return tryTakeMessage(); } - - bool hasInputItems() const override { return hasMessages(); } - - // Try to take the next message from queue (non-blocking) - // Returns std::nullopt if queue is empty - std::optional tryTakeMessage() { - std::lock_guard lock(mutex_); - if (queue_.empty()) { - return std::nullopt; - } - TMessage msg = queue_.front(); - queue_.pop_front(); - return msg; - } - - // Peek at next message without removing it - std::optional peek() const { - std::lock_guard lock(mutex_); - if (queue_.empty()) { - return std::nullopt; - } - return queue_.front(); - } - - // Check if there are pending messages - bool hasMessages() const { - std::lock_guard lock(mutex_); - return !queue_.empty(); - } - - // Get number of pending messages - size_t pendingCount() const { - std::lock_guard lock(mutex_); - return queue_.size(); - } - - // Internal: Add message to queue (called by Topic::publish) - // Returns true if this is the first message (was empty before) - bool enqueue(TMessage msg) { - std::lock_guard lock(mutex_); - - // If queue is full, drop oldest message - if (queue_.size() >= max_queue_size_) { - queue_.pop_front(); - } - - bool was_empty = queue_.empty(); - queue_.push_back(std::move(msg)); - return was_empty; - } - - private: - mutable std::mutex mutex_; - std::deque queue_; - size_t max_queue_size_{100}; // Default max queue size -}; - -// Convenience type alias for shared_ptr> -template -using SubscriptionPtr = std::shared_ptr>; - -} // namespace actor_core - -#endif // ACTOR_CORE_SUBSCRIPTION_HPP diff --git a/include/actor-core/timer/asio_timer_core.hpp b/include/actor-core/timer/asio_timer_core.hpp deleted file mode 100644 index ee9fac8..0000000 --- a/include/actor-core/timer/asio_timer_core.hpp +++ /dev/null @@ -1,53 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include "actor-core/processor_interface.hpp" -#include "actor-core/timer/timer_core.hpp" - -namespace actor_core { - -/** - * @brief Asio-based implementation of ITimerCore - * - * Uses asio::steady_timer for timer functionality. - * Thread-safe for use across multiple strands. - * Notifies a processor when timer events occur. - */ -class AsioTimerCore : public ITimerCore, public std::enable_shared_from_this { - public: - /** - * @brief Construct a new Asio Timer Core - * @param io The io_context for async operations - * @param strand The strand to execute timer callbacks on - */ - AsioTimerCore(asio::io_context& io, asio::strand& strand); - - void subscribe(std::weak_ptr processor) override; - void schedule_single_shot(std::chrono::milliseconds duration_from_now_until_elapsed) override; - void schedule_periodic(std::chrono::milliseconds interval) override; - void cancel_scheduled() override; - bool is_scheduled() const override; - bool tryTakeElapsedEvent() override; - bool hasElapsedEvents() const override; - - private: - void schedule_timer(std::chrono::milliseconds duration); - void on_timer_expired(const asio::error_code& ec); - - asio::steady_timer timer_; - asio::strand& strand_; - std::weak_ptr processor_; - - mutable std::mutex mutex_; - bool is_periodic_{false}; - std::chrono::milliseconds interval_{0}; - bool is_scheduled_{false}; - size_t elapsed_count_{0}; -}; - -} // namespace actor_core diff --git a/include/actor-core/timer/timer.hpp b/include/actor-core/timer/timer.hpp deleted file mode 100644 index 89b1751..0000000 --- a/include/actor-core/timer/timer.hpp +++ /dev/null @@ -1,182 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "actor-core/input_source.hpp" -#include "actor-core/timer/timer_core.hpp" - -namespace actor_core { - -// Forward declaration -class ProcessorInterface; - -/** - * @brief Empty context type for when no context is needed - */ -struct NoContext {}; - -/** - * @brief Timer command structure for controlling timer behavior - */ -template -struct TimerCommand { - enum class Command { - NONE, // No operation - START_SINGLE_SHOT, // Start single-shot timer with specified duration - START_PERIODIC, // Start periodic timer with specified interval - CANCEL // Cancel/reset any scheduled timer - }; - - Command command{Command::NONE}; - std::chrono::milliseconds duration{0}; - TContext context{}; -}; - -/** - * @brief Default timer elapsed event structure - */ -template -struct TimerElapsedEvent { - TContext context{}; -}; - -/** - * @brief Templated timer class with custom event and command handling - * - * This class provides the take_all_elapsed_events() method for consuming elapsed events - * and execute_command() for timer control. By using templates, different timer event types - * and command types can be used without code duplication. - * - * Implements InputSource for unified processing. - */ -template -class Timer : public InputSource { - public: - using ElapsedEventType = TTimerElapsedEvent; - using CommandType = TTimerCommand; - - /** - * @brief Construct a new Timer with provided timer core - * @param timer_core Timer core implementation - */ - explicit Timer(TimerCorePtr timer_core) : timer_core_{std::move(timer_core)} {} - - /** - * @brief Execute a timer command - * @param command Timer command to execute - */ - void execute_command(const TTimerCommand& command) { - current_context_ = command.context; - - switch (command.command) { - case TTimerCommand::Command::START_SINGLE_SHOT: - timer_core_->schedule_single_shot(command.duration); - break; - case TTimerCommand::Command::START_PERIODIC: - timer_core_->schedule_periodic(command.duration); - break; - case TTimerCommand::Command::CANCEL: - timer_core_->cancel_scheduled(); - break; - case TTimerCommand::Command::NONE: - // No operation - break; - } - } - - // InputSource interface implementation - std::optional tryTake() override { return tryTakeElapsedEvent(); } - - bool hasInputItems() const override { return timer_core_->hasElapsedEvents(); } - - /** - * @brief Try to take the next elapsed event (non-blocking) - * @return Optional event if one was available, nullopt if no events pending - */ - std::optional tryTakeElapsedEvent() { - if (timer_core_->tryTakeElapsedEvent()) { - TTimerElapsedEvent event{}; - event.context = current_context_; - return event; - } - return std::nullopt; - } - - /** - * @brief Check if timer is currently scheduled - * @return true if timer is scheduled, false otherwise - */ - bool is_scheduled() const { return timer_core_->is_scheduled(); } - - /** - * @brief Subscribe a processor to receive timer event notifications - * @param processor The processor (forwards to timer core) - */ - void subscribe(std::weak_ptr processor) { timer_core_->subscribe(std::move(processor)); } - - private: - // Helper to extract context type from command - template - static TContext extract_context_type(const TimerCommand&); - using ContextType = decltype(extract_context_type(std::declval())); - - ContextType current_context_{}; - TimerCorePtr timer_core_; -}; - -// Template alias for convenience -template -using TimerPtr = std::shared_ptr>; - -// Factory functions for creating timer commands -template -TimerCommand make_single_shot_command(std::chrono::milliseconds duration, - const TContext& context = {}) { - return {TimerCommand::Command::START_SINGLE_SHOT, duration, context}; -} - -template -TimerCommand make_periodic_command(std::chrono::milliseconds interval, const TContext& context = {}) { - return {TimerCommand::Command::START_PERIODIC, interval, context}; -} - -template -TimerCommand make_cancel_command(const TContext& context = {}) { - return {TimerCommand::Command::CANCEL, std::chrono::milliseconds{0}, context}; -} - -template -TimerCommand make_no_op_command(const TContext& context = {}) { - return {TimerCommand::Command::NONE, std::chrono::milliseconds{0}, context}; -} - -// Utility function for converting command to string -template -std::string to_string(const TimerCommand& command) { - std::ostringstream ss; - ss << "TimerCommand{"; - ss << "command: "; - switch (command.command) { - case TimerCommand::Command::NONE: - ss << "NONE"; - break; - case TimerCommand::Command::START_SINGLE_SHOT: - ss << "START_SINGLE_SHOT"; - break; - case TimerCommand::Command::START_PERIODIC: - ss << "START_PERIODIC"; - break; - case TimerCommand::Command::CANCEL: - ss << "CANCEL"; - break; - } - ss << ", duration: " << command.duration.count() << "ms}"; - return ss.str(); -} - -} // namespace actor_core diff --git a/include/actor-core/timer/timer_core.hpp b/include/actor-core/timer/timer_core.hpp deleted file mode 100644 index 4fde1dc..0000000 --- a/include/actor-core/timer/timer_core.hpp +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -#include -#include - -namespace actor_core { - -// Forward declaration -class ProcessorInterface; - -/** - * @brief Interface for timer core functionality - * - * This interface abstracts the underlying timer implementation to support - * both real asio timer and mock implementations for testing. - */ -class ITimerCore { - public: - virtual ~ITimerCore() = default; - - /** - * @brief Subscribe a processor to receive timer event notifications - * @param processor The processor (weak_ptr for automatic cleanup) - */ - virtual void subscribe(std::weak_ptr processor) = 0; - - /** - * @brief Schedule a single-shot timer - * @param duration_from_now_until_elapsed Duration from now until the timer elapses - */ - virtual void schedule_single_shot(std::chrono::milliseconds duration_from_now_until_elapsed) = 0; - - /** - * @brief Schedule a periodic timer - * @param interval Interval between timer events - */ - virtual void schedule_periodic(std::chrono::milliseconds interval) = 0; - - /** - * @brief Cancel any scheduled timer - */ - virtual void cancel_scheduled() = 0; - - /** - * @brief Check if timer is currently scheduled - * @return true if timer is scheduled, false otherwise - */ - virtual bool is_scheduled() const = 0; - - /** - * @brief Try to take the next elapsed event (non-blocking) - * @return true if an event was taken, false if no events are pending - */ - virtual bool tryTakeElapsedEvent() = 0; - - /** - * @brief Check if there are pending elapsed events - * @return true if there are events available, false otherwise - */ - virtual bool hasElapsedEvents() const = 0; -}; - -using TimerCorePtr = std::shared_ptr; - -} // namespace actor_core diff --git a/include/actor-core/timer/timer_factory.hpp b/include/actor-core/timer/timer_factory.hpp deleted file mode 100644 index 237fab8..0000000 --- a/include/actor-core/timer/timer_factory.hpp +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include -#include - -#include "actor-core/timer/asio_timer_core.hpp" -#include "actor-core/timer/timer.hpp" -#include "actor-core/timer/timer_core.hpp" - -namespace actor_core { - -/** - * @brief Factory class for creating timers with Asio backend - * - * Holds reference to io_context. The strand is passed per-timer - * since each actor has its own strand. - */ -class TimerFactory { - public: - /** - * @brief Construct a new Timer Factory - * @param io The io_context for async operations - */ - explicit TimerFactory(asio::io_context& io) : io_(io) {} - - /** - * @brief Create a timer with the specified type - * @tparam TTimer Timer type (Timer) - * @param strand The strand for this timer (typically the actor's strand) - * @return Shared pointer to the created timer - */ - template - std::shared_ptr create(asio::strand& strand) { - auto timer_core = std::make_shared(io_, strand); - return std::make_shared(timer_core); - } - - private: - asio::io_context& io_; -}; - -using TimerFactoryPtr = std::shared_ptr; - -} // namespace actor_core diff --git a/include/actor-core/topic.hpp b/include/actor-core/topic.hpp deleted file mode 100644 index e1c0fba..0000000 --- a/include/actor-core/topic.hpp +++ /dev/null @@ -1,106 +0,0 @@ -#ifndef ACTOR_CORE_TOPIC_HPP -#define ACTOR_CORE_TOPIC_HPP - -#include -#include -#include -#include - -#include "actor-core/processor_interface.hpp" -#include "actor-core/subscription.hpp" -#include "asio.hpp" - -namespace actor_core { - -// Forward declarations -template -class Publisher; - -template -class Actor; - -// Topic implements a typed pub-sub channel for a specific message type -// Uses pull-based model: actors pull messages from their TopicSubscription -// Thread-safe: multiple actors can publish concurrently -template -class Topic { - public: - Topic() = default; - - private: - // Grant access to Publisher for publish() - friend class Publisher; - - // Grant access to Actor for subscribe() in create_sub() - template - friend class Actor; - - // Subscribe to this topic - // processor: Actor that will be notified when messages arrive (via weak_ptr) - // subscription: Actor's subscription queue (raw pointer - actor owns it) - // strand: Strand to post notifications on - void subscribe(std::weak_ptr processor, - Subscription* subscription, - asio::strand strand) { - std::lock_guard lock(mutex_); - subscriptions_.push_back({processor, subscription, strand}); - } - - // Unsubscribe a subscription (currently unused - cleanup is lazy via weak_ptr) - void unsubscribe(Subscription* subscription) { - std::lock_guard lock(mutex_); - subscriptions_.erase( - std::remove_if(subscriptions_.begin(), - subscriptions_.end(), - [subscription](const SubscriptionEntry& s) { return s.subscription == subscription; }), - subscriptions_.end()); - } - - // Publish a message to all live subscribers - // Enqueues message in each subscriber's queue - // Notifies processor only if queue was empty (to avoid redundant notifications) - // Thread-safe: multiple threads can call this concurrently - void publish(TMessage msg) { - std::lock_guard lock(mutex_); - - auto it = subscriptions_.begin(); - while (it != subscriptions_.end()) { - if (auto processor = it->processor.lock()) { - // Processor still alive, enqueue message - bool was_empty = it->subscription->enqueue(msg); - - // Only notify if queue transitioned from empty to non-empty - // This avoids spamming notifications while messages are being processed - if (was_empty) { - asio::post(it->strand, [weak_proc = it->processor]() { - if (auto p = weak_proc.lock()) { - p->processInputs(); - } - }); - } - - ++it; - } else { - // Processor is dead, remove from list - it = subscriptions_.erase(it); - } - } - } - - struct SubscriptionEntry { - std::weak_ptr processor; - Subscription* subscription; // Raw pointer - owned by actor - asio::strand strand; - }; - - std::mutex mutex_; // Protects subscriptions_ from concurrent access - std::vector subscriptions_; -}; - -// Convenience type alias for shared_ptr> -template -using TopicPtr = std::shared_ptr>; - -} // namespace actor_core - -#endif // ACTOR_CORE_TOPIC_HPP diff --git a/include/snake/game_engine_actor.hpp b/include/snake/game_engine_actor.hpp index 7e5ba44..9dde5b1 100644 --- a/include/snake/game_engine_actor.hpp +++ b/include/snake/game_engine_actor.hpp @@ -1,28 +1,28 @@ #pragma once #include +#include +#include +#include +#include #include -#include "actor-core/actor.hpp" -#include "actor-core/subscription.hpp" -#include "actor-core/timer/timer.hpp" -#include "actor-core/topic.hpp" #include "snake/control_messages.hpp" #include "snake/game_messages.hpp" namespace snake { // Import actor_core types into snake namespace for convenience -using actor_core::Actor; -using actor_core::make_cancel_command; -using actor_core::make_periodic_command; -using actor_core::PublisherPtr; -using actor_core::SubscriptionPtr; -using actor_core::Timer; -using actor_core::TimerCommand; -using actor_core::TimerElapsedEvent; -using actor_core::TimerFactoryPtr; -using actor_core::TopicPtr; +using funkyactors::Actor; +using funkyactors::make_cancel_command; +using funkyactors::make_periodic_command; +using funkyactors::PublisherPtr; +using funkyactors::SubscriptionPtr; +using funkyactors::Timer; +using funkyactors::TimerCommand; +using funkyactors::TimerElapsedEvent; +using funkyactors::TimerFactoryPtr; +using funkyactors::TopicPtr; // Timer type definitions for GameEngineActor struct GameTimerTag {}; diff --git a/include/snake/game_manager_actor.hpp b/include/snake/game_manager_actor.hpp index 212f231..bcad6b2 100644 --- a/include/snake/game_manager_actor.hpp +++ b/include/snake/game_manager_actor.hpp @@ -1,28 +1,28 @@ #pragma once #include +#include +#include +#include +#include #include -#include "actor-core/actor.hpp" -#include "actor-core/subscription.hpp" -#include "actor-core/timer/timer.hpp" -#include "actor-core/topic.hpp" #include "snake/control_messages.hpp" #include "snake/game_messages.hpp" namespace snake { // Import actor_core types into snake namespace for convenience -using actor_core::Actor; -using actor_core::make_cancel_command; -using actor_core::make_periodic_command; -using actor_core::PublisherPtr; -using actor_core::SubscriptionPtr; -using actor_core::Timer; -using actor_core::TimerCommand; -using actor_core::TimerElapsedEvent; -using actor_core::TimerFactoryPtr; -using actor_core::TopicPtr; +using funkyactors::Actor; +using funkyactors::make_cancel_command; +using funkyactors::make_periodic_command; +using funkyactors::PublisherPtr; +using funkyactors::SubscriptionPtr; +using funkyactors::Timer; +using funkyactors::TimerCommand; +using funkyactors::TimerElapsedEvent; +using funkyactors::TimerFactoryPtr; +using funkyactors::TopicPtr; // Timer type definitions for GameManagerActor struct RepositionTimerTag {}; diff --git a/include/snake/input_actor.hpp b/include/snake/input_actor.hpp index 0826501..36649ce 100644 --- a/include/snake/input_actor.hpp +++ b/include/snake/input_actor.hpp @@ -1,10 +1,10 @@ #pragma once +#include +#include #include #include -#include "actor-core/actor.hpp" -#include "actor-core/topic.hpp" #include "snake/game_messages.hpp" #include "snake/key.hpp" #include "snake/stdin_reader.hpp" @@ -12,9 +12,9 @@ namespace snake { // Import actor_core types into snake namespace for convenience -using actor_core::Actor; -using actor_core::PublisherPtr; -using actor_core::TopicPtr; +using funkyactors::Actor; +using funkyactors::PublisherPtr; +using funkyactors::TopicPtr; /** * @brief Captures and processes user input diff --git a/include/snake/process_helpers.hpp b/include/snake/process_helpers.hpp index ec2d159..94c5abf 100644 --- a/include/snake/process_helpers.hpp +++ b/include/snake/process_helpers.hpp @@ -1,19 +1,19 @@ #pragma once +#include +#include +#include #include #include #include -#include "actor-core/effect_handler.hpp" -#include "actor-core/input_source.hpp" -#include "actor-core/subscription.hpp" #include "funkypipes/details/tuple/separate_tuple_elements.hpp" namespace snake { // Import actor_core types into snake namespace for convenience -using actor_core::InputSource; -using actor_core::Subscription; +using funkyactors::InputSource; +using funkyactors::Subscription; /** * @brief Higher-order function to process all messages from a subscription @@ -92,7 +92,7 @@ auto with_effect_handling(TProcessFn&& process_fn, TEffectHandler& effect_handle auto [state_tuple, effects_tuple] = funkypipes::details::separateTupleElements<0>(std::move(result)); // Dispatch effects through effect handler - actor_core::dispatch_effect(effect_handler, effects_tuple); + funkyactors::dispatch_effect(effect_handler, effects_tuple); // Return new state return std::move(std::get<0>(state_tuple)); diff --git a/include/snake/renderer_actor.hpp b/include/snake/renderer_actor.hpp index 3b10165..7a2c08f 100644 --- a/include/snake/renderer_actor.hpp +++ b/include/snake/renderer_actor.hpp @@ -1,24 +1,24 @@ #pragma once #include +#include +#include +#include +#include #include -#include "actor-core/actor.hpp" -#include "actor-core/subscription.hpp" -#include "actor-core/timer/timer.hpp" -#include "actor-core/topic.hpp" #include "snake/game_messages.hpp" namespace snake { // Import actor_core types into snake namespace for convenience -using actor_core::Actor; -using actor_core::SubscriptionPtr; -using actor_core::Timer; -using actor_core::TimerCommand; -using actor_core::TimerElapsedEvent; -using actor_core::TimerFactoryPtr; -using actor_core::TopicPtr; +using funkyactors::Actor; +using funkyactors::SubscriptionPtr; +using funkyactors::Timer; +using funkyactors::TimerCommand; +using funkyactors::TimerElapsedEvent; +using funkyactors::TimerFactoryPtr; +using funkyactors::TopicPtr; // Timer type definitions for RendererActor struct FlashTimerTag {}; diff --git a/include/snake/stdin_reader.hpp b/include/snake/stdin_reader.hpp index 65919a9..10bffc4 100644 --- a/include/snake/stdin_reader.hpp +++ b/include/snake/stdin_reader.hpp @@ -4,14 +4,13 @@ #include #include +#include +#include #include #include #include #include -#include "actor-core/input_source.hpp" -#include "actor-core/processor_interface.hpp" - namespace snake { /** @@ -25,7 +24,7 @@ namespace snake { * * Implements InputSource for unified processing. */ -class StdinReader : public actor_core::InputSource { +class StdinReader : public funkyactors::InputSource { public: explicit StdinReader(asio::io_context& io); ~StdinReader(); @@ -41,7 +40,7 @@ class StdinReader : public actor_core::InputSource { * @param processor The processor to notify (weak_ptr for automatic cleanup) * @param strand The strand to post notifications on */ - void subscribe(std::weak_ptr processor, + void subscribe(std::weak_ptr processor, asio::strand strand); /** @@ -92,7 +91,7 @@ class StdinReader : public actor_core::InputSource { static constexpr size_t MAX_QUEUE_SIZE = 100; // Processor notification (like Topic) - std::weak_ptr processor_; + std::weak_ptr processor_; asio::strand strand_; // State diff --git a/src/actor-core/timer/asio_timer_core.cpp b/src/actor-core/timer/asio_timer_core.cpp deleted file mode 100644 index 8083c23..0000000 --- a/src/actor-core/timer/asio_timer_core.cpp +++ /dev/null @@ -1,100 +0,0 @@ -#include "actor-core/timer/asio_timer_core.hpp" - -namespace actor_core { - -AsioTimerCore::AsioTimerCore(asio::io_context& io, asio::strand& strand) - : timer_(io), strand_(strand) {} - -void AsioTimerCore::subscribe(std::weak_ptr processor) { - std::lock_guard lock(mutex_); - processor_ = std::move(processor); -} - -void AsioTimerCore::schedule_single_shot(std::chrono::milliseconds duration_from_now_until_elapsed) { - std::lock_guard lock(mutex_); - - is_periodic_ = false; - interval_ = duration_from_now_until_elapsed; - is_scheduled_ = true; - - schedule_timer(duration_from_now_until_elapsed); -} - -void AsioTimerCore::schedule_periodic(std::chrono::milliseconds interval) { - std::lock_guard lock(mutex_); - - is_periodic_ = true; - interval_ = interval; - is_scheduled_ = true; - - schedule_timer(interval); -} - -void AsioTimerCore::cancel_scheduled() { - std::lock_guard lock(mutex_); - - is_scheduled_ = false; - timer_.cancel(); -} - -bool AsioTimerCore::is_scheduled() const { - std::lock_guard lock(mutex_); - return is_scheduled_; -} - -bool AsioTimerCore::tryTakeElapsedEvent() { - std::lock_guard lock(mutex_); - - if (elapsed_count_ > 0) { - --elapsed_count_; - return true; - } - return false; -} - -bool AsioTimerCore::hasElapsedEvents() const { - std::lock_guard lock(mutex_); - return elapsed_count_ > 0; -} - -void AsioTimerCore::schedule_timer(std::chrono::milliseconds duration) { - timer_.expires_after(duration); - timer_.async_wait(asio::bind_executor(strand_, [weak_self = weak_from_this()](const asio::error_code& ec) { - if (auto self = weak_self.lock()) { - self->on_timer_expired(ec); - } - })); -} - -void AsioTimerCore::on_timer_expired(const asio::error_code& ec) { - if (ec == asio::error::operation_aborted) { - // Timer was cancelled - return; - } - - if (ec) { - // Error occurred - return; - } - - { - std::lock_guard lock(mutex_); - - // Increment elapsed event counter - ++elapsed_count_; - - // If periodic, reschedule - if (is_periodic_ && is_scheduled_) { - schedule_timer(interval_); - } else { - is_scheduled_ = false; - } - } - - // Notify processor that timer event occurred (outside lock) - if (auto processor = processor_.lock()) { - asio::post(strand_, [processor]() { processor->processInputs(); }); - } -} - -} // namespace actor_core diff --git a/src/game_engine_actor.cpp b/src/game_engine_actor.cpp index abe0045..0b310c1 100644 --- a/src/game_engine_actor.cpp +++ b/src/game_engine_actor.cpp @@ -1,9 +1,9 @@ #include "snake/game_engine_actor.hpp" +#include #include #include -#include "actor-core/effect_handler.hpp" #include "funkypipes/bind_front.hpp" #include "funkypipes/details/tuple/separate_tuple_elements.hpp" #include "funkypipes/make_pipe.hpp" diff --git a/src/main.cpp b/src/main.cpp index 010d74c..3852e7e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,10 +1,10 @@ #include +#include +#include +#include #include #include -#include "actor-core/publisher.hpp" -#include "actor-core/timer/timer_factory.hpp" -#include "actor-core/topic.hpp" #include "snake/game_engine_actor.hpp" #include "snake/game_manager_actor.hpp" #include "snake/input_actor.hpp" @@ -12,9 +12,9 @@ #include "snake/renderer_actor.hpp" #include "snake/stdin_reader.hpp" -using actor_core::Publisher; -using actor_core::TimerFactory; -using actor_core::Topic; +using funkyactors::Publisher; +using funkyactors::TimerFactory; +using funkyactors::Topic; int main() { std::cout << "=== Snake Game - Interactive Demo ===\n\n"; diff --git a/src/renderer_actor.cpp b/src/renderer_actor.cpp index 4f8ffc3..52418e5 100644 --- a/src/renderer_actor.cpp +++ b/src/renderer_actor.cpp @@ -1,19 +1,19 @@ #include "snake/renderer_actor.hpp" #include +#include #include #include #include #include -#include "actor-core/timer/timer.hpp" #include "snake/control_messages.hpp" #include "snake/process_helpers.hpp" namespace snake { -using actor_core::make_cancel_command; -using actor_core::make_periodic_command; +using funkyactors::make_cancel_command; +using funkyactors::make_periodic_command; RendererActor::RendererActor(ActorContext ctx, TopicPtr state_topic, diff --git a/src/stdin_reader.cpp b/src/stdin_reader.cpp index abc5729..ba9b1cf 100644 --- a/src/stdin_reader.cpp +++ b/src/stdin_reader.cpp @@ -11,7 +11,7 @@ StdinReader::StdinReader(asio::io_context& io) StdinReader::~StdinReader() { stopReading(); } -void StdinReader::subscribe(std::weak_ptr processor, +void StdinReader::subscribe(std::weak_ptr processor, asio::strand strand) { processor_ = processor; strand_ = strand; diff --git a/tests/mock_actors.hpp b/tests/mock_actors.hpp index ffb85a2..685ff6c 100644 --- a/tests/mock_actors.hpp +++ b/tests/mock_actors.hpp @@ -1,20 +1,20 @@ #pragma once +#include +#include +#include #include #include -#include "actor-core/actor.hpp" -#include "actor-core/subscription.hpp" -#include "actor-core/topic.hpp" #include "snake/control_messages.hpp" #include "snake/game_messages.hpp" namespace snake { // Import actor_core types into snake namespace for convenience -using actor_core::Actor; -using actor_core::SubscriptionPtr; -using actor_core::TopicPtr; +using funkyactors::Actor; +using funkyactors::SubscriptionPtr; +using funkyactors::TopicPtr; /** * @brief Mock subscriber for TickMsg messages diff --git a/tests/test_actors.cpp b/tests/test_actors.cpp index 0677343..3f1bf42 100644 --- a/tests/test_actors.cpp +++ b/tests/test_actors.cpp @@ -1,8 +1,9 @@ #include -#include "actor-core/publisher.hpp" -#include "actor-core/timer/timer_factory.hpp" -#include "actor-core/topic.hpp" +#include +#include +#include + #include "mock_actors.hpp" #include "snake/game_engine_actor.hpp" #include "snake/game_manager_actor.hpp" @@ -11,9 +12,9 @@ #include "snake/renderer_actor.hpp" #include "snake/stdin_reader.hpp" -using actor_core::Publisher; -using actor_core::TimerFactory; -using actor_core::Topic; +using funkyactors::Publisher; +using funkyactors::TimerFactory; +using funkyactors::Topic; using snake::StdinReader; namespace snake { @@ -71,9 +72,18 @@ TEST(ActorTest, GameManagerActor_CoordinatesStartGame) { auto pause_topic = std::make_shared>(); // Create GameManagerActor - auto manager = GameManagerActor::create(io, clock_topic, startgame_topic, reposition_topic, metadata_topic, - tickrate_topic, alivests_topic, summary_req_topic, summary_resp_topic, - gameover_topic, pause_topic, timer_factory); + auto manager = GameManagerActor::create(io, + clock_topic, + startgame_topic, + reposition_topic, + metadata_topic, + tickrate_topic, + alivests_topic, + summary_req_topic, + summary_resp_topic, + gameover_topic, + pause_topic, + timer_factory); // Verify GameManagerActor was created successfully SUCCEED(); @@ -97,8 +107,16 @@ TEST(ActorTest, GameEngineActor_HandlesClockCommands) { auto summary_resp_topic = std::make_shared>(); // Create GameEngineActor - auto engine = GameEngineActor::create(io, direction_topic, state_topic, clock_topic, tickrate_topic, reposition_topic, - alivests_topic, summary_req_topic, summary_resp_topic, timer_factory); + auto engine = GameEngineActor::create(io, + direction_topic, + state_topic, + clock_topic, + tickrate_topic, + reposition_topic, + alivests_topic, + summary_req_topic, + summary_resp_topic, + timer_factory); // Create publisher to send clock commands Publisher clock_pub{clock_topic}; @@ -143,9 +161,18 @@ TEST(ActorTest, GameManagerActor_HandlesPauseToggle) { auto mock_clock_subscriber = MockClockCommandSubscriber::create(io, clock_topic); // Create GameManagerActor - auto manager = GameManagerActor::create(io, clock_topic, startgame_topic, reposition_topic, metadata_topic, - tickrate_topic, alivests_topic, summary_req_topic, summary_resp_topic, - gameover_topic, pause_topic, timer_factory); + auto manager = GameManagerActor::create(io, + clock_topic, + startgame_topic, + reposition_topic, + metadata_topic, + tickrate_topic, + alivests_topic, + summary_req_topic, + summary_resp_topic, + gameover_topic, + pause_topic, + timer_factory); // Create publisher for pause toggle Publisher pause_pub{pause_topic}; @@ -211,9 +238,18 @@ TEST(ActorTest, GameManagerActor_SendsClockCommands) { auto mock_clock_subscriber = MockClockCommandSubscriber::create(io, clock_topic); // Create GameManagerActor - auto manager = GameManagerActor::create(io, clock_topic, startgame_topic, reposition_topic, metadata_topic, - tickrate_topic, alivests_topic, summary_req_topic, summary_resp_topic, - gameover_topic, pause_topic, timer_factory); + auto manager = GameManagerActor::create(io, + clock_topic, + startgame_topic, + reposition_topic, + metadata_topic, + tickrate_topic, + alivests_topic, + summary_req_topic, + summary_resp_topic, + gameover_topic, + pause_topic, + timer_factory); // Create publisher for start game Publisher startgame_pub{startgame_topic};