Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#pragma once

#include <hpx/execution/algorithms/detail/partial_algorithm.hpp>
#include <hpx/execution_base/stdexec_forward.hpp>
#include <hpx/modules/concepts.hpp>
#include <hpx/modules/execution_base.hpp>
#include <hpx/modules/type_support.hpp>
Expand Down Expand Up @@ -58,4 +59,45 @@ namespace hpx::execution::experimental::detail {
return HPX_MOVE(p).invoke(HPX_MOVE(p.scheduler), HPX_FORWARD(U, u));
}
};

// Sibling of `inject_scheduler` that holds a reference to a `run_loop`
// instead of a scheduler. Used so that `make_future(loop)` yields a
// pipe-able adapter which, when piped with a sender, dispatches
// `tag_invoke(Tag, run_loop&, sender, ...)`.
HPX_CXX_CORE_EXPORT template <typename Tag, typename... Ts>
struct inject_run_loop
: partial_algorithm_base<Tag, hpx::util::make_index_pack_t<sizeof...(Ts)>,
Ts...>
{
private:
// Stored as a pointer because the partial may be moved and references
// can't be re-bound; the `run_loop&` lifetime is the caller's
// responsibility (the same as the `scheduler` case).
hpx::execution::experimental::run_loop* loop_;

Comment on lines +63 to +77
using base_type = partial_algorithm_base<Tag,
hpx::util::make_index_pack_t<sizeof...(Ts)>, Ts...>;

public:
template <typename... Ts_>
explicit constexpr inject_run_loop(
hpx::execution::experimental::run_loop& loop, Ts_&&... ts)
: base_type(HPX_FORWARD(Ts_, ts)...)
, loop_(&loop)
{
}

// clang-format off
template <typename U,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_sender_v<U>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto operator|(
U&& u, inject_run_loop p)
{
// NOLINTNEXTLINE(bugprone-use-after-move)
return HPX_MOVE(p).invoke(*p.loop_, HPX_FORWARD(U, u));
}
};
} // namespace hpx::execution::experimental::detail
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,6 @@ namespace hpx::execution::experimental {
// enforce proper formatting
namespace detail {

// Recover the parent `run_loop&` from a `run_loop::scheduler`.
//
// P2300 deliberately does not provide a public API to obtain the
// owning `run_loop&` from one of its schedulers. The only way to do
// this against the current stdexec implementation is to read the
// private `__loop_` member exposed by the scheduler's environment.
//
// This helper is the single point in HPX that touches that internal
// detail. If upstream stdexec ever renames/removes `__loop_` (as
// happened with `stdexec::tags`), only this function needs to change.
// Post-cleanup follow-up of #7123.
//
// The parameter is constrained to the concrete `run_loop::scheduler`
// type (rather than a generic template) because the implementation
// depends on `.__loop_` being the specific stdexec env layout.
// `run_loop::scheduler::schedule()` is `noexcept`, so the
// unconditional `noexcept` here is sound. The function is not marked
// `constexpr` because the `stdexec::schedule` CPO wrapper is not
// declared `constexpr` (GCC strict mode rejects calling it from a
// constexpr context, even though the underlying member is constexpr).
inline hpx::execution::experimental::run_loop&
get_run_loop_from_scheduler(
decltype(std::declval<hpx::execution::experimental::run_loop>()
.get_scheduler()) const& sched) noexcept
{
return static_cast<hpx::execution::experimental::run_loop&>(
*hpx::execution::experimental::get_env(schedule(sched))
.__loop_);
}

template <typename OperationState>
void start_operation_state(OperationState& op_state) noexcept
{
Expand Down Expand Up @@ -200,11 +170,9 @@ namespace hpx::execution::experimental {
template <typename Sender>
future_data_with_run_loop(init_no_addref no_addref,
other_allocator const& alloc,
decltype(std::declval<hpx::execution::experimental::run_loop>()
.get_scheduler()) const& sched,
Sender&& sender)
hpx::execution::experimental::run_loop& loop_, Sender&& sender)
: base_type(no_addref, alloc, HPX_FORWARD(Sender, sender))
, loop(get_run_loop_from_scheduler(sched))
, loop(loop_)
{
this->set_on_completed([this]() { loop.finish(); });
}
Expand Down Expand Up @@ -264,9 +232,8 @@ namespace hpx::execution::experimental {
///////////////////////////////////////////////////////////////////////
HPX_CXX_CORE_EXPORT template <typename Sender, typename Allocator>
auto make_future_with_run_loop(
decltype(std::declval<hpx::execution::experimental::run_loop>()
.get_scheduler()) const& sched,
Sender&& sender, Allocator const& allocator)
hpx::execution::experimental::run_loop& loop, Sender&& sender,
Allocator const& allocator)
{
using allocator_type = Allocator;

Expand Down Expand Up @@ -294,7 +261,7 @@ namespace hpx::execution::experimental {
hpx::util::allocator_deleter<other_allocator>{alloc});

allocator_traits::construct(alloc, p.get(), init_no_addref{}, alloc,
sched, HPX_FORWARD(Sender, sender));
loop, HPX_FORWARD(Sender, sender));

return hpx::traits::future_access<future<result_type>>::create(
p.release(), false);
Expand Down Expand Up @@ -383,12 +350,11 @@ namespace hpx::execution::experimental {
)>
// clang-format on
friend auto tag_invoke(make_future_t,
decltype(std::declval<hpx::execution::experimental::run_loop>()
.get_scheduler()) const& sched,
Sender&& sender, Allocator const& allocator = Allocator{})
hpx::execution::experimental::run_loop& loop, Sender&& sender,
Allocator const& allocator = Allocator{})
{
Comment on lines -389 to 398
return detail::make_future_with_run_loop(
sched, HPX_FORWARD(Sender, sender), allocator);
loop, HPX_FORWARD(Sender, sender), allocator);
Comment on lines +396 to +400
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guptapratykshh does this comment make sense?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i removed the scheduler-based binary tag_invoke overload, the tag_override_invoke constraint is_completion_scheduler_tag_invocable_v<set_value_t, Sender, make_future_t, Allocator> evaluates to false for run-loop-backed senders, so make_future(transfer_just(sched, ...)) falls to detail::make_future which never drives the loop → hang.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to do something about this?

Comment on lines 395 to +400
}

// clang-format off
Expand Down Expand Up @@ -421,6 +387,49 @@ namespace hpx::execution::experimental {
HPX_FORWARD(Scheduler, scheduler), allocator};
}

