diff --git a/libs/core/executors/CMakeLists.txt b/libs/core/executors/CMakeLists.txt index 8deb1494338..1f4b432f743 100644 --- a/libs/core/executors/CMakeLists.txt +++ b/libs/core/executors/CMakeLists.txt @@ -26,6 +26,8 @@ set(executors_headers hpx/executors/execution_policy_parameters.hpp hpx/executors/execution_policy_scheduling_property.hpp hpx/executors/execution_policy.hpp + hpx/executors/executor_scheduler.hpp + hpx/executors/executor_scheduler_fwd.hpp hpx/executors/explicit_scheduler_executor.hpp hpx/executors/fork_join_executor.hpp hpx/executors/limiting_executor.hpp diff --git a/libs/core/executors/include/hpx/executors/executor_scheduler.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp new file mode 100644 index 00000000000..ab1dcf48ff1 --- /dev/null +++ b/libs/core/executors/include/hpx/executors/executor_scheduler.hpp @@ -0,0 +1,142 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +/// \file parallel/executors/executor_scheduler.hpp + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace hpx::execution::experimental { + /////////////////////////////////////////////////////////////////////////// + HPX_CXX_CORE_EXPORT template + struct executor_operation_state + { + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + HPX_NO_UNIQUE_ADDRESS std::decay_t receiver_; + + template + executor_operation_state(Exec&& exec, Recv&& recv) + : exec_(HPX_FORWARD(Exec, exec)) + , receiver_(HPX_FORWARD(Recv, recv)) + { + } + + executor_operation_state(executor_operation_state&&) = delete; + executor_operation_state(executor_operation_state const&) = delete; + executor_operation_state& operator=( + executor_operation_state&&) = delete; + executor_operation_state& operator=( + executor_operation_state const&) = delete; + + ~executor_operation_state() = default; + + friend void tag_invoke(start_t, executor_operation_state& os) noexcept + { + hpx::detail::try_catch_exception_ptr( + [&]() { + hpx::parallel::execution::post(os.exec_, [&os]() mutable { + hpx::execution::experimental::set_value( + HPX_MOVE(os.receiver_)); + }); + }, + [&](std::exception_ptr ep) { + hpx::execution::experimental::set_error( + HPX_MOVE(os.receiver_), HPX_MOVE(ep)); + }); + } + }; + + /////////////////////////////////////////////////////////////////////////// + HPX_CXX_CORE_EXPORT template + struct executor_sender + { + using sender_concept = hpx::execution::experimental::sender_t; + + HPX_NO_UNIQUE_ADDRESS std::decay_t exec_; + + using completion_signatures = + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_value_t(), + hpx::execution::experimental::set_error_t(std::exception_ptr)>; + + template + friend auto tag_invoke( + hpx::execution::experimental::get_completion_signatures_t, + executor_sender const&, Env) noexcept -> completion_signatures; + + friend constexpr auto tag_invoke( + hpx::execution::experimental::get_completion_scheduler_t< + hpx::execution::experimental::set_value_t>, + executor_sender const& s) noexcept + { + return executor_scheduler{s.exec_}; + } + + template + friend executor_operation_state tag_invoke( + connect_t, executor_sender&& s, Receiver&& receiver) + { + return {HPX_MOVE(s.exec_), HPX_FORWARD(Receiver, receiver)}; + } + + template + friend executor_operation_state tag_invoke( + connect_t, executor_sender const& s, Receiver&& receiver) + { + return {s.exec_, HPX_FORWARD(Receiver, receiver)}; + } + }; + + /////////////////////////////////////////////////////////////////////////// + HPX_CXX_CORE_EXPORT template + struct executor_scheduler + { + using executor_type = std::decay_t; + + HPX_NO_UNIQUE_ADDRESS executor_type exec_; + + constexpr executor_scheduler() = default; + + template + requires(!std::is_same_v, executor_scheduler>) + explicit executor_scheduler(Exec&& exec) noexcept + : exec_(HPX_FORWARD(Exec, exec)) + { + } + + constexpr bool operator==(executor_scheduler const& rhs) const noexcept + { + return exec_ == rhs.exec_; + } + + constexpr bool operator!=(executor_scheduler const& rhs) const noexcept + { + return !(*this == rhs); + } + + friend executor_sender tag_invoke( + schedule_t, executor_scheduler&& sched) + { + return {HPX_MOVE(sched.exec_)}; + } + + friend executor_sender tag_invoke( + schedule_t, executor_scheduler const& sched) + { + return {sched.exec_}; + } + }; +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/include/hpx/executors/executor_scheduler_fwd.hpp b/libs/core/executors/include/hpx/executors/executor_scheduler_fwd.hpp new file mode 100644 index 00000000000..dc0a00e95a8 --- /dev/null +++ b/libs/core/executors/include/hpx/executors/executor_scheduler_fwd.hpp @@ -0,0 +1,25 @@ +// Copyright (c) 2026 The STE||AR-Group +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +/// \file parallel/executors/executor_scheduler_fwd.hpp + +#pragma once + +#include + +namespace hpx::execution::experimental { + + // Forward declarations, see executor_scheduler.hpp + HPX_CXX_CORE_EXPORT template + struct executor_scheduler; + + HPX_CXX_CORE_EXPORT template + struct executor_sender; + + HPX_CXX_CORE_EXPORT template + struct executor_operation_state; + +} // namespace hpx::execution::experimental diff --git a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp index 599501bea1f..bf928f233af 100644 --- a/libs/core/executors/include/hpx/executors/fork_join_executor.hpp +++ b/libs/core/executors/include/hpx/executors/fork_join_executor.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -1240,6 +1241,14 @@ namespace hpx::execution::experimental { HPX_FORWARD(F, f), HPX_FORWARD(Fs, fs)...); } + template + friend void tag_invoke(hpx::parallel::execution::post_t, + fork_join_executor const& exec, F&& f, Ts&&... ts) + { + exec.shared_data_->sync_invoke( + HPX_FORWARD(F, f), HPX_FORWARD(Ts, ts)...); + } + template requires(std::invocable && (std::invocable && ...)) friend decltype(auto) tag_invoke( @@ -1373,10 +1382,29 @@ namespace hpx::execution::experimental { /// \endcond }; + // P2300 get_scheduler bridge — defined outside the class so that + // executor_scheduler is fully defined at point of use. + // This allows fork_join_executor to participate in P2300 sender/receiver + // pipelines via schedule(get_scheduler(exec)). + inline auto tag_invoke(hpx::execution::experimental::get_scheduler_t, + fork_join_executor const& exec) noexcept + -> hpx::execution::experimental::executor_scheduler + { + return hpx::execution::experimental::executor_scheduler< + fork_join_executor>(exec); + } + HPX_CXX_CORE_EXPORT HPX_CORE_EXPORT std::ostream& operator<<( std::ostream& os, fork_join_executor::loop_schedule schedule); /// \cond NOINTERNAL + + template <> + struct is_never_blocking_one_way_executor + : std::true_type + { + }; + template <> struct is_bulk_one_way_executor : std::true_type { diff --git a/libs/core/executors/tests/unit/fork_join_executor.cpp b/libs/core/executors/tests/unit/fork_join_executor.cpp index fc53babbee3..7a50507d77e 100644 --- a/libs/core/executors/tests/unit/fork_join_executor.cpp +++ b/libs/core/executors/tests/unit/fork_join_executor.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -511,11 +512,39 @@ void test_fork_join_static_large_range() HPX_TEST_EQ(sum.load(), expected_sum); } +/////////////////////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////////////////////// +void test_get_scheduler() +{ + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + + std::cerr << "test_get_scheduler\n"; + + // Test 1: get_scheduler returns a valid scheduler + fork_join_executor exec{}; + auto sched = ex::get_scheduler(exec); + + // Test 2: scheduler can be used to schedule work via then + sync_wait + auto result = hpx::get<0>( + *(tt::sync_wait(ex::then(ex::schedule(sched), []() { return 42; })))); + HPX_TEST_EQ(result, 42); + + // Test 3: scheduled work runs on an HPX worker thread + hpx::thread::id scheduled_thread_id{}; + tt::sync_wait(ex::then(ex::schedule(sched), + [&]() { scheduled_thread_id = hpx::this_thread::get_id(); })); + HPX_TEST(scheduled_thread_id != hpx::thread::id{}); +} + /////////////////////////////////////////////////////////////////////////////// int hpx_main() { static_check_executor(); + // P2300 get_scheduler bridge test + test_get_scheduler(); + // Call regression test for #6922 test_fork_join_static_large_range();