diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 4f126b9633..78295197d1 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -38,6 +38,7 @@ jobs: run: | git submodule update --init \ thirdparty/asio/asio \ + thirdparty/concurrentqueue/concurrentqueue \ thirdparty/benchmark/benchmark \ thirdparty/ecaludp/ecaludp \ thirdparty/fineftp/fineftp-server \ diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index e93cd76b56..d08931b43f 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -81,6 +81,7 @@ jobs: run: | git submodule update --init \ thirdparty/asio/asio \ + thirdparty/concurrentqueue/concurrentqueue \ thirdparty/benchmark/benchmark \ thirdparty/ecaludp/ecaludp \ thirdparty/fineftp/fineftp-server \ diff --git a/.gitmodules b/.gitmodules index f1b360bb89..e57be9fa72 100644 --- a/.gitmodules +++ b/.gitmodules @@ -61,3 +61,6 @@ [submodule "thirdparty/protozero/protozero"] path = thirdparty/protozero/protozero url = https://github.com/mapbox/protozero.git +[submodule "thirdparty/concurrentqueue/concurrentqueue"] + path = thirdparty/concurrentqueue/concurrentqueue + url = https://github.com/cameron314/concurrentqueue.git diff --git a/LICENSES/LICENSES_all/concurrentqueue/LICENSE.md b/LICENSES/LICENSES_all/concurrentqueue/LICENSE.md new file mode 100644 index 0000000000..519338976f --- /dev/null +++ b/LICENSES/LICENSES_all/concurrentqueue/LICENSE.md @@ -0,0 +1,62 @@ +This license file applies to everything in this repository except that which +is explicitly annotated as being written by other authors, i.e. the Boost +queue (included in the benchmarks for comparison), Intel's TBB library (ditto), +dlib::pipe (ditto), +the CDSChecker tool (used for verification), the Relacy model checker (ditto), +and Jeff Preshing's semaphore implementation (used in the blocking queue) which +has a zlib license (embedded in lightweightsempahore.h). + +--- + +Simplified BSD License: + +Copyright (c) 2013-2016, Cameron Desrochers. +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +- Redistributions of source code must retain the above copyright notice, this list of +conditions and the following disclaimer. +- Redistributions in binary form must reproduce the above copyright notice, this list of +conditions and the following disclaimer in the documentation and/or other materials +provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +--- + +I have also chosen to dual-license under the Boost Software License as an alternative to +the Simplified BSD license above: + +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/NOTICE.md b/NOTICE.md index cd8cd46758..46e017924f 100644 --- a/NOTICE.md +++ b/NOTICE.md @@ -84,6 +84,16 @@ eCAL makes use of the following external components: - Git Submodule `/thirdparty/benchmark/benchmark` - Testing +### concurrentqueue + +- **License**: BSD-2-Clause +- **Copyright**: Copyright (c) 2013-2016, Cameron Desrochers. +- **Repository**: https://github.com/cameron314/concurrentqueue +- **Included in**: + - Git Submodule `/thirdparty/concurrentqueue/concurrentqueue` + - Windows builds + - Linux builds + ### Cap'n Proto - **License**: MIT diff --git a/app/rec/rec_client_core/CMakeLists.txt b/app/rec/rec_client_core/CMakeLists.txt index fceded4913..1bcfa8e7c5 100644 --- a/app/rec/rec_client_core/CMakeLists.txt +++ b/app/rec/rec_client_core/CMakeLists.txt @@ -20,6 +20,7 @@ find_package(Threads REQUIRED) find_package(Protobuf REQUIRED) find_package(spdlog REQUIRED) +find_package(concurrentqueue REQUIRED) if (ECAL_USE_CURL) find_package(CURL REQUIRED) @@ -106,6 +107,7 @@ target_link_libraries(${PROJECT_NAME} Threads::Threads eCAL::ecal-utils EcalParser + concurrentqueue::concurrentqueue ) if(ECAL_USE_CURL) diff --git a/app/rec/rec_client_core/src/ecal_rec_impl.cpp b/app/rec/rec_client_core/src/ecal_rec_impl.cpp index da219b6a88..f78fbed470 100644 --- a/app/rec/rec_client_core/src/ecal_rec_impl.cpp +++ b/app/rec/rec_client_core/src/ecal_rec_impl.cpp @@ -39,6 +39,7 @@ #include "rec_client_core/topic_info.h" #include "rec_client_core/upload_config.h" +#include #include #include #include @@ -66,6 +67,59 @@ namespace eCAL { namespace rec { + class EcalRecImpl::ReceiveDispatchThread : public InterruptibleLoopThread + { + public: + ReceiveDispatchThread(EcalRecImpl& ecal_rec_impl_) + : InterruptibleLoopThread(std::chrono::milliseconds(10)) + , ecal_rec_impl(ecal_rec_impl_) + , token(ecal_rec_impl_.receive_dispatch_queue_) + { + } + + protected: + // forward data from the queue to the actual buffer + void Loop() override + { + constexpr int max_items = 16; + std::array, max_items> frames; + size_t no_items = 0; + do + { + no_items = ecal_rec_impl.receive_dispatch_queue_.try_dequeue_bulk(token, frames.data(), frames.size()); + assert(no_items < max_items); + + for (size_t i = 0; i < no_items; ++i) + { + auto& frame = frames[i]; + + // Add to the pre-buffer (it is thread-safe by using a mutex internally) + ecal_rec_impl.pre_buffer_.push_back(frame); + + { + // Add to the currently recording job (if there is any) + const std::shared_lock recorder_lock(ecal_rec_impl.recorder_mutex_); + if (ecal_rec_impl.recording_recorder_job_ != nullptr) + { + ecal_rec_impl.recording_recorder_job_->AddFrame(frame); + } + } + + { + // Add to the subscriber statistics + const std::lock_guard subscriber_statistics_lock(ecal_rec_impl.subscriber_throughput_mutex_); + ecal_rec_impl.subscriber_throughput_statistics_.AddFrame(frame->data_.size()); + } + } + } while (no_items > 0); + } + + private: + EcalRecImpl& ecal_rec_impl; + moodycamel::ConsumerToken token; + }; + + EcalRecImpl::EcalRecImpl() : addon_manager_(std::make_unique([this](int64_t job_id, const std::string& addon_id, const RecAddonJobStatus& job_status) { @@ -90,11 +144,15 @@ namespace eCAL monitoring_thread_ = std::make_unique(*this); monitoring_thread_->Start(); + + receive_dispatch_thread = std::make_unique(*this); + receive_dispatch_thread->Start(); } EcalRecImpl::~EcalRecImpl() { - DisconnectFromEcal(); + receive_dispatch_thread->Interrupt(); + receive_dispatch_thread->Join(); // Interrupt monitoring thread monitoring_thread_->Interrupt(); @@ -723,28 +781,14 @@ namespace eCAL void EcalRecImpl::EcalMessageReceived(const eCAL::STopicId& topic_id_, const eCAL::SReceiveCallbackData& data_) { + const thread_local moodycamel::ProducerToken token(receive_dispatch_queue_); + auto ecal_receive_time = eCAL::Time::ecal_clock::now(); auto system_receive_time = std::chrono::steady_clock::now(); std::shared_ptr frame = std::make_shared(&data_, topic_id_.topic_name, ecal_receive_time, system_receive_time); - // Add to the pre-buffer (it is thread-safe by using a mutex internally) - pre_buffer_.push_back(frame); - - { - // Add to the currently recording job (if there is any) - std::shared_lock recorder_lock(recorder_mutex_); - if (recording_recorder_job_ != nullptr) - { - recording_recorder_job_->AddFrame(std::move(frame)); - } - } - - { - // Add to the subscriber statistics - const std::lock_guard subscriber_statistics_lock(subscriber_throughput_mutex_); - subscriber_throughput_statistics_.AddFrame(data_.buffer_size); - } + receive_dispatch_queue_.enqueue(token, std::move(frame)); } Throughput EcalRecImpl::GetSubscriberThroughput() const diff --git a/app/rec/rec_client_core/src/ecal_rec_impl.h b/app/rec/rec_client_core/src/ecal_rec_impl.h index d73382a159..28bccf9cc3 100644 --- a/app/rec/rec_client_core/src/ecal_rec_impl.h +++ b/app/rec/rec_client_core/src/ecal_rec_impl.h @@ -34,6 +34,8 @@ #include #include +#include + #include #include #include @@ -54,6 +56,7 @@ namespace eCAL class MonitoringThread; class AddonManager; + struct SubscriberStatisticsEntry { std::chrono::steady_clock::time_point start_of_statistics; @@ -63,6 +66,9 @@ namespace eCAL class EcalRecImpl { + private: + class ReceiveDispatchThread; + public: EcalRecImpl(); ~EcalRecImpl(); @@ -167,6 +173,8 @@ namespace eCAL std::unique_ptr garbage_collector_trigger_thread_; /** frame_buffer_, buffer_writer_threads_, max_pre_buffer_length_ */ std::unique_ptr monitoring_thread_; /** connected_to_ecal_, FilterAvailableTopics_NoLock(hosts_filter_, topic_whitelist_, topic_blacklist_), CreateNewSubscribers_NoLock(subscriber_map_), main_writer_thread_, buffer_writer_threads_ */ + std::unique_ptr receive_dispatch_thread; + moodycamel::BlockingConcurrentQueue> receive_dispatch_queue_; /* Lock-free queue to keep callbacks busy for the least amount of time */ // Pre-buffer FrameBuffer pre_buffer_; /** < Thread-safe framebuffer */ diff --git a/cmake/submodule_dependencies.cmake b/cmake/submodule_dependencies.cmake index 622a81b21c..959ddbb39b 100644 --- a/cmake/submodule_dependencies.cmake +++ b/cmake/submodule_dependencies.cmake @@ -6,6 +6,7 @@ set(ecal_submodule_dependency_provider_root_dir ${CMAKE_CURRENT_LIST_DIR}) set(ecal_submodule_dependencies asio CMakeFunctions + concurrentqueue CURL ecaludp fineftp diff --git a/doc/rst/license/thirdparty_licenses.rst b/doc/rst/license/thirdparty_licenses.rst index b95d1b61d7..bff4459270 100644 --- a/doc/rst/license/thirdparty_licenses.rst +++ b/doc/rst/license/thirdparty_licenses.rst @@ -36,6 +36,12 @@ Some dependencies, like GoogleTest, are not used in our officially distributed b - Apache-2.0 + * - :ref:`concurrentqueue ` + + - Copyright (c) 2013-2016, Cameron Desrochers. + + - BSD-2-Clause + * - :ref:`Cap'n Proto ` - Copyright (c) 2013-2017 Sandstorm Development Group, Inc.; Cloudflare, Inc.; and other contributors. Each commit is copyright by its respective author or author's employer. @@ -206,6 +212,8 @@ Some dependencies, like GoogleTest, are not used in our officially distributed b thirdparty_licenses/benchmark + thirdparty_licenses/concurrentqueue + thirdparty_licenses/cap_n_proto thirdparty_licenses/convert_utf diff --git a/doc/rst/license/thirdparty_licenses/concurrentqueue.rst b/doc/rst/license/thirdparty_licenses/concurrentqueue.rst new file mode 100644 index 0000000000..c54affb09d --- /dev/null +++ b/doc/rst/license/thirdparty_licenses/concurrentqueue.rst @@ -0,0 +1,53 @@ +.. + THIS FILE IS AUTO-GENERATED AND SHOULD NOT BE EDITED MANUALLY. + +.. include:: /include.txt + +.. _thirdparty_licenses_concurrentqueue: + +=============================================== +concurrentqueue +=============================================== + +.. list-table:: + :widths: 20 80 + :header-rows: 0 + + * - **License** + + - BSD-2-Clause + + * - **Copyright** + + - Copyright (c) 2013-2016, Cameron Desrochers. + + * - **Repository** + + - https://github.com/cameron314/concurrentqueue + + * - **Upstream version** [#upstreamversion]_ + + - `593df78ec309be7a7b456b3334025ccade1d2d66 `_ + + * - **Integration** + + - + + - |fa-github| Git Submodule :file:`/thirdparty/concurrentqueue/concurrentqueue` + + - |fa-windows| Windows builds + + - |fa-ubuntu| Linux builds + +.. [#upstreamversion] *The actual version used for building may differ from the listed dependency version.* + *Especially Linux binaries are often built against system packages, if available.* + *Check build files for further information.* + +License Files +============= + +:file:`LICENSE.md` +-------------------------------------------------------------------------------- + +.. literalinclude:: concurrentqueue/LICENSE.md + :language: none diff --git a/doc/rst/license/thirdparty_licenses/concurrentqueue/LICENSE.md b/doc/rst/license/thirdparty_licenses/concurrentqueue/LICENSE.md new file mode 100644 index 0000000000..519338976f --- /dev/null +++ b/doc/rst/license/thirdparty_licenses/concurrentqueue/LICENSE.md @@ -0,0 +1,62 @@ +This license file applies to everything in this repository except that which +is explicitly annotated as being written by other authors, i.e. the Boost +queue (included in the benchmarks for comparison), Intel's TBB library (ditto), +dlib::pipe (ditto), +the CDSChecker tool (used for verification), the Relacy model checker (ditto), +and Jeff Preshing's semaphore implementation (used in the blocking queue) which +has a zlib license (embedded in lightweightsempahore.h). + +--- + +Simplified BSD License: + +Copyright (c) 2013-2016, Cameron Desrochers. +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +- Redistributions of source code must retain the above copyright notice, this list of +conditions and the following disclaimer. +- Redistributions in binary form must reproduce the above copyright notice, this list of +conditions and the following disclaimer in the documentation and/or other materials +provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +--- + +I have also chosen to dual-license under the Boost Software License as an alternative to +the Simplified BSD license above: + +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/thirdparty/concurrentqueue/build-concurrentqueue.cmake b/thirdparty/concurrentqueue/build-concurrentqueue.cmake new file mode 100644 index 0000000000..e358bfda37 --- /dev/null +++ b/thirdparty/concurrentqueue/build-concurrentqueue.cmake @@ -0,0 +1,25 @@ +include_guard(GLOBAL) + +include(GNUInstallDirs) + +add_library(concurrentqueue INTERFACE EXCLUDE_FROM_ALL) +target_include_directories(concurrentqueue INTERFACE + $ +) + +add_library(concurrentqueue::concurrentqueue ALIAS concurrentqueue) + +# We don't want to install concurrentqueue. However we need to have the commands +# Otherwise CMake will not be happy +install( + TARGETS concurrentqueue + EXPORT concurrentqueueTargets +) + +install( + EXPORT concurrentqueueTargets + FILE concurrentqueueTargets.cmake + DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake" + NAMESPACE concurrentqueue:: + COMPONENT concurrentqueue_dev +) diff --git a/thirdparty/concurrentqueue/concurrentqueue b/thirdparty/concurrentqueue/concurrentqueue new file mode 160000 index 0000000000..593df78ec3 --- /dev/null +++ b/thirdparty/concurrentqueue/concurrentqueue @@ -0,0 +1 @@ +Subproject commit 593df78ec309be7a7b456b3334025ccade1d2d66 diff --git a/thirdparty/concurrentqueue/sbom.py b/thirdparty/concurrentqueue/sbom.py new file mode 100644 index 0000000000..51596706bb --- /dev/null +++ b/thirdparty/concurrentqueue/sbom.py @@ -0,0 +1,31 @@ +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")) # Add ecal_license_utils to path +import ecal_license_utils + + +def get_sbom(): + component_name = "concurrentqueue" + component_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "concurrentqueue") + + sbom = {} + sbom[component_name] = {} + sbom[component_name]["include_type"] = [ + ecal_license_utils.include_type.SUBMODULE, + ecal_license_utils.include_type.WINDOWS_BUILDS, + ecal_license_utils.include_type.LINUX_BUILDS, + ] + sbom[component_name]["path"] = component_dir + sbom[component_name]["license"] = "BSD-2-Clause" + sbom[component_name]["license_files"] = [ + os.path.join(sbom[component_name]["path"], "LICENSE.md"), + ] + sbom[component_name]["thirdparty_license_files"] = [] + sbom[component_name]["copyright"] = ecal_license_utils.get_copyright_from_file(sbom[component_name]["license_files"][0]) + sbom[component_name]["homepage"] = None + sbom[component_name]["repo_url"] = ecal_license_utils.get_repo_url_from_submodule(component_dir) + sbom[component_name]["git_version"] = ecal_license_utils.get_git_version_from_submodule(component_dir) + sbom[component_name]["git_version_url"] = sbom[component_name]["repo_url"] + "/tree/" + sbom[component_name]["git_version"] + + return sbom