Skip to content

Commit 969fad2

Browse files
authored
feat: provide more context information (#236)
* feat: provide more context information 1. provide context for notification 2. allow extract more info from tool call 3. inject http request part for streamable http server * docs: add document for getting peer from context
1 parent 9fc6af0 commit 969fad2

File tree

11 files changed

+194
-40
lines changed

11 files changed

+194
-40
lines changed

crates/rmcp/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,39 @@ let service = client.serve(transport).await?;
146146

147147

148148

149+
## Access with peer interface when handling message
150+
151+
You can get the [`Peer`](crate::service::Peer) struct from [`NotificationContext`](crate::service::NotificationContext) and [`RequestContext`](crate::service::RequestContext).
152+
153+
```rust, ignore
154+
# use rmcp::{
155+
# ServerHandler,
156+
# model::{LoggingLevel, LoggingMessageNotificationParam, ProgressNotificationParam},
157+
# service::{NotificationContext, RoleServer},
158+
# };
159+
# pub struct Handler;
160+
161+
impl ServerHandler for Handler {
162+
async fn on_progress(
163+
&self,
164+
notification: ProgressNotificationParam,
165+
context: NotificationContext<RoleServer>,
166+
) {
167+
let peer = context.peer;
168+
let _ = peer
169+
.notify_logging_message(LoggingMessageNotificationParam {
170+
level: LoggingLevel::Info,
171+
logger: None,
172+
data: serde_json::json!({
173+
"message": format!("Progress: {}", notification.progress),
174+
}),
175+
})
176+
.await;
177+
}
178+
}
179+
```
180+
181+
149182
## Manage Multi Services
150183

151184
For many cases you need to manage several service in a collection, you can call `into_dyn` to convert services into the same type.

crates/rmcp/src/handler/client.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
error::Error as McpError,
33
model::*,
4-
service::{RequestContext, RoleClient, Service, ServiceRole},
4+
service::{NotificationContext, RequestContext, RoleClient, Service, ServiceRole},
55
};
66

77
impl<H: ClientHandler> Service<RoleClient> for H {
@@ -26,28 +26,29 @@ impl<H: ClientHandler> Service<RoleClient> for H {
2626
async fn handle_notification(
2727
&self,
2828
notification: <RoleClient as ServiceRole>::PeerNot,
29+
context: NotificationContext<RoleClient>,
2930
) -> Result<(), McpError> {
3031
match notification {
3132
ServerNotification::CancelledNotification(notification) => {
32-
self.on_cancelled(notification.params).await
33+
self.on_cancelled(notification.params, context).await
3334
}
3435
ServerNotification::ProgressNotification(notification) => {
35-
self.on_progress(notification.params).await
36+
self.on_progress(notification.params, context).await
3637
}
3738
ServerNotification::LoggingMessageNotification(notification) => {
38-
self.on_logging_message(notification.params).await
39+
self.on_logging_message(notification.params, context).await
3940
}
4041
ServerNotification::ResourceUpdatedNotification(notification) => {
41-
self.on_resource_updated(notification.params).await
42+
self.on_resource_updated(notification.params, context).await
4243
}
4344
ServerNotification::ResourceListChangedNotification(_notification_no_param) => {
44-
self.on_resource_list_changed().await
45+
self.on_resource_list_changed(context).await
4546
}
4647
ServerNotification::ToolListChangedNotification(_notification_no_param) => {
47-
self.on_tool_list_changed().await
48+
self.on_tool_list_changed(context).await
4849
}
4950
ServerNotification::PromptListChangedNotification(_notification_no_param) => {
50-
self.on_prompt_list_changed().await
51+
self.on_prompt_list_changed(context).await
5152
}
5253
};
5354
Ok(())
@@ -87,34 +88,47 @@ pub trait ClientHandler: Sized + Send + Sync + 'static {
8788
fn on_cancelled(
8889
&self,
8990
params: CancelledNotificationParam,
91+
context: NotificationContext<RoleClient>,
9092
) -> impl Future<Output = ()> + Send + '_ {
9193
std::future::ready(())
9294
}
9395
fn on_progress(
9496
&self,
9597
params: ProgressNotificationParam,
98+
context: NotificationContext<RoleClient>,
9699
) -> impl Future<Output = ()> + Send + '_ {
97100
std::future::ready(())
98101
}
99102
fn on_logging_message(
100103
&self,
101104
params: LoggingMessageNotificationParam,
105+
context: NotificationContext<RoleClient>,
102106
) -> impl Future<Output = ()> + Send + '_ {
103107
std::future::ready(())
104108
}
105109
fn on_resource_updated(
106110
&self,
107111
params: ResourceUpdatedNotificationParam,
112+
context: NotificationContext<RoleClient>,
108113
) -> impl Future<Output = ()> + Send + '_ {
109114
std::future::ready(())
110115
}
111-
fn on_resource_list_changed(&self) -> impl Future<Output = ()> + Send + '_ {
116+
fn on_resource_list_changed(
117+
&self,
118+
context: NotificationContext<RoleClient>,
119+
) -> impl Future<Output = ()> + Send + '_ {
112120
std::future::ready(())
113121
}
114-
fn on_tool_list_changed(&self) -> impl Future<Output = ()> + Send + '_ {
122+
fn on_tool_list_changed(
123+
&self,
124+
context: NotificationContext<RoleClient>,
125+
) -> impl Future<Output = ()> + Send + '_ {
115126
std::future::ready(())
116127
}
117-
fn on_prompt_list_changed(&self) -> impl Future<Output = ()> + Send + '_ {
128+
fn on_prompt_list_changed(
129+
&self,
130+
context: NotificationContext<RoleClient>,
131+
) -> impl Future<Output = ()> + Send + '_ {
118132
std::future::ready(())
119133
}
120134

