diff --git a/rmf_fleet_adapter/CMakeLists.txt b/rmf_fleet_adapter/CMakeLists.txt index 74333057a..b2bbab1e6 100644 --- a/rmf_fleet_adapter/CMakeLists.txt +++ b/rmf_fleet_adapter/CMakeLists.txt @@ -39,6 +39,7 @@ set(dep_pkgs rmf_traffic_ros2 rmf_battery rmf_task + rmf_task_ros2 std_msgs ) foreach(pkg ${dep_pkgs}) @@ -62,6 +63,7 @@ target_link_libraries(rmf_fleet_adapter rmf_traffic_ros2::rmf_traffic_ros2 rmf_battery::rmf_battery rmf_task::rmf_task + rmf_task_ros2::rmf_task_ros2 yaml-cpp ${rmf_fleet_msgs_LIBRARIES} ${rclcpp_LIBRARIES} @@ -322,6 +324,7 @@ target_include_directories(task_aggregator ament_export_targets(rmf_fleet_adapter HAS_LIBRARY_TARGET) ament_export_dependencies( rmf_task + rmf_task_ros2 rmf_traffic_ros2 rmf_fleet_msgs rmf_door_msgs diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp index 29dc1a8b9..7f654a313 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp @@ -49,12 +49,6 @@ const std::string IngestorStateTopicName = "ingestor_states"; const std::string DeliveryTopicName = "delivery_requests"; const std::string LoopRequestTopicName = "loop_requests"; const std::string TaskSummaryTopicName = "task_summaries"; - -const std::string BidNoticeTopicName = "rmf_task/bid_notice"; -const std::string BidProposalTopicName = "rmf_task/bid_proposal"; -const std::string DispatchRequestTopicName = "rmf_task/dispatch_request"; -const std::string DispatchAckTopicName = "rmf_task/dispatch_ack"; - const std::string DockSummaryTopicName = "dock_summary"; } // namespace rmf_fleet_adapter diff --git a/rmf_fleet_adapter/package.xml b/rmf_fleet_adapter/package.xml index fad1cd873..2551cd840 100644 --- a/rmf_fleet_adapter/package.xml +++ b/rmf_fleet_adapter/package.xml @@ -23,6 +23,7 @@ rmf_traffic_msgs rmf_battery rmf_task + rmf_task_ros2 eigen yaml-cpp @@ -34,4 +35,3 @@ ament_cmake - diff --git a/rmf_fleet_adapter/src/full_control/main.cpp b/rmf_fleet_adapter/src/full_control/main.cpp index 4b2998ac1..96fae0c73 100644 --- a/rmf_fleet_adapter/src/full_control/main.cpp +++ b/rmf_fleet_adapter/src/full_control/main.cpp @@ -670,8 +670,6 @@ std::shared_ptr make_fleet( if (node->declare_parameter("perform_deliveries", false)) { task_types.insert(rmf_task_msgs::msg::TaskType::TYPE_DELIVERY); - connections->fleet->accept_delivery_requests( - [](const rmf_task_msgs::msg::Delivery&){ return true; }); } if (node->declare_parameter("perform_cleaning", false)) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp index f2b31919a..75aaf7ba8 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.cpp @@ -88,6 +88,18 @@ const std::string& Task::id() const return _id; } +//============================================================================== +const rmf_task_msgs::msg::TaskProfile Task::profile_msg() const +{ + return _profile_msg; +} + +//============================================================================== +void Task::profile_msg(const rmf_task_msgs::msg::TaskProfile& profile) +{ + _profile_msg = profile; +} + //============================================================================== const rmf_task::ConstRequestPtr Task::request() const { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp index d0850e042..cf701fe0e 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/Task.hpp @@ -40,6 +40,7 @@ class Task : public std::enable_shared_from_this public: using StatusMsg = rmf_task_msgs::msg::TaskSummary; + using TaskProfile = rmf_task_msgs::msg::TaskProfile; /// This class represents the active phase of a Task. It provides an /// observable that the Task can track to stay up-to-date on the status and to @@ -123,6 +124,12 @@ class Task : public std::enable_shared_from_this void cancel(); const std::string& id() const; + + /// Get the profile msg, for publish status msg + const TaskProfile profile_msg() const; + + /// Set the profile msg, for publish status msg + void profile_msg(const TaskProfile& profile); /// Get the request used to generate this task const rmf_task::ConstRequestPtr request() const; @@ -162,6 +169,7 @@ class Task : public std::enable_shared_from_this rmf_traffic::Time _deployment_time; rmf_task::agv::State _finish_state; rmf_task::ConstRequestPtr _request; + TaskProfile _profile_msg; void _start_next_phase(); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index b16de3ce3..42ea5c6d6 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -82,40 +82,31 @@ TaskManager::TaskManager(agv::RobotContextPtr context) } //============================================================================== -void TaskManager::queue_task(std::shared_ptr task, Start expected_finish) +void TaskManager::queue_task(std::shared_ptr task) { - std::lock_guard guard(_mutex); - _queue.push_back(std::move(task)); - _expected_finish_location = std::move(expected_finish); + { + std::lock_guard guard(_mutex); + _queue.push_back(task); RCLCPP_INFO( _context->node()->get_logger(), "Queuing new task [%s] for [%s]. New queue size: %d", - _queue.back()->id().c_str(), _context->requester_id().c_str(), + task->id().c_str(), _context->requester_id().c_str(), _queue.size()); - if (!_active_task) - { - _begin_next_task(); - } - else - { rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); + msg.task_profile = task->profile_msg(); + msg.task_id = task->id(); // duplicated msg.state = msg.STATE_QUEUED; msg.robot_name = _context->name(); + msg.fleet_name = _context->description().owner(); + msg.start_time = rmf_traffic_ros2::convert(task->deployment_time()); + msg.end_time = rmf_traffic_ros2::convert(task->finish_state().finish_time()); this->_context->node()->task_summary()->publish(msg); } -} -//============================================================================== -auto TaskManager::expected_finish_location() const -> StartSet -{ - if (_expected_finish_location) - return {*_expected_finish_location}; - - return _context->location(); + if (!_active_task) + _begin_next_task(); } //============================================================================== @@ -151,119 +142,106 @@ const Task* TaskManager::current_task() const } //============================================================================== -agv::ConstRobotContextPtr TaskManager::context() const +const std::vector> TaskManager::task_queue() const { - return _context; + return _queue; } //============================================================================== void TaskManager::set_queue( - const std::vector& assignments) + const std::vector>& tasks) { - // We indent this block as _mutex is also locked in the _begin_next_task() - // function that is called at the end of this function. { std::lock_guard guard(_mutex); _queue.clear(); + } - // We use dynamic cast to determine the type of request and then call the - // appropriate make(~) function to convert the request into a task - for (std::size_t i = 0; i < assignments.size(); ++i) - { - const auto& a = assignments[i]; - auto start = _context->current_task_end_state().location(); - if (i != 0) - start = assignments[i-1].state().location(); - start.time(a.deployment_time()); - rmf_task_msgs::msg::TaskType task_type_msg; - const auto request = a.request(); - if (std::dynamic_pointer_cast< - const rmf_task::requests::CleanDescription>(request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_CLEAN; - auto task = rmf_fleet_adapter::tasks::make_clean( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } - - else if (std::dynamic_pointer_cast< - const rmf_task::requests::ChargeBatteryDescription>( - request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_CHARGE_BATTERY; - const auto task = tasks::make_charge_battery( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } + for (const auto t : tasks) + queue_task(t); +} - else if (std::dynamic_pointer_cast< - const rmf_task::requests::DeliveryDescription>( - request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_DELIVERY; - const auto task = tasks::make_delivery( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } +//============================================================================== +void TaskManager::set_queue( + const std::vector& assignments, + const std::unordered_map& task_profiles) +{ + std::vector> task_queue; + std::shared_ptr task = nullptr; + for (std::size_t i = 0; i < assignments.size(); ++i) + { + const auto& a = assignments[i]; + auto start = _context->current_task_end_state().location(); + if (i != 0) + start = assignments[i-1].state().location(); + start.time(a.deployment_time()); - else if (std::dynamic_pointer_cast< - const rmf_task::requests::LoopDescription>(request->description()) != nullptr) - { - task_type_msg.type = task_type_msg.TYPE_LOOP; - const auto task = tasks::make_loop( - request, - _context, - start, - a.deployment_time(), - a.state()); - - _queue.push_back(task); - } + const auto req = a.request(); + const auto id = req->id(); + const auto req_desc = req->description(); - else - { - RCLCPP_WARN( - _context->node()->get_logger(), - "[TaskManager] Un-supported request type in assignment list. " - "Please update the implementation of TaskManager::set_queue() to " - "support request with task_id:[%s]", - a.request()->id().c_str()); + rmf_task_msgs::msg::TaskProfile profile; - continue; - } + if (task_profiles.find(id) != task_profiles.end()) + { + profile = task_profiles.at(id); + } + else + { + profile.task_id = id; + profile.submission_time = _context->node()->now(); + profile.description.start_time = + rmf_traffic_ros2::convert(a.deployment_time()); + } - // publish queued task - rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = _queue.back()->id(); - msg.task_profile.task_id = _queue.back()->id(); - msg.state = msg.STATE_QUEUED; - msg.robot_name = _context->name(); - msg.fleet_name = _context->description().owner(); - msg.task_profile.description.task_type = task_type_msg; - msg.start_time = rmf_traffic_ros2::convert( - _queue.back()->deployment_time()); - msg.start_time = rmf_traffic_ros2::convert( - _queue.back()->finish_state().finish_time()); - this->_context->node()->task_summary()->publish(msg); + // We use dynamic cast to determine the type of request and then call the + // appropriate make(~) function to convert the request into a task + using namespace rmf_task::requests; + + /// CHARGE BATTERY TASK (Auto-Generated) + if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_charge_battery( + req, _context, start, a.deployment_time(), a.state()); + profile.description.task_type.type = + rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY; } + /// CLEAN TASK (User Requested) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_clean( + req, _context, start, a.deployment_time(), a.state()); + } + /// DELIVERY TASK (User Requested) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_delivery( + req, _context, start, a.deployment_time(), a.state()); + } + /// LOOP TASK (User Requested) + else if (std::dynamic_pointer_cast(req_desc)) + { + task = tasks::make_loop( + req, _context, start, a.deployment_time(), a.state()); + } + else + { + RCLCPP_WARN( + _context->node()->get_logger(), + "[TaskManager] Un-supported request type in assignment list. " + "Please update the implementation of TaskManager::set_queue() to " + "support request with task_id:[%s]", id); + continue; + } + task->profile_msg(profile); + task_queue.push_back(task); } + this->set_queue(task_queue); +} - _begin_next_task(); +//============================================================================== +agv::ConstRobotContextPtr TaskManager::context() const +{ + return _context; } //============================================================================== @@ -326,8 +304,8 @@ void TaskManager::_begin_next_task() .subscribe( [this, id = _active_task->id()](Task::StatusMsg msg) { - msg.task_id = id; - msg.task_profile.task_id = id; + msg.task_profile = _active_task->profile_msg(); + msg.task_id = _active_task->id(); // duplicated msg.robot_name = _context->name(); msg.fleet_name = _context->description().owner(); msg.start_time = rmf_traffic_ros2::convert( @@ -348,8 +326,8 @@ void TaskManager::_begin_next_task() msg.status = e.what(); } - msg.task_id = id; - msg.task_profile.task_id = id; + msg.task_profile = _active_task->profile_msg(); + msg.task_id = _active_task->id(); // duplicated msg.robot_name = _context->name(); msg.fleet_name = _context->description().owner(); msg.start_time = rmf_traffic_ros2::convert( @@ -361,8 +339,8 @@ void TaskManager::_begin_next_task() [this, id = _active_task->id()]() { rmf_task_msgs::msg::TaskSummary msg; - msg.task_id = id; - msg.task_profile.task_id = id; + msg.task_profile = _active_task->profile_msg(); + msg.task_id = _active_task->id(); // duplicated msg.state = msg.STATE_COMPLETED; msg.robot_name = _context->name(); msg.fleet_name = _context->description().owner(); @@ -371,7 +349,6 @@ void TaskManager::_begin_next_task() msg.end_time = rmf_traffic_ros2::convert( _active_task->finish_state().finish_time()); this->_context->node()->task_summary()->publish(msg); - _active_task = nullptr; }); @@ -475,7 +452,21 @@ void TaskManager::retreat_to_charger() finish.value().finish_state(), current_state.finish_time()); - set_queue({charging_assignment}); + rmf_task_msgs::msg::TaskProfile task_profile; + task_profile.task_id = charging_request->id(); + task_profile.submission_time = _context->node()->now(); + task_profile.description.start_time = _context->node()->now(); + task_profile.description.task_type.type = + rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY; + + const auto task = rmf_fleet_adapter::tasks::make_charge_battery( + charging_request, + _context, + current_state.location(), + charging_assignment.deployment_time(), + charging_assignment.state()); + task->profile_msg(task_profile); + queue_task(task); RCLCPP_INFO( _context->node()->get_logger(), diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 965dc512a..a19541f22 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -45,13 +45,10 @@ class TaskManager : public std::enable_shared_from_this using StartSet = rmf_traffic::agv::Plan::StartSet; using Assignment = rmf_task::agv::TaskPlanner::Assignment; using State = rmf_task::agv::State; + using TaskProfile = rmf_task_msgs::msg::TaskProfile; /// Add a task to the queue of this manager. - void queue_task(std::shared_ptr task, Start expected_finish); - - /// The location where we expect this robot to be at the end of its current - /// task queue. - StartSet expected_finish_location() const; + void queue_task(std::shared_ptr task); const agv::RobotContextPtr& context(); @@ -60,13 +57,21 @@ class TaskManager : public std::enable_shared_from_this const Task* current_task() const; /// Set the queue for this task manager with assignments generated from the - /// task planner - void set_queue(const std::vector& assignments); + /// task planner, arg 'task_profiles' is merely used for status publishing. + void set_queue( + const std::vector& assignments, + const std::unordered_map& task_profiles = {}); + + /// set a vector of tasks + void set_queue(const std::vector>& tasks); + + // get tasks in the queue + const std::vector> task_queue() const; /// Get the non-charging requests among pending tasks const std::vector requests() const; - /// The state of the robot. + /// The finish state of the current task. State expected_finish_state() const; /// Callback for the retreat timer. Appends a charging task to the task queue @@ -84,7 +89,6 @@ class TaskManager : public std::enable_shared_from_this agv::RobotContextPtr _context; std::shared_ptr _active_task; std::vector> _queue; - rmf_utils::optional _expected_finish_location; rxcpp::subscription _task_sub; rxcpp::subscription _emergency_sub; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index a2aad0041..85d2ff83d 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -106,9 +106,16 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (msg->task_profile.task_id.empty()) return; - if (bid_notice_assignments.find(msg->task_profile.task_id) - != bid_notice_assignments.end()) + if (received_task_profiles.find(msg->task_profile.task_id) + != received_task_profiles.end()) + { + RCLCPP_WARN( + node->get_logger(), + "Task [%s] has been submitted by Fleet [%s] previously", + msg->task_profile.task_id.c_str(), + name.c_str()); return; + } if (!accept_task) { @@ -151,7 +158,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( const rmf_traffic::Time start_time = rmf_traffic_ros2::convert(task_profile.description.start_time); // TODO (YV) get rid of ID field in RequestPtr - std::string id = msg->task_profile.task_id; + const std::string id = msg->task_profile.task_id; const auto& graph = planner->get_configuration().graph(); // Generate the priority of the request. The current implementation supports @@ -162,41 +169,39 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( rmf_task::BinaryPriorityScheme::make_low_priority(); // Process Cleaning task - if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_CLEAN) + if (task_type.type == TaskType::TYPE_CLEAN) { - if (task_profile.description.clean.start_waypoint.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [clean.start_waypoint] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); + const auto& clean = task_profile.description.clean; + if (clean.start_waypoint.empty()) + { + RCLCPP_ERROR(node->get_logger(), + "Clean Msg is invalid/invalid." + "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } // Check for valid start waypoint - const std::string start_wp_name = - task_profile.description.clean.start_waypoint; - const auto start_wp = graph.find_waypoint(start_wp_name); + const auto start_wp = graph.find_waypoint(clean.start_waypoint); if (!start_wp) { RCLCPP_INFO( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), start_wp_name.c_str(), id.c_str()); + name.c_str(), clean.start_waypoint.c_str(), id.c_str()); return; } // Get dock parameters - const auto clean_param_it = dock_param_map.find(start_wp_name); + const auto clean_param_it = dock_param_map.find(clean.start_waypoint); if (clean_param_it == dock_param_map.end()) { RCLCPP_INFO( node->get_logger(), "Dock param for dock_name:[%s] unavailable. Rejecting BidNotice with " - "task_id:[%s]", start_wp_name.c_str(), id.c_str()); + "task_id:[%s]", clean.start_waypoint.c_str(), id.c_str()); return; } @@ -231,7 +236,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( RCLCPP_INFO( node->get_logger(), "Unable to generate cleaning trajectory from positions specified " - " in DockSummary msg for [%s]", start_wp_name.c_str()); + " in DockSummary msg for [%s]", clean.start_waypoint.c_str()); return; } @@ -254,90 +259,48 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "Generated Clean request for task_id:[%s]", id.c_str()); } - else if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_DELIVERY) + else if (task_type.type == TaskType::TYPE_DELIVERY) { - const auto& delivery = task_profile.description.delivery; - if (delivery.pickup_place_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.pickup_place_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } + const auto& del = task_profile.description.delivery; - if (delivery.pickup_dispenser.empty()) + if (del.pickup_place_name.empty() || del.pickup_dispenser.empty() || + del.dropoff_place_name.empty() || del.dropoff_ingestor.empty()) { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.pickup_dispenser] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - + RCLCPP_ERROR(node->get_logger(), + "Delivery Msg is invalid/invalid." + "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } - if (delivery.dropoff_place_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.dropoff_place_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (delivery.dropoff_place_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.dropoff_place_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (delivery.dropoff_ingestor.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [delivery.dropoff_ingestor] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - const auto pickup_wp = graph.find_waypoint(delivery.pickup_place_name); + const auto pickup_wp = graph.find_waypoint(del.pickup_place_name); if (!pickup_wp) { RCLCPP_INFO( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), delivery.pickup_place_name.c_str(), id.c_str()); - + name.c_str(), del.pickup_place_name.c_str(), id.c_str()); return; } - const auto dropoff_wp = graph.find_waypoint(delivery.dropoff_place_name); + const auto dropoff_wp = graph.find_waypoint(del.dropoff_place_name); if (!dropoff_wp) { RCLCPP_INFO( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), delivery.dropoff_place_name.c_str(), id.c_str()); - + name.c_str(), del.dropoff_place_name.c_str(), id.c_str()); return; } new_request = rmf_task::requests::Delivery::make( id, pickup_wp->index(), - delivery.pickup_dispenser, + del.pickup_dispenser, dropoff_wp->index(), - delivery.dropoff_ingestor, - delivery.items, + del.dropoff_ingestor, + del.items, motion_sink, ambient_sink, planner, @@ -348,38 +311,17 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( RCLCPP_INFO( node->get_logger(), "Generated Delivery request for task_id:[%s]", id.c_str()); - } - else if (task_type.type == rmf_task_msgs::msg::TaskType::TYPE_LOOP) + else if (task_type.type == TaskType::TYPE_LOOP) { const auto& loop = task_profile.description.loop; - if (loop.start_name.empty()) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [loop.start_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (loop.finish_name.empty()) + + if (loop.start_name.empty() || loop.finish_name.empty() || + loop.num_loops == 0) { - RCLCPP_ERROR( - node->get_logger(), - "Required param [loop.finish_name] missing in TaskProfile." - "Rejecting BidNotice with task_id:[%s]" , id.c_str()); - - return; - } - - if (loop.num_loops < 1) - { - RCLCPP_ERROR( - node->get_logger(), - "Required param [loop.num_loops: %d] in TaskProfile is invalid." - "Rejecting BidNotice with task_id:[%s]" , loop.num_loops, id.c_str()); - + RCLCPP_ERROR(node->get_logger(), + "Loop Msg is invalid/invalid." + "Rejecting BidNotice with task_id:[%s]", id.c_str()); return; } @@ -402,7 +344,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( node->get_logger(), "Fleet [%s] does not have a named waypoint [%s] configured in its " "nav graph. Rejecting BidNotice with task_id:[%s]", - name.c_str(), loop.finish_name.c_str(), id.c_str()); + name.c_str(), loop.start_name.c_str(), id.c_str()); return; } @@ -436,7 +378,6 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( if (!new_request) return; - generated_requests.insert({id, new_request}); const auto allocation_result = allocate_tasks(new_request); @@ -482,7 +423,7 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( std::size_t index = 0; for (const auto& t : task_managers) { - robot_name_map.insert({index, t.first->name()}); + robot_name_map.insert({index, t->context()->name()}); ++index; } @@ -510,8 +451,8 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( id.c_str(), bid_proposal.robot_name.c_str(), cost); // Store assignments in internal map - bid_notice_assignments.insert({id, assignments}); - + latest_bid_notice_assignments = {id, new_request, assignments}; + received_task_profiles.insert({id, task_profile}); } //============================================================================== @@ -528,8 +469,8 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( if (msg->method == DispatchRequest::ADD) { - const auto task_it = bid_notice_assignments.find(id); - if (task_it == bid_notice_assignments.end()) + auto [ bid_notice_id, req_ptr, assignments] = latest_bid_notice_assignments; + if (id != bid_notice_id) { RCLCPP_WARN( node->get_logger(), @@ -543,9 +484,7 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( node->get_logger(), "Bid for task_id:[%s] awarded to fleet [%s]. Processing request...", id.c_str(), name.c_str()); - - auto& assignments = task_it->second; - + if (assignments.size() != task_managers.size()) { RCLCPP_ERROR( @@ -556,28 +495,13 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - // Here we make sure none of the tasks in the assignments has already begun - // execution. If so, we replan assignments until a valid set is obtained - // and only then update the task manager queues - const auto request_it = generated_requests.find(id); - if (request_it == generated_requests.end()) - { - RCLCPP_ERROR( - node->get_logger(), - "Unable to find generated request for task_id:[%s]. This request will " - "be ignored.", - id.c_str()); - dispatch_ack_pub->publish(dispatch_ack); - return; - } - bool valid_assignments = is_valid_assignments(assignments); if (!valid_assignments) { // TODO: This replanning is blocking the main thread. Instead, the // replanning should run on a separate worker and then deliver the // result back to the main worker. - const auto replan_results = allocate_tasks(request_it->second); + const auto replan_results = allocate_tasks(req_ptr); if (!replan_results) { RCLCPP_WARN( @@ -594,15 +518,12 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( // rxcpp worker. Hence, no new tasks would have started during this replanning. } - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } + // set assignments to taskmangaer of each robot + assert(assignments.size() == task_managers.size()); + for (std::size_t i = 0; i < task_managers.size(); ++i) + task_managers[i]->set_queue(assignments[i], received_task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); - assigned_requests.insert({id, request_it->second}); dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); @@ -632,26 +553,28 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } - auto request_to_cancel_it = assigned_requests.find(id); - if (request_to_cancel_it == assigned_requests.end()) + std::unordered_set executed_tasks; + std::unordered_set queued_tasks; + for (const auto& mgr : task_managers) + { + if (mgr->current_task()) + executed_tasks.insert(mgr->current_task()->id()); + for (const auto& tsk : mgr->task_queue()) + queued_tasks.insert(tsk->id()); + } + + if (queued_tasks.find(id) == queued_tasks.end()) { RCLCPP_WARN( node->get_logger(), - "Unable to cancel task with task_id:[%s] as it is not assigned to " - "fleet:[%s].", + "Unable to cancel task with task_id:[%s] as it is not queued to " + "fleet: [%s].", id.c_str(), name.c_str()); dispatch_ack_pub->publish(dispatch_ack); return; } - std::unordered_set executed_tasks; - for (const auto& [context, mgr] : task_managers) - { - const auto& tasks = mgr->get_executed_tasks(); - executed_tasks.insert(tasks.begin(), tasks.end()); - } - // Check if received request is to cancel an active task if (executed_tasks.find(id) != executed_tasks.end()) { @@ -663,12 +586,11 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( dispatch_ack_pub->publish(dispatch_ack); return; - } + } // Re-plan assignments while ignoring request for task to be cancelled - const auto replan_results = allocate_tasks( - nullptr, request_to_cancel_it->second); - + const auto replan_results = allocate_tasks(nullptr, id); + if (!replan_results.has_value()) { RCLCPP_WARN( @@ -680,16 +602,13 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( return; } + // set assignments to taskmangaer of each robot const auto& assignments = replan_results.value(); - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } + assert(assignments.size() == task_managers.size()); + for (std::size_t i = 0; i < task_managers.size(); ++i) + task_managers[i]->set_queue(assignments[i], received_task_profiles); current_assignment_cost = task_planner->compute_cost(assignments); - dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); cancelled_task_ids.insert(id); @@ -717,22 +636,31 @@ void FleetUpdateHandle::Implementation::dispatch_request_cb( auto FleetUpdateHandle::Implementation::is_valid_assignments( Assignments& assignments) const -> bool { - std::unordered_set executed_tasks; - for (const auto& [context, mgr] : task_managers) + // This checks if the current assignments are all queued in task managers + std::unordered_set queued_tasks; + for (const auto& mgr : task_managers) { - const auto& tasks = mgr->get_executed_tasks(); - executed_tasks.insert(tasks.begin(), tasks.end()); + for (const auto task : mgr->task_queue()) + queued_tasks.insert(task->id()); } + size_t assignment_size = 0; for (const auto& agent : assignments) { for (const auto& a : agent) { - if (executed_tasks.find(a.request()->id()) != executed_tasks.end()) + // If ID exists doesnt exist in queue_tasks + if (queued_tasks.find(a.request()->id()) == queued_tasks.end()) return false; + + assignment_size++; } } + // Check if total queued tasks is the same as the assignments + if (queued_tasks.size() != assignment_size) + return false; + return true; } @@ -821,7 +749,6 @@ rmf_fleet_msgs::msg::RobotState convert_state(const TaskManager& mgr) .index(0); }(); - return rmf_fleet_msgs::build() .name(context.name()) .model(context.description().owner()) @@ -846,7 +773,7 @@ rmf_fleet_msgs::msg::RobotState convert_state(const TaskManager& mgr) void FleetUpdateHandle::Implementation::publish_fleet_state() const { std::vector robot_states; - for (const auto& [context, mgr] : task_managers) + for (const auto& mgr : task_managers) robot_states.emplace_back(convert_state(*mgr)); auto fleet_state = rmf_fleet_msgs::build() @@ -859,7 +786,7 @@ void FleetUpdateHandle::Implementation::publish_fleet_state() const //============================================================================== auto FleetUpdateHandle::Implementation::allocate_tasks( rmf_task::ConstRequestPtr new_request, - rmf_task::ConstRequestPtr ignore_request) const -> std::optional + std::string ignore_request_id) const -> std::optional { // Collate robot states, constraints and combine new requestptr with // requestptr of non-charging tasks in task manager queues @@ -876,42 +803,34 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( for (const auto& t : task_managers) { - states.push_back(t.second->expected_finish_state()); - constraints_set.push_back(t.first->task_planning_constraints()); - const auto requests = t.second->requests(); - pending_requests.insert( - pending_requests.end(), requests.begin(), requests.end()); - } - - // Remove the request to be ignored if present - if (ignore_request) - { - auto ignore_request_it = pending_requests.end(); - for (auto it = pending_requests.begin(); it != pending_requests.end(); ++it) - { - auto pending_request = *it; - if (pending_request->id() == ignore_request->id()) - ignore_request_it = it; - } - if (ignore_request_it != pending_requests.end()) - { - pending_requests.erase(ignore_request_it); - RCLCPP_INFO( - node->get_logger(), - "Request with task_id:[%s] will be ignored during task allocation.", - ignore_request->id().c_str()); - } - else + states.push_back(t->expected_finish_state()); + constraints_set.push_back(t->context()->task_planning_constraints()); + + for (const auto task : t->task_queue()) { - RCLCPP_WARN( - node->get_logger(), - "Request with task_id:[%s] is not present in any of the task queues.", - ignore_request->id().c_str()); + const auto type = task->profile_msg().description.task_type.type; + if ( type == TaskType::TYPE_CHARGE_BATTERY) + { + // ignore auto allocated charging task + continue; + } + else if (task->id() == ignore_request_id ) + { + RCLCPP_INFO( + node->get_logger(), + "Request with task_id:[%s] will be ignored during task allocation.", + ignore_request_id.c_str()); + continue; + } + else + { + pending_requests.push_back(task->request()); + } } } RCLCPP_INFO( - node->get_logger(), + node->get_logger(), "Planning for [%d] robot(s) and [%d] request(s)", states.size(), pending_requests.size()); @@ -939,7 +858,6 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( " insufficient initial battery charge for all robots in this fleet.", id.c_str()); } - else if (*error == rmf_task::agv::TaskPlanner::TaskPlannerError::limited_capacity) { @@ -949,7 +867,6 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( " insufficient battery capacity to accommodate one or more requests by" " any of the robots in this fleet.", id.c_str()); } - else { RCLCPP_ERROR( @@ -957,7 +874,6 @@ auto FleetUpdateHandle::Implementation::allocate_tasks( "[TaskPlanner] Failed to compute assignments for task_id:[%s]", id.c_str()); } - return std::nullopt; } @@ -1045,7 +961,7 @@ void FleetUpdateHandle::add_robot( "Added a robot named [%s] with participant ID [%d]", context->name().c_str(), context->itinerary().id()); - fleet->_pimpl->task_managers.insert({context, TaskManager::make(context)}); + fleet->_pimpl->task_managers.push_back(TaskManager::make(context)); if (handle_cb) { handle_cb(RobotUpdateHandle::Implementation::make(std::move(context))); @@ -1075,7 +991,7 @@ FleetUpdateHandle& FleetUpdateHandle::accept_task_requests( FleetUpdateHandle& FleetUpdateHandle::accept_delivery_requests( AcceptDeliveryRequest check) { - _pimpl->accept_delivery = std::move(check); + RCLCPP_WARN(_pimpl->node->get_logger(), "Use accept_task_requests() instead"); return *this; } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 1ecb6c9ba..68192e668 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -34,6 +34,7 @@ #include #include +#include #include "Node.hpp" #include "RobotContext.hpp" @@ -151,16 +152,12 @@ class FleetUpdateHandle::Implementation rmf_utils::optional default_maximum_delay = std::chrono::nanoseconds(std::chrono::seconds(10)); - AcceptDeliveryRequest accept_delivery = nullptr; - std::unordered_map> task_managers = {}; - + std::vector> task_managers; + rclcpp::Publisher::SharedPtr fleet_state_pub = nullptr; rclcpp::TimerBase::SharedPtr fleet_state_timer = nullptr; - // Map task id to pair of using Assignments = rmf_task::agv::TaskPlanner::Assignments; - std::unordered_map> task_map = {}; // Map of dock name to dock parameters std::unordered_map available_charging_waypoints; double current_assignment_cost = 0.0; - // Map to store task id with assignments for BidNotice - std::unordered_map bid_notice_assignments = {}; + + // Map to store task id with the profile msg of the received tasks + using TaskProfile = rmf_task_msgs::msg::TaskProfile; + std::unordered_map received_task_profiles = {}; + + // Tuple which stored the latest received BidNotice + std::tuple + latest_bid_notice_assignments; - std::unordered_map< - std::string, rmf_task::ConstRequestPtr> generated_requests = {}; - std::unordered_map< - std::string, rmf_task::ConstRequestPtr> assigned_requests = {}; std::unordered_set cancelled_task_ids = {}; AcceptTaskRequest accept_task = nullptr; @@ -206,6 +205,8 @@ class FleetUpdateHandle::Implementation using DockSummarySub = rclcpp::Subscription::SharedPtr; DockSummarySub dock_summary_sub = nullptr; + using TaskType = rmf_task_msgs::msg::TaskType; + template static std::shared_ptr make(Args&&... args) { @@ -227,17 +228,17 @@ class FleetUpdateHandle::Implementation // Publish BidProposal handle._pimpl->bid_proposal_pub = handle._pimpl->node->create_publisher( - BidProposalTopicName, default_qos); + rmf_task_ros2::BidProposalTopicName, default_qos); // Publish DispatchAck handle._pimpl->dispatch_ack_pub = handle._pimpl->node->create_publisher( - DispatchAckTopicName, default_qos); + rmf_task_ros2::DispatchAckTopicName, default_qos); // Subscribe BidNotice handle._pimpl->bid_notice_sub = handle._pimpl->node->create_subscription( - BidNoticeTopicName, + rmf_task_ros2::BidNoticeTopicName, default_qos, [p = handle._pimpl.get()](const BidNotice::SharedPtr msg) { @@ -247,7 +248,7 @@ class FleetUpdateHandle::Implementation // Subscribe DispatchRequest handle._pimpl->dispatch_request_sub = handle._pimpl->node->create_subscription( - DispatchRequestTopicName, + rmf_task_ros2::DispatchRequestTopicName, default_qos, [p = handle._pimpl.get()](const DispatchRequest::SharedPtr msg) { @@ -290,7 +291,7 @@ class FleetUpdateHandle::Implementation /// new request and while optionally ignoring a specific request. std::optional allocate_tasks( rmf_task::ConstRequestPtr new_request = nullptr, - rmf_task::ConstRequestPtr ignore_request = nullptr) const; + std::string ignore_request_id = "") const; /// Helper function to check if assignments are valid. An assignment set is /// invalid if one of the assignments has already begun execution. diff --git a/rmf_task/include/rmf_task/Evaluator.hpp b/rmf_task/include/rmf_task/Evaluator.hpp new file mode 100644 index 000000000..e7bcbf34e --- /dev/null +++ b/rmf_task/include/rmf_task/Evaluator.hpp @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2021 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#ifndef RMF_TASK__EVALUATOR_HPP +#define RMF_TASK__EVALUATOR_HPP + +#include +#include +#include +#include + +namespace rmf_task { + +//============================================================================== +/// A pure abstract interface class for the auctioneer to choose the best +/// submission. +class Evaluator +{ +public: + + struct Submission + { + std::string fleet_name; + std::string robot_name; + double prev_cost = 0.0; + double new_cost = std::numeric_limits::max(); + rmf_traffic::Time finish_time; + }; + + using Submissions = std::vector; + + /// Given a list of submissions, choose the one that is the "best". It is + /// up to the implementation of the Evaluator to decide how to rank. + /// + /// \return + /// index of the best submission + virtual std::optional choose( + const Submissions& submissions) const = 0; + + virtual ~Evaluator() = default; +}; + +//============================================================================== +class LeastFleetDiffCostEvaluator : public Evaluator +{ +public: + std::optional choose( + const Submissions& submissions) const final; +}; + +//============================================================================== +class LeastFleetCostEvaluator : public Evaluator +{ +public: + std::optional choose( + const Submissions& submissions) const final; +}; + +//============================================================================== +class QuickestFinishEvaluator : public Evaluator +{ +public: + std::optional choose( + const Submissions& submissions) const final; +}; + + +} // namespace rmf_task + +#endif // RMF_TASK__EVALUATOR_HPP diff --git a/rmf_task/src/rmf_task/Evaluator.cpp b/rmf_task/src/rmf_task/Evaluator.cpp new file mode 100644 index 000000000..c9537cc62 --- /dev/null +++ b/rmf_task/src/rmf_task/Evaluator.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2021 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include + +namespace rmf_task { + +//============================================================================== +std::optional LeastFleetDiffCostEvaluator::choose( + const Submissions& submissions) const +{ + if (submissions.empty()) + return std::nullopt; + + auto winner_it = submissions.begin(); + float winner_cost_diff = winner_it->new_cost - winner_it->prev_cost; + for (auto nominee_it = ++submissions.begin(); + nominee_it != submissions.end(); ++nominee_it) + { + float nominee_cost_diff = nominee_it->new_cost - nominee_it->prev_cost; + if (nominee_cost_diff < winner_cost_diff) + { + winner_it = nominee_it; + winner_cost_diff = nominee_cost_diff; + } + } + return std::distance(submissions.begin(), winner_it); +} + +//============================================================================== +std::optional LeastFleetCostEvaluator::choose( + const Submissions& submissions) const +{ + if (submissions.empty()) + return std::nullopt; + + auto winner_it = submissions.begin(); + for (auto nominee_it = ++submissions.begin(); + nominee_it != submissions.end(); ++nominee_it) + { + if (nominee_it->new_cost < winner_it->new_cost) + winner_it = nominee_it; + } + return std::distance(submissions.begin(), winner_it); +} + +//============================================================================== +std::optional QuickestFinishEvaluator::choose( + const Submissions& submissions) const +{ + if (submissions.empty()) + return std::nullopt; + + auto winner_it = submissions.begin(); + for (auto nominee_it = ++submissions.begin(); + nominee_it != submissions.end(); ++nominee_it) + { + if (nominee_it->finish_time < winner_it->finish_time) + winner_it = nominee_it; + } + return std::distance(submissions.begin(), winner_it); +} + +} // namespace rmf_task diff --git a/rmf_task/test/unit/test_Evaluator.cpp b/rmf_task/test/unit/test_Evaluator.cpp new file mode 100644 index 000000000..560f72c51 --- /dev/null +++ b/rmf_task/test/unit/test_Evaluator.cpp @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2020 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include +#include + +namespace rmf_task { + +//============================================================================== +auto now = std::chrono::steady_clock::now(); + +Evaluator::Submission submission1{ + "fleet1", "", 2.3, 3.4, rmf_traffic::time::apply_offset(now, 5) +}; +Evaluator::Submission submission2{ + "fleet2", "", 3.5, 3.6, rmf_traffic::time::apply_offset(now, 5.5) +}; +Evaluator::Submission submission3{ + "fleet3", "", 0.0, 1.4, rmf_traffic::time::apply_offset(now, 3) +}; +Evaluator::Submission submission4{ + "fleet4", "", 5.0, 5.4, rmf_traffic::time::apply_offset(now, 4) +}; +Evaluator::Submission submission5{ + "fleet5", "", 0.5, 0.8, rmf_traffic::time::apply_offset(now, 3.5) +}; + +//============================================================================== +SCENARIO("Winner from Evaluator", "[Evaluator]") +{ + WHEN("Least Diff Cost Evaluator") + { + auto evaluator = std::make_shared(); + + AND_WHEN("0 submissions") + { + std::vector submissions{}; + auto choice = evaluator->choose(submissions); + REQUIRE(choice == std::nullopt); // no winner + } + AND_WHEN("5 submissions") + { + std::vector submissions{ + submission1, submission2, submission3, submission4, submission5 }; + auto choice = evaluator->choose(submissions); + REQUIRE(choice != std::nullopt); + REQUIRE(submissions[*choice].fleet_name == "fleet2"); // least diff cost agent + } + } + + WHEN("Least Fleet Cost Evaluator") + { + auto evaluator = std::make_shared(); + + AND_WHEN("0 submissions") + { + std::vector submissions{}; + auto choice = evaluator->choose(submissions); + REQUIRE(choice == std::nullopt); // no winner + } + AND_WHEN("5 submissions") + { + std::vector submissions{ + submission1, submission2, submission3, submission4, submission5 }; + auto choice = evaluator->choose(submissions); + REQUIRE(choice != std::nullopt); + REQUIRE(submissions[*choice].fleet_name == "fleet5"); // least cost agent + } + } + + WHEN("Quickest Finish Time Evaluator") + { + auto evaluator = std::make_shared(); + + AND_WHEN("0 submissions") + { + std::vector submissions{}; + auto choice = evaluator->choose(submissions); + REQUIRE(choice == std::nullopt); // no winner + } + AND_WHEN("5 submissions") + { + std::vector submissions{ + submission1, submission2, submission3, submission4, submission5 }; + auto choice = evaluator->choose(submissions); + REQUIRE(choice != std::nullopt); + REQUIRE(submissions[*choice].fleet_name == "fleet3"); // quickest agent + } + } +} + +} // namespace rmf_task diff --git a/rmf_task_msgs/msg/TaskSummary.msg b/rmf_task_msgs/msg/TaskSummary.msg index bb75e7b38..09b6256ba 100644 --- a/rmf_task_msgs/msg/TaskSummary.msg +++ b/rmf_task_msgs/msg/TaskSummary.msg @@ -4,6 +4,7 @@ string fleet_name # *optional and duplicated in TaskProfile +# TODO: This is not needed, to removed string task_id TaskProfile task_profile diff --git a/rmf_task_msgs/srv/SubmitTask.srv b/rmf_task_msgs/srv/SubmitTask.srv index 12572d3b4..2031249c1 100644 --- a/rmf_task_msgs/srv/SubmitTask.srv +++ b/rmf_task_msgs/srv/SubmitTask.srv @@ -7,6 +7,7 @@ string requester TaskDescription description # fleet selection evaluator +# TODO: This is not needed, to removed uint8 evaluator uint8 LOWEST_DIFF_COST_EVAL=0 uint8 LOWEST_COST_EVAL=1 diff --git a/rmf_task_ros2/CMakeLists.txt b/rmf_task_ros2/CMakeLists.txt index 208358372..d8ba05873 100644 --- a/rmf_task_ros2/CMakeLists.txt +++ b/rmf_task_ros2/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.5) project(rmf_task_ros2) if(NOT CMAKE_CXX_STANDARD) - set(CMAKE_CXX_STANDARD 14) + set(CMAKE_CXX_STANDARD 17) endif() if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") # we dont use add_compile_options with pedantic in message packages @@ -17,6 +17,7 @@ find_package(ament_cmake REQUIRED) find_package(rmf_traffic REQUIRED) find_package(rmf_traffic_ros2 REQUIRED) find_package(rmf_task_msgs REQUIRED) +find_package(rmf_task REQUIRED) find_package(Eigen3 REQUIRED) find_package(rclcpp REQUIRED) @@ -27,6 +28,7 @@ target_link_libraries(rmf_task_ros2 PUBLIC rmf_traffic::rmf_traffic rmf_traffic_ros2::rmf_traffic_ros2 + rmf_task::rmf_task ${rmf_task_msgs_LIBRARIES} ${rclcpp_LIBRARIES} ) @@ -37,11 +39,12 @@ target_include_directories(rmf_task_ros2 $ ${rmf_traffic_ros2_INCLUDE_DIRS} ${rmf_task_msgs_INCLUDE_DIRS} + ${rmf_task_INCLUDE_DIRS} ${rclcpp_INCLUDE_DIRS} ) ament_export_targets(rmf_task_ros2 HAS_LIBRARY_TARGET) -ament_export_dependencies(rmf_traffic rmf_task_msgs rclcpp) +ament_export_dependencies(rmf_traffic_ros2 rmf_task_msgs rmf_task rclcpp) #=============================================================================== @@ -56,6 +59,7 @@ if(BUILD_TESTING AND ament_cmake_catch2_FOUND) rmf_task_ros2 rmf_traffic::rmf_traffic rmf_traffic_ros2::rmf_traffic_ros2 + rmf_task::rmf_task ) target_include_directories(test_rmf_task_ros2 diff --git a/rmf_task_ros2/include/rmf_task_ros2/Description.hpp b/rmf_task_ros2/include/rmf_task_ros2/Description.hpp new file mode 100644 index 000000000..1521a88c0 --- /dev/null +++ b/rmf_task_ros2/include/rmf_task_ros2/Description.hpp @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2021 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#ifndef RMF_TASK_ROS2__DESCRIPTION_HPP +#define RMF_TASK_ROS2__DESCRIPTION_HPP + +#include +#include + +#include +#include + +#include + +namespace rmf_task_ros2 { + +//============================================================================== +// A description for user requested task +class Description +{ +public: + /// Create a Description. This is only used when the task type is not + /// available as the derived class. + static std::shared_ptr make_description( + rmf_traffic::Time start_time, + uint32_t type, + uint64_t priority = 0); + + /// Get the start_time of the task + rmf_traffic::Time start_time() const; + + /// Get the type of the task + uint32_t type() const; + + /// Get the priority of the task + uint64_t priority() const; + + virtual ~Description() = default; + + class Implementation; +protected: + Description(); + rmf_utils::unique_impl_ptr _pimpl_base; +}; + +using ConstDescriptionPtr = std::shared_ptr; + +//============================================================================== +namespace description { +class Delivery : public Description +{ +public: + using DispenserRequestItem = rmf_dispenser_msgs::msg::DispenserRequestItem; + + /// Make a Delivery Task Description + static std::shared_ptr make( + rmf_traffic::Time start_time, + std::string pickup_place_name, + std::string pickup_dispenser, + std::string dropoff_place_name, + std::string dropoff_ingestor, + std::vector items = {}, + uint64_t priority = 0); + + /// Get the pickup_place_name + const std::string& pickup_place_name() const; + + /// Get the dropoff_place_name + const std::string& dropoff_place_name() const; + + /// Get the pickup_dispenser + const std::string& pickup_dispenser() const; + + /// Get the dropoff_ingestor + const std::string& dropoff_ingestor() const; + + class Implementation; +private: + Delivery(); + rmf_utils::unique_impl_ptr _pimpl; +}; + +//============================================================================== +class Loop : public Description +{ +public: + /// Make a Loop Task Description + static std::shared_ptr make( + rmf_traffic::Time start_time, + std::string start_name, + std::string finish_name, + std::size_t num_loops, + uint64_t priority = 0); + + /// Get the start_name + const std::string& start_name() const; + + /// Get the finish_name + const std::string& finish_name() const; + + /// Get the num_loops + std::size_t num_loops() const; + + class Implementation; +private: + Loop(); + rmf_utils::unique_impl_ptr _pimpl; +}; + +//============================================================================== +class Clean : public Description +{ +public: + /// Make a Clean Task Description + static std::shared_ptr make( + rmf_traffic::Time start_time, + std::string start_waypoint, + uint64_t priority = 0); + + /// Get the start_waypoint of a clean + const std::string& start_waypoint() const; + + class Implementation; +private: + Clean(); + rmf_utils::unique_impl_ptr _pimpl; +}; + +} // namespace description +} // namespace rmf_task_ros2 + +#endif // RMF_TASK_ROS2__DESCRIPTION_HPP diff --git a/rmf_task_ros2/include/rmf_task_ros2/Dispatcher.hpp b/rmf_task_ros2/include/rmf_task_ros2/Dispatcher.hpp index f01ebf656..07f6f4bc6 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/Dispatcher.hpp +++ b/rmf_task_ros2/include/rmf_task_ros2/Dispatcher.hpp @@ -21,11 +21,10 @@ #include #include #include -#include -#include +#include #include - +#include namespace rmf_task_ros2 { @@ -35,51 +34,58 @@ namespace rmf_task_ros2 { class Dispatcher : public std::enable_shared_from_this { public: + using TaskID = std::string; using DispatchTasks = std::unordered_map; - using TaskDescription = rmf_task_msgs::msg::TaskDescription; /// Initialize an rclcpp context and make an dispatcher instance. This will /// instantiate an rclcpp::Node, a task dispatcher node. Dispatcher node will /// allow you to dispatch submitted task to the best fleet/robot within RMF. /// - /// \param[in] dispatcher_node_name - /// The ROS 2 node to manage the Dispatching of Task + /// \note Using default params: + /// LeastFleetCostEvaluator, bidding window: 2s and keep last 50 tasks /// /// \sa init_and_make_node() - static std::shared_ptr init_and_make_node( - const std::string dispatcher_node_name); + static std::shared_ptr init_and_make_node(); /// Similarly this will init the dispatcher, but you will also need to init /// rclcpp via rclcpp::init(~). /// - /// \param[in] dispatcher_node_name - /// The ROS 2 node to manage the Dispatching of Task - /// /// \sa make_node() - static std::shared_ptr make_node( - const std::string dispatcher_node_name); + static std::shared_ptr make_node(); - /// Create a dispatcher by providing the ros2 node + /// Create a dispatcher by providing the ros2 node and other params /// /// \param[in] node /// ROS 2 node instance /// + /// \param[in] evaluator + /// Bidding Evaluator, default: LeastFleetDiffCostEvaluator + /// + /// \param[in] bidding_time_window + /// Bidding time window, default: 2.0 + /// + /// \param[in] terminated_tasks_depth + /// Keep history depth size of terminated tasks, default: 50 + /// /// \sa make() static std::shared_ptr make( - const std::shared_ptr& node); - - /// Submit task to dispatcher node. Calling this function will immediately - /// trigger the bidding process, then the task "action". Once submmitted, - /// Task State will be in 'Pending' State, till the task is awarded to a fleet - /// then the state will turn to 'Queued' + const std::shared_ptr& node, + const std::shared_ptr evaluator = + std::make_shared(), + const double bidding_time_window = 2.0, + const int terminated_tasks_depth = 50); + + /// Submit task description to RMF dispatcher node. Calling this function will + /// immediately trigger the bidding process, and dispatch the task to the + /// selected downstream fleet adapter. /// /// \param [in] task_description - /// Submit a task to dispatch + /// Submit a Description of task to dispatch /// /// \return task_id /// self-generated task_id, nullopt is submit task failed std::optional submit_task( - const TaskDescription& task_description); + const ConstDescriptionPtr task_description); /// Cancel an active task which was previously submitted to Dispatcher. This /// will terminate the task with a State of: `Canceled`. If a task is @@ -99,7 +105,7 @@ class Dispatcher : public std::enable_shared_from_this /// task_id obtained from `submit_task()` /// /// \return State of the task, nullopt if task is not available - const rmf_utils::optional get_task_state( + const std::optional get_task_state( const TaskID& task_id) const; /// Get a mutable ref of active tasks map list handled by dispatcher @@ -121,7 +127,7 @@ class Dispatcher : public std::enable_shared_from_this /// /// \param [in] evaluator /// evaluator used to select the best bid from fleets - void evaluator(std::shared_ptr evaluator); + void evaluator(std::shared_ptr evaluator); /// Get the rclcpp::Node that this dispatcher will be using for communication. std::shared_ptr node(); diff --git a/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp b/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp index 2bdc71588..c3c135726 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp +++ b/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp @@ -30,8 +30,8 @@ const std::string SubmitTaskSrvName = "submit_task"; const std::string CancelTaskSrvName = "cancel_task"; const std::string GetTaskListSrvName = "get_tasks"; -const std::string TaskRequestTopicName = Prefix + "dispatch_request"; -const std::string TaskAckTopicName = Prefix + "dispatch_ack"; +const std::string DispatchRequestTopicName = Prefix + "dispatch_request"; +const std::string DispatchAckTopicName = Prefix + "dispatch_ack"; const std::string TaskStatusTopicName = "task_summaries"; } // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/include/rmf_task_ros2/TaskStatus.hpp b/rmf_task_ros2/include/rmf_task_ros2/TaskStatus.hpp index a8f96fcd6..20b8f9038 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/TaskStatus.hpp +++ b/rmf_task_ros2/include/rmf_task_ros2/TaskStatus.hpp @@ -20,49 +20,70 @@ #include #include -#include -#include +#include +#include namespace rmf_task_ros2 { //============================================================================== -using TaskProfile = rmf_task_msgs::msg::TaskProfile; -using StatusMsg = rmf_task_msgs::msg::TaskSummary; -using TaskID = std::string; - -//============================================================================== +/// This represents the Task Status of each unique task /// \note TaskStatus struct is based on TaskSummary.msg -struct TaskStatus +class TaskStatus { - enum class State : uint8_t - { - Queued = StatusMsg::STATE_QUEUED, - Executing = StatusMsg::STATE_ACTIVE, - Completed = StatusMsg::STATE_COMPLETED, - Failed = StatusMsg::STATE_FAILED, - Canceled = StatusMsg::STATE_CANCELED, - Pending = StatusMsg::STATE_PENDING - }; +public: + static std::shared_ptr make( + std::string task_id, + rmf_traffic::Time submission_time, + ConstDescriptionPtr task_description); + + /// Get Task ID + std::string task_id() const; + /// Get submission time + rmf_traffic::Time submission_time() const; + + /// Get Task Description + ConstDescriptionPtr description() const; + + /// The fleet which will execute the task std::string fleet_name; - TaskProfile task_profile; + + /// The robot which will execute the task + std::string robot_name; + + /// The estimated time which the task will start executing rmf_traffic::Time start_time; + + /// The estimated time which the task will finish rmf_traffic::Time end_time; - std::string robot_name; - std::string status; // verbose msg - State state = State::Pending; // default + /// Verbose status of this task + std::string status; + + enum class State : uint8_t + { + Queued = 0, + Executing = 1, + Completed = 2, + Failed = 3, + Canceled = 4, + Pending = 5 + }; + + /// Current State of the task + State state; + + /// Check if the current task is terminated bool is_terminated() const; + + class Implementation; +private: + TaskStatus(); + rmf_utils::unique_impl_ptr _pimpl; }; using TaskStatusPtr = std::shared_ptr; -// ============================================================================== -TaskStatus convert_status(const StatusMsg& from); - -// ============================================================================== -StatusMsg convert_status(const TaskStatus& from); - } // namespace rmf_task_ros2 #endif // RMF_TASK_ROS2__TASK_STATUS_HPP diff --git a/rmf_task_ros2/include/rmf_task_ros2/bidding/Submission.hpp b/rmf_task_ros2/include/rmf_task_ros2/bidding/Submission.hpp deleted file mode 100644 index b9b8cc813..000000000 --- a/rmf_task_ros2/include/rmf_task_ros2/bidding/Submission.hpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (C) 2020 Open Source Robotics Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - -#ifndef RMF_TASK_ROS2__BIDDING__SUBMISSION_HPP -#define RMF_TASK_ROS2__BIDDING__SUBMISSION_HPP - -#include -#include - -namespace rmf_task_ros2 { -namespace bidding { - -//============================================================================== -struct Submission -{ - std::string fleet_name; - std::string robot_name; - double prev_cost = 0.0; - double new_cost = std::numeric_limits::max(); - rmf_traffic::Time finish_time; -}; - -//============================================================================== -using Submissions = std::vector; -using BidNotice = rmf_task_msgs::msg::BidNotice; - -} // namespace bidding -} // namespace rmf_task_ros2 - -#endif // RMF_TASK_ROS2__BIDDING__SUBMISSION_HPP diff --git a/rmf_task_ros2/package.xml b/rmf_task_ros2/package.xml index 268800c53..1df429007 100644 --- a/rmf_task_ros2/package.xml +++ b/rmf_task_ros2/package.xml @@ -13,6 +13,7 @@ rmf_traffic rmf_traffic_ros2 rmf_task_msgs + rmf_task rclcpp eigen diff --git a/rmf_task_ros2/src/dispatcher_node/main.cpp b/rmf_task_ros2/src/dispatcher_node/main.cpp index a96d066fd..5d1b31508 100644 --- a/rmf_task_ros2/src/dispatcher_node/main.cpp +++ b/rmf_task_ros2/src/dispatcher_node/main.cpp @@ -22,11 +22,44 @@ int main(int argc, char* argv[]) { rclcpp::init(argc, argv); std::cout << "~Initializing Dispatcher Node~" << std::endl; + auto node = rclcpp::Node::make_shared("rmf_dispatcher_node"); - auto dispatcher = rmf_task_ros2::Dispatcher::make_node( - "rmf_dispatcher_node"); + // ros2 param + const double bidding_time_window = + node->declare_parameter("bidding_time_window", 2.0); + RCLCPP_INFO(node->get_logger(), + "Declared Time Window Param as: [%f] secs", bidding_time_window); + const int terminated_tasks_depth = + node->declare_parameter("terminated_tasks_depth", 50); + RCLCPP_INFO(node->get_logger(), + "Declared Terminated Tasks Depth Param as: [%d]", + terminated_tasks_depth); + const int evaluator_type_enum = + node->declare_parameter("evaluator_type_enum", 0); + RCLCPP_INFO(node->get_logger(), "Type of Evaluator: [%d]", + evaluator_type_enum); + + std::shared_ptr eval_type; + switch (evaluator_type_enum) + { + case 0: + eval_type = std::make_shared(); + break; + case 1: + eval_type = std::make_shared(); + break; + case 2: + eval_type = std::make_shared(); + break; + default: + RCLCPP_WARN(node->get_logger(), "Invalid Evaluator, use default: 0"); + eval_type = std::make_shared(); + break; + } + + auto dispatcher = rmf_task_ros2::Dispatcher::make( + node, eval_type, bidding_time_window, terminated_tasks_depth); - const auto& node = dispatcher->node(); RCLCPP_INFO(node->get_logger(), "Starting task dispatcher node"); dispatcher->spin(); RCLCPP_INFO(node->get_logger(), "Closing down task dispatcher"); diff --git a/rmf_task_ros2/src/mock_bidder/main.cpp b/rmf_task_ros2/src/mock_bidder/main.cpp index 7340c6b30..8c639ec43 100644 --- a/rmf_task_ros2/src/mock_bidder/main.cpp +++ b/rmf_task_ros2/src/mock_bidder/main.cpp @@ -17,14 +17,17 @@ /// Note: This is a testing bidder node script -#include +#include "../rmf_task_ros2/bidding/MinimalBidder.hpp" #include "../rmf_task_ros2/action/Server.hpp" #include #include using namespace rmf_task_ros2; + using TaskType = bidding::MinimalBidder::TaskType; +using BidNotice = rmf_task_msgs::msg::BidNotice; +using TaskProfile = rmf_task_msgs::msg::TaskProfile; int main(int argc, char* argv[]) { @@ -43,14 +46,14 @@ int main(int argc, char* argv[]) node, fleet_name, { TaskType::Clean, TaskType::Delivery }, - [](const bidding::BidNotice& notice) + [](const BidNotice& notice) { // Here user will provice the best robot as a bid submission std::cout << "[MockBidder] Providing best estimates" << std::endl; auto req_start_time = rmf_traffic_ros2::convert(notice.task_profile.description.start_time); - bidding::Submission best_robot_estimate; + bidding::MinimalBidder::Submission best_robot_estimate; best_robot_estimate.robot_name = "dumbot"; best_robot_estimate.prev_cost = 10.2; best_robot_estimate.new_cost = 13.5; @@ -71,30 +74,28 @@ int main(int argc, char* argv[]) << task_profile.task_id<now()); - status.end_time = - rmf_traffic::time::apply_offset(status.start_time, 7); - const auto id = profile.task_id; + const auto now = rmf_traffic_ros2::convert(node->now()); + const auto status = TaskStatus::make(id, now, nullptr); //todo + status->robot_name = "dumbot"; + status->start_time = now; + status->end_time = rmf_traffic::time::apply_offset(now, 7); + std::cout << " [MockBidder] Queued, TaskID: " << id << std::endl; - action_server->update_status(status); + action_server->update_status(*status); std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << " [MockBidder] Executing, TaskID: " << id << std::endl; - status.state = TaskStatus::State::Executing; - action_server->update_status(status); + status->state = TaskStatus::State::Executing; + action_server->update_status(*status); std::this_thread::sleep_for(std::chrono::seconds(5)); std::cout << " [MockBidder] Completed, TaskID: " << id << std::endl; - status.state = TaskStatus::State::Completed; - action_server->update_status(status); + status->state = TaskStatus::State::Completed; + action_server->update_status(*status); }, task_profile ); t.detach(); diff --git a/rmf_task_ros2/src/rmf_task_ros2/Description.cpp b/rmf_task_ros2/src/rmf_task_ros2/Description.cpp new file mode 100644 index 000000000..5d118efcf --- /dev/null +++ b/rmf_task_ros2/src/rmf_task_ros2/Description.cpp @@ -0,0 +1,335 @@ +/* + * Copyright (C) 2021 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include +#include "internal_Description.hpp" +#include + +namespace rmf_task_ros2 { + +//============================================================================== +class Description::Implementation +{ +public: + Implementation() + {} + + rmf_traffic::Time start_time; + rmf_task_msgs::msg::Priority priority; + rmf_task_msgs::msg::TaskType task_type; +}; + +//============================================================================== +rmf_traffic::Time Description::start_time() const +{ + return _pimpl_base->start_time; +} + +//============================================================================== +uint32_t Description::type() const +{ + return _pimpl_base->task_type.type; +} + + +//============================================================================== +uint64_t Description::priority() const +{ + return _pimpl_base->priority.value; +} + +//============================================================================== +std::shared_ptr Description::make_description( + rmf_traffic::Time start_time, + uint32_t type, + uint64_t priority) +{ + std::shared_ptr desc(new Description()); + desc->_pimpl_base->task_type.type = type; + desc->_pimpl_base->priority.value = priority; + desc->_pimpl_base->start_time = std::move(start_time); + return desc; +} + +//============================================================================== +Description::Description() +: _pimpl_base(rmf_utils::make_impl(Implementation())) +{ + // Do nothing +} + +//============================================================================== +//============================================================================== +namespace description { + +class Delivery::Implementation +{ +public: + Implementation() + {} + + std::string pickup_place_name; + std::string pickup_dispenser; + std::string dropoff_place_name; + std::string dropoff_ingestor; + std::vector items; +}; + +//============================================================================== +std::shared_ptr Delivery::make( + rmf_traffic::Time start_time, + std::string pickup_place_name, + std::string pickup_dispenser, + std::string dropoff_place_name, + std::string dropoff_ingestor, + std::vector items, + uint64_t priority) +{ + if (pickup_place_name.empty() || pickup_dispenser.empty() || + dropoff_place_name.empty() || dropoff_ingestor.empty()) + return nullptr; + + std::shared_ptr delivery_desc(new Delivery()); + + using TaskType = rmf_task_msgs::msg::TaskType; + delivery_desc->_pimpl_base->task_type.type = TaskType::TYPE_DELIVERY; + delivery_desc->_pimpl_base->start_time = std::move(start_time); + delivery_desc->_pimpl_base->priority.value = priority; + delivery_desc->_pimpl->pickup_place_name = std::move(pickup_place_name); + delivery_desc->_pimpl->pickup_dispenser = std::move(pickup_dispenser); + delivery_desc->_pimpl->dropoff_place_name = std::move(dropoff_place_name); + delivery_desc->_pimpl->dropoff_ingestor = std::move(dropoff_ingestor); + delivery_desc->_pimpl->items = std::move(items); + + return delivery_desc; +} + +//============================================================================== +Delivery::Delivery() +: _pimpl(rmf_utils::make_impl(Implementation())) +{ + // Do nothing +} + +//============================================================================== +const std::string& Delivery::pickup_place_name() const +{ + return _pimpl->pickup_place_name; +} + +//============================================================================== +const std::string& Delivery::dropoff_place_name() const +{ + return _pimpl->dropoff_place_name; +} + +//============================================================================== +const std::string& Delivery::pickup_dispenser() const +{ + return _pimpl->pickup_dispenser; +} + +//============================================================================== +const std::string& Delivery::dropoff_ingestor() const +{ + return _pimpl->dropoff_ingestor; +} + +//============================================================================== +//============================================================================== +class Loop::Implementation +{ +public: + Implementation() + {} + + std::string start_name; + std::string finish_name; + std::size_t num_loops; +}; + +std::shared_ptr Loop::make( + rmf_traffic::Time start_time, + std::string start_name, + std::string finish_name, + std::size_t num_loops, + uint64_t priority) +{ + if (start_name.empty() || finish_name.empty() || num_loops == 0) + return nullptr; + + std::shared_ptr loop_desc(new Loop()); + + using TaskType = rmf_task_msgs::msg::TaskType; + loop_desc->_pimpl_base->task_type.type = TaskType::TYPE_LOOP; + loop_desc->_pimpl_base->priority.value = priority; + loop_desc->_pimpl_base->start_time = std::move(start_time); + loop_desc->_pimpl->start_name = std::move(start_name); + loop_desc->_pimpl->finish_name = std::move(finish_name); + loop_desc->_pimpl->num_loops = num_loops; + return loop_desc; +} + +//============================================================================== +Loop::Loop() +: _pimpl(rmf_utils::make_impl(Implementation())) +{ + // Do nothing +} + +//============================================================================== +const std::string& Loop::start_name() const +{ + return this->_pimpl->start_name; +} + +//============================================================================== +const std::string& Loop::finish_name() const +{ + return this->_pimpl->finish_name; +} + +//============================================================================== +std::size_t Loop::num_loops() const +{ + return this->_pimpl->num_loops; +} + +//============================================================================== +//============================================================================== +class Clean::Implementation +{ +public: + Implementation() + {} + + std::string start_waypoint; +}; + +std::shared_ptr Clean::make( + rmf_traffic::Time start_time, + std::string start_waypoint, + uint64_t priority) +{ + if (start_waypoint.empty()) + return nullptr; + + std::shared_ptr clean_desc(new Clean()); + + using TaskType = rmf_task_msgs::msg::TaskType; + clean_desc->_pimpl_base->task_type.type = TaskType::TYPE_CLEAN; + clean_desc->_pimpl_base->start_time = std::move(start_time); + clean_desc->_pimpl_base->priority.value = priority; + clean_desc->_pimpl->start_waypoint = std::move(start_waypoint); + return clean_desc; +} + +//============================================================================== +Clean::Clean() +: _pimpl(rmf_utils::make_impl(Implementation())) +{ + // Do nothing +} + +//============================================================================== +const std::string& Clean::start_waypoint() const +{ + return this->_pimpl->start_waypoint; +} + +//============================================================================== +//============================================================================== +std::shared_ptr make_delivery_from_msg( + const rmf_task_msgs::msg::TaskDescription& msg) +{ + if (msg.task_type.type != rmf_task_msgs::msg::TaskType::TYPE_DELIVERY) + return nullptr; + + return Delivery::make( + rmf_traffic_ros2::convert(msg.start_time), + msg.delivery.pickup_place_name, + msg.delivery.pickup_dispenser, + msg.delivery.dropoff_place_name, + msg.delivery.dropoff_ingestor, + msg.delivery.items, + msg.priority.value); +} + +//============================================================================== +std::shared_ptr make_loop_from_msg( + const rmf_task_msgs::msg::TaskDescription& msg) +{ + if (msg.task_type.type != rmf_task_msgs::msg::TaskType::TYPE_LOOP) + return nullptr; + + return Loop::make( + rmf_traffic_ros2::convert(msg.start_time), + msg.loop.start_name, + msg.loop.finish_name, + msg.loop.num_loops, + msg.priority.value); +} + +//============================================================================== +std::shared_ptr make_clean_from_msg( + const rmf_task_msgs::msg::TaskDescription& msg) +{ + if (msg.task_type.type != rmf_task_msgs::msg::TaskType::TYPE_CLEAN) + return nullptr; + + return Clean::make( + rmf_traffic_ros2::convert(msg.start_time), + msg.clean.start_waypoint, + msg.priority.value); +} + + +//============================================================================== +rmf_task_msgs::msg::TaskDescription convert( + const ConstDescriptionPtr& description) +{ + rmf_task_msgs::msg::TaskDescription msg; + msg.task_type.type = description->type(); + msg.priority.value = description->priority(); + msg.start_time = rmf_traffic_ros2::convert(description->start_time()); + + if (const auto desc = + std::dynamic_pointer_cast(description)) + { + msg.delivery.pickup_place_name = desc->pickup_place_name(); + msg.delivery.dropoff_place_name = desc->dropoff_place_name(); + msg.delivery.pickup_dispenser = desc->pickup_dispenser(); + msg.delivery.dropoff_ingestor = desc->dropoff_ingestor(); + // msg.delivery.items = desc->items(); + } + else if (const auto desc = + std::dynamic_pointer_cast(description)) + { + msg.loop.start_name = desc->start_name(); + msg.loop.finish_name = desc->finish_name(); + msg.loop.num_loops = desc->num_loops(); + } + else if (const auto desc = + std::dynamic_pointer_cast(description)) + { + msg.clean.start_waypoint = desc->start_waypoint(); + } + return msg; +} + +} // namespace description +} // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp index 2dea6a86b..bd28aedd1 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp @@ -20,6 +20,8 @@ #include #include "action/Client.hpp" +#include "bidding/Auctioneer.hpp" +#include "internal_Description.hpp" #include #include @@ -40,6 +42,7 @@ class Dispatcher::Implementation using SubmitTaskSrv = rmf_task_msgs::srv::SubmitTask; using CancelTaskSrv = rmf_task_msgs::srv::CancelTask; using GetTaskListSrv = rmf_task_msgs::srv::GetTaskList; + using TaskType = rmf_task_msgs::msg::TaskType; rclcpp::Service::SharedPtr submit_task_srv; rclcpp::Service::SharedPtr cancel_task_srv; @@ -47,9 +50,10 @@ class Dispatcher::Implementation StatusCallback on_change_fn; - std::queue queue_bidding_tasks; + std::queue queue_bidding_tasks; DispatchTasks active_dispatch_tasks; DispatchTasks terminal_dispatch_tasks; + std::size_t task_counter = 0; // index for generating task_id double bidding_time_window; int terminated_tasks_max_size; @@ -67,51 +71,32 @@ class Dispatcher::Implementation Implementation(std::shared_ptr node_) : node{std::move(node_)} { - // ros2 param - bidding_time_window = - node->declare_parameter("bidding_time_window", 2.0); - RCLCPP_INFO(node->get_logger(), - " Declared Time Window Param as: %f secs", bidding_time_window); - terminated_tasks_max_size = - node->declare_parameter("terminated_tasks_max_size", 100); - RCLCPP_INFO(node->get_logger(), - " Declared Terminated Tasks Max Size Param as: %d", - terminated_tasks_max_size); - - // Setup up stream srv interfaces submit_task_srv = node->create_service( rmf_task_ros2::SubmitTaskSrvName, [this]( const std::shared_ptr request, std::shared_ptr response) { - switch (request->evaluator) + response->success = false; + + ConstDescriptionPtr task_description; + const auto desc_msg = request->description; + if (auto d = description::make_delivery_from_msg(desc_msg)) + task_description = std::move(d); + else if (auto d = description::make_clean_from_msg(desc_msg)) + task_description = std::move(d); + else if (auto d = description::make_loop_from_msg(desc_msg)) + task_description = std::move(d); + else { - using namespace rmf_task_ros2::bidding; - case SubmitTaskSrv::Request::LOWEST_DIFF_COST_EVAL: - this->auctioneer->select_evaluator( - std::make_shared()); - break; - case SubmitTaskSrv::Request::LOWEST_COST_EVAL: - this->auctioneer->select_evaluator( - std::make_shared()); - break; - case SubmitTaskSrv::Request::QUICKEST_FINISH_EVAL: - this->auctioneer->select_evaluator( - std::make_shared()); - break; - default: - RCLCPP_WARN(this->node->get_logger(), - "Selected Evaluator is invalid, switch back to previous"); - break; + RCLCPP_ERROR(node->get_logger(), + "Received an invalid task from task submision request"); + return; } - const auto id = this->submit_task(request->description); + const auto id = this->submit_task(task_description); if (id == std::nullopt) - { - response->success = false; return; - } response->task_id = *id; response->success = true; @@ -163,13 +148,9 @@ class Dispatcher::Implementation std::bind(&Implementation::task_status_cb, this, _1)); } - std::optional submit_task(const TaskDescription& description) + std::optional submit_task(const ConstDescriptionPtr description) { - TaskProfile submitted_task; - submitted_task.submission_time = node->now(); - submitted_task.description = description; - - const auto task_type = static_cast(description.task_type.type); + const auto task_type = static_cast(description->type()); if (!task_type_name.count(task_type)) { @@ -178,23 +159,20 @@ class Dispatcher::Implementation } // auto generate a task_id for a given submitted task - submitted_task.task_id = - task_type_name[task_type] + std::to_string(task_counter++); - - RCLCPP_INFO(node->get_logger(), - "Received Task Submission [%s]", submitted_task.task_id.c_str()); + const auto id = task_type_name[task_type] + std::to_string(task_counter++); + RCLCPP_INFO(node->get_logger(), "Received Task Request [%s]", id.c_str()); - // add task to internal cache - TaskStatus status; - status.task_profile = submitted_task; - auto new_task_status = std::make_shared(status); - active_dispatch_tasks[submitted_task.task_id] = new_task_status; + const auto status = TaskStatus::make( + id, rmf_traffic_ros2::convert(node->now()), description); + active_dispatch_tasks[id] = status; if (on_change_fn) - on_change_fn(new_task_status); + on_change_fn(status); - bidding::BidNotice bid_notice; - bid_notice.task_profile = submitted_task; + rmf_task_msgs::msg::BidNotice bid_notice; + bid_notice.task_profile.task_id = id; + bid_notice.task_profile.submission_time = node->now(); + bid_notice.task_profile.description = description::convert(description); bid_notice.time_window = rmf_traffic_ros2::convert( rmf_traffic::time::from_seconds(bidding_time_window)); queue_bidding_tasks.push(bid_notice); @@ -202,7 +180,7 @@ class Dispatcher::Implementation if (queue_bidding_tasks.size() == 1) auctioneer->start_bidding(queue_bidding_tasks.front()); - return submitted_task.task_id; + return id; } bool cancel_task(const TaskID& task_id) @@ -223,17 +201,22 @@ class Dispatcher::Implementation if (cancel_task_status->state == TaskStatus::State::Pending) { cancel_task_status->state = TaskStatus::State::Canceled; + terminate_task(cancel_task_status); if (on_change_fn) on_change_fn(cancel_task_status); + queue_bidding_tasks.pop(); + if (!queue_bidding_tasks.empty()) + auctioneer->start_bidding(queue_bidding_tasks.front()); + return true; } // Charging task doesnt support cancel task - if (cancel_task_status->task_profile.description.task_type.type == - rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY) + if (cancel_task_status->description()->type() == + TaskType::TYPE_CHARGE_BATTERY) { RCLCPP_ERROR(node->get_logger(), "Charging task is not cancelled-able"); return false; @@ -255,11 +238,10 @@ class Dispatcher::Implementation for (auto it = active_dispatch_tasks.begin(); it != active_dispatch_tasks.end(); ) { - const auto type = it->second->task_profile.description.task_type.type; const bool is_fleet_name = (cancel_task_status->fleet_name == it->second->fleet_name); const bool is_charging_task = - (type == rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY); + (it->second->description()->type() == TaskType::TYPE_CHARGE_BATTERY); if (is_charging_task && is_fleet_name) it = active_dispatch_tasks.erase(it); @@ -270,7 +252,7 @@ class Dispatcher::Implementation // Cancel action task, this will only send a cancel to FA. up to // the FA whether to cancel the task. On change is implemented // internally in action client - return action_client->cancel_task(cancel_task_status->task_profile); + return action_client->cancel_task(cancel_task_status->task_id()); } const std::optional get_task_state( @@ -290,8 +272,8 @@ class Dispatcher::Implementation } void receive_bidding_winner_cb( - const TaskID& task_id, - const rmf_utils::optional winner) + const std::string& task_id, + const std::optional winner) { const auto it = active_dispatch_tasks.find(task_id); if (it == active_dispatch_tasks.end()) @@ -328,10 +310,9 @@ class Dispatcher::Implementation for (auto it = active_dispatch_tasks.begin(); it != active_dispatch_tasks.end(); ) { - const auto type = it->second->task_profile.description.task_type.type; const bool is_fleet_name = (winner->fleet_name == it->second->fleet_name); const bool is_charging_task = - (type == rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY); + (it->second->description()->type() == TaskType::TYPE_CHARGE_BATTERY); if (is_charging_task && is_fleet_name) it = active_dispatch_tasks.erase(it); @@ -340,9 +321,8 @@ class Dispatcher::Implementation } // add task to action server - action_client->add_task( + action_client->dispatch_task( winner->fleet_name, - pending_task_status->task_profile, pending_task_status); } @@ -350,7 +330,7 @@ class Dispatcher::Implementation { assert(terminate_status->is_terminated()); - // prevent terminal_dispatch_tasks from piling up meaning + // prevent terminal_dispatch_tasks from piling up if (terminal_dispatch_tasks.size() >= terminated_tasks_max_size) { RCLCPP_WARN(node->get_logger(), @@ -359,19 +339,19 @@ class Dispatcher::Implementation auto rm_task = terminal_dispatch_tasks.begin(); for (auto it = rm_task++; it != terminal_dispatch_tasks.end(); it++) { - const auto t1 = it->second->task_profile.submission_time; - const auto t2 = rm_task->second->task_profile.submission_time; - if (rmf_traffic_ros2::convert(t1) < rmf_traffic_ros2::convert(t2)) + const auto t1 = it->second->submission_time(); + const auto t2 = rm_task->second->submission_time(); + + if (t1 < t2) rm_task = it; } terminal_dispatch_tasks.erase(terminal_dispatch_tasks.begin() ); } - const auto id = terminate_status->task_profile.task_id; + const auto id = terminate_status->task_id(); - // destroy prev status ptr and recreate one - auto status = std::make_shared(*terminate_status); - (terminal_dispatch_tasks)[id] = status; + // Move Status to terminated task list. + (terminal_dispatch_tasks)[id] = std::move(terminate_status); active_dispatch_tasks.erase(id); } @@ -380,7 +360,7 @@ class Dispatcher::Implementation // This is to solve the issue that the dispatcher is not aware of those // "stray" tasks that are not dispatched by the dispatcher. This will add // the stray tasks when an unknown TaskSummary is heard. - const std::string id = status->task_profile.task_id; + const std::string id = status->task_id(); const auto it = active_dispatch_tasks.find(id); if (it == active_dispatch_tasks.end()) { @@ -390,7 +370,8 @@ class Dispatcher::Implementation } // check if there's a change in state for the previous completed bidding task - // TODO, better way to impl this + // This ensures that the next task will be executed after receiving ack msg + // TODO(YL), better way to impl this if (!queue_bidding_tasks.empty() && id == queue_bidding_tasks.front().task_profile.task_id) { @@ -405,23 +386,24 @@ class Dispatcher::Implementation }; //============================================================================== -std::shared_ptr Dispatcher::init_and_make_node( - const std::string dispatcher_node_name) +std::shared_ptr Dispatcher::init_and_make_node() { rclcpp::init(0, nullptr); - return make_node(dispatcher_node_name); + return make_node(); } //============================================================================== -std::shared_ptr Dispatcher::make_node( - const std::string dispatcher_node_name) +std::shared_ptr Dispatcher::make_node() { - return make(rclcpp::Node::make_shared(dispatcher_node_name)); + return make(rclcpp::Node::make_shared("rmf_dispatcher_node")); } //============================================================================== std::shared_ptr Dispatcher::make( - const std::shared_ptr& node) + const std::shared_ptr& node, + const std::shared_ptr evaluator, + const double bidding_time_window, + const int terminated_tasks_depth) { auto pimpl = rmf_utils::make_impl(node); pimpl->action_client = action::Client::make(node); @@ -429,25 +411,28 @@ std::shared_ptr Dispatcher::make( auto dispatcher = std::shared_ptr(new Dispatcher()); dispatcher->_pimpl = std::move(pimpl); dispatcher->_pimpl->start(); + dispatcher->_pimpl->bidding_time_window = bidding_time_window; + dispatcher->_pimpl->terminated_tasks_max_size = terminated_tasks_depth; + dispatcher->_pimpl->auctioneer->select_evaluator(evaluator); return dispatcher; } //============================================================================== -std::optional Dispatcher::submit_task( - const TaskDescription& task_description) +std::optional Dispatcher::submit_task( + const ConstDescriptionPtr task_description) { return _pimpl->submit_task(task_description); } //============================================================================== -bool Dispatcher::cancel_task(const TaskID& task_id) +bool Dispatcher::cancel_task(const std::string& task_id) { return _pimpl->cancel_task(task_id); } //============================================================================== const std::optional Dispatcher::get_task_state( - const TaskID& task_id) const + const std::string& task_id) const { return _pimpl->get_task_state(task_id); } @@ -472,7 +457,7 @@ void Dispatcher::on_change(StatusCallback on_change_fn) //============================================================================== void Dispatcher::evaluator( - std::shared_ptr evaluator) + std::shared_ptr evaluator) { _pimpl->auctioneer->select_evaluator(evaluator); } diff --git a/rmf_task_ros2/src/rmf_task_ros2/TaskStatus.cpp b/rmf_task_ros2/src/rmf_task_ros2/TaskStatus.cpp index cd06e3c94..a0265b62d 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/TaskStatus.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/TaskStatus.cpp @@ -21,40 +21,62 @@ namespace rmf_task_ros2 { // ============================================================================== -bool TaskStatus::is_terminated() const +class TaskStatus::Implementation { - return (state == State::Failed) || - (state == State::Completed) || - (state == State::Canceled); +public: + Implementation() + {} + + std::string task_id; + rmf_traffic::Time submission_time; + ConstDescriptionPtr description; +}; + +// ============================================================================== +TaskStatusPtr TaskStatus::make( + std::string task_id, + rmf_traffic::Time submission_time, + ConstDescriptionPtr description) +{ + std::shared_ptr desc(new TaskStatus()); + desc->_pimpl->task_id = task_id; + desc->_pimpl->submission_time = std::move(submission_time); + desc->_pimpl->description = std::move(description); + desc->state = State::Pending; + return desc; +} + +// ============================================================================== +ConstDescriptionPtr TaskStatus::description() const +{ + return _pimpl->description; } // ============================================================================== -TaskStatus convert_status(const StatusMsg& from) +std::string TaskStatus::task_id() const { - TaskStatus status; - status.fleet_name = from.fleet_name; - status.task_profile = from.task_profile; - status.start_time = rmf_traffic_ros2::convert(from.start_time); - status.end_time = rmf_traffic_ros2::convert(from.end_time); - status.robot_name = from.robot_name; - status.status = from.status; - status.state = static_cast(from.state); - return status; + return _pimpl->task_id; } // ============================================================================== -StatusMsg convert_status(const TaskStatus& from) +rmf_traffic::Time TaskStatus::submission_time() const +{ + return _pimpl->submission_time; +} + +// ============================================================================== +bool TaskStatus::is_terminated() const +{ + return (state == State::Failed) || + (state == State::Completed) || + (state == State::Canceled); +} + +//============================================================================== +TaskStatus::TaskStatus() +: _pimpl(rmf_utils::make_impl(Implementation())) { - StatusMsg status; - status.fleet_name = from.fleet_name; - status.task_id = from.task_profile.task_id; // duplication - status.task_profile = from.task_profile; - status.start_time = rmf_traffic_ros2::convert(from.start_time); - status.end_time = rmf_traffic_ros2::convert(from.end_time); - status.robot_name = from.robot_name; - status.status = from.status; - status.state = static_cast(from.state); - return status; + // Do nothing } } // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/src/rmf_task_ros2/action/Client.cpp b/rmf_task_ros2/src/rmf_task_ros2/action/Client.cpp index d2bda8de0..2f4f6e607 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/action/Client.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/action/Client.cpp @@ -16,6 +16,8 @@ */ #include "Client.hpp" +#include +#include "../internal_Description.hpp" namespace rmf_task_ros2 { namespace action { @@ -33,7 +35,7 @@ Client::Client(std::shared_ptr node) const auto dispatch_qos = rclcpp::ServicesQoS().reliable(); _request_msg_pub = _node->create_publisher( - TaskRequestTopicName, dispatch_qos); + DispatchRequestTopicName, dispatch_qos); _status_msg_sub = _node->create_subscription( TaskStatusTopicName, dispatch_qos, @@ -52,11 +54,8 @@ Client::Client(std::shared_ptr node) return; } - // TODO: hack to retain task profile and fleet name (to remove) - auto cache_profile = weak_status->task_profile; // update status to ptr - *weak_status = convert_status(*msg); - weak_status->task_profile = cache_profile; + update_status_from_msg(weak_status, *msg); if (weak_status->is_terminated()) RCLCPP_INFO(_node->get_logger(), @@ -67,17 +66,29 @@ Client::Client(std::shared_ptr node) } else { - // will still provide onchange even if the task_id is unknown. + /// This is when the task_id is unknown to the dispatcher node. Here + /// we will make and add the self-generated task from the fleet + /// adapter to the dispatcher queue (e.g. ChargeBattery Task) RCLCPP_DEBUG(_node->get_logger(), "[action] Unknown task: [%s]", task_id.c_str()); - auto task_status = std::make_shared(convert_status(*msg)); - _active_task_status[task_id] = task_status; - update_task_status(task_status); + + const auto& desc_msg = msg->task_profile.description; + const auto desc = Description::make_description( + rmf_traffic_ros2::convert(desc_msg.start_time), + desc_msg.task_type.type, + desc_msg.priority.value); + + const auto status_ptr = TaskStatus::make( + task_id, std::chrono::steady_clock::now(), desc); + update_status_from_msg(status_ptr, *msg); + + _active_task_status[task_id] = status_ptr; + update_task_status(status_ptr); } }); _ack_msg_sub = _node->create_subscription( - TaskAckTopicName, dispatch_qos, + DispatchAckTopicName, dispatch_qos, [&](const std::unique_ptr msg) { const auto task_id = msg->dispatch_request.task_profile.task_id; @@ -132,42 +143,41 @@ void Client::update_task_status(const TaskStatusPtr status) // erase terminated task and call on_terminate callback if (status->is_terminated()) { - _active_task_status.erase(status->task_profile.task_id); + _active_task_status.erase(status->task_id()); if (_on_terminate_callback) _on_terminate_callback(status); } } //============================================================================== -// check if task is updated TODO - -//============================================================================== -void Client::add_task( +void Client::dispatch_task( const std::string& fleet_name, - const TaskProfile& task_profile, TaskStatusPtr status_ptr) { + rmf_task_msgs::msg::TaskProfile task_profile; + task_profile.task_id = status_ptr->task_id(); + task_profile.description = description::convert(status_ptr->description()); + task_profile.submission_time = + rmf_traffic_ros2::convert(status_ptr->submission_time()); + // send request and wait for acknowledgement RequestMsg request_msg; + request_msg.method = RequestMsg::ADD; request_msg.fleet_name = fleet_name; request_msg.task_profile = task_profile; - request_msg.method = RequestMsg::ADD; _request_msg_pub->publish(request_msg); // save status ptr status_ptr->fleet_name = fleet_name; - status_ptr->task_profile = task_profile; - _active_task_status[task_profile.task_id] = status_ptr; + _active_task_status[status_ptr->task_id()] = status_ptr; RCLCPP_DEBUG(_node->get_logger(), "Assign task: [%s] to fleet [%s]", - task_profile.task_id.c_str(), fleet_name.c_str()); + status_ptr->task_id().c_str(), fleet_name.c_str()); return; } //============================================================================== -bool Client::cancel_task( - const TaskProfile& task_profile) +bool Client::cancel_task(const std::string& task_id) { - const auto task_id = task_profile.task_id; RCLCPP_DEBUG(_node->get_logger(), "[action] Cancel Task: [%s]", task_id.c_str()); @@ -188,11 +198,17 @@ bool Client::cancel_task( return false; } + rmf_task_msgs::msg::TaskProfile task_profile; + task_profile.task_id = weak_status->task_id(); + task_profile.description = description::convert(weak_status->description()); + task_profile.submission_time = + rmf_traffic_ros2::convert(weak_status->submission_time()); + // send cancel RequestMsg request_msg; + request_msg.method = RequestMsg::CANCEL; request_msg.fleet_name = weak_status->fleet_name; request_msg.task_profile = task_profile; - request_msg.method = RequestMsg::CANCEL; _request_msg_pub->publish(request_msg); return true; } @@ -230,4 +246,39 @@ void Client::on_terminate( } } // namespace action + +//============================================================================== +void update_status_from_msg( + const TaskStatusPtr task_status_ptr, + const StatusMsg msg) +{ + task_status_ptr->fleet_name = msg.fleet_name; + task_status_ptr->start_time = rmf_traffic_ros2::convert(msg.start_time); + task_status_ptr->end_time = rmf_traffic_ros2::convert(msg.end_time); + task_status_ptr->robot_name = msg.robot_name; + task_status_ptr->status = msg.status; + task_status_ptr->state = static_cast(msg.state); +} + +// ============================================================================== +StatusMsg convert_status(const TaskStatus& from) +{ + StatusMsg status; + status.fleet_name = from.fleet_name; + status.task_id = from.task_id(); // duplication + status.task_profile.task_id = from.task_id(); + status.task_profile.submission_time = + rmf_traffic_ros2::convert(from.submission_time()); + status.start_time = rmf_traffic_ros2::convert(from.start_time); + status.end_time = rmf_traffic_ros2::convert(from.end_time); + status.robot_name = from.robot_name; + status.status = from.status; + status.state = static_cast(from.state); + + if (from.description()) + status.task_profile.description = description::convert(from.description()); + + return status; +} + } // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/src/rmf_task_ros2/action/Client.hpp b/rmf_task_ros2/src/rmf_task_ros2/action/Client.hpp index d39f1fbba..6b664adfb 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/action/Client.hpp +++ b/rmf_task_ros2/src/rmf_task_ros2/action/Client.hpp @@ -24,8 +24,12 @@ #include #include #include +#include namespace rmf_task_ros2 { + +using StatusMsg = rmf_task_msgs::msg::TaskSummary; + namespace action { //============================================================================== @@ -33,7 +37,6 @@ namespace action { // fleet. The fleet will work on the requested task and provides a status to // the client when the task progresses. Termination will be triggered when the // task ends. - class Client { public: @@ -44,28 +47,26 @@ class Client static std::shared_ptr make( std::shared_ptr node); - /// Add a task to a targeted fleet + /// Dispatch a task to a targeted fleet /// /// \param[in] fleet_name /// Target fleet which will execute this task /// - /// \param[in] task_profile - /// Task Description which will be executed - /// /// \param[out] status_ptr - /// Will update the status of the task here - void add_task( + /// task_status will have a task description member, which is used to + /// describe the task to dispatch. With this status ptr, recent status + /// of task will also get updated here. + void dispatch_task( const std::string& fleet_name, - const TaskProfile& task_profile, TaskStatusPtr status_ptr); /// Cancel an added task /// - /// \param[in] task_profile + /// \param[in] task_id /// Task which to cancel /// /// \return bool which indicate if cancel task is success - bool cancel_task(const TaskProfile& task_profile); + bool cancel_task(const std::string& task_id); /// Get the number of active task being track by client /// @@ -101,13 +102,26 @@ class Client std::shared_ptr _node; StatusCallback _on_change_callback; StatusCallback _on_terminate_callback; - std::unordered_map> _active_task_status; + std::unordered_map> _active_task_status; rclcpp::Publisher::SharedPtr _request_msg_pub; rclcpp::Subscription::SharedPtr _status_msg_sub; rclcpp::Subscription::SharedPtr _ack_msg_sub; }; } // namespace action + +// ============================================================================== +/// This helper function is to only update status elements in TaskStatus, in +/// which static task descriptions (e.g. id, tasktype...) will not be changed +void update_status_from_msg( + const TaskStatusPtr task_status_ptr, + const StatusMsg status_msg); + +// ============================================================================== +/// Utility function to convert to TaskSummary Msg +StatusMsg convert_status(const TaskStatus& from); + } // namespace rmf_task_ros2 #endif // SRC__RMF_TASK_ROS2__ACTION__CLIENT_HPP diff --git a/rmf_task_ros2/src/rmf_task_ros2/action/Server.cpp b/rmf_task_ros2/src/rmf_task_ros2/action/Server.cpp index 87130837a..79277b25c 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/action/Server.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/action/Server.cpp @@ -16,6 +16,8 @@ */ #include "Server.hpp" +#include +#include "../internal_Description.hpp" namespace rmf_task_ros2 { namespace action { @@ -38,10 +40,24 @@ void Server::register_callbacks( //============================================================================== void Server::update_status( - const TaskStatus& task_status) + const TaskStatus& status) { - auto msg = convert_status(task_status); + // Here solely converts TaskStatus to StatusMsg + StatusMsg msg; msg.fleet_name = _fleet_name; + msg.task_id = status.task_id(); // duplication + msg.task_profile.task_id = status.task_id(); + msg.task_profile.submission_time = + rmf_traffic_ros2::convert(status.submission_time()); + msg.start_time = rmf_traffic_ros2::convert(status.start_time); + msg.end_time = rmf_traffic_ros2::convert(status.end_time); + msg.robot_name = status.robot_name; + msg.status = status.status; + msg.state = static_cast(status.state); + + if (status.description()) + msg.task_profile.description = description::convert(status.description()); + _status_msg_pub->publish(msg); } @@ -54,7 +70,7 @@ Server::Server( const auto dispatch_qos = rclcpp::ServicesQoS().reliable(); _request_msg_sub = _node->create_subscription( - TaskRequestTopicName, dispatch_qos, + DispatchRequestTopicName, dispatch_qos, [&](const std::unique_ptr msg) { if (msg->fleet_name != _fleet_name) @@ -89,7 +105,7 @@ Server::Server( }); _ack_msg_pub = _node->create_publisher( - TaskAckTopicName, dispatch_qos); + DispatchAckTopicName, dispatch_qos); _status_msg_pub = _node->create_publisher( TaskStatusTopicName, dispatch_qos); diff --git a/rmf_task_ros2/src/rmf_task_ros2/action/Server.hpp b/rmf_task_ros2/src/rmf_task_ros2/action/Server.hpp index d8c0150a6..479020c4e 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/action/Server.hpp +++ b/rmf_task_ros2/src/rmf_task_ros2/action/Server.hpp @@ -22,15 +22,18 @@ #include #include + #include #include -#include +#include -using TaskProfile = rmf_task_msgs::msg::TaskProfile; +#include namespace rmf_task_ros2 { namespace action { +using TaskProfile = rmf_task_msgs::msg::TaskProfile; + //============================================================================== /// Task Action Server - This is used within the fleet adapter with the role of /// receiving incoming dispatch requests (from a action_client/Dispatcher), @@ -78,6 +81,7 @@ class Server using RequestMsg = rmf_task_msgs::msg::DispatchRequest; using AckMsg = rmf_task_msgs::msg::DispatchAck; + using StatusMsg = rmf_task_msgs::msg::TaskSummary; std::shared_ptr _node; std::string _fleet_name; diff --git a/rmf_task_ros2/src/rmf_task_ros2/bidding/Auctioneer.cpp b/rmf_task_ros2/src/rmf_task_ros2/bidding/Auctioneer.cpp index 3bb3d9252..27db7916d 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/bidding/Auctioneer.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/bidding/Auctioneer.cpp @@ -15,13 +15,64 @@ * */ -#include "internal_Auctioneer.hpp" +#include "Auctioneer.hpp" +#include +#include +#include namespace rmf_task_ros2 { namespace bidding { +using BidProposal = rmf_task_msgs::msg::BidProposal; +using Submission = rmf_task::Evaluator::Submission; +using Submissions = rmf_task::Evaluator::Submissions; + //============================================================================== -Submission convert(const BidProposal& from) +class Auctioneer::Implementation +{ +public: + std::shared_ptr node; + rclcpp::TimerBase::SharedPtr timer; + BiddingResultCallback bidding_result_callback; + std::shared_ptr evaluator; + + struct BiddingTask + { + BidNotice bid_notice; + builtin_interfaces::msg::Time start_time; + Submissions submissions; + }; + + bool bidding_in_proccess = false; + std::queue queue_bidding_tasks; + + using BidNoticePub = rclcpp::Publisher; + BidNoticePub::SharedPtr bid_notice_pub; + + using BidProposalSub = rclcpp::Subscription; + BidProposalSub::SharedPtr bid_proposal_sub; + + Implementation( + const std::shared_ptr& node_, + BiddingResultCallback result_callback); + + /// Start a bidding process + void start_bidding(const BidNotice& bid_notice); + + // Receive proposal and evaluate + void receive_proposal(const BidProposal& msg); + + // determine the winner within a bidding task instance + void check_bidding_process(); + + bool determine_winner(const BiddingTask& bidding_task); + + std::optional evaluate(const Submissions& submissions); +}; + +//============================================================================== +Submission convert( + const BidProposal& from) { Submission submission; submission.fleet_name = from.fleet_name; @@ -40,7 +91,7 @@ Auctioneer::Implementation::Implementation( bidding_result_callback{std::move(result_callback)} { // default evaluator - evaluator = std::make_shared(); + evaluator = std::make_shared(); const auto dispatch_qos = rclcpp::ServicesQoS().reliable(); bid_notice_pub = node->create_publisher( @@ -154,21 +205,17 @@ bool Auctioneer::Implementation::determine_winner( std::optional Auctioneer::Implementation::evaluate( const Submissions& submissions) { - if (submissions.size() == 0) - return std::nullopt; - if (!evaluator) { RCLCPP_WARN(node->get_logger(), "Bidding Evaluator is not set"); return std::nullopt; } - const std::size_t choice = evaluator->choose(submissions); - - if (choice >= submissions.size()) + auto choice = evaluator->choose(submissions); + if (choice == std::nullopt) return std::nullopt; - return submissions[choice]; + return submissions[*choice]; } //============================================================================== @@ -191,7 +238,7 @@ void Auctioneer::start_bidding(const BidNotice& bid_notice) //============================================================================== void Auctioneer::select_evaluator( - std::shared_ptr evaluator) + std::shared_ptr evaluator) { _pimpl->evaluator = std::move(evaluator); } @@ -202,52 +249,5 @@ Auctioneer::Auctioneer() // do nothing } -//============================================================================== -std::size_t LeastFleetDiffCostEvaluator::choose( - const Submissions& submissions) const -{ - auto winner_it = submissions.begin(); - float winner_cost_diff = winner_it->new_cost - winner_it->prev_cost; - for (auto nominee_it = ++submissions.begin(); - nominee_it != submissions.end(); ++nominee_it) - { - float nominee_cost_diff = nominee_it->new_cost - nominee_it->prev_cost; - if (nominee_cost_diff < winner_cost_diff) - { - winner_it = nominee_it; - winner_cost_diff = nominee_cost_diff; - } - } - return std::distance(submissions.begin(), winner_it); -} - -//============================================================================== -std::size_t LeastFleetCostEvaluator::choose( - const Submissions& submissions) const -{ - auto winner_it = submissions.begin(); - for (auto nominee_it = ++submissions.begin(); - nominee_it != submissions.end(); ++nominee_it) - { - if (nominee_it->new_cost < winner_it->new_cost) - winner_it = nominee_it; - } - return std::distance(submissions.begin(), winner_it); -} - -//============================================================================== -std::size_t QuickestFinishEvaluator::choose( - const Submissions& submissions) const -{ - auto winner_it = submissions.begin(); - for (auto nominee_it = ++submissions.begin(); - nominee_it != submissions.end(); ++nominee_it) - { - if (nominee_it->finish_time < winner_it->finish_time) - winner_it = nominee_it; - } - return std::distance(submissions.begin(), winner_it); -} - } // namespace bidding } // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/include/rmf_task_ros2/bidding/Auctioneer.hpp b/rmf_task_ros2/src/rmf_task_ros2/bidding/Auctioneer.hpp similarity index 69% rename from rmf_task_ros2/include/rmf_task_ros2/bidding/Auctioneer.hpp rename to rmf_task_ros2/src/rmf_task_ros2/bidding/Auctioneer.hpp index 4b9b5a87d..5a1337777 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/bidding/Auctioneer.hpp +++ b/rmf_task_ros2/src/rmf_task_ros2/bidding/Auctioneer.hpp @@ -22,7 +22,8 @@ #include #include -#include +#include +#include namespace rmf_task_ros2 { namespace bidding { @@ -34,6 +35,9 @@ namespace bidding { class Auctioneer : public std::enable_shared_from_this { public: + using BidNotice = rmf_task_msgs::msg::BidNotice; + using Submission = rmf_task::Evaluator::Submission; + /// Callback which will provide the winner when a bid is concluded /// /// \param[in] task_id @@ -66,24 +70,11 @@ class Auctioneer : public std::enable_shared_from_this /// bidding task, task which will call for bid void start_bidding(const BidNotice& bid_notice); - /// A pure abstract interface class for the auctioneer to choose the best - /// choosing the best submissions. - class Evaluator - { - public: - - /// Given a list of submissions, choose the one that is the "best". It is - /// up to the implementation of the Evaluator to decide how to rank. - virtual std::size_t choose(const Submissions& submissions) const = 0; - - virtual ~Evaluator() = default; - }; - /// Provide a custom evaluator which will be used to choose the best bid /// If no selection is given, Default is: LeastFleetDiffCostEvaluator /// /// \param[in] evaluator - void select_evaluator(std::shared_ptr evaluator); + void select_evaluator(std::shared_ptr evaluator); class Implementation; @@ -92,27 +83,6 @@ class Auctioneer : public std::enable_shared_from_this rmf_utils::unique_impl_ptr _pimpl; }; -//============================================================================== -class LeastFleetDiffCostEvaluator : public Auctioneer::Evaluator -{ -public: - std::size_t choose(const Submissions& submissions) const final; -}; - -//============================================================================== -class LeastFleetCostEvaluator : public Auctioneer::Evaluator -{ -public: - std::size_t choose(const Submissions& submissions) const final; -}; - -//============================================================================== -class QuickestFinishEvaluator : public Auctioneer::Evaluator -{ -public: - std::size_t choose(const Submissions& submissions) const final; -}; - } // namespace bidding } // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/src/rmf_task_ros2/bidding/MinimalBidder.cpp b/rmf_task_ros2/src/rmf_task_ros2/bidding/MinimalBidder.cpp index 524097a4a..2bb7e3101 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/bidding/MinimalBidder.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/bidding/MinimalBidder.cpp @@ -15,8 +15,7 @@ * */ -#include -#include +#include "MinimalBidder.hpp" #include #include @@ -27,18 +26,6 @@ namespace bidding { using BidProposal = rmf_task_msgs::msg::BidProposal; -//============================================================================== -BidProposal convert(const Submission& from) -{ - BidProposal proposal_msg; - proposal_msg.fleet_name = from.fleet_name; - proposal_msg.robot_name = from.robot_name; - proposal_msg.prev_cost = from.prev_cost; - proposal_msg.new_cost = from.new_cost; - proposal_msg.finish_time = rmf_traffic_ros2::convert(from.finish_time); - return proposal_msg; -} - //============================================================================== class MinimalBidder::Implementation { @@ -99,12 +86,19 @@ class MinimalBidder::Implementation if (!get_submission_fn) return; - // Submit proposal const auto bid_submission = get_submission_fn(msg); - auto best_proposal = convert(bid_submission); - best_proposal.fleet_name = fleet_name; - best_proposal.task_profile = msg.task_profile; - dispatch_proposal_pub->publish(best_proposal); + + // Submit proposal + BidProposal proposal_msg; + proposal_msg.fleet_name = fleet_name; + proposal_msg.task_profile = msg.task_profile; + proposal_msg.robot_name = bid_submission.robot_name; + proposal_msg.prev_cost = bid_submission.prev_cost; + proposal_msg.new_cost = bid_submission.new_cost; + proposal_msg.finish_time = + rmf_traffic_ros2::convert(bid_submission.finish_time); + + dispatch_proposal_pub->publish(proposal_msg); } }; diff --git a/rmf_task_ros2/include/rmf_task_ros2/bidding/MinimalBidder.hpp b/rmf_task_ros2/src/rmf_task_ros2/bidding/MinimalBidder.hpp similarity index 79% rename from rmf_task_ros2/include/rmf_task_ros2/bidding/MinimalBidder.hpp rename to rmf_task_ros2/src/rmf_task_ros2/bidding/MinimalBidder.hpp index ef53db6fc..cd93e1fb7 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/bidding/MinimalBidder.hpp +++ b/rmf_task_ros2/src/rmf_task_ros2/bidding/MinimalBidder.hpp @@ -15,20 +15,22 @@ * */ -#ifndef RMF_TASK_ROS2__BIDDING__MINIMALBIDDER_HPP -#define RMF_TASK_ROS2__BIDDING__MINIMALBIDDER_HPP +#ifndef SRC__RMF_TASK_ROS2__BIDDING__MINIMALBIDDER_HPP +#define SRC__RMF_TASK_ROS2__BIDDING__MINIMALBIDDER_HPP #include #include #include - -#include +#include +#include namespace rmf_task_ros2 { namespace bidding { //============================================================================== +/// This is a sample bidder utility class which will listen to bid notice and +/// submit a bid to the auctioneer. This is currently used in test. class MinimalBidder { public: @@ -44,6 +46,14 @@ class MinimalBidder Patrol = TaskTypeMsg::TYPE_PATROL }; + struct Submission + { + std::string robot_name; + double prev_cost = 0.0; + double new_cost = std::numeric_limits::max(); + rmf_traffic::Time finish_time; + }; + /// Callback function when a bid notice is received from the autioneer /// /// \param[in] notice @@ -51,6 +61,8 @@ class MinimalBidder /// /// \return submission /// Estimates of a task. This submission is used by dispatcher for eval + using BidNotice = rmf_task_msgs::msg::BidNotice; + using ParseSubmissionCallback = std::function; @@ -84,4 +96,4 @@ class MinimalBidder } // namespace bidding } // namespace rmf_task_ros2 -#endif // RMF_TASK_ROS2__BIDDING__MINIMALBIDDER_HPP +#endif // SRC__RMF_TASK_ROS2__BIDDING__MINIMALBIDDER_HPP diff --git a/rmf_task_ros2/src/rmf_task_ros2/bidding/internal_Auctioneer.hpp b/rmf_task_ros2/src/rmf_task_ros2/bidding/internal_Auctioneer.hpp deleted file mode 100644 index 57cf1a0eb..000000000 --- a/rmf_task_ros2/src/rmf_task_ros2/bidding/internal_Auctioneer.hpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2020 Open Source Robotics Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - - -#ifndef SRC__RMF_TASK_ROS2__BIDDING__INTERNAL_AUCTIONEER_HPP -#define SRC__RMF_TASK_ROS2__BIDDING__INTERNAL_AUCTIONEER_HPP - -#include -#include -#include - -#include -#include - -namespace rmf_task_ros2 { -namespace bidding { - -using BidProposal = rmf_task_msgs::msg::BidProposal; - -//============================================================================== -class Auctioneer::Implementation -{ -public: - std::shared_ptr node; - rclcpp::TimerBase::SharedPtr timer; - BiddingResultCallback bidding_result_callback; - std::shared_ptr evaluator; - - struct BiddingTask - { - BidNotice bid_notice; - builtin_interfaces::msg::Time start_time; - std::vector submissions; - }; - - bool bidding_in_proccess = false; - std::queue queue_bidding_tasks; - - using BidNoticePub = rclcpp::Publisher; - BidNoticePub::SharedPtr bid_notice_pub; - - using BidProposalSub = rclcpp::Subscription; - BidProposalSub::SharedPtr bid_proposal_sub; - - Implementation( - const std::shared_ptr& node_, - BiddingResultCallback result_callback); - - /// Start a bidding process - void start_bidding(const BidNotice& bid_notice); - - // Receive proposal and evaluate - void receive_proposal(const BidProposal& msg); - - // determine the winner within a bidding task instance - void check_bidding_process(); - - bool determine_winner(const BiddingTask& bidding_task); - - std::optional evaluate(const Submissions& submissions); - - static const Implementation& get(const Auctioneer& auctioneer) - { - return *auctioneer._pimpl; - } -}; - -//============================================================================== -std::optional evaluate( - const Auctioneer& auctioneer, - const Submissions& submissions) -{ - auto fimpl = Auctioneer::Implementation::get(auctioneer); - return fimpl.evaluate(submissions); -} - -} // namespace bidding -} // namespace rmf_task_ros2 - -#endif // SRC__RMF_TASK_ROS2__BIDDING__INTERNAL_AUCTIONEER_HPP diff --git a/rmf_task_ros2/src/rmf_task_ros2/internal_Description.hpp b/rmf_task_ros2/src/rmf_task_ros2/internal_Description.hpp new file mode 100644 index 000000000..6bdf2fffc --- /dev/null +++ b/rmf_task_ros2/src/rmf_task_ros2/internal_Description.hpp @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2021 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#ifndef SRC__RMF_TASK_ROS2__DESCRIPTION__INTERNAL_DESCRIPTION_HPP +#define SRC__RMF_TASK_ROS2__DESCRIPTION__INTERNAL_DESCRIPTION_HPP + +#include +#include + +namespace rmf_task_ros2 { +namespace description { + +using TaskDescription = rmf_task_msgs::msg::TaskDescription; + +//============================================================================== +std::shared_ptr make_delivery_from_msg( + const TaskDescription& msg); + +//============================================================================== +std::shared_ptr make_loop_from_msg( + const TaskDescription& msg); + +//============================================================================== +std::shared_ptr make_clean_from_msg( + const TaskDescription& msg); + +//============================================================================== +TaskDescription convert(const ConstDescriptionPtr& description); + +} // namespace description +} // namespace rmf_task_ros2 + +#endif // SRC__RMF_TASK_ROS2__DESCRIPTION__INTERNAL_DESCRIPTION_HPP diff --git a/rmf_task_ros2/test/action/test_ActionInterface.cpp b/rmf_task_ros2/test/action/test_ActionInterface.cpp index 11241a6e0..b3a61a4a6 100644 --- a/rmf_task_ros2/test/action/test_ActionInterface.cpp +++ b/rmf_task_ros2/test/action/test_ActionInterface.cpp @@ -30,15 +30,11 @@ namespace action { //============================================================================== SCENARIO("Action communication with client and server", "[ActionInterface]") { - TaskProfile task_profile1; - task_profile1.task_id = "task1"; - task_profile1.description.task_type.type = - rmf_task_msgs::msg::TaskType::TYPE_STATION; + const auto now = std::chrono::steady_clock::now(); - TaskProfile task_profile2; - task_profile2.task_id = "task2"; - task_profile2.description.task_type.type = - rmf_task_msgs::msg::TaskType::TYPE_STATION; + auto clean = description::Clean::make(now, "clean_here"); + const auto task_status1 = TaskStatus::make("task1", now, clean); + const auto task_status2 = TaskStatus::make("task2", now, clean); //============================================================================ @@ -51,7 +47,6 @@ SCENARIO("Action communication with client and server", "[ActionInterface]") auto node = rclcpp::Node::make_shared("test_ActionInferface"); auto action_server = Server::make(node, "test_server"); auto action_client = Client::make(node); - rclcpp::executors::SingleThreadedExecutor executor; executor.add_node(node); @@ -77,40 +72,37 @@ SCENARIO("Action communication with client and server", "[ActionInterface]") WHEN("Add Task") { - // Add invalid Task! - TaskStatusPtr status_ptr(new TaskStatus); - - action_client->add_task("wrong_server", task_profile1, status_ptr); + action_client->dispatch_task("wrong_server", task_status1); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(0.5)); // should not receive cuz incorrect serverid - REQUIRE(status_ptr->state == TaskStatus::State::Pending); + REQUIRE(task_status1->state == TaskStatus::State::Pending); - action_client->add_task("test_server", task_profile1, status_ptr); + action_client->dispatch_task("test_server", task_status1); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(0.5)); // check status - REQUIRE(status_ptr->state == TaskStatus::State::Queued); + REQUIRE(task_status1->state == TaskStatus::State::Queued); + // TODO(YL) to fix this // status ptr is destroyed, should not have anymore tracking - status_ptr.reset(); - REQUIRE(action_client->size() == 0); + // task_status1.reset(); + // REQUIRE(action_client->size() == 0); } WHEN("Cancel Task") { // send valid task - TaskStatusPtr status_ptr(new TaskStatus); - action_client->add_task("test_server", task_profile2, status_ptr); + action_client->dispatch_task("test_server", task_status2); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(0.5)); // Invalid Cancel Task! - bool cancel_success = action_client->cancel_task(task_profile1); + bool cancel_success = action_client->cancel_task(task_status1->task_id()); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(0.5)); REQUIRE(!test_cancel_task); @@ -118,39 +110,37 @@ SCENARIO("Action communication with client and server", "[ActionInterface]") REQUIRE(action_client->size() == 1); // Valid Cancel task - action_client->cancel_task(task_profile2); + action_client->cancel_task(task_status2->task_id()); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(0.5)); REQUIRE(test_cancel_task); - REQUIRE(*test_cancel_task == task_profile2); - REQUIRE(status_ptr->is_terminated()); + REQUIRE(task_status2->is_terminated()); REQUIRE(action_client->size() == 0); } //============================================================================ - std::optional test_task_onchange; - std::optional test_task_onterminate; + TaskStatusPtr test_task_onchange = nullptr; + TaskStatusPtr test_task_onterminate = nullptr; // received status update from server action_client->on_change( [&test_task_onchange](const TaskStatusPtr status) { - test_task_onchange = *status; + test_task_onchange = status; } ); action_client->on_terminate( [&test_task_onterminate](const TaskStatusPtr status) { - test_task_onterminate = *status; + test_task_onterminate = status; } ); WHEN("On Change and On Terminate Task") { - TaskStatusPtr status_ptr(new TaskStatus); - action_client->add_task("test_server", task_profile1, status_ptr); + action_client->dispatch_task("test_server", task_status1); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(0.5)); @@ -158,12 +148,13 @@ SCENARIO("Action communication with client and server", "[ActionInterface]") REQUIRE(test_task_onchange); REQUIRE(test_task_onchange->state == TaskStatus::State::Queued); - TaskStatus server_task; - server_task.task_profile = task_profile1; - server_task.state = TaskStatus::State::Executing; + // this task is received by server + const auto server_task = + TaskStatus::make(test_add_task->task_id, now, clean); + server_task->state = TaskStatus::State::Executing; // Update it as executing - action_server->update_status(server_task); + action_server->update_status(*server_task); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(1.5)); @@ -171,9 +162,9 @@ SCENARIO("Action communication with client and server", "[ActionInterface]") REQUIRE(!test_task_onterminate); // havnt terminated yet // completion - server_task.state = TaskStatus::State::Completed; + server_task->state = TaskStatus::State::Completed; // Update it as executing - action_server->update_status(server_task); + action_server->update_status(*server_task); executor.spin_until_future_complete(ready_future, rmf_traffic::time::from_seconds(0.5)); diff --git a/rmf_task_ros2/test/bidding/test_Evaluator.cpp b/rmf_task_ros2/test/bidding/test_Evaluator.cpp deleted file mode 100644 index cc9159ad9..000000000 --- a/rmf_task_ros2/test/bidding/test_Evaluator.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (C) 2020 Open Source Robotics Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - -#include -#include "../../src/rmf_task_ros2/bidding/internal_Auctioneer.hpp" -#include - -namespace rmf_task_ros2 { -namespace bidding { - -//============================================================================== -auto now = std::chrono::steady_clock::now(); - -Submission submission1{ - "fleet1", "", 2.3, 3.4, rmf_traffic::time::apply_offset(now, 5) -}; -Submission submission2{ - "fleet2", "", 3.5, 3.6, rmf_traffic::time::apply_offset(now, 5.5) -}; -Submission submission3{ - "fleet3", "", 0.0, 1.4, rmf_traffic::time::apply_offset(now, 3) -}; -Submission submission4{ - "fleet4", "", 5.0, 5.4, rmf_traffic::time::apply_offset(now, 4) -}; -Submission submission5{ - "fleet5", "", 0.5, 0.8, rmf_traffic::time::apply_offset(now, 3.5) -}; - -//============================================================================== -SCENARIO("Auctioneer Winner Evaluator", "[Evaluator]") -{ - rclcpp::init(0, nullptr); - auto node = rclcpp::Node::make_shared("test_selfbidding"); - auto auctioneer = Auctioneer::make(node, - [](const std::string&, const std::optional) {}); - - WHEN("Least Diff Cost Evaluator") - { - auto eval = std::make_shared(); - auctioneer->select_evaluator(eval); - - AND_WHEN("0 submissions") - { - std::vector submissions{}; - auto winner = evaluate(*auctioneer, submissions); - REQUIRE(!winner); // no winner - } - AND_WHEN("5 submissions") - { - std::vector submissions{ - submission1, submission2, submission3, submission4, submission5 }; - auto winner = evaluate(*auctioneer, submissions); - REQUIRE(winner->fleet_name == "fleet2"); // least diff cost agent - } - } - - WHEN("Least Fleet Cost Evaluator") - { - auto eval = std::make_shared(); - auctioneer->select_evaluator(eval); - - AND_WHEN("0 submissions") - { - std::vector submissions{}; - auto winner = evaluate(*auctioneer, submissions); - REQUIRE(!winner); // no winner - } - AND_WHEN("5 submissions") - { - std::vector submissions{ - submission1, submission2, submission3, submission4, submission5 }; - auto winner = evaluate(*auctioneer, submissions); - REQUIRE(winner->fleet_name == "fleet5"); // least diff cost agent - } - } - - WHEN("Quickest Finish Time Evaluator") - { - auto eval = std::make_shared(); - auctioneer->select_evaluator(eval); - - AND_WHEN("0 submissions") - { - std::vector submissions{}; - auto winner = evaluate(*auctioneer, submissions); - REQUIRE(!winner); // no winner - } - AND_WHEN("5 submissions") - { - std::vector submissions{ - submission1, submission2, submission3, submission4, submission5 }; - auto winner = evaluate(*auctioneer, submissions); - REQUIRE(winner->fleet_name == "fleet3"); // least diff cost agent - } - } - - rclcpp::shutdown(); -} - -} // namespace bidding -} // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/test/bidding/test_SelfBid.cpp b/rmf_task_ros2/test/bidding/test_SelfBid.cpp index 4656e70e4..d7d6e1e97 100644 --- a/rmf_task_ros2/test/bidding/test_SelfBid.cpp +++ b/rmf_task_ros2/test/bidding/test_SelfBid.cpp @@ -15,8 +15,8 @@ * */ -#include -#include +#include "../../src/rmf_task_ros2/bidding/MinimalBidder.hpp" +#include "../../src/rmf_task_ros2/bidding/Auctioneer.hpp" #include #include @@ -28,6 +28,7 @@ namespace rmf_task_ros2 { namespace bidding { using TaskProfile = rmf_task_msgs::msg::TaskProfile; +using BidNotice = rmf_task_msgs::msg::BidNotice; using TaskType = bidding::MinimalBidder::TaskType; //============================================================================== @@ -66,7 +67,8 @@ SCENARIO("Auction with 2 Bids", "[TwoBids]") node, /// Bidding Result Callback Function [&r_result_id, &r_result_winner]( - const std::string& task_id, const std::optional winner) + const std::string& task_id, + const std::optional winner) { if (!winner) return; @@ -83,7 +85,7 @@ SCENARIO("Auction with 2 Bids", "[TwoBids]") node, "bidder1", { TaskType::Station, TaskType::Delivery }, [&test_notice_bidder1](const BidNotice& notice) { - Submission best_robot_estimate; + MinimalBidder::Submission best_robot_estimate; test_notice_bidder1 = notice.task_profile; return best_robot_estimate; } @@ -94,7 +96,7 @@ SCENARIO("Auction with 2 Bids", "[TwoBids]") [&test_notice_bidder2](const BidNotice& notice) { // TaskType should not be supported - Submission best_robot_estimate; + MinimalBidder::Submission best_robot_estimate; best_robot_estimate.new_cost = 2.3; // lower cost than bidder1 test_notice_bidder2 = notice.task_profile; return best_robot_estimate; diff --git a/rmf_task_ros2/test/dispatcher/test_Dispatcher.cpp b/rmf_task_ros2/test/dispatcher/test_Dispatcher.cpp index 266bdd2e9..ac89b8611 100644 --- a/rmf_task_ros2/test/dispatcher/test_Dispatcher.cpp +++ b/rmf_task_ros2/test/dispatcher/test_Dispatcher.cpp @@ -19,8 +19,8 @@ #include // mock Fleet Adapter to test dispatcher -#include #include "../../src/rmf_task_ros2/action/Server.hpp" +#include "../../src/rmf_task_ros2/bidding/MinimalBidder.hpp" #include #include @@ -35,13 +35,13 @@ namespace rmf_task_ros2 { //============================================================================== SCENARIO("Dispatcehr API Test", "[Dispatcher]") { - Dispatcher::TaskDescription task_desc1; - Dispatcher::TaskDescription task_desc2; - task_desc1.task_type.type = rmf_task_msgs::msg::TaskType::TYPE_STATION; - task_desc2.task_type.type = rmf_task_msgs::msg::TaskType::TYPE_CLEAN; + const auto now = std::chrono::steady_clock::now(); + const auto clean_task = description::Clean::make(now, "clean_here"); + const auto delivery_task = description::Delivery::make( + now, "pickup", "dispenser1", "dropoff", "ingestor2", {}); //============================================================================ - auto dispatcher = Dispatcher::init_and_make_node("test_dispatcher_node"); + auto dispatcher = Dispatcher::init_and_make_node(); auto spin_thread = std::thread( [&dispatcher]() @@ -68,35 +68,43 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") REQUIRE(get_tasks_client->wait_for_service(std::chrono::milliseconds(0))); } - WHEN("Add 1 and cancel task") + WHEN("Add 2 tasks and cancel 1 task") { - // add task - const auto id = dispatcher->submit_task(task_desc1); - REQUIRE(dispatcher->active_tasks().size() == 1); + // add 2 tasks + const auto id = dispatcher->submit_task(delivery_task); + const auto id2 = dispatcher->submit_task(clean_task); + REQUIRE(dispatcher->active_tasks().size() == 2); REQUIRE(dispatcher->terminated_tasks().size() == 0); REQUIRE(dispatcher->get_task_state(*id) == TaskStatus::State::Pending); + REQUIRE(dispatcher->get_task_state(*id2) == TaskStatus::State::Pending); - // cancel task + // cancel task during bidding REQUIRE(dispatcher->cancel_task(*id)); - REQUIRE(dispatcher->active_tasks().size() == 0); + REQUIRE(dispatcher->active_tasks().size() == 1); REQUIRE(dispatcher->terminated_tasks().size() == 1); + REQUIRE(dispatcher->get_task_state(*id) == TaskStatus::State::Canceled); + + // wait task2 to fail due to no submissions from F.Adapters + std::this_thread::sleep_for(std::chrono::milliseconds(5500)); + REQUIRE(dispatcher->get_task_state(*id2) == TaskStatus::State::Failed); + REQUIRE(dispatcher->terminated_tasks().size() == 2); // check random id REQUIRE(!(dispatcher->get_task_state("non_existence_id"))); // add an invalid task - task_desc2.task_type.type = 10; // this is invalid - REQUIRE(dispatcher->submit_task(task_desc2) == std::nullopt); + const auto null_task = Description::make_description(now, 10); + REQUIRE(dispatcher->submit_task(null_task) == std::nullopt); } //============================================================================ // test on change fn callback int change_times = 0; - TaskProfile test_taskprofile; + std::string test_dispatcher_received_id = ""; dispatcher->on_change( - [&change_times, &test_taskprofile](const TaskStatusPtr status) + [&change_times, &test_dispatcher_received_id](const TaskStatusPtr status) { - test_taskprofile = status->task_profile; + test_dispatcher_received_id = status->task_id(); change_times++; } ); @@ -104,7 +112,7 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") WHEN("Track Task till Bidding Failed") { // Submit first task and wait for bidding - auto id = dispatcher->submit_task(task_desc1); + auto id = dispatcher->submit_task(delivery_task); REQUIRE(dispatcher->active_tasks().size() == 1); REQUIRE(dispatcher->get_task_state(*id) == TaskStatus::State::Pending); @@ -112,14 +120,14 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") std::this_thread::sleep_for(std::chrono::milliseconds(3500)); CHECK(dispatcher->get_task_state(*id) == TaskStatus::State::Failed); REQUIRE(dispatcher->terminated_tasks().size() == 1); - REQUIRE(test_taskprofile.task_id == id); + REQUIRE(test_dispatcher_received_id == id); CHECK(change_times == 2); // add and failed // Submit another task - id = dispatcher->submit_task(task_desc2); + id = dispatcher->submit_task(clean_task); std::this_thread::sleep_for(std::chrono::milliseconds(3000)); REQUIRE(dispatcher->terminated_tasks().size() == 2); - REQUIRE(test_taskprofile.task_id == *id); + REQUIRE(test_dispatcher_received_id == *id); CHECK(change_times == 4); // add and failed x2 } @@ -130,11 +138,11 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") auto bidder = bidding::MinimalBidder::make( node, "dummy_fleet", - { TaskType::Station, TaskType::Clean }, - [](const bidding::BidNotice&) + { TaskType::Delivery, TaskType::Clean }, + [](const rmf_task_msgs::msg::BidNotice&) { // Provide a best estimate - bidding::Submission best_robot_estimate; + bidding::MinimalBidder::Submission best_robot_estimate; best_robot_estimate.new_cost = 13.5; return best_robot_estimate; } @@ -148,15 +156,15 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") action_server->register_callbacks( // Add Task callback - [&action_server, &task_canceled_flag](const TaskProfile& task_profile) + [&action_server, &task_canceled_flag](const auto& task_profile) { // Start action task auto t = std::thread( [&action_server, &task_canceled_flag](auto profile) { - TaskStatus status; - status.task_profile = profile; - status.robot_name = "dumbot"; + const auto status = TaskStatus::make( + profile.task_id, std::chrono::steady_clock::now(), nullptr); + status->robot_name = "dumbot"; std::this_thread::sleep_for(std::chrono::seconds(2)); if (task_canceled_flag) @@ -166,20 +174,20 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") } // Executing - status.state = TaskStatus::State::Executing; - action_server->update_status(status); + status->state = TaskStatus::State::Executing; + action_server->update_status(*status); std::this_thread::sleep_for(std::chrono::seconds(1)); // Completed - status.state = TaskStatus::State::Completed; - action_server->update_status(status); + status->state = TaskStatus::State::Completed; + action_server->update_status(*status); }, task_profile ); t.detach(); return true; //successs (send State::Queued) }, // Cancel Task callback - [&action_server, &task_canceled_flag](const TaskProfile&) + [&action_server, &task_canceled_flag](const auto&) { task_canceled_flag = true; return true; //success ,send State::Canceled when dispatcher->cancel_task @@ -189,7 +197,7 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") //============================================================================ WHEN("Full Dispatch cycle") { - const auto id = dispatcher->submit_task(task_desc1); + const auto id = dispatcher->submit_task(delivery_task); CHECK(dispatcher->get_task_state(*id) == TaskStatus::State::Pending); std::this_thread::sleep_for(std::chrono::milliseconds(3500)); @@ -205,12 +213,11 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") CHECK(change_times == 4); // Pending > Queued > Executing > Completed // Add auto generated ChargeBattery Task from fleet adapter - TaskStatus status; - status.task_profile.task_id = "ChargeBattery10"; - status.state = TaskStatus::State::Queued; - status.task_profile.description.task_type.type = - rmf_task_msgs::msg::TaskType::TYPE_CHARGE_BATTERY; - action_server->update_status(status); + const auto status = TaskStatus::make( + "ChargeBattery10", std::chrono::steady_clock::now(), nullptr); + status->state = TaskStatus::State::Queued; + + action_server->update_status(*status); std::this_thread::sleep_for(std::chrono::seconds(1)); CHECK(change_times == 5); // new stray charge task @@ -219,7 +226,7 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") WHEN("Half way cancel Dispatch cycle") { - const auto id = dispatcher->submit_task(task_desc2); + const auto id = dispatcher->submit_task(clean_task); CHECK(dispatcher->get_task_state(*id) == TaskStatus::State::Pending); REQUIRE(dispatcher->active_tasks().size() == 1); std::this_thread::sleep_for(std::chrono::milliseconds(3000)); @@ -231,7 +238,7 @@ SCENARIO("Dispatcehr API Test", "[Dispatcher]") REQUIRE(dispatcher->active_tasks().size() == 0); REQUIRE(dispatcher->terminated_tasks().size() == 1); REQUIRE(dispatcher->terminated_tasks().begin()->first == *id); - auto status = dispatcher->terminated_tasks().begin()->second; + const auto status = dispatcher->terminated_tasks().begin()->second; CHECK(status->state == TaskStatus::State::Canceled); CHECK(change_times == 3); // Pending -> Queued -> Canceled } diff --git a/rmf_task_ros2/test/dispatcher/test_TaskStatus.cpp b/rmf_task_ros2/test/dispatcher/test_TaskStatus.cpp new file mode 100644 index 000000000..3c61ca2d6 --- /dev/null +++ b/rmf_task_ros2/test/dispatcher/test_TaskStatus.cpp @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2021 Open Source Robotics Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * +*/ + +#include +#include "../../src/rmf_task_ros2/internal_Description.hpp" + +#include +#include + +#include + +namespace rmf_task_ros2 { + +//============================================================================== +SCENARIO("TaskStatus and Description Test", "[TaskStatus]") +{ + std::cout << "[Testing Description Task Type]" << std::endl; + + using TaskDescription = rmf_task_msgs::msg::TaskDescription; + using TaskType = rmf_task_msgs::msg::TaskType; + using StatusMsg = rmf_task_msgs::msg::TaskSummary; + +//============================================================================== + // Check State Enum Val, to sync with msg + REQUIRE((uint8_t)TaskStatus::State::Queued == StatusMsg::STATE_QUEUED); + REQUIRE((uint8_t)TaskStatus::State::Executing == StatusMsg::STATE_ACTIVE); + REQUIRE((uint8_t)TaskStatus::State::Completed == StatusMsg::STATE_COMPLETED); + REQUIRE((uint8_t)TaskStatus::State::Failed == StatusMsg::STATE_FAILED); + REQUIRE((uint8_t)TaskStatus::State::Canceled == StatusMsg::STATE_CANCELED); + REQUIRE((uint8_t)TaskStatus::State::Pending == StatusMsg::STATE_PENDING); + +//============================================================================== + const auto now = std::chrono::steady_clock::now(); + + // test delivery description msg + auto delivery = description::Delivery::make( + now, "pick", "dis", "drop", "ing", {}); + REQUIRE(delivery->type() == TaskType::TYPE_DELIVERY); + + // By default this will be a Station Type, which is not supported + TaskDescription msg; + + auto delivery2 = description::make_delivery_from_msg(msg); + REQUIRE(!delivery2); + + // Create TaskStatus Object + auto delivery_status = TaskStatus::make("Delivery001", now, delivery); + REQUIRE(delivery_status->task_id() == "Delivery001"); + REQUIRE(delivery_status->state == TaskStatus::State::Pending); + REQUIRE(!delivery_status->is_terminated()); + +//============================================================================== + // test loop descripttion msg + auto loop = description::Loop::make(now, "start_yo", "end_yo", 1); + REQUIRE(loop->type() == TaskType::TYPE_LOOP); + + auto d_loop = std::dynamic_pointer_cast(loop); + auto loop2 = description::make_loop_from_msg(msg); + REQUIRE(!loop2); + +//============================================================================== + // test clean descripttion msg + auto clean = description::Clean::make(now, "clean_here"); + REQUIRE(clean->type() == TaskType::TYPE_CLEAN); + auto clean2 = description::make_clean_from_msg(msg); + REQUIRE(!clean2); +} + +} // namespace rmf_task_ros2