// `make_future(loop.get_scheduler())` is intentionally rejected at
// the call site: the run-loop-backed binary
// `tag_invoke(make_future_t, run_loop::scheduler, sender, alloc)`
// overload is gone (replaced by the `run_loop&` overload above).
// Without these deletes, the generic scheduler partial above would
// accept the call, return an `inject_scheduler`, and fail with a
// confusing substitution error only when the partial is later piped
// with a sender. Use `make_future(loop, sender)` /
// `sender | make_future(loop)` instead.
Comment on lines +433 to +441
//
// Two deleted overloads (one per arity), and neither uses
// `HPX_CONCEPT_REQUIRES_`: that macro expands to a defaulted
// template parameter, and GCC rejects defaulted template arguments
// on friend *declarations* (which a `= delete`d friend is). The
// 2-argument overload therefore takes any second arg (allocator or
// not); that's intentional — `make_future(run_loop::scheduler, X)`
// shouldn't compile regardless of `X`.
friend constexpr auto tag_fallback_invoke(make_future_t,
hpx::execution::experimental::run_loop::scheduler const&) = delete;

template <typename T>
friend constexpr auto tag_fallback_invoke(make_future_t,
hpx::execution::experimental::run_loop::scheduler const&,
T const&) = delete;

// Partial-algorithm overload for the `run_loop&` form so that
// `make_future(loop)` can be piped with a sender. Returns an
// `inject_run_loop` adapter which, when piped with a sender, calls
// `tag_invoke(make_future_t, loop, sender, allocator)` above.
// clang-format off
template <typename Allocator = hpx::util::internal_allocator<>,
HPX_CONCEPT_REQUIRES_(
hpx::traits::is_allocator_v<Allocator>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(make_future_t,
hpx::execution::experimental::run_loop& loop,
Allocator const& allocator = Allocator{})
{
return hpx::execution::experimental::detail::inject_run_loop<
make_future_t, Allocator>{loop, allocator};
Comment on lines +468 to +473
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this comment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inject_scheduler partial was returned by make_future(scheduler) but had nothing to dispatch to (no binary tag_invoke(make_future_t, scheduler, sender, alloc) overload exists for any scheduler type after this PR: there only ever was one for run_loop::scheduler, which was replaced)

Comment on lines +468 to +473
}

// clang-format off
template <typename Allocator = hpx::util::internal_allocator<>,
HPX_CONCEPT_REQUIRES_(
Expand Down
59 changes: 48 additions & 11 deletions libs/core/execution/tests/unit/algorithm_run_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ void test_future_sender()
[[maybe_unused]] auto sched = loop.get_scheduler();

auto s = ex::transfer_just(sched, 3);
auto f = ex::make_future(std::move(s));
auto f = ex::make_future(loop, std::move(s));
HPX_TEST_EQ(f.get(), 3);
}

Expand All @@ -620,7 +620,7 @@ void test_future_sender()
ex::run_loop loop;
[[maybe_unused]] auto sched = loop.get_scheduler();

auto f = ex::transfer_just(sched, 3) | ex::make_future();
auto f = ex::transfer_just(sched, 3) | ex::make_future(loop);
HPX_TEST_EQ(f.get(), 3);
}

