[MCP在LangChain中的应用-05]如何实现基于反向通信的进度报告、日志回传和信息征询
MultiServerMCPClient利用注册的回调可以帮助我们处理由服务端发送给客户端的如下三种请求和通知:
- 进度报告:订阅长时间运行工具执行的进度更新;
- 日志回传:MC协议支持记录来自服务器的通知;
- 信息征询:信息征询(Elicitation)机制允许MCP服务器在工具执行期间向用户请求额外输入。服务器无需预先获取所有输入,而是可以根据需要以交互方式请求信息;
上述三种回调对应Callbacks类型如下所示的三个字段(on_progress、on_logging_message和on_elicitation),它的to_mcp_format方法会将它们统一转换称MCP规范定义的标准形式。我们利用__init__方法的callbacks参数将Callbacks对象注册到MultiServerMCPClient对象上。
@dataclass
class Callbacks:
on_logging_message: LoggingMessageCallback | None = None
on_progress: ProgressCallback | None = None
on_elicitation: ElicitationCallback | None = None
def to_mcp_format(self, *, context: CallbackContext) -> _MCPCallbacks
class MultiServerMCPClient:
def __init__(
self,
connections: dict[str, Connection] | None = None,
*,
callbacks: Callbacks | None = None,
tool_interceptors: list[ToolCallInterceptor] | None = None,
tool_name_prefix: bool = False,
) -> None
1. 进度报告
客户端用于处理进度报告的ProgressCallback类型是一个具有唯一__call__方法的协议,所以我们可以将其此回调定义成函数。
@runtime_checkable
class ProgressCallback(Protocol):
async def __call__(
self,
progress: float,
total: float | None,
message: str | None,
context: CallbackContext,
) -> None
@dataclass
class CallbackContext:
server_name: str
tool_name: str | None = None
__call__方法参数说明:
- progress:表示完成工作量的数值;
- total:表示总工作量的数值;
- message:描述进度的文本消息;
- context:提供MCP服务器和当前调用工具名称的上下文;
我们通过一个简答的实例来演示如何利用注册到MultiServerMCPClient的ProgressCallback回调来实时显示服务端回传的进度。如下所示的是由FastMCP构建的MCP服务器。注册的long_running_task工具模拟一个长耗时(5秒)的操作,我们每隔一秒调用Context对象的report_progress方法报告一次进度。
import asyncio
from fastmcp import FastMCP
from fastmcp.server.context import Context
mcp = FastMCP("Server",tasks=True)
@mcp.tool
async def long_running_task(context:Context) -> int:
for i in range(1, 6):
await context.report_progress(progress=i, total=5, message=f"Step {i} completed")
await asyncio.sleep(1)
return 5
mcp.run(transport="streamable-http",host="0.0.0.0")
在如下所示的客户端程序中,我们定义了用于处理进度报告的handle_progress函数,它会实时输出接收到的进度通知。我们针对这个函数创建了Callbacks对象,并将其注册到连接上面这个MCP服务器的MultiServerMCPClient对象上。
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.callbacks import Callbacks, CallbackContext
import asyncio,datetime
async def handle_progress(
progress: float,
total: float | None,
message: str | None,
context: CallbackContext,
) -> None:
percentage = (progress / (total or 100)) * 100
print(f"[{datetime.datetime.now()}] Progress: {percentage:.1f}% - {message or ''}")
callbacks = Callbacks(
on_progress=handle_progress,
)
async def main():
client = MultiServerMCPClient(
connections= {
"server": {
"transport": "streamable_http", # 或者 "streamable_http"
"url": "http://localhost:8000/mcp"
}
},
callbacks= callbacks
)
tool = (await client.get_tools(server_name="server"))[0]
result = await tool.ainvoke(input={})
print(f"Result: {result}")
asyncio.run(main())
我们调用MultiServerMCPClient的get_tools方法得到工具对象,并直接调用的ainvoke调用此工具。handle_progress函数输出的进度和最终的结果将会以如下的形式输出来:
[2026-04-11 23:33:42.309599] Progress: 20.0% - Step 1 completed
[2026-04-11 23:33:43.315791] Progress: 40.0% - Step 2 completed
[2026-04-11 23:33:44.317458] Progress: 60.0% - Step 3 completed
[2026-04-11 23:33:45.328593] Progress: 80.0% - Step 4 completed
[2026-04-11 23:33:46.344349] Progress: 100.0% - Step 5 completed
Result: [{'type': 'text', 'text': '5', 'id': 'lc_eeb16e28-6729-4c30-b316-a6a8385970f1'}]
2. 日志回传
用于处理服务端回传日志的回调类型LoggingMessageCallback定义如下,这个协议同样只定义了唯一的__call__方法,日志等级、logger(表示由谁写入)和数据可以通过params参数表示的LoggingMessageNotificationParams对象提取出来。
class LoggingMessageCallback(Protocol):
async def __call__(
self,
params: LoggingMessageNotificationParams,
context: CallbackContext,
) -> None
class LoggingMessageNotificationParams(NotificationParams):
level: LoggingLevel
logger: str | None = None
data: Any
model_config = ConfigDict(extra="allow")
在上面演示的例子中,服务端指定的工具通过调用Context对象的report_progress方法报告进度,我们也可以按照如下的方式将其改写成利用日志报告进度,Context的info方法表示采用Information等级写入日志。
import asyncio
from fastmcp import FastMCP
from fastmcp.server.context import Context
mcp = FastMCP("Server",tasks=True)
@mcp.tool
async def long_running_task(context:Context) -> int:
for i in range(1, 6):
await context.info(message=f"{i * 20}% completed")
await asyncio.sleep(1)
return 5
mcp.run(transport="streamable-http",host="0.0.0.0")
那么客户端程序注册到MultiServerMCPClient上的Callbacks只需按照如下的方式定义就可以了:
async def handle_logging(
params: LoggingMessageNotificationParams,
context: CallbackContext,
) -> None:
print(f"[{datetime.datetime.now()}] [{params.level}]{params.data}")
callbacks = Callbacks(
on_logging_message=handle_logging
)
工具执行的进度将以如下的形式输出:
[2026-04-11 23:53:00.563647] [info]{'msg': '20% completed', 'extra': None}
[2026-04-11 23:53:01.569508] [info]{'msg': '40% completed', 'extra': None}
[2026-04-11 23:53:02.584512] [info]{'msg': '60% completed', 'extra': None}
[2026-04-11 23:53:03.595626] [info]{'msg': '80% completed', 'extra': None}
[2026-04-11 23:53:04.604788] [info]{'msg': '100% completed', 'extra': None}
Result: [{'type': 'text', 'text': '5', 'id': 'lc_d3862595-0977-4001-9a2d-6f921c56357b'}]
3. 信息征询
Elicitation是我们可以在工具执行期间请求用户的结构化输入。它允许工具无需预先要求所有输入,而是可以根据需要交互式地询问缺失的参数、澄清说明或补充上下文信息。启发功能使工具能够暂停执行并向用户请求特定信息,如下是几种典型的应用场景:
- 参数缺失:询问用户是否提供了初始未提供的必要信息;
- 澄清请求:针对模糊不清的情况,获取用户的确认或选择;
- 渐进披露:逐步收集复杂信息;
- 动态工作流程:根据用户响应调整工具行为;
如下所示的协议ElicitationCallback代表客户端处理Elicitation的回调。从__call__方法的定义可以看出,我们可以从mcp_context和params参数中获取Elicitation请求的相关信息,针对请求的处理结果体现在返回的MCPElicitResult对象上。
class ElicitationCallback(Protocol):
async def __call__(
self,
mcp_context: MCPRequestContext,
params: ElicitRequestParams,
context: CallbackContext,
) -> MCPElicitResult
mcp_context参数类型为RequestContext。这是一个相对复杂的泛型数据类,它是整个请求生命周期的“内存快照”,存储了处理一个请求所需的所有环境信息。泛型设计是为了高度解耦,以适配不同类型的Session(如 Webhook, WebSocket, SSE 等)和不同类型的生命周期管理。
SessionT = TypeVar("SessionT", bound=BaseSession[Any, Any, Any, Any, Any])
LifespanContextT = TypeVar("LifespanContextT")
RequestT = TypeVar("RequestT", default=Any)
@dataclass
class RequestContext(Generic[SessionT, LifespanContextT, RequestT]):
request_id: RequestId
meta: RequestParams.Meta | None
session: SessionT
lifespan_context: LifespanContextT
experimental: Any = field(default=None)
request: RequestT | None = None
close_sse_stream: CloseSSEStreamCallback | None = None
close_standalone_sse_stream: CloseSSEStreamCallback | None = None
CloseSSEStreamCallback = Callable[[], Awaitable[None]]
字段成员说明如下:
- request_id:请求的标识;
- meta:请求相关元数据;
- session:当前会话;
- lifespan_context:存储随请求生命周期存在的数据(例如数据库连接、临时缓存);
- experimental:提供一些支持的实验性的特性;
- request:原始请求对象的引用(如 HTTP Request 对象),方便底层调试。
- close_sse_stream/close_standalone_sse_stream:提供一个可执行对象关闭SSE连接;
Elicitation是服务器向用户索取额外信息的机制。根据信息的敏感程度和交互方式,它分为Form(表单)和URL两种模式:
- Form模式:这是一种 “在客户端内” 完成的交互方式。服务器发送一个JSON Schema给客户端,客户端根据这个架构在聊天窗口或UI中直接渲染一个输入框或表单(如单选、多选、文本框)。这种模式用于收集非敏感的结构化信息;
- URL模式:这是一种需要 “跳转到外部)” 完成的交互方式。服务器向客户端发送一个外部URL,客户端引导用户在外部浏览器中打开该链接进行操作。这种模式用于处理高敏感度或必须绕过AI客户端的操作,确保敏感数据(如密码)不会流经LLM或MCP 客户端。典型的例子包括OAuth授权、支付处理和用户凭证收集等;
params参数的类型ElicitRequestParams是由ElicitRequestURLParams和ElicitRequestFormParams,它们分别代表针对上面两种模式的请求参数。
ElicitRequestParams: TypeAlias = ElicitRequestURLParams | ElicitRequestFormParams
class ElicitRequestFormParams(RequestParams):
mode: Literal["form"] = "form"
message: str
requestedSchema: ElicitRequestedSchema
model_config = ConfigDict(extra="allow")
class ElicitRequestURLParams(RequestParams):
mode: Literal["url"] = "url"
message: str
url: str
elicitationId: str
model_config = ConfigDict(extra="allow")
作为针对Elicitation请求响应结果的ElicitResult,它利用action字段表达:对应接收到的请求,它是接受(accept)、拒绝(decline)还是无视(cancel)。并在接受的前提下,利用content提供所需的数据。
class ElicitResult(Result):
action: Literal["accept", "decline", "cancel"]
content: dict[str, str | int | float | bool | list[str] | None] | None = None
我们通过一个简单的实例演示工具在执行过程中如何利用信息征询向客户端请求所需的信息。在如下这个使用FastMCP构建的MCP服务器中,我们注册了一个collect_user_info工具,它在执行过程中会调用Context的elicit方法向客户端请求提供用户信息。
from fastmcp import FastMCP, Context
from dataclasses import dataclass
server = FastMCP("Server")
@dataclass
class UserInfo:
name: str
age: int
@server.tool
async def collect_user_info(ctx: Context) -> str:
result = await ctx.elicit(
message="Please provide your information", response_type=UserInfo
)
if result.action == "accept":
user = result.data
return f"Hello {user.name}, you are {user.age} years old."
elif result.action == "decline":
return "Information not provided"
else:
return "Operation cancelled"
server.run(transport="streamable-http",host="0.0.0.0")
在如下所示的客户端程序中,我们将ElicitationCallback定义为如下这个handle_elicitation函数。在它利用返回的ElicitResult提供用户信息之前,它还会将params参数承载的请求信息打印出来。我们利用handle_elicitation函数创建了Callbacks对象,并将其注册到连接上面这个MCP服务器的MultiServerMCPClient对象上。我们最终调用此对象的get_tools方法得到代表collect_user_info工具的BaseTool对象,调用ainvoke方法执行此工具并输出返回的结果。
from typing import cast
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.callbacks import Callbacks, CallbackContext
from mcp.shared.context import RequestContext
from mcp.types import ElicitRequestParams, ElicitResult,ElicitRequestFormParams
import asyncio
async def handle_elicitation(
mcp_context: RequestContext,
params: ElicitRequestParams,
context: CallbackContext,
) -> ElicitResult:
"""Handle elicitation requests from MCP servers."""
formParams:ElicitRequestFormParams = cast(ElicitRequestFormParams, params)
print(f"""\
Request:
mode: {formParams.mode}
message: {formParams.message}
response_format: {formParams.requestedSchema}
""")
return ElicitResult(
action="accept",
content={"name": "Jayden", "age": 18},
)
async def main():
client = MultiServerMCPClient(
{
"server": {
"url": "http://localhost:8000/mcp",
"transport": "streamable_http",
}
},
callbacks=Callbacks(on_elicitation=handle_elicitation),
)
tool = (await client.get_tools(server_name="server"))[0]
result = await tool.ainvoke(input={})
content:dict = result[0]
print(f"""\
Result:
type: {content["type"]}
content: {content["text"]}
""")
asyncio.run(main())
输出:
Request:
mode: form
message: Please provide your information
response_format: {'properties': {'name': {'title': 'Name', 'type': 'string'}, 'age': {'title': 'Age', 'type': 'integer'}}, 'required': ['name', 'age'], 'title': 'UserInfo', 'type': 'object'}
Result:
type: text
content: Hello Jayden, you are 18 years old.
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)