Skip to content

Commit 4da3294

Browse files
committed
[fix](be) Refine revocable memory accounting for spill
Issue Number: None Related PR: None Problem Summary: Exclude small non-spillable revocable buffers from pipeline task revocable memory accounting and handle queries without revocable tasks when triggering memory revocation. None - Test: No need to test (commit existing staged changes only) - Behavior changed: Yes (revocable memory estimation and empty revocation handling are adjusted) - Does this need documentation: No
1 parent 2bc4487 commit 4da3294

3 files changed

Lines changed: 35 additions & 16 deletions

File tree

be/src/exec/pipeline/pipeline_task.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -417,14 +417,7 @@ bool PipelineTask::_should_trigger_revoking(const size_t reserve_size) const {
417417
}
418418

419419
if (is_high_memory_pressure) {
420-
const auto revocable_size = [&]() {
421-
size_t total = _sink->revocable_mem_size(_state);
422-
for (const auto& op : _operators) {
423-
total += op->revocable_mem_size(_state);
424-
}
425-
return total;
426-
}();
427-
420+
const auto revocable_size = _get_revocable_size();
428421
const auto total_estimated_revocable = revocable_size * parallelism;
429422
return total_estimated_revocable >= int64_t(double(query_limit) * 0.2);
430423
}
@@ -1006,20 +999,31 @@ std::string PipelineTask::debug_string() {
1006999
return fmt::to_string(debug_string_buffer);
10071000
}
10081001

1009-
size_t PipelineTask::get_revocable_size() const {
1010-
if (!_opened || is_finalized() || _running || (_eos && !_spilling)) {
1011-
return 0;
1012-
}
1013-
1002+
size_t PipelineTask::_get_revocable_size() const {
10141003
// Sum revocable memory from every operator in the pipeline + the sink.
10151004
// Each operator reports only its own revocable memory (no child recursion).
1016-
size_t total = _sink->revocable_mem_size(_state);
1005+
size_t total = 0;
1006+
size_t sink_revocable_size = _sink->revocable_mem_size(_state);
1007+
if (sink_revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
1008+
total += sink_revocable_size;
1009+
}
10171010
for (const auto& op : _operators) {
1018-
total += op->revocable_mem_size(_state);
1011+
size_t ops_revocable_size = op->revocable_mem_size(_state);
1012+
if (ops_revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
1013+
total += ops_revocable_size;
1014+
}
10191015
}
10201016
return total;
10211017
}
10221018

1019+
size_t PipelineTask::get_revocable_size() const {
1020+
if (!_opened || is_finalized() || _running || (_eos && !_spilling)) {
1021+
return 0;
1022+
}
1023+
1024+
return _get_revocable_size();
1025+
}
1026+
10231027
Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
10241028
DCHECK(spill_context);
10251029
if (is_finalized()) {

be/src/exec/pipeline/pipeline_task.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
204204
// otherwise return true.
205205
bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op);
206206
bool _should_trigger_revoking(const size_t reserve_size) const;
207+
size_t _get_revocable_size() const;
207208

208209
const TUniqueId _query_id;
209210
const uint32_t _index;

be/src/runtime/workload_management/query_task_controller.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include "runtime/workload_management/query_task_controller.h"
1919

20+
#include <algorithm>
21+
2022
#include "exec/pipeline/pipeline_fragment_context.h"
2123
#include "runtime/query_context.h"
2224
#include "runtime/workload_management/task_controller.h"
@@ -149,7 +151,19 @@ Status QueryTaskController::revoke_memory() {
149151
fragments.emplace_back(std::move(fragment_ctx));
150152
}
151153

152-
std::sort(tasks.begin(), tasks.end(), [](auto&& l, auto&& r) { return l.first > r.first; });
154+
if (tasks.empty()) {
155+
LOG(INFO) << fmt::format(
156+
"Query {} try to revoke memory, but there is no revocable task, maybe because the "
157+
"query was spilled already. Query memory usage: {}, wg info: {}. {}",
158+
print_id(query_ctx->query_id()),
159+
PrettyPrinter::print_bytes(query_ctx->query_mem_tracker()->consumption()),
160+
query_ctx->workload_group()->memory_debug_string(),
161+
doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str());
162+
query_ctx->set_memory_sufficient(true);
163+
return Status::OK();
164+
}
165+
166+
std::ranges::sort(tasks, [](auto&& l, auto&& r) { return l.first > r.first; });
153167

154168
// Do not use memlimit, use current memory usage.
155169
// For example, if current limit is 1.6G, but current used is 1G, if reserve failed

0 commit comments

Comments
 (0)