Skip to content

Commit decf3ae

Browse files
committed
Address one FIXME and adjust header includes
1 parent d9f9e17 commit decf3ae

3 files changed

Lines changed: 16 additions & 25 deletions

File tree

phlex/model/flush_gate.cpp

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
#include "phlex/model/flush_gate.hpp"
22

3-
#include "spdlog/spdlog.h"
4-
53
#include <cassert>
64
#include <functional>
7-
#include <mutex>
85
#include <ranges>
96
#include <utility>
107

118
namespace phlex::experimental {
129

1310
flush_gate::flush_gate(data_cell_index_ptr index, std::size_t expected_flush_count) :
14-
index_{std::move(index)},
15-
committed_counts_{std::make_shared<data_cell_counts>()},
16-
expected_flush_count_{expected_flush_count}
11+
index_{std::move(index)}, expected_flush_count_{expected_flush_count}
1712
{
1813
}
1914

@@ -24,13 +19,13 @@ namespace phlex::experimental {
2419

2520
std::size_t flush_gate::committed_total_count() const
2621
{
27-
return std::ranges::fold_left(*committed_counts_ | std::views::values, 0uz, std::plus{});
22+
return std::ranges::fold_left(committed_counts_ | std::views::values, 0uz, std::plus{});
2823
}
2924

3025
std::size_t flush_gate::committed_count_for_layer(
3126
data_cell_index::hash_type const layer_hash) const
3227
{
33-
return committed_counts_->count(layer_hash);
28+
return committed_counts_.count(layer_hash);
3429
}
3530

3631
void flush_gate::update_expected_count(data_cell_index::hash_type const layer_hash,
@@ -40,11 +35,10 @@ namespace phlex::experimental {
4035
++received_flush_count_;
4136
}
4237

43-
void flush_gate::roll_up_child(data_cell_counts_const_ptr child_committed_counts)
38+
void flush_gate::roll_up_child(data_cell_counts const& child_committed_counts)
4439
{
45-
assert(child_committed_counts);
46-
for (auto const& [layer_hash, count] : *child_committed_counts) {
47-
committed_counts_->add_to(layer_hash, count);
40+
for (auto const& [layer_hash, count] : child_committed_counts) {
41+
committed_counts_.add_to(layer_hash, count);
4842
}
4943
--pending_child_rollups_;
5044
}
@@ -89,7 +83,7 @@ namespace phlex::experimental {
8983
void flush_gate::commit()
9084
{
9185
for (auto const& [layer_hash, count] : expected_counts_) {
92-
committed_counts_->add_to(layer_hash, count.load());
86+
committed_counts_.add_to(layer_hash, count.load());
9387
}
9488

9589
// At some point, we might consider clearing the expected_counts_ map to free memory,

phlex/model/flush_gate.hpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include <functional>
4141
#include <memory>
4242
#include <mutex>
43+
#include <utility>
4344

4445
namespace phlex::experimental {
4546

@@ -57,7 +58,7 @@ namespace phlex::experimental {
5758
std::size_t expected_total_count() const;
5859
std::size_t committed_total_count() const;
5960
std::size_t committed_count_for_layer(data_cell_index::hash_type layer_hash) const;
60-
data_cell_counts_const_ptr committed_counts() const { return committed_counts_; }
61+
data_cell_counts const& committed_counts() const { return committed_counts_; }
6162

6263
// Merges an expected child count into the accumulated expected counts. Each call
6364
// represents one flush message arriving (e.g. one unfold completing for this index).
@@ -66,7 +67,7 @@ namespace phlex::experimental {
6667
// Records that a non-lowest direct child has rolled up: merges its committed_counts
6768
// into this gate's and decrements the pending-rollups balance. The two steps are
6869
// bundled because every rollup must do both, in the same call.
69-
void roll_up_child(data_cell_counts_const_ptr child_committed_counts);
70+
void roll_up_child(data_cell_counts const& child_committed_counts);
7071

7172
// Announces that n additional non-lowest direct children are expected to roll up.
7273
// Lowest-layer children require no such bookkeeping: their counts are fully accounted
@@ -84,11 +85,7 @@ namespace phlex::experimental {
8485

8586
data_cell_index_ptr const index_;
8687
std::once_flag commit_once_;
87-
// FIXME: We express committed_counts_ as a shared pointer so that we can copy the committed
88-
// counts (this is done for determining the flush values for folds). Once the fold
89-
// flushes are incorporated as part of the multi-layer join node infrastructure, it
90-
// should be possible for committed_counts_ to no longer be a pointer, but a value.
91-
std::shared_ptr<data_cell_counts> committed_counts_;
88+
data_cell_counts committed_counts_;
9289
// Accumulated expected child counts from all unfolds.
9390
data_cell_counts expected_counts_;
9491
std::atomic<std::size_t> received_flush_count_{0};

test/flush_gate_test.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
#include "oneapi/tbb/concurrent_hash_map.h"
2424
#include "oneapi/tbb/concurrent_vector.h"
2525
#include "oneapi/tbb/parallel_for.h"
26-
#include "spdlog/spdlog.h"
2726

2827
#include <memory>
2928
#include <ranges>
29+
#include <vector>
3030

3131
using namespace phlex;
3232
using namespace phlex::experimental;
@@ -222,13 +222,13 @@ TEST_CASE("flush_gate: roll_up_child accumulates across multiple children", "[fl
222222
job_gate->update_expected_count(run_layer_hash, 2);
223223

224224
// Simulate run 0 rolling up with 3 spills.
225-
auto run0_committed = std::make_shared<data_cell_counts>();
226-
run0_committed->add_to(spill_layer_hash, 3);
225+
data_cell_counts run0_committed;
226+
run0_committed.add_to(spill_layer_hash, 3);
227227
job_gate->roll_up_child(run0_committed);
228228

229229
// Simulate run 1 rolling up with 5 spills.
230-
auto run1_committed = std::make_shared<data_cell_counts>();
231-
run1_committed->add_to(spill_layer_hash, 5);
230+
data_cell_counts run1_committed;
231+
run1_committed.add_to(spill_layer_hash, 5);
232232
job_gate->roll_up_child(run1_committed);
233233

234234
REQUIRE(job_gate->all_children_accounted());

0 commit comments

Comments
 (0)