Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions src/common/cloud_control/proto/billing.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
syntax = "proto3";

package billingproto;

message BillingError {
string kind = 1;
string message = 2;
int32 code = 3;
}

message GetBillingUsageDailyRequest {
string tenant_id = 1;
string start_date = 2; // YYYY-MM-DD
string end_date = 3; // YYYY-MM-DD, empty means start_date
string sql_user = 4; // audit only, empty means absent
string query_id = 5; // audit only, empty means absent
}

message BillingUsageDailyRow {
// Billing date in YYYY-MM-DD format.
string usage_date = 1;

// Top-level billing category, aligned with Snowflake naming where practical.
// Examples: compute, storage, cloud services.
string usage_type = 2;

// More specific service category.
// Examples: WAREHOUSE_METERING, STORAGE, CLOUD_SERVICES.
string service_type = 3;

// Logical billable resource name when applicable, such as warehouse name.
// Empty string means absent.
string resource_name = 4;

// Billable usage quantity encoded as a decimal string to avoid float precision loss.
string usage = 5;

// Unit for usage.
// Examples: second, byte, request.
string usage_unit = 6;

// Reserved billing unit price field.
// Empty string means the rate is intentionally undisclosed.
string rate = 7;

// Billing dimension unit for the reserved rate field.
// Examples: second, tb_day, k_request.
// Returned when the billing dimension is known.
string rate_unit = 8;

// Final billed amount encoded as a decimal string.
string usage_in_currency = 9;

// Settlement currency, for example USD.
string currency = 10;

// Resource tags when the underlying usage is associated with a tagged resource.
map<string, string> tags = 11;

// JSON object string for category-specific or future-compatible extension fields.
// Examples:
// {"cluster_name":"cl-00000","max_clusters":1,"size":"XSmall"}
// {}
string details = 12;
}

message GetBillingUsageDailyResponse {
repeated BillingUsageDailyRow rows = 1;
Comment thread
everpcpc marked this conversation as resolved.
optional BillingError error = 2;
}

service BillingService {
rpc GetBillingUsageDaily(GetBillingUsageDailyRequest) returns (GetBillingUsageDailyResponse);
}
47 changes: 47 additions & 0 deletions src/common/cloud_control/src/billing_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use tonic::Request;
use tonic::transport::Channel;

use crate::pb::GetBillingUsageDailyRequest;
use crate::pb::GetBillingUsageDailyResponse;
use crate::pb::billing_service_client::BillingServiceClient;
use crate::task_client::MAX_DECODING_SIZE;
use crate::task_client::MAX_ENCODING_SIZE;

pub struct BillingClient {
pub client: BillingServiceClient<Channel>,
}

impl BillingClient {
pub async fn new(channel: Channel) -> Result<Arc<BillingClient>> {
let client = BillingServiceClient::new(channel)
.max_decoding_message_size(MAX_DECODING_SIZE)
.max_encoding_message_size(MAX_ENCODING_SIZE);
Ok(Arc::new(BillingClient { client }))
}

pub async fn get_billing_usage_daily(
&self,
req: Request<GetBillingUsageDailyRequest>,
) -> Result<GetBillingUsageDailyResponse> {
let mut client = self.client.clone();
let resp = client.get_billing_usage_daily(req).await?;
Ok(resp.into_inner())
}
}
8 changes: 8 additions & 0 deletions src/common/cloud_control/src/cloud_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ use databend_common_base::base::GlobalInstance;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use crate::billing_client::BillingClient;
use crate::notification_client::NotificationClient;
use crate::task_client::TaskClient;
use crate::worker_client::WorkerClient;

pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds

pub struct CloudControlApiProvider {
pub billing_client: Arc<BillingClient>,
pub task_client: Arc<TaskClient>,
pub notification_client: Arc<NotificationClient>,
pub worker_client: Arc<WorkerClient>,
Expand All @@ -42,10 +44,12 @@ impl CloudControlApiProvider {

let endpoint = Self::get_endpoint(endpoint, timeout).await?;
let channel = endpoint.connect_lazy();
let billing_client = BillingClient::new(channel.clone()).await?;
let task_client = TaskClient::new(channel.clone()).await?;
let notification_client = NotificationClient::new(channel.clone()).await?;
let worker_client = WorkerClient::new(channel).await?;
Ok(Arc::new(CloudControlApiProvider {
billing_client,
task_client,
notification_client,
worker_client,
Expand Down Expand Up @@ -85,6 +89,10 @@ impl CloudControlApiProvider {
self.task_client.clone()
}

pub fn get_billing_client(&self) -> Arc<BillingClient> {
self.billing_client.clone()
}

pub fn get_notification_client(&self) -> Arc<NotificationClient> {
self.notification_client.clone()
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/cloud_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod billing_client;
pub mod client_config;
pub mod cloud_api;
pub mod notification_client;
Expand All @@ -24,6 +25,7 @@ pub mod worker_client;
#[allow(clippy::large_enum_variant)]
/// ProtoBuf generated files.
pub mod pb {
tonic::include_proto!("billingproto");
// taskproto is proto package name.
tonic::include_proto!("taskproto");
tonic::include_proto!("notificationproto");
Expand Down
109 changes: 109 additions & 0 deletions src/common/cloud_control/tests/it/billing_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use databend_common_base::runtime;
use databend_common_cloud_control::billing_client::BillingClient;
use databend_common_cloud_control::pb::BillingUsageDailyRow;
use databend_common_cloud_control::pb::GetBillingUsageDailyRequest;
use databend_common_cloud_control::pb::GetBillingUsageDailyResponse;
use databend_common_cloud_control::pb::billing_service_server::BillingService;
use databend_common_cloud_control::pb::billing_service_server::BillingServiceServer;
use hyper_util::rt::TokioIo;
use tonic::Request;
use tonic::Response;
use tonic::Status;
use tonic::codegen::tokio_stream;
use tonic::transport::Endpoint;
use tonic::transport::Server;
use tonic::transport::Uri;
use tower::service_fn;

#[derive(Default)]
pub struct MockBillingService {}

#[tonic::async_trait]
impl BillingService for MockBillingService {
async fn get_billing_usage_daily(
&self,
request: Request<GetBillingUsageDailyRequest>,
) -> std::result::Result<Response<GetBillingUsageDailyResponse>, Status> {
Ok(Response::new(GetBillingUsageDailyResponse {
rows: vec![BillingUsageDailyRow {
usage_date: request.into_inner().start_date,
usage_type: "compute".to_string(),
service_type: "WAREHOUSE_METERING".to_string(),
resource_name: "default".to_string(),
usage: "2653".to_string(),
usage_unit: "second".to_string(),
rate: "".to_string(),
rate_unit: "second".to_string(),
usage_in_currency: "0.737".to_string(),
currency: "USD".to_string(),
tags: BTreeMap::from([("env".to_string(), "test".to_string())]),
details: "{\"cluster_name\":\"cl-00000\"}".to_string(),
}],
error: None,
}))
}
}

#[tokio::test(flavor = "current_thread")]
async fn test_billing_client_success_cases() -> anyhow::Result<()> {
let (client, server) = tokio::io::duplex(1024);
let client = TokioIo::new(client);

runtime::spawn(async move {
Server::builder()
.add_service(BillingServiceServer::new(MockBillingService::default()))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});

let mut client_io = Some(client);
let channel = Endpoint::try_from("http://[::]:0")
.unwrap()
.connect_with_connector(service_fn(move |_: Uri| {
let client = client_io.take();

async move {
if let Some(client) = client {
Ok(client)
} else {
Err(std::io::Error::other("Client already taken"))
}
}
}))
.await
.unwrap();

let client = BillingClient::new(channel).await?;

let resp = client
.get_billing_usage_daily(Request::new(GetBillingUsageDailyRequest {
tenant_id: "tenant".to_string(),
start_date: "2026-03-01".to_string(),
end_date: "2026-03-31".to_string(),
sql_user: "root".to_string(),
query_id: "query-1".to_string(),
}))
.await?;
assert_eq!(resp.rows.len(), 1);
assert_eq!(resp.rows[0].usage_date, "2026-03-01");
assert_eq!(resp.rows[0].usage_type, "compute");
assert_eq!(resp.rows[0].resource_name, "default");

Ok(())
}
1 change: 1 addition & 0 deletions src/common/cloud_control/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod billing_client;
mod task_client;
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use databend_common_storages_iceberg::IcebergInspectTable;
use databend_common_storages_stream::stream_status_table_func::StreamStatusTable;
use databend_meta_client::types::MetaId;
#[cfg(feature = "task-support")]
use databend_query_task_support::table_functions::BillingUsageDailyTable;
#[cfg(feature = "task-support")]
use databend_query_task_support::table_functions::TaskDependentsEnableTable;
#[cfg(feature = "task-support")]
use databend_query_task_support::table_functions::TaskDependentsTable;
Expand Down Expand Up @@ -344,6 +346,11 @@ impl TableFunctionFactory {
"task_history".to_string(),
(next_id(), Arc::new(TaskHistoryTable::create)),
);

creators.insert(
"billing_usage_daily".to_string(),
(next_id(), Arc::new(BillingUsageDailyTable::create)),
);
}

creators.insert(
Expand Down
Loading
Loading