crates/rmcp/src/handler/server.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
error::Error as McpError,
33
model::*,
4-
service::{RequestContext, RoleServer, Service, ServiceRole},
4+
service::{NotificationContext, RequestContext, RoleServer, Service, ServiceRole},
55
};
66

77
mod resource;
@@ -71,19 +71,20 @@ impl<H: ServerHandler> Service<RoleServer> for H {
7171
async fn handle_notification(
7272
&self,
7373
notification: <RoleServer as ServiceRole>::PeerNot,
74+
context: NotificationContext<RoleServer>,
7475
) -> Result<(), McpError> {
7576
match notification {
7677
ClientNotification::CancelledNotification(notification) => {
77-
self.on_cancelled(notification.params).await
78+
self.on_cancelled(notification.params, context).await
7879
}
7980
ClientNotification::ProgressNotification(notification) => {
80-
self.on_progress(notification.params).await
81+
self.on_progress(notification.params, context).await
8182
}
8283
ClientNotification::InitializedNotification(_notification) => {
83-
self.on_initialized().await
84+
self.on_initialized(context).await
8485
}
8586
ClientNotification::RootsListChangedNotification(_notification) => {
86-
self.on_roots_list_changed().await
87+
self.on_roots_list_changed(context).await
8788
}
8889
};
8990
Ok(())
@@ -196,20 +197,28 @@ pub trait ServerHandler: Sized + Send + Sync + 'static {
196197
fn on_cancelled(
197198
&self,
198199
notification: CancelledNotificationParam,
200+
context: NotificationContext<RoleServer>,
199201
) -> impl Future<Output = ()> + Send + '_ {
200202
std::future::ready(())
201203
}
202204
fn on_progress(
203205
&self,
204206
notification: ProgressNotificationParam,
207+
context: NotificationContext<RoleServer>,
205208
) -> impl Future<Output = ()> + Send + '_ {
206209
std::future::ready(())
207210
}
208-
fn on_initialized(&self) -> impl Future<Output = ()> + Send + '_ {
211+
fn on_initialized(
212+
&self,
213+
context: NotificationContext<RoleServer>,
214+
) -> impl Future<Output = ()> + Send + '_ {
209215
tracing::info!("client initialized");
210216
std::future::ready(())
211217
}
212-
fn on_roots_list_changed(&self) -> impl Future<Output = ()> + Send + '_ {
218+
fn on_roots_list_changed(
219+
&self,
220+
context: NotificationContext<RoleServer>,
221+
) -> impl Future<Output = ()> + Send + '_ {
213222
std::future::ready(())
214223
}
215224

