Skip to content

Commit 4640c70

Browse files
authored
[refactor](compaction) submit manual full compaction task to thread pool instead of detached thread (#61222)
Previously, manually triggering full compaction on a single tablet via HTTP API would create a detached thread for each request, which lacks concurrency control, deduplication, and metrics tracking. This PR changes the single-tablet full compaction path to use `submit_compaction_task()`, submitting to the base compaction thread pool — consistent with the table-level path and the cloud engine. Base and cumulative compaction remain unchanged. Added `force` HTTP parameter to skip permit limiter when submitting full compaction, allowing compaction to proceed even when resources are constrained. ### Usage ```bash # single tablet, default (with permit limiter) curl -X POST "http://be_host:http_port/api/compaction/run?tablet_id=12345&compact_type=full" # single tablet, force (skip permit limiter) curl -X POST "http://be_host:http_port/api/compaction/run?tablet_id=12345&compact_type=full&force=true" # table level curl -X POST "http://be_host:http_port/api/compaction/run?table_id=67890&compact_type=full" # table level, force curl -X POST "http://be_host:http_port/api/compaction/run?table_id=67890&compact_type=full&force=true" ```
1 parent fd83868 commit 4640c70

5 files changed

Lines changed: 313 additions & 46 deletions

File tree

be/src/service/http/action/compaction_action.cpp

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
#include "storage/compaction/cumulative_compaction.h"
4141
#include "storage/compaction/cumulative_compaction_policy.h"
4242
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
43-
#include "storage/compaction/full_compaction.h"
4443
#include "storage/compaction/single_replica_compaction.h"
4544
#include "storage/compaction_task_tracker.h"
4645
#include "storage/olap_define.h"
@@ -150,13 +149,22 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
150149
return Status::NotSupported("The remote = '{}' is not supported", param_remote);
151150
}
152151

152+
// "force" = "true" means skip permit limiter when submitting full compaction to thread pool
153+
bool force = false;
154+
std::string param_force = req->param(PARAM_COMPACTION_FORCE);
155+
if (param_force == "true") {
156+
force = true;
157+
} else if (!param_force.empty() && param_force != "false") {
158+
return Status::NotSupported("The force = '{}' is not supported", param_force);
159+
}
160+
153161
if (tablet_id == 0 && table_id != 0) {
154162
std::vector<TabletSharedPtr> tablet_vec = _engine.tablet_manager()->get_all_tablet(
155163
[table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; });
156164
for (const auto& tablet : tablet_vec) {
157165
tablet->set_last_full_compaction_schedule_time(UnixMillis());
158166
RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION,
159-
false, true, 1));
167+
force, true, 1));
160168
}
161169
} else {
162170
// 2. fetch the tablet by tablet_id
@@ -178,33 +186,36 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
178186
return Status::OK();
179187
})
180188

181-
// 3. execute compaction task
182-
std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() {
183-
return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote);
184-
});
185-
std::future<Status> future_obj = task.get_future();
186-
std::thread(std::move(task)).detach();
187-
188-
// 更新schedule_time
189-
if (compaction_type == PARAM_COMPACTION_BASE) {
190-
tablet->set_last_base_compaction_schedule_time(UnixMillis());
191-
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
192-
tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
193-
} else if (compaction_type == PARAM_COMPACTION_FULL) {
189+
if (compaction_type == PARAM_COMPACTION_FULL) {
190+
// 3. submit full compaction task to thread pool
194191
tablet->set_last_full_compaction_schedule_time(UnixMillis());
195-
}
192+
RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION,
193+
force, true, 1));
194+
} else {
195+
// 3. execute base/cumulative compaction task in a detached thread
196+
std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() {
197+
return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote);
198+
});
199+
std::future<Status> future_obj = task.get_future();
200+
std::thread(std::move(task)).detach();
201+
202+
if (compaction_type == PARAM_COMPACTION_BASE) {
203+
tablet->set_last_base_compaction_schedule_time(UnixMillis());
204+
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
205+
tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
206+
}
196207

