See More

// Copyright 2015 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #include #include #include #include #include #include "rclcpp/callback_group.hpp" #include "rclcpp/client.hpp" #include "rclcpp/service.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/waitable.hpp" using rclcpp::CallbackGroup; using rclcpp::CallbackGroupType; CallbackGroup::CallbackGroup( CallbackGroupType group_type, rclcpp::Context::WeakPtr context, bool automatically_add_to_executor_with_node) : type_(group_type), associated_with_executor_(false), can_be_taken_from_(true), automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node), context_(context) {} CallbackGroup::~CallbackGroup() { trigger_notify_guard_condition(); } std::atomic_bool & CallbackGroup::can_be_taken_from() { return can_be_taken_from_; } const CallbackGroupType & CallbackGroup::type() const { return type_; } size_t CallbackGroup::size() const { return subscription_ptrs_.size() + service_ptrs_.size() + client_ptrs_.size() + timer_ptrs_.size() + waitable_ptrs_.size(); } void CallbackGroup::collect_all_ptrs( std::function sub_func, std::function service_func, std::function client_func, std::function timer_func, std::function waitable_func) const { std::lock_guard<:mutex> lock(mutex_); for (const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) { rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock(); if (ref_ptr) { sub_func(ref_ptr); } } for (const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) { rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock(); if (ref_ptr) { service_func(ref_ptr); } } for (const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) { rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock(); if (ref_ptr) { client_func(ref_ptr); } } for (const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) { rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock(); if (ref_ptr) { timer_func(ref_ptr); } } for (const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) { rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock(); if (ref_ptr) { waitable_func(ref_ptr); } } } std::atomic_bool & CallbackGroup::get_associated_with_executor_atomic() { return associated_with_executor_; } bool CallbackGroup::automatically_add_to_executor_with_node() const { return automatically_add_to_executor_with_node_; } rclcpp::GuardCondition::SharedPtr CallbackGroup::get_notify_guard_condition() { std::lock_guard<:recursive_mutex> lock(notify_guard_condition_mutex_); rclcpp::Context::SharedPtr context_ptr = context_.lock(); if (context_ptr && context_ptr->is_valid()) { if (!notify_guard_condition_) { notify_guard_condition_ = std::make_shared<:guardcondition>(context_ptr); } return notify_guard_condition_; } return nullptr; } void CallbackGroup::trigger_notify_guard_condition() { std::lock_guard<:recursive_mutex> lock(notify_guard_condition_mutex_); if (notify_guard_condition_) { notify_guard_condition_->trigger(); } } void CallbackGroup::add_subscription( const rclcpp::SubscriptionBase::SharedPtr subscription_ptr) { std::lock_guard<:mutex> lock(mutex_); subscription_ptrs_.push_back(subscription_ptr); subscription_ptrs_.erase( std::remove_if( subscription_ptrs_.begin(), subscription_ptrs_.end(), [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}), subscription_ptrs_.end()); } void CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr) { std::lock_guard<:mutex> lock(mutex_); timer_ptrs_.push_back(timer_ptr); timer_ptrs_.erase( std::remove_if( timer_ptrs_.begin(), timer_ptrs_.end(), [](rclcpp::TimerBase::WeakPtr x) {return x.expired();}), timer_ptrs_.end()); } void CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr) { std::lock_guard<:mutex> lock(mutex_); service_ptrs_.push_back(service_ptr); service_ptrs_.erase( std::remove_if( service_ptrs_.begin(), service_ptrs_.end(), [](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}), service_ptrs_.end()); } void CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr) { std::lock_guard<:mutex> lock(mutex_); client_ptrs_.push_back(client_ptr); client_ptrs_.erase( std::remove_if( client_ptrs_.begin(), client_ptrs_.end(), [](rclcpp::ClientBase::WeakPtr x) {return x.expired();}), client_ptrs_.end()); } void CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) { std::lock_guard<:mutex> lock(mutex_); waitable_ptrs_.push_back(waitable_ptr); waitable_ptrs_.erase( std::remove_if( waitable_ptrs_.begin(), waitable_ptrs_.end(), [](rclcpp::Waitable::WeakPtr x) {return x.expired();}), waitable_ptrs_.end()); } void CallbackGroup::remove_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) noexcept { std::lock_guard<:mutex> lock(mutex_); for (auto iter = waitable_ptrs_.begin(); iter != waitable_ptrs_.end(); ++iter) { const auto shared_ptr = iter->lock(); if (shared_ptr.get() == waitable_ptr.get()) { waitable_ptrs_.erase(iter); break; } } }