From 926cf07758fc91b63b9124660fd57595dd811f1f Mon Sep 17 00:00:00 2001 From: Dewinz Date: Wed, 10 Jun 2026 14:01:21 +0200 Subject: [PATCH 1/2] test: add integration tests for mass mutate in data-api --- helper/data-api/test-component/src/lib.rs | 241 ++++++++++++++++- .../tests/data_api_integration_test.rs | 251 +++++++++++++++++- 2 files changed, 471 insertions(+), 21 deletions(-) diff --git a/helper/data-api/test-component/src/lib.rs b/helper/data-api/test-component/src/lib.rs index a0f67d5..ef3a91f 100644 --- a/helper/data-api/test-component/src/lib.rs +++ b/helper/data-api/test-component/src/lib.rs @@ -16,6 +16,8 @@ struct RequestBody { context: Option, query: String, variables: String, + #[serde(default)] + capture_endpoint: Option, } struct Component; @@ -40,30 +42,32 @@ impl From for http::Response { } } -fn inner_handle(request: http::IncomingRequest) -> Result, Error> { +fn read_body(request: http::IncomingRequest) -> Result, Error> { let body = request.body(); - body.subscribe().block(); let body_bytes = body .read(MAX_READ) - .map_err(|e| Error::FailedToReadBody(e.to_string()))?; + .map_err(|error| Error::FailedToReadBody(error.to_string()))?; + Ok(body_bytes) +} +fn handle_request(body_bytes: &[u8]) -> Result, Error> { let RequestBody { context, query, variables, - } = match serde_json::from_slice(&body_bytes) { + .. + } = match serde_json::from_slice(body_bytes) { Ok(rb) => rb, - Err(e) => return Err(Error::InvalidInput(e.to_string())), + Err(error) => return Err(Error::InvalidInput(error.to_string())), }; - let context = context.unwrap_or(DataApiContext::default()); + let context = context.unwrap_or_default(); let helper_context = HelperContext { application_id: context .application_id - .unwrap_or("empty".to_string()) - .to_string(), + .unwrap_or_else(|| "empty".to_string()), action_id: "empty".to_string(), log_id: "empty".to_string(), encrypted_configurations: None, @@ -73,21 +77,234 @@ fn inner_handle(request: http::IncomingRequest) -> Result let result = data_api_request(&helper_context, &query, &variables); match result { Ok(response) => Ok(http::Response::new(response)), - Err(e) => { - let mut response = http::Response::new(e); + Err(error) => { + let mut response = http::Response::new(error); *response.status_mut() = http::StatusCode::BAD_REQUEST; Ok(response) }, } } +fn make_helper_context(context: Option) -> HelperContext { + let ctx = context.unwrap_or_default(); + HelperContext { + application_id: ctx + .application_id + .unwrap_or_else(|| "empty".to_string()), + action_id: "empty".to_string(), + log_id: "empty".to_string(), + encrypted_configurations: None, + jwt: ctx.jwt, + } +} + +fn handle_capture_create(body_bytes: &[u8]) -> http::Result> { + let RequestBody { context, .. } = match serde_json::from_slice(body_bytes) { + Ok(rb) => rb, + Err(error) => { + let error = Error::InvalidInput(error.to_string()); + return Ok(error.into()); + } + }; + + let helper_context = make_helper_context(context); + let result = data_api_request( + &helper_context, + r#"mutation ($input: userInput) { createUser(input: $input) { id } }"#, + r#"{"name": "Alice"}"#, + ); + + Ok(match result { + Ok(response) => http::Response::new(response), + Err(error) => { + let mut response = http::Response::new(error); + *response.status_mut() = http::StatusCode::BAD_REQUEST; + response + } + }) +} + +fn handle_capture_update(body_bytes: &[u8]) -> http::Result> { + let RequestBody { context, .. } = match serde_json::from_slice(body_bytes) { + Ok(rb) => rb, + Err(error) => { + let error = Error::InvalidInput(error.to_string()); + return Ok(error.into()); + } + }; + + let helper_context = make_helper_context(context); + let result = data_api_request( + &helper_context, + r#"mutation ($id: Int!, $input: userInput) { updateUser(id: $id, input: $input) { id } }"#, + r#"{"id": 5, "name": "Bob"}"#, + ); + + Ok(match result { + Ok(response) => http::Response::new(response), + Err(error) => { + let mut response = http::Response::new(error); + *response.status_mut() = http::StatusCode::BAD_REQUEST; + response + } + }) +} + +fn handle_capture_delete(body_bytes: &[u8]) -> http::Result> { + let RequestBody { context, .. } = match serde_json::from_slice(body_bytes) { + Ok(rb) => rb, + Err(error) => { + let error = Error::InvalidInput(error.to_string()); + return Ok(error.into()); + } + }; + + let helper_context = make_helper_context(context); + let result = data_api_request( + &helper_context, + r#"mutation ($id: Int!) { deleteUser(id: $id) { id } }"#, + r#"{"id": 5}"#, + ); + + Ok(match result { + Ok(response) => http::Response::new(response), + Err(error) => { + let mut response = http::Response::new(error); + *response.status_mut() = http::StatusCode::BAD_REQUEST; + response + } + }) +} + +fn handle_capture_passthrough(body_bytes: &[u8]) -> http::Result> { + let RequestBody { context, .. } = match serde_json::from_slice(body_bytes) { + Ok(rb) => rb, + Err(error) => { + let error = Error::InvalidInput(error.to_string()); + return Ok(error.into()); + } + }; + + let helper_context = make_helper_context(context); + let result = data_api_request( + &helper_context, + r#"query { allUser { id name } }"#, + r#"{}"#, + ); + + Ok(match result { + Ok(response) => http::Response::new(response), + Err(error) => { + let mut response = http::Response::new(error); + *response.status_mut() = http::StatusCode::BAD_REQUEST; + response + } + }) +} + +fn handle_capture_all_operations(body_bytes: &[u8]) -> http::Result> { + let RequestBody { context, .. } = match serde_json::from_slice(body_bytes) { + Ok(rb) => rb, + Err(error) => { + let error = Error::InvalidInput(error.to_string()); + return Ok(error.into()); + } + }; + + let helper_context = make_helper_context(context); + + let _ = data_api_request( + &helper_context, + r#"mutation ($input: userInput) { createUser(input: $input) { id } }"#, + r#"{"name": "Alice"}"#, + ); + + let _ = data_api_request( + &helper_context, + r#"mutation ($id: Int!, $input: userInput) { updateUser(id: $id, input: $input) { id } }"#, + r#"{"id": 5, "name": "Bob"}"#, + ); + + let result = data_api_request( + &helper_context, + r#"mutation ($id: Int!) { deleteUser(id: $id) { id } }"#, + r#"{"id": 10}"#, + ); + + Ok(match result { + Ok(response) => http::Response::new(response), + Err(error) => { + let mut response = http::Response::new(error); + *response.status_mut() = http::StatusCode::BAD_REQUEST; + response + } + }) +} + +fn handle_capture_negative_id_relation(body_bytes: &[u8]) -> http::Result> { + let RequestBody { context, .. } = match serde_json::from_slice(body_bytes) { + Ok(rb) => rb, + Err(error) => { + let error = Error::InvalidInput(error.to_string()); + return Ok(error.into()); + } + }; + + let helper_context = make_helper_context(context); + + let _ = data_api_request( + &helper_context, + r#"mutation ($input: userInput) { createUser(input: $input) { id } }"#, + r#"{"id": -1, "name": "Alice"}"#, + ); + + let result = data_api_request( + &helper_context, + r#"mutation ($input: orderInput) { createOrder(input: $input) { id } }"#, + r#"{"user": {"id": -1}, "product": "Widget"}"#, + ); + + Ok(match result { + Ok(response) => http::Response::new(response), + Err(error) => { + let mut response = http::Response::new(error); + *response.status_mut() = http::StatusCode::BAD_REQUEST; + response + } + }) +} + impl http::Server for Component { fn handle( request: http::IncomingRequest, ) -> http::Result> { - match inner_handle(request) { + let body_bytes = match read_body(request) { + Ok(bytes) => bytes, + Err(error) => return Ok(error.into()), + }; + + // Try to parse the request body to get the capture endpoint + if let Ok(request_body) = serde_json::from_slice::(&body_bytes) { + if let Some(endpoint) = request_body.capture_endpoint { + return match endpoint.as_str() { + "create" => handle_capture_create(&body_bytes), + "update" => handle_capture_update(&body_bytes), + "delete" => handle_capture_delete(&body_bytes), + "passthrough" => handle_capture_passthrough(&body_bytes), + "all-operations" => handle_capture_all_operations(&body_bytes), + "negative-id-relation" => handle_capture_negative_id_relation(&body_bytes), + _ => { + let error = Error::InvalidInput("Unknown capture endpoint".to_string()); + Ok(error.into()) + } + }; + } + } + + // Default handler + match handle_request(&body_bytes) { Ok(response) => Ok(response), - Err(e) => Ok(e.into()), + Err(error) => Ok(error.into()), } } } diff --git a/helper/data-api/tests/data_api_integration_test.rs b/helper/data-api/tests/data_api_integration_test.rs index c5a9b35..9faf8aa 100644 --- a/helper/data-api/tests/data_api_integration_test.rs +++ b/helper/data-api/tests/data_api_integration_test.rs @@ -6,7 +6,7 @@ use serial_test::serial; use std::collections::HashMap; use std::{ net::SocketAddr, - sync::{Arc, Once}, + sync::{Arc, Mutex, Once}, }; use tokio::task; use tonic::metadata::MetadataMap; @@ -65,13 +65,20 @@ fn init_tracing_for_test() { }) } -pub async fn start_test_grpc_server(addr: SocketAddr) -> anyhow::Result<()> { - let data_api_server = DataGrpcServer::default(); - Server::builder() +pub async fn start_test_grpc_server( + addr: SocketAddr, + collected_requests: Arc>>, +) -> anyhow::Result<()> { + info!("Starting gRPC server on {}", addr); + let data_api_server = DataGrpcServer { + collected_requests, + }; + let result = Server::builder() .add_service(DataApiServer::new(data_api_server)) .serve(addr) .await?; + info!("gRPC server shutting down"); Ok(()) } @@ -79,24 +86,44 @@ struct TestConfig { host: Arc, grpc_addr: SocketAddr, http_addr: SocketAddr, + collected_requests: Arc>>, } impl TestConfig { - fn new(host: Arc, grpc_addr: SocketAddr, http_addr: SocketAddr) -> Self { + fn new( + host: Arc, + grpc_addr: SocketAddr, + http_addr: SocketAddr, + collected_requests: Arc>>, + ) -> Self { Self { host, grpc_addr, http_addr, + collected_requests, } } + + fn take_collected_requests(&self) -> Vec { + self.collected_requests + .lock() + .expect("lock is poisoned") + .drain(..) + .collect() + } } async fn setup_wasmcloud_host() -> anyhow::Result { let grpc_port = find_available_port().await?; let grpc_addr: SocketAddr = format!("127.0.0.1:{grpc_port}").parse().unwrap(); + let collected_requests: Arc>> = Arc::new(Mutex::new(Vec::new())); + info!("Starting server"); - let _handle = task::spawn(start_test_grpc_server(grpc_addr)); + let _handle = task::spawn(start_test_grpc_server(grpc_addr, collected_requests.clone())); + + // Give the gRPC server time to start listening + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; let http_port = find_available_port().await?; let http_addr: SocketAddr = format!("127.0.0.1:{http_port}").parse().unwrap(); @@ -115,7 +142,7 @@ async fn setup_wasmcloud_host() -> anyhow::Result { let host = host.start().await?; - Ok(TestConfig::new(host, grpc_addr, http_addr)) + Ok(TestConfig::new(host, grpc_addr, http_addr, collected_requests)) } async fn start_workload(test_config: &TestConfig) -> anyhow::Result { @@ -299,9 +326,12 @@ async fn data_api_component_should_return_error_if_token_is_invalid() -> anyhow: Ok(()) } + /// REALLY SIMPLE MOCK OF THE DATA API -#[derive(Debug, Default)] -pub struct DataGrpcServer {} +#[derive(Debug)] +pub struct DataGrpcServer { + collected_requests: Arc>>, +} #[tonic::async_trait] impl DataApi for DataGrpcServer { @@ -311,6 +341,13 @@ impl DataApi for DataGrpcServer { ) -> Result, tonic::Status> { let (metadata, _extensions, data_api_request) = request.into_parts(); + info!("gRPC DataAPI Execute called with query: {}", data_api_request.query); + + self.collected_requests + .lock() + .expect("lock is poisoned") + .push(data_api_request.clone()); + let application_id = match data_api_request.context { Some(ctx) => ctx.application_id, None => { @@ -325,6 +362,16 @@ impl DataApi for DataGrpcServer { match authenticate(&metadata, &application_id).await { AuthResult::Ok => { + // Batch mutation queries from apply_capture get a success response + if data_api_request.query.contains("Many") { + let body = format_result_json(serde_json::json!({})); + let reply = DataApiResult { + status: Status::Ok as i32, + result: body, + }; + return Ok(Response::new(reply)); + } + if application_id == APPLICATION_ID { let body = format_error_json(serde_json::json!({ "message": "something went wrong" @@ -407,3 +454,189 @@ fn format_error_json(error: serde_json::Value) -> String { serde_json::to_string(&serde_json::json!({ "errors": errors })) .expect("JSON serialization should not fail") } + +#[tokio::test] +#[serial] +async fn capture_single_create() -> anyhow::Result<()> { + init_tracing_for_test(); + + let test_config = setup_wasmcloud_host().await?; + let id = start_workload(&test_config).await?; + + let client = Client::new(); + let response = client + .post(format!("http://{}", test_config.http_addr)) + .json(&json!({ + "context": {"application_id": APPLICATION_ID.to_string()}, + "capture_endpoint": "create", + "query": "", + "variables": "" + })) + .send() + .await?; + + // The gRPC mock returns a 400 error for APPLICATION_ID, but the request should have been made + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + let _ = test_config + .host + .workload_stop(wash_runtime::types::WorkloadStopRequest { workload_id: id }) + .await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn capture_single_update() -> anyhow::Result<()> { + init_tracing_for_test(); + + let test_config = setup_wasmcloud_host().await?; + let id = start_workload(&test_config).await?; + + let client = Client::new(); + let response = client + .post(format!("http://{}", test_config.http_addr)) + .json(&json!({ + "context": {"application_id": APPLICATION_ID.to_string()}, + "capture_endpoint": "update", + "query": "", + "variables": "" + })) + .send() + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let _ = &test_config + .host + .workload_stop(wash_runtime::types::WorkloadStopRequest { workload_id: id }) + .await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn capture_single_delete() -> anyhow::Result<()> { + init_tracing_for_test(); + + let test_config = setup_wasmcloud_host().await?; + let id = start_workload(&test_config).await?; + + let client = Client::new(); + let response = client + .post(format!("http://{}", test_config.http_addr)) + .json(&json!({ + "context": {"application_id": APPLICATION_ID.to_string()}, + "capture_endpoint": "delete", + "query": "", + "variables": "" + })) + .send() + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let _ = &test_config + .host + .workload_stop(wash_runtime::types::WorkloadStopRequest { workload_id: id }) + .await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn capture_passthrough_query() -> anyhow::Result<()> { + init_tracing_for_test(); + + let test_config = setup_wasmcloud_host().await?; + let id = start_workload(&test_config).await?; + + let client = Client::new(); + let response = client + .post(format!("http://{}", test_config.http_addr)) + .json(&json!({ + "context": {"application_id": APPLICATION_ID.to_string()}, + "capture_endpoint": "passthrough", + "query": "", + "variables": "" + })) + .send() + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let _ = &test_config + .host + .workload_stop(wash_runtime::types::WorkloadStopRequest { workload_id: id }) + .await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn capture_all_operations_order() -> anyhow::Result<()> { + init_tracing_for_test(); + + let test_config = setup_wasmcloud_host().await?; + let id = start_workload(&test_config).await?; + + let client = Client::new(); + let response = client + .post(format!("http://{}", test_config.http_addr)) + .json(&json!({ + "context": {"application_id": APPLICATION_ID.to_string()}, + "capture_endpoint": "all-operations", + "query": "", + "variables": "" + })) + .send() + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let _ = &test_config + .host + .workload_stop(wash_runtime::types::WorkloadStopRequest { workload_id: id }) + .await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn capture_negative_id_relation() -> anyhow::Result<()> { + init_tracing_for_test(); + + let test_config = setup_wasmcloud_host().await?; + let id = start_workload(&test_config).await?; + + let client = Client::new(); + let response = client + .post(format!("http://{}", test_config.http_addr)) + .json(&json!({ + "context": {"application_id": APPLICATION_ID.to_string()}, + "capture_endpoint": "negative-id-relation", + "query": "", + "variables": "" + })) + .send() + .await?; + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let _ = &test_config + .host + .workload_stop(wash_runtime::types::WorkloadStopRequest { workload_id: id }) + .await?; + + Ok(()) +} From 13d0721e051012d5d1c954fec9c65b6263ba9b05 Mon Sep 17 00:00:00 2001 From: Dewinz Date: Wed, 10 Jun 2026 14:13:53 +0200 Subject: [PATCH 2/2] refactor: remove unnecessary collected_requests field in testing for mass mutate in data-api --- .../tests/data_api_integration_test.rs | 30 +++---------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/helper/data-api/tests/data_api_integration_test.rs b/helper/data-api/tests/data_api_integration_test.rs index 9faf8aa..18a8f38 100644 --- a/helper/data-api/tests/data_api_integration_test.rs +++ b/helper/data-api/tests/data_api_integration_test.rs @@ -6,7 +6,7 @@ use serial_test::serial; use std::collections::HashMap; use std::{ net::SocketAddr, - sync::{Arc, Mutex, Once}, + sync::{Arc, Once}, }; use tokio::task; use tonic::metadata::MetadataMap; @@ -67,13 +67,11 @@ fn init_tracing_for_test() { pub async fn start_test_grpc_server( addr: SocketAddr, - collected_requests: Arc>>, ) -> anyhow::Result<()> { info!("Starting gRPC server on {}", addr); let data_api_server = DataGrpcServer { - collected_requests, }; - let result = Server::builder() + Server::builder() .add_service(DataApiServer::new(data_api_server)) .serve(addr) .await?; @@ -86,7 +84,6 @@ struct TestConfig { host: Arc, grpc_addr: SocketAddr, http_addr: SocketAddr, - collected_requests: Arc>>, } impl TestConfig { @@ -94,33 +91,21 @@ impl TestConfig { host: Arc, grpc_addr: SocketAddr, http_addr: SocketAddr, - collected_requests: Arc>>, ) -> Self { Self { host, grpc_addr, http_addr, - collected_requests, } } - - fn take_collected_requests(&self) -> Vec { - self.collected_requests - .lock() - .expect("lock is poisoned") - .drain(..) - .collect() - } } async fn setup_wasmcloud_host() -> anyhow::Result { let grpc_port = find_available_port().await?; let grpc_addr: SocketAddr = format!("127.0.0.1:{grpc_port}").parse().unwrap(); - let collected_requests: Arc>> = Arc::new(Mutex::new(Vec::new())); - info!("Starting server"); - let _handle = task::spawn(start_test_grpc_server(grpc_addr, collected_requests.clone())); + let _handle = task::spawn(start_test_grpc_server(grpc_addr)); // Give the gRPC server time to start listening tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; @@ -142,7 +127,7 @@ async fn setup_wasmcloud_host() -> anyhow::Result { let host = host.start().await?; - Ok(TestConfig::new(host, grpc_addr, http_addr, collected_requests)) + Ok(TestConfig::new(host, grpc_addr, http_addr)) } async fn start_workload(test_config: &TestConfig) -> anyhow::Result { @@ -326,11 +311,9 @@ async fn data_api_component_should_return_error_if_token_is_invalid() -> anyhow: Ok(()) } - /// REALLY SIMPLE MOCK OF THE DATA API #[derive(Debug)] pub struct DataGrpcServer { - collected_requests: Arc>>, } #[tonic::async_trait] @@ -343,11 +326,6 @@ impl DataApi for DataGrpcServer { info!("gRPC DataAPI Execute called with query: {}", data_api_request.query); - self.collected_requests - .lock() - .expect("lock is poisoned") - .push(data_api_request.clone()); - let application_id = match data_api_request.context { Some(ctx) => ctx.application_id, None => {