197-
// 4. wait for result for 2 seconds by async
198-
std::future_status status = future_obj.wait_for(std::chrono::seconds(2));
199-
if (status == std::future_status::ready) {
200-
// fetch execute result
201-
Status olap_status = future_obj.get();
202-
if (!olap_status.ok()) {
203-
return olap_status;
208+
// 4. wait for result for 2 seconds by async
209+
std::future_status status = future_obj.wait_for(std::chrono::seconds(2));
210+
if (status == std::future_status::ready) {
211+
Status olap_status = future_obj.get();
212+
if (!olap_status.ok()) {
213+
return olap_status;
214+
}
215+
} else {
216+
LOG(INFO) << "Manual compaction task is timeout for waiting "
217+
<< (status == std::future_status::timeout);
204218
}
205-
} else {
206-
LOG(INFO) << "Manual compaction task is timeout for waiting "
207-
<< (status == std::future_status::timeout);
208219
}
209220
}
210221
LOG(INFO) << "Manual compaction task is successfully triggered";
@@ -375,19 +386,6 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
375386
}
376387
}
377388
}
378-
} else if (compaction_type == PARAM_COMPACTION_FULL) {
379-
FullCompaction full_compaction(_engine, tablet);
380-
res = do_compact(full_compaction, CompactionProfileType::FULL);
381-
if (!res) {
382-
if (res.is<FULL_NO_SUITABLE_VERSION>()) {
383-
// Ignore this error code.
384-
VLOG_NOTICE << "failed to init full compaction due to no suitable version,"
385-
<< "tablet=" << tablet->tablet_id();
386-
} else {
387-
LOG(WARNING) << "failed to do full compaction. res=" << res
388-
<< ", table=" << tablet->tablet_id();
389-
}
390-
}
391389
}
392390
timer.stop();
393391
LOG(INFO) << "Manual compaction task finish, status=" << res

be/src/service/http/action/compaction_action.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const std::string PARAM_COMPACTION_BASE = "base";
4141
const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
4242
const std::string PARAM_COMPACTION_FULL = "full";
4343
const std::string PARAM_COMPACTION_REMOTE = "remote";
44+
const std::string PARAM_COMPACTION_FORCE = "force";
4445

4546
/// This action is used for viewing the compaction status.
4647
/// See compaction-action.md for details.
@@ -60,9 +61,9 @@ class CompactionAction : public HttpHandlerWithAuth {
6061
/// param compact_type in req to distinguish the task type, base or cumulative
6162
Status _handle_run_compaction(HttpRequest* req, std::string* json_result);
6263

63-
/// thread callback function for the tablet to do compaction
64+
/// thread callback function for the tablet to do base/cumulative compaction
6465
Status _execute_compaction_callback(TabletSharedPtr tablet, const std::string& compaction_type,
65-
bool fethch_from_remote);
66+
bool fetch_from_remote);
6667