crates/rmcp/src/handler/server/tool.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,42 @@ where
320320
}
321321
}
322322

323+
impl<'a, S> FromToolCallContextPart<'a, S> for crate::Peer<RoleServer> {
324+
fn from_tool_call_context_part(
325+
context: ToolCallContext<'a, S>,
326+
) -> Result<(Self, ToolCallContext<'a, S>), crate::Error> {
327+
let peer = context.request_context.peer.clone();
328+
Ok((peer, context))
329+
}
330+
}
331+
332+
impl<'a, S> FromToolCallContextPart<'a, S> for crate::model::Meta {
333+
fn from_tool_call_context_part(
334+
mut context: ToolCallContext<'a, S>,
335+
) -> Result<(Self, ToolCallContext<'a, S>), crate::Error> {
336+
let mut meta = crate::model::Meta::default();
337+
std::mem::swap(&mut meta, &mut context.request_context.meta);
338+
Ok((meta, context))
339+
}
340+
}
341+
342+
pub struct RequestId(pub crate::model::RequestId);
343+
impl<'a, S> FromToolCallContextPart<'a, S> for RequestId {
344+
fn from_tool_call_context_part(
345+
context: ToolCallContext<'a, S>,
346+
) -> Result<(Self, ToolCallContext<'a, S>), crate::Error> {
347+
Ok((RequestId(context.request_context.id.clone()), context))
348+
}
349+
}
350+
351+
impl<'a, S> FromToolCallContextPart<'a, S> for RequestContext<RoleServer> {
352+
fn from_tool_call_context_part(
353+
context: ToolCallContext<'a, S>,
354+
) -> Result<(Self, ToolCallContext<'a, S>), crate::Error> {
355+
Ok((context.request_context.clone(), context))
356+
}
357+
}
358+
323359
impl<'s, S> ToolCallContext<'s, S> {
324360
pub fn invoke<H, A>(self, h: H) -> H::Fut
325361
where

crates/rmcp/src/model/extension.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub struct Extensions {
4949
impl Extensions {
5050
/// Create an empty `Extensions`.
5151
#[inline]
52-
pub fn new() -> Extensions {
52+
pub const fn new() -> Extensions {
5353
Extensions { map: None }
5454
}
5555

crates/rmcp/src/service.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
7676
type PeerResp: TransferObject;
7777
type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
7878
+ From<CancelledNotification>
79-
+ TransferObject;
79+
+ TransferObject
80+
+ GetMeta
81+
+ GetExtensions;
8082
type InitializeError<E>;
8183
const IS_CLIENT: bool;
8284
type Info: TransferObject;
@@ -100,6 +102,7 @@ pub trait Service<R: ServiceRole>: Send + Sync + 'static {
100102
fn handle_notification(
101103
&self,
102104
notification: R::PeerNot,
105+
context: NotificationContext<R>,
103106
) -> impl Future<Output = Result<(), McpError>> + Send + '_;
104107
fn get_info(&self) -> R::Info;
105108
}
@@ -145,8 +148,9 @@ impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
145148
fn handle_notification(
146149
&self,
147150
notification: R::PeerNot,
151+
context: NotificationContext<R>,
148152
) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
149-
DynService::handle_notification(self.as_ref(), notification)
153+
DynService::handle_notification(self.as_ref(), notification, context)
150154
}
151155

152156
fn get_info(&self) -> R::Info {
@@ -160,7 +164,11 @@ pub trait DynService<R: ServiceRole>: Send + Sync {
160164
request: R::PeerReq,
161165
context: RequestContext<R>,
162166
) -> BoxFuture<Result<R::Resp, McpError>>;
163-
fn handle_notification(&self, notification: R::PeerNot) -> BoxFuture<Result<(), McpError>>;
167+
fn handle_notification(
168+
&self,
169+
notification: R::PeerNot,
170+
context: NotificationContext<R>,
171+
) -> BoxFuture<Result<(), McpError>>;
164172
fn get_info(&self) -> R::Info;
165173
}
166174

@@ -172,8 +180,12 @@ impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
172180
) -> BoxFuture<Result<R::Resp, McpError>> {
173181
Box::pin(self.handle_request(request, context))
174182
}
175-
fn handle_notification(&self, notification: R::PeerNot) -> BoxFuture<Result<(), McpError>> {
176-
Box::pin(self.handle_notification(notification))
183+
fn handle_notification(
184+
&self,
185+
notification: R::PeerNot,
186+
context: NotificationContext<R>,
187+
) -> BoxFuture<Result<(), McpError>> {
188+
Box::pin(self.handle_notification(notification, context))
177189
}
178190
fn get_info(&self) -> R::Info {
179191
self.get_info()
@@ -487,6 +499,15 @@ pub struct RequestContext<R: ServiceRole> {
487499
pub peer: Peer<R>,
488500
}
489501

502+
/// Request execution context
503+
#[derive(Debug, Clone)]
504+
pub struct NotificationContext<R: ServiceRole> {
505+
pub meta: Meta,
506+
pub extensions: Extensions,
507+
/// An interface to fetch the remote client or server
508+
pub peer: Peer<R>,
509+
}
510+
490511
/// Use this function to skip initialization process
491512
pub fn serve_directly<R, S, T, E, A>(
492513
service: S,
@@ -710,7 +731,9 @@ where
710731
}));
711732
}
712733
Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
713-
id, request, ..
734+
id,
735+
mut request,
736+
..
714737
})) => {
715738
tracing::debug!(%id, ?request, "received request");
716739
{
@@ -719,12 +742,17 @@ where
719742
let request_ct = serve_loop_ct.child_token();
720743
let context_ct = request_ct.child_token();
721744
local_ct_pool.insert(id.clone(), request_ct);
745+
let mut extensions = Extensions::new();
746+
let mut meta = Meta::new();
747+
// avoid clone
748+
std::mem::swap(&mut extensions, request.extensions_mut());
749+
std::mem::swap(&mut meta, request.get_meta_mut());
722750
let context = RequestContext {
723751
ct: context_ct,
724752
id: id.clone(),
725753
peer: peer.clone(),
726-
meta: request.get_meta().clone(),
727-
extensions: request.extensions().clone(),
754+
meta,
755+
extensions,
728756
};
729757
tokio::spawn(async move {
730758
let result = service.handle_request(request, context).await;
@@ -748,7 +776,7 @@ where
748776
})) => {
749777
tracing::info!(?notification, "received notification");
750778
// catch cancelled notification
751-
let notification = match notification.try_into() {
779+
let mut notification = match notification.try_into() {
752780
Ok::<CancelledNotification, _>(cancelled) => {
753781
if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
754782
tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
@@ -760,8 +788,18 @@ where
760788
};
761789
{
762790
let service = shared_service.clone();
791+
let mut extensions = Extensions::new();
792+
let mut meta = Meta::new();
793+
// avoid clone
794+
std::mem::swap(&mut extensions, notification.extensions_mut());
795+
std::mem::swap(&mut meta, notification.get_meta_mut());
796+
let context = NotificationContext {
797+
peer: peer.clone(),
798+
meta,
799+
extensions,
800+
};
763801
tokio::spawn(async move {
764-
let result = service.handle_notification(notification).await;
802+
let result = service.handle_notification(notification, context).await;
765803
if let Err(error) = result {
766804
tracing::warn!(%error, "Error sending notification");
767805
}

0 commit comments

Comments
 (0)