diff --git a/docs/tutorials/snapshots.rst b/docs/tutorials/snapshots.rst index 1177623..a1845f6 100644 --- a/docs/tutorials/snapshots.rst +++ b/docs/tutorials/snapshots.rst @@ -230,8 +230,276 @@ Troubleshooting - Check topic resolution priority (fault_specific > patterns > default) - Verify regex patterns in config file are correct +Rosbag Capture (Time-Window Recording) +-------------------------------------- + +In addition to JSON snapshots, you can enable **rosbag capture** for "black box" +style recording. This continuously buffers messages in memory and flushes them +to a bag file when a fault is confirmed. + +**Key differences from JSON snapshots:** + +.. list-table:: + :widths: 25 35 40 + :header-rows: 1 + + * - Feature + - JSON Snapshots + - Rosbag Capture + * - Data format + - JSON (human-readable) + - Binary (native ROS 2) + * - Time coverage + - Point-in-time (at confirmation) + - Time window (before + after fault) + * - Message fidelity + - Converted to JSON + - Original serialization preserved + * - Playback + - N/A + - ``ros2 bag play`` + * - Default + - Enabled + - Disabled + +Enabling Rosbag Capture +^^^^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: bash + + ros2 run ros2_medkit_fault_manager fault_manager_node --ros-args \ + -p snapshots.rosbag.enabled:=true \ + -p snapshots.rosbag.duration_sec:=5.0 \ + -p snapshots.rosbag.duration_after_sec:=1.0 + +This captures 5 seconds of data **before** the fault and 1 second **after**. + +Rosbag Configuration Options +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. list-table:: + :widths: 35 15 50 + :header-rows: 1 + + * - Parameter + - Default + - Description + * - ``snapshots.rosbag.enabled`` + - ``false`` + - Enable rosbag capture. When enabled, the system continuously buffers + messages in memory and writes them to a bag file when faults are confirmed. + * - ``snapshots.rosbag.duration_sec`` + - ``5.0`` + - Ring buffer duration in seconds. This determines how much history is + preserved before the fault confirmation. Larger values provide more + context but consume more memory. + * - ``snapshots.rosbag.duration_after_sec`` + - ``1.0`` + - Post-fault recording duration. After a fault is confirmed, recording + continues for this many seconds to capture immediate system response. + * - ``snapshots.rosbag.topics`` + - ``"config"`` + - Topic selection mode: + + - ``"config"`` - Use same topics as JSON snapshots (from config file) + - ``"all"`` or ``"auto"`` - Auto-discover and record all available topics + - ``"explicit"`` - Use only topics from ``include_topics`` list + * - ``snapshots.rosbag.include_topics`` + - ``[]`` + - Explicit list of topics to record (only used when ``topics: "explicit"``). + Example: ``["/odom", "/joint_states", "/cmd_vel"]`` + * - ``snapshots.rosbag.exclude_topics`` + - ``[]`` + - Topics to exclude from recording (applies to all modes). Useful for + filtering high-bandwidth topics like camera images. + * - ``snapshots.rosbag.format`` + - ``"sqlite3"`` + - Bag storage format: ``"sqlite3"`` (default, widely compatible) or + ``"mcap"`` (more efficient compression, requires plugin). + * - ``snapshots.rosbag.storage_path`` + - ``""`` + - Directory for bag files. Empty string uses system temp directory + (``/tmp``). Bags are named ``fault_{code}_{timestamp}/``. + * - ``snapshots.rosbag.auto_cleanup`` + - ``true`` + - Automatically delete bag files when faults are cleared. Set to ``false`` + to retain bags for manual analysis. + * - ``snapshots.rosbag.lazy_start`` + - ``false`` + - Controls when the ring buffer starts recording. See diagram below. + * - ``snapshots.rosbag.max_bag_size_mb`` + - ``50`` + - Maximum size per bag file in MB. When exceeded, rosbag2 creates + additional segment files. + * - ``snapshots.rosbag.max_total_storage_mb`` + - ``500`` + - Total storage limit for all bag files. Oldest bags are automatically + deleted when this limit is exceeded. + +Understanding lazy_start Mode +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``lazy_start`` parameter controls when the ring buffer starts recording: + +**lazy_start: false (default)** - Recording starts immediately at node startup. +Best for development and when you need maximum context for any fault. + +.. uml:: + + @startuml + skinparam backgroundColor transparent + participant "Ring Buffer" as RB + participant "Fault Manager" as FM + + == Node Startup == + RB -> RB : Start recording + note right of RB #LightGreen : Buffer active\n(continuous) + + ... time passes (messages buffered) ... + + == Fault Detected == + FM -> FM : PREFAILED + note right of RB #LightGreen : duration_sec\nof data buffered + + FM -> FM : CONFIRMED + FM -> RB : Flush buffer + RB -> RB : Write pre-fault data + note right of RB #LightBlue : Post-fault\nrecording + + ... duration_after_sec ... + + RB -> RB : Save bag file + note right of RB #LightGreen : Resume\nbuffering + @enduml + +**lazy_start: true** - Recording only starts when a fault enters PREFAILED state. +Saves resources but may miss context if fault confirms before buffer fills. + +.. uml:: + + @startuml + skinparam backgroundColor transparent + participant "Ring Buffer" as RB + participant "Fault Manager" as FM + + == Node Startup == + note right of RB #LightGray : Buffer inactive\n(saving resources) + + ... time passes (no recording) ... + + == Fault Detected == + FM -> FM : PREFAILED + FM -> RB : Start buffer + note right of RB #LightGreen : Recording\nstarts now + + ... buffer filling (may be < duration_sec) ... + + FM -> FM : CONFIRMED + FM -> RB : Flush buffer + RB -> RB : Write pre-fault data + note right of RB #LightBlue : Post-fault\nrecording + + ... duration_after_sec ... + + RB -> RB : Save bag file + note right of RB #LightGray : Buffer\ninactive + @enduml + +**When to use lazy_start: true:** + +- Production systems with limited resources +- When faults have reliable PREFAILED → CONFIRMED progression +- Systems where most faults are debounced (enter PREFAILED first) + +**When to use lazy_start: false:** + +- Development and debugging +- When faults may skip PREFAILED state (severity 3 = CRITICAL) +- When maximum fault context is more important than resource usage + +.. note:: + + The ``"mcap"`` format requires ``rosbag2_storage_mcap`` to be installed. + If not available, use ``"sqlite3"`` (default). + + .. code-block:: bash + + # Install MCAP support (optional) + sudo apt install ros-${ROS_DISTRO}-rosbag2-storage-mcap + +Downloading Rosbag Files +^^^^^^^^^^^^^^^^^^^^^^^^ + +**Via REST API:** + +.. code-block:: bash + + # Download bag archive (.tar.gz containing full bag directory) + curl -O -J http://localhost:8080/api/v1/faults/MOTOR_OVERHEAT/snapshots/bag + + # Extract the archive + tar -xzf fault_MOTOR_OVERHEAT_20260124_153045.tar.gz + + # Play back the bag + ros2 bag play fault_MOTOR_OVERHEAT_1735830000/ + +The REST API returns a compressed tar.gz archive containing the complete bag +directory structure (``metadata.yaml`` and all storage segments). This allows +direct playback with ``ros2 bag play`` after extraction. + +**Via ROS 2 service:** + +.. code-block:: bash + + ros2 service call /fault_manager/get_rosbag ros2_medkit_msgs/srv/GetRosbag \ + "{fault_code: 'MOTOR_OVERHEAT'}" + +Example: Production Configuration +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +For production use with conservative resource usage: + +.. code-block:: yaml + + # config/snapshots.yaml + rosbag: + enabled: true + duration_sec: 3.0 + duration_after_sec: 0.5 + topics: "config" # Use same topics as JSON snapshots + lazy_start: true # Save resources until fault detected + format: "sqlite3" + max_bag_size_mb: 25 + max_total_storage_mb: 200 + auto_cleanup: true + + # Exclude high-bandwidth topics + # exclude_topics: + # - /camera/image_raw + # - /pointcloud + +Example: Debugging Configuration +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +For development with maximum context: + +.. code-block:: yaml + + rosbag: + enabled: true + duration_sec: 10.0 # 10 seconds before fault + duration_after_sec: 2.0 # 2 seconds after + topics: "config" + lazy_start: false # Always recording + format: "sqlite3" + storage_path: "/var/log/ros2_medkit/rosbags" + max_bag_size_mb: 100 + max_total_storage_mb: 1000 + auto_cleanup: false # Keep bags for analysis + See Also -------- - :doc:`../requirements/specs/faults` - Fault API requirements - `Gateway README `_ - REST API reference +- `config/snapshots.yaml `_ - Full configuration reference diff --git a/src/ros2_medkit_fault_manager/CMakeLists.txt b/src/ros2_medkit_fault_manager/CMakeLists.txt index 3e749fa..175de97 100644 --- a/src/ros2_medkit_fault_manager/CMakeLists.txt +++ b/src/ros2_medkit_fault_manager/CMakeLists.txt @@ -39,6 +39,9 @@ find_package(SQLite3 REQUIRED) find_package(nlohmann_json REQUIRED) # yaml-cpp is required as transitive dependency from ros2_medkit_serialization find_package(yaml-cpp REQUIRED) +# rosbag2 for time-window snapshot recording +find_package(rosbag2_cpp REQUIRED) +find_package(rosbag2_storage REQUIRED) # Library target (shared between executable and tests) add_library(fault_manager_lib STATIC @@ -46,6 +49,7 @@ add_library(fault_manager_lib STATIC src/fault_storage.cpp src/sqlite_fault_storage.cpp src/snapshot_capture.cpp + src/rosbag_capture.cpp src/correlation/types.cpp src/correlation/config_parser.cpp src/correlation/pattern_matcher.cpp @@ -61,6 +65,8 @@ ament_target_dependencies(fault_manager_lib PUBLIC rclcpp ros2_medkit_msgs ros2_medkit_serialization + rosbag2_cpp + rosbag2_storage ) target_link_libraries(fault_manager_lib PUBLIC @@ -123,6 +129,11 @@ if(BUILD_TESTING) target_link_libraries(test_snapshot_capture fault_manager_lib) ament_target_dependencies(test_snapshot_capture rclcpp ros2_medkit_msgs) + # Rosbag capture tests + ament_add_gtest(test_rosbag_capture test/test_rosbag_capture.cpp) + target_link_libraries(test_rosbag_capture fault_manager_lib) + ament_target_dependencies(test_rosbag_capture rclcpp ros2_medkit_msgs) + # Correlation config parser tests ament_add_gtest(test_correlation_config_parser test/test_correlation_config_parser.cpp) target_link_libraries(test_correlation_config_parser fault_manager_lib) @@ -143,6 +154,8 @@ if(BUILD_TESTING) target_link_options(test_sqlite_storage PRIVATE --coverage) target_compile_options(test_snapshot_capture PRIVATE --coverage -O0 -g) target_link_options(test_snapshot_capture PRIVATE --coverage) + target_compile_options(test_rosbag_capture PRIVATE --coverage -O0 -g) + target_link_options(test_rosbag_capture PRIVATE --coverage) target_compile_options(test_correlation_config_parser PRIVATE --coverage -O0 -g) target_link_options(test_correlation_config_parser PRIVATE --coverage) target_compile_options(test_pattern_matcher PRIVATE --coverage -O0 -g) @@ -161,6 +174,12 @@ if(BUILD_TESTING) TARGET test_integration TIMEOUT 60 ) + + add_launch_test( + test/test_rosbag_integration.test.py + TARGET test_rosbag_integration + TIMEOUT 90 + ) endif() ament_package() diff --git a/src/ros2_medkit_fault_manager/config/snapshots.yaml b/src/ros2_medkit_fault_manager/config/snapshots.yaml index 856f83c..ddefcb0 100644 --- a/src/ros2_medkit_fault_manager/config/snapshots.yaml +++ b/src/ros2_medkit_fault_manager/config/snapshots.yaml @@ -47,3 +47,138 @@ patterns: default_topics: - /diagnostics - /rosout + +# ============================================================================= +# Rosbag Capture Configuration +# ============================================================================= +# +# Rosbag capture provides "black box" style recording - a ring buffer continuously +# records configured topics, and when a fault is confirmed, the buffer is flushed +# to a bag file along with a short period of post-fault recording. +# +# This is complementary to JSON snapshots: +# - JSON snapshots: Point-in-time capture, easy to query via REST API +# - Rosbag capture: Time-window recording, full message fidelity, playback support +# +# Usage: +# ros2 run ros2_medkit_fault_manager fault_manager_node \ +# --ros-args -p snapshots.rosbag.enabled:=true \ +# -p snapshots.rosbag.duration_sec:=5.0 +# +# Or via launch file parameters (see examples below) +# +# NOTE: The rosbag configuration below is for DOCUMENTATION PURPOSES ONLY. +# FaultManagerNode currently reads only `fault_specific`, `patterns`, and +# `default_topics` from this YAML file. Rosbag settings must be configured +# via ROS 2 parameters (--ros-args -p snapshots.rosbag.*) or launch files. + +rosbag: + # Enable/disable rosbag capture (default: false) + # When disabled, only JSON snapshots are captured + enabled: false + + # Ring buffer duration in seconds (default: 5.0) + # How many seconds of data to keep before fault confirmation + # Larger values = more context but more memory usage + duration_sec: 5.0 + + # Post-fault recording duration in seconds (default: 1.0) + # Continue recording after fault is confirmed to capture aftermath + duration_after_sec: 1.0 + + # Topic selection mode (default: "config") + # Options: + # "config" - Use same topics as JSON snapshots (from this config file) + # "all" - Record all available topics (WARNING: high memory usage!) + # "auto" - Alias for "all" (auto-discover all topics) + # "explicit" - Use only topics from include_topics list below + topics: "config" + + # Explicit topic list (only used when topics: "explicit") + # include_topics: + # - /joint_states + # - /odom + # - /cmd_vel + + # Topics to exclude from recording (applies to all modes) + # Useful to filter out high-bandwidth topics like camera images + # exclude_topics: + # - /camera/image_raw + # - /camera/depth/image_raw + # - /pointcloud + + # Lazy start mode (default: false) + # When true, ring buffer only starts when fault enters PREFAILED state + # Saves resources but may miss early context if fault confirms quickly + # When false, ring buffer runs continuously from node startup + lazy_start: false + + # Bag file format (default: "sqlite3") + # Options: + # "sqlite3" - Default rosbag2 format, widely compatible + # "mcap" - More efficient, better compression (requires rosbag2_storage_mcap) + format: "sqlite3" + + # Storage path for bag files (default: "" = system temp directory) + # Empty string uses /tmp/rosbag_snapshots/ + # Bag files are named: {fault_code}_{timestamp}/ + storage_path: "" + + # Maximum size per bag file in MB (default: 50) + # If a bag exceeds this size, it will be closed even if still recording + max_bag_size_mb: 50 + + # Maximum total storage for all bag files in MB (default: 500) + # Oldest bags are deleted when this limit is exceeded + max_total_storage_mb: 500 + + # Auto-cleanup bag files when fault is cleared (default: true) + # When true, bag files are deleted when ClearFault is called + # When false, bag files persist until manually deleted or storage limit hit + auto_cleanup: true + +# ============================================================================= +# Example Configurations +# ============================================================================= +# +# Minimal configuration (enable with defaults): +# --------------------------------------------- +# rosbag: +# enabled: true +# +# Production configuration (conservative resources): +# -------------------------------------------------- +# rosbag: +# enabled: true +# duration_sec: 3.0 +# duration_after_sec: 0.5 +# topics: "config" +# lazy_start: true +# max_bag_size_mb: 25 +# max_total_storage_mb: 200 +# auto_cleanup: true +# +# Development/debugging configuration (more context): +# --------------------------------------------------- +# rosbag: +# enabled: true +# duration_sec: 10.0 +# duration_after_sec: 2.0 +# topics: "config" +# lazy_start: false +# format: "mcap" +# storage_path: "/var/log/ros2_medkit/rosbags" +# max_bag_size_mb: 100 +# max_total_storage_mb: 1000 +# auto_cleanup: false +# +# Explicit topics (specific topics only): +# --------------------------------------- +# rosbag: +# enabled: true +# topics: "explicit" +# include_topics: +# - /joint_states +# - /odom +# - /diagnostics +# exclude_topics: [] diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp index a61cea3..3c3bfca 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp @@ -20,10 +20,12 @@ #include "rclcpp/rclcpp.hpp" #include "ros2_medkit_fault_manager/correlation/correlation_engine.hpp" #include "ros2_medkit_fault_manager/fault_storage.hpp" +#include "ros2_medkit_fault_manager/rosbag_capture.hpp" #include "ros2_medkit_fault_manager/snapshot_capture.hpp" #include "ros2_medkit_msgs/msg/fault_event.hpp" #include "ros2_medkit_msgs/srv/clear_fault.hpp" #include "ros2_medkit_msgs/srv/get_faults.hpp" +#include "ros2_medkit_msgs/srv/get_rosbag.hpp" #include "ros2_medkit_msgs/srv/get_snapshots.hpp" #include "ros2_medkit_msgs/srv/report_fault.hpp" @@ -72,6 +74,10 @@ class FaultManagerNode : public rclcpp::Node { void handle_get_snapshots(const std::shared_ptr & request, const std::shared_ptr & response); + /// Handle GetRosbag service request + void handle_get_rosbag(const std::shared_ptr & request, + const std::shared_ptr & response); + /// Create snapshot configuration from parameters SnapshotConfig create_snapshot_config(); @@ -106,6 +112,7 @@ class FaultManagerNode : public rclcpp::Node { rclcpp::Service::SharedPtr get_faults_srv_; rclcpp::Service::SharedPtr clear_fault_srv_; rclcpp::Service::SharedPtr get_snapshots_srv_; + rclcpp::Service::SharedPtr get_rosbag_srv_; rclcpp::TimerBase::SharedPtr auto_confirm_timer_; /// Timer for periodic cleanup of expired correlation data @@ -117,6 +124,9 @@ class FaultManagerNode : public rclcpp::Node { /// Snapshot capture for capturing topic data on fault confirmation std::unique_ptr snapshot_capture_; + /// Rosbag capture for time-window recording (nullptr if disabled) + std::unique_ptr rosbag_capture_; + /// Correlation engine for fault correlation/muting (nullptr if disabled) std::unique_ptr correlation_engine_; }; diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp index 32eacfd..4fcb572 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_storage.hpp @@ -82,6 +82,16 @@ struct SnapshotData { int64_t captured_at_ns{0}; }; +/// Rosbag file metadata for time-window recording +struct RosbagFileInfo { + std::string fault_code; + std::string file_path; + std::string format; ///< "sqlite3" or "mcap" + double duration_sec{0.0}; ///< Total duration of recorded data + size_t size_bytes{0}; ///< File size in bytes + int64_t created_at_ns{0}; ///< Timestamp when bag was created +}; + /// Abstract interface for fault storage backends class FaultStorage { public: @@ -145,6 +155,28 @@ class FaultStorage { virtual std::vector get_snapshots(const std::string & fault_code, const std::string & topic_filter = "") const = 0; + /// Store rosbag file metadata for a fault + /// @param info The rosbag file info to store (replaces any existing entry for fault_code) + virtual void store_rosbag_file(const RosbagFileInfo & info) = 0; + + /// Get rosbag file info for a fault + /// @param fault_code The fault code to get rosbag for + /// @return Rosbag file info if exists, nullopt otherwise + virtual std::optional get_rosbag_file(const std::string & fault_code) const = 0; + + /// Delete rosbag file record and the actual file for a fault + /// @param fault_code The fault code to delete rosbag for + /// @return true if record was deleted, false if not found + virtual bool delete_rosbag_file(const std::string & fault_code) = 0; + + /// Get total size of all stored rosbag files in bytes + /// @return Total size in bytes + virtual size_t get_total_rosbag_storage_bytes() const = 0; + + /// Get all rosbag files ordered by creation time (oldest first) + /// @return Vector of rosbag file info + virtual std::vector get_all_rosbag_files() const = 0; + protected: FaultStorage() = default; FaultStorage(const FaultStorage &) = default; @@ -182,6 +214,12 @@ class InMemoryFaultStorage : public FaultStorage { std::vector get_snapshots(const std::string & fault_code, const std::string & topic_filter = "") const override; + void store_rosbag_file(const RosbagFileInfo & info) override; + std::optional get_rosbag_file(const std::string & fault_code) const override; + bool delete_rosbag_file(const std::string & fault_code) override; + size_t get_total_rosbag_storage_bytes() const override; + std::vector get_all_rosbag_files() const override; + private: /// Update fault status based on debounce counter void update_status(FaultState & state); @@ -189,6 +227,7 @@ class InMemoryFaultStorage : public FaultStorage { mutable std::mutex mutex_; std::map faults_; std::vector snapshots_; + std::map rosbag_files_; DebounceConfig config_; }; diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/rosbag_capture.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/rosbag_capture.hpp new file mode 100644 index 0000000..d709eff --- /dev/null +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/rosbag_capture.hpp @@ -0,0 +1,186 @@ +// Copyright 2026 mfaferek93 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "ros2_medkit_fault_manager/fault_storage.hpp" +#include "ros2_medkit_fault_manager/snapshot_capture.hpp" + +namespace ros2_medkit_fault_manager { + +/// A buffered message stored in the ring buffer +struct BufferedMessage { + std::string topic; + std::string message_type; + std::shared_ptr serialized_data; + int64_t timestamp_ns{0}; +}; + +/// Manages rosbag recording with ring buffer for time-window capture +/// +/// This class implements a "black box" style recording where messages are +/// continuously buffered in memory. When a fault is confirmed, the buffer +/// is flushed to a bag file along with continued recording for a short +/// period after the fault. +/// +/// Lifecycle: +/// - start() begins buffering messages (or lazy_start waits for PREFAILED) +/// - on_fault_confirmed() flushes buffer to bag file +/// - on_fault_cleared() deletes bag file if auto_cleanup enabled +class RosbagCapture { + public: + /// Create rosbag capture + /// @param node ROS 2 node for creating subscriptions and timers + /// @param storage Fault storage for persisting bag file metadata + /// @param config Rosbag configuration + /// @param snapshot_config Snapshot configuration (for topic resolution when topics="config") + RosbagCapture(rclcpp::Node * node, FaultStorage * storage, const RosbagConfig & config, + const SnapshotConfig & snapshot_config); + + ~RosbagCapture(); + + // Non-copyable, non-movable + RosbagCapture(const RosbagCapture &) = delete; + RosbagCapture & operator=(const RosbagCapture &) = delete; + RosbagCapture(RosbagCapture &&) = delete; + RosbagCapture & operator=(RosbagCapture &&) = delete; + + /// Start the ring buffer recording + /// If lazy_start is false, this is called automatically on construction + void start(); + + /// Stop recording and clean up subscriptions + void stop(); + + /// Check if ring buffer is currently running + bool is_running() const; + + /// Called when a fault enters PREFAILED state (for lazy_start mode) + /// @param fault_code The fault code that entered PREFAILED + void on_fault_prefailed(const std::string & fault_code); + + /// Called when a fault is confirmed - flushes buffer to bag file + /// @param fault_code The fault code that was confirmed + void on_fault_confirmed(const std::string & fault_code); + + /// Called when a fault is cleared - deletes bag file if auto_cleanup + /// @param fault_code The fault code that was cleared + void on_fault_cleared(const std::string & fault_code); + + /// Get current configuration + const RosbagConfig & config() const { + return config_; + } + + /// Check if rosbag capture is enabled + bool is_enabled() const { + return config_.enabled; + } + + private: + /// Initialize subscriptions for configured topics + void init_subscriptions(); + + /// Message callback for all subscribed topics + void message_callback(const std::string & topic, const std::string & msg_type, + const std::shared_ptr & msg); + + /// Prune old messages from buffer based on duration_sec + void prune_buffer(); + + /// Resolve which topics to record based on config + std::vector resolve_topics() const; + + /// Get message type for a topic + std::string get_topic_type(const std::string & topic) const; + + /// Flush ring buffer to a bag file + /// @param fault_code The fault code to associate with the bag + /// @return Path to the created bag file, or empty string on failure + std::string flush_to_bag(const std::string & fault_code); + + /// Generate bag file path for a fault + std::string generate_bag_path(const std::string & fault_code) const; + + /// Calculate total size of a bag directory + size_t calculate_bag_size(const std::string & bag_path) const; + + /// Enforce storage limits by deleting oldest bags + void enforce_storage_limits(); + + /// Timer callback for post-fault recording + void post_fault_timer_callback(); + + /// Try to subscribe to a single topic + /// @param topic The topic to subscribe to + /// @return True if subscription was created, false if type couldn't be determined + bool try_subscribe_topic(const std::string & topic); + + /// Timer callback for retrying topic discovery + void discovery_retry_callback(); + + /// Validate storage format and check if plugin is available + /// @throws std::runtime_error if format is invalid or plugin unavailable + void validate_storage_format() const; + + rclcpp::Node * node_; + FaultStorage * storage_; + RosbagConfig config_; + SnapshotConfig snapshot_config_; + + /// Ring buffer for messages + mutable std::mutex buffer_mutex_; + std::deque message_buffer_; + + /// Subscriptions (kept alive for continuous recording) + std::vector subscriptions_; + + /// Running state + std::atomic running_{false}; + + /// Post-fault recording state + std::string current_fault_code_; + std::string current_bag_path_; + rclcpp::TimerBase::SharedPtr post_fault_timer_; + std::atomic recording_post_fault_{false}; + + /// Active writer for current bag (kept open during post-fault recording) + std::unique_ptr active_writer_; + std::mutex writer_mutex_; + std::set created_topics_; + + /// Topic types cache + mutable std::mutex topic_types_mutex_; + std::map topic_types_; + + /// Discovery retry state + rclcpp::TimerBase::SharedPtr discovery_retry_timer_; + std::vector pending_topics_; + int discovery_retry_count_{0}; +}; + +} // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/snapshot_capture.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/snapshot_capture.hpp index e12bf68..08438ab 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/snapshot_capture.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/snapshot_capture.hpp @@ -27,6 +27,46 @@ namespace ros2_medkit_fault_manager { +/// Configuration for rosbag recording (optional time-window capture) +struct RosbagConfig { + /// Whether rosbag recording is enabled (opt-in) + bool enabled{false}; + + /// Duration in seconds to buffer before fault confirmation + double duration_sec{5.0}; + + /// Duration in seconds to continue recording after fault confirmation + double duration_after_sec{1.0}; + + /// Topic selection mode: "config" (reuse JSON config), "all", or comma-separated list + std::string topics{"config"}; + + /// Additional topics to include (added to resolved list) + std::vector include_topics; + + /// Topics to exclude from recording + std::vector exclude_topics; + + /// If true, start ring buffer only when fault enters PREFAILED state + /// If false (default), ring buffer runs continuously from startup + bool lazy_start{false}; + + /// Storage format: "sqlite3" or "mcap" + std::string format{"sqlite3"}; + + /// Path to store bag files (empty = system temp directory) + std::string storage_path; + + /// Maximum size of a single bag file in MB + size_t max_bag_size_mb{50}; + + /// Maximum total storage for all bag files in MB + size_t max_total_storage_mb{500}; + + /// If true, delete bag file when fault is cleared + bool auto_cleanup{true}; +}; + /// Configuration for snapshot capture struct SnapshotConfig { /// Whether snapshot capture is enabled @@ -53,6 +93,9 @@ struct SnapshotConfig { /// Default topics to capture if no specific or pattern match std::vector default_topics; + + /// Rosbag recording configuration (optional) + RosbagConfig rosbag; }; /// Cached message from background subscription diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp index eeb6f47..9b467b8 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/sqlite_fault_storage.hpp @@ -65,6 +65,12 @@ class SqliteFaultStorage : public FaultStorage { std::vector get_snapshots(const std::string & fault_code, const std::string & topic_filter = "") const override; + void store_rosbag_file(const RosbagFileInfo & info) override; + std::optional get_rosbag_file(const std::string & fault_code) const override; + bool delete_rosbag_file(const std::string & fault_code) override; + size_t get_total_rosbag_storage_bytes() const override; + std::vector get_all_rosbag_files() const override; + /// Get the database path const std::string & db_path() const { return db_path_; diff --git a/src/ros2_medkit_fault_manager/package.xml b/src/ros2_medkit_fault_manager/package.xml index f488923..0b9dace 100644 --- a/src/ros2_medkit_fault_manager/package.xml +++ b/src/ros2_medkit_fault_manager/package.xml @@ -15,6 +15,8 @@ ros2_medkit_serialization sqlite3 nlohmann-json-dev + rosbag2_cpp + rosbag2_storage ament_lint_auto ament_lint_common @@ -23,6 +25,8 @@ ament_cmake_gtest launch_testing_ament_cmake launch_testing_ros + sensor_msgs + std_msgs ament_cmake diff --git a/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp b/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp index c32a37d..d89bfed 100644 --- a/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp +++ b/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp @@ -14,6 +14,7 @@ #include "ros2_medkit_fault_manager/fault_manager_node.hpp" +#include #include #include #include @@ -28,6 +29,42 @@ namespace ros2_medkit_fault_manager { +namespace { + +/// Maximum allowed length for fault_code +constexpr size_t kMaxFaultCodeLength = 128; + +/// Validate fault_code format +/// @param fault_code The fault code to validate +/// @return Empty string if valid, error message if invalid +std::string validate_fault_code(const std::string & fault_code) { + if (fault_code.empty()) { + return "fault_code cannot be empty"; + } + + if (fault_code.length() > kMaxFaultCodeLength) { + return "fault_code exceeds maximum length of " + std::to_string(kMaxFaultCodeLength); + } + + // Allow only alphanumeric, underscore, hyphen, and dot + // This prevents path traversal (../) and header injection (\r\n, ") + for (char c : fault_code) { + if (!std::isalnum(static_cast(c)) && c != '_' && c != '-' && c != '.') { + return "fault_code contains invalid character '" + std::string(1, c) + + "'. Only alphanumeric, underscore, hyphen, and dot are allowed"; + } + } + + // Prevent path traversal patterns + if (fault_code.find("..") != std::string::npos) { + return "fault_code cannot contain '..'"; + } + + return ""; // Valid +} + +} // namespace + FaultManagerNode::FaultManagerNode(const rclcpp::NodeOptions & options) : Node("fault_manager", options) { // Declare and get parameters storage_type_ = declare_parameter("storage_type", "sqlite"); @@ -98,12 +135,23 @@ FaultManagerNode::FaultManagerNode(const rclcpp::NodeOptions & options) : Node(" handle_get_snapshots(request, response); }); + get_rosbag_srv_ = create_service( + "~/get_rosbag", [this](const std::shared_ptr & request, + const std::shared_ptr & response) { + handle_get_rosbag(request, response); + }); + // Initialize snapshot capture auto snapshot_config = create_snapshot_config(); if (snapshot_config.enabled) { snapshot_capture_ = std::make_unique(this, storage_.get(), snapshot_config); } + // Initialize rosbag capture if enabled + if (snapshot_config.rosbag.enabled) { + rosbag_capture_ = std::make_unique(this, storage_.get(), snapshot_config.rosbag, snapshot_config); + } + // Initialize correlation engine (nullptr if disabled or not configured) correlation_engine_ = create_correlation_engine(); @@ -176,9 +224,10 @@ void FaultManagerNode::handle_report_fault( const std::shared_ptr & request, const std::shared_ptr & response) { // Validate fault_code - if (request->fault_code.empty()) { + std::string validation_error = validate_fault_code(request->fault_code); + if (!validation_error.empty()) { response->accepted = false; - RCLCPP_WARN(get_logger(), "ReportFault rejected: fault_code cannot be empty"); + RCLCPP_WARN(get_logger(), "ReportFault rejected: %s", validation_error.c_str()); return; } @@ -272,6 +321,19 @@ void FaultManagerNode::handle_report_fault( if (just_confirmed && snapshot_capture_) { snapshot_capture_->capture(request->fault_code); } + + // Trigger rosbag capture when fault is confirmed (even if muted) + if (just_confirmed && rosbag_capture_) { + rosbag_capture_->on_fault_confirmed(request->fault_code); + } + + // Handle PREFAILED state for lazy_start rosbag capture + bool just_prefailed = (is_new && fault_after->status == ros2_medkit_msgs::msg::Fault::STATUS_PREFAILED) || + (!is_new && status_before != ros2_medkit_msgs::msg::Fault::STATUS_PREFAILED && + fault_after->status == ros2_medkit_msgs::msg::Fault::STATUS_PREFAILED); + if (just_prefailed && rosbag_capture_) { + rosbag_capture_->on_fault_prefailed(request->fault_code); + } } if (request->event_type == ros2_medkit_msgs::srv::ReportFault::Request::EVENT_FAILED) { @@ -345,9 +407,10 @@ void FaultManagerNode::handle_clear_fault( const std::shared_ptr & request, const std::shared_ptr & response) { // Validate fault_code - if (request->fault_code.empty()) { + std::string validation_error = validate_fault_code(request->fault_code); + if (!validation_error.empty()) { response->success = false; - response->message = "fault_code cannot be empty"; + response->message = validation_error; return; } @@ -367,6 +430,10 @@ void FaultManagerNode::handle_clear_fault( storage_->clear_fault(symptom_code); RCLCPP_DEBUG(get_logger(), "Auto-cleared symptom: %s (root cause: %s)", symptom_code.c_str(), request->fault_code.c_str()); + // Also cleanup rosbag for auto-cleared faults + if (rosbag_capture_) { + rosbag_capture_->on_fault_cleared(symptom_code); + } } response->auto_cleared_codes = auto_cleared_codes; @@ -379,6 +446,11 @@ void FaultManagerNode::handle_clear_fault( RCLCPP_INFO(get_logger(), "Fault cleared: %s (auto-cleared %zu symptoms)", request->fault_code.c_str(), auto_cleared_codes.size()); + // Cleanup rosbag for the main fault (auto_cleanup handled inside RosbagCapture) + if (rosbag_capture_) { + rosbag_capture_->on_fault_cleared(request->fault_code); + } + // Publish EVENT_CLEARED - get the cleared fault to include in event auto fault = storage_->get_fault(request->fault_code); if (fault) { @@ -442,6 +514,57 @@ SnapshotConfig FaultManagerNode::create_snapshot_config() { load_snapshot_config_from_yaml(config_file, config); } + // Rosbag configuration (opt-in) + config.rosbag.enabled = declare_parameter("snapshots.rosbag.enabled", false); + if (config.rosbag.enabled) { + config.rosbag.duration_sec = declare_parameter("snapshots.rosbag.duration_sec", 5.0); + if (config.rosbag.duration_sec <= 0.0) { + RCLCPP_WARN(get_logger(), "snapshots.rosbag.duration_sec must be positive, got %.2f. Using default 5.0s", + config.rosbag.duration_sec); + config.rosbag.duration_sec = 5.0; + } + + config.rosbag.duration_after_sec = declare_parameter("snapshots.rosbag.duration_after_sec", 1.0); + if (config.rosbag.duration_after_sec < 0.0) { + RCLCPP_WARN(get_logger(), "snapshots.rosbag.duration_after_sec must be non-negative, got %.2f. Using 0.0s", + config.rosbag.duration_after_sec); + config.rosbag.duration_after_sec = 0.0; + } + + config.rosbag.topics = declare_parameter("snapshots.rosbag.topics", "config"); + config.rosbag.include_topics = + declare_parameter>("snapshots.rosbag.include_topics", std::vector{}); + config.rosbag.exclude_topics = + declare_parameter>("snapshots.rosbag.exclude_topics", std::vector{}); + + config.rosbag.lazy_start = declare_parameter("snapshots.rosbag.lazy_start", false); + config.rosbag.format = declare_parameter("snapshots.rosbag.format", "sqlite3"); + config.rosbag.storage_path = declare_parameter("snapshots.rosbag.storage_path", ""); + + int64_t max_bag_size = declare_parameter("snapshots.rosbag.max_bag_size_mb", 50); + if (max_bag_size <= 0) { + RCLCPP_WARN(get_logger(), "snapshots.rosbag.max_bag_size_mb must be positive. Using 50MB"); + max_bag_size = 50; + } + config.rosbag.max_bag_size_mb = static_cast(max_bag_size); + + int64_t max_total_storage = declare_parameter("snapshots.rosbag.max_total_storage_mb", 500); + if (max_total_storage <= 0) { + RCLCPP_WARN(get_logger(), "snapshots.rosbag.max_total_storage_mb must be positive. Using 500MB"); + max_total_storage = 500; + } + config.rosbag.max_total_storage_mb = static_cast(max_total_storage); + + config.rosbag.auto_cleanup = declare_parameter("snapshots.rosbag.auto_cleanup", true); + + RCLCPP_INFO(get_logger(), + "Rosbag capture enabled (duration=%.1fs+%.1fs, topics=%s, lazy=%s, format=%s, " + "max_bag=%zuMB, max_total=%zuMB)", + config.rosbag.duration_sec, config.rosbag.duration_after_sec, config.rosbag.topics.c_str(), + config.rosbag.lazy_start ? "true" : "false", config.rosbag.format.c_str(), + config.rosbag.max_bag_size_mb, config.rosbag.max_total_storage_mb); + } + if (config.enabled) { RCLCPP_INFO(get_logger(), "Snapshot capture enabled (background=%s, timeout=%.1fs, max_size=%zu, " @@ -555,9 +678,10 @@ void FaultManagerNode::handle_get_snapshots( const std::shared_ptr & request, const std::shared_ptr & response) { // Validate fault_code - if (request->fault_code.empty()) { + std::string validation_error = validate_fault_code(request->fault_code); + if (!validation_error.empty()) { response->success = false; - response->error_message = "fault_code cannot be empty"; + response->error_message = validation_error; return; } @@ -605,10 +729,69 @@ void FaultManagerNode::handle_get_snapshots( } result["topics"] = topics_json; + // Include rosbag info if available + auto rosbag_info = storage_->get_rosbag_file(request->fault_code); + if (rosbag_info) { + nlohmann::json rosbag_json; + rosbag_json["available"] = true; + rosbag_json["duration_sec"] = rosbag_info->duration_sec; + rosbag_json["size_bytes"] = rosbag_info->size_bytes; + rosbag_json["format"] = rosbag_info->format; + rosbag_json["download_url"] = "/api/v1/faults/" + request->fault_code + "/snapshots/bag"; + result["rosbag"] = rosbag_json; + } else { + result["rosbag"] = {{"available", false}}; + } + response->success = true; response->data = result.dump(); - RCLCPP_DEBUG(get_logger(), "GetSnapshots returned %zu topics for fault '%s'", snapshots.size(), + RCLCPP_DEBUG(get_logger(), "GetSnapshots returned %zu topics for fault '%s' (rosbag=%s)", snapshots.size(), + request->fault_code.c_str(), rosbag_info ? "available" : "not available"); +} + +void FaultManagerNode::handle_get_rosbag(const std::shared_ptr & request, + const std::shared_ptr & response) { + // Validate fault_code + std::string validation_error = validate_fault_code(request->fault_code); + if (!validation_error.empty()) { + response->success = false; + response->error_message = validation_error; + return; + } + + // Check if fault exists + auto fault = storage_->get_fault(request->fault_code); + if (!fault) { + response->success = false; + response->error_message = "Fault not found: " + request->fault_code; + return; + } + + // Get rosbag info from storage + auto rosbag_info = storage_->get_rosbag_file(request->fault_code); + if (!rosbag_info) { + response->success = false; + response->error_message = "No rosbag file available for fault: " + request->fault_code; + return; + } + + // Check if file exists + if (!std::filesystem::exists(rosbag_info->file_path)) { + response->success = false; + response->error_message = "Rosbag file not found on disk: " + rosbag_info->file_path; + // Clean up the stale record + storage_->delete_rosbag_file(request->fault_code); + return; + } + + response->success = true; + response->file_path = rosbag_info->file_path; + response->format = rosbag_info->format; + response->duration_sec = rosbag_info->duration_sec; + response->size_bytes = rosbag_info->size_bytes; + + RCLCPP_DEBUG(get_logger(), "GetRosbag returned file '%s' for fault '%s'", rosbag_info->file_path.c_str(), request->fault_code.c_str()); } diff --git a/src/ros2_medkit_fault_manager/src/fault_storage.cpp b/src/ros2_medkit_fault_manager/src/fault_storage.cpp index 4030321..7eb1861 100644 --- a/src/ros2_medkit_fault_manager/src/fault_storage.cpp +++ b/src/ros2_medkit_fault_manager/src/fault_storage.cpp @@ -15,6 +15,7 @@ #include "ros2_medkit_fault_manager/fault_storage.hpp" #include +#include namespace ros2_medkit_fault_manager { @@ -288,4 +289,72 @@ std::vector InMemoryFaultStorage::get_snapshots(const std::string return result; } +void InMemoryFaultStorage::store_rosbag_file(const RosbagFileInfo & info) { + std::lock_guard lock(mutex_); + + // Delete existing bag file if present (prevent orphaned files on re-confirm) + auto it = rosbag_files_.find(info.fault_code); + if (it != rosbag_files_.end() && it->second.file_path != info.file_path) { + std::error_code ec; + std::filesystem::remove_all(it->second.file_path, ec); + // Ignore errors - file may already be deleted + } + + rosbag_files_[info.fault_code] = info; +} + +std::optional InMemoryFaultStorage::get_rosbag_file(const std::string & fault_code) const { + std::lock_guard lock(mutex_); + + auto it = rosbag_files_.find(fault_code); + if (it == rosbag_files_.end()) { + return std::nullopt; + } + return it->second; +} + +bool InMemoryFaultStorage::delete_rosbag_file(const std::string & fault_code) { + std::lock_guard lock(mutex_); + + auto it = rosbag_files_.find(fault_code); + if (it == rosbag_files_.end()) { + return false; + } + + // Try to delete the actual file + std::error_code ec; + std::filesystem::remove_all(it->second.file_path, ec); + // Ignore errors - file may already be deleted + + rosbag_files_.erase(it); + return true; +} + +size_t InMemoryFaultStorage::get_total_rosbag_storage_bytes() const { + std::lock_guard lock(mutex_); + + size_t total = 0; + for (const auto & [code, info] : rosbag_files_) { + total += info.size_bytes; + } + return total; +} + +std::vector InMemoryFaultStorage::get_all_rosbag_files() const { + std::lock_guard lock(mutex_); + + std::vector result; + result.reserve(rosbag_files_.size()); + for (const auto & [code, info] : rosbag_files_) { + result.push_back(info); + } + + // Sort by creation time (oldest first) + std::sort(result.begin(), result.end(), [](const RosbagFileInfo & a, const RosbagFileInfo & b) { + return a.created_at_ns < b.created_at_ns; + }); + + return result; +} + } // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/src/rosbag_capture.cpp b/src/ros2_medkit_fault_manager/src/rosbag_capture.cpp new file mode 100644 index 0000000..812a94f --- /dev/null +++ b/src/ros2_medkit_fault_manager/src/rosbag_capture.cpp @@ -0,0 +1,712 @@ +// Copyright 2026 mfaferek93 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ros2_medkit_fault_manager/rosbag_capture.hpp" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace ros2_medkit_fault_manager { + +namespace { + +/// Custom deleter for rcutils_uint8_array_t that calls rcutils_uint8_array_fini +struct Uint8ArrayDeleter { + void operator()(rcutils_uint8_array_t * array) const { + if (array) { + // Cleanup is best-effort - nothing meaningful to do if it fails during destruction + [[maybe_unused]] rcutils_ret_t ret = rcutils_uint8_array_fini(array); + delete array; + } + } +}; + +/// Create a properly-initialized SerializedBagMessage with RAII-safe memory management +/// @param topic Topic name for the message +/// @param timestamp_ns Timestamp in nanoseconds +/// @param src_msg Source serialized message to copy from +/// @param logger Logger for error reporting +/// @return Shared pointer to bag message, or nullptr on allocation failure +std::shared_ptr create_bag_message(const std::string & topic, + int64_t timestamp_ns, + const rcl_serialized_message_t & src_msg, + const rclcpp::Logger & logger) { + auto bag_msg = std::make_shared(); + bag_msg->topic_name = topic; + bag_msg->recv_timestamp = timestamp_ns; + bag_msg->send_timestamp = timestamp_ns; + + // Create serialized_data with custom deleter that calls rcutils_uint8_array_fini + auto serialized_data = std::shared_ptr(new rcutils_uint8_array_t(), Uint8ArrayDeleter{}); + + // Initialize with rcutils (RAII-safe allocation) + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + rcutils_ret_t ret = rcutils_uint8_array_init(serialized_data.get(), src_msg.buffer_length, &allocator); + if (ret != RCUTILS_RET_OK) { + RCLCPP_ERROR(logger, "Failed to allocate serialized message buffer (%zu bytes): %s", src_msg.buffer_length, + rcutils_get_error_string().str); + rcutils_reset_error(); + return nullptr; + } + + // Copy data + memcpy(serialized_data->buffer, src_msg.buffer, src_msg.buffer_length); + serialized_data->buffer_length = src_msg.buffer_length; + + bag_msg->serialized_data = serialized_data; + return bag_msg; +} + +} // namespace + +RosbagCapture::RosbagCapture(rclcpp::Node * node, FaultStorage * storage, const RosbagConfig & config, + const SnapshotConfig & snapshot_config) + : node_(node), storage_(storage), config_(config), snapshot_config_(snapshot_config) { + if (!node_) { + throw std::invalid_argument("RosbagCapture requires a valid node pointer"); + } + if (!storage_) { + throw std::invalid_argument("RosbagCapture requires a valid storage pointer"); + } + + if (!config_.enabled) { + RCLCPP_INFO(node_->get_logger(), "RosbagCapture disabled"); + return; + } + + // Validate storage format before proceeding + validate_storage_format(); + + RCLCPP_INFO(node_->get_logger(), "RosbagCapture initialized (duration=%.1fs, after=%.1fs, lazy_start=%s, format=%s)", + config_.duration_sec, config_.duration_after_sec, config_.lazy_start ? "true" : "false", + config_.format.c_str()); + + // Start immediately if not lazy_start + if (!config_.lazy_start) { + start(); + } +} + +RosbagCapture::~RosbagCapture() { + stop(); +} + +void RosbagCapture::start() { + if (!config_.enabled || running_.load()) { + return; + } + + init_subscriptions(); + running_.store(true); + + RCLCPP_INFO(node_->get_logger(), "RosbagCapture started with %zu topic subscriptions", subscriptions_.size()); +} + +void RosbagCapture::stop() { + if (!running_.load()) { + return; + } + + running_.store(false); + + // Cancel any pending post-fault timer + if (post_fault_timer_) { + post_fault_timer_->cancel(); + post_fault_timer_.reset(); + } + + // Clear subscriptions + subscriptions_.clear(); + + // Clear buffer + { + std::lock_guard lock(buffer_mutex_); + message_buffer_.clear(); + } + + RCLCPP_INFO(node_->get_logger(), "RosbagCapture stopped"); +} + +bool RosbagCapture::is_running() const { + return running_.load(); +} + +void RosbagCapture::on_fault_prefailed(const std::string & fault_code) { + if (!config_.enabled) { + return; + } + + // Start buffer if lazy_start and not already running + if (config_.lazy_start && !running_.load()) { + RCLCPP_INFO(node_->get_logger(), "RosbagCapture starting on PREFAILED for fault '%s'", fault_code.c_str()); + start(); + } +} + +void RosbagCapture::on_fault_confirmed(const std::string & fault_code) { + if (!config_.enabled || !running_.load()) { + return; + } + + // Don't start a new recording if we're already recording post-fault + if (recording_post_fault_.load()) { + RCLCPP_WARN(node_->get_logger(), "Already recording post-fault data, skipping confirmation for '%s'", + fault_code.c_str()); + return; + } + + RCLCPP_INFO(node_->get_logger(), "RosbagCapture: fault '%s' confirmed, flushing buffer to bag", fault_code.c_str()); + + // Flush buffer to bag + std::string bag_path = flush_to_bag(fault_code); + if (bag_path.empty()) { + RCLCPP_WARN(node_->get_logger(), "Failed to create bag file for fault '%s'", fault_code.c_str()); + return; + } + + // If duration_after_sec > 0, continue recording + if (config_.duration_after_sec > 0.0) { + current_fault_code_ = fault_code; + current_bag_path_ = bag_path; + recording_post_fault_.store(true); + + // Create timer for post-fault recording + auto duration = std::chrono::duration(config_.duration_after_sec); + post_fault_timer_ = + node_->create_wall_timer(std::chrono::duration_cast(duration), [this]() { + post_fault_timer_callback(); + }); + + RCLCPP_DEBUG(node_->get_logger(), "Recording %.1fs more after fault confirmation", config_.duration_after_sec); + } else { + // No post-fault recording, close writer and finalize immediately + { + std::lock_guard wlock(writer_mutex_); + active_writer_.reset(); + created_topics_.clear(); + } + + size_t bag_size = calculate_bag_size(bag_path); + + RosbagFileInfo info; + info.fault_code = fault_code; + info.file_path = bag_path; + info.format = config_.format; + info.duration_sec = config_.duration_sec; + info.size_bytes = bag_size; + info.created_at_ns = node_->now().nanoseconds(); + + storage_->store_rosbag_file(info); + enforce_storage_limits(); + + RCLCPP_INFO(node_->get_logger(), "Bag file created: %s (%.2f MB)", bag_path.c_str(), + static_cast(bag_size) / (1024.0 * 1024.0)); + } +} + +void RosbagCapture::on_fault_cleared(const std::string & fault_code) { + if (!config_.enabled || !config_.auto_cleanup) { + return; + } + + // Delete the bag file for this fault + if (storage_->delete_rosbag_file(fault_code)) { + RCLCPP_INFO(node_->get_logger(), "Auto-cleanup: deleted bag file for fault '%s'", fault_code.c_str()); + } +} + +void RosbagCapture::init_subscriptions() { + auto topics = resolve_topics(); + if (topics.empty()) { + RCLCPP_WARN(node_->get_logger(), "No topics configured for rosbag capture"); + return; + } + + subscriptions_.clear(); + + // Track topics that couldn't be subscribed yet (type not discoverable) + std::vector pending_topics; + + for (const auto & topic : topics) { + if (!try_subscribe_topic(topic)) { + pending_topics.push_back(topic); + } + } + + // If we have pending topics, start a timer to retry subscription + if (!pending_topics.empty() && !discovery_retry_timer_) { + pending_topics_ = pending_topics; + discovery_retry_count_ = 0; + discovery_retry_timer_ = node_->create_wall_timer(std::chrono::milliseconds(500), [this]() { + discovery_retry_callback(); + }); + RCLCPP_INFO(node_->get_logger(), "Will retry subscribing to %zu pending topics", pending_topics_.size()); + } +} + +bool RosbagCapture::try_subscribe_topic(const std::string & topic) { + std::string msg_type = get_topic_type(topic); + if (msg_type.empty()) { + RCLCPP_DEBUG(node_->get_logger(), "Cannot determine type for topic '%s', will retry", topic.c_str()); + return false; + } + + // Cache the topic type + { + std::lock_guard lock(topic_types_mutex_); + topic_types_[topic] = msg_type; + } + + try { + rclcpp::QoS qos = rclcpp::SensorDataQoS(); + + auto callback = [this, topic, msg_type](const std::shared_ptr & msg) { + message_callback(topic, msg_type, msg); + }; + + auto subscription = node_->create_generic_subscription(topic, msg_type, qos, callback); + subscriptions_.push_back(subscription); + + RCLCPP_INFO(node_->get_logger(), "Subscribed to '%s' (%s) for rosbag capture", topic.c_str(), msg_type.c_str()); + return true; + + } catch (const std::exception & e) { + RCLCPP_WARN(node_->get_logger(), "Failed to create subscription for '%s': %s", topic.c_str(), e.what()); + return false; + } +} + +void RosbagCapture::discovery_retry_callback() { + if (pending_topics_.empty() || !running_.load()) { + if (discovery_retry_timer_) { + discovery_retry_timer_->cancel(); + discovery_retry_timer_.reset(); + } + return; + } + + discovery_retry_count_++; + constexpr int max_retries = 20; // 10 seconds total (20 * 500ms) + + std::vector still_pending; + for (const auto & topic : pending_topics_) { + if (!try_subscribe_topic(topic)) { + still_pending.push_back(topic); + } + } + pending_topics_ = still_pending; + + if (pending_topics_.empty()) { + RCLCPP_INFO(node_->get_logger(), "All topics subscribed after %d retries", discovery_retry_count_); + discovery_retry_timer_->cancel(); + discovery_retry_timer_.reset(); + } else if (discovery_retry_count_ >= max_retries) { + RCLCPP_WARN(node_->get_logger(), "Giving up on %zu topics after %d retries: ", pending_topics_.size(), max_retries); + for (const auto & topic : pending_topics_) { + RCLCPP_WARN(node_->get_logger(), " - %s", topic.c_str()); + } + discovery_retry_timer_->cancel(); + discovery_retry_timer_.reset(); + pending_topics_.clear(); + } +} + +void RosbagCapture::message_callback(const std::string & topic, const std::string & msg_type, + const std::shared_ptr & msg) { + if (!running_.load()) { + return; + } + + int64_t timestamp_ns = node_->now().nanoseconds(); + + // During post-fault recording, write directly to bag (no buffering) + if (recording_post_fault_.load()) { + std::lock_guard wlock(writer_mutex_); + if (active_writer_) { + try { + // Create topic if not already created + if (created_topics_.find(topic) == created_topics_.end()) { + rosbag2_storage::TopicMetadata topic_meta; + topic_meta.name = topic; + topic_meta.type = msg_type; + topic_meta.serialization_format = "cdr"; + active_writer_->create_topic(topic_meta); + created_topics_.insert(topic); + } + + auto bag_msg = create_bag_message(topic, timestamp_ns, msg->get_rcl_serialized_message(), node_->get_logger()); + if (bag_msg) { + active_writer_->write(bag_msg); + } + // Memory is automatically cleaned up by RAII when bag_msg goes out of scope + } catch (const std::exception & e) { + RCLCPP_WARN_THROTTLE(node_->get_logger(), *node_->get_clock(), 1000, "Failed to write post-fault message: %s", + e.what()); + } + } + return; // Don't buffer during post-fault recording + } + + // Normal buffering mode + BufferedMessage buffered; + buffered.topic = topic; + buffered.message_type = msg_type; + buffered.serialized_data = std::make_shared(*msg); + buffered.timestamp_ns = timestamp_ns; + + { + std::lock_guard lock(buffer_mutex_); + message_buffer_.push_back(std::move(buffered)); + } + + // Prune old messages + prune_buffer(); +} + +void RosbagCapture::prune_buffer() { + // Don't prune during post-fault recording - we need all messages for the final flush + if (recording_post_fault_.load()) { + return; + } + + std::lock_guard lock(buffer_mutex_); + + if (message_buffer_.empty()) { + return; + } + + int64_t now_ns = node_->now().nanoseconds(); + int64_t duration_ns = static_cast(config_.duration_sec * 1e9); + int64_t cutoff_ns = now_ns - duration_ns; + + // Remove messages older than cutoff + while (!message_buffer_.empty() && message_buffer_.front().timestamp_ns < cutoff_ns) { + message_buffer_.pop_front(); + } +} + +std::vector RosbagCapture::resolve_topics() const { + std::set topics_set; + + if (config_.topics == "config") { + // Reuse JSON snapshot config topics + for (const auto & topic : snapshot_config_.default_topics) { + topics_set.insert(topic); + } + for (const auto & [code, topics_vec] : snapshot_config_.fault_specific) { + for (const auto & topic : topics_vec) { + topics_set.insert(topic); + } + } + for (const auto & [pattern, topics_vec] : snapshot_config_.patterns) { + for (const auto & topic : topics_vec) { + topics_set.insert(topic); + } + } + } else if (config_.topics == "all" || config_.topics == "auto") { + // Discover all available topics ("auto" is an alias for "all") + auto topic_names_and_types = node_->get_topic_names_and_types(); + for (const auto & [topic, types] : topic_names_and_types) { + // Skip internal ROS topics + if (topic.find("/parameter_events") != std::string::npos || topic.find("/rosout") != std::string::npos) { + continue; + } + topics_set.insert(topic); + } + } else if (config_.topics == "explicit") { + // Explicit mode: use only include_topics, no topic derivation + // Topics will be populated from include_topics below + } else { + // Comma-separated list of topics + std::istringstream iss(config_.topics); + std::string topic; + while (std::getline(iss, topic, ',')) { + // Trim whitespace + topic.erase(0, topic.find_first_not_of(" \t")); + topic.erase(topic.find_last_not_of(" \t") + 1); + if (!topic.empty()) { + topics_set.insert(topic); + } + } + } + + // Add include_topics + for (const auto & topic : config_.include_topics) { + topics_set.insert(topic); + } + + // Remove exclude_topics + for (const auto & topic : config_.exclude_topics) { + topics_set.erase(topic); + } + + return {topics_set.begin(), topics_set.end()}; +} + +std::string RosbagCapture::get_topic_type(const std::string & topic) const { + auto topic_names_and_types = node_->get_topic_names_and_types(); + auto it = topic_names_and_types.find(topic); + if (it != topic_names_and_types.end() && !it->second.empty()) { + return it->second[0]; + } + return ""; +} + +std::string RosbagCapture::flush_to_bag(const std::string & fault_code) { + // Copy buffer under lock, then release to avoid holding mutex during IO + std::deque messages_to_write; + { + std::lock_guard lock(buffer_mutex_); + if (message_buffer_.empty()) { + RCLCPP_WARN(node_->get_logger(), "Buffer is empty, cannot create bag file"); + return ""; + } + messages_to_write = std::move(message_buffer_); + message_buffer_.clear(); + } + + std::string bag_path = generate_bag_path(fault_code); + + try { + // Create parent directory if needed + std::filesystem::path bag_dir(bag_path); + if (!bag_dir.parent_path().empty()) { + std::filesystem::create_directories(bag_dir.parent_path()); + } + + // Create writer and store as member for post-fault recording + { + std::lock_guard wlock(writer_mutex_); + active_writer_ = std::make_unique(); + created_topics_.clear(); + + rosbag2_storage::StorageOptions storage_options; + storage_options.uri = bag_path; + storage_options.storage_id = config_.format; + storage_options.max_bagfile_size = config_.max_bag_size_mb * 1024 * 1024; + + active_writer_->open(storage_options); + } + + // Write messages (no buffer_mutex_ held, writer_mutex_ only for brief access) + size_t msg_count = 0; + for (const auto & msg : messages_to_write) { + std::lock_guard wlock(writer_mutex_); + if (!active_writer_) { + break; + } + + // Create topic if not already created + if (created_topics_.find(msg.topic) == created_topics_.end()) { + rosbag2_storage::TopicMetadata topic_meta; + topic_meta.name = msg.topic; + topic_meta.type = msg.message_type; + topic_meta.serialization_format = "cdr"; + active_writer_->create_topic(topic_meta); + created_topics_.insert(msg.topic); + } + + auto bag_msg = create_bag_message(msg.topic, msg.timestamp_ns, msg.serialized_data->get_rcl_serialized_message(), + node_->get_logger()); + if (bag_msg) { + active_writer_->write(bag_msg); + ++msg_count; + } + // Memory is automatically cleaned up by RAII when bag_msg goes out of scope + } + + RCLCPP_DEBUG(node_->get_logger(), "Flushed %zu messages to bag: %s", msg_count, bag_path.c_str()); + + // Note: Writer is NOT closed here - it stays open for post-fault recording + // It will be closed in post_fault_timer_callback() + + return bag_path; + + } catch (const std::exception & e) { + RCLCPP_ERROR(node_->get_logger(), "Failed to write bag file '%s': %s", bag_path.c_str(), e.what()); + + // Clean up writer and partial bag file + { + std::lock_guard wlock(writer_mutex_); + active_writer_.reset(); + created_topics_.clear(); + } + std::error_code ec; + std::filesystem::remove_all(bag_path, ec); + + return ""; + } +} + +std::string RosbagCapture::generate_bag_path(const std::string & fault_code) const { + std::string base_path; + + if (config_.storage_path.empty()) { + // Use system temp directory + base_path = std::filesystem::temp_directory_path().string(); + } else { + base_path = config_.storage_path; + } + + // Create unique name with timestamp + auto now = std::chrono::system_clock::now(); + auto timestamp = std::chrono::duration_cast(now.time_since_epoch()).count(); + + std::ostringstream oss; + oss << base_path << "/fault_" << fault_code << "_" << timestamp; + + return oss.str(); +} + +size_t RosbagCapture::calculate_bag_size(const std::string & bag_path) const { + size_t total_size = 0; + + try { + if (std::filesystem::is_directory(bag_path)) { + for (const auto & entry : std::filesystem::recursive_directory_iterator(bag_path)) { + if (entry.is_regular_file()) { + total_size += entry.file_size(); + } + } + } else if (std::filesystem::is_regular_file(bag_path)) { + total_size = std::filesystem::file_size(bag_path); + } + } catch (const std::exception & e) { + RCLCPP_WARN(node_->get_logger(), "Failed to calculate bag size for '%s': %s", bag_path.c_str(), e.what()); + } + + return total_size; +} + +void RosbagCapture::enforce_storage_limits() { + size_t max_bytes = config_.max_total_storage_mb * 1024 * 1024; + size_t current_bytes = storage_->get_total_rosbag_storage_bytes(); + + if (current_bytes <= max_bytes) { + return; + } + + // Get all bags sorted by creation time (oldest first) + auto all_bags = storage_->get_all_rosbag_files(); + + for (const auto & bag : all_bags) { + if (current_bytes <= max_bytes) { + break; + } + + RCLCPP_INFO(node_->get_logger(), "Deleting old bag file for fault '%s' to enforce storage limit", + bag.fault_code.c_str()); + + current_bytes -= bag.size_bytes; + storage_->delete_rosbag_file(bag.fault_code); + } +} + +void RosbagCapture::post_fault_timer_callback() { + if (!recording_post_fault_.load()) { + return; + } + + // Cancel timer (one-shot) + if (post_fault_timer_) { + post_fault_timer_->cancel(); + post_fault_timer_.reset(); + } + + // Stop post-fault recording (no more direct writes to bag) + recording_post_fault_.store(false); + + // Close the writer (messages were written directly during post-fault period) + { + std::lock_guard wlock(writer_mutex_); + active_writer_.reset(); + created_topics_.clear(); + } + + // Calculate final size and store metadata + size_t bag_size = calculate_bag_size(current_bag_path_); + + RosbagFileInfo info; + info.fault_code = current_fault_code_; + info.file_path = current_bag_path_; + info.format = config_.format; + info.duration_sec = config_.duration_sec + config_.duration_after_sec; + info.size_bytes = bag_size; + info.created_at_ns = node_->now().nanoseconds(); + + storage_->store_rosbag_file(info); + enforce_storage_limits(); + + RCLCPP_INFO(node_->get_logger(), "Bag file completed: %s (%.2f MB, %.1fs)", current_bag_path_.c_str(), + static_cast(bag_size) / (1024.0 * 1024.0), info.duration_sec); + + current_fault_code_.clear(); + current_bag_path_.clear(); +} + +void RosbagCapture::validate_storage_format() const { + // Validate format is one of the known options + if (config_.format != "sqlite3" && config_.format != "mcap") { + throw std::runtime_error("Invalid rosbag storage format '" + config_.format + + "'. " + "Valid options: 'sqlite3', 'mcap'"); + } + + // sqlite3 is always available (built into rosbag2) + if (config_.format == "sqlite3") { + return; + } + + // For MCAP, verify the plugin is available by trying to create a test bag + if (config_.format == "mcap") { + std::string test_path = + std::filesystem::temp_directory_path().string() + "/.rosbag_mcap_test_" + std::to_string(getpid()); + + try { + rosbag2_cpp::Writer writer; + rosbag2_storage::StorageOptions opts; + opts.uri = test_path; + opts.storage_id = "mcap"; + writer.open(opts); + // Success - plugin is available, clean up test file + } catch (const std::exception & e) { + // Clean up any partial test files + std::error_code ec; + std::filesystem::remove_all(test_path, ec); + + throw std::runtime_error( + "MCAP storage format requested but rosbag2_storage_mcap plugin is not available. " + "Install with: sudo apt install ros-${ROS_DISTRO}-rosbag2-storage-mcap " + "Or use format: 'sqlite3' (default). " + "Error: " + + std::string(e.what())); + } + + // Clean up test file + std::error_code ec; + std::filesystem::remove_all(test_path, ec); + + RCLCPP_INFO(node_->get_logger(), "MCAP storage format validated successfully"); + } +} + +} // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp b/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp index 39585e2..102f82a 100644 --- a/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp +++ b/src/ros2_medkit_fault_manager/src/sqlite_fault_storage.cpp @@ -14,6 +14,7 @@ #include "ros2_medkit_fault_manager/sqlite_fault_storage.hpp" +#include #include #include #include @@ -186,6 +187,27 @@ void SqliteFaultStorage::initialize_schema() { sqlite3_free(err_msg); throw std::runtime_error("Failed to create snapshots table: " + error); } + + // Create rosbag_files table for storing time-window bag file metadata + const char * create_rosbag_files_table_sql = R"( + CREATE TABLE IF NOT EXISTS rosbag_files ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + fault_code TEXT NOT NULL UNIQUE, + file_path TEXT NOT NULL, + format TEXT NOT NULL, + duration_sec REAL NOT NULL, + size_bytes INTEGER NOT NULL, + created_at_ns INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_rosbag_files_fault_code ON rosbag_files(fault_code); + CREATE INDEX IF NOT EXISTS idx_rosbag_files_created_at ON rosbag_files(created_at_ns); + )"; + + if (sqlite3_exec(db_, create_rosbag_files_table_sql, nullptr, nullptr, &err_msg) != SQLITE_OK) { + std::string error = err_msg ? err_msg : "Unknown error"; + sqlite3_free(err_msg); + throw std::runtime_error("Failed to create rosbag_files table: " + error); + } } std::vector SqliteFaultStorage::parse_json_array(const std::string & json_str) { @@ -707,4 +729,126 @@ std::vector SqliteFaultStorage::get_snapshots(const std::string & return result; } +void SqliteFaultStorage::store_rosbag_file(const RosbagFileInfo & info) { + std::lock_guard lock(mutex_); + + // Query existing record to delete old file (prevent orphaned files on re-confirm) + { + SqliteStatement query_stmt(db_, "SELECT file_path FROM rosbag_files WHERE fault_code = ?"); + query_stmt.bind_text(1, info.fault_code); + if (query_stmt.step() == SQLITE_ROW) { + std::string old_path = query_stmt.column_text(0); + if (old_path != info.file_path) { + std::error_code ec; + std::filesystem::remove_all(old_path, ec); + // Ignore errors - file may already be deleted + } + } + } + + // Use INSERT OR REPLACE to handle updates (fault_code is UNIQUE) + SqliteStatement stmt(db_, + "INSERT OR REPLACE INTO rosbag_files " + "(fault_code, file_path, format, duration_sec, size_bytes, created_at_ns) " + "VALUES (?, ?, ?, ?, ?, ?)"); + + stmt.bind_text(1, info.fault_code); + stmt.bind_text(2, info.file_path); + stmt.bind_text(3, info.format); + // Bind duration_sec as a double using sqlite3_bind_double directly + if (sqlite3_bind_double(stmt.get(), 4, info.duration_sec) != SQLITE_OK) { + throw std::runtime_error(std::string("Failed to bind duration_sec: ") + sqlite3_errmsg(db_)); + } + stmt.bind_int64(5, static_cast(info.size_bytes)); + stmt.bind_int64(6, info.created_at_ns); + + if (stmt.step() != SQLITE_DONE) { + throw std::runtime_error(std::string("Failed to store rosbag file: ") + sqlite3_errmsg(db_)); + } +} + +std::optional SqliteFaultStorage::get_rosbag_file(const std::string & fault_code) const { + std::lock_guard lock(mutex_); + + SqliteStatement stmt(db_, + "SELECT fault_code, file_path, format, duration_sec, size_bytes, created_at_ns " + "FROM rosbag_files WHERE fault_code = ?"); + stmt.bind_text(1, fault_code); + + if (stmt.step() != SQLITE_ROW) { + return std::nullopt; + } + + RosbagFileInfo info; + info.fault_code = stmt.column_text(0); + info.file_path = stmt.column_text(1); + info.format = stmt.column_text(2); + info.duration_sec = sqlite3_column_double(stmt.get(), 3); + info.size_bytes = static_cast(stmt.column_int64(4)); + info.created_at_ns = stmt.column_int64(5); + + return info; +} + +bool SqliteFaultStorage::delete_rosbag_file(const std::string & fault_code) { + std::lock_guard lock(mutex_); + + // First get the file path so we can delete the actual file + SqliteStatement select_stmt(db_, "SELECT file_path FROM rosbag_files WHERE fault_code = ?"); + select_stmt.bind_text(1, fault_code); + + if (select_stmt.step() == SQLITE_ROW) { + std::string file_path = select_stmt.column_text(0); + + // Try to delete the actual file/directory + std::error_code ec; + std::filesystem::remove_all(file_path, ec); + // Ignore errors - file may already be deleted + } + + SqliteStatement delete_stmt(db_, "DELETE FROM rosbag_files WHERE fault_code = ?"); + delete_stmt.bind_text(1, fault_code); + + if (delete_stmt.step() != SQLITE_DONE) { + throw std::runtime_error(std::string("Failed to delete rosbag file record: ") + sqlite3_errmsg(db_)); + } + + return sqlite3_changes(db_) > 0; +} + +size_t SqliteFaultStorage::get_total_rosbag_storage_bytes() const { + std::lock_guard lock(mutex_); + + SqliteStatement stmt(db_, "SELECT COALESCE(SUM(size_bytes), 0) FROM rosbag_files"); + + if (stmt.step() != SQLITE_ROW) { + return 0; + } + + return static_cast(stmt.column_int64(0)); +} + +std::vector SqliteFaultStorage::get_all_rosbag_files() const { + std::lock_guard lock(mutex_); + + std::vector result; + + SqliteStatement stmt(db_, + "SELECT fault_code, file_path, format, duration_sec, size_bytes, created_at_ns " + "FROM rosbag_files ORDER BY created_at_ns ASC"); + + while (stmt.step() == SQLITE_ROW) { + RosbagFileInfo info; + info.fault_code = stmt.column_text(0); + info.file_path = stmt.column_text(1); + info.format = stmt.column_text(2); + info.duration_sec = sqlite3_column_double(stmt.get(), 3); + info.size_bytes = static_cast(stmt.column_int64(4)); + info.created_at_ns = stmt.column_int64(5); + result.push_back(info); + } + + return result; +} + } // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/test/test_rosbag_capture.cpp b/src/ros2_medkit_fault_manager/test/test_rosbag_capture.cpp new file mode 100644 index 0000000..b1117cc --- /dev/null +++ b/src/ros2_medkit_fault_manager/test/test_rosbag_capture.cpp @@ -0,0 +1,450 @@ +// Copyright 2025 mfaferek93 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "ros2_medkit_fault_manager/fault_storage.hpp" +#include "ros2_medkit_fault_manager/rosbag_capture.hpp" +#include "ros2_medkit_fault_manager/snapshot_capture.hpp" + +using ros2_medkit_fault_manager::InMemoryFaultStorage; +using ros2_medkit_fault_manager::RosbagCapture; +using ros2_medkit_fault_manager::RosbagConfig; +using ros2_medkit_fault_manager::SnapshotConfig; + +class RosbagCaptureTest : public ::testing::Test { + protected: + void SetUp() override { + rclcpp::init(0, nullptr); + node_ = std::make_shared("test_rosbag_capture_node"); + storage_ = std::make_unique(); + + // Create temp directory for test bags + temp_dir_ = std::filesystem::temp_directory_path() / "rosbag_capture_test"; + std::filesystem::create_directories(temp_dir_); + } + + void TearDown() override { + node_.reset(); + storage_.reset(); + rclcpp::shutdown(); + + // Clean up temp directory + std::error_code ec; + std::filesystem::remove_all(temp_dir_, ec); + } + + RosbagConfig create_rosbag_config(bool enabled = true) { + RosbagConfig config; + config.enabled = enabled; + config.duration_sec = 2.0; + config.duration_after_sec = 0.5; + config.topics = "all"; + config.format = "sqlite3"; + config.storage_path = temp_dir_.string(); + config.max_bag_size_mb = 10; + config.max_total_storage_mb = 50; + config.auto_cleanup = true; + return config; + } + + SnapshotConfig create_snapshot_config() { + SnapshotConfig config; + config.enabled = true; + config.default_topics = {"/test_topic"}; + return config; + } + + std::shared_ptr node_; + std::unique_ptr storage_; + std::filesystem::path temp_dir_; +}; + +// Constructor tests + +// @verifies REQ_INTEROP_088 +TEST_F(RosbagCaptureTest, ConstructorRequiresValidNode) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + EXPECT_THROW(RosbagCapture(nullptr, storage_.get(), rosbag_config, snapshot_config), std::invalid_argument); +} + +// @verifies REQ_INTEROP_088 +TEST_F(RosbagCaptureTest, ConstructorRequiresValidStorage) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + EXPECT_THROW(RosbagCapture(node_.get(), nullptr, rosbag_config, snapshot_config), std::invalid_argument); +} + +// @verifies REQ_INTEROP_088 +TEST_F(RosbagCaptureTest, ConstructorSucceedsWithValidParams) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// @verifies REQ_INTEROP_088 +TEST_F(RosbagCaptureTest, ConstructorWithDisabledRosbag) { + auto rosbag_config = create_rosbag_config(false); + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// @verifies REQ_INTEROP_088 +TEST_F(RosbagCaptureTest, ConstructorThrowsOnInvalidFormat) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.format = "invalid_format"; + auto snapshot_config = create_snapshot_config(); + EXPECT_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config), std::runtime_error); +} + +// @verifies REQ_INTEROP_088 +TEST_F(RosbagCaptureTest, ConstructorAcceptsSqlite3Format) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.format = "sqlite3"; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// State management tests + +// @verifies REQ_INTEROP_088 +TEST_F(RosbagCaptureTest, IsEnabledReturnsConfigState) { + auto rosbag_config = create_rosbag_config(true); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture_enabled(node_.get(), storage_.get(), rosbag_config, snapshot_config); + EXPECT_TRUE(capture_enabled.is_enabled()); + + rosbag_config.enabled = false; + RosbagCapture capture_disabled(node_.get(), storage_.get(), rosbag_config, snapshot_config); + EXPECT_FALSE(capture_disabled.is_enabled()); +} + +TEST_F(RosbagCaptureTest, AutoStartsWhenNotLazy) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.lazy_start = false; + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + // With lazy_start=false, capture auto-starts on construction + EXPECT_TRUE(capture.is_running()); +} + +TEST_F(RosbagCaptureTest, StartMakesRunning) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + capture.start(); + EXPECT_TRUE(capture.is_running()); +} + +TEST_F(RosbagCaptureTest, StopMakesNotRunning) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + capture.start(); + EXPECT_TRUE(capture.is_running()); + capture.stop(); + EXPECT_FALSE(capture.is_running()); +} + +TEST_F(RosbagCaptureTest, DoubleStartIsIdempotent) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + capture.start(); + EXPECT_TRUE(capture.is_running()); + capture.start(); // Second start should not throw + EXPECT_TRUE(capture.is_running()); +} + +TEST_F(RosbagCaptureTest, StopWithoutStartIsIdempotent) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + EXPECT_NO_THROW(capture.stop()); // Should not throw when already stopped + EXPECT_FALSE(capture.is_running()); +} + +// Lazy start tests + +TEST_F(RosbagCaptureTest, LazyStartDoesNotRunImmediately) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.lazy_start = true; + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + // With lazy_start, capture is not running until a fault triggers it + // The start() would typically be called internally on fault + EXPECT_FALSE(capture.is_running()); +} + +TEST_F(RosbagCaptureTest, NonLazyAutoStartsOnConstruction) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.lazy_start = false; + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + // With lazy_start=false, auto-starts immediately on construction + EXPECT_TRUE(capture.is_running()); +} + +// Topic configuration tests + +TEST_F(RosbagCaptureTest, AllTopicsMode) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.topics = "all"; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +TEST_F(RosbagCaptureTest, ConfigTopicsMode) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.topics = "config"; + auto snapshot_config = create_snapshot_config(); + snapshot_config.default_topics = {"/topic1", "/topic2"}; + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +TEST_F(RosbagCaptureTest, ExplicitTopicsMode) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.topics = "explicit"; + rosbag_config.include_topics = {"/topic1", "/topic2"}; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +TEST_F(RosbagCaptureTest, ExcludeTopicsRespected) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.topics = "all"; + rosbag_config.exclude_topics = {"/rosout", "/parameter_events"}; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// Fault lifecycle tests + +TEST_F(RosbagCaptureTest, OnFaultPrefailedWhileDisabled) { + auto rosbag_config = create_rosbag_config(false); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + // Should not throw when disabled + EXPECT_NO_THROW(capture.on_fault_prefailed("TEST_FAULT")); +} + +TEST_F(RosbagCaptureTest, OnFaultConfirmedWhileDisabled) { + auto rosbag_config = create_rosbag_config(false); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + // Should not throw when disabled + EXPECT_NO_THROW(capture.on_fault_confirmed("TEST_FAULT")); +} + +TEST_F(RosbagCaptureTest, OnFaultClearedWhileDisabled) { + auto rosbag_config = create_rosbag_config(false); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + // Should not throw when disabled + EXPECT_NO_THROW(capture.on_fault_cleared("TEST_FAULT")); +} + +TEST_F(RosbagCaptureTest, OnFaultPrefailedStartsLazyCapture) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.lazy_start = true; + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + EXPECT_FALSE(capture.is_running()); + capture.on_fault_prefailed("TEST_FAULT"); + EXPECT_TRUE(capture.is_running()); +} + +// Storage path tests + +TEST_F(RosbagCaptureTest, DefaultStoragePathUsed) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.storage_path = ""; // Empty = use default + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +TEST_F(RosbagCaptureTest, CustomStoragePathAccepted) { + auto rosbag_config = create_rosbag_config(); + auto custom_path = temp_dir_ / "custom"; + rosbag_config.storage_path = custom_path.string(); + auto snapshot_config = create_snapshot_config(); + // Storage path creation happens when bag is written, not on construction + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// Format tests + +TEST_F(RosbagCaptureTest, Sqlite3FormatAccepted) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.format = "sqlite3"; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +TEST_F(RosbagCaptureTest, McapFormatAccepted) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.format = "mcap"; + auto snapshot_config = create_snapshot_config(); + // Note: mcap might not be available in all ROS installations + // The constructor should not throw, but actual writing might fail + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// Duration configuration tests + +TEST_F(RosbagCaptureTest, ZeroDurationHandled) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.duration_sec = 0.0; + rosbag_config.duration_after_sec = 0.0; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +TEST_F(RosbagCaptureTest, NegativeDurationClamped) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.duration_sec = -1.0; + rosbag_config.duration_after_sec = -1.0; + auto snapshot_config = create_snapshot_config(); + // Should not throw, negative values should be handled gracefully + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// Size limits tests + +TEST_F(RosbagCaptureTest, ZeroMaxBagSizeHandled) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.max_bag_size_mb = 0; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +TEST_F(RosbagCaptureTest, ZeroMaxTotalStorageHandled) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.max_total_storage_mb = 0; + auto snapshot_config = create_snapshot_config(); + EXPECT_NO_THROW(RosbagCapture(node_.get(), storage_.get(), rosbag_config, snapshot_config)); +} + +// Integration tests (simplified without actual message publishing) + +class RosbagCaptureIntegrationTest : public RosbagCaptureTest { + protected: + void spin_for(std::chrono::milliseconds duration) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < duration) { + rclcpp::spin_some(node_); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } +}; + +TEST_F(RosbagCaptureIntegrationTest, FullFaultLifecycleWithNoMessages) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.duration_sec = 0.5; + rosbag_config.duration_after_sec = 0.2; + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + + // Start capture + capture.start(); + EXPECT_TRUE(capture.is_running()); + + // Simulate fault lifecycle + capture.on_fault_prefailed("TEST_FAULT_001"); + + // Let some time pass + spin_for(std::chrono::milliseconds(100)); + + capture.on_fault_confirmed("TEST_FAULT_001"); + + // Wait for post-fault timer + spin_for(std::chrono::milliseconds(300)); + + // Clear the fault + capture.on_fault_cleared("TEST_FAULT_001"); + + // Stop capture + capture.stop(); + EXPECT_FALSE(capture.is_running()); +} + +TEST_F(RosbagCaptureIntegrationTest, MultipleFaultsHandled) { + auto rosbag_config = create_rosbag_config(); + rosbag_config.duration_sec = 0.5; + rosbag_config.duration_after_sec = 0.1; + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + + capture.start(); + + // Multiple faults in sequence + capture.on_fault_prefailed("FAULT_A"); + spin_for(std::chrono::milliseconds(50)); + + capture.on_fault_prefailed("FAULT_B"); + spin_for(std::chrono::milliseconds(50)); + + capture.on_fault_confirmed("FAULT_A"); + spin_for(std::chrono::milliseconds(50)); + + capture.on_fault_confirmed("FAULT_B"); + spin_for(std::chrono::milliseconds(200)); + + capture.stop(); +} + +TEST_F(RosbagCaptureIntegrationTest, FaultClearedBeforeConfirmed) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + + capture.start(); + + // Prefail then clear (fault didn't confirm) + capture.on_fault_prefailed("CLEARED_EARLY"); + spin_for(std::chrono::milliseconds(50)); + capture.on_fault_cleared("CLEARED_EARLY"); + + // Should not crash + spin_for(std::chrono::milliseconds(100)); + + capture.stop(); +} + +TEST_F(RosbagCaptureIntegrationTest, ConfirmedWithoutPrefailed) { + auto rosbag_config = create_rosbag_config(); + auto snapshot_config = create_snapshot_config(); + RosbagCapture capture(node_.get(), storage_.get(), rosbag_config, snapshot_config); + + capture.start(); + + // Direct confirm (edge case) + capture.on_fault_confirmed("DIRECT_CONFIRM"); + spin_for(std::chrono::milliseconds(200)); + + capture.stop(); +} + +int main(int argc, char ** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ros2_medkit_fault_manager/test/test_rosbag_integration.test.py b/src/ros2_medkit_fault_manager/test/test_rosbag_integration.test.py new file mode 100644 index 0000000..eb46ec8 --- /dev/null +++ b/src/ros2_medkit_fault_manager/test/test_rosbag_integration.test.py @@ -0,0 +1,432 @@ +#!/usr/bin/env python3 +# Copyright 2025 mfaferek93 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for rosbag capture feature (Issue #120).""" + +import os +import tempfile +import time +import unittest + +from launch import LaunchDescription +import launch.actions +import launch_ros.actions +import launch_testing.actions +import launch_testing.markers +import rclpy +from rclpy.node import Node +from ros2_medkit_msgs.msg import Fault +from ros2_medkit_msgs.srv import ClearFault, GetRosbag, ReportFault + + +def get_coverage_env(): + """Get environment variables for gcov coverage data collection.""" + try: + from ament_index_python.packages import get_package_prefix + pkg_prefix = get_package_prefix('ros2_medkit_fault_manager') + workspace = os.path.dirname(os.path.dirname(pkg_prefix)) + build_dir = os.path.join(workspace, 'build', 'ros2_medkit_fault_manager') + + if os.path.exists(build_dir): + return { + 'GCOV_PREFIX': build_dir, + 'GCOV_PREFIX_STRIP': str(build_dir.count(os.sep)), + } + except Exception: + # Coverage environment is optional; on any error, fall back to no extra coverage config + pass + return {} + + +# Create a temp directory for rosbag storage that persists for test duration +ROSBAG_STORAGE_PATH = tempfile.mkdtemp(prefix='rosbag_test_') + +# Path to temp publisher script (set in generate_test_description, cleaned up in shutdown test) +PUBLISHER_SCRIPT_PATH = None + + +def generate_test_description(): + """Generate launch description with fault_manager node with rosbag enabled.""" + # Use Python script for publishers to ensure proper topic type registration + # ros2 topic pub has issues with topic type discovery in some configurations + publisher_script = """ +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy +from sensor_msgs.msg import Temperature +from std_msgs.msg import String +import time + +class TestPublisher(Node): + def __init__(self): + super().__init__('test_publisher') + import os + domain_id = os.environ.get('ROS_DOMAIN_ID', 'not set') + self.get_logger().info(f'Using ROS_DOMAIN_ID: {domain_id}') + + # Use sensor data QoS to match rosbag capture subscriptions + qos = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=10 + ) + self.temp_pub = self.create_publisher(Temperature, '/test/temperature', qos) + self.string_pub = self.create_publisher(String, '/test/status', qos) + self.timer = self.create_timer(0.1, self.publish) + self.counter = 0 + self.get_logger().info('TestPublisher started') + + def publish(self): + temp_msg = Temperature() + temp_msg.temperature = 25.0 + self.counter * 0.1 + temp_msg.variance = 0.1 + self.temp_pub.publish(temp_msg) + + string_msg = String() + string_msg.data = f'status_{self.counter}' + self.string_pub.publish(string_msg) + self.counter += 1 + if self.counter % 50 == 0: + self.get_logger().info(f'Published {self.counter} messages') + # Log available topics and types for debugging + topics = self.get_topic_names_and_types() + topic_info = [(t, types) for t, types in topics if 'test' in t] + if topic_info: + self.get_logger().info(f'Test topics visible: {topic_info}') + +def main(): + rclpy.init() + node = TestPublisher() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() +""" + + # Write publisher script to temp file and run it + # Store path in global for cleanup in post-shutdown test + global PUBLISHER_SCRIPT_PATH + script_file = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) + script_file.write(publisher_script) + script_file.close() + PUBLISHER_SCRIPT_PATH = script_file.name + + # Use explicit ROS settings to ensure processes can discover each other + env = os.environ.copy() + env['ROS_DOMAIN_ID'] = '42' + env['ROS_LOCALHOST_ONLY'] = '1' # Force localhost-only discovery + + test_publisher = launch.actions.ExecuteProcess( + cmd=['python3', script_file.name], + output='screen', + name='test_publisher', + env=env, + ) + + fault_manager_env = get_coverage_env() + fault_manager_env['ROS_DOMAIN_ID'] = '42' + fault_manager_env['ROS_LOCALHOST_ONLY'] = '1' + + fault_manager_node = launch_ros.actions.Node( + package='ros2_medkit_fault_manager', + executable='fault_manager_node', + name='fault_manager', + output='screen', + additional_env=fault_manager_env, + parameters=[{ + 'storage_type': 'memory', + 'confirmation_threshold': -1, # Single report confirms immediately + # Rosbag configuration + 'snapshots.rosbag.enabled': True, + 'snapshots.rosbag.duration_sec': 2.0, # 2 second buffer + 'snapshots.rosbag.duration_after_sec': 0.5, # 0.5 second after confirm + # Use explicit topics - faster than discovery, more reliable for tests + 'snapshots.rosbag.topics': '/test/temperature,/test/status', + 'snapshots.rosbag.format': 'sqlite3', + 'snapshots.rosbag.storage_path': ROSBAG_STORAGE_PATH, + 'snapshots.rosbag.max_bag_size_mb': 10, + 'snapshots.rosbag.max_total_storage_mb': 50, + 'snapshots.rosbag.auto_cleanup': True, + # lazy_start=false: Start recording immediately + 'snapshots.rosbag.lazy_start': False, + }], + ) + + # Delay fault_manager start so test_publisher has time to register topics + # DDS discovery can take several seconds to propagate topic types + delayed_fault_manager = launch.actions.TimerAction( + period=8.0, + actions=[fault_manager_node], + ) + + return ( + LaunchDescription([ + # Start publisher node first + test_publisher, + # Start fault_manager after delay + delayed_fault_manager, + launch_testing.actions.ReadyToTest(), + ]), + { + 'fault_manager_node': fault_manager_node, + 'test_publisher': test_publisher, + }, + ) + + +class TestRosbagCaptureIntegration(unittest.TestCase): + """Integration tests for rosbag capture feature (@verifies REQ_INTEROP_088).""" + + @classmethod + def setUpClass(cls): + """Initialize ROS 2 context and create service clients.""" + # Use same ROS settings as launch processes to ensure discovery + os.environ['ROS_DOMAIN_ID'] = '42' + os.environ['ROS_LOCALHOST_ONLY'] = '1' + rclpy.init() + cls.node = Node('test_rosbag_client') + + # Create service clients + cls.report_fault_client = cls.node.create_client( + ReportFault, '/fault_manager/report_fault' + ) + cls.clear_fault_client = cls.node.create_client( + ClearFault, '/fault_manager/clear_fault' + ) + cls.get_rosbag_client = cls.node.create_client( + GetRosbag, '/fault_manager/get_rosbag' + ) + + # Wait for services to be available + assert cls.report_fault_client.wait_for_service(timeout_sec=15.0), \ + 'report_fault service not available' + assert cls.clear_fault_client.wait_for_service(timeout_sec=15.0), \ + 'clear_fault service not available' + assert cls.get_rosbag_client.wait_for_service(timeout_sec=15.0), \ + 'get_rosbag service not available' + + # Give time for rosbag capture to fill buffer with messages from + # background publishers (duration_sec=2.0 configured) + time.sleep(3.0) + + @classmethod + def tearDownClass(cls): + """Shutdown ROS 2 context.""" + cls.node.destroy_node() + rclpy.shutdown() + + def _call_service(self, client, request, timeout_sec=10.0): + """Call a service and wait for response.""" + future = client.call_async(request) + rclpy.spin_until_future_complete(self.node, future, timeout_sec=timeout_sec) + self.assertIsNotNone(future.result(), 'Service call timed out') + return future.result() + + def _report_fault(self, fault_code, description='Test fault'): + """Report a fault and return the response.""" + request = ReportFault.Request() + request.fault_code = fault_code + request.event_type = ReportFault.Request.EVENT_FAILED + request.severity = Fault.SEVERITY_ERROR + request.description = description + request.source_id = '/test_node' + return self._call_service(self.report_fault_client, request) + + def test_01_rosbag_created_on_fault_confirmation(self): + """Test that rosbag file is created when fault is confirmed.""" + fault_code = 'ROSBAG_TEST_001' + + # Report fault - buffer should already have messages from background publishers + response = self._report_fault(fault_code, 'Rosbag integration test fault') + self.assertTrue(response.accepted) + print(f'Fault {fault_code} reported and confirmed') + + # Wait for post-fault recording to complete + time.sleep(1.0) # duration_after_sec=0.5 + buffer + + # Query rosbag info + rosbag_request = GetRosbag.Request() + rosbag_request.fault_code = fault_code + + rosbag_response = self._call_service(self.get_rosbag_client, rosbag_request) + + self.assertTrue(rosbag_response.success, + f'GetRosbag failed: {rosbag_response.error_message}') + self.assertGreater(len(rosbag_response.file_path), 0) + self.assertEqual(rosbag_response.format, 'sqlite3') + self.assertGreater(rosbag_response.duration_sec, 0) + self.assertGreater(rosbag_response.size_bytes, 0) + + # Verify file exists + self.assertTrue(os.path.exists(rosbag_response.file_path), + f'Rosbag file not found: {rosbag_response.file_path}') + + print(f'Rosbag created: {rosbag_response.file_path}') + print(f' Duration: {rosbag_response.duration_sec:.2f}s') + print(f' Size: {rosbag_response.size_bytes} bytes') + print(f' Format: {rosbag_response.format}') + + def test_02_rosbag_auto_cleanup_on_clear(self): + """Test that rosbag file is deleted when fault is cleared (auto_cleanup=true).""" + fault_code = 'ROSBAG_CLEANUP_TEST' + + # Report fault - buffer has messages from background publishers + response = self._report_fault(fault_code, 'Cleanup test fault') + self.assertTrue(response.accepted) + + # Wait for post-fault recording + time.sleep(1.0) + + # Verify rosbag exists + rosbag_request = GetRosbag.Request() + rosbag_request.fault_code = fault_code + rosbag_response = self._call_service(self.get_rosbag_client, rosbag_request) + + self.assertTrue(rosbag_response.success) + bag_path = rosbag_response.file_path + self.assertTrue(os.path.exists(bag_path)) + print(f'Rosbag exists before clear: {bag_path}') + + # Clear the fault + clear_request = ClearFault.Request() + clear_request.fault_code = fault_code + + clear_response = self._call_service(self.clear_fault_client, clear_request) + self.assertTrue(clear_response.success) + print('Fault cleared') + + # Small delay for cleanup + time.sleep(0.5) + + # Verify rosbag is deleted + self.assertFalse(os.path.exists(bag_path), + f'Rosbag should be deleted but still exists: {bag_path}') + print('Rosbag auto-deleted on clear') + + # GetRosbag should now fail + rosbag_response2 = self._call_service(self.get_rosbag_client, rosbag_request) + self.assertFalse(rosbag_response2.success) + print(f'GetRosbag after clear: {rosbag_response2.error_message}') + + def test_03_get_rosbag_nonexistent_fault(self): + """Test GetRosbag returns error for nonexistent fault.""" + request = GetRosbag.Request() + request.fault_code = 'NONEXISTENT_ROSBAG_FAULT' + + response = self._call_service(self.get_rosbag_client, request) + + self.assertFalse(response.success) + self.assertIn('not found', response.error_message.lower()) + print(f'Nonexistent fault error: {response.error_message}') + + def test_04_get_rosbag_empty_fault_code(self): + """Test GetRosbag returns error for empty fault code.""" + request = GetRosbag.Request() + request.fault_code = '' + + response = self._call_service(self.get_rosbag_client, request) + + self.assertFalse(response.success) + self.assertIn('empty', response.error_message.lower()) + print(f'Empty fault code error: {response.error_message}') + + def test_05_multiple_faults_separate_bags(self): + """Test that multiple faults create separate rosbag files.""" + fault_codes = ['MULTI_BAG_A', 'MULTI_BAG_B'] + bag_paths = [] + + # Report multiple faults - buffer has messages from background publishers + for fault_code in fault_codes: + response = self._report_fault(fault_code, f'Multi-bag test: {fault_code}') + self.assertTrue(response.accepted) + time.sleep(1.0) # Wait for post-fault recording + + # Verify each fault has its own rosbag + for fault_code in fault_codes: + rosbag_request = GetRosbag.Request() + rosbag_request.fault_code = fault_code + + rosbag_response = self._call_service(self.get_rosbag_client, rosbag_request) + + self.assertTrue(rosbag_response.success, + f'GetRosbag failed for {fault_code}: {rosbag_response.error_message}') + self.assertTrue(os.path.exists(rosbag_response.file_path)) + bag_paths.append(rosbag_response.file_path) + print(f'Rosbag for {fault_code}: {rosbag_response.file_path}') + + # Verify bags are different files + self.assertNotEqual(bag_paths[0], bag_paths[1]) + print('Multiple faults have separate rosbag files') + + def test_06_rosbag_contains_recorded_duration(self): + """Test that rosbag duration approximately matches configuration.""" + fault_code = 'DURATION_TEST' + + # Report fault - buffer should have ~duration_sec worth of messages + response = self._report_fault(fault_code, 'Duration test fault') + self.assertTrue(response.accepted) + + # Wait for post-fault recording (duration_after_sec=0.5) + time.sleep(1.0) + + # Get rosbag info + rosbag_request = GetRosbag.Request() + rosbag_request.fault_code = fault_code + + rosbag_response = self._call_service(self.get_rosbag_client, rosbag_request) + + self.assertTrue(rosbag_response.success) + + # Duration should be approximately duration_sec + duration_after_sec + # Allow some tolerance for timing + expected_min = 1.5 # At least some data + expected_max = 4.0 # Upper bound with tolerance + self.assertGreaterEqual(rosbag_response.duration_sec, expected_min, + f'Duration too short: {rosbag_response.duration_sec}') + self.assertLessEqual(rosbag_response.duration_sec, expected_max, + f'Duration too long: {rosbag_response.duration_sec}') + + print(f'Rosbag duration: {rosbag_response.duration_sec:.2f}s ' + f'(expected: ~2.5s with tolerance)') + + +@launch_testing.post_shutdown_test() +class TestRosbagCaptureShutdown(unittest.TestCase): + """Post-shutdown tests.""" + + def test_exit_code(self, proc_info): + """Verify fault_manager exits cleanly.""" + launch_testing.asserts.assertExitCodes( + proc_info, + process='fault_manager_node' + ) + + def test_cleanup_temp_directory(self): + """Clean up temporary rosbag storage directory and publisher script.""" + import shutil + if os.path.exists(ROSBAG_STORAGE_PATH): + shutil.rmtree(ROSBAG_STORAGE_PATH, ignore_errors=True) + print(f'Cleaned up temp directory: {ROSBAG_STORAGE_PATH}') + + # Clean up the temporary publisher script + if PUBLISHER_SCRIPT_PATH and os.path.exists(PUBLISHER_SCRIPT_PATH): + os.unlink(PUBLISHER_SCRIPT_PATH) + print(f'Cleaned up temp script: {PUBLISHER_SCRIPT_PATH}') diff --git a/src/ros2_medkit_gateway/README.md b/src/ros2_medkit_gateway/README.md index c5e110e..b555f8f 100644 --- a/src/ros2_medkit_gateway/README.md +++ b/src/ros2_medkit_gateway/README.md @@ -648,6 +648,7 @@ Faults represent errors or warnings reported by system components. The gateway p - `GET /api/v1/faults` - List all faults across the system (convenience API for dashboards) - `GET /api/v1/faults/stream` - Real-time fault event stream via Server-Sent Events (SSE) - `GET /api/v1/faults/{fault_code}/snapshots` - Get topic snapshots captured when fault was confirmed +- `GET /api/v1/faults/{fault_code}/snapshots/bag` - Download rosbag file for fault (if rosbag capture enabled) - `GET /api/v1/components/{component_id}/faults` - List faults for a specific component - `GET /api/v1/components/{component_id}/faults/{fault_code}` - Get a specific fault - `GET /api/v1/components/{component_id}/faults/{fault_code}/snapshots` - Get snapshots for a component's fault @@ -871,6 +872,91 @@ default_topics: - /diagnostics ``` +#### GET /api/v1/faults/{fault_code}/snapshots/bag + +Download the rosbag file associated with a fault. This endpoint is only available when rosbag capture is enabled in FaultManager. + +Rosbag capture provides "black box" style recording - a ring buffer continuously records configured topics, and when a fault is confirmed, the buffer is flushed to a bag file. This allows capturing system state both **before and after** fault confirmation. + +**Example:** +```bash +# Download rosbag archive +curl -O -J http://localhost:8080/api/v1/faults/MOTOR_OVERHEAT/snapshots/bag + +# Or save with custom filename +curl http://localhost:8080/api/v1/faults/MOTOR_OVERHEAT/snapshots/bag -o motor_fault.tar.gz +``` + +**Response (200 OK):** +- For directory-based bags (default rosbag2 format): compressed tar.gz archive containing the full bag directory with metadata.yaml and all storage segments +- Content-Type: `application/gzip` +- Content-Disposition: `attachment; filename="fault_MOTOR_OVERHEAT_20260124_153045.tar.gz"` + +The archive can be extracted and played directly with `ros2 bag play`. + +**Response (404 Not Found - Fault or rosbag not found):** +```json +{ + "error": "Rosbag not found", + "fault_code": "MOTOR_OVERHEAT", + "details": "No rosbag file associated with this fault" +} +``` + +**Response (404 Not Found - Rosbag file deleted):** +```json +{ + "error": "Rosbag file not found", + "fault_code": "MOTOR_OVERHEAT", + "details": "File was deleted or moved" +} +``` + +**Rosbag Configuration:** + +Rosbag capture is configured via FaultManager parameters. See `config/snapshots.yaml` for full configuration options. + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `snapshots.rosbag.enabled` | bool | `false` | Enable/disable rosbag capture | +| `snapshots.rosbag.duration_sec` | double | `5.0` | Ring buffer duration (seconds before fault) | +| `snapshots.rosbag.duration_after_sec` | double | `1.0` | Recording duration after fault confirmed | +| `snapshots.rosbag.topics` | string | `"config"` | Topic selection: `"config"`, `"all"`, or `"explicit"` | +| `snapshots.rosbag.format` | string | `"sqlite3"` | Bag format: `"sqlite3"` or `"mcap"` | +| `snapshots.rosbag.auto_cleanup` | bool | `true` | Delete bag when fault is cleared | +| `snapshots.rosbag.max_bag_size_mb` | int | `50` | Max size per bag file | +| `snapshots.rosbag.max_total_storage_mb` | int | `500` | Total storage limit | + +**Enable rosbag capture:** +```bash +ros2 run ros2_medkit_fault_manager fault_manager_node \ + --ros-args -p snapshots.rosbag.enabled:=true \ + -p snapshots.rosbag.duration_sec:=5.0 +``` + +**Playback downloaded rosbag:** +```bash +# Extract the downloaded archive +tar -xzf fault_MOTOR_OVERHEAT_20260124_153045.tar.gz + +# Play back the bag +ros2 bag play fault_MOTOR_OVERHEAT_1735830000/ + +# Inspect bag contents +ros2 bag info fault_MOTOR_OVERHEAT_1735830000/ +``` + +**Differences from JSON Snapshots:** + +| Feature | JSON Snapshots | Rosbag Capture | +|---------|----------------|----------------| +| Data format | JSON (human-readable) | Binary (native ROS 2) | +| Time coverage | Point-in-time (at confirmation) | Time window (before + after) | +| Message fidelity | Converted to JSON | Original serialization | +| Playback | N/A | `ros2 bag play` | +| Query via REST | Yes (structured JSON) | Download only | +| Default | Enabled | Disabled | + ## Quick Start ### Build diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/fault_manager.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/fault_manager.hpp index 33a7ae6..03c454b 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/fault_manager.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/fault_manager.hpp @@ -24,6 +24,7 @@ #include "ros2_medkit_msgs/msg/fault.hpp" #include "ros2_medkit_msgs/srv/clear_fault.hpp" #include "ros2_medkit_msgs/srv/get_faults.hpp" +#include "ros2_medkit_msgs/srv/get_rosbag.hpp" #include "ros2_medkit_msgs/srv/get_snapshots.hpp" #include "ros2_medkit_msgs/srv/report_fault.hpp" @@ -82,6 +83,11 @@ class FaultManager { /// @return FaultResult with snapshot data (JSON in data field) FaultResult get_snapshots(const std::string & fault_code, const std::string & topic = ""); + /// Get rosbag file info for a fault + /// @param fault_code Fault identifier + /// @return FaultResult with rosbag file path and metadata + FaultResult get_rosbag(const std::string & fault_code); + /// Check if fault manager service is available /// @return true if services are available bool is_available() const; @@ -100,6 +106,7 @@ class FaultManager { rclcpp::Client::SharedPtr get_faults_client_; rclcpp::Client::SharedPtr clear_fault_client_; rclcpp::Client::SharedPtr get_snapshots_client_; + rclcpp::Client::SharedPtr get_rosbag_client_; /// Service timeout double service_timeout_sec_{5.0}; diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/fault_handlers.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/fault_handlers.hpp index 7a15b7f..6260b0e 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/fault_handlers.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/fault_handlers.hpp @@ -75,6 +75,11 @@ class FaultHandlers { */ void handle_get_component_snapshots(const httplib::Request & req, httplib::Response & res); + /** + * @brief Handle GET /faults/{fault_code}/snapshots/bag - download rosbag file for a fault. + */ + void handle_get_rosbag(const httplib::Request & req, httplib::Response & res); + private: HandlerContext & ctx_; }; diff --git a/src/ros2_medkit_gateway/src/fault_manager.cpp b/src/ros2_medkit_gateway/src/fault_manager.cpp index 63a987a..6430efc 100644 --- a/src/ros2_medkit_gateway/src/fault_manager.cpp +++ b/src/ros2_medkit_gateway/src/fault_manager.cpp @@ -29,6 +29,7 @@ FaultManager::FaultManager(rclcpp::Node * node) : node_(node) { get_faults_client_ = node_->create_client("/fault_manager/get_faults"); clear_fault_client_ = node_->create_client("/fault_manager/clear_fault"); get_snapshots_client_ = node_->create_client("/fault_manager/get_snapshots"); + get_rosbag_client_ = node_->create_client("/fault_manager/get_rosbag"); // Get configurable timeout service_timeout_sec_ = node_->declare_parameter("fault_service_timeout_sec", 5.0); @@ -330,4 +331,41 @@ FaultResult FaultManager::get_snapshots(const std::string & fault_code, const st return result; } +FaultResult FaultManager::get_rosbag(const std::string & fault_code) { + std::lock_guard lock(service_mutex_); + FaultResult result; + + auto timeout = std::chrono::duration(service_timeout_sec_); + if (!get_rosbag_client_->wait_for_service(timeout)) { + result.success = false; + result.error_message = "GetRosbag service not available"; + return result; + } + + auto request = std::make_shared(); + request->fault_code = fault_code; + + auto future = get_rosbag_client_->async_send_request(request); + + if (future.wait_for(timeout) != std::future_status::ready) { + result.success = false; + result.error_message = "GetRosbag service call timed out"; + return result; + } + + auto response = future.get(); + result.success = response->success; + + if (response->success) { + result.data = {{"file_path", response->file_path}, + {"format", response->format}, + {"duration_sec", response->duration_sec}, + {"size_bytes", response->size_bytes}}; + } else { + result.error_message = response->error_message; + } + + return result; +} + } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp index 760bb0d..14d6268 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp @@ -14,6 +14,16 @@ #include "ros2_medkit_gateway/http/handlers/fault_handlers.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include "ros2_medkit_gateway/gateway_node.hpp" #include "ros2_medkit_gateway/http/error_codes.hpp" #include "ros2_medkit_gateway/http/http_utils.hpp" @@ -25,6 +35,62 @@ using httplib::StatusCode; namespace ros2_medkit_gateway { namespace handlers { +namespace { + +/// Sanitize a string for use in HTTP Content-Disposition filename +/// Removes/replaces characters that could cause header injection or filesystem issues +std::string sanitize_filename(const std::string & input) { + std::string result; + result.reserve(input.size()); + for (char c : input) { + // Allow only alphanumeric, underscore, hyphen, dot + if (std::isalnum(static_cast(c)) || c == '_' || c == '-' || c == '.') { + result += c; + } else { + result += '_'; // Replace unsafe characters + } + } + return result; +} + +/// Generate a timestamp string in YYYYMMDD_HHMMSS format for filenames +std::string generate_timestamp() { + auto now = std::chrono::system_clock::now(); + std::time_t now_time = std::chrono::system_clock::to_time_t(now); + std::tm now_tm; + gmtime_r(&now_time, &now_tm); // Use UTC for consistency + std::ostringstream oss; + oss << std::put_time(&now_tm, "%Y%m%d_%H%M%S"); + return oss.str(); +} + +/// Maximum allowed length for fault_code +constexpr size_t kMaxFaultCodeLength = 128; + +/// Validate fault_code format (same rules as FaultManagerNode) +/// @param fault_code The fault code to validate +/// @return Empty string if valid, error message if invalid +std::string validate_fault_code(const std::string & fault_code) { + if (fault_code.empty()) { + return "fault_code cannot be empty"; + } + if (fault_code.length() > kMaxFaultCodeLength) { + return "fault_code exceeds maximum length of " + std::to_string(kMaxFaultCodeLength) + " characters"; + } + for (char c : fault_code) { + if (!std::isalnum(static_cast(c)) && c != '_' && c != '-' && c != '.') { + return "fault_code contains invalid character '" + std::string(1, c) + + "'. Only alphanumeric, underscore, hyphen, and dot are allowed"; + } + } + if (fault_code.find("..") != std::string::npos) { + return "fault_code cannot contain '..' (path traversal not allowed)"; + } + return ""; +} + +} // namespace + void FaultHandlers::handle_list_all_faults(const httplib::Request & req, httplib::Response & res) { try { auto filter = parse_fault_status_param(req); @@ -460,5 +526,147 @@ void FaultHandlers::handle_get_component_snapshots(const httplib::Request & req, } } +void FaultHandlers::handle_get_rosbag(const httplib::Request & req, httplib::Response & res) { + std::string fault_code; + try { + if (req.matches.size() < 2) { + HandlerContext::send_error(res, StatusCode::BadRequest_400, ERR_INVALID_REQUEST, "Invalid request", {}); + return; + } + + fault_code = req.matches[1]; + + // Validate fault code format (prevents path traversal and injection attacks) + std::string validation_error = validate_fault_code(fault_code); + if (!validation_error.empty()) { + HandlerContext::send_error(res, StatusCode::BadRequest_400, ERR_INVALID_PARAMETER, "Invalid fault code", + {{"details", validation_error}, {"fault_code", fault_code}}); + return; + } + + auto fault_mgr = ctx_.node()->get_fault_manager(); + auto result = fault_mgr->get_rosbag(fault_code); + + if (!result.success) { + // Check if it's a "not found" error + if (result.error_message.find("not found") != std::string::npos || + result.error_message.find("No rosbag") != std::string::npos) { + HandlerContext::send_error(res, StatusCode::NotFound_404, ERR_RESOURCE_NOT_FOUND, "Rosbag not found", + {{"details", result.error_message}, {"fault_code", fault_code}}); + } else if (result.error_message.find("invalid") != std::string::npos) { + // Validation error from service + HandlerContext::send_error(res, StatusCode::BadRequest_400, ERR_INVALID_PARAMETER, "Invalid fault code", + {{"details", result.error_message}, {"fault_code", fault_code}}); + } else { + HandlerContext::send_error(res, StatusCode::ServiceUnavailable_503, ERR_SERVICE_UNAVAILABLE, + "Failed to get rosbag", + {{"details", result.error_message}, {"fault_code", fault_code}}); + } + return; + } + + // Get file path from result + std::string file_path = result.data["file_path"].get(); + std::string format = result.data["format"].get(); + + // Check if path is a directory (rosbag2 creates directories) + std::filesystem::path bag_path(file_path); + bool is_directory = std::filesystem::is_directory(bag_path); + std::string archive_path; // Will be set if we create a temp archive + + // Determine content type and filename based on what we're sending + std::string content_type; + std::string timestamp = generate_timestamp(); + std::string filename; + + if (is_directory) { + // Create tar.gz archive of the entire bag directory (includes all segments + metadata) + archive_path = std::filesystem::temp_directory_path().string() + "/rosbag_download_" + + std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()) + ".tar.gz"; + + std::string tar_cmd = "tar -czf " + archive_path + " -C " + bag_path.parent_path().string() + " " + + bag_path.filename().string() + " 2>/dev/null"; + + int tar_result = std::system(tar_cmd.c_str()); + if (tar_result != 0) { + HandlerContext::send_error(res, StatusCode::InternalServerError_500, ERR_INTERNAL_ERROR, + "Failed to create rosbag archive", + {{"fault_code", fault_code}, {"path", file_path}}); + return; + } + + file_path = archive_path; + content_type = "application/gzip"; + filename = "fault_" + sanitize_filename(fault_code) + "_" + timestamp + ".tar.gz"; + } else { + // Single file - determine type from format + if (format == "mcap") { + content_type = "application/x-mcap"; + filename = "fault_" + sanitize_filename(fault_code) + "_" + timestamp + ".mcap"; + } else { + content_type = "application/octet-stream"; + filename = "fault_" + sanitize_filename(fault_code) + "_" + timestamp + ".db3"; + } + } + + // Check file exists and get size + std::error_code ec; + auto file_size = std::filesystem::file_size(file_path, ec); + if (ec) { + if (!archive_path.empty()) { + std::filesystem::remove(archive_path, ec); + } + HandlerContext::send_error(res, StatusCode::InternalServerError_500, ERR_INTERNAL_ERROR, + "Failed to read rosbag file", {{"fault_code", fault_code}, {"path", file_path}}); + return; + } + + res.set_header("Content-Disposition", "attachment; filename=\"" + filename + "\""); + res.set_header("Content-Type", content_type); + res.status = StatusCode::OK_200; + + // Use streaming response for large files to avoid loading entire bag into memory + std::string path_copy = file_path; // Capture for content provider lambda + std::string archive_copy = archive_path; // Capture for cleanup lambda + res.set_content_provider( + file_size, content_type, + [path_copy](size_t offset, size_t length, httplib::DataSink & sink) { + std::ifstream file(path_copy, std::ios::binary); + if (!file) { + return false; + } + file.seekg(static_cast(offset)); + constexpr size_t kChunkSize = 65536; // 64KB chunks + std::vector buffer(std::min(length, kChunkSize)); + size_t remaining = length; + while (remaining > 0 && file) { + size_t to_read = std::min(remaining, kChunkSize); + file.read(buffer.data(), static_cast(to_read)); + auto bytes_read = static_cast(file.gcount()); + if (bytes_read == 0) { + break; + } + sink.write(buffer.data(), bytes_read); + remaining -= bytes_read; + } + return true; + }, + [archive_copy](bool /*success*/) { + // Resource releaser callback - clean up temp archive if we created one + if (!archive_copy.empty()) { + std::error_code cleanup_ec; + std::filesystem::remove(archive_copy, cleanup_ec); + // Ignore errors - temp file cleanup is best-effort + } + }); + + } catch (const std::exception & e) { + HandlerContext::send_error(res, StatusCode::InternalServerError_500, ERR_INTERNAL_ERROR, + "Failed to download rosbag", {{"details", e.what()}, {"fault_code", fault_code}}); + RCLCPP_ERROR(HandlerContext::logger(), "Error in handle_get_rosbag for fault '%s': %s", fault_code.c_str(), + e.what()); + } +} + } // namespace handlers } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp index 428e379..1f986f5 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp @@ -84,6 +84,7 @@ void HealthHandlers::handle_root(const httplib::Request & req, httplib::Response "GET /api/v1/components/{component_id}/faults/{fault_code}", "DELETE /api/v1/components/{component_id}/faults/{fault_code}", "GET /api/v1/faults/{fault_code}/snapshots", + "GET /api/v1/faults/{fault_code}/snapshots/bag", "GET /api/v1/components/{component_id}/faults/{fault_code}/snapshots", }); diff --git a/src/ros2_medkit_gateway/src/http/rest_server.cpp b/src/ros2_medkit_gateway/src/http/rest_server.cpp index edee960..c61e42e 100644 --- a/src/ros2_medkit_gateway/src/http/rest_server.cpp +++ b/src/ros2_medkit_gateway/src/http/rest_server.cpp @@ -559,6 +559,12 @@ void RESTServer::setup_routes() { fault_handlers_->handle_get_snapshots(req, res); }); + // GET /faults/{fault_code}/snapshots/bag - download rosbag file for fault + srv->Get((api_path("/faults") + R"(/([^/]+)/snapshots/bag$)"), + [this](const httplib::Request & req, httplib::Response & res) { + fault_handlers_->handle_get_rosbag(req, res); + }); + // GET /components/{component_id}/faults/{fault_code}/snapshots - component-scoped snapshot access srv->Get((api_path("/components") + R"(/([^/]+)/faults/([^/]+)/snapshots$)"), [this](const httplib::Request & req, httplib::Response & res) { diff --git a/src/ros2_medkit_gateway/test/test_fault_manager.cpp b/src/ros2_medkit_gateway/test/test_fault_manager.cpp index 5db66dc..2dd3583 100644 --- a/src/ros2_medkit_gateway/test/test_fault_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_fault_manager.cpp @@ -23,10 +23,12 @@ #include #include "ros2_medkit_gateway/fault_manager.hpp" +#include "ros2_medkit_msgs/srv/get_rosbag.hpp" #include "ros2_medkit_msgs/srv/get_snapshots.hpp" using namespace std::chrono_literals; using ros2_medkit_gateway::FaultManager; +using ros2_medkit_msgs::srv::GetRosbag; using ros2_medkit_msgs::srv::GetSnapshots; class FaultManagerTest : public ::testing::Test { @@ -187,6 +189,62 @@ TEST_F(FaultManagerTest, GetSnapshotsEmptyResponse) { EXPECT_TRUE(result.data.empty()); } +// GetRosbag service client tests + +// @verifies REQ_INTEROP_088 +TEST_F(FaultManagerTest, GetRosbagServiceNotAvailable) { + FaultManager fault_manager(node_.get()); + + // Don't create a service, so it will timeout + auto result = fault_manager.get_rosbag("TEST_FAULT"); + + EXPECT_FALSE(result.success); + EXPECT_EQ(result.error_message, "GetRosbag service not available"); +} + +// @verifies REQ_INTEROP_088 +TEST_F(FaultManagerTest, GetRosbagSuccess) { + auto service = node_->create_service( + "/fault_manager/get_rosbag", + [](const std::shared_ptr request, std::shared_ptr response) { + response->success = true; + response->file_path = "/tmp/test_bag_" + request->fault_code; + response->format = "sqlite3"; + response->duration_sec = 5.5; + response->size_bytes = 12345; + }); + + start_spinning(); + FaultManager fault_manager(node_.get()); + + auto result = fault_manager.get_rosbag("TEST_ROSBAG_FAULT"); + + EXPECT_TRUE(result.success); + EXPECT_TRUE(result.data.contains("file_path")); + EXPECT_EQ(result.data["file_path"], "/tmp/test_bag_TEST_ROSBAG_FAULT"); + EXPECT_EQ(result.data["format"], "sqlite3"); + EXPECT_EQ(result.data["duration_sec"], 5.5); + EXPECT_EQ(result.data["size_bytes"], 12345); +} + +// @verifies REQ_INTEROP_088 +TEST_F(FaultManagerTest, GetRosbagNotFound) { + auto service = node_->create_service( + "/fault_manager/get_rosbag", + [](const std::shared_ptr /*request*/, std::shared_ptr response) { + response->success = false; + response->error_message = "No rosbag file available for fault"; + }); + + start_spinning(); + FaultManager fault_manager(node_.get()); + + auto result = fault_manager.get_rosbag("NONEXISTENT_FAULT"); + + EXPECT_FALSE(result.success); + EXPECT_EQ(result.error_message, "No rosbag file available for fault"); +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/ros2_medkit_gateway/test/test_integration.test.py b/src/ros2_medkit_gateway/test/test_integration.test.py index 890115b..031da05 100644 --- a/src/ros2_medkit_gateway/test/test_integration.test.py +++ b/src/ros2_medkit_gateway/test/test_integration.test.py @@ -211,13 +211,22 @@ def generate_test_description(): # Launch the fault_manager node for fault REST API tests # Use in-memory storage to avoid filesystem permission issues in CI + # Enable rosbag capture for integration testing - capture lidar scan topic + # which is published by the lidar_sensor demo node fault_manager_node = launch_ros.actions.Node( package='ros2_medkit_fault_manager', executable='fault_manager_node', name='fault_manager', output='screen', additional_env=coverage_env, - parameters=[{'storage_type': 'memory'}], + parameters=[{ + 'storage_type': 'memory', + 'snapshots.rosbag.enabled': True, + 'snapshots.rosbag.duration_sec': 2.0, + 'snapshots.rosbag.duration_after_sec': 0.5, + 'snapshots.rosbag.topics': 'explicit', + 'snapshots.rosbag.include_topics': ['/perception/lidar/scan'], + }], ) # Start demo nodes with a delay to ensure gateway starts first @@ -3806,3 +3815,115 @@ def test_106_reset_single_configuration(self): self.assertEqual(len(response.content), 0, '204 should have no body') print(f'✓ Reset single configuration test passed: {config_id}') + + # ==================== Rosbag Snapshot Tests ==================== + + def test_107_get_rosbag_nonexistent_fault(self): + """ + Test /faults/{code}/snapshots/bag returns 404 for unknown fault. + + @verifies REQ_INTEROP_088 + """ + response = requests.get( + f'{self.BASE_URL}/faults/NONEXISTENT_ROSBAG_FAULT/snapshots/bag', + timeout=10 + ) + self.assertEqual(response.status_code, 404) + + data = response.json() + self.assertIn('error_code', data) + self.assertIn('message', data) + self.assertIn('parameters', data) + self.assertIn('fault_code', data['parameters']) + self.assertEqual(data['parameters'].get('fault_code'), 'NONEXISTENT_ROSBAG_FAULT') + + print('✓ Get rosbag nonexistent fault test passed') + + def test_108_get_rosbag_invalid_fault_code(self): + """ + Test /faults/{code}/snapshots/bag rejects invalid fault codes. + + @verifies REQ_INTEROP_088 + """ + # These should be rejected by fault_code validation + invalid_codes = [ + '../../../etc/passwd', # Path traversal attempt + 'fault"injection', # Quote injection attempt + ] + + for invalid_code in invalid_codes: + response = requests.get( + f'{self.BASE_URL}/faults/{invalid_code}/snapshots/bag', + timeout=10 + ) + # Should return 400 (bad request) or 404 (not found) + # depending on whether validation happens before or after lookup + self.assertIn( + response.status_code, + [400, 404], + f'Expected 400 or 404 for fault_code: {invalid_code}' + ) + + print('✓ Get rosbag invalid fault code test passed') + + def test_72_get_rosbag_happy_path(self): + """ + Test rosbag download happy path. + + Uses LIDAR_RANGE_INVALID fault which is auto-reported by lidar_sensor + demo node due to invalid parameters (min_range=10 > max_range=5). + The rosbag captures /perception/lidar/scan topic. + + @verifies REQ_INTEROP_088 + """ + import tarfile + import tempfile + import time + + # Use the fault that lidar_sensor automatically reports + # (configured with min_range=10 > max_range=5 which triggers this fault) + fault_code = 'LIDAR_RANGE_INVALID' + + # Wait for: + # 1. Ring buffer to fill (duration_sec = 2.0) + # 2. lidar_sensor to report fault (initial check after 3s delay) + # 3. Post-fault recording to complete (duration_after_sec = 0.5) + time.sleep(6) + + # Download the rosbag for the automatically-reported fault + response = requests.get( + f'{self.BASE_URL}/faults/{fault_code}/snapshots/bag', + timeout=30 + ) + + self.assertEqual( + response.status_code, 200, + f'Expected 200 OK but got {response.status_code}: {response.text}' + ) + + # Verify headers + content_type = response.headers.get('Content-Type', '') + self.assertIn('gzip', content_type, 'Expected gzip content type') + + content_disp = response.headers.get('Content-Disposition', '') + self.assertIn('attachment', content_disp, 'Expected attachment disposition') + self.assertIn('.tar.gz', content_disp, 'Expected .tar.gz extension') + + # Verify content is a valid tar.gz archive + with tempfile.NamedTemporaryFile(suffix='.tar.gz', delete=False) as f: + f.write(response.content) + temp_path = f.name + + try: + with tarfile.open(temp_path, 'r:gz') as tar: + names = tar.getnames() + # Should contain at least one file + self.assertGreater(len(names), 0, 'Archive should not be empty') + # Should contain metadata.yaml (rosbag2 standard) + has_metadata = any('metadata.yaml' in n for n in names) + self.assertTrue(has_metadata, f'Expected metadata.yaml: {names}') + finally: + import os + os.unlink(temp_path) + + print('✓ Get rosbag happy path test passed') diff --git a/src/ros2_medkit_msgs/CMakeLists.txt b/src/ros2_medkit_msgs/CMakeLists.txt index 1047e1a..d2a042f 100644 --- a/src/ros2_medkit_msgs/CMakeLists.txt +++ b/src/ros2_medkit_msgs/CMakeLists.txt @@ -32,6 +32,7 @@ rosidl_generate_interfaces(${PROJECT_NAME} "srv/GetFaults.srv" "srv/ClearFault.srv" "srv/GetSnapshots.srv" + "srv/GetRosbag.srv" DEPENDENCIES builtin_interfaces ) diff --git a/src/ros2_medkit_msgs/srv/GetRosbag.srv b/src/ros2_medkit_msgs/srv/GetRosbag.srv new file mode 100644 index 0000000..96aa33a --- /dev/null +++ b/src/ros2_medkit_msgs/srv/GetRosbag.srv @@ -0,0 +1,45 @@ +# Copyright 2026 mfaferek93 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# GetRosbag.srv - Retrieve rosbag file information for a fault. +# +# Returns the path to the rosbag file that was recorded when a fault was confirmed. +# The rosbag contains time-window recording of configured topics before and after +# the fault confirmation event. + +# Request fields + +# The fault_code to get rosbag for. +string fault_code +--- +# Response fields + +# True if a rosbag file exists for this fault. +bool success + +# Path to the rosbag file/directory. +# Empty if success is false. +string file_path + +# Storage format: "sqlite3" or "mcap" +string format + +# Total duration of recording in seconds. +float64 duration_sec + +# File size in bytes. +uint64 size_bytes + +# Error message if success is false. +string error_message