diff --git a/crates/rmcp/src/model.rs b/crates/rmcp/src/model.rs index b473e9ac..4aabab1d 100644 --- a/crates/rmcp/src/model.rs +++ b/crates/rmcp/src/model.rs @@ -461,13 +461,17 @@ pub struct JsonRpcResponse { #[expect(clippy::exhaustive_structs, reason = "intentionally exhaustive")] pub struct JsonRpcError { pub jsonrpc: JsonRpcVersion2_0, - pub id: RequestId, + // MCP 2025-11-25 §Error Responses: `id` is optional and omitted when the + // server cannot read the request id (e.g. parse error / invalid request). + // https://modelcontextprotocol.io/specification/2025-11-25/basic#error-responses + #[serde(default, skip_serializing_if = "Option::is_none")] + pub id: Option, pub error: ErrorData, } impl JsonRpcError { /// Create a new JsonRpcError. - pub fn new(id: RequestId, error: ErrorData) -> Self { + pub fn new(id: Option, error: ErrorData) -> Self { Self { jsonrpc: JsonRpcVersion2_0, id, @@ -601,7 +605,7 @@ impl JsonRpcMessage { }) } #[inline] - pub const fn error(error: ErrorData, id: RequestId) -> Self { + pub const fn error(error: ErrorData, id: Option) -> Self { JsonRpcMessage::Error(JsonRpcError { jsonrpc: JsonRpcVersion2_0, id, @@ -633,15 +637,15 @@ impl JsonRpcMessage { _ => None, } } - pub fn into_error(self) -> Option<(ErrorData, RequestId)> { + pub fn into_error(self) -> Option<(ErrorData, Option)> { match self { JsonRpcMessage::Error(e) => Some((e.error, e.id)), _ => None, } } - pub fn into_result(self) -> Option<(Result, RequestId)> { + pub fn into_result(self) -> Option<(Result, Option)> { match self { - JsonRpcMessage::Response(r) => Some((Ok(r.result), r.id)), + JsonRpcMessage::Response(r) => Some((Ok(r.result), Some(r.id))), JsonRpcMessage::Error(e) => Some((Err(e.error), e.id)), _ => None, diff --git a/crates/rmcp/src/service.rs b/crates/rmcp/src/service.rs index 65b5ee71..d938cd66 100644 --- a/crates/rmcp/src/service.rs +++ b/crates/rmcp/src/service.rs @@ -881,7 +881,7 @@ where Event::ToSink(m) => { if let Some(id) = match &m { JsonRpcMessage::Response(response) => Some(&response.id), - JsonRpcMessage::Error(error) => Some(&error.id), + JsonRpcMessage::Error(error) => error.id.as_ref(), _ => None, } { if let Some(ct) = local_ct_pool.remove(id) { @@ -971,7 +971,7 @@ where } Err(error) => { tracing::warn!(%id, ?error, "response error"); - JsonRpcMessage::error(error, id) + JsonRpcMessage::error(error, Some(id)) } }; let _send_result = sink.send(response).await; @@ -1028,6 +1028,12 @@ where } } Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => { + let Some(id) = id else { + // MCP error responses without an id (e.g. Parse error / Invalid Request) + // can't be routed back to a pending request — log and drop. + tracing::debug!(?error, "received id-less peer error"); + continue; + }; if let Some(responder) = local_responder_pool.remove(&id) { let _response_result = responder.send(Err(ServiceError::McpError(error))); if let Err(_error) = _response_result { diff --git a/crates/rmcp/src/service/server.rs b/crates/rmcp/src/service/server.rs index 82db47b8..530e508e 100644 --- a/crates/rmcp/src/service/server.rs +++ b/crates/rmcp/src/service/server.rs @@ -219,7 +219,7 @@ where } Err(e) => { transport - .send(ServerJsonRpcMessage::error(e.clone(), id)) + .send(ServerJsonRpcMessage::error(e.clone(), Some(id))) .await .map_err(|error| { ServerInitializeError::transport::(error, "sending error response") diff --git a/crates/rmcp/src/transport/async_rw.rs b/crates/rmcp/src/transport/async_rw.rs index b14d94c3..2ef0aae2 100644 --- a/crates/rmcp/src/transport/async_rw.rs +++ b/crates/rmcp/src/transport/async_rw.rs @@ -1,20 +1,22 @@ use std::{marker::PhantomData, sync::Arc}; -// use crate::schema::*; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; use serde::{Serialize, de::DeserializeOwned}; use thiserror::Error; use tokio::{ - io::{AsyncRead, AsyncWrite}, + io::{AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader}, sync::Mutex, }; use tokio_util::{ bytes::{Buf, BufMut, BytesMut}, - codec::{Decoder, Encoder, FramedRead, FramedWrite}, + codec::{Decoder, Encoder, FramedWrite}, }; use super::{IntoTransport, Transport}; -use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage}; +use crate::{ + model::ErrorData, + service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage}, +}; #[non_exhaustive] pub enum TransportAdapterAsyncRW {} @@ -47,8 +49,10 @@ where pub type TransportWriter = FramedWrite>>; pub struct AsyncRwTransport { - read: FramedRead>>, + read: BufReader, + line_buf: Vec, write: Arc>>>, + _role: PhantomData Role>, } impl AsyncRwTransport @@ -57,15 +61,17 @@ where W: Send + AsyncWrite + Unpin + 'static, { pub fn new(read: R, write: W) -> Self { - let read = FramedRead::new( - read, - JsonRpcMessageCodec::>::default(), - ); + let read = BufReader::new(read); let write = Arc::new(Mutex::new(Some(FramedWrite::new( write, JsonRpcMessageCodec::>::default(), )))); - Self { read, write } + Self { + read, + line_buf: Vec::new(), + write, + _role: PhantomData, + } } } @@ -116,15 +122,43 @@ where } } - fn receive(&mut self) -> impl Future>> { - let next = self.read.next(); - async { - next.await.and_then(|e| { - e.inspect_err(|e| { + async fn receive(&mut self) -> Option> { + loop { + self.line_buf.clear(); + match self.read.read_until(b'\n', &mut self.line_buf).await { + Ok(0) => return None, + Ok(_) => {} + Err(e) => { tracing::error!("Error reading from stream: {}", e); - }) - .ok() - }) + return None; + } + } + let line = without_carriage_return( + self.line_buf.strip_suffix(b"\n").unwrap_or(&self.line_buf), + ); + if line.is_empty() { + continue; + } + match try_parse_with_compatibility::>(line, "receive") { + Ok(Some(msg)) => return Some(msg), + Ok(None) => continue, + Err(JsonRpcMessageCodecError::Serde(e)) => { + tracing::debug!("Parse error on incoming message: {e}"); + let mut write = self.write.lock().await; + let framed = write.as_mut()?; + let response = TxJsonRpcMessage::::error( + ErrorData::parse_error("Parse error", None), + None, + ); + if framed.send(response).await.is_err() { + return None; + } + } + Err(e) => { + tracing::error!("Error reading from stream: {}", e); + return None; + } + } } } @@ -172,13 +206,12 @@ impl JsonRpcMessageCodec { } fn without_carriage_return(s: &[u8]) -> &[u8] { - if let Some(&b'\r') = s.last() { - &s[..s.len() - 1] - } else { - s - } + s.strip_suffix(b"\r").unwrap_or(s) } +/// UTF-8 byte order mark. RFC 8259 §8.1 allows JSON parsers to ignore a leading BOM. +const UTF8_BOM: &[u8; 3] = b"\xEF\xBB\xBF"; + /// Check if a method is a standard MCP method (request, response, or notification). /// This includes both requests and notifications defined in the MCP specification. /// @@ -247,6 +280,7 @@ fn try_parse_with_compatibility( line: &[u8], context: &str, ) -> Result, JsonRpcMessageCodecError> { + let line = line.strip_prefix(UTF8_BOM.as_slice()).unwrap_or(line); if let Ok(line_str) = std::str::from_utf8(line) { match serde_json::from_slice(line) { Ok(item) => Ok(Some(item)), @@ -406,7 +440,8 @@ impl Encoder for JsonRpcMessageCodec { #[cfg(test)] mod test { - use futures::{Sink, Stream}; + use futures::{Sink, Stream, StreamExt}; + use tokio_util::codec::FramedRead; use super::*; fn from_async_read(reader: R) -> impl Stream { @@ -555,4 +590,76 @@ mod test { println!("Standard notifications are preserved, non-standard are handled gracefully"); } + + #[tokio::test] + async fn test_decode_strips_utf8_bom() { + use futures::StreamExt; + use tokio::io::BufReader; + + // Valid JSON-RPC message preceded by a UTF-8 BOM (EF BB BF). Some Windows + // tooling and editors prepend this; the codec should ignore it per RFC 8259 §8.1. + let mut data = Vec::new(); + data.extend_from_slice(UTF8_BOM); + data.extend_from_slice(br#"{"jsonrpc":"2.0","method":"ping","id":1}"#); + data.push(b'\n'); + + let mut cursor = BufReader::new(&data[..]); + let mut stream = from_async_read::(&mut cursor); + + let item = stream + .next() + .await + .expect("should decode BOM-prefixed line"); + assert_eq!( + item, + serde_json::json!({"jsonrpc": "2.0", "method": "ping", "id": 1}) + ); + } + + #[cfg(feature = "server")] + #[tokio::test] + async fn receive_recovers_from_parse_error() { + use tokio::io::AsyncWriteExt; + + use crate::{RoleServer, transport::Transport}; + + // Two paired streams: `server_io` is wrapped by the transport; the test + // drives `client_io` to act as the peer. + let (server_io, client_io) = tokio::io::duplex(4096); + let (server_r, server_w) = tokio::io::split(server_io); + let (mut client_r, mut client_w) = tokio::io::split(client_io); + + let mut transport = AsyncRwTransport::::new(server_r, server_w); + + client_w + .write_all( + b"not json\n{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n", + ) + .await + .unwrap(); + + let received = transport + .receive() + .await + .expect("transport should recover and yield the next valid message"); + + // Read one line back from the peer side and parse as JSON. + let mut reply_buf = Vec::new(); + let mut peer = tokio::io::BufReader::new(&mut client_r); + peer.read_until(b'\n', &mut reply_buf).await.unwrap(); + let reply: serde_json::Value = serde_json::from_slice(&reply_buf).unwrap(); + + // Per MCP 2025-11-25: id is omitted when the server can't read the request id. + assert_eq!( + reply, + serde_json::json!({ + "jsonrpc": "2.0", + "error": {"code": -32700, "message": "Parse error"}, + }) + ); + assert_eq!( + serde_json::to_value(&received).unwrap()["method"], + "notifications/initialized", + ); + } } diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index ed360475..54c7b558 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -523,14 +523,12 @@ impl LocalSessionWorker { } } ServerJsonRpcMessage::Error(json_rpc_error) => { - if let Some(id) = self - .resource_router - .get(&ResourceKey::McpRequestId(json_rpc_error.id.clone())) - { - OutboundChannel::RequestWise { - id: *id, - close: true, - } + if let Some(id) = json_rpc_error.id.clone().and_then(|rid| { + self.resource_router + .get(&ResourceKey::McpRequestId(rid)) + .copied() + }) { + OutboundChannel::RequestWise { id, close: true } } else { OutboundChannel::Common } @@ -1041,8 +1039,7 @@ impl Worker for LocalSessionWorker { Some(ResourceKey::McpRequestId(request_id)) } crate::model::JsonRpcMessage::Error(json_rpc_error) => { - let request_id = json_rpc_error.id.clone(); - Some(ResourceKey::McpRequestId(request_id)) + json_rpc_error.id.clone().map(ResourceKey::McpRequestId) } _ => { None diff --git a/crates/rmcp/tests/test_client_initialization.rs b/crates/rmcp/tests/test_client_initialization.rs index 4a91f3ac..f51b33ef 100644 --- a/crates/rmcp/tests/test_client_initialization.rs +++ b/crates/rmcp/tests/test_client_initialization.rs @@ -30,7 +30,7 @@ async fn test_client_init_handles_jsonrpc_error() { let error_msg = ServerJsonRpcMessage::Error(JsonRpcError { jsonrpc: JsonRpcVersion2_0, - id: RequestId::Number(1), + id: Some(RequestId::Number(1)), error: ErrorData { code: ErrorCode(-32600), message: Cow::Borrowed("Invalid Request"), diff --git a/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema.json b/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema.json index 8e082db9..f8f94c6c 100644 --- a/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema.json +++ b/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema.json @@ -862,7 +862,14 @@ "$ref": "#/definitions/ErrorData" }, "id": { - "$ref": "#/definitions/NumberOrString" + "anyOf": [ + { + "$ref": "#/definitions/NumberOrString" + }, + { + "type": "null" + } + ] }, "jsonrpc": { "$ref": "#/definitions/JsonRpcVersion2_0" @@ -870,7 +877,6 @@ }, "required": [ "jsonrpc", - "id", "error" ] }, diff --git a/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema_current.json b/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema_current.json index 8e082db9..f8f94c6c 100644 --- a/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema_current.json +++ b/crates/rmcp/tests/test_message_schema/client_json_rpc_message_schema_current.json @@ -862,7 +862,14 @@ "$ref": "#/definitions/ErrorData" }, "id": { - "$ref": "#/definitions/NumberOrString" + "anyOf": [ + { + "$ref": "#/definitions/NumberOrString" + }, + { + "type": "null" + } + ] }, "jsonrpc": { "$ref": "#/definitions/JsonRpcVersion2_0" @@ -870,7 +877,6 @@ }, "required": [ "jsonrpc", - "id", "error" ] }, diff --git a/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema.json b/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema.json index 405b3e02..c2af3fba 100644 --- a/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema.json +++ b/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema.json @@ -1281,7 +1281,14 @@ "$ref": "#/definitions/ErrorData" }, "id": { - "$ref": "#/definitions/NumberOrString" + "anyOf": [ + { + "$ref": "#/definitions/NumberOrString" + }, + { + "type": "null" + } + ] }, "jsonrpc": { "$ref": "#/definitions/JsonRpcVersion2_0" @@ -1289,7 +1296,6 @@ }, "required": [ "jsonrpc", - "id", "error" ] }, diff --git a/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema_current.json b/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema_current.json index 405b3e02..c2af3fba 100644 --- a/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema_current.json +++ b/crates/rmcp/tests/test_message_schema/server_json_rpc_message_schema_current.json @@ -1281,7 +1281,14 @@ "$ref": "#/definitions/ErrorData" }, "id": { - "$ref": "#/definitions/NumberOrString" + "anyOf": [ + { + "$ref": "#/definitions/NumberOrString" + }, + { + "type": "null" + } + ] }, "jsonrpc": { "$ref": "#/definitions/JsonRpcVersion2_0" @@ -1289,7 +1296,6 @@ }, "required": [ "jsonrpc", - "id", "error" ] },