diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp index 01e04e3d7379..4428698f1649 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp @@ -63,7 +63,7 @@ namespace hpx { /// with an execution policy object of type \a sequenced_policy /// execute in sequential order in the calling thread. /// - /// The reduce operations in the parallel \a copy_if algorithm invoked + /// The reduce operations in the parallel \a reduce algorithm invoked /// with an execution policy object of type \a parallel_policy /// or \a parallel_task_policy are permitted to execute in an unordered /// fashion in unspecified threads, and indeterminately sequenced @@ -123,7 +123,7 @@ namespace hpx { /// with an execution policy object of type \a sequenced_policy /// execute in sequential order in the calling thread. /// - /// The reduce operations in the parallel \a copy_if algorithm invoked + /// The reduce operations in the parallel \a reduce algorithm invoked /// with an execution policy object of type \a parallel_policy /// or \a parallel_task_policy are permitted to execute in an unordered /// fashion in unspecified threads, and indeterminately sequenced @@ -356,8 +356,11 @@ namespace hpx { #else // DOXYGEN #include +#include #include +#include #include +#include #include #include #include @@ -382,63 +385,194 @@ namespace hpx::parallel { namespace detail { /// \cond NOINTERNAL - HPX_CXX_CORE_EXPORT template - struct reduce : public algorithm, T> + + // Custom executor parameters for reduce algorithm to prevent + // single-element partitions. reduce_partition requires at least 2 + // elements per partition because it initializes the accumulator via + // op(*first, *next(first)), which is the only way to produce a T from + // value_type elements when T may differ from value_type (e.g. minmax). + struct reduce_executor_parameters { - constexpr reduce() noexcept - : algorithm("reduce") + private: + static HPX_FORCEINLINE constexpr std::pair + adjust_chunk_size_and_max_chunks_impl(std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) noexcept { + if (num_elements <= 1) + { + return {chunk_size, max_chunks}; + } + + // Ensure minimum chunk size of 2 for reduce_partition. + if (chunk_size < 2) + { + chunk_size = (num_elements + num_cores - 1) / num_cores; + chunk_size = (std::max) (chunk_size, std::size_t(2)); + } + + // chunk_size_iterator gives the last partition + // num_elements % chunk_size elements (or chunk_size if + // evenly divisible). If the remainder is 1, that partition + // would violate reduce_partition's >= 2 requirement. + // Bump chunk_size until the remainder is 0 or >= 2. + while ( + num_elements > chunk_size && num_elements % chunk_size == 1) + { + ++chunk_size; + } + + max_chunks = (num_elements + chunk_size - 1) / chunk_size; + return {chunk_size, max_chunks}; } - template - static constexpr T sequential(ExPolicy&& policy, InIterB first, - InIterE last, T_&& init, Reduce&& r) + public: + template + HPX_FORCEINLINE constexpr std::pair + adjust_chunk_size_and_max_chunks(Executor&&, + std::size_t num_elements, std::size_t num_cores, + std::size_t max_chunks, std::size_t chunk_size) const noexcept { - return detail::sequential_reduce( - HPX_FORWARD(ExPolicy, policy), first, last, - HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r)); + return adjust_chunk_size_and_max_chunks_impl( + num_elements, num_cores, max_chunks, chunk_size); } - template - static decltype(auto) parallel(ExPolicy&& policy, FwdIterB first, - FwdIterE last, T_&& init, Reduce&& r) + template + friend HPX_FORCEINLINE constexpr std::pair + tag_override_invoke(hpx::execution::experimental:: + adjust_chunk_size_and_max_chunks_t, + reduce_executor_parameters const&, InnerParams&& inner, + Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) { - constexpr bool has_scheduler_executor = - hpx::execution_policy_has_scheduler_executor_v; - - if constexpr (!has_scheduler_executor) - { - if (first == last) - { - return util::detail::algorithm_result::get( - HPX_FORWARD(T_, init)); - } - } - - auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T { - T val = *part_begin; - return detail::sequential_reduce( - ++part_begin, --part_size, HPX_MOVE(val), r); - }; - - return util::partitioner::call( - HPX_FORWARD(ExPolicy, policy), first, - detail::distance(first, last), HPX_MOVE(f1), - hpx::unwrapping( - [init = HPX_FORWARD(T_, init), - r = HPX_FORWARD(Reduce, r)](auto&& results) -> T { - return detail::sequential_reduce( - hpx::util::begin(results), - hpx::util::size(results), init, r); - })); + auto [adjusted_chunk_size, adjusted_max_chunks] = hpx:: + execution::experimental::adjust_chunk_size_and_max_chunks( + HPX_FORWARD(InnerParams, inner), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + max_chunks, chunk_size); + + return adjust_chunk_size_and_max_chunks_impl(num_elements, + num_cores, adjusted_max_chunks, adjusted_chunk_size); } }; /// \endcond } // namespace detail } // namespace hpx::parallel +// Specialize trait to make reduce_executor_parameters a valid executor +// parameters type +namespace hpx::execution::experimental { + + template <> + struct is_executor_parameters< + hpx::parallel::detail::reduce_executor_parameters> : std::true_type + { + }; +} // namespace hpx::execution::experimental + +namespace hpx::parallel { namespace detail { + + // Helper function to reduce a partition without requiring an init value. + // Assumes partition size >= 2 (enforced by reduce_executor_parameters). + template + T reduce_partition( + FwdIterB part_begin, std::size_t part_size, Reduce const& r) + { + HPX_ASSERT(part_size >= 2); + + // Combine first two elements using the reduction operator + T init = HPX_INVOKE(r, *part_begin, *std::next(part_begin)); + + if (part_size == 2) + { + return init; + } + + // Reduce remaining elements + return sequential_reduce( + std::next(part_begin, 2), part_size - 2, HPX_MOVE(init), r); + } + + HPX_CXX_CORE_EXPORT template + struct reduce : public algorithm, T> + { + constexpr reduce() noexcept + : algorithm, T>("reduce") + { + } + + template + static constexpr T sequential(ExPolicy&& policy, InIterB first, + InIterE last, T_&& init, Reduce&& r) + { + return sequential_reduce(HPX_FORWARD(ExPolicy, policy), + first, last, HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r)); + } + + template + static decltype(auto) parallel(ExPolicy&& policy, FwdIterB first, + FwdIterE last, T_&& init, Reduce&& r) + { + constexpr bool has_scheduler_executor = + hpx::execution_policy_has_scheduler_executor_v; + + // Handle empty range + if constexpr (!has_scheduler_executor) + { + if (first == last) + { + return util::detail::algorithm_result::get( + HPX_FORWARD(T_, init)); + } + } + + // Handle single-element case: can't partition into size >= 2 + // This must be checked for all execution policies + auto const count = hpx::parallel::detail::distance(first, last); + if (count == 1) + { + T result = HPX_INVOKE(r, HPX_FORWARD(T_, init), *first); + if constexpr (has_scheduler_executor) + { + return result; + } + else + { + return util::detail::algorithm_result::get( + HPX_MOVE(result)); + } + } + + auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T { + return reduce_partition( + part_begin, part_size, r); + }; + + auto rebound_params = + hpx::execution::experimental::rebind_executor_parameters( + policy.parameters(), reduce_executor_parameters{}); + auto reduce_policy = + hpx::execution::experimental::create_rebound_policy( + policy, HPX_MOVE(rebound_params)); + using reduce_policy_type = std::decay_t; + + return util::partitioner::call( + HPX_MOVE(reduce_policy), first, count, HPX_MOVE(f1), + hpx::unwrapping( + [init = HPX_FORWARD(T_, init), r = HPX_FORWARD(Reduce, r)]( + auto&& results) -> T { + return sequential_reduce( + hpx::util::begin(results), hpx::util::size(results), + init, r); + })); + } + }; + /// \endcond +}} // namespace hpx::parallel::detail + namespace hpx { /////////////////////////////////////////////////////////////////////////// diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp index e4d476a2def3..44f68e10cf1e 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/chunk_size.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -59,26 +60,9 @@ namespace hpx::parallel::util::detail { { if (max_chunks == 0) { - if (chunk_size == 0) - { - std::size_t const cores_times_4 = 4 * cores; // -V112 - - // try to calculate chunk-size and maximum number of chunks - chunk_size = (count + cores_times_4 - 1) / cores_times_4; - - // we should not consider more chunks than we have elements - max_chunks = (std::min) (cores_times_4, count); // -V112 - - // we should not make chunks smaller than what's determined by - // the max chunk size - chunk_size = (std::max) (chunk_size, - (count + max_chunks - 1) / max_chunks); - } - else - { - // max_chunks == 0 && chunk_size != 0 - max_chunks = (count + chunk_size - 1) / chunk_size; - } + std::tie(chunk_size, max_chunks) = hpx::execution::experimental:: + detail::adjust_chunk_size_and_max_chunks_default( + count, cores, max_chunks, chunk_size); return; } @@ -88,25 +72,9 @@ namespace hpx::parallel::util::detail { return; } - if (chunk_size == 0) - { - // max_chunks != 0 - chunk_size = (count + max_chunks - 1) / max_chunks; - } - else - { - // max_chunks != 0 && chunk_size != 0 - - // in this case we make sure that there are no more chunks than - // max_chunks - std::size_t const calculated_max_chunks = - (count + chunk_size - 1) / chunk_size; - - if (calculated_max_chunks > max_chunks) - { - chunk_size = (count + max_chunks - 1) / max_chunks; - } - } + std::tie(chunk_size, max_chunks) = hpx::execution::experimental:: + detail::adjust_chunk_size_and_max_chunks_default( + count, cores, max_chunks, chunk_size); } //////////////////////////////////////////////////////////////////////////// @@ -150,8 +118,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), hpx::chrono::null_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + std::tie(chunk_size, max_chunks) = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); auto last = next_or_subrange(it_or_r, count, 0); Stride stride = parallel::detail::abs(s); @@ -232,8 +202,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), iteration_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + std::tie(chunk_size, max_chunks) = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); auto last = next_or_subrange(it_or_r, count, 0); @@ -410,8 +382,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), hpx::chrono::null_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + std::tie(chunk_size, max_chunks) = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); if (stride != 1) { @@ -501,8 +475,10 @@ namespace hpx::parallel::util::detail { hpx::execution::experimental::get_chunk_size(policy.parameters(), policy.executor(), iteration_duration, cores, count); - // make sure, chunk size and max_chunks are consistent - adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size); + std::tie(chunk_size, max_chunks) = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + policy.parameters(), policy.executor(), count, cores, + max_chunks, chunk_size); if (stride != 1) { diff --git a/libs/core/algorithms/tests/regressions/CMakeLists.txt b/libs/core/algorithms/tests/regressions/CMakeLists.txt index 79fbcceea1cd..48e2507d9d62 100644 --- a/libs/core/algorithms/tests/regressions/CMakeLists.txt +++ b/libs/core/algorithms/tests/regressions/CMakeLists.txt @@ -21,6 +21,7 @@ set(tests mismatch_differently_sized_ranges num_cores reduce_3641 + reduce_6647 scan_different_inits scan_non_commutative scan_shortlength diff --git a/libs/core/algorithms/tests/regressions/reduce_6647.cpp b/libs/core/algorithms/tests/regressions/reduce_6647.cpp new file mode 100644 index 000000000000..6d9e49fc07a5 --- /dev/null +++ b/libs/core/algorithms/tests/regressions/reduce_6647.cpp @@ -0,0 +1,79 @@ +// Copyright (c) 2026 Bhoomish Gupta +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// #6647: Incorrect reduce implementation + +#include +#include +#include + +#include +#include +#include + +struct minmax +{ + std::pair operator()( + std::pair lhs, std::pair rhs) const + { + return { + lhs.first < rhs.first ? lhs.first : rhs.first, + lhs.second < rhs.second ? rhs.second : lhs.second, + }; + } + + std::pair operator()(std::pair lhs, int rhs) const + { + return (*this)(lhs, std::pair{rhs, rhs}); + } + + std::pair operator()(int lhs, std::pair rhs) const + { + return (*this)(std::pair{lhs, lhs}, rhs); + } + + std::pair operator()(int lhs, int rhs) const + { + return (*this)( + std::pair{lhs, lhs}, std::pair{rhs, rhs}); + } +}; + +void test_reduce_case( + std::vector const& c, std::pair const& expected) +{ + auto const init = std::pair{INT_MAX, INT_MIN}; + + auto result = + hpx::reduce(hpx::execution::seq, c.begin(), c.end(), init, minmax{}); + HPX_TEST_EQ(result.first, expected.first); + HPX_TEST_EQ(result.second, expected.second); + + result = + hpx::reduce(hpx::execution::par, c.begin(), c.end(), init, minmax{}); + HPX_TEST_EQ(result.first, expected.first); + HPX_TEST_EQ(result.second, expected.second); +} + +int hpx_main() +{ + test_reduce_case({}, {INT_MAX, INT_MIN}); + test_reduce_case({5}, {5, 5}); + test_reduce_case({3, 1}, {1, 3}); + test_reduce_case({3, 1, 4}, {1, 4}); + test_reduce_case({9, 2, 7, 1, 6}, {1, 9}); + test_reduce_case({3, 1, 4, 1, 5, 9, 2, 6}, {1, 9}); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +} diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp index 0624eb36ea8c..5c33f8c4faee 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters.hpp @@ -599,6 +599,116 @@ namespace hpx::execution::experimental::detail { call(static_cast(params), HPX_FORWARD(Executor, exec)); } }; + + /////////////////////////////////////////////////////////////////////// + // define member traits + HPX_HAS_MEMBER_XXX_TRAIT_DEF(adjust_chunk_size_and_max_chunks) + + /////////////////////////////////////////////////////////////////////// + // shared default implementation allowing to handle + // adjust_chunk_size_and_max_chunks + HPX_FORCEINLINE constexpr std::pair + adjust_chunk_size_and_max_chunks_default(std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) noexcept + { + if (max_chunks == 0) + { + if (chunk_size == 0) + { + std::size_t const cores_times_4 = 4 * num_cores; // -V112 + + chunk_size = (num_elements + cores_times_4 - 1) / cores_times_4; + + max_chunks = + (std::min) (cores_times_4, num_elements); // -V112 + + chunk_size = (std::max) (chunk_size, + (num_elements + max_chunks - 1) / max_chunks); + } + else + { + // max_chunks == 0 && chunk_size != 0 + max_chunks = (num_elements + chunk_size - 1) / chunk_size; + } + } + else if (chunk_size == 0) + { + chunk_size = (num_elements + max_chunks - 1) / max_chunks; + } + else + { + // max_chunks != 0 && chunk_size != 0 + std::size_t const calculated_max_chunks = + (num_elements + chunk_size - 1) / chunk_size; + + if (calculated_max_chunks > max_chunks) + { + chunk_size = (num_elements + max_chunks - 1) / max_chunks; + } + } + + return {chunk_size, max_chunks}; + } + + /////////////////////////////////////////////////////////////////////// + // default property implementation allowing to handle + // adjust_chunk_size_and_max_chunks + struct adjust_chunk_size_and_max_chunks_property + { + template + HPX_FORCEINLINE static constexpr std::pair + adjust_chunk_size_and_max_chunks(Target, std::size_t num_elements, + std::size_t num_cores, std::size_t max_chunks, + std::size_t chunk_size) noexcept + { + return adjust_chunk_size_and_max_chunks_default( + num_elements, num_cores, max_chunks, chunk_size); + } + }; + + ////////////////////////////////////////////////////////////////////// + // Generate a type that is guaranteed to support + // adjust_chunk_size_and_max_chunks + using get_adjust_chunk_size_and_max_chunks_t = + get_parameters_property_t; + + inline constexpr get_adjust_chunk_size_and_max_chunks_t + get_adjust_chunk_size_and_max_chunks{}; + + /////////////////////////////////////////////////////////////////////// + // customization point for interface adjust_chunk_size_and_max_chunks() + template + struct adjust_chunk_size_and_max_chunks_fn_helper>> + { + template + HPX_FORCEINLINE static constexpr std::pair + call(Parameters& params, Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t num_chunks, + std::size_t chunk_size) + { + auto get_prop = get_adjust_chunk_size_and_max_chunks( + HPX_FORWARD(Executor, exec), params, + adjust_chunk_size_and_max_chunks_property{}); + + return get_prop.first.adjust_chunk_size_and_max_chunks( + HPX_FORWARD(decltype(get_prop.second), get_prop.second), + num_elements, num_cores, num_chunks, chunk_size); + } + + template + HPX_FORCEINLINE static constexpr std::pair + call(AnyParameters params, Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t num_chunks, + std::size_t chunk_size) + { + return call(static_cast(params), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + }; /// \endcond /// \cond NOINTERNAL @@ -821,6 +931,30 @@ namespace hpx::execution::experimental::detail { } }; + /////////////////////////////////////////////////////////////////////// + template + struct adjust_chunk_size_and_max_chunks_call_helper + { + }; + + template + struct adjust_chunk_size_and_max_chunks_call_helper>> + { + template + HPX_FORCEINLINE std::pair + adjust_chunk_size_and_max_chunks(Executor&& exec, + std::size_t num_elements, std::size_t num_cores, + std::size_t num_chunks, std::size_t chunk_size) + { + auto& wrapped = + static_cast*>(this)->member_.get(); + return wrapped.adjust_chunk_size_and_max_chunks( + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + }; + /////////////////////////////////////////////////////////////////////// template struct base_member_helper @@ -846,6 +980,8 @@ namespace hpx::execution::experimental::detail { , processing_units_count_call_helper> , reset_thread_distribution_call_helper> , collect_execution_parameters_call_helper> + , adjust_chunk_size_and_max_chunks_call_helper> { using wrapper_type = std::reference_wrapper; @@ -885,6 +1021,8 @@ namespace hpx::execution::experimental::detail { HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY(maximal_number_of_chunks); HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY(reset_thread_distribution); HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY(collect_execution_parameters); + HPX_STATIC_ASSERT_ON_PARAMETERS_AMBIGUITY( + adjust_chunk_size_and_max_chunks); constexpr executor_parameters() requires(hpx::util::all_of_v...>) diff --git a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp index 05f0e5c07f5f..e39d4be90f60 100644 --- a/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp +++ b/libs/core/execution/include/hpx/execution/executors/execution_parameters_fwd.hpp @@ -60,6 +60,10 @@ namespace hpx::execution::experimental { template struct collect_execution_parameters_fn_helper; + + template + struct adjust_chunk_size_and_max_chunks_fn_helper; /// \endcond } // namespace detail @@ -422,6 +426,48 @@ namespace hpx::execution::experimental { } } collect_execution_parameters{}; + /// Adjust the chunk size and maximal number of chunks for a parallel + /// algorithm execution + /// + /// \param params [in] The executor parameters object to use for + /// adjusting the chunk size. + /// \param exec [in] The executor object which will be used + /// for scheduling of the loop iterations. + /// \param num_elements [in] The overall number of elements for the + /// algorithm. + /// \param num_cores [in] The overall number of cores to utilize + /// for the algorithm. + /// \param num_chunks [in] The overall number of chunks for the + /// algorithm. + /// \param chunk_size [in] The size of the chunks created for the + /// algorithm. + /// + /// \note This calls params.adjust_chunk_size_and_max_chunks(exec, ...) + /// if it exists; otherwise it applies the default chunk size and + /// max chunks adjustment logic. + /// + HPX_CXX_CORE_EXPORT inline constexpr struct + adjust_chunk_size_and_max_chunks_t final + : hpx::functional::detail::tag_priority< + adjust_chunk_size_and_max_chunks_t> + { + private: + template + requires(hpx::traits::is_executor_parameters_v && + hpx::traits::is_executor_any_v) + friend HPX_FORCEINLINE decltype(auto) tag_fallback_invoke( + adjust_chunk_size_and_max_chunks_t, Parameters&& params, + Executor&& exec, std::size_t num_elements, std::size_t num_cores, + std::size_t num_chunks, std::size_t chunk_size) + { + return detail::adjust_chunk_size_and_max_chunks_fn_helper< + hpx::util::decay_unwrap_t, + std::decay_t>::call(HPX_FORWARD(Parameters, params), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + } adjust_chunk_size_and_max_chunks{}; + template <> struct is_scheduling_property : std::true_type diff --git a/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp b/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp index 40b1aff2f1b3..31dc919ff556 100644 --- a/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp +++ b/libs/core/execution/include/hpx/execution/executors/rebind_executor_parameters.hpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace hpx::execution::experimental { @@ -240,6 +241,22 @@ namespace hpx::execution::experimental { collect_execution_parameters_t, Params&&, InnerParams&&, Executor&&, std::size_t, std::size_t, std::size_t, std::size_t>; + // wrapping adjust_chunk_size_and_max_chunks + HPX_CXX_CORE_EXPORT template + inline constexpr bool supports_adjust_chunk_size_and_max_chunks_v = + hpx::functional::detail::is_tag_override_invocable_v< + adjust_chunk_size_and_max_chunks_t, Params&&, Executor&&, + std::size_t, std::size_t, std::size_t, std::size_t> || + hpx::execution::experimental::detail:: + has_adjust_chunk_size_and_max_chunks_v>; + + HPX_CXX_CORE_EXPORT template + inline constexpr bool supports_wrapping_adjust_chunk_size_and_max_chunks_v = + hpx::functional::detail::is_tag_override_invocable_v< + adjust_chunk_size_and_max_chunks_t, Params&&, InnerParams&&, + Executor&&, std::size_t, std::size_t, std::size_t, std::size_t>; + /////////////////////////////////////////////////////////////////////////// HPX_CXX_CORE_EXPORT template requires(hpx::executor_parameters && @@ -402,8 +419,8 @@ namespace hpx::execution::experimental { detail::wrapped_forward(this_), HPX_FORWARD(Executor, exec)); } - if constexpr (supports_mark_begin_execution_v< - detail::wrapping_t, Executor>) + else if constexpr (supports_mark_begin_execution_v< + detail::wrapping_t, Executor>) { mark_begin_execution(detail::wrapping_forward(this_), HPX_FORWARD(Executor, exec)); @@ -622,6 +639,54 @@ namespace hpx::execution::experimental { } } + // wrapping adjust_chunk_size_and_max_chunks + + // clang-format off + template + requires(std::same_as> && ( + supports_wrapping_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, detail::wrapped_t, + Executor> || + supports_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, Executor> || + supports_adjust_chunk_size_and_max_chunks_v< + detail::wrapped_t, Executor> + )) + // clang-format on + friend constexpr std::pair + tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + Params&& this_, Executor&& exec, std::size_t const num_elements, + std::size_t const num_cores, std::size_t const num_chunks, + std::size_t const chunk_size) + { + if constexpr (supports_wrapping_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, + detail::wrapped_t, Executor>) + { + return adjust_chunk_size_and_max_chunks( + detail::wrapping_forward(this_), + detail::wrapped_forward(this_), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + else if constexpr (supports_adjust_chunk_size_and_max_chunks_v< + detail::wrapping_t, Executor>) + { + return adjust_chunk_size_and_max_chunks( + detail::wrapping_forward(this_), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + else + { + return adjust_chunk_size_and_max_chunks( + detail::wrapped_forward(this_), + HPX_FORWARD(Executor, exec), num_elements, num_cores, + num_chunks, chunk_size); + } + } + Wrapped wrapped; Wrapping wrapping; }; diff --git a/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp b/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp index d97a4de0b677..cc4921ff5955 100644 --- a/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp +++ b/libs/core/execution/tests/unit/executor_parameters_dispatching.cpp @@ -651,6 +651,91 @@ void test_mark_end_execution() } } +/////////////////////////////////////////////////////////////////////////////// +// adjust_chunk_size_and_max_chunks + +struct test_executor_adjust_chunk_size : hpx::execution::parallel_executor +{ + test_executor_adjust_chunk_size() = default; + + template + friend std::pair tag_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + Parameters&&, test_executor_adjust_chunk_size, + std::size_t /*num_elements*/, std::size_t /*num_cores*/, + std::size_t /*num_chunks*/, std::size_t /*chunk_size*/) + { + ++exec_count; + return {1, 1}; + } +}; + +template <> +struct hpx::execution::experimental::is_two_way_executor< + test_executor_adjust_chunk_size> : std::true_type +{ +}; + +struct test_adjust_chunk_size +{ + template + friend std::pair tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + test_adjust_chunk_size, Executor&&, std::size_t /*num_elements*/, + std::size_t /*num_cores*/, std::size_t /*num_chunks*/, + std::size_t /*chunk_size*/) + { + ++params_count; + return {1, 1}; + } +}; + +template <> +struct hpx::execution::experimental::is_executor_parameters< + test_adjust_chunk_size> : std::true_type +{ +}; + +/////////////////////////////////////////////////////////////////////////////// +void test_adjust_chunk_size_and_max_chunks() +{ + { + params_count = 0; + exec_count = 0; + + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + test_adjust_chunk_size{}, hpx::execution::par.executor(), 100, 4, 0, + 0); + + HPX_TEST_EQ(params_count, static_cast(1)); + HPX_TEST_EQ(exec_count, static_cast(0)); + } + + { + params_count = 0; + exec_count = 0; + + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + hpx::execution::par.parameters(), test_executor_adjust_chunk_size{}, + 100, 4, 0, 0); + + HPX_TEST_EQ(params_count, static_cast(0)); + HPX_TEST_EQ(exec_count, static_cast(1)); + } + + { + params_count = 0; + exec_count = 0; + + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + test_adjust_chunk_size{}, test_executor_adjust_chunk_size{}, 100, 4, + 0, 0); + + HPX_TEST_EQ(params_count, static_cast(1)); + HPX_TEST_EQ(exec_count, static_cast(0)); + } +} + /////////////////////////////////////////////////////////////////////////////// int hpx_main() { @@ -662,6 +747,7 @@ int hpx_main() test_mark_begin_execution(); test_mark_end_of_scheduling(); test_mark_end_execution(); + test_adjust_chunk_size_and_max_chunks(); return hpx::local::finalize(); } diff --git a/libs/core/execution/tests/unit/rebind_executor_parameters.cpp b/libs/core/execution/tests/unit/rebind_executor_parameters.cpp index 2dc47c3c8a49..11ecf4f6847c 100644 --- a/libs/core/execution/tests/unit/rebind_executor_parameters.cpp +++ b/libs/core/execution/tests/unit/rebind_executor_parameters.cpp @@ -793,6 +793,213 @@ void replace_collect_execution_parameters() } } +/////////////////////////////////////////////////////////////////////////////// +// test parameters object with adjust_chunk_size_and_max_chunks +struct test_replaced_adjust_chunk_size_and_max_chunks +{ + explicit test_replaced_adjust_chunk_size_and_max_chunks( + std::atomic& invoked) noexcept + : invoked(&invoked) + { + } + + template + friend std::pair tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + test_replaced_adjust_chunk_size_and_max_chunks& self, Executor&&, + std::size_t num_elements, std::size_t num_cores, std::size_t num_chunks, + std::size_t chunk_size) noexcept + { + *self.invoked = true; + + if (chunk_size == 0) + { + chunk_size = (num_elements + num_cores - 1) / num_cores; + } + + if (chunk_size == 0) + { + chunk_size = 1; + } + + if (num_chunks == 0) + { + num_chunks = (num_elements + chunk_size - 1) / chunk_size; + } + + return {chunk_size, num_chunks}; + } + + std::atomic* invoked; +}; + +struct test_wrapping_adjust_chunk_size_and_max_chunks +{ + explicit test_wrapping_adjust_chunk_size_and_max_chunks( + std::atomic& invoked) noexcept + : invoked(&invoked) + { + } + + template + friend std::pair tag_override_invoke( + hpx::execution::experimental::adjust_chunk_size_and_max_chunks_t, + test_wrapping_adjust_chunk_size_and_max_chunks& self, + InnerParams&& inner, Executor&& exec, std::size_t num_elements, + std::size_t num_cores, std::size_t num_chunks, std::size_t chunk_size) + { + auto result = + hpx::execution::experimental::adjust_chunk_size_and_max_chunks( + HPX_FORWARD(InnerParams, inner), HPX_FORWARD(Executor, exec), + num_elements, num_cores, num_chunks, chunk_size); + + *self.invoked = true; + return result; + } + + std::atomic* invoked; +}; + +namespace hpx::execution::experimental { + + template <> + struct is_executor_parameters< + test_replaced_adjust_chunk_size_and_max_chunks> : std::true_type + { + }; + + template <> + struct is_executor_parameters< + test_wrapping_adjust_chunk_size_and_max_chunks> : std::true_type + { + }; +} // namespace hpx::execution::experimental + +void replace_adjust_chunk_size_and_max_chunks() +{ + using namespace hpx::execution; + using namespace hpx::execution::experimental; + + // replace chunk adjustment with another parameters object exposing it + { + std::atomic invoked_replaced(false); + + auto params = + join_executor_parameters(experimental::static_chunk_size()); + auto bound_params = rebind_executor_parameters(params, + test_replaced_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + } + + // replace a parameters object not exposing chunk adjustment + { + std::atomic invoked_replaced(false); + + auto params = join_executor_parameters(experimental::max_num_chunks()); + auto bound_params = rebind_executor_parameters(params, + test_replaced_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + } + + // replace chunk adjustment with a parameters object not exposing it + { + std::atomic invoked_replaced(false); + + auto params = join_executor_parameters( + test_replaced_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto bound_params = + rebind_executor_parameters(params, experimental::num_cores(4)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + } + + // test wrapped chunk adjustment + { + std::atomic invoked_replaced(false); + std::atomic invoked_inner_replaced(false); + + auto params = join_executor_parameters( + test_replaced_adjust_chunk_size_and_max_chunks( + invoked_inner_replaced)); + auto bound_params = rebind_executor_parameters(params, + test_wrapping_adjust_chunk_size_and_max_chunks(invoked_replaced)); + auto policy = create_rebound_policy(par, bound_params); + parameters_test(policy); + + HPX_TEST(invoked_replaced); + HPX_TEST(invoked_inner_replaced); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// test mark_begin_execution dispatching order +struct test_dual_mark_begin_execution +{ + explicit test_dual_mark_begin_execution(std::atomic& wrapping_wrapped, + std::atomic& wrapping_only) noexcept + : wrapping_wrapped(&wrapping_wrapped) + , wrapping_only(&wrapping_only) + { + } + + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_begin_execution_t, + test_dual_mark_begin_execution const& self, InnerParams&&, + Executor&&) noexcept + { + ++*self.wrapping_wrapped; + } + + template + friend void tag_override_invoke( + hpx::execution::experimental::mark_begin_execution_t, + test_dual_mark_begin_execution const& self, Executor&&) noexcept + { + ++*self.wrapping_only; + } + + std::atomic* wrapping_wrapped; + std::atomic* wrapping_only; +}; + +namespace hpx::execution::experimental { + + template <> + struct is_executor_parameters + : std::true_type + { + }; +} // namespace hpx::execution::experimental + +void verify_single_mark_begin_dispatch() +{ + using namespace hpx::execution; + using namespace hpx::execution::experimental; + + std::atomic wrapping_wrapped_count(0); + std::atomic wrapping_only_count(0); + + auto params = join_executor_parameters(experimental::num_cores(4)); + auto bound_params = rebind_executor_parameters(params, + test_dual_mark_begin_execution( + wrapping_wrapped_count, wrapping_only_count)); + auto policy = create_rebound_policy(par, bound_params); + + parameters_test(policy); + + HPX_TEST_EQ(wrapping_wrapped_count.load(), 1); + HPX_TEST_EQ(wrapping_only_count.load(), 0); +} + /////////////////////////////////////////////////////////////////////////////// int hpx_main() { @@ -802,6 +1009,8 @@ int hpx_main() replace_execution_markers(); replace_processing_units_count(); replace_collect_execution_parameters(); + replace_adjust_chunk_size_and_max_chunks(); + verify_single_mark_begin_dispatch(); return hpx::local::finalize(); }