From b89528c5aa74888b87886d9e5421e363e4380285 Mon Sep 17 00:00:00 2001 From: smallfish Date: Thu, 30 Apr 2026 10:23:15 +0800 Subject: [PATCH 1/5] feat(query): add billing usage daily table function --- src/common/cloud_control/proto/billing.proto | 73 +++ .../cloud_control/src/billing_client.rs | 47 ++ src/common/cloud_control/src/cloud_api.rs | 8 + src/common/cloud_control/src/lib.rs | 2 + .../cloud_control/tests/it/billing_client.rs | 108 ++++ src/common/cloud_control/tests/it/main.rs | 1 + .../table_functions/table_function_factory.rs | 7 + .../tests/it/table_functions/billing_usage.rs | 232 ++++++++ .../service/tests/it/table_functions/mod.rs | 2 + .../table_functions/billing_usage_daily.rs | 504 ++++++++++++++++++ .../task_support/src/table_functions/mod.rs | 2 + 11 files changed, 986 insertions(+) create mode 100644 src/common/cloud_control/proto/billing.proto create mode 100644 src/common/cloud_control/src/billing_client.rs create mode 100644 src/common/cloud_control/tests/it/billing_client.rs create mode 100644 src/query/service/tests/it/table_functions/billing_usage.rs create mode 100644 src/query/task_support/src/table_functions/billing_usage_daily.rs diff --git a/src/common/cloud_control/proto/billing.proto b/src/common/cloud_control/proto/billing.proto new file mode 100644 index 0000000000000..f217fdfd914c3 --- /dev/null +++ b/src/common/cloud_control/proto/billing.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package billingproto; + +message BillingError { + string kind = 1; + string message = 2; + int32 code = 3; +} + +message GetBillingUsageDailyRequest { + string tenant_id = 1; + string billing_month = 2; // YYYY-MM + string sql_user = 3; // audit only, empty means absent + string query_id = 4; // audit only, empty means absent +} + +message BillingUsageDailyRow { + // Billing date in YYYY-MM-DD format. + string usage_date = 1; + + // Top-level billing category, aligned with Snowflake naming where practical. + // Examples: compute, storage, cloud services. + string usage_type = 2; + + // More specific service category. + // Examples: WAREHOUSE_METERING, STORAGE, CLOUD_SERVICES. + string service_type = 3; + + // Logical billable resource name when applicable, such as warehouse name. + // Empty string means absent. + string resource_name = 4; + + // Billable usage quantity encoded as a decimal string to avoid float precision loss. + string usage = 5; + + // Unit for usage. + // Examples: second, byte, request. + string usage_unit = 6; + + // Reserved billing unit price field. + // Empty string means the rate is intentionally undisclosed. + string rate = 7; + + // Billing dimension unit for the reserved rate field. + // Examples: second, tb_day, k_request. + // Returned when the billing dimension is known. + string rate_unit = 8; + + // Final billed amount encoded as a decimal string. + string usage_in_currency = 9; + + // Settlement currency, for example USD. + string currency = 10; + + // Resource tags when the underlying usage is associated with a tagged resource. + map tags = 11; + + // JSON object string for category-specific or future-compatible extension fields. + // Examples: + // {"cluster_name":"cl-00000","max_clusters":1,"size":"XSmall"} + // {} + string details = 12; +} + +message GetBillingUsageDailyResponse { + repeated BillingUsageDailyRow rows = 1; + optional BillingError error = 2; +} + +service BillingService { + rpc GetBillingUsageDaily(GetBillingUsageDailyRequest) returns (GetBillingUsageDailyResponse); +} diff --git a/src/common/cloud_control/src/billing_client.rs b/src/common/cloud_control/src/billing_client.rs new file mode 100644 index 0000000000000..b557e58702520 --- /dev/null +++ b/src/common/cloud_control/src/billing_client.rs @@ -0,0 +1,47 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use tonic::Request; +use tonic::transport::Channel; + +use crate::pb::GetBillingUsageDailyRequest; +use crate::pb::GetBillingUsageDailyResponse; +use crate::pb::billing_service_client::BillingServiceClient; +use crate::task_client::MAX_DECODING_SIZE; +use crate::task_client::MAX_ENCODING_SIZE; + +pub struct BillingClient { + pub client: BillingServiceClient, +} + +impl BillingClient { + pub async fn new(channel: Channel) -> Result> { + let client = BillingServiceClient::new(channel) + .max_decoding_message_size(MAX_DECODING_SIZE) + .max_encoding_message_size(MAX_ENCODING_SIZE); + Ok(Arc::new(BillingClient { client })) + } + + pub async fn get_billing_usage_daily( + &self, + req: Request, + ) -> Result { + let mut client = self.client.clone(); + let resp = client.get_billing_usage_daily(req).await?; + Ok(resp.into_inner()) + } +} diff --git a/src/common/cloud_control/src/cloud_api.rs b/src/common/cloud_control/src/cloud_api.rs index 22ca91591ab06..4514175980041 100644 --- a/src/common/cloud_control/src/cloud_api.rs +++ b/src/common/cloud_control/src/cloud_api.rs @@ -19,6 +19,7 @@ use databend_common_base::base::GlobalInstance; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use crate::billing_client::BillingClient; use crate::notification_client::NotificationClient; use crate::task_client::TaskClient; use crate::worker_client::WorkerClient; @@ -26,6 +27,7 @@ use crate::worker_client::WorkerClient; pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds pub struct CloudControlApiProvider { + pub billing_client: Arc, pub task_client: Arc, pub notification_client: Arc, pub worker_client: Arc, @@ -42,10 +44,12 @@ impl CloudControlApiProvider { let endpoint = Self::get_endpoint(endpoint, timeout).await?; let channel = endpoint.connect_lazy(); + let billing_client = BillingClient::new(channel.clone()).await?; let task_client = TaskClient::new(channel.clone()).await?; let notification_client = NotificationClient::new(channel.clone()).await?; let worker_client = WorkerClient::new(channel).await?; Ok(Arc::new(CloudControlApiProvider { + billing_client, task_client, notification_client, worker_client, @@ -85,6 +89,10 @@ impl CloudControlApiProvider { self.task_client.clone() } + pub fn get_billing_client(&self) -> Arc { + self.billing_client.clone() + } + pub fn get_notification_client(&self) -> Arc { self.notification_client.clone() } diff --git a/src/common/cloud_control/src/lib.rs b/src/common/cloud_control/src/lib.rs index 224eeb932cae5..9f6c3d28d0749 100644 --- a/src/common/cloud_control/src/lib.rs +++ b/src/common/cloud_control/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod billing_client; pub mod client_config; pub mod cloud_api; pub mod notification_client; @@ -24,6 +25,7 @@ pub mod worker_client; #[allow(clippy::large_enum_variant)] /// ProtoBuf generated files. pub mod pb { + tonic::include_proto!("billingproto"); // taskproto is proto package name. tonic::include_proto!("taskproto"); tonic::include_proto!("notificationproto"); diff --git a/src/common/cloud_control/tests/it/billing_client.rs b/src/common/cloud_control/tests/it/billing_client.rs new file mode 100644 index 0000000000000..a4e3716e3ee23 --- /dev/null +++ b/src/common/cloud_control/tests/it/billing_client.rs @@ -0,0 +1,108 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use databend_common_base::runtime; +use databend_common_cloud_control::billing_client::BillingClient; +use databend_common_cloud_control::pb::BillingUsageDailyRow; +use databend_common_cloud_control::pb::GetBillingUsageDailyRequest; +use databend_common_cloud_control::pb::GetBillingUsageDailyResponse; +use databend_common_cloud_control::pb::billing_service_server::BillingService; +use databend_common_cloud_control::pb::billing_service_server::BillingServiceServer; +use hyper_util::rt::TokioIo; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tonic::codegen::tokio_stream; +use tonic::transport::Endpoint; +use tonic::transport::Server; +use tonic::transport::Uri; +use tower::service_fn; + +#[derive(Default)] +pub struct MockBillingService {} + +#[tonic::async_trait] +impl BillingService for MockBillingService { + async fn get_billing_usage_daily( + &self, + request: Request, + ) -> std::result::Result, Status> { + Ok(Response::new(GetBillingUsageDailyResponse { + rows: vec![BillingUsageDailyRow { + usage_date: request.into_inner().billing_month, + usage_type: "compute".to_string(), + service_type: "WAREHOUSE_METERING".to_string(), + resource_name: "default".to_string(), + usage: "2653".to_string(), + usage_unit: "second".to_string(), + rate: "".to_string(), + rate_unit: "second".to_string(), + usage_in_currency: "0.737".to_string(), + currency: "USD".to_string(), + tags: BTreeMap::from([("env".to_string(), "test".to_string())]), + details: "{\"cluster_name\":\"cl-00000\"}".to_string(), + }], + error: None, + })) + } +} + +#[tokio::test(flavor = "current_thread")] +async fn test_billing_client_success_cases() -> anyhow::Result<()> { + let (client, server) = tokio::io::duplex(1024); + let client = TokioIo::new(client); + + runtime::spawn(async move { + Server::builder() + .add_service(BillingServiceServer::new(MockBillingService::default())) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .await + }); + + let mut client_io = Some(client); + let channel = Endpoint::try_from("http://[::]:0") + .unwrap() + .connect_with_connector(service_fn(move |_: Uri| { + let client = client_io.take(); + + async move { + if let Some(client) = client { + Ok(client) + } else { + Err(std::io::Error::other("Client already taken")) + } + } + })) + .await + .unwrap(); + + let client = BillingClient::new(channel).await?; + + let resp = client + .get_billing_usage_daily(Request::new(GetBillingUsageDailyRequest { + tenant_id: "tenant".to_string(), + billing_month: "2026-03".to_string(), + sql_user: "root".to_string(), + query_id: "query-1".to_string(), + })) + .await?; + assert_eq!(resp.rows.len(), 1); + assert_eq!(resp.rows[0].usage_date, "2026-03"); + assert_eq!(resp.rows[0].usage_type, "compute"); + assert_eq!(resp.rows[0].resource_name, "default"); + + Ok(()) +} diff --git a/src/common/cloud_control/tests/it/main.rs b/src/common/cloud_control/tests/it/main.rs index c1ce66dde7616..24502e84bd3df 100644 --- a/src/common/cloud_control/tests/it/main.rs +++ b/src/common/cloud_control/tests/it/main.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod billing_client; mod task_client; diff --git a/src/query/service/src/table_functions/table_function_factory.rs b/src/query/service/src/table_functions/table_function_factory.rs index 039ca74704203..532565f9abf8f 100644 --- a/src/query/service/src/table_functions/table_function_factory.rs +++ b/src/query/service/src/table_functions/table_function_factory.rs @@ -39,6 +39,8 @@ use databend_common_storages_iceberg::IcebergInspectTable; use databend_common_storages_stream::stream_status_table_func::StreamStatusTable; use databend_meta_client::types::MetaId; #[cfg(feature = "task-support")] +use databend_query_task_support::table_functions::BillingUsageDailyTable; +#[cfg(feature = "task-support")] use databend_query_task_support::table_functions::TaskDependentsEnableTable; #[cfg(feature = "task-support")] use databend_query_task_support::table_functions::TaskDependentsTable; @@ -344,6 +346,11 @@ impl TableFunctionFactory { "task_history".to_string(), (next_id(), Arc::new(TaskHistoryTable::create)), ); + + creators.insert( + "billing_usage_daily".to_string(), + (next_id(), Arc::new(BillingUsageDailyTable::create)), + ); } creators.insert( diff --git a/src/query/service/tests/it/table_functions/billing_usage.rs b/src/query/service/tests/it/table_functions/billing_usage.rs new file mode 100644 index 0000000000000..5b3485a932b6b --- /dev/null +++ b/src/query/service/tests/it/table_functions/billing_usage.rs @@ -0,0 +1,232 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; + +use databend_common_base::runtime; +use databend_common_cloud_control::pb::BillingError as PbBillingError; +use databend_common_cloud_control::pb::BillingUsageDailyRow; +use databend_common_cloud_control::pb::GetBillingUsageDailyRequest; +use databend_common_cloud_control::pb::GetBillingUsageDailyResponse; +use databend_common_cloud_control::pb::billing_service_server::BillingService; +use databend_common_cloud_control::pb::billing_service_server::BillingServiceServer; +use databend_common_config::InnerConfig; +use databend_common_expression::DataBlock; +use databend_common_expression::ScalarRef; +use databend_query::test_kits::ConfigBuilder; +use databend_query::test_kits::TestFixture; +use futures::TryStreamExt; +use tokio::net::TcpListener; +use tokio::sync::oneshot; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tonic::transport::Server; + +#[derive(Clone, Default)] +struct BillingRequests { + usage_daily: Arc>>, +} + +#[derive(Clone, Default)] +struct MockBillingServiceImpl { + requests: BillingRequests, + usage_daily_error: Option, +} + +#[tonic::async_trait] +impl BillingService for MockBillingServiceImpl { + async fn get_billing_usage_daily( + &self, + request: Request, + ) -> std::result::Result, Status> { + self.requests + .usage_daily + .lock() + .unwrap() + .push(request.into_inner()); + + if let Some(error) = self.usage_daily_error.clone() { + return Ok(Response::new(GetBillingUsageDailyResponse { + rows: vec![], + error: Some(error), + })); + } + + Ok(Response::new(GetBillingUsageDailyResponse { + rows: vec![BillingUsageDailyRow { + usage_date: "2026-03-02".to_string(), + usage_type: "compute".to_string(), + service_type: "WAREHOUSE_METERING".to_string(), + resource_name: "default".to_string(), + usage: "2653".to_string(), + usage_unit: "second".to_string(), + rate: "".to_string(), + rate_unit: "second".to_string(), + usage_in_currency: "0.737".to_string(), + currency: "¥".to_string(), + tags: BTreeMap::from([("env".to_string(), "test".to_string())]), + details: "{\"cluster_name\":\"cl-00000\",\"max_clusters\":1,\"size\":\"XSmall\"}" + .to_string(), + }], + error: None, + })) + } +} + +fn extract_single_block(blocks: Vec) -> DataBlock { + let block = DataBlock::concat(&blocks).expect("concat blocks"); + assert_eq!(block.num_rows(), 1); + block +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let requests = BillingRequests::default(); + let mock = MockBillingServiceImpl { + requests: requests.clone(), + usage_daily_error: None, + }; + + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let server_handle = runtime::spawn(async move { + Server::builder() + .add_service(BillingServiceServer::new(mock)) + .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async { + let _ = shutdown_rx.await; + }) + .await + }); + + let mut config: InnerConfig = ConfigBuilder::create().build(); + config.query.common.cloud_control_grpc_server_address = Some(format!("http://{addr}")); + config.query.common.cloud_control_grpc_timeout = 5; + + let fixture = TestFixture::setup_with_config(&config).await?; + + let blocks = fixture + .execute_query( + "select usage_date, usage_type, service_type, resource_name, usage_in_currency, currency, tags, details \ + from billing_usage_daily(month => '2026-03')", + ) + .await? + .try_collect::>() + .await?; + let block = extract_single_block(blocks); + + assert_eq!( + block.get_by_offset(0).index(0).unwrap(), + ScalarRef::String("2026-03-02") + ); + assert_eq!( + block.get_by_offset(1).index(0).unwrap(), + ScalarRef::String("compute") + ); + assert_eq!( + block.get_by_offset(2).index(0).unwrap(), + ScalarRef::String("WAREHOUSE_METERING") + ); + assert_eq!( + block.get_by_offset(3).index(0).unwrap(), + ScalarRef::String("default") + ); + assert_eq!( + block.get_by_offset(4).index(0).unwrap(), + ScalarRef::String("0.737") + ); + assert_eq!( + block.get_by_offset(5).index(0).unwrap(), + ScalarRef::String("¥") + ); + + let expected_tags = + serde_json::to_vec(&HashMap::from([("env".to_string(), "test".to_string())]))?; + match block.get_by_offset(6).index(0).unwrap() { + ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_tags.as_slice()), + other => panic!("unexpected scalar type for tags: {other:?}"), + } + + let expected_details = serde_json::to_vec(&serde_json::json!({ + "cluster_name": "cl-00000", + "max_clusters": 1, + "size": "XSmall", + }))?; + match block.get_by_offset(7).index(0).unwrap() { + ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_details.as_slice()), + other => panic!("unexpected scalar type for details: {other:?}"), + } + + let usage_daily_requests = requests.usage_daily.lock().unwrap().clone(); + assert_eq!(usage_daily_requests.len(), 1); + assert_eq!(usage_daily_requests[0].billing_month, "2026-03"); + assert_eq!(usage_daily_requests[0].sql_user, "'root'@'%'"); + assert!(!usage_daily_requests[0].tenant_id.is_empty()); + assert!(!usage_daily_requests[0].query_id.is_empty()); + + let _ = shutdown_tx.send(()); + server_handle.await??; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_billing_usage_daily_table_function_surfaces_task_error() -> anyhow::Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let mock = MockBillingServiceImpl { + requests: BillingRequests::default(), + usage_daily_error: Some(PbBillingError { + kind: "Internal".to_string(), + message: "billing usage unavailable".to_string(), + code: 500, + }), + }; + + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let server_handle = runtime::spawn(async move { + Server::builder() + .add_service(BillingServiceServer::new(mock)) + .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async { + let _ = shutdown_rx.await; + }) + .await + }); + + let mut config: InnerConfig = ConfigBuilder::create().build(); + config.query.common.cloud_control_grpc_server_address = Some(format!("http://{addr}")); + config.query.common.cloud_control_grpc_timeout = 5; + + let fixture = TestFixture::setup_with_config(&config).await?; + + let stream = fixture + .execute_query("select * from billing_usage_daily(month => '2026-03')") + .await?; + let err = stream + .try_collect::>() + .await + .expect_err("billing usage task error should fail query"); + assert!(err.message().contains("billing usage unavailable")); + assert!(err.message().contains("Internal")); + + let _ = shutdown_tx.send(()); + server_handle.await??; + Ok(()) +} diff --git a/src/query/service/tests/it/table_functions/mod.rs b/src/query/service/tests/it/table_functions/mod.rs index 0879c0377a181..cc9656b8b6b25 100644 --- a/src/query/service/tests/it/table_functions/mod.rs +++ b/src/query/service/tests/it/table_functions/mod.rs @@ -12,4 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License.W +#[cfg(feature = "task-support")] +mod billing_usage; mod numbers_table; diff --git a/src/query/task_support/src/table_functions/billing_usage_daily.rs b/src/query/task_support/src/table_functions/billing_usage_daily.rs new file mode 100644 index 0000000000000..c20cda2fe29ae --- /dev/null +++ b/src/query/task_support/src/table_functions/billing_usage_daily.rs @@ -0,0 +1,504 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use chrono::DateTime; +use chrono::NaiveDate; +use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::PartStatistics; +use databend_common_catalog::plan::Partitions; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table_args::TableArgs; +use databend_common_catalog::table_context::TableContext; +use databend_common_catalog::table_function::TableFunction; +use databend_common_cloud_control::client_config::build_client_config; +use databend_common_cloud_control::client_config::make_request; +use databend_common_cloud_control::cloud_api::CloudControlApiProvider; +use databend_common_cloud_control::pb::BillingError as PbBillingError; +use databend_common_cloud_control::pb::GetBillingUsageDailyRequest; +use databend_common_cloud_control::pb::GetBillingUsageDailyResponse; +use databend_common_config::GlobalConfig; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::FromData; +use databend_common_expression::infer_table_schema; +use databend_common_expression::types::DataType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::VariantType; +use databend_common_meta_app::schema::TableIdent; +use databend_common_meta_app::schema::TableInfo; +use databend_common_meta_app::schema::TableMeta; +use databend_common_pipeline::core::OutputPort; +use databend_common_pipeline::core::Pipeline; +use databend_common_pipeline::core::ProcessorPtr; +use databend_common_pipeline::sources::AsyncSource; +use databend_common_pipeline::sources::AsyncSourcer; +use databend_common_storages_factory::Table; +use serde_json::Value; + +pub struct BillingUsageDailyTable { + table_info: TableInfo, + args_parsed: BillingUsageDailyArgsParsed, + table_args: TableArgs, +} + +impl BillingUsageDailyTable { + pub fn create( + database_name: &str, + table_func_name: &str, + table_id: u64, + table_args: TableArgs, + ) -> Result> { + let args_parsed = BillingUsageDailyArgsParsed::parse(&table_args)?; + let table_info = TableInfo { + ident: TableIdent::new(table_id, 0), + desc: format!("'{}'.'{}'", database_name, table_func_name), + name: String::from("billing_usage_daily"), + meta: TableMeta { + schema: infer_table_schema(&billing_usage_daily_schema()) + .expect("failed to infer billing_usage_daily schema"), + engine: String::from(table_func_name), + created_on: DateTime::from_timestamp(0, 0).unwrap(), + updated_on: DateTime::from_timestamp(0, 0).unwrap(), + ..Default::default() + }, + ..Default::default() + }; + + Ok(Arc::new(Self { + table_info, + args_parsed, + table_args, + })) + } +} + +#[async_trait::async_trait] +impl Table for BillingUsageDailyTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_table_info(&self) -> &TableInfo { + &self.table_info + } + + async fn read_partitions( + &self, + _ctx: Arc, + _push_downs: Option, + _dry_run: bool, + ) -> Result<(PartStatistics, Partitions)> { + Ok((PartStatistics::new_exact(1, 1, 1, 1), Partitions::default())) + } + + fn table_args(&self) -> Option { + Some(self.table_args.clone()) + } + + fn read_data( + &self, + ctx: Arc, + _plan: &DataSourcePlan, + pipeline: &mut Pipeline, + _put_cache: bool, + ) -> Result<()> { + pipeline.add_source( + |output| BillingUsageDailySource::create(ctx.clone(), output, self.args_parsed.clone()), + 1, + )?; + Ok(()) + } +} + +struct BillingUsageDailySource { + is_finished: bool, + args_parsed: BillingUsageDailyArgsParsed, + ctx: Arc, +} + +impl BillingUsageDailySource { + fn create( + ctx: Arc, + output: Arc, + args_parsed: BillingUsageDailyArgsParsed, + ) -> Result { + AsyncSourcer::create(ctx.get_scan_progress(), output, Self { + ctx, + args_parsed, + is_finished: false, + }) + } +} + +#[async_trait::async_trait] +impl AsyncSource for BillingUsageDailySource { + const NAME: &'static str = "billing_usage_daily"; + + async fn generate(&mut self) -> Result> { + if self.is_finished { + return Ok(None); + } + self.is_finished = true; + + if GlobalConfig::instance() + .query + .common + .cloud_control_grpc_server_address + .is_none() + { + return Err(ErrorCode::CloudControlNotEnabled( + "cannot view billing_usage_daily without cloud control enabled, please set cloud_control_grpc_server_address in config", + )); + } + + let cloud_api = CloudControlApiProvider::instance(); + let tenant = self.ctx.get_tenant(); + let query_id = self.ctx.get_id(); + let user = self + .ctx + .get_current_user()? + .identity() + .display() + .to_string(); + + let cfg = build_client_config( + tenant.tenant_name().to_string(), + user.clone(), + query_id.clone(), + cloud_api.get_timeout(), + ); + let req = GetBillingUsageDailyRequest { + tenant_id: tenant.tenant_name().to_string(), + billing_month: self.args_parsed.month.clone(), + sql_user: user, + query_id, + }; + let resp = cloud_api + .get_billing_client() + .get_billing_usage_daily(make_request(req, cfg)) + .await?; + Ok(Some(parse_billing_usage_daily_response(resp)?)) + } +} + +impl TableFunction for BillingUsageDailyTable { + fn function_name(&self) -> &str { + self.name() + } + + fn as_table<'a>(self: Arc) -> Arc + where Self: 'a { + self + } +} + +#[derive(Clone, Debug)] +struct BillingUsageDailyArgsParsed { + month: String, +} + +impl BillingUsageDailyArgsParsed { + fn parse(table_args: &TableArgs) -> Result { + let args = table_args.expect_all_named("billing_usage_daily")?; + if args.len() != 1 { + return Err(ErrorCode::BadArguments( + "billing_usage_daily requires exactly one named argument: month".to_string(), + )); + } + + let mut month = None; + for (k, v) in &args { + match k.to_lowercase().as_str() { + "month" => month = v.as_string().cloned(), + _ => { + return Err(ErrorCode::BadArguments(format!( + "unknown param {} for billing_usage_daily", + k + ))); + } + } + } + + let month = month.ok_or_else(|| { + ErrorCode::BadArguments( + "billing_usage_daily requires named string argument: month".to_string(), + ) + })?; + + validate_month(&month)?; + Ok(Self { month }) + } +} + +fn validate_month(month: &str) -> Result<()> { + NaiveDate::parse_from_str(&format!("{month}-01"), "%Y-%m-%d").map_err(|_| { + ErrorCode::BadArguments("invalid month format, expected YYYY-MM".to_string()) + })?; + Ok(()) +} + +fn billing_usage_daily_schema() -> DataSchemaRef { + Arc::new(DataSchema::new(vec![ + DataField::new("usage_date", DataType::String), + DataField::new("usage_type", DataType::String), + DataField::new("service_type", DataType::String), + DataField::new("resource_name", DataType::String), + DataField::new("usage", DataType::String), + DataField::new("usage_unit", DataType::String), + DataField::new("rate", DataType::String), + DataField::new("rate_unit", DataType::String), + DataField::new("usage_in_currency", DataType::String), + DataField::new("currency", DataType::String), + DataField::new("tags", DataType::Variant), + DataField::new("details", DataType::Variant), + ])) +} + +fn parse_billing_usage_daily_response(resp: GetBillingUsageDailyResponse) -> Result { + if let Some(error) = resp.error.as_ref() { + return Err(billing_error_to_error_code( + "get_billing_usage_daily", + error, + )); + } + + Ok(parse_billing_usage_daily_to_datablock(resp)) +} + +fn billing_error_to_error_code(operation: &str, error: &PbBillingError) -> ErrorCode { + ErrorCode::CloudControlConnectError(format!( + "cloud control {operation} failed: {} (kind: {}, code: {})", + error.message, error.kind, error.code + )) +} + +fn parse_billing_usage_daily_to_datablock(resp: GetBillingUsageDailyResponse) -> DataBlock { + let mut usage_date = Vec::with_capacity(resp.rows.len()); + let mut usage_type = Vec::with_capacity(resp.rows.len()); + let mut service_type = Vec::with_capacity(resp.rows.len()); + let mut resource_name = Vec::with_capacity(resp.rows.len()); + let mut usage = Vec::with_capacity(resp.rows.len()); + let mut usage_unit = Vec::with_capacity(resp.rows.len()); + let mut rate = Vec::with_capacity(resp.rows.len()); + let mut rate_unit = Vec::with_capacity(resp.rows.len()); + let mut usage_in_currency = Vec::with_capacity(resp.rows.len()); + let mut currency = Vec::with_capacity(resp.rows.len()); + let mut tags = Vec::with_capacity(resp.rows.len()); + let mut details = Vec::with_capacity(resp.rows.len()); + + for row in resp.rows { + usage_date.push(row.usage_date); + usage_type.push(row.usage_type); + service_type.push(row.service_type); + resource_name.push(row.resource_name); + usage.push(row.usage); + usage_unit.push(row.usage_unit); + rate.push(row.rate); + rate_unit.push(row.rate_unit); + usage_in_currency.push(row.usage_in_currency); + currency.push(row.currency); + tags.push(serde_json::to_vec(&row.tags).unwrap_or_else(|_| b"{}".to_vec())); + details.push(json_text_to_variant(&row.details)); + } + + DataBlock::new_from_columns(vec![ + StringType::from_data(usage_date), + StringType::from_data(usage_type), + StringType::from_data(service_type), + StringType::from_data(resource_name), + StringType::from_data(usage), + StringType::from_data(usage_unit), + StringType::from_data(rate), + StringType::from_data(rate_unit), + StringType::from_data(usage_in_currency), + StringType::from_data(currency), + VariantType::from_data(tags), + VariantType::from_data(details), + ]) +} + +fn json_text_to_variant(raw: &str) -> Vec { + if raw.trim().is_empty() { + return b"{}".to_vec(); + } + + match serde_json::from_str::(raw) { + Ok(value) => serde_json::to_vec(&value).unwrap_or_else(|_| b"{}".to_vec()), + Err(_) => { + serde_json::to_vec(&Value::String(raw.to_string())).unwrap_or_else(|_| b"\"\"".to_vec()) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use std::collections::HashMap; + + use databend_common_catalog::table_args::TableArgs; + use databend_common_expression::Scalar; + use databend_common_expression::ScalarRef; + + use super::*; + + #[test] + fn test_parse_args_accepts_named_month() { + let args = TableArgs::new_named(HashMap::from([( + "month".to_string(), + Scalar::String("2026-03".to_string()), + )])); + + let parsed = BillingUsageDailyArgsParsed::parse(&args).unwrap(); + assert_eq!(parsed.month, "2026-03"); + } + + #[test] + fn test_parse_args_accepts_uppercase_named_month() { + let args = TableArgs::new_named(HashMap::from([( + "MONTH".to_string(), + Scalar::String("2026-03".to_string()), + )])); + + let parsed = BillingUsageDailyArgsParsed::parse(&args).unwrap(); + assert_eq!(parsed.month, "2026-03"); + } + + #[test] + fn test_parse_args_rejects_non_named_or_invalid_month() { + let positioned = TableArgs::new_positioned(vec![Scalar::String("2026-03".to_string())]); + assert!(BillingUsageDailyArgsParsed::parse(&positioned).is_err()); + + let missing = TableArgs::new_named(HashMap::new()); + assert!(BillingUsageDailyArgsParsed::parse(&missing).is_err()); + + let invalid = TableArgs::new_named(HashMap::from([( + "month".to_string(), + Scalar::String("202603".to_string()), + )])); + assert!(BillingUsageDailyArgsParsed::parse(&invalid).is_err()); + } + + #[test] + fn test_parse_usage_daily_response_to_datablock() { + let expected_tags = + serde_json::to_vec(&HashMap::from([("env".to_string(), "test".to_string())])).unwrap(); + let expected_details = serde_json::to_vec(&serde_json::json!({ + "cluster_name": "cl-00000", + "max_clusters": 1, + "size": "XSmall", + })) + .unwrap(); + + let block = parse_billing_usage_daily_to_datablock(GetBillingUsageDailyResponse { + rows: vec![databend_common_cloud_control::pb::BillingUsageDailyRow { + usage_date: "2026-03-02".to_string(), + usage_type: "compute".to_string(), + service_type: "WAREHOUSE_METERING".to_string(), + resource_name: "default".to_string(), + usage: "2653".to_string(), + usage_unit: "second".to_string(), + rate: "".to_string(), + rate_unit: "second".to_string(), + usage_in_currency: "0.737".to_string(), + currency: "¥".to_string(), + tags: BTreeMap::from([("env".to_string(), "test".to_string())]), + details: "{\"cluster_name\":\"cl-00000\",\"max_clusters\":1,\"size\":\"XSmall\"}" + .to_string(), + }], + error: None, + }); + + assert_eq!(block.num_rows(), 1); + assert_eq!(block.num_columns(), 12); + + assert_eq!( + block.get_by_offset(0).index(0).unwrap(), + ScalarRef::String("2026-03-02") + ); + assert_eq!( + block.get_by_offset(1).index(0).unwrap(), + ScalarRef::String("compute") + ); + assert_eq!( + block.get_by_offset(8).index(0).unwrap(), + ScalarRef::String("0.737") + ); + assert_eq!( + block.get_by_offset(9).index(0).unwrap(), + ScalarRef::String("¥") + ); + + match block.get_by_offset(10).index(0).unwrap() { + ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_tags.as_slice()), + other => panic!("unexpected scalar type for tags: {other:?}"), + } + + match block.get_by_offset(11).index(0).unwrap() { + ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_details.as_slice()), + other => panic!("unexpected scalar type for details: {other:?}"), + } + } + + #[test] + fn test_parse_usage_daily_response_preserves_invalid_details_as_string_variant() { + let block = parse_billing_usage_daily_to_datablock(GetBillingUsageDailyResponse { + rows: vec![databend_common_cloud_control::pb::BillingUsageDailyRow { + usage_date: "2026-03-02".to_string(), + usage_type: "storage".to_string(), + service_type: "STORAGE".to_string(), + resource_name: String::new(), + usage: "1580580967006".to_string(), + usage_unit: "byte".to_string(), + rate: "".to_string(), + rate_unit: "tb_day".to_string(), + usage_in_currency: "1.067".to_string(), + currency: "$".to_string(), + tags: BTreeMap::new(), + details: "not-json".to_string(), + }], + error: None, + }); + + let expected = serde_json::to_vec(&Value::String("not-json".to_string())).unwrap(); + match block.get_by_offset(11).index(0).unwrap() { + ScalarRef::Variant(bytes) => assert_eq!(bytes, expected.as_slice()), + other => panic!("unexpected scalar type for details: {other:?}"), + } + } + + #[test] + fn test_parse_usage_daily_response_surfaces_task_error() { + let err = parse_billing_usage_daily_response(GetBillingUsageDailyResponse { + rows: vec![], + error: Some(PbBillingError { + kind: "Internal".to_string(), + message: "billing usage unavailable".to_string(), + code: 500, + }), + }) + .expect_err("billing error should be returned"); + + assert!(err.message().contains("get_billing_usage_daily")); + assert!(err.message().contains("billing usage unavailable")); + assert!(err.message().contains("Internal")); + assert!(err.message().contains("500")); + } +} diff --git a/src/query/task_support/src/table_functions/mod.rs b/src/query/task_support/src/table_functions/mod.rs index 5fc630727dd7f..9e210ea4ac948 100644 --- a/src/query/task_support/src/table_functions/mod.rs +++ b/src/query/task_support/src/table_functions/mod.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod billing_usage_daily; mod task_dependents; mod task_dependents_enable; mod task_history; +pub use billing_usage_daily::BillingUsageDailyTable; pub use task_dependents::TaskDependentsTable; pub use task_dependents_enable::TaskDependentsEnableTable; pub use task_history::TaskHistoryTable; From 4e8752d68189b4c38977f868a4b30e6d5adefa44 Mon Sep 17 00:00:00 2001 From: smallfish Date: Thu, 30 Apr 2026 14:59:14 +0800 Subject: [PATCH 2/5] update: tags/details with jsonb, add permission check --- Cargo.lock | 1 + .../tests/it/table_functions/billing_usage.rs | 109 ++++++++++++++++-- src/query/task_support/Cargo.toml | 1 + .../table_functions/billing_usage_daily.rs | 86 ++++++++------ 4 files changed, 151 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0befddf542965..25d8032b3305b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6154,6 +6154,7 @@ dependencies = [ "databend-common-users", "itertools 0.13.0", "jiff", + "jsonb", "serde_json", ] diff --git a/src/query/service/tests/it/table_functions/billing_usage.rs b/src/query/service/tests/it/table_functions/billing_usage.rs index 5b3485a932b6b..b2a320d2325b1 100644 --- a/src/query/service/tests/it/table_functions/billing_usage.rs +++ b/src/query/service/tests/it/table_functions/billing_usage.rs @@ -13,11 +13,12 @@ // limitations under the License. use std::collections::BTreeMap; -use std::collections::HashMap; +use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; use databend_common_base::runtime; +use databend_common_catalog::session_type::SessionType; use databend_common_cloud_control::pb::BillingError as PbBillingError; use databend_common_cloud_control::pb::BillingUsageDailyRow; use databend_common_cloud_control::pb::GetBillingUsageDailyRequest; @@ -27,9 +28,14 @@ use databend_common_cloud_control::pb::billing_service_server::BillingServiceSer use databend_common_config::InnerConfig; use databend_common_expression::DataBlock; use databend_common_expression::ScalarRef; +use databend_common_meta_app::principal::UserInfo; +use databend_common_version::BUILD_INFO; +use databend_query::interpreters::InterpreterFactory; +use databend_query::sql::Planner; use databend_query::test_kits::ConfigBuilder; use databend_query::test_kits::TestFixture; use futures::TryStreamExt; +use jsonb::OwnedJsonb; use tokio::net::TcpListener; use tokio::sync::oneshot; use tokio_stream::wrappers::TcpListenerStream; @@ -157,27 +163,56 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu ScalarRef::String("¥") ); - let expected_tags = - serde_json::to_vec(&HashMap::from([("env".to_string(), "test".to_string())]))?; + let expected_tags = OwnedJsonb::from_str(r#"{"env":"test"}"#)?.to_vec(); match block.get_by_offset(6).index(0).unwrap() { ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_tags.as_slice()), other => panic!("unexpected scalar type for tags: {other:?}"), } - let expected_details = serde_json::to_vec(&serde_json::json!({ - "cluster_name": "cl-00000", - "max_clusters": 1, - "size": "XSmall", - }))?; + let expected_details = + OwnedJsonb::from_str(r#"{"cluster_name":"cl-00000","max_clusters":1,"size":"XSmall"}"#)? + .to_vec(); match block.get_by_offset(7).index(0).unwrap() { ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_details.as_slice()), other => panic!("unexpected scalar type for details: {other:?}"), } + let blocks = fixture + .execute_query( + "select tags:env::String, details:size::String, details:max_clusters::String \ + from billing_usage_daily(month => '2026-03') \ + where tags:env::String = 'test' and details:size::String = 'XSmall'", + ) + .await? + .try_collect::>() + .await?; + let block = extract_single_block(blocks); + + assert_eq!( + block.get_by_offset(0).index(0).unwrap(), + ScalarRef::String("test") + ); + assert_eq!( + block.get_by_offset(1).index(0).unwrap(), + ScalarRef::String("XSmall") + ); + assert_eq!( + block.get_by_offset(2).index(0).unwrap(), + ScalarRef::String("1") + ); + let usage_daily_requests = requests.usage_daily.lock().unwrap().clone(); - assert_eq!(usage_daily_requests.len(), 1); - assert_eq!(usage_daily_requests[0].billing_month, "2026-03"); - assert_eq!(usage_daily_requests[0].sql_user, "'root'@'%'"); + assert_eq!(usage_daily_requests.len(), 2); + assert!( + usage_daily_requests + .iter() + .all(|request| request.billing_month == "2026-03") + ); + assert!( + usage_daily_requests + .iter() + .all(|request| request.sql_user == "'root'@'%'") + ); assert!(!usage_daily_requests[0].tenant_id.is_empty()); assert!(!usage_daily_requests[0].query_id.is_empty()); @@ -230,3 +265,55 @@ async fn test_billing_usage_daily_table_function_surfaces_task_error() -> anyhow server_handle.await??; Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +async fn test_billing_usage_daily_table_function_requires_super_privilege() -> anyhow::Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let requests = BillingRequests::default(); + let mock = MockBillingServiceImpl { + requests: requests.clone(), + usage_daily_error: None, + }; + + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let server_handle = runtime::spawn(async move { + Server::builder() + .add_service(BillingServiceServer::new(mock)) + .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async { + let _ = shutdown_rx.await; + }) + .await + }); + + let mut config: InnerConfig = ConfigBuilder::create().build(); + config.query.common.cloud_control_grpc_server_address = Some(format!("http://{addr}")); + config.query.common.cloud_control_grpc_timeout = 5; + + let fixture = TestFixture::setup_with_config(&config).await?; + let session = fixture.new_session_with_type(SessionType::Dummy).await?; + session + .set_authed_user(UserInfo::new_no_auth("billing_viewer", "%"), None) + .await?; + + let ctx = session.create_query_context(&BUILD_INFO).await?; + let mut planner = Planner::new(ctx.clone()); + let (plan, _) = planner + .plan_sql("select * from billing_usage_daily(month => '2026-03')") + .await?; + let executor = InterpreterFactory::get(ctx.clone(), &plan).await?; + let stream = executor.execute(ctx).await?; + let err = stream + .try_collect::>() + .await + .expect_err("billing usage should require SUPER privilege"); + + assert!(err.message().contains("privilege [SUPER] is required")); + assert!(err.message().contains("billing_usage_daily")); + assert!(requests.usage_daily.lock().unwrap().is_empty()); + + let _ = shutdown_tx.send(()); + server_handle.await??; + Ok(()) +} diff --git a/src/query/task_support/Cargo.toml b/src/query/task_support/Cargo.toml index 6a7e636cbe191..2134d56507024 100644 --- a/src/query/task_support/Cargo.toml +++ b/src/query/task_support/Cargo.toml @@ -30,6 +30,7 @@ databend-common-storages-system = { workspace = true } databend-common-users = { workspace = true } itertools = { workspace = true } jiff = { workspace = true } +jsonb = { workspace = true } serde_json = { workspace = true } [lints] diff --git a/src/query/task_support/src/table_functions/billing_usage_daily.rs b/src/query/task_support/src/table_functions/billing_usage_daily.rs index c20cda2fe29ae..9561224150e4b 100644 --- a/src/query/task_support/src/table_functions/billing_usage_daily.rs +++ b/src/query/task_support/src/table_functions/billing_usage_daily.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::str::FromStr; use std::sync::Arc; use chrono::DateTime; @@ -42,6 +43,8 @@ use databend_common_expression::infer_table_schema; use databend_common_expression::types::DataType; use databend_common_expression::types::StringType; use databend_common_expression::types::VariantType; +use databend_common_meta_app::principal::GrantObject; +use databend_common_meta_app::principal::UserPrivilegeType; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; @@ -51,7 +54,7 @@ use databend_common_pipeline::core::ProcessorPtr; use databend_common_pipeline::sources::AsyncSource; use databend_common_pipeline::sources::AsyncSourcer; use databend_common_storages_factory::Table; -use serde_json::Value; +use jsonb::OwnedJsonb; pub struct BillingUsageDailyTable { table_info: TableInfo, @@ -169,6 +172,8 @@ impl AsyncSource for BillingUsageDailySource { )); } + ensure_billing_usage_privilege(&self.ctx).await?; + let cloud_api = CloudControlApiProvider::instance(); let tenant = self.ctx.get_tenant(); let query_id = self.ctx.get_id(); @@ -199,6 +204,21 @@ impl AsyncSource for BillingUsageDailySource { } } +async fn ensure_billing_usage_privilege(ctx: &Arc) -> Result<()> { + if ctx + .validate_privilege(&GrantObject::Global, UserPrivilegeType::Super, false) + .await + .is_ok() + { + return Ok(()); + } + + let user = ctx.get_current_user()?.identity().display().to_string(); + Err(ErrorCode::PermissionDenied(format!( + "Permission denied: privilege [SUPER] is required on *.* to query billing_usage_daily for user {user}", + ))) +} + impl TableFunction for BillingUsageDailyTable { fn function_name(&self) -> &str { self.name() @@ -280,7 +300,7 @@ fn parse_billing_usage_daily_response(resp: GetBillingUsageDailyResponse) -> Res )); } - Ok(parse_billing_usage_daily_to_datablock(resp)) + parse_billing_usage_daily_to_datablock(resp) } fn billing_error_to_error_code(operation: &str, error: &PbBillingError) -> ErrorCode { @@ -290,7 +310,7 @@ fn billing_error_to_error_code(operation: &str, error: &PbBillingError) -> Error )) } -fn parse_billing_usage_daily_to_datablock(resp: GetBillingUsageDailyResponse) -> DataBlock { +fn parse_billing_usage_daily_to_datablock(resp: GetBillingUsageDailyResponse) -> Result { let mut usage_date = Vec::with_capacity(resp.rows.len()); let mut usage_type = Vec::with_capacity(resp.rows.len()); let mut service_type = Vec::with_capacity(resp.rows.len()); @@ -315,11 +335,14 @@ fn parse_billing_usage_daily_to_datablock(resp: GetBillingUsageDailyResponse) -> rate_unit.push(row.rate_unit); usage_in_currency.push(row.usage_in_currency); currency.push(row.currency); - tags.push(serde_json::to_vec(&row.tags).unwrap_or_else(|_| b"{}".to_vec())); - details.push(json_text_to_variant(&row.details)); + let tags_json = serde_json::to_string(&row.tags).map_err(|err| { + ErrorCode::Internal(format!("failed to serialize billing usage tags: {err}")) + })?; + tags.push(json_text_to_variant("tags", &tags_json)?); + details.push(json_text_to_variant("details", &row.details)?); } - DataBlock::new_from_columns(vec![ + Ok(DataBlock::new_from_columns(vec![ StringType::from_data(usage_date), StringType::from_data(usage_type), StringType::from_data(service_type), @@ -332,20 +355,15 @@ fn parse_billing_usage_daily_to_datablock(resp: GetBillingUsageDailyResponse) -> StringType::from_data(currency), VariantType::from_data(tags), VariantType::from_data(details), - ]) + ])) } -fn json_text_to_variant(raw: &str) -> Vec { - if raw.trim().is_empty() { - return b"{}".to_vec(); - } - - match serde_json::from_str::(raw) { - Ok(value) => serde_json::to_vec(&value).unwrap_or_else(|_| b"{}".to_vec()), - Err(_) => { - serde_json::to_vec(&Value::String(raw.to_string())).unwrap_or_else(|_| b"\"\"".to_vec()) - } - } +fn json_text_to_variant(field: &str, raw: &str) -> Result> { + let json = if raw.trim().is_empty() { "{}" } else { raw }; + let jsonb = OwnedJsonb::from_str(json).map_err(|err| { + ErrorCode::BadBytes(format!("invalid billing usage {field} JSON value: {err}")) + })?; + Ok(jsonb.to_vec()) } #[cfg(test)] @@ -398,14 +416,11 @@ mod tests { #[test] fn test_parse_usage_daily_response_to_datablock() { - let expected_tags = - serde_json::to_vec(&HashMap::from([("env".to_string(), "test".to_string())])).unwrap(); - let expected_details = serde_json::to_vec(&serde_json::json!({ - "cluster_name": "cl-00000", - "max_clusters": 1, - "size": "XSmall", - })) - .unwrap(); + let expected_tags = OwnedJsonb::from_str(r#"{"env":"test"}"#).unwrap().to_vec(); + let expected_details = + OwnedJsonb::from_str(r#"{"cluster_name":"cl-00000","max_clusters":1,"size":"XSmall"}"#) + .unwrap() + .to_vec(); let block = parse_billing_usage_daily_to_datablock(GetBillingUsageDailyResponse { rows: vec![databend_common_cloud_control::pb::BillingUsageDailyRow { @@ -424,7 +439,8 @@ mod tests { .to_string(), }], error: None, - }); + }) + .unwrap(); assert_eq!(block.num_rows(), 1); assert_eq!(block.num_columns(), 12); @@ -458,8 +474,8 @@ mod tests { } #[test] - fn test_parse_usage_daily_response_preserves_invalid_details_as_string_variant() { - let block = parse_billing_usage_daily_to_datablock(GetBillingUsageDailyResponse { + fn test_parse_usage_daily_response_rejects_invalid_details_json() { + let err = parse_billing_usage_daily_to_datablock(GetBillingUsageDailyResponse { rows: vec![databend_common_cloud_control::pb::BillingUsageDailyRow { usage_date: "2026-03-02".to_string(), usage_type: "storage".to_string(), @@ -475,13 +491,13 @@ mod tests { details: "not-json".to_string(), }], error: None, - }); + }) + .expect_err("invalid details JSON should be rejected"); - let expected = serde_json::to_vec(&Value::String("not-json".to_string())).unwrap(); - match block.get_by_offset(11).index(0).unwrap() { - ScalarRef::Variant(bytes) => assert_eq!(bytes, expected.as_slice()), - other => panic!("unexpected scalar type for details: {other:?}"), - } + assert!( + err.message() + .contains("invalid billing usage details JSON value") + ); } #[test] From e855d466bec102c4dbd5fb2e1bf6722220ee05ad Mon Sep 17 00:00:00 2001 From: smallfish Date: Thu, 30 Apr 2026 19:47:00 +0800 Subject: [PATCH 3/5] refactor args with start/end date (remove month) --- src/common/cloud_control/proto/billing.proto | 7 +- .../cloud_control/tests/it/billing_client.rs | 7 +- .../tests/it/table_functions/billing_usage.rs | 15 ++- .../table_functions/billing_usage_daily.rs | 95 +++++++++++++------ 4 files changed, 84 insertions(+), 40 deletions(-) diff --git a/src/common/cloud_control/proto/billing.proto b/src/common/cloud_control/proto/billing.proto index f217fdfd914c3..8fd4da3f59888 100644 --- a/src/common/cloud_control/proto/billing.proto +++ b/src/common/cloud_control/proto/billing.proto @@ -10,9 +10,10 @@ message BillingError { message GetBillingUsageDailyRequest { string tenant_id = 1; - string billing_month = 2; // YYYY-MM - string sql_user = 3; // audit only, empty means absent - string query_id = 4; // audit only, empty means absent + string start_date = 2; // YYYY-MM-DD + string end_date = 3; // YYYY-MM-DD, empty means start_date + string sql_user = 4; // audit only, empty means absent + string query_id = 5; // audit only, empty means absent } message BillingUsageDailyRow { diff --git a/src/common/cloud_control/tests/it/billing_client.rs b/src/common/cloud_control/tests/it/billing_client.rs index a4e3716e3ee23..fd189ae250c19 100644 --- a/src/common/cloud_control/tests/it/billing_client.rs +++ b/src/common/cloud_control/tests/it/billing_client.rs @@ -42,7 +42,7 @@ impl BillingService for MockBillingService { ) -> std::result::Result, Status> { Ok(Response::new(GetBillingUsageDailyResponse { rows: vec![BillingUsageDailyRow { - usage_date: request.into_inner().billing_month, + usage_date: request.into_inner().start_date, usage_type: "compute".to_string(), service_type: "WAREHOUSE_METERING".to_string(), resource_name: "default".to_string(), @@ -94,13 +94,14 @@ async fn test_billing_client_success_cases() -> anyhow::Result<()> { let resp = client .get_billing_usage_daily(Request::new(GetBillingUsageDailyRequest { tenant_id: "tenant".to_string(), - billing_month: "2026-03".to_string(), + start_date: "2026-03-01".to_string(), + end_date: "2026-03-31".to_string(), sql_user: "root".to_string(), query_id: "query-1".to_string(), })) .await?; assert_eq!(resp.rows.len(), 1); - assert_eq!(resp.rows[0].usage_date, "2026-03"); + assert_eq!(resp.rows[0].usage_date, "2026-03-01"); assert_eq!(resp.rows[0].usage_type, "compute"); assert_eq!(resp.rows[0].resource_name, "default"); diff --git a/src/query/service/tests/it/table_functions/billing_usage.rs b/src/query/service/tests/it/table_functions/billing_usage.rs index b2a320d2325b1..2004753213b0d 100644 --- a/src/query/service/tests/it/table_functions/billing_usage.rs +++ b/src/query/service/tests/it/table_functions/billing_usage.rs @@ -131,7 +131,7 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu let blocks = fixture .execute_query( "select usage_date, usage_type, service_type, resource_name, usage_in_currency, currency, tags, details \ - from billing_usage_daily(month => '2026-03')", + from billing_usage_daily(start_date => '2026-03-01', end_date => '2026-03-31')", ) .await? .try_collect::>() @@ -180,7 +180,7 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu let blocks = fixture .execute_query( "select tags:env::String, details:size::String, details:max_clusters::String \ - from billing_usage_daily(month => '2026-03') \ + from billing_usage_daily(start_date => '2026-03-01', end_date => '2026-03-31') \ where tags:env::String = 'test' and details:size::String = 'XSmall'", ) .await? @@ -206,7 +206,12 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu assert!( usage_daily_requests .iter() - .all(|request| request.billing_month == "2026-03") + .all(|request| request.start_date == "2026-03-01") + ); + assert!( + usage_daily_requests + .iter() + .all(|request| request.end_date == "2026-03-31") ); assert!( usage_daily_requests @@ -252,7 +257,7 @@ async fn test_billing_usage_daily_table_function_surfaces_task_error() -> anyhow let fixture = TestFixture::setup_with_config(&config).await?; let stream = fixture - .execute_query("select * from billing_usage_daily(month => '2026-03')") + .execute_query("select * from billing_usage_daily(start_date => '2026-03-01')") .await?; let err = stream .try_collect::>() @@ -300,7 +305,7 @@ async fn test_billing_usage_daily_table_function_requires_super_privilege() -> a let ctx = session.create_query_context(&BUILD_INFO).await?; let mut planner = Planner::new(ctx.clone()); let (plan, _) = planner - .plan_sql("select * from billing_usage_daily(month => '2026-03')") + .plan_sql("select * from billing_usage_daily(start_date => '2026-03-01')") .await?; let executor = InterpreterFactory::get(ctx.clone(), &plan).await?; let stream = executor.execute(ctx).await?; diff --git a/src/query/task_support/src/table_functions/billing_usage_daily.rs b/src/query/task_support/src/table_functions/billing_usage_daily.rs index 9561224150e4b..4fc26bd38d75a 100644 --- a/src/query/task_support/src/table_functions/billing_usage_daily.rs +++ b/src/query/task_support/src/table_functions/billing_usage_daily.rs @@ -192,7 +192,8 @@ impl AsyncSource for BillingUsageDailySource { ); let req = GetBillingUsageDailyRequest { tenant_id: tenant.tenant_name().to_string(), - billing_month: self.args_parsed.month.clone(), + start_date: self.args_parsed.start_date.clone(), + end_date: self.args_parsed.end_date.clone(), sql_user: user, query_id, }; @@ -232,22 +233,25 @@ impl TableFunction for BillingUsageDailyTable { #[derive(Clone, Debug)] struct BillingUsageDailyArgsParsed { - month: String, + start_date: String, + end_date: String, } impl BillingUsageDailyArgsParsed { fn parse(table_args: &TableArgs) -> Result { let args = table_args.expect_all_named("billing_usage_daily")?; - if args.len() != 1 { + if args.is_empty() || args.len() > 2 { return Err(ErrorCode::BadArguments( - "billing_usage_daily requires exactly one named argument: month".to_string(), + "billing_usage_daily requires named argument: start_date, and optional argument: end_date".to_string(), )); } - let mut month = None; + let mut start_date = None; + let mut end_date = None; for (k, v) in &args { match k.to_lowercase().as_str() { - "month" => month = v.as_string().cloned(), + "start_date" => start_date = v.as_string().cloned(), + "end_date" => end_date = v.as_string().cloned(), _ => { return Err(ErrorCode::BadArguments(format!( "unknown param {} for billing_usage_daily", @@ -257,22 +261,35 @@ impl BillingUsageDailyArgsParsed { } } - let month = month.ok_or_else(|| { + let start_date = start_date.ok_or_else(|| { ErrorCode::BadArguments( - "billing_usage_daily requires named string argument: month".to_string(), + "billing_usage_daily requires named string argument: start_date".to_string(), ) })?; + let start = parse_date_arg("start_date", &start_date)?; - validate_month(&month)?; - Ok(Self { month }) + let end_date = end_date.unwrap_or_else(|| start_date.clone()); + let end = parse_date_arg("end_date", &end_date)?; + if end < start { + return Err(ErrorCode::BadArguments( + "end_date must be greater than or equal to start_date".to_string(), + )); + } + + Ok(Self { + start_date, + end_date, + }) } } -fn validate_month(month: &str) -> Result<()> { - NaiveDate::parse_from_str(&format!("{month}-01"), "%Y-%m-%d").map_err(|_| { - ErrorCode::BadArguments("invalid month format, expected YYYY-MM".to_string()) - })?; - Ok(()) +fn parse_date_arg(name: &str, date: &str) -> Result { + parse_date(date) + .map_err(|_| ErrorCode::BadArguments(format!("invalid {name} format, expected YYYY-MM-DD"))) +} + +fn parse_date(date: &str) -> std::result::Result { + NaiveDate::parse_from_str(date, "%Y-%m-%d") } fn billing_usage_daily_schema() -> DataSchemaRef { @@ -378,40 +395,60 @@ mod tests { use super::*; #[test] - fn test_parse_args_accepts_named_month() { + fn test_parse_args_accepts_start_date_only() { let args = TableArgs::new_named(HashMap::from([( - "month".to_string(), - Scalar::String("2026-03".to_string()), + "start_date".to_string(), + Scalar::String("2026-03-02".to_string()), )])); let parsed = BillingUsageDailyArgsParsed::parse(&args).unwrap(); - assert_eq!(parsed.month, "2026-03"); + assert_eq!(parsed.start_date, "2026-03-02"); + assert_eq!(parsed.end_date, "2026-03-02"); } #[test] - fn test_parse_args_accepts_uppercase_named_month() { - let args = TableArgs::new_named(HashMap::from([( - "MONTH".to_string(), - Scalar::String("2026-03".to_string()), - )])); + fn test_parse_args_accepts_uppercase_named_date_range() { + let args = TableArgs::new_named(HashMap::from([ + ( + "START_DATE".to_string(), + Scalar::String("2026-03-01".to_string()), + ), + ( + "END_DATE".to_string(), + Scalar::String("2026-03-31".to_string()), + ), + ])); let parsed = BillingUsageDailyArgsParsed::parse(&args).unwrap(); - assert_eq!(parsed.month, "2026-03"); + assert_eq!(parsed.start_date, "2026-03-01"); + assert_eq!(parsed.end_date, "2026-03-31"); } #[test] - fn test_parse_args_rejects_non_named_or_invalid_month() { - let positioned = TableArgs::new_positioned(vec![Scalar::String("2026-03".to_string())]); + fn test_parse_args_rejects_non_named_or_invalid_date_range() { + let positioned = TableArgs::new_positioned(vec![Scalar::String("2026-03-01".to_string())]); assert!(BillingUsageDailyArgsParsed::parse(&positioned).is_err()); let missing = TableArgs::new_named(HashMap::new()); assert!(BillingUsageDailyArgsParsed::parse(&missing).is_err()); let invalid = TableArgs::new_named(HashMap::from([( - "month".to_string(), - Scalar::String("202603".to_string()), + "start_date".to_string(), + Scalar::String("20260301".to_string()), )])); assert!(BillingUsageDailyArgsParsed::parse(&invalid).is_err()); + + let reversed = TableArgs::new_named(HashMap::from([ + ( + "start_date".to_string(), + Scalar::String("2026-03-02".to_string()), + ), + ( + "end_date".to_string(), + Scalar::String("2026-03-01".to_string()), + ), + ])); + assert!(BillingUsageDailyArgsParsed::parse(&reversed).is_err()); } #[test] From 1fda1e53942c17e2728d247a6599ff917324a282 Mon Sep 17 00:00:00 2001 From: smallfish Date: Thu, 30 Apr 2026 22:04:11 +0800 Subject: [PATCH 4/5] updated with review --- .../table_functions/billing_usage_daily.rs | 197 ++++++++++++++++-- src/query/service/src/table_functions/mod.rs | 2 + .../table_functions/table_function_factory.rs | 13 +- .../tests/it/table_functions/billing_usage.rs | 79 ++++++- .../service/tests/it/table_functions/mod.rs | 1 - src/query/task_support/Cargo.toml | 1 - .../task_support/src/table_functions/mod.rs | 2 - 7 files changed, 257 insertions(+), 38 deletions(-) rename src/query/{task_support => service}/src/table_functions/billing_usage_daily.rs (74%) diff --git a/src/query/task_support/src/table_functions/billing_usage_daily.rs b/src/query/service/src/table_functions/billing_usage_daily.rs similarity index 74% rename from src/query/task_support/src/table_functions/billing_usage_daily.rs rename to src/query/service/src/table_functions/billing_usage_daily.rs index 4fc26bd38d75a..8910c3aa0f25a 100644 --- a/src/query/task_support/src/table_functions/billing_usage_daily.rs +++ b/src/query/service/src/table_functions/billing_usage_daily.rs @@ -41,6 +41,9 @@ use databend_common_expression::DataSchemaRef; use databend_common_expression::FromData; use databend_common_expression::infer_table_schema; use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; +use databend_common_expression::types::Decimal128Type; +use databend_common_expression::types::DecimalSize; use databend_common_expression::types::StringType; use databend_common_expression::types::VariantType; use databend_common_meta_app::principal::GrantObject; @@ -206,18 +209,22 @@ impl AsyncSource for BillingUsageDailySource { } async fn ensure_billing_usage_privilege(ctx: &Arc) -> Result<()> { - if ctx + match ctx .validate_privilege(&GrantObject::Global, UserPrivilegeType::Super, false) .await - .is_ok() { - return Ok(()); - } + Ok(_) => Ok(()), + Err(err) => { + if err.code() != ErrorCode::PERMISSION_DENIED { + return Err(err); + } - let user = ctx.get_current_user()?.identity().display().to_string(); - Err(ErrorCode::PermissionDenied(format!( - "Permission denied: privilege [SUPER] is required on *.* to query billing_usage_daily for user {user}", - ))) + let user = ctx.get_current_user()?.identity().display().to_string(); + Err(ErrorCode::PermissionDenied(format!( + "Permission denied: privilege [SUPER] is required on *.* to query billing_usage_daily for user {user}", + ))) + } + } } impl TableFunction for BillingUsageDailyTable { @@ -292,17 +299,28 @@ fn parse_date(date: &str) -> std::result::Result NaiveDate::parse_from_str(date, "%Y-%m-%d") } +fn usage_decimal_size() -> DecimalSize { + DecimalSize::new_unchecked(38, 0) +} + +fn money_decimal_size() -> DecimalSize { + DecimalSize::new_unchecked(38, 12) +} + fn billing_usage_daily_schema() -> DataSchemaRef { Arc::new(DataSchema::new(vec![ - DataField::new("usage_date", DataType::String), + DataField::new("usage_date", DataType::Date), DataField::new("usage_type", DataType::String), DataField::new("service_type", DataType::String), DataField::new("resource_name", DataType::String), - DataField::new("usage", DataType::String), + DataField::new("usage", DataType::Decimal(usage_decimal_size())), DataField::new("usage_unit", DataType::String), - DataField::new("rate", DataType::String), + DataField::new( + "rate", + DataType::Nullable(Box::new(DataType::Decimal(money_decimal_size()))), + ), DataField::new("rate_unit", DataType::String), - DataField::new("usage_in_currency", DataType::String), + DataField::new("usage_in_currency", DataType::Decimal(money_decimal_size())), DataField::new("currency", DataType::String), DataField::new("tags", DataType::Variant), DataField::new("details", DataType::Variant), @@ -342,15 +360,27 @@ fn parse_billing_usage_daily_to_datablock(resp: GetBillingUsageDailyResponse) -> let mut details = Vec::with_capacity(resp.rows.len()); for row in resp.rows { - usage_date.push(row.usage_date); + usage_date.push(parse_usage_date("usage_date", &row.usage_date)?); usage_type.push(row.usage_type); service_type.push(row.service_type); resource_name.push(row.resource_name); - usage.push(row.usage); + usage.push(parse_decimal_field( + "usage", + &row.usage, + usage_decimal_size(), + )?); usage_unit.push(row.usage_unit); - rate.push(row.rate); + rate.push(parse_optional_decimal_field( + "rate", + &row.rate, + money_decimal_size(), + )?); rate_unit.push(row.rate_unit); - usage_in_currency.push(row.usage_in_currency); + usage_in_currency.push(parse_decimal_field( + "usage_in_currency", + &row.usage_in_currency, + money_decimal_size(), + )?); currency.push(row.currency); let tags_json = serde_json::to_string(&row.tags).map_err(|err| { ErrorCode::Internal(format!("failed to serialize billing usage tags: {err}")) @@ -360,21 +390,108 @@ fn parse_billing_usage_daily_to_datablock(resp: GetBillingUsageDailyResponse) -> } Ok(DataBlock::new_from_columns(vec![ - StringType::from_data(usage_date), + DateType::from_data(usage_date), StringType::from_data(usage_type), StringType::from_data(service_type), StringType::from_data(resource_name), - StringType::from_data(usage), + Decimal128Type::from_data_with_size(usage, Some(usage_decimal_size())), StringType::from_data(usage_unit), - StringType::from_data(rate), + Decimal128Type::from_opt_data_with_size(rate, Some(money_decimal_size())), StringType::from_data(rate_unit), - StringType::from_data(usage_in_currency), + Decimal128Type::from_data_with_size(usage_in_currency, Some(money_decimal_size())), StringType::from_data(currency), VariantType::from_data(tags), VariantType::from_data(details), ])) } +fn parse_usage_date(field: &str, raw: &str) -> Result { + let date = parse_date(raw).map_err(|err| { + ErrorCode::BadBytes(format!("invalid billing usage {field} date value: {err}")) + })?; + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + Ok(date.signed_duration_since(epoch).num_days() as i32) +} + +fn parse_optional_decimal_field(field: &str, raw: &str, size: DecimalSize) -> Result> { + if raw.trim().is_empty() { + return Ok(None); + } + parse_decimal_field(field, raw, size).map(Some) +} + +fn parse_decimal_field(field: &str, raw: &str, size: DecimalSize) -> Result { + parse_decimal(raw.trim(), size).map_err(|err| { + ErrorCode::BadBytes(format!( + "invalid billing usage {field} decimal value '{raw}': {err}" + )) + }) +} + +fn parse_decimal(raw: &str, size: DecimalSize) -> std::result::Result { + if raw.is_empty() { + return Err("empty value".to_string()); + } + + let (negative, digits) = match raw.as_bytes()[0] { + b'+' => (false, &raw[1..]), + b'-' => (true, &raw[1..]), + _ => (false, raw), + }; + if digits.is_empty() { + return Err("missing digits".to_string()); + } + + let mut parts = digits.split('.'); + let int_part = parts.next().unwrap(); + let frac_part = parts.next().unwrap_or(""); + if parts.next().is_some() { + return Err("multiple decimal points".to_string()); + } + if int_part.is_empty() && frac_part.is_empty() { + return Err("missing digits".to_string()); + } + if !int_part.bytes().all(|b| b.is_ascii_digit()) + || !frac_part.bytes().all(|b| b.is_ascii_digit()) + { + return Err("unexpected character".to_string()); + } + + let scale = size.scale() as usize; + let integer_digits_limit = (size.precision() - size.scale()) as usize; + let integer_digits = int_part.trim_start_matches('0').len(); + if integer_digits > integer_digits_limit { + return Err("decimal overflow".to_string()); + } + + if frac_part.len() > scale && frac_part.as_bytes()[scale..].iter().any(|b| *b != b'0') { + return Err(format!( + "scale exceeds declared decimal scale {}", + size.scale() + )); + } + + let mut value = 0_i128; + for b in int_part.bytes().chain(frac_part.bytes().take(scale)) { + value = value + .checked_mul(10) + .and_then(|v| v.checked_add((b - b'0') as i128)) + .ok_or_else(|| "decimal overflow".to_string())?; + } + for _ in frac_part.len()..scale { + value = value + .checked_mul(10) + .ok_or_else(|| "decimal overflow".to_string())?; + } + + if negative { + value = value + .checked_neg() + .ok_or_else(|| "decimal overflow".to_string())?; + } + Ok(value) +} + fn json_text_to_variant(field: &str, raw: &str) -> Result> { let json = if raw.trim().is_empty() { "{}" } else { raw }; let jsonb = OwnedJsonb::from_str(json).map_err(|err| { @@ -391,6 +508,7 @@ mod tests { use databend_common_catalog::table_args::TableArgs; use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; + use databend_common_expression::types::DecimalScalar; use super::*; @@ -484,15 +602,23 @@ mod tests { assert_eq!( block.get_by_offset(0).index(0).unwrap(), - ScalarRef::String("2026-03-02") + ScalarRef::Date(parse_usage_date("usage_date", "2026-03-02").unwrap()) ); assert_eq!( block.get_by_offset(1).index(0).unwrap(), ScalarRef::String("compute") ); + assert_eq!( + block.get_by_offset(4).index(0).unwrap(), + ScalarRef::Decimal(DecimalScalar::Decimal128(2653, usage_decimal_size())) + ); + assert_eq!(block.get_by_offset(6).index(0).unwrap(), ScalarRef::Null); assert_eq!( block.get_by_offset(8).index(0).unwrap(), - ScalarRef::String("0.737") + ScalarRef::Decimal(DecimalScalar::Decimal128( + 737_000_000_000, + money_decimal_size() + )) ); assert_eq!( block.get_by_offset(9).index(0).unwrap(), @@ -510,6 +636,33 @@ mod tests { } } + #[test] + fn test_parse_usage_daily_response_rejects_invalid_typed_values() { + let err = parse_billing_usage_daily_to_datablock(GetBillingUsageDailyResponse { + rows: vec![databend_common_cloud_control::pb::BillingUsageDailyRow { + usage_date: "2026-03-02".to_string(), + usage_type: "compute".to_string(), + service_type: "WAREHOUSE_METERING".to_string(), + resource_name: "default".to_string(), + usage: "1.1".to_string(), + usage_unit: "second".to_string(), + rate: "".to_string(), + rate_unit: "second".to_string(), + usage_in_currency: "0.737".to_string(), + currency: "$".to_string(), + tags: BTreeMap::new(), + details: "{}".to_string(), + }], + error: None, + }) + .expect_err("fractional usage should be rejected for Decimal(38, 0)"); + + assert!( + err.message() + .contains("invalid billing usage usage decimal value") + ); + } + #[test] fn test_parse_usage_daily_response_rejects_invalid_details_json() { let err = parse_billing_usage_daily_to_datablock(GetBillingUsageDailyResponse { diff --git a/src/query/service/src/table_functions/mod.rs b/src/query/service/src/table_functions/mod.rs index d3881ec7e3b12..3492970155987 100644 --- a/src/query/service/src/table_functions/mod.rs +++ b/src/query/service/src/table_functions/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod async_crash_me; +mod billing_usage_daily; mod copy_history; mod fuse_vacuum2; #[cfg(feature = "storage-stage")] @@ -35,6 +36,7 @@ mod tag_references; mod temporary_tables_table; mod udf_table; +pub use billing_usage_daily::BillingUsageDailyTable; pub use copy_history::CopyHistoryTable; pub use numbers::NumbersPartInfo; pub use numbers::NumbersTable; diff --git a/src/query/service/src/table_functions/table_function_factory.rs b/src/query/service/src/table_functions/table_function_factory.rs index 532565f9abf8f..ce6194ee2c62c 100644 --- a/src/query/service/src/table_functions/table_function_factory.rs +++ b/src/query/service/src/table_functions/table_function_factory.rs @@ -39,8 +39,6 @@ use databend_common_storages_iceberg::IcebergInspectTable; use databend_common_storages_stream::stream_status_table_func::StreamStatusTable; use databend_meta_client::types::MetaId; #[cfg(feature = "task-support")] -use databend_query_task_support::table_functions::BillingUsageDailyTable; -#[cfg(feature = "task-support")] use databend_query_task_support::table_functions::TaskDependentsEnableTable; #[cfg(feature = "task-support")] use databend_query_task_support::table_functions::TaskDependentsTable; @@ -51,6 +49,7 @@ use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUNC_ID_BEGIN; use itertools::Itertools; use parking_lot::RwLock; +use super::BillingUsageDailyTable; use super::LicenseInfoTable; use super::TenantQuotaTable; use super::others::UdfEchoTable; @@ -111,9 +110,6 @@ pub struct TableFunctionFactory { impl TableFunctionFactory { pub fn create(config: &InnerConfig) -> Self { - #[cfg(not(feature = "task-support"))] - let _ = config; - let mut id = SYS_TBL_FUNC_ID_BEGIN; let mut next_id = || -> MetaId { if id >= SYS_TBL_FUC_ID_END { @@ -346,7 +342,14 @@ impl TableFunctionFactory { "task_history".to_string(), (next_id(), Arc::new(TaskHistoryTable::create)), ); + } + if config + .query + .common + .cloud_control_grpc_server_address + .is_some() + { creators.insert( "billing_usage_daily".to_string(), (next_id(), Arc::new(BillingUsageDailyTable::create)), diff --git a/src/query/service/tests/it/table_functions/billing_usage.rs b/src/query/service/tests/it/table_functions/billing_usage.rs index 2004753213b0d..6841743de9272 100644 --- a/src/query/service/tests/it/table_functions/billing_usage.rs +++ b/src/query/service/tests/it/table_functions/billing_usage.rs @@ -17,6 +17,7 @@ use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; +use chrono::NaiveDate; use databend_common_base::runtime; use databend_common_catalog::session_type::SessionType; use databend_common_cloud_control::pb::BillingError as PbBillingError; @@ -27,11 +28,15 @@ use databend_common_cloud_control::pb::billing_service_server::BillingService; use databend_common_cloud_control::pb::billing_service_server::BillingServiceServer; use databend_common_config::InnerConfig; use databend_common_expression::DataBlock; +use databend_common_expression::NumberScalar; use databend_common_expression::ScalarRef; +use databend_common_expression::types::DecimalScalar; +use databend_common_expression::types::DecimalSize; use databend_common_meta_app::principal::UserInfo; use databend_common_version::BUILD_INFO; use databend_query::interpreters::InterpreterFactory; use databend_query::sql::Planner; +use databend_query::table_functions::TableFunctionFactory; use databend_query::test_kits::ConfigBuilder; use databend_query::test_kits::TestFixture; use futures::TryStreamExt; @@ -101,6 +106,26 @@ fn extract_single_block(blocks: Vec) -> DataBlock { block } +fn date_days(date: &str) -> i32 { + let date = NaiveDate::parse_from_str(date, "%Y-%m-%d").unwrap(); + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + date.signed_duration_since(epoch).num_days() as i32 +} + +#[test] +fn test_billing_usage_daily_table_function_registered_only_with_cloud_control() { + let config: InnerConfig = ConfigBuilder::create().build(); + let factory = TableFunctionFactory::create(&config); + assert!(!factory.exists("billing_usage_daily")); + + let mut config: InnerConfig = ConfigBuilder::create().build(); + config.query.common.cloud_control_grpc_server_address = + Some("http://127.0.0.1:65535".to_string()); + let factory = TableFunctionFactory::create(&config); + assert!(factory.exists("billing_usage_daily")); + assert!(factory.exists("BILLING_USAGE_DAILY")); +} + #[tokio::test(flavor = "multi_thread")] async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Result<()> { let listener = TcpListener::bind("127.0.0.1:0").await?; @@ -130,7 +155,7 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu let blocks = fixture .execute_query( - "select usage_date, usage_type, service_type, resource_name, usage_in_currency, currency, tags, details \ + "select usage_date, usage_type, service_type, resource_name, usage, rate, usage_in_currency, currency, tags, details \ from billing_usage_daily(start_date => '2026-03-01', end_date => '2026-03-31')", ) .await? @@ -140,7 +165,7 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu assert_eq!( block.get_by_offset(0).index(0).unwrap(), - ScalarRef::String("2026-03-02") + ScalarRef::Date(date_days("2026-03-02")) ); assert_eq!( block.get_by_offset(1).index(0).unwrap(), @@ -156,15 +181,26 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu ); assert_eq!( block.get_by_offset(4).index(0).unwrap(), - ScalarRef::String("0.737") + ScalarRef::Decimal(DecimalScalar::Decimal128( + 2653, + DecimalSize::new_unchecked(38, 0) + )) ); + assert_eq!(block.get_by_offset(5).index(0).unwrap(), ScalarRef::Null); assert_eq!( - block.get_by_offset(5).index(0).unwrap(), + block.get_by_offset(6).index(0).unwrap(), + ScalarRef::Decimal(DecimalScalar::Decimal128( + 737_000_000_000, + DecimalSize::new_unchecked(38, 12) + )) + ); + assert_eq!( + block.get_by_offset(7).index(0).unwrap(), ScalarRef::String("¥") ); let expected_tags = OwnedJsonb::from_str(r#"{"env":"test"}"#)?.to_vec(); - match block.get_by_offset(6).index(0).unwrap() { + match block.get_by_offset(8).index(0).unwrap() { ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_tags.as_slice()), other => panic!("unexpected scalar type for tags: {other:?}"), } @@ -172,11 +208,40 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu let expected_details = OwnedJsonb::from_str(r#"{"cluster_name":"cl-00000","max_clusters":1,"size":"XSmall"}"#)? .to_vec(); - match block.get_by_offset(7).index(0).unwrap() { + match block.get_by_offset(9).index(0).unwrap() { ScalarRef::Variant(bytes) => assert_eq!(bytes, expected_details.as_slice()), other => panic!("unexpected scalar type for details: {other:?}"), } + let blocks = fixture + .execute_query( + "select count(), sum(usage), sum(usage_in_currency) \ + from billing_usage_daily(start_date => '2026-03-01', end_date => '2026-03-31') \ + where usage_date = to_date('2026-03-02') and usage = 2653 and usage_in_currency = 0.737 and rate is null", + ) + .await? + .try_collect::>() + .await?; + let block = extract_single_block(blocks); + assert_eq!( + block.get_by_offset(0).index(0).unwrap(), + ScalarRef::Number(NumberScalar::UInt64(1)) + ); + assert_eq!( + block.get_by_offset(1).index(0).unwrap(), + ScalarRef::Decimal(DecimalScalar::Decimal128( + 2653, + DecimalSize::new_unchecked(38, 0) + )) + ); + assert_eq!( + block.get_by_offset(2).index(0).unwrap(), + ScalarRef::Decimal(DecimalScalar::Decimal128( + 737_000_000_000, + DecimalSize::new_unchecked(38, 12) + )) + ); + let blocks = fixture .execute_query( "select tags:env::String, details:size::String, details:max_clusters::String \ @@ -202,7 +267,7 @@ async fn test_billing_usage_daily_table_function_via_mock_grpc() -> anyhow::Resu ); let usage_daily_requests = requests.usage_daily.lock().unwrap().clone(); - assert_eq!(usage_daily_requests.len(), 2); + assert_eq!(usage_daily_requests.len(), 3); assert!( usage_daily_requests .iter() diff --git a/src/query/service/tests/it/table_functions/mod.rs b/src/query/service/tests/it/table_functions/mod.rs index cc9656b8b6b25..89a5650cdf40a 100644 --- a/src/query/service/tests/it/table_functions/mod.rs +++ b/src/query/service/tests/it/table_functions/mod.rs @@ -12,6 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License.W -#[cfg(feature = "task-support")] mod billing_usage; mod numbers_table; diff --git a/src/query/task_support/Cargo.toml b/src/query/task_support/Cargo.toml index 2134d56507024..6a7e636cbe191 100644 --- a/src/query/task_support/Cargo.toml +++ b/src/query/task_support/Cargo.toml @@ -30,7 +30,6 @@ databend-common-storages-system = { workspace = true } databend-common-users = { workspace = true } itertools = { workspace = true } jiff = { workspace = true } -jsonb = { workspace = true } serde_json = { workspace = true } [lints] diff --git a/src/query/task_support/src/table_functions/mod.rs b/src/query/task_support/src/table_functions/mod.rs index 9e210ea4ac948..5fc630727dd7f 100644 --- a/src/query/task_support/src/table_functions/mod.rs +++ b/src/query/task_support/src/table_functions/mod.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod billing_usage_daily; mod task_dependents; mod task_dependents_enable; mod task_history; -pub use billing_usage_daily::BillingUsageDailyTable; pub use task_dependents::TaskDependentsTable; pub use task_dependents_enable::TaskDependentsEnableTable; pub use task_history::TaskHistoryTable; From d9315f188821db0d5f1cdd463aa0f1951e8e83a5 Mon Sep 17 00:00:00 2001 From: smallfish Date: Fri, 1 May 2026 10:43:58 +0800 Subject: [PATCH 5/5] fix ci check --- src/query/service/tests/it/table_functions/billing_usage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/tests/it/table_functions/billing_usage.rs b/src/query/service/tests/it/table_functions/billing_usage.rs index 6841743de9272..cf8e5fbefaa94 100644 --- a/src/query/service/tests/it/table_functions/billing_usage.rs +++ b/src/query/service/tests/it/table_functions/billing_usage.rs @@ -28,10 +28,10 @@ use databend_common_cloud_control::pb::billing_service_server::BillingService; use databend_common_cloud_control::pb::billing_service_server::BillingServiceServer; use databend_common_config::InnerConfig; use databend_common_expression::DataBlock; -use databend_common_expression::NumberScalar; use databend_common_expression::ScalarRef; use databend_common_expression::types::DecimalScalar; use databend_common_expression::types::DecimalSize; +use databend_common_expression::types::NumberScalar; use databend_common_meta_app::principal::UserInfo; use databend_common_version::BUILD_INFO; use databend_query::interpreters::InterpreterFactory;