-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Added Lifecycle Subscription wrapper #5772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
35640ef
2ee17a2
26c33df
c49f05c
92d3264
ef44b00
53d59d6
df658f7
c114c21
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,85 +35,57 @@ namespace interfaces | |
| { | ||
|
|
||
| /** | ||
| * @brief Create a SubscriptionOptions object with Nav2's QoS profiles and options | ||
| * @brief Create a subscription to a topic using Nav2 QoS profiles and SubscriptionOptions | ||
| * @param node Node to create the subscription on | ||
| * @param topic_name Name of topic | ||
| * @param allow_parameter_qos_overrides Whether to allow QoS overrides for this subscription | ||
| * @param callback_group_ptr Pointer to the callback group to use for this subscription | ||
| * @param qos_message_lost_callback Callback for when a QoS message is lost | ||
| * @param subscription_matched_callback Callback when a subscription is matched with a publisher | ||
| * @param incompatible_qos_type_callback Callback for when an incompatible QoS type is requested | ||
| * @param qos_requested_incompatible_qos_callback Callback for when a QoS request is incompatible | ||
| * @param qos_deadline_requested_callback Callback for when a QoS deadline is missed | ||
| * @param qos_liveliness_changed_callback Callback for when a QoS liveliness change occurs | ||
| * @return A nav2::SubscriptionOptions object with the specified configurations | ||
| * @param callback Callback function to handle incoming messages | ||
| * @param qos QoS settings for the subscription (default is nav2::qos::StandardTopicQoS()) | ||
| * @param callback_group The callback group to use (if provided) | ||
| * @return A shared pointer to the created lifecycle-enabled subscription | ||
| */ | ||
| inline rclcpp::SubscriptionOptions createSubscriptionOptions( | ||
| template<typename MessageT, typename NodeT, typename CallbackT> | ||
| typename nav2::Subscription<MessageT>::SharedPtr create_subscription( | ||
| const NodeT & node, | ||
| const std::string & topic_name, | ||
| const bool allow_parameter_qos_overrides = true, | ||
| const rclcpp::CallbackGroup::SharedPtr callback_group_ptr = nullptr, | ||
| rclcpp::QOSMessageLostCallbackType qos_message_lost_callback = nullptr, | ||
| rclcpp::SubscriptionMatchedCallbackType subscription_matched_callback = nullptr, | ||
| rclcpp::IncompatibleTypeCallbackType incompatible_qos_type_callback = nullptr, | ||
| rclcpp::QOSRequestedIncompatibleQoSCallbackType requested_incompatible_qos_callback = nullptr, | ||
| rclcpp::QOSDeadlineRequestedCallbackType qos_deadline_requested_callback = nullptr, | ||
| rclcpp::QOSLivelinessChangedCallbackType qos_liveliness_changed_callback = nullptr) | ||
|
Comment on lines
-52
to
-59
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need all this, do not change |
||
| CallbackT && callback, | ||
| const rclcpp::QoS & qos = nav2::qos::StandardTopicQoS(), | ||
| const rclcpp::CallbackGroup::SharedPtr & callback_group = nullptr) | ||
| { | ||
| rclcpp::SubscriptionOptions options; | ||
| // Allow for all topics to have QoS overrides | ||
| if (allow_parameter_qos_overrides) { | ||
| options.qos_overriding_options = rclcpp::QosOverridingOptions( | ||
| {rclcpp::QosPolicyKind::Depth, rclcpp::QosPolicyKind::Durability, | ||
| rclcpp::QosPolicyKind::Reliability, rclcpp::QosPolicyKind::History}); | ||
| } | ||
| bool allow_parameter_qos_overrides = nav2::declare_or_get_parameter( | ||
| node, "allow_parameter_qos_overrides", true); | ||
|
|
||
| // Set the callback group to use for this subscription, if given | ||
| options.callback_group = callback_group_ptr; | ||
| // Create the lifecycle subscription wrapper | ||
| auto lifecycle_sub = std::make_shared<nav2::LifecycleSubscription<MessageT>>( | ||
| nullptr, // Will be set below | ||
| topic_name); | ||
|
|
||
| // ROS 2 default logs this already | ||
| options.event_callbacks.incompatible_qos_callback = requested_incompatible_qos_callback; | ||
| options.event_callbacks.incompatible_type_callback = incompatible_qos_type_callback; | ||
| // Store the user's callback | ||
| using MessageSharedPtr = std::shared_ptr<MessageT>; | ||
| std::function<void(MessageSharedPtr)> user_callback = std::forward<CallbackT>(callback); | ||
|
|
||
| // Set the event callbacks if given, else log | ||
| if (qos_message_lost_callback) { | ||
| options.event_callbacks.message_lost_callback = | ||
| qos_message_lost_callback; | ||
| } else { | ||
| options.event_callbacks.message_lost_callback = | ||
| [topic_name](rclcpp::QOSMessageLostInfo & info) { | ||
| RCLCPP_WARN( | ||
| rclcpp::get_logger("nav2::interfaces"), | ||
| "[%lu] Message was dropped on topic [%s] due to queue size or network constraints.", | ||
| info.total_count_change, | ||
| topic_name.c_str()); | ||
| }; | ||
| } | ||
| // Create wrapped callback that checks lifecycle state | ||
| auto wrapped_callback = [lifecycle_sub, user_callback](MessageSharedPtr msg) { | ||
| if (lifecycle_sub->should_process_message()) { | ||
| user_callback(msg); | ||
| } | ||
| }; | ||
|
|
||
| if (subscription_matched_callback) { | ||
| options.event_callbacks.matched_callback = subscription_matched_callback; | ||
| } else { | ||
| options.event_callbacks.matched_callback = | ||
| [topic_name](rclcpp::MatchedInfo & status) { | ||
| if (status.current_count_change > 0) { | ||
| RCLCPP_DEBUG( | ||
| rclcpp::get_logger("nav2::interfaces"), | ||
| "Connected: %d new publisher(s) to [%s]. Total active: %zu.", | ||
| status.current_count_change, | ||
| topic_name.c_str(), | ||
| status.current_count); | ||
| } else if (status.current_count_change < 0) { | ||
| RCLCPP_DEBUG( | ||
| rclcpp::get_logger("nav2::interfaces"), | ||
| "Disconnected: %d publisher(s) from [%s]. Total active: %zu.", | ||
| -status.current_count_change, | ||
| topic_name.c_str(), | ||
| status.current_count); | ||
| } | ||
| }; | ||
| } | ||
| // Create the actual ROS 2 subscription with wrapped callback | ||
| auto params_interface = node->get_node_parameters_interface(); | ||
| auto topics_interface = node->get_node_topics_interface(); | ||
| auto sub = rclcpp::create_subscription<MessageT>( | ||
| params_interface, | ||
| topics_interface, | ||
| topic_name, | ||
| qos, | ||
| wrapped_callback, | ||
| createSubscriptionOptions(topic_name, allow_parameter_qos_overrides, callback_group)); | ||
|
|
||
| options.event_callbacks.deadline_callback = qos_deadline_requested_callback; | ||
| options.event_callbacks.liveliness_callback = qos_liveliness_changed_callback; | ||
| return options; | ||
| // Set the subscription in the lifecycle wrapper | ||
| // We need to update LifecycleSubscription to have a setter | ||
| lifecycle_sub->set_subscription(sub); | ||
|
|
||
| return lifecycle_sub; | ||
| } | ||
|
|
||
| /** | ||
|
|
||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. File Purpose: Extended LifecycleNode with managed entity interface support for lifecycle management. Additions:
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
| #include "rcl_interfaces/msg/parameter_descriptor.hpp" | ||
| #include "nav2_ros_common/node_thread.hpp" | ||
| #include "rclcpp_lifecycle/lifecycle_node.hpp" | ||
| #include "rclcpp_lifecycle/managed_entity.hpp" // ADDED: For lifecycle management | ||
| #include "rclcpp/rclcpp.hpp" | ||
| #include "bondcpp/bond.hpp" | ||
| #include "bond/msg/constants.hpp" | ||
|
|
@@ -157,9 +158,21 @@ class LifecycleNode : public rclcpp_lifecycle::LifecycleNode | |
| const rclcpp::QoS & qos = nav2::qos::StandardTopicQoS(), | ||
| const rclcpp::CallbackGroup::SharedPtr & callback_group = nullptr) | ||
| { | ||
| return nav2::interfaces::create_subscription<MessageT>( | ||
| auto sub = nav2::interfaces::create_subscription<MessageT>( | ||
| shared_from_this(), topic_name, | ||
| std::forward<CallbackT>(callback), qos, callback_group); | ||
|
|
||
| // ADDED: Add to managed entities | ||
| this->add_managed_entity(sub); | ||
|
|
||
| // ADDED: Automatically activate the subscription if the node is already active | ||
| if (get_current_state().id() == | ||
| lifecycle_msgs::msg::State::PRIMARY_STATE_ACTIVE) | ||
| { | ||
| sub->on_activate(); | ||
| } | ||
|
|
||
| return sub; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -369,6 +382,47 @@ class LifecycleNode : public rclcpp_lifecycle::LifecycleNode | |
| } | ||
| } | ||
|
|
||
| // ADDED: New methods for lifecycle interface management | ||
| /** | ||
| * @brief Add a managed entity (publisher, subscription, etc.) to be lifecycle-managed | ||
| * @param entity Shared pointer to the managed entity | ||
| */ | ||
| void add_managed_entity(std::shared_ptr<rclcpp_lifecycle::ManagedEntityInterface> entity) | ||
| { | ||
| managed_entities_.push_back(entity); | ||
| } | ||
|
|
||
| /** | ||
| * @brief Activate all managed interfaces in the correct order | ||
| * Order: Publishers -> Service Clients -> Action Clients -> | ||
| * Subscriptions -> Service Servers -> Action Servers | ||
| */ | ||
| void activateInterfaces() | ||
| { | ||
| RCLCPP_INFO(get_logger(), "Activating all managed interfaces for %s", get_name()); | ||
| for (auto & entity : managed_entities_) { | ||
| if (entity) { | ||
| entity->on_activate(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @brief Deactivate all managed interfaces in reverse order | ||
| * Order: Action Servers -> Service Servers -> Subscriptions -> | ||
| * Action Clients -> Service Clients -> Publishers | ||
| */ | ||
| void deactivateInterfaces() | ||
| { | ||
| RCLCPP_INFO(get_logger(), "Deactivating all managed interfaces for %s", get_name()); | ||
| // Deactivate in reverse order | ||
| for (auto it = managed_entities_.rbegin(); it != managed_entities_.rend(); ++it) { | ||
| if (*it) { | ||
| (*it)->on_deactivate(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| protected: | ||
| /** | ||
| * @brief Print notifications for lifecycle node | ||
|
|
@@ -426,6 +480,9 @@ class LifecycleNode : public rclcpp_lifecycle::LifecycleNode | |
| std::shared_ptr<bond::Bond> bond_{nullptr}; | ||
| double bond_heartbeat_period{0.1}; | ||
| rclcpp::TimerBase::SharedPtr autostart_timer_; | ||
|
|
||
| // ADDED: Vector to store all managed entities (publishers, subscriptions, etc.) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line and ones like it are making me think this is the output of an LLM. How did you validate, test, review, and compile this? We do not typically review AI outputs without you taking responsibility for quality control, we cannot review direct AI outputs - it is not a good use of maintainer time and resources. If you want to contribute a feature or fix, please put in your own legwork / effort to at least do QA and testing.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your feedback @SteveMacenski ,
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its ok - just for the future its important to do your own QC before involving other developers :-) |
||
| std::vector<std::shared_ptr<rclcpp_lifecycle::ManagedEntityInterface>> managed_entities_; | ||
|
|
||
| private: | ||
| /** | ||
|
|
@@ -448,4 +505,4 @@ class LifecycleNode : public rclcpp_lifecycle::LifecycleNode | |
|
|
||
| } // namespace nav2 | ||
|
|
||
| #endif // NAV2_ROS_COMMON__LIFECYCLE_NODE_HPP_ | ||
| #endif // NAV2_ROS_COMMON__LIFECYCLE_NODE_HPP_ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| // Copyright (c) 2025 Open Navigation LLC | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| #ifndef NAV2_ROS_COMMON__LIFECYCLE_SUBSCRIPTION_HPP_ | ||
| #define NAV2_ROS_COMMON__LIFECYCLE_SUBSCRIPTION_HPP_ | ||
|
|
||
| #include <functional> | ||
| #include <memory> | ||
| #include <string> | ||
|
|
||
| #include "rclcpp/rclcpp.hpp" | ||
| #include "rclcpp/logging.hpp" | ||
| #include "rclcpp_lifecycle/managed_entity.hpp" | ||
|
|
||
| namespace nav2 | ||
| { | ||
|
|
||
| /// @brief Lifecycle-aware subscription wrapper for Nav2 | ||
| /** | ||
| * This wrapper adds lifecycle management to ROS 2 subscriptions. | ||
| * When inactive, subscription callbacks are blocked and a warning is logged. | ||
| */ | ||
| template<typename MessageT> | ||
| class LifecycleSubscription : public rclcpp_lifecycle::SimpleManagedEntity | ||
| { | ||
| public: | ||
| using SharedPtr = std::shared_ptr<LifecycleSubscription<MessageT>>; | ||
| using CallbackType = std::function<void(std::shared_ptr<MessageT>)>; | ||
|
|
||
| /** | ||
| * @brief Constructor | ||
| * @param subscription The underlying ROS 2 subscription (can be nullptr initially) | ||
| * @param topic_name Name of the topic (for logging) | ||
| */ | ||
| LifecycleSubscription( | ||
| typename rclcpp::Subscription<MessageT>::SharedPtr subscription, | ||
| const std::string & topic_name) | ||
| : subscription_(subscription), | ||
| topic_name_(topic_name), | ||
| should_log_(true), | ||
| logger_(rclcpp::get_logger("LifecycleSubscription")) | ||
| { | ||
| } | ||
|
|
||
| /** | ||
| * @brief Set the underlying subscription (used by factory) | ||
| */ | ||
| void set_subscription(typename rclcpp::Subscription<MessageT>::SharedPtr subscription) | ||
| { | ||
| subscription_ = subscription; | ||
| } | ||
|
|
||
| ~LifecycleSubscription() = default; | ||
|
|
||
| /** | ||
| * @brief Get the underlying subscription | ||
| */ | ||
| typename rclcpp::Subscription<MessageT>::SharedPtr get_subscription() const | ||
| { | ||
| return subscription_; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Get the topic name | ||
| */ | ||
| const std::string & get_topic_name() const | ||
| { | ||
| return topic_name_; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Activation callback - resets logging flag | ||
| */ | ||
| void on_activate() override | ||
| { | ||
| SimpleManagedEntity::on_activate(); | ||
| should_log_ = true; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Check if message should be processed | ||
| * @return true if activated, false otherwise (and logs warning) | ||
| */ | ||
| bool should_process_message() | ||
| { | ||
| if (!this->is_activated()) { | ||
| log_subscription_not_enabled(); | ||
| return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| private: | ||
| /** | ||
| * @brief Log that subscription is not enabled | ||
| */ | ||
| void log_subscription_not_enabled() | ||
| { | ||
| if (!should_log_) { | ||
| return; | ||
| } | ||
|
|
||
| RCLCPP_WARN( | ||
| logger_, | ||
| "Trying to process message on subscription '%s', but the subscription is not activated", | ||
| topic_name_.c_str()); | ||
|
|
||
| should_log_ = false; | ||
| } | ||
|
|
||
| typename rclcpp::Subscription<MessageT>::SharedPtr subscription_; | ||
| std::string topic_name_; | ||
| bool should_log_; | ||
| rclcpp::Logger logger_; | ||
| }; | ||
|
|
||
| } // namespace nav2 | ||
|
|
||
| #endif // NAV2_ROS_COMMON__LIFECYCLE_SUBSCRIPTION_HPP_ |
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. File Purpose: Nav2 nodes rely on managed entities to ensure subscriptions follow lifecycle |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,7 @@ | |
| #define NAV2_ROS_COMMON__SUBSCRIPTION_HPP_ | ||
|
|
||
| #include <memory> | ||
| #include "rclcpp/rclcpp.hpp" | ||
| #include "nav2_ros_common/lifecycle_subscription.hpp" | ||
|
|
||
| namespace nav2 | ||
| { | ||
|
|
@@ -27,7 +27,7 @@ namespace nav2 | |
| * which may be further built up on in the future with custom APIs. | ||
| */ | ||
| template<typename MessageT> | ||
| using Subscription = rclcpp::Subscription<MessageT>; | ||
| using Subscription = LifecycleSubscription<MessageT>; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file should contain the new subscription, not lifecycle_subscription.hpp. This is now |
||
|
|
||
| } // namespace nav2 | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File Purpose: -Implements lifecycle-aware create_subscription and Provides overloads for rclcpp::Node and rclcpp_lifecycle::LifecycleNode.
Changes:
Replaced single complex
create_subscriptionfunction with two overloads that automatically select appropriate behavior based on node type.