Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/common/base/src/runtime/runtime_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl CaptureLogSettings {

pub struct TrackingPayload {
pub query_id: Option<String>,
pub warehouse_id: Option<String>,
pub profile: Option<Arc<Profile>>,
pub mem_stat: Option<Arc<MemStat>>,
pub metrics: Option<Arc<ScopedRegistry>>,
Expand Down Expand Up @@ -204,6 +205,7 @@ impl Clone for TrackingPayload {
fn clone(&self) -> Self {
TrackingPayload {
query_id: self.query_id.clone(),
warehouse_id: self.warehouse_id.clone(),
profile: self.profile.clone(),
mem_stat: self.mem_stat.clone(),
metrics: self.metrics.clone(),
Expand Down Expand Up @@ -291,6 +293,7 @@ impl ThreadTracker {
metrics: None,
mem_stat: None,
query_id: None,
warehouse_id: None,
capture_log_settings: None,
time_series_profile: None,
local_time_series_profile: None,
Expand Down Expand Up @@ -410,6 +413,19 @@ impl ThreadTracker {
.unwrap_or(None)
}

/// Returns the warehouse id attached to the current thread's tracking
/// payload, if any. Mirrors the `query_id()` accessor defined just above.
///
/// Returns `None` when the thread-local `TRACKER` is unavailable
/// (`try_with` fails, e.g. during thread teardown) or when no
/// `warehouse_id` was set on the current `TrackingPayload`.
pub fn warehouse_id() -> Option<&'static String> {
TRACKER
.try_with(|tracker| {
tracker
.borrow()
.payload
.warehouse_id
.as_ref()
// SAFETY: same pattern as `query_id()`: the raw-pointer cast
// forges a `'static` lifetime for a reference that actually
// points into the thread-local payload. NOTE(review): this is
// only sound while the payload entry outlives every use of the
// returned reference — callers must not hold it across a
// payload swap (e.g. a `ThreadTracker::tracking` guard);
// confirm against the tracking-guard implementation.
.map(|warehouse_id| unsafe { &*(warehouse_id as *const String) })
})
.unwrap_or(None)
}

pub fn capture_log_settings() -> Option<&'static Arc<CaptureLogSettings>> {
TRACKER
.try_with(|tracker| {
Expand Down
8 changes: 2 additions & 6 deletions src/common/tracing/src/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::loggers::collect_kvs;
pub struct RemoteLog {
cluster_id: String,
node_id: String,
warehouse_id: Option<String>,
buffer: Arc<LogBuffer>,
}

Expand Down Expand Up @@ -113,13 +112,9 @@ impl RemoteLog {
let node_id = labels.get("node_id").cloned().unwrap_or_default();
let rt = Runtime::with_worker_threads(2, Some("remote-log-writer".to_string()))?;
let (tx, rx) = bounded(1);
// warehouse_id needs to be specified after `create warehouse`
// TODO: inject warehouse_id like query_id
let warehouse_id = None;
let remote_log = RemoteLog {
cluster_id: labels.get("cluster_id").cloned().unwrap_or_default(),
node_id: node_id.clone(),
warehouse_id,
buffer: Arc::new(LogBuffer::new(tx.clone(), interval as u64)),
};
rt.spawn(async move { RemoteLog::work(rx, &stage_name).await });
Expand Down Expand Up @@ -209,6 +204,7 @@ impl RemoteLog {
pub fn prepare_log_element(&self, record: &Record) -> RemoteLogElement {
let timestamp = Timestamp::now().as_microsecond();
let query_id = ThreadTracker::query_id().cloned();
let warehouse_id = ThreadTracker::warehouse_id().cloned();
let target = record.target().to_string();
let message = record.args().to_string();
let fields = Map::from_iter(collect_kvs(record.key_values()));
Expand All @@ -232,7 +228,7 @@ impl RemoteLog {
target,
cluster_id: self.cluster_id.clone(),
node_id: self.node_id.clone(),
warehouse_id: self.warehouse_id.clone(),
warehouse_id,
query_id,
log_level,
message,
Expand Down
1 change: 1 addition & 0 deletions src/common/tracing/tests/it/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ fn test_basic_parse() -> anyhow::Result<()> {

assert_eq!(remote_log_element.cluster_id, "cluster_id1");
assert_eq!(remote_log_element.node_id, "node_id1");
assert_eq!(remote_log_element.warehouse_id, None);
assert_eq!(
remote_log_element.path,
"databend_query::sessions::query_ctx: query_ctx.rs:656"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub async fn init_query_env(env: QueryEnv) -> Result<()> {

let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(env.query_id.clone());
tracking_payload.warehouse_id = env.cluster.get_warehouse_id().ok();
tracking_payload.mem_stat = Some(query_mem_stat.clone());
tracking_payload.workload_group_resource = tracking_workload_group;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::TrackingPayloadExt;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use log::debug;
Expand All @@ -29,6 +30,7 @@ pub async fn init_query_fragments(fragments: QueryFragments) -> Result<()> {
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.mem_stat = ctx.get_query_memory_tracking();
tracking_payload.query_id = Some(fragments.query_id.clone());
tracking_payload.warehouse_id = ctx.get_cluster().get_warehouse_id().ok();

debug!("init query fragments with {:?}", fragments);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use databend_common_base::runtime::ThreadTracker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use log::debug;

Expand All @@ -25,6 +26,7 @@ pub async fn start_prepared_query(id: String) -> Result<()> {

let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(id.clone());
tracking_payload.warehouse_id = ctx.get_cluster().get_warehouse_id().ok();
tracking_payload.mem_stat = ctx.get_query_memory_tracking();
let _guard = ThreadTracker::tracking(tracking_payload);

Expand Down
88 changes: 49 additions & 39 deletions src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ use databend_common_base::base::short_sql;
use databend_common_base::runtime::CatchUnwindFuture;
use databend_common_base::runtime::GlobalQueryRuntime;
use databend_common_base::runtime::MemStat;
use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::TrackingPayloadExt;
use databend_common_catalog::session_type::SessionType;
use databend_common_catalog::table_context::StageAttachment;
use databend_common_catalog::table_context::TableContext;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -846,48 +849,55 @@ impl HttpQuery {
let query_session = query_context.get_current_session();

let query_state = self.execute_state.clone();
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.warehouse_id = query_context.get_cluster().get_warehouse_id().ok();

GlobalQueryRuntime::instance().runtime().spawn(
async move {
let block_sender_closer = block_sender.closer();
if let Err(e) = CatchUnwindFuture::create(ExecuteState::try_start_query(
query_state.clone(),
sql,
params,
query_session,
query_context.clone(),
arrow_result_version,
block_sender,
))
.await
.with_context(|| "failed to start query")
.flatten()
{
crate::interpreters::hook_clear_m_cte_temp_table(&query_context)
.inspect_err(|e| warn!("clear_m_cte_temp_table fail: {e}"))
.ok();
let state = ExecuteStopped {
stats: Progresses::default(),
schema: Default::default(),
result_format_settings: None,
has_result_set: None,
reason: Err(e.clone()),
session_state: ExecutorSessionState::new(
query_context.get_current_session(),
),
query_duration_ms: query_context.get_query_duration_ms(),
affect: query_context.get_affect(),
warnings: query_context.pop_warnings(),
};

error!("Query state changed to Stopped, failed to start: {:?}", e);
Executor::start_to_stop(&query_state, ExecuteState::Stopped(Box::new(state)));
block_sender_closer.abort();
tracking_payload.tracking(
async move {
let block_sender_closer = block_sender.closer();
if let Err(e) = CatchUnwindFuture::create(ExecuteState::try_start_query(
query_state.clone(),
sql,
params,
query_session,
query_context.clone(),
arrow_result_version,
block_sender,
))
.await
.with_context(|| "failed to start query")
.flatten()
{
crate::interpreters::hook_clear_m_cte_temp_table(&query_context)
.inspect_err(|e| warn!("clear_m_cte_temp_table fail: {e}"))
.ok();
let state = ExecuteStopped {
stats: Progresses::default(),
schema: Default::default(),
result_format_settings: None,
has_result_set: None,
reason: Err(e.clone()),
session_state: ExecutorSessionState::new(
query_context.get_current_session(),
),
query_duration_ms: query_context.get_query_duration_ms(),
affect: query_context.get_affect(),
warnings: query_context.pop_warnings(),
};

error!("Query state changed to Stopped, failed to start: {:?}", e);
Executor::start_to_stop(
&query_state,
ExecuteState::Stopped(Box::new(state)),
);
block_sender_closer.abort();
}
}
}
.in_span(fastrace::Span::enter_with_local_parent(
"HttpQuery::start_query",
)),
.in_span(fastrace::Span::enter_with_local_parent(
"HttpQuery::start_query",
)),
),
);

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/servers/http/v1/streaming_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ fn execute_query(
mem_stat: Arc<MemStat>,
) -> impl Future<Output = Result<()>> {
let id = http_query_context.query_id.clone();
let warehouse_id = query_context.get_cluster().get_warehouse_id().ok();
let fut = async move {
let interpreter = InterpreterFactory::get(query_context.clone(), &plan).await?;

Expand All @@ -93,6 +94,7 @@ fn execute_query(
};
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(id.clone());
tracking_payload.warehouse_id = warehouse_id;
tracking_payload.mem_stat = Some(mem_stat);

let root = get_http_tracing_span("http::execute_query", &http_query_context, &id);
Expand Down
57 changes: 32 additions & 25 deletions src/query/service/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,31 +419,38 @@ impl InteractiveWorkerBase {
info!("Normal query: {}", query);
let context = self.session.create_query_context(self.version).await?;
context.update_init_query_id(query_id);

// Use interpreter_plan_sql, we can write the query log if an error occurs.
let (plan, _, _guard) =
interpreter_plan_sql(context.clone(), query, true, None).await?;

let interpreter = InterpreterFactory::get(context.clone(), &plan).await?;
let has_result_set = plan.has_result_set();

let (blocks, extra_info) = Self::exec_query(interpreter.clone(), &context).await?;
let mut schema = plan.schema();
if let Some(real_schema) = interpreter.get_dynamic_schema().await {
schema = real_schema;
}

let format = context.get_output_format_settings()?;
Ok((
QueryResult::create(
blocks,
extra_info,
has_result_set,
schema,
query.to_string(),
),
Some(format),
))
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.warehouse_id = context.get_cluster().get_warehouse_id().ok();

tracking_payload
.tracking(async {
// Use interpreter_plan_sql, we can write the query log if an error occurs.
let (plan, _, _guard) =
interpreter_plan_sql(context.clone(), query, true, None).await?;

let interpreter = InterpreterFactory::get(context.clone(), &plan).await?;
let has_result_set = plan.has_result_set();

let (blocks, extra_info) =
Self::exec_query(interpreter.clone(), &context).await?;
let mut schema = plan.schema();
if let Some(real_schema) = interpreter.get_dynamic_schema().await {
schema = real_schema;
}

let format = context.get_output_format_settings()?;
Ok((
QueryResult::create(
blocks,
extra_info,
has_result_set,
schema,
query.to_string(),
),
Some(format),
))
})
.await
}
}
}
Expand Down
Loading