Expand All @@ -629,7 +629,12 @@ void test_future_sender()
ex::run_loop loop;
[[maybe_unused]] auto sched = loop.get_scheduler();

auto f = ex::just(3) | ex::make_future(sched);
// Use transfer_just(sched, ...) so the partial-pipe path actually
// schedules work onto `loop` and depends on `loop.run()` being
// driven by the resulting future's `get()` (a bug in the
// `inject_run_loop` partial would hang here, not silently pass
// as it would with `just(3)`).
auto f = ex::transfer_just(sched, 3) | ex::make_future(loop);
HPX_TEST_EQ(f.get(), 3);
}

Expand All @@ -640,7 +645,7 @@ void test_future_sender()

std::atomic<bool> called{false};
auto s = ex::schedule(sched) | ex::then([&] { called = true; });
auto f = ex::make_future(std::move(s));
auto f = ex::make_future(loop, std::move(s));
f.get();
HPX_TEST(called);
}
Expand All @@ -653,7 +658,7 @@ void test_future_sender()
auto s1 = ex::transfer_just(sched, std::size_t(42));
auto s2 = ex::transfer_just(sched, 3.14);
auto s3 = ex::transfer_just(sched, std::string("hello"));
auto f = ex::make_future(sched,
auto f = ex::make_future(loop,
ex::then(ex::when_all(std::move(s1), std::move(s2), std::move(s3)),
[](std::size_t x, double, std::string z) {
return z.size() + x;
Expand All @@ -667,7 +672,7 @@ void test_future_sender()
ex::run_loop loop;
[[maybe_unused]] auto sched = loop.get_scheduler();
auto result = tt::sync_wait(
ex::as_sender(ex::make_future(ex::transfer_just(sched, 42))));
ex::as_sender(ex::make_future(loop, ex::transfer_just(sched, 42))));
HPX_TEST_EQ(hpx::get<0>(*result), 42);
}

Expand All @@ -681,9 +686,9 @@ void test_future_sender()
return 42;
});

HPX_TEST_EQ(
ex::make_future(ex::transfer(ex::as_sender(std::move(f)), sched))
.get(),
HPX_TEST_EQ(ex::make_future(
loop, ex::transfer(ex::as_sender(std::move(f)), sched))
.get(),
42);
}

Expand All @@ -695,7 +700,7 @@ void test_future_sender()
auto s1 = ex::transfer_just(sched, std::size_t(42));
auto s2 = ex::transfer_just(sched, 3.14);
auto s3 = ex::transfer_just(sched, std::string("hello"));
auto f = ex::make_future(sched,
auto f = ex::make_future(loop,
ex::then(ex::when_all(std::move(s1), std::move(s2), std::move(s3)),
[](std::size_t x, double, std::string z) {
return z.size() + x;
Expand All @@ -705,13 +710,45 @@ void test_future_sender()
auto t2 = sf.then([](auto&& sf) { return sf.get() + 2; });
auto t1s = ex::then(
ex::as_sender(std::move(t1)), [](std::size_t x) { return x + 1; });
auto t1f = ex::make_future(sched, std::move(t1s));
auto t1f = ex::make_future(loop, std::move(t1s));
auto last = hpx::dataflow(
hpx::unwrapping([](std::size_t x, std::size_t y) { return x + y; }),
t1f, t2);

HPX_TEST_EQ(last.get(), std::size_t(18));
}

// The following two cases cover the public `make_future(sender)` and
// `sender | make_future()` paths for run-loop-backed senders. After
// this PR these no longer drive the loop themselves (only the
// explicit `make_future(loop, sender)` form does), but the overloads
// are still public — exercising them here with a separate driver
// thread keeps that contract test-covered.
std::cout << "9 (sender_only_make_future_with_driver)\n";
{
ex::run_loop loop;
auto driver = hpx::thread([&] { loop.run(); });
[[maybe_unused]] auto sched = loop.get_scheduler();

auto f = ex::make_future(ex::transfer_just(sched, 3));
HPX_TEST_EQ(f.get(), 3);

loop.finish();
driver.join();
}

std::cout << "10 (sender_pipe_make_future_with_driver)\n";
{
ex::run_loop loop;
auto driver = hpx::thread([&] { loop.run(); });
[[maybe_unused]] auto sched = loop.get_scheduler();

auto f = ex::transfer_just(sched, 3) | ex::make_future();
HPX_TEST_EQ(f.get(), 3);

loop.finish();
driver.join();
}
}

void test_ensure_started()
Expand Down
Loading