6768
/// fetch compaction running status
6869
Status _handle_run_status_compaction(HttpRequest* req, std::string* json_result);
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "service/http/action/compaction_action.h"
19+
20+
#include <event2/http.h>
21+
#include <gtest/gtest.h>
22+
23+
#include <string>
24+
25+
#include "service/http/http_request.h"
26+
#include "storage/olap_define.h"
27+
#include "storage/options.h"
28+
#include "storage/storage_engine.h"
29+
#include "util/uid_util.h"
30+
31+
namespace doris {
32+
33+
// Test fixture: creates a minimal StorageEngine with TabletManager initialized.
34+
// No real tablets are added, so get_tablet() returns nullptr for any id.
35+
// Tests use -fno-access-control to call private methods directly.
36+
class CompactionActionTest : public testing::Test {
37+
public:
38+
void SetUp() override {
39+
EngineOptions options;
40+
options.backend_uid = UniqueId::gen_uid();
41+
_storage_engine = std::make_unique<StorageEngine>(options);
42+
_evhttp_req = evhttp_request_new(nullptr, nullptr);
43+
}
44+
45+
void TearDown() override {
46+
if (_evhttp_req != nullptr) {
47+
evhttp_request_free(_evhttp_req);
48+
}
49+
_storage_engine.reset();
50+
}
51+
52+
protected:
53+
CompactionAction _make_run_action() {
54+
return {CompactionActionType::RUN_COMPACTION, nullptr, *_storage_engine,
55+
TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN};
56+
}
57+
58+
CompactionAction _make_status_action() {
59+
return {CompactionActionType::RUN_COMPACTION_STATUS, nullptr, *_storage_engine,
60+
TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN};
61+
}
62+
63+
CompactionAction _make_show_action() {
64+
return {CompactionActionType::SHOW_INFO, nullptr, *_storage_engine, TPrivilegeHier::GLOBAL,
65+
TPrivilegeType::ADMIN};
66+
}
67+
68+
evhttp_request* _evhttp_req = nullptr;
69+
std::unique_ptr<StorageEngine> _storage_engine;
70+
};
71+
72+
// ==================== _handle_run_compaction tests ====================
73+
74+
TEST_F(CompactionActionTest, RunCompactionMissingBothIds) {
75+
auto action = _make_run_action();
76+
HttpRequest req(_evhttp_req);
77+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
78+
79+
std::string json_result;
80+
Status st = action._handle_run_compaction(&req, &json_result);
81+
EXPECT_FALSE(st.ok());
82+
EXPECT_TRUE(st.to_string().find("can not be empty at the same time") != std::string::npos);
83+
}
84+
85+
TEST_F(CompactionActionTest, RunCompactionBothIdsSet) {
86+
auto action = _make_run_action();
87+
HttpRequest req(_evhttp_req);
88+
req._params[TABLET_ID_KEY] = "12345";
89+
req._params[TABLE_ID_KEY] = "67890";
90+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
91+
92+
std::string json_result;
93+
Status st = action._handle_run_compaction(&req, &json_result);
94+
EXPECT_FALSE(st.ok());
95+
EXPECT_TRUE(st.to_string().find("can not be set at the same time") != std::string::npos);
96+
}
97+
98+
TEST_F(CompactionActionTest, RunCompactionInvalidType) {
99+
auto action = _make_run_action();
100+
HttpRequest req(_evhttp_req);
101+
req._params[TABLET_ID_KEY] = "12345";
102+
req._params[PARAM_COMPACTION_TYPE] = "invalid_type";
103+
104+
std::string json_result;
105+
Status st = action._handle_run_compaction(&req, &json_result);
106+
EXPECT_FALSE(st.ok());
107+
EXPECT_TRUE(st.to_string().find("not supported") != std::string::npos);
108+
}
109+
110+
TEST_F(CompactionActionTest, RunCompactionInvalidRemoteParam) {
111+
auto action = _make_run_action();
112+
HttpRequest req(_evhttp_req);
113+
req._params[TABLET_ID_KEY] = "12345";
114+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
115+
req._params[PARAM_COMPACTION_REMOTE] = "invalid_value";
116+
117+
std::string json_result;
118+
Status st = action._handle_run_compaction(&req, &json_result);
119+
EXPECT_FALSE(st.ok());
120+
EXPECT_TRUE(st.to_string().find("not supported") != std::string::npos);
121+
}
122+
123+
TEST_F(CompactionActionTest, RunCompactionInvalidTabletId) {
124+
auto action = _make_run_action();
125+
HttpRequest req(_evhttp_req);
126+
req._params[TABLET_ID_KEY] = "not_a_number";
127+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
128+
129+
std::string json_result;
130+
Status st = action._handle_run_compaction(&req, &json_result);
131+
EXPECT_FALSE(st.ok());
132+
EXPECT_TRUE(st.to_string().find("convert") != std::string::npos);
133+
}
134+
135+
TEST_F(CompactionActionTest, RunFullCompactionTabletNotFound) {
136+
auto action = _make_run_action();
137+
HttpRequest req(_evhttp_req);
138+
req._params[TABLET_ID_KEY] = "99999";
139+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
140+
141+
std::string json_result;
142+
Status st = action._handle_run_compaction(&req, &json_result);
143+
EXPECT_TRUE(st.is<ErrorCode::NOT_FOUND>());
144+
}
145+
146+
TEST_F(CompactionActionTest, RunBaseCompactionTabletNotFound) {
147+
auto action = _make_run_action();
148+
HttpRequest req(_evhttp_req);
149+
req._params[TABLET_ID_KEY] = "99999";
150+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_BASE;
151+
152+
std::string json_result;
153+
Status st = action._handle_run_compaction(&req, &json_result);
154+
EXPECT_TRUE(st.is<ErrorCode::NOT_FOUND>());
155+
}
156+
157+
TEST_F(CompactionActionTest, RunCumulativeCompactionTabletNotFound) {
158+
auto action = _make_run_action();
159+
HttpRequest req(_evhttp_req);
160+
req._params[TABLET_ID_KEY] = "99999";
161+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_CUMULATIVE;
162+
163+
std::string json_result;
164+
Status st = action._handle_run_compaction(&req, &json_result);
165+
EXPECT_TRUE(st.is<ErrorCode::NOT_FOUND>());
166+
}
167+
168+
// ==================== force parameter tests ====================
169+
170+
TEST_F(CompactionActionTest, RunCompactionInvalidForceParam) {
171+
auto action = _make_run_action();
172+
HttpRequest req(_evhttp_req);
173+
req._params[TABLET_ID_KEY] = "12345";
174+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
175+
req._params[PARAM_COMPACTION_FORCE] = "invalid_value";
176+
177+
std::string json_result;
178+
Status st = action._handle_run_compaction(&req, &json_result);
179+
EXPECT_FALSE(st.ok());
180+
EXPECT_TRUE(st.to_string().find("not supported") != std::string::npos);
181+
}
182+
183+
TEST_F(CompactionActionTest, RunFullCompactionForceTrue) {
184+
auto action = _make_run_action();
185+
HttpRequest req(_evhttp_req);
186+
req._params[TABLET_ID_KEY] = "99999";
187+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
188+
req._params[PARAM_COMPACTION_FORCE] = "true";
189+
190+
std::string json_result;
191+
Status st = action._handle_run_compaction(&req, &json_result);
192+
// tablet not found, but force param was parsed successfully
193+
EXPECT_TRUE(st.is<ErrorCode::NOT_FOUND>());
194+
}
195+
196+
TEST_F(CompactionActionTest, RunFullCompactionForceFalse) {
197+
auto action = _make_run_action();
198+
HttpRequest req(_evhttp_req);
199+
req._params[TABLET_ID_KEY] = "99999";
200+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
201+
req._params[PARAM_COMPACTION_FORCE] = "false";
202+
203+
std::string json_result;
204+
Status st = action._handle_run_compaction(&req, &json_result);
205+
EXPECT_TRUE(st.is<ErrorCode::NOT_FOUND>());
206+
}
207+
208+
TEST_F(CompactionActionTest, RunCompactionForceWithTableId) {
209+
auto action = _make_run_action();
210+
HttpRequest req(_evhttp_req);
211+
req._params[TABLE_ID_KEY] = "99999";
212+
req._params[PARAM_COMPACTION_TYPE] = PARAM_COMPACTION_FULL;
213+
req._params[PARAM_COMPACTION_FORCE] = "true";
214+
215+
std::string json_result;
216+
// table_id path with no matching tablets returns success (empty loop)
217+
Status st = action._handle_run_compaction(&req, &json_result);
218+
EXPECT_TRUE(st.ok());
219+
}
220+
221+
// ==================== _handle_show_compaction tests ====================
222+
223+
TEST_F(CompactionActionTest, ShowCompactionMissingTabletId) {
224+
auto action = _make_show_action();
225+
HttpRequest req(_evhttp_req);
226+
227+
std::string json_result;
228+
Status st = action._handle_show_compaction(&req, &json_result);
229+
EXPECT_FALSE(st.ok());
230+
}
231+
232+
TEST_F(CompactionActionTest, ShowCompactionTabletNotFound) {
233+
auto action = _make_show_action();
234+
HttpRequest req(_evhttp_req);
235+
req._params[TABLET_ID_KEY] = "99999";
236+
237+
std::string json_result;
238+
Status st = action._handle_show_compaction(&req, &json_result);
239+
EXPECT_TRUE(st.is<ErrorCode::NOT_FOUND>());
240+
}
241+
242+
// ==================== _handle_run_status_compaction tests ====================
243+
244+
TEST_F(CompactionActionTest, RunStatusCompactionOverall) {
245+
auto action = _make_status_action();
246+
HttpRequest req(_evhttp_req);
247+
// tablet_id not set → returns overall compaction status
248+
std::string json_result;
249+
Status st = action._handle_run_status_compaction(&req, &json_result);
250+
EXPECT_TRUE(st.ok());
251+
}
252+
253+
TEST_F(CompactionActionTest, RunStatusCompactionTabletNotFound) {
254+
auto action = _make_status_action();
255+
HttpRequest req(_evhttp_req);
256+
req._params[TABLET_ID_KEY] = "99999";
257+
258+
std::string json_result;
259+
Status st = action._handle_run_status_compaction(&req, &json_result);
260+
EXPECT_FALSE(st.ok());
261+
}
262+
263+
} // namespace doris

0 commit comments

Comments
 (0)