diff --git a/Cargo.lock b/Cargo.lock index 4acd31d9d..83137c720 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1141,7 +1141,7 @@ dependencies = [ "cfg-if", "futures", "log", - "reqwest", + "reqwest 0.12.28", "url", "warpui_core", ] @@ -3756,6 +3756,16 @@ dependencies = [ "darling_macro 0.21.3", ] +[[package]] +name = "darling" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d" +dependencies = [ + "darling_core 0.23.0", + "darling_macro 0.23.0", +] + [[package]] name = "darling_core" version = "0.20.8" @@ -3784,6 +3794,19 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "darling_core" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9865a50f7c335f53564bb694ef660825eb8610e0a53d3e11bf1b0d3df31e03b0" +dependencies = [ + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.117", +] + [[package]] name = "darling_macro" version = "0.20.8" @@ -3806,6 +3829,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "darling_macro" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" +dependencies = [ + "darling_core 0.23.0", + "quote", + "syn 2.0.117", +] + [[package]] name = "dary_heap" version = "0.3.7" @@ -6164,7 +6198,7 @@ dependencies = [ "oauth2", "prevent_sleep", "prost 0.14.3", - "reqwest", + "reqwest 0.12.28", "reqwest-eventsource", "serde", "serde_json", @@ -7248,9 +7282,9 @@ checksum = "0c2cdeb66e45e9f36bfad5bbdb4d2384e70936afbee843c6f6543f0c551ebb25" [[package]] name = "libc" -version = "0.2.180" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libfuzzer-sys" @@ -8268,6 +8302,18 @@ dependencies = [ "memoffset 0.9.0", ] +[[package]] +name = "nix" +version = "0.31.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf20d2fde8ff38632c426f1165ed7436270b44f199fc55284c38276f9db47c3d" +dependencies = [ + "bitflags 2.9.4", + "cfg-if", + "cfg_aliases 0.2.1", + "libc", +] + [[package]] name = "node_runtime" version = "0.1.0" @@ -8533,12 +8579,12 @@ version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51e219e79014df21a225b1860a479e2dcd7cbd9130f4defd4bd0e191ea31d67d" dependencies = [ - "base64 0.22.1", + "base64 0.21.7", "chrono", "getrandom 0.2.16", "http 1.4.0", "rand 0.8.6", - "reqwest", + "reqwest 0.12.28", "serde", "serde_json", "serde_path_to_error", @@ -9189,7 +9235,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d8fae84b431384b68627d0f9b3b1245fcf9f46f6c0e3dc902e9dce64edd1967" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.45.0", ] [[package]] @@ -9867,16 +9913,16 @@ dependencies = [ [[package]] name = "process-wrap" -version = "8.2.1" +version = "9.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3ef4f2f0422f23a82ec9f628ea2acd12871c81a9362b02c43c1aa86acfc3ba1" +checksum = "2e842efad9119158434d193c6682e2ebee4b44d6ad801d7b349623b3f57cdf55" dependencies = [ "futures", "indexmap 2.12.0", - "nix 0.30.1", + "nix 0.31.3", "tokio", "tracing", - "windows 0.61.3", + "windows 0.62.2", ] [[package]] @@ -9934,7 +9980,7 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "itertools 0.14.0", "log", "multimap", @@ -10751,11 +10797,45 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.4.0", "web-sys", "webpki-roots 1.0.1", ] +[[package]] +name = "reqwest" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "sync_wrapper", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams 0.5.0", + "web-sys", +] + [[package]] name = "reqwest-eventsource" version = "0.6.0" @@ -10768,7 +10848,7 @@ dependencies = [ "mime", "nom 7.1.3", "pin-project-lite", - "reqwest", + "reqwest 0.12.28", "thiserror 1.0.63", ] @@ -10853,8 +10933,8 @@ dependencies = [ [[package]] name = "rmcp" -version = "0.10.0" -source = "git+https://github.com/warpdotdev/rmcp.git?rev=c0f65dc441af7d714b9c453ac5e7ef641451abe3#c0f65dc441af7d714b9c453ac5e7ef641451abe3" +version = "1.6.0" +source = "git+https://github.com/warpdotdev/rmcp.git?rev=321ab14f67da734a8e0cfa0bfcdee1690663d9dc#321ab14f67da734a8e0cfa0bfcdee1690663d9dc" dependencies = [ "async-trait", "base64 0.22.1", @@ -10865,7 +10945,7 @@ dependencies = [ "pastey 0.2.1", "pin-project-lite", "process-wrap", - "reqwest", + "reqwest 0.13.3", "rmcp-macros", "schemars", "serde", @@ -10881,10 +10961,10 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "0.10.0" -source = "git+https://github.com/warpdotdev/rmcp.git?rev=c0f65dc441af7d714b9c453ac5e7ef641451abe3#c0f65dc441af7d714b9c453ac5e7ef641451abe3" +version = "1.6.0" +source = "git+https://github.com/warpdotdev/rmcp.git?rev=321ab14f67da734a8e0cfa0bfcdee1690663d9dc#321ab14f67da734a8e0cfa0bfcdee1690663d9dc" dependencies = [ - "darling 0.21.3", + "darling 0.23.0", "proc-macro2", "quote", "serde_json", @@ -11471,7 +11551,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "507ac2be9bf2da56c831da57faf1dadd81f434bd282935cdb06193d0c94e8811" dependencies = [ "httpdate", - "reqwest", + "reqwest 0.12.28", "rustls", "sentry-actix", "sentry-anyhow", @@ -14385,7 +14465,7 @@ dependencies = [ "regex-automata", "remote_server", "repo_metadata", - "reqwest", + "reqwest 0.12.28", "reqwest-eventsource", "rmcp", "rquickjs", @@ -14419,6 +14499,7 @@ dependencies = [ "siphasher 0.3.11", "smallvec 1.15.1", "smol_str", + "sse-stream", "static_assertions", "string-offset", "strum", @@ -14635,7 +14716,7 @@ dependencies = [ "parking_lot", "rand 0.8.6", "regex", - "reqwest", + "reqwest 0.12.28", "schemars", "sentry", "sentry-log", @@ -14754,7 +14835,7 @@ dependencies = [ "instant", "log", "persistence", - "reqwest", + "reqwest 0.12.28", "serde", "thiserror 2.0.17", "warp_core", @@ -15296,6 +15377,19 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasm-streams" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1ec4f6517c9e11ae630e200b2b65d193279042e28edd4a2cda233e46670bbb" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" diff --git a/Cargo.toml b/Cargo.toml index e92d4a513..2f7c884fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -222,6 +222,7 @@ reqwest = { version = "0.12.28", default-features = false, features = [ "json", "macos-system-configuration", "rustls-tls-native-roots-no-provider", + "rustls-tls-webpki-roots-no-provider", "stream", "system-proxy", ] } @@ -378,7 +379,7 @@ typed-path = "0.10.0" streaming-iterator = "0.1.0" derivative = "2.2.0" parquet = { version = "55.0.0", features = ["arrow"] } -rmcp = { git = "https://github.com/warpdotdev/rmcp.git", rev = "c0f65dc441af7d714b9c453ac5e7ef641451abe3" } +rmcp = { git = "https://github.com/warpdotdev/rmcp.git", rev = "321ab14f67da734a8e0cfa0bfcdee1690663d9dc" } [profile.release] # Use line-tables-only (debug = 1) rather than full debuginfo (debug = 2 / diff --git a/app/Cargo.toml b/app/Cargo.toml index 691c4ef06..9f19a5137 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -299,10 +299,10 @@ aws-types = "1" rmcp = { workspace = true, features = [ "auth", - "transport-streamable-http-client-reqwest", - "transport-sse-client-reqwest", + "transport-streamable-http-client", "transport-child-process", ] } +sse-stream = "0.2" notify-debouncer-full.workspace = true rquickjs = { workspace = true, optional = true } rustls.workspace = true diff --git a/app/src/ai/blocklist/action_model/execute/call_mcp_tool.rs b/app/src/ai/blocklist/action_model/execute/call_mcp_tool.rs index 8810fed2e..a294ad5bd 100644 --- a/app/src/ai/blocklist/action_model/execute/call_mcp_tool.rs +++ b/app/src/ai/blocklist/action_model/execute/call_mcp_tool.rs @@ -141,10 +141,10 @@ impl CallMCPToolExecutor { ActionExecution::new_async( async move { reconnecting_peer - .call_tool(rmcp::model::CallToolRequestParam { - name: name_owned_inner.into(), - arguments: Some(arguments), - }) + .call_tool(rmcp::model::CallToolRequestParams::new( + name_owned_inner, + Some(arguments), + )) .await }, move |res, ctx| handle_call_tool_result(res, server_output_id, name_clone, ctx), diff --git a/app/src/ai/blocklist/action_model/execute/read_mcp_resource.rs b/app/src/ai/blocklist/action_model/execute/read_mcp_resource.rs index 12c69b4b9..8e45b96d4 100644 --- a/app/src/ai/blocklist/action_model/execute/read_mcp_resource.rs +++ b/app/src/ai/blocklist/action_model/execute/read_mcp_resource.rs @@ -128,7 +128,7 @@ impl ReadMCPResourceExecutor { ActionExecution::new_async( async move { reconnecting_peer - .read_resource(rmcp::model::ReadResourceRequestParam { uri }) + .read_resource(rmcp::model::ReadResourceRequestParams::new(uri)) .await }, |res, _ctx| handle_read_resource_result(res), diff --git a/app/src/ai/mcp/http_client.rs b/app/src/ai/mcp/http_client.rs index 47d0be706..e878dd576 100644 --- a/app/src/ai/mcp/http_client.rs +++ b/app/src/ai/mcp/http_client.rs @@ -1,8 +1,25 @@ -use std::collections::HashMap; +use std::{borrow::Cow, collections::HashMap, sync::Arc}; -use reqwest::header::HeaderMap; +use futures::{stream::BoxStream, StreamExt}; +use http::{header::WWW_AUTHENTICATE, HeaderName, HeaderValue}; +use reqwest::header::{HeaderMap, ACCEPT}; +use rmcp::{ + model::{ClientJsonRpcMessage, JsonRpcMessage, ServerJsonRpcMessage}, + transport::{ + common::http_header::{ + EVENT_STREAM_MIME_TYPE, HEADER_LAST_EVENT_ID, HEADER_SESSION_ID, JSON_MIME_TYPE, + }, + streamable_http_client::*, + }, +}; +use sse_stream::{Sse, SseStream}; -type ReqwestHttpTransport = rmcp::transport::StreamableHttpClientTransport; +/// Newtype wrapper around `reqwest::Client` so we can implement the foreign +/// `StreamableHttpClient` trait (orphan rule). +#[derive(Clone, Debug, Default)] +pub struct McpHttpClient(pub reqwest::Client); + +pub type ReqwestHttpTransport = rmcp::transport::StreamableHttpClientTransport; /// Builds a `HeaderMap` from a `HashMap` of user-provided headers. /// @@ -27,3 +44,251 @@ pub fn build_client_with_headers( )) }) } + +/// Reserved headers that must not be overridden by custom headers. +/// Matches the validation logic in rmcp's `validate_custom_header`. +const RESERVED_HEADERS: &[&str] = &[ + "accept", + "content-type", + "mcp-session-id", + "last-event-id", + "authorization", + "host", + "origin", +]; + +/// Applies custom headers to a request builder, rejecting reserved headers. +fn apply_custom_headers( + mut builder: reqwest::RequestBuilder, + custom_headers: HashMap, +) -> Result> { + for (name, value) in custom_headers { + let name_lower = name.as_str().to_lowercase(); + if RESERVED_HEADERS.contains(&name_lower.as_str()) { + return Err(StreamableHttpError::ReservedHeaderConflict( + name.to_string(), + )); + } + builder = builder.header(name, value); + } + Ok(builder) +} + +/// Extracts the scope value from a WWW-Authenticate header. +fn extract_scope(header: &str) -> Option { + header.split(',').find_map(|part| { + let part = part.trim(); + if let Some(rest) = part.strip_prefix("scope=") { + Some(rest.trim_matches('"').to_string()) + } else { + None + } + }) +} + +/// Attempts to parse `body` as a JSON-RPC error message. +fn parse_json_rpc_error(body: &str) -> Option { + match serde_json::from_str::(body) { + Ok(message @ JsonRpcMessage::Error(_)) => Some(message), + _ => None, + } +} + +/// Implement `StreamableHttpClient` for our newtype wrapper around reqwest 0.12's `Client`. +/// +/// rmcp 1.6.0 ships its own impl for reqwest 0.13, but warp uses reqwest 0.12. +/// This provides the equivalent implementation against the 0.12 API. +impl StreamableHttpClient for McpHttpClient { + type Error = reqwest::Error; + + async fn get_stream( + &self, + uri: Arc, + session_id: Arc, + last_event_id: Option, + auth_token: Option, + custom_headers: HashMap, + ) -> Result>, StreamableHttpError> { + let mut request_builder = self + .0 + .get(uri.as_ref()) + .header(ACCEPT, [EVENT_STREAM_MIME_TYPE, JSON_MIME_TYPE].join(", ")) + .header(HEADER_SESSION_ID, session_id.as_ref()); + if let Some(last_event_id) = last_event_id { + request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id); + } + if let Some(auth_header) = auth_token { + request_builder = request_builder.bearer_auth(auth_header); + } + request_builder = apply_custom_headers(request_builder, custom_headers)?; + let response = request_builder + .send() + .await + .map_err(StreamableHttpError::Client)?; + if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED { + return Err(StreamableHttpError::ServerDoesNotSupportSse); + } + let response = response + .error_for_status() + .map_err(StreamableHttpError::Client)?; + match response.headers().get(reqwest::header::CONTENT_TYPE) { + Some(ct) => { + if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) + && !ct.as_bytes().starts_with(JSON_MIME_TYPE.as_bytes()) + { + return Err(StreamableHttpError::UnexpectedContentType(Some( + String::from_utf8_lossy(ct.as_bytes()).to_string(), + ))); + } + } + None => { + return Err(StreamableHttpError::UnexpectedContentType(None)); + } + } + let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); + Ok(event_stream) + } + + async fn delete_session( + &self, + uri: Arc, + session: Arc, + auth_token: Option, + custom_headers: HashMap, + ) -> Result<(), StreamableHttpError> { + let mut request_builder = self.0.delete(uri.as_ref()); + if let Some(auth_header) = auth_token { + request_builder = request_builder.bearer_auth(auth_header); + } + request_builder = request_builder.header(HEADER_SESSION_ID, session.as_ref()); + request_builder = apply_custom_headers(request_builder, custom_headers)?; + let response = request_builder + .send() + .await + .map_err(StreamableHttpError::Client)?; + if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED { + tracing::debug!("this server doesn't support deleting session"); + return Ok(()); + } + let _response = response + .error_for_status() + .map_err(StreamableHttpError::Client)?; + Ok(()) + } + + async fn post_message( + &self, + uri: Arc, + message: ClientJsonRpcMessage, + session_id: Option>, + auth_token: Option, + custom_headers: HashMap, + ) -> Result> { + let mut request = self + .0 + .post(uri.as_ref()) + .header(ACCEPT, [EVENT_STREAM_MIME_TYPE, JSON_MIME_TYPE].join(", ")); + if let Some(auth_header) = auth_token { + request = request.bearer_auth(auth_header); + } + request = apply_custom_headers(request, custom_headers)?; + let session_was_attached = session_id.is_some(); + if let Some(session_id) = session_id { + request = request.header(HEADER_SESSION_ID, session_id.as_ref()); + } + let response = request + .json(&message) + .send() + .await + .map_err(StreamableHttpError::Client)?; + if response.status() == reqwest::StatusCode::UNAUTHORIZED { + if let Some(header) = response.headers().get(WWW_AUTHENTICATE) { + let header = header + .to_str() + .map_err(|_| { + StreamableHttpError::UnexpectedServerResponse(Cow::from( + "invalid www-authenticate header value", + )) + })? + .to_string(); + return Err(StreamableHttpError::AuthRequired(AuthRequiredError::new( + header, + ))); + } + } + if response.status() == reqwest::StatusCode::FORBIDDEN { + if let Some(header) = response.headers().get(WWW_AUTHENTICATE) { + let header_str = header.to_str().map_err(|_| { + StreamableHttpError::UnexpectedServerResponse(Cow::from( + "invalid www-authenticate header value", + )) + })?; + return Err(StreamableHttpError::InsufficientScope( + InsufficientScopeError::new(header_str.to_string(), extract_scope(header_str)), + )); + } + } + let status = response.status(); + if matches!( + status, + reqwest::StatusCode::ACCEPTED | reqwest::StatusCode::NO_CONTENT + ) { + return Ok(StreamableHttpPostResponse::Accepted); + } + if status == reqwest::StatusCode::NOT_FOUND && session_was_attached { + return Err(StreamableHttpError::SessionExpired); + } + let content_type = response + .headers() + .get(reqwest::header::CONTENT_TYPE) + .map(|ct| String::from_utf8_lossy(ct.as_bytes()).to_string()); + let session_id = response + .headers() + .get(HEADER_SESSION_ID) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + if !status.is_success() { + let body = response + .text() + .await + .unwrap_or_else(|_| "".to_owned()); + if content_type + .as_deref() + .is_some_and(|ct| ct.as_bytes().starts_with(JSON_MIME_TYPE.as_bytes())) + { + match parse_json_rpc_error(&body) { + Some(message) => { + return Ok(StreamableHttpPostResponse::Json(message, session_id)); + } + None => tracing::warn!( + "HTTP {status}: could not parse JSON body as a JSON-RPC error" + ), + } + } + return Err(StreamableHttpError::UnexpectedServerResponse(Cow::Owned( + format!("HTTP {status}: {body}"), + ))); + } + match content_type.as_deref() { + Some(ct) if ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) => { + let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); + Ok(StreamableHttpPostResponse::Sse(event_stream, session_id)) + } + Some(ct) if ct.as_bytes().starts_with(JSON_MIME_TYPE.as_bytes()) => { + match response.json::().await { + Ok(message) => Ok(StreamableHttpPostResponse::Json(message, session_id)), + Err(e) => { + tracing::warn!( + "could not parse JSON response as ServerJsonRpcMessage, treating as accepted: {e}" + ); + Ok(StreamableHttpPostResponse::Accepted) + } + } + } + _ => { + tracing::error!("unexpected content type: {:?}", content_type); + Err(StreamableHttpError::UnexpectedContentType(content_type)) + } + } + } +} diff --git a/app/src/ai/mcp/reconnecting_peer.rs b/app/src/ai/mcp/reconnecting_peer.rs index 71fbde0a1..992c032e4 100644 --- a/app/src/ai/mcp/reconnecting_peer.rs +++ b/app/src/ai/mcp/reconnecting_peer.rs @@ -120,7 +120,7 @@ impl ReconnectingPeer { /// Calls a tool on the MCP server. pub async fn call_tool( &self, - params: rmcp::model::CallToolRequestParam, + params: rmcp::model::CallToolRequestParams, ) -> Result { self.with_reconnect_retry(params, |peer, p| async move { peer.call_tool(p).await }) .await @@ -129,7 +129,7 @@ impl ReconnectingPeer { /// Reads a resource from the MCP server. pub async fn read_resource( &self, - params: rmcp::model::ReadResourceRequestParam, + params: rmcp::model::ReadResourceRequestParams, ) -> Result { self.with_reconnect_retry(params, |peer, p| async move { peer.read_resource(p).await }) .await diff --git a/app/src/ai/mcp/templatable_manager/native.rs b/app/src/ai/mcp/templatable_manager/native.rs index 421e06b72..0a695c824 100644 --- a/app/src/ai/mcp/templatable_manager/native.rs +++ b/app/src/ai/mcp/templatable_manager/native.rs @@ -36,9 +36,7 @@ use crate::{ }, cloud_object::{GenericStringObjectFormat, JsonObjectType}, drive::CloudObjectTypeAndId, - persistence::{ - database_file_path_for_scope, establish_ro_connection, ModelEvent, PersistenceScope, - }, + persistence::ModelEvent, send_telemetry_from_ctx, server::{ cloud_objects::update_manager::UpdateManager, ids::SyncId, telemetry::TelemetryEvent, @@ -160,6 +158,7 @@ fn error_to_user_message(error: &rmcp::RmcpError) -> String { } _ => format!("Service error: {}", err), }, + _ => format!("Error: {}", error), } } @@ -287,13 +286,15 @@ impl TemplatableMCPServerManager { _ => {} }); - let database_connection = database_file_path_for_scope(&PersistenceScope::App) - .to_str() - .and_then(|db_url| { - establish_ro_connection(db_url) - .ok() - .map(|conn| Arc::new(Mutex::new(conn))) - }); + let database_connection = crate::persistence::database_file_path_for_scope( + &crate::persistence::PersistenceScope::App, + ) + .to_str() + .and_then(|db_url| { + crate::persistence::establish_ro_connection(db_url) + .ok() + .map(|conn| Arc::new(Mutex::new(conn))) + }); let mut me = Self { cloud_templatable_mcp_servers: Default::default(), @@ -1722,8 +1723,7 @@ impl TemplatableMCPServerManager { } } -type ReqwestHttpTransport = rmcp::transport::StreamableHttpClientTransport; -type ReqwestSseTransport = rmcp::transport::SseClientTransport; +use crate::ai::mcp::http_client::ReqwestHttpTransport; /// Spawns a new MCP server from a given [`TransportType`]. async fn spawn_server( @@ -1855,7 +1855,7 @@ async fn spawn_server( logger.log("[info] MCP: Using Streaming HTTP transport".to_string()); let transport = rmcp::transport::StreamableHttpClientTransport::with_client( - client, + crate::ai::mcp::http_client::McpHttpClient(client), rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig::with_uri( sse_server.url.clone(), ), @@ -1868,12 +1868,14 @@ async fn spawn_server( } Ok(Transport::Http(None)) => { logger.log("[info] MCP: Using Streaming HTTP transport".to_string()); - let transport = if headers.is_empty() { - rmcp::transport::StreamableHttpClientTransport::from_uri( - sse_server.url.clone(), - ) - } else { - let client = build_client_with_headers(&headers)?; + let transport = { + let client = if headers.is_empty() { + crate::ai::mcp::http_client::McpHttpClient::default() + } else { + crate::ai::mcp::http_client::McpHttpClient(build_client_with_headers( + &headers, + )?) + }; rmcp::transport::StreamableHttpClientTransport::with_client( client, rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig::with_uri( @@ -1887,44 +1889,22 @@ async fn spawn_server( }; Ok(make_client_info().into_dyn().serve(transport).await?) } - Ok(Transport::Sse(Some(client))) => { - is_authenticated_transport = true; - - logger.log("[info] MCP: Using (legacy) SSE transport (due to preflight failing with a 404)".to_string()); - let transport = rmcp::transport::SseClientTransport::start_with_client( - client, - rmcp::transport::sse_client::SseClientConfig { - sse_endpoint: sse_server.url.into(), - ..Default::default() - }, - ) - .await - .map_err(rmcp::RmcpError::transport_creation::)?; - let transport = TransportLoggingWrapper { - transport, - logger: logger.clone(), - }; - Ok(make_client_info().into_dyn().serve(transport).await?) - } - Ok(Transport::Sse(None)) => { - logger.log("[info] MCP: Using (legacy) SSE transport (due to preflight failing with a 404)".to_string()); - let transport = if headers.is_empty() { - rmcp::transport::SseClientTransport::start(sse_server.url.clone()) - .await - .map_err(|e| { - rmcp::RmcpError::transport_creation::(e) - })? - } else { - let client = build_client_with_headers(&headers)?; - rmcp::transport::SseClientTransport::start_with_client( + Ok(Transport::LegacySse) => { + logger.log("[warn] MCP: Server only supports legacy SSE transport, which is no longer supported by this client. Attempting Streamable HTTP anyway.".to_string()); + let transport = { + let client = if headers.is_empty() { + crate::ai::mcp::http_client::McpHttpClient::default() + } else { + crate::ai::mcp::http_client::McpHttpClient(build_client_with_headers( + &headers, + )?) + }; + rmcp::transport::StreamableHttpClientTransport::with_client( client, - rmcp::transport::sse_client::SseClientConfig { - sse_endpoint: sse_server.url.clone().into(), - ..Default::default() - }, + rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig::with_uri( + sse_server.url.clone(), + ), ) - .await - .map_err(rmcp::RmcpError::transport_creation::)? }; let transport = TransportLoggingWrapper { transport, @@ -1965,9 +1945,9 @@ async fn spawn_server( /// The transport to use for MCP. enum Transport { /// The HTTP transport, with an optional authenticated client. - Http(Option>), - /// The SSE transport, with an optional authenticated client. - Sse(Option>), + Http(Option), + /// Legacy SSE transport (server responded with 404 to HTTP transport check). + LegacySse, } /// Determines which transport to use. @@ -1990,7 +1970,7 @@ async fn determine_transport( } match send_initialize_request(url, headers, None).await? { StatusCode::OK => Ok(Transport::Http(None)), - StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED => Ok(Transport::Sse(None)), + StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED => Ok(Transport::LegacySse), StatusCode::UNAUTHORIZED => { if !FeatureFlag::McpOauth.is_enabled() { return Err(rmcp::RmcpError::transport_creation::( @@ -2001,14 +1981,15 @@ async fn determine_transport( let spawner = auth_context.spawner.clone(); // Go through the OAuth flow to get an authenticated client. // This will first attempt to use cached credentials before starting interactive OAuth. - let (client, did_require_login) = oauth::make_authenticated_client(url, auth_context) - .boxed() - .await - .map_err(rmcp::RmcpError::transport_creation::)?; - let transport = match send_initialize_request(url, headers, Some(&client)).await? { - StatusCode::OK => Ok(Transport::Http(Some(client))), + let (auth_client, did_require_login) = + oauth::make_authenticated_client(url, auth_context) + .boxed() + .await + .map_err(rmcp::RmcpError::transport_creation::)?; + let transport = match send_initialize_request(url, headers, Some(&auth_client)).await? { + StatusCode::OK => Ok(Transport::Http(Some(auth_client.http_client.clone()))), StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED => { - Ok(Transport::Sse(Some(client))) + Ok(Transport::Http(Some(auth_client.http_client.clone()))) } other => Err(unexpected_error(other)), }; @@ -2078,19 +2059,15 @@ async fn send_initialize_request( /// /// This tells the MCP server who we are and what capabilities we have. fn make_client_info() -> rmcp::model::ClientInfo { - rmcp::model::ClientInfo { - protocol_version: Default::default(), - capabilities: Default::default(), - client_info: rmcp::model::Implementation { - name: warp_core::channel::ChannelState::app_id().to_string(), - version: warp_core::channel::ChannelState::app_version() + rmcp::model::ClientInfo::new( + Default::default(), + rmcp::model::Implementation::new( + warp_core::channel::ChannelState::app_id().to_string(), + warp_core::channel::ChannelState::app_version() .map(|v| v.to_string()) .unwrap_or_default(), - title: None, - icons: None, - website_url: None, - }, - } + ), + ) } /// A wrapper around a [`rmcp::transport::Transport`] that logs all requests and responses. diff --git a/app/src/ai/mcp/templatable_manager/oauth.rs b/app/src/ai/mcp/templatable_manager/oauth.rs index 3e9c746e9..8bdb6cba9 100644 --- a/app/src/ai/mcp/templatable_manager/oauth.rs +++ b/app/src/ai/mcp/templatable_manager/oauth.rs @@ -274,12 +274,10 @@ pub async fn make_authenticated_client( // If this is a client for which we have a known client secret, // update our client config accordingly. if let Some(client_secret) = &client_secret { - auth_manager.configure_client(OAuthClientConfig { - client_id: credentials.client_id.clone(), - client_secret: Some(client_secret.clone()), - scopes: vec![], - redirect_uri: redirect_uri.clone(), - })?; + auth_manager.configure_client( + OAuthClientConfig::new(credentials.client_id.clone(), redirect_uri.clone()) + .with_client_secret(client_secret.clone()), + )?; } // GitHub does not issue refresh tokens for OAuth apps; their access tokens are valid @@ -359,7 +357,10 @@ pub async fn make_authenticated_client( // For apps for which we have static client IDs (e.g. GitHub), we manually override scopes. let mut scopes: &[&str] = &[]; - let config = match auth_manager.register_client("Warp", &redirect_uri).await { + let config = match auth_manager + .register_client("Warp", &redirect_uri, scopes) + .await + { Ok(config) => config, Err(err @ AuthError::RegistrationFailed(_)) => { // If we failed dynamic registration, check to see if this is an auth @@ -379,14 +380,8 @@ pub async fn make_authenticated_client( scopes = &GITHUB_OAUTH_SCOPES; } - OAuthClientConfig { - client_id: provider.client_id.into_owned(), - client_secret: Some(provider.client_secret.into_owned()), - redirect_uri: redirect_uri.clone(), - // This `scopes` field appears to be unused by rmcp as of 9/17/25 - we pass scopes - // in construction of the authorization url below. - scopes: vec![], - } + OAuthClientConfig::new(provider.client_id.into_owned(), redirect_uri.clone()) + .with_client_secret(provider.client_secret.into_owned()) } Err(e) => return Err(e), }; @@ -395,11 +390,11 @@ pub async fn make_authenticated_client( auth_manager.configure_client(config)?; let auth_url = auth_manager.get_authorization_url(scopes).await?; - oauth_state = OAuthState::Session(AuthorizationSession { + oauth_state = OAuthState::Session(AuthorizationSession::for_scope_upgrade( auth_manager, - auth_url: auth_url.clone(), - redirect_uri, - }); + auth_url.clone(), + &redirect_uri, + )); // Extract the CSRF token that rmcp embedded as the `state` query parameter in the // authorization URL. We register a csrf→uuid mapping on the manager so that