Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 179 additions & 45 deletions libs/core/algorithms/include/hpx/parallel/algorithms/reduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace hpx {
/// with an execution policy object of type \a sequenced_policy
/// execute in sequential order in the calling thread.
///
/// The reduce operations in the parallel \a copy_if algorithm invoked
/// The reduce operations in the parallel \a reduce algorithm invoked
/// with an execution policy object of type \a parallel_policy
/// or \a parallel_task_policy are permitted to execute in an unordered
/// fashion in unspecified threads, and indeterminately sequenced
Expand Down Expand Up @@ -123,7 +123,7 @@ namespace hpx {
/// with an execution policy object of type \a sequenced_policy
/// execute in sequential order in the calling thread.
///
/// The reduce operations in the parallel \a copy_if algorithm invoked
/// The reduce operations in the parallel \a reduce algorithm invoked
/// with an execution policy object of type \a parallel_policy
/// or \a parallel_task_policy are permitted to execute in an unordered
/// fashion in unspecified threads, and indeterminately sequenced
Expand Down Expand Up @@ -356,8 +356,11 @@ namespace hpx {
#else // DOXYGEN

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/modules/concepts.hpp>
#include <hpx/modules/execution.hpp>
#include <hpx/modules/executors.hpp>
#include <hpx/modules/functional.hpp>
#include <hpx/modules/iterator_support.hpp>
#include <hpx/modules/pack_traversal.hpp>
#include <hpx/parallel/algorithms/detail/accumulate.hpp>
Comment on lines 358 to 366
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says the only non-formatting changes are in reduce.hpp, the regressions CMakeLists, and the new reduce_6647.cpp, but this PR also introduces functional changes in the execution module (execution_parameters*, rebind_executor_parameters.hpp) and updates execution unit tests. Please update the PR description to reflect these additional code changes so reviewers have the right scope in mind.

Copilot uses AI. Check for mistakes.
Expand All @@ -382,63 +385,194 @@ namespace hpx::parallel {
namespace detail {

/// \cond NOINTERNAL
HPX_CXX_CORE_EXPORT template <typename T>
struct reduce : public algorithm<reduce<T>, T>

// Custom executor parameters for reduce algorithm to prevent
// single-element partitions. reduce_partition requires at least 2
// elements per partition because it initializes the accumulator via
// op(*first, *next(first)), which is the only way to produce a T from
// value_type elements when T may differ from value_type (e.g. minmax).
struct reduce_executor_parameters
{
constexpr reduce() noexcept
: algorithm<reduce, T>("reduce")
private:
static HPX_FORCEINLINE constexpr std::pair<std::size_t, std::size_t>
adjust_chunk_size_and_max_chunks_impl(std::size_t num_elements,
std::size_t num_cores, std::size_t max_chunks,
std::size_t chunk_size) noexcept
{
Comment thread
BhoomishGupta marked this conversation as resolved.
if (num_elements <= 1)
{
return {chunk_size, max_chunks};
}

// Ensure minimum chunk size of 2 for reduce_partition.
if (chunk_size < 2)
{
chunk_size = (num_elements + num_cores - 1) / num_cores;
chunk_size = (std::max) (chunk_size, std::size_t(2));
}

// chunk_size_iterator gives the last partition
// num_elements % chunk_size elements (or chunk_size if
// evenly divisible). If the remainder is 1, that partition
// would violate reduce_partition's >= 2 requirement.
// Bump chunk_size until the remainder is 0 or >= 2.
while (
num_elements > chunk_size && num_elements % chunk_size == 1)
{
++chunk_size;
}

max_chunks = (num_elements + chunk_size - 1) / chunk_size;
return {chunk_size, max_chunks};
}

template <typename ExPolicy, typename InIterB, typename InIterE,
typename T_, typename Reduce>
static constexpr T sequential(ExPolicy&& policy, InIterB first,
InIterE last, T_&& init, Reduce&& r)
public:
template <typename Executor>
HPX_FORCEINLINE constexpr std::pair<std::size_t, std::size_t>
adjust_chunk_size_and_max_chunks(Executor&&,
std::size_t num_elements, std::size_t num_cores,
std::size_t max_chunks, std::size_t chunk_size) const noexcept
{
return detail::sequential_reduce<ExPolicy>(
HPX_FORWARD(ExPolicy, policy), first, last,
HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r));
return adjust_chunk_size_and_max_chunks_impl(
num_elements, num_cores, max_chunks, chunk_size);
}

template <typename ExPolicy, typename FwdIterB, typename FwdIterE,
typename T_, typename Reduce>
static decltype(auto) parallel(ExPolicy&& policy, FwdIterB first,
FwdIterE last, T_&& init, Reduce&& r)
template <typename InnerParams, typename Executor>
friend HPX_FORCEINLINE constexpr std::pair<std::size_t, std::size_t>
tag_override_invoke(hpx::execution::experimental::
adjust_chunk_size_and_max_chunks_t,
reduce_executor_parameters const&, InnerParams&& inner,
Executor&& exec, std::size_t num_elements,
std::size_t num_cores, std::size_t max_chunks,
std::size_t chunk_size)
{
constexpr bool has_scheduler_executor =
hpx::execution_policy_has_scheduler_executor_v<ExPolicy>;

if constexpr (!has_scheduler_executor)
{
if (first == last)
{
return util::detail::algorithm_result<ExPolicy, T>::get(
HPX_FORWARD(T_, init));
}
}

auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T {
T val = *part_begin;
return detail::sequential_reduce<ExPolicy>(
++part_begin, --part_size, HPX_MOVE(val), r);
};

return util::partitioner<ExPolicy, T>::call(
HPX_FORWARD(ExPolicy, policy), first,
detail::distance(first, last), HPX_MOVE(f1),
hpx::unwrapping(
[init = HPX_FORWARD(T_, init),
r = HPX_FORWARD(Reduce, r)](auto&& results) -> T {
return detail::sequential_reduce<ExPolicy>(
hpx::util::begin(results),
hpx::util::size(results), init, r);
}));
auto [adjusted_chunk_size, adjusted_max_chunks] = hpx::
execution::experimental::adjust_chunk_size_and_max_chunks(
HPX_FORWARD(InnerParams, inner),
HPX_FORWARD(Executor, exec), num_elements, num_cores,
max_chunks, chunk_size);

return adjust_chunk_size_and_max_chunks_impl(num_elements,
num_cores, adjusted_max_chunks, adjusted_chunk_size);
}
};
/// \endcond
} // namespace detail
} // namespace hpx::parallel

// Specialize trait to make reduce_executor_parameters a valid executor
// parameters type
namespace hpx::execution::experimental {

template <>
struct is_executor_parameters<
hpx::parallel::detail::reduce_executor_parameters> : std::true_type
{
};
} // namespace hpx::execution::experimental

namespace hpx::parallel { namespace detail {

// Helper function to reduce a partition without requiring an init value.
// Assumes partition size >= 2 (enforced by reduce_executor_parameters).
template <typename ExPolicy, typename FwdIterB, typename T, typename Reduce>
T reduce_partition(
FwdIterB part_begin, std::size_t part_size, Reduce const& r)
{
HPX_ASSERT(part_size >= 2);

Comment on lines +476 to +483
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reduce_partition assumes part_size >= 2 (asserts it) and the chunk-size adjustment only targets the fixed-size partitioning path. Execution policies using has_variable_chunk_size parameters (e.g. guided_chunk_size, dynamic_chunk_size, etc.) build variable-sized shapes and can still produce 1-element partitions, which would trip this assert (or worse in release builds). Consider also wrapping/overriding get_chunk_size (and/or the variable-shape generation) for reduce to guarantee no 1-element partitions, or otherwise handle part_size == 1 safely without double-counting elements.

Suggested change
// Helper function to reduce a partition without requiring an init value.
// Assumes partition size >= 2 (enforced by reduce_executor_parameters).
template <typename ExPolicy, typename FwdIterB, typename T, typename Reduce>
T reduce_partition(
FwdIterB part_begin, std::size_t part_size, Reduce const& r)
{
HPX_ASSERT(part_size >= 2);
// Helper function to reduce a non-empty partition without requiring an
// init value.
template <typename ExPolicy, typename FwdIterB, typename T, typename Reduce>
T reduce_partition(
FwdIterB part_begin, std::size_t part_size, Reduce const& r)
{
HPX_ASSERT(part_size != 0);
if (part_size == 1)
{
return *part_begin;
}

Copilot uses AI. Check for mistakes.
// Combine first two elements using the reduction operator
T init = HPX_INVOKE(r, *part_begin, *std::next(part_begin));

if (part_size == 2)
{
return init;
}

// Reduce remaining elements
return sequential_reduce<ExPolicy>(
std::next(part_begin, 2), part_size - 2, HPX_MOVE(init), r);
}

HPX_CXX_CORE_EXPORT template <typename T>
struct reduce : public algorithm<reduce<T>, T>
{
constexpr reduce() noexcept
: algorithm<reduce<T>, T>("reduce")
{
}

template <typename ExPolicy, typename InIterB, typename InIterE,
typename T_, typename Reduce>
static constexpr T sequential(ExPolicy&& policy, InIterB first,
InIterE last, T_&& init, Reduce&& r)
{
return sequential_reduce<ExPolicy>(HPX_FORWARD(ExPolicy, policy),
first, last, HPX_FORWARD(T_, init), HPX_FORWARD(Reduce, r));
}

template <typename ExPolicy, typename FwdIterB, typename FwdIterE,
typename T_, typename Reduce>
static decltype(auto) parallel(ExPolicy&& policy, FwdIterB first,
FwdIterE last, T_&& init, Reduce&& r)
{
constexpr bool has_scheduler_executor =
hpx::execution_policy_has_scheduler_executor_v<ExPolicy>;

// Handle empty range
if constexpr (!has_scheduler_executor)
{
if (first == last)
{
return util::detail::algorithm_result<ExPolicy, T>::get(
HPX_FORWARD(T_, init));
}
}

// Handle single-element case: can't partition into size >= 2
// This must be checked for all execution policies
auto const count = hpx::parallel::detail::distance(first, last);
Comment thread
BhoomishGupta marked this conversation as resolved.
if (count == 1)
{
T result = HPX_INVOKE(r, HPX_FORWARD(T_, init), *first);
if constexpr (has_scheduler_executor)
{
return result;
}
else
{
return util::detail::algorithm_result<ExPolicy, T>::get(
HPX_MOVE(result));
}
}

auto f1 = [r](FwdIterB part_begin, std::size_t part_size) -> T {
return reduce_partition<ExPolicy, FwdIterB, T>(
part_begin, part_size, r);
};

auto rebound_params =
hpx::execution::experimental::rebind_executor_parameters(
policy.parameters(), reduce_executor_parameters{});
Comment on lines +554 to +556
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says changes were limited to a few algorithm files/tests, but this PR also introduces a new executor-parameters customization point (adjust_chunk_size_and_max_chunks) plus rebind/dispatch wiring and adds/updates several execution unit tests and CI test lists. Please update the PR description (or narrow the PR scope) so reviewers can accurately assess the broader API and behavior changes included here.

Copilot uses AI. Check for mistakes.
auto reduce_policy =
hpx::execution::experimental::create_rebound_policy(
policy, HPX_MOVE(rebound_params));
using reduce_policy_type = std::decay_t<decltype(reduce_policy)>;

return util::partitioner<reduce_policy_type, T>::call(
HPX_MOVE(reduce_policy), first, count, HPX_MOVE(f1),
hpx::unwrapping(
[init = HPX_FORWARD(T_, init), r = HPX_FORWARD(Reduce, r)](
auto&& results) -> T {
return sequential_reduce<ExPolicy>(
hpx::util::begin(results), hpx::util::size(results),
init, r);
}));
}
};
/// \endcond
}} // namespace hpx::parallel::detail

namespace hpx {

///////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <algorithm>
#include <cstddef>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -59,26 +60,9 @@ namespace hpx::parallel::util::detail {
{
if (max_chunks == 0)
{
if (chunk_size == 0)
{
std::size_t const cores_times_4 = 4 * cores; // -V112

// try to calculate chunk-size and maximum number of chunks
chunk_size = (count + cores_times_4 - 1) / cores_times_4;

// we should not consider more chunks than we have elements
max_chunks = (std::min) (cores_times_4, count); // -V112

// we should not make chunks smaller than what's determined by
// the max chunk size
chunk_size = (std::max) (chunk_size,
(count + max_chunks - 1) / max_chunks);
}
else
{
// max_chunks == 0 && chunk_size != 0
max_chunks = (count + chunk_size - 1) / chunk_size;
}
std::tie(chunk_size, max_chunks) = hpx::execution::experimental::
detail::adjust_chunk_size_and_max_chunks_default(
count, cores, max_chunks, chunk_size);
return;
}

Expand All @@ -88,25 +72,9 @@ namespace hpx::parallel::util::detail {
return;
}

if (chunk_size == 0)
{
// max_chunks != 0
chunk_size = (count + max_chunks - 1) / max_chunks;
}
else
{
// max_chunks != 0 && chunk_size != 0

// in this case we make sure that there are no more chunks than
// max_chunks
std::size_t const calculated_max_chunks =
(count + chunk_size - 1) / chunk_size;

if (calculated_max_chunks > max_chunks)
{
chunk_size = (count + max_chunks - 1) / max_chunks;
}
}
std::tie(chunk_size, max_chunks) = hpx::execution::experimental::
detail::adjust_chunk_size_and_max_chunks_default(
count, cores, max_chunks, chunk_size);
}

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -150,8 +118,10 @@ namespace hpx::parallel::util::detail {
hpx::execution::experimental::get_chunk_size(policy.parameters(),
policy.executor(), hpx::chrono::null_duration, cores, count);

// make sure, chunk size and max_chunks are consistent
adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size);
std::tie(chunk_size, max_chunks) =
hpx::execution::experimental::adjust_chunk_size_and_max_chunks(
policy.parameters(), policy.executor(), count, cores,
max_chunks, chunk_size);

auto last = next_or_subrange(it_or_r, count, 0);
Stride stride = parallel::detail::abs(s);
Expand Down Expand Up @@ -232,8 +202,10 @@ namespace hpx::parallel::util::detail {
hpx::execution::experimental::get_chunk_size(policy.parameters(),
policy.executor(), iteration_duration, cores, count);

// make sure, chunk size and max_chunks are consistent
adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size);
std::tie(chunk_size, max_chunks) =
hpx::execution::experimental::adjust_chunk_size_and_max_chunks(
policy.parameters(), policy.executor(), count, cores,
max_chunks, chunk_size);

auto last = next_or_subrange(it_or_r, count, 0);

Expand Down Expand Up @@ -410,8 +382,10 @@ namespace hpx::parallel::util::detail {
hpx::execution::experimental::get_chunk_size(policy.parameters(),
policy.executor(), hpx::chrono::null_duration, cores, count);

// make sure, chunk size and max_chunks are consistent
adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size);
std::tie(chunk_size, max_chunks) =
hpx::execution::experimental::adjust_chunk_size_and_max_chunks(
policy.parameters(), policy.executor(), count, cores,
max_chunks, chunk_size);

if (stride != 1)
{
Expand Down Expand Up @@ -501,8 +475,10 @@ namespace hpx::parallel::util::detail {
hpx::execution::experimental::get_chunk_size(policy.parameters(),
policy.executor(), iteration_duration, cores, count);

// make sure, chunk size and max_chunks are consistent
adjust_chunk_size_and_max_chunks(cores, count, max_chunks, chunk_size);
std::tie(chunk_size, max_chunks) =
hpx::execution::experimental::adjust_chunk_size_and_max_chunks(
policy.parameters(), policy.executor(), count, cores,
max_chunks, chunk_size);

if (stride != 1)
{
Expand Down
1 change: 1 addition & 0 deletions libs/core/algorithms/tests/regressions/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(tests
mismatch_differently_sized_ranges
num_cores
reduce_3641
reduce_6647
scan_different_inits
scan_non_commutative
scan_shortlength
Expand Down
Loading
Loading