From 64d208c2de3069981089c99d259a01f07bda3914 Mon Sep 17 00:00:00 2001 From: dqhl76 Date: Tue, 21 Apr 2026 10:17:27 +0800 Subject: [PATCH] fix: try to inject warehouse_id for history tables --- .../base/src/runtime/runtime_tracker.rs | 16 ++++ src/common/tracing/src/remote_log.rs | 8 +- src/common/tracing/tests/it/remote_log.rs | 1 + .../flight/v1/actions/init_query_env.rs | 1 + .../flight/v1/actions/init_query_fragments.rs | 2 + .../flight/v1/actions/start_prepared_query.rs | 2 + .../src/servers/http/v1/query/http_query.rs | 88 +++++++++++-------- .../src/servers/http/v1/streaming_load.rs | 2 + .../servers/mysql/mysql_interactive_worker.rs | 57 ++++++------ 9 files changed, 107 insertions(+), 70 deletions(-) diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs index 445335d1352d3..ea28389ed71c3 100644 --- a/src/common/base/src/runtime/runtime_tracker.rs +++ b/src/common/base/src/runtime/runtime_tracker.rs @@ -135,6 +135,7 @@ impl CaptureLogSettings { pub struct TrackingPayload { pub query_id: Option, + pub warehouse_id: Option, pub profile: Option>, pub mem_stat: Option>, pub metrics: Option>, @@ -204,6 +205,7 @@ impl Clone for TrackingPayload { fn clone(&self) -> Self { TrackingPayload { query_id: self.query_id.clone(), + warehouse_id: self.warehouse_id.clone(), profile: self.profile.clone(), mem_stat: self.mem_stat.clone(), metrics: self.metrics.clone(), @@ -291,6 +293,7 @@ impl ThreadTracker { metrics: None, mem_stat: None, query_id: None, + warehouse_id: None, capture_log_settings: None, time_series_profile: None, local_time_series_profile: None, @@ -410,6 +413,19 @@ impl ThreadTracker { .unwrap_or(None) } + pub fn warehouse_id() -> Option<&'static String> { + TRACKER + .try_with(|tracker| { + tracker + .borrow() + .payload + .warehouse_id + .as_ref() + .map(|warehouse_id| unsafe { &*(warehouse_id as *const String) }) + }) + .unwrap_or(None) + } + pub fn capture_log_settings() -> Option<&'static Arc> { TRACKER .try_with(|tracker| { diff --git a/src/common/tracing/src/remote_log.rs b/src/common/tracing/src/remote_log.rs index 9bfb287e1f7b7..1bb0ac99e9256 100644 --- a/src/common/tracing/src/remote_log.rs +++ b/src/common/tracing/src/remote_log.rs @@ -58,7 +58,6 @@ use crate::loggers::collect_kvs; pub struct RemoteLog { cluster_id: String, node_id: String, - warehouse_id: Option, buffer: Arc, } @@ -113,13 +112,9 @@ impl RemoteLog { let node_id = labels.get("node_id").cloned().unwrap_or_default(); let rt = Runtime::with_worker_threads(2, Some("remote-log-writer".to_string()))?; let (tx, rx) = bounded(1); - // warehouse_id need to be specified after `create warehouse` - // TODO: inject warehouse_id like query_id - let warehouse_id = None; let remote_log = RemoteLog { cluster_id: labels.get("cluster_id").cloned().unwrap_or_default(), node_id: node_id.clone(), - warehouse_id, buffer: Arc::new(LogBuffer::new(tx.clone(), interval as u64)), }; rt.spawn(async move { RemoteLog::work(rx, &stage_name).await }); @@ -209,6 +204,7 @@ impl RemoteLog { pub fn prepare_log_element(&self, record: &Record) -> RemoteLogElement { let timestamp = Timestamp::now().as_microsecond(); let query_id = ThreadTracker::query_id().cloned(); + let warehouse_id = ThreadTracker::warehouse_id().cloned(); let target = record.target().to_string(); let message = record.args().to_string(); let fields = Map::from_iter(collect_kvs(record.key_values())); @@ -232,7 +228,7 @@ impl RemoteLog { target, cluster_id: self.cluster_id.clone(), node_id: self.node_id.clone(), - warehouse_id: self.warehouse_id.clone(), + warehouse_id, query_id, log_level, message, diff --git a/src/common/tracing/tests/it/remote_log.rs b/src/common/tracing/tests/it/remote_log.rs index 30c7cc3994820..e1ace2c0f80ac 100644 --- a/src/common/tracing/tests/it/remote_log.rs +++ b/src/common/tracing/tests/it/remote_log.rs @@ -72,6 +72,7 @@ fn test_basic_parse() -> anyhow::Result<()> { assert_eq!(remote_log_element.cluster_id, "cluster_id1"); assert_eq!(remote_log_element.node_id, "node_id1"); + assert_eq!(remote_log_element.warehouse_id, None); assert_eq!( remote_log_element.path, "databend_query::sessions::query_ctx: query_ctx.rs:656" diff --git a/src/query/service/src/servers/flight/v1/actions/init_query_env.rs b/src/query/service/src/servers/flight/v1/actions/init_query_env.rs index 83d9ead8e45a6..1a45f4438f91b 100644 --- a/src/query/service/src/servers/flight/v1/actions/init_query_env.rs +++ b/src/query/service/src/servers/flight/v1/actions/init_query_env.rs @@ -45,6 +45,7 @@ pub async fn init_query_env(env: QueryEnv) -> Result<()> { let mut tracking_payload = ThreadTracker::new_tracking_payload(); tracking_payload.query_id = Some(env.query_id.clone()); + tracking_payload.warehouse_id = env.cluster.get_warehouse_id().ok(); tracking_payload.mem_stat = Some(query_mem_stat.clone()); tracking_payload.workload_group_resource = tracking_workload_group; diff --git a/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs b/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs index 597a752a9ee3b..de7ecefc415e4 100644 --- a/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs +++ b/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs @@ -14,6 +14,7 @@ use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrackingPayloadExt; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use log::debug; @@ -29,6 +30,7 @@ pub async fn init_query_fragments(fragments: QueryFragments) -> Result<()> { let mut tracking_payload = ThreadTracker::new_tracking_payload(); tracking_payload.mem_stat = ctx.get_query_memory_tracking(); tracking_payload.query_id = Some(fragments.query_id.clone()); + tracking_payload.warehouse_id = ctx.get_cluster().get_warehouse_id().ok(); debug!("init query fragments with {:?}", fragments); diff --git a/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs b/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs index fbb7e8fce55e7..6ccd42ad3d85a 100644 --- a/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs +++ b/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs @@ -13,6 +13,7 @@ // limitations under the License. use databend_common_base::runtime::ThreadTracker; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use log::debug; @@ -25,6 +26,7 @@ pub async fn start_prepared_query(id: String) -> Result<()> { let mut tracking_payload = ThreadTracker::new_tracking_payload(); tracking_payload.query_id = Some(id.clone()); + tracking_payload.warehouse_id = ctx.get_cluster().get_warehouse_id().ok(); tracking_payload.mem_stat = ctx.get_query_memory_tracking(); let _guard = ThreadTracker::tracking(tracking_payload); diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index ad6cdbb8a0831..a421584efd26b 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -25,8 +25,11 @@ use databend_common_base::base::short_sql; use databend_common_base::runtime::CatchUnwindFuture; use databend_common_base::runtime::GlobalQueryRuntime; use databend_common_base::runtime::MemStat; +use databend_common_base::runtime::ThreadTracker; +use databend_common_base::runtime::TrackingPayloadExt; use databend_common_catalog::session_type::SessionType; use databend_common_catalog::table_context::StageAttachment; +use databend_common_catalog::table_context::TableContext; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -846,48 +849,55 @@ impl HttpQuery { let query_session = query_context.get_current_session(); let query_state = self.execute_state.clone(); + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + tracking_payload.warehouse_id = query_context.get_cluster().get_warehouse_id().ok(); GlobalQueryRuntime::instance().runtime().spawn( - async move { - let block_sender_closer = block_sender.closer(); - if let Err(e) = CatchUnwindFuture::create(ExecuteState::try_start_query( - query_state.clone(), - sql, - params, - query_session, - query_context.clone(), - arrow_result_version, - block_sender, - )) - .await - .with_context(|| "failed to start query") - .flatten() - { - crate::interpreters::hook_clear_m_cte_temp_table(&query_context) - .inspect_err(|e| warn!("clear_m_cte_temp_table fail: {e}")) - .ok(); - let state = ExecuteStopped { - stats: Progresses::default(), - schema: Default::default(), - result_format_settings: None, - has_result_set: None, - reason: Err(e.clone()), - session_state: ExecutorSessionState::new( - query_context.get_current_session(), - ), - query_duration_ms: query_context.get_query_duration_ms(), - affect: query_context.get_affect(), - warnings: query_context.pop_warnings(), - }; - - error!("Query state changed to Stopped, failed to start: {:?}", e); - Executor::start_to_stop(&query_state, ExecuteState::Stopped(Box::new(state))); - block_sender_closer.abort(); + tracking_payload.tracking( + async move { + let block_sender_closer = block_sender.closer(); + if let Err(e) = CatchUnwindFuture::create(ExecuteState::try_start_query( + query_state.clone(), + sql, + params, + query_session, + query_context.clone(), + arrow_result_version, + block_sender, + )) + .await + .with_context(|| "failed to start query") + .flatten() + { + crate::interpreters::hook_clear_m_cte_temp_table(&query_context) + .inspect_err(|e| warn!("clear_m_cte_temp_table fail: {e}")) + .ok(); + let state = ExecuteStopped { + stats: Progresses::default(), + schema: Default::default(), + result_format_settings: None, + has_result_set: None, + reason: Err(e.clone()), + session_state: ExecutorSessionState::new( + query_context.get_current_session(), + ), + query_duration_ms: query_context.get_query_duration_ms(), + affect: query_context.get_affect(), + warnings: query_context.pop_warnings(), + }; + + error!("Query state changed to Stopped, failed to start: {:?}", e); + Executor::start_to_stop( + &query_state, + ExecuteState::Stopped(Box::new(state)), + ); + block_sender_closer.abort(); + } } - } - .in_span(fastrace::Span::enter_with_local_parent( - "HttpQuery::start_query", - )), + .in_span(fastrace::Span::enter_with_local_parent( + "HttpQuery::start_query", + )), + ), ); Ok(()) diff --git a/src/query/service/src/servers/http/v1/streaming_load.rs b/src/query/service/src/servers/http/v1/streaming_load.rs index aacf4a1e9872c..6075e27380acc 100644 --- a/src/query/service/src/servers/http/v1/streaming_load.rs +++ b/src/query/service/src/servers/http/v1/streaming_load.rs @@ -82,6 +82,7 @@ fn execute_query( mem_stat: Arc, ) -> impl Future> { let id = http_query_context.query_id.clone(); + let warehouse_id = query_context.get_cluster().get_warehouse_id().ok(); let fut = async move { let interpreter = InterpreterFactory::get(query_context.clone(), &plan).await?; @@ -93,6 +94,7 @@ fn execute_query( }; let mut tracking_payload = ThreadTracker::new_tracking_payload(); tracking_payload.query_id = Some(id.clone()); + tracking_payload.warehouse_id = warehouse_id; tracking_payload.mem_stat = Some(mem_stat); let root = get_http_tracing_span("http::execute_query", &http_query_context, &id); diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 40202c14d1d15..4376f74ba6633 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -419,31 +419,38 @@ impl InteractiveWorkerBase { info!("Normal query: {}", query); let context = self.session.create_query_context(self.version).await?; context.update_init_query_id(query_id); - - // Use interpreter_plan_sql, we can write the query log if an error occurs. - let (plan, _, _guard) = - interpreter_plan_sql(context.clone(), query, true, None).await?; - - let interpreter = InterpreterFactory::get(context.clone(), &plan).await?; - let has_result_set = plan.has_result_set(); - - let (blocks, extra_info) = Self::exec_query(interpreter.clone(), &context).await?; - let mut schema = plan.schema(); - if let Some(real_schema) = interpreter.get_dynamic_schema().await { - schema = real_schema; - } - - let format = context.get_output_format_settings()?; - Ok(( - QueryResult::create( - blocks, - extra_info, - has_result_set, - schema, - query.to_string(), - ), - Some(format), - )) + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + tracking_payload.warehouse_id = context.get_cluster().get_warehouse_id().ok(); + + tracking_payload + .tracking(async { + // Use interpreter_plan_sql, we can write the query log if an error occurs. + let (plan, _, _guard) = + interpreter_plan_sql(context.clone(), query, true, None).await?; + + let interpreter = InterpreterFactory::get(context.clone(), &plan).await?; + let has_result_set = plan.has_result_set(); + + let (blocks, extra_info) = + Self::exec_query(interpreter.clone(), &context).await?; + let mut schema = plan.schema(); + if let Some(real_schema) = interpreter.get_dynamic_schema().await { + schema = real_schema; + } + + let format = context.get_output_format_settings()?; + Ok(( + QueryResult::create( + blocks, + extra_info, + has_result_set, + schema, + query.to_string(), + ), + Some(format), + )) + }) + .await } } }