来源:互联网 更新时间:2026-06-20 15:50
先来看最核心的设计思路——MCP的Ja va服务器实现,本质上就是一个工厂接口,用来创建不同类型的MCP服务器实例。说实话,这块的设计在工程实践中相当值得借鉴,尤其适合那些需要在Ja va生态里对接AI Agent的场景。

McpServer是一个工厂接口,用于创建MCP服务器实例,支持两种模式:
public interface McpServer {
// 1. 同步服务器构建器
static SyncSpec sync(ServerMcpTransport transport)
// 2. 异步服务器构建器
static AsyncSpec async(ServerMcpTransport transport)
}
这里有个值得注意的点:同步模式和异步模式分别通过SyncSpec和AsyncSpec两个构建器来配置,这种设计让代码的分层和职责边界非常清晰。
class AsyncSpec {
private final ServerMcpTransport transport;
private McpSchema.Implementation serverInfo;
private McpSchema.ServerCapabilities serverCapabilities;
// 功能注册容器
private final List tools;
private final Map resources;
private final List resourceTemplates;
private final Map prompts;
}
class SyncSpec {
private final ServerMcpTransport transport;
private McpSchema.Implementation serverInfo;
private McpSchema.ServerCapabilities serverCapabilities;
// 功能注册容器
private final List tools;
private final Map resources;
private final List resourceTemplates;
private final Map prompts;
}
两种模式下的功能注册容器类型是不对称的——异步用Async前缀类型,同步用Sync前缀类型。这意味着底层对工具的调用和资源读写采用了不同的执行模式。
// 异步配置
AsyncSpec serverInfo(String name, String version) {
this.serverInfo = new McpSchema.Implementation(name, version);
return this;
}
// 同步配置
SyncSpec serverInfo(String name, String version) {
this.serverInfo = new McpSchema.Implementation(name, version);
return this;
}
// 异步工具
AsyncSpec tool(McpSchema.Tool tool,
Function
工具注册是MCP的核心能力之一。这里可以看到,异步版的handler返回的是Mono,而同步版直接返回CallToolResult,差异非常直白。
// 异步资源
AsyncSpec resources(Map resources) {
this.resources.putAll(resources);
return this;
}
// 同步资源
SyncSpec resources(Map resources) {
this.resources.putAll(resources);
return this;
}
// 异步提示
AsyncSpec prompts(AsyncPromptRegistration... prompts) {
for (AsyncPromptRegistration prompt : prompts) {
this.prompts.put(prompt.prompt().name(), prompt);
}
return this;
}
// 同步提示
SyncSpec prompts(SyncPromptRegistration... prompts) {
for (SyncPromptRegistration prompt : prompts) {
this.prompts.put(prompt.prompt().name(), prompt);
}
return this;
}
注意到没有?构建器方法都返回自身,实现了链式调用。这是构建器模式的经典用法,但在这里做得尤其干净利落。
McpServer.async(transport)
.serverInfo("async-server", "1.0.0")
.capabilities(new ServerCapabilities(...))
// 注册工具
.tool(new Tool("calculator", "计算器", schema),
args -> Mono.just(new CallToolResult(calculate(args))))
// 注册资源
.resources(Map.of(
"file", new AsyncResourceRegistration(fileResource,
req -> Mono.just(new ReadResourceResult(readFile(req))))
))
// 注册提示
.prompts(new AsyncPromptRegistration(prompt,
req -> Mono.just(new GetPromptResult(getPrompt(req)))))
.build();
McpServer.sync(transport)
.serverInfo("sync-server", "1.0.0")
.capabilities(new ServerCapabilities(...))
// 注册工具
.tool(new Tool("calculator", "计算器", schema),
args -> new CallToolResult(calculate(args)))
// 注册资源
.resources(Map.of(
"file", new SyncResourceRegistration(fileResource,
req -> new ReadResourceResult(readFile(req)))
))
// 注册提示
.prompts(new SyncPromptRegistration(prompt,
req -> new GetPromptResult(getPrompt(req))))
.build();
这段代码其实很能说明问题:同步版本和异步版本的使用体验几乎一样,但底层执行模型大不相同。
这个设计提供了一个灵活且强大的MCP服务器构建框架,可以根据不同需求选择同步或异步模式,并通过构建器模式提供清晰的配置接口。
接下来深入分析McpAsyncServer的实现原理和用法。
public class McpAsyncServer {
private final DefaultMcpSession mcpSession; // MCP 会话管理
private final ServerMcpTransport transport; // 传输层
private final ServerCapabilities serverCapabilities; // 服务器能力
// 线程安全的功能注册容器
private final CopyOnWriteArrayList tools;
private final ConcurrentHashMap resources;
private final ConcurrentHashMap prompts;
}
这里容器使用了CopyOnWriteArrayList和ConcurrentHashMap,目的很明确——保证高并发环境下的线程安全。
private DefaultMcpSession.RequestHandler asyncInitializeRequestHandler() {
return params -> {
// 1. 解析初始化请求
InitializeRequest request = transport.unmarshalFrom(params,
new TypeReference() {});
// 2. 保存客户端信息
this.clientCapabilities = request.capabilities();
this.clientInfo = request.clientInfo();
// 3. 协议版本协商
String serverVersion = negotiateProtocolVersion(request.protocolVersion());
// 4. 返回服务器信息
return Mono.just(new InitializeResult(
serverVersion,
this.serverCapabilities,
this.serverInfo,
null
));
};
}
初始化流程做了几件事:解析请求、保存客户端能力、协商协议版本、返回服务器信息。整个过程都是响应式的,返回Mono。
// 添加工具
public Mono addTool(AsyncToolRegistration toolRegistration) {
return Mono.defer(() -> {
// 检查重复
if (toolExists(toolRegistration.tool().name())) {
return Mono.error(new McpError("Tool already exists"));
}
// 添加工具
tools.add(toolRegistration);
// 通知客户端
return notifyToolsListChanged();
});
}
// 工具调用处理
private RequestHandler toolsCallRequestHandler() {
return params -> {
CallToolRequest request = unmarshalRequest(params);
return findTool(request.name())
.map(tool -> tool.call().apply(request.arguments()))
.orElse(Mono.error(new McpError("Tool not found")));
};
}
工具添加时会检查重复,如果工具已存在就抛出错误;成功添加后还会主动通知客户端列表变更。这个设计让客户端和服务端的状态保持同步。
// 添加资源
public Mono addResource(AsyncResourceRegistration resource) {
return Mono.defer(() -> {
resources.putIfAbsent(resource.resource().uri(), resource);
return notifyResourcesListChanged();
});
}
// 资源读取处理
private RequestHandler resourcesReadRequestHandler() {
return params -> {
ReadResourceRequest request = unmarshalRequest(params);
return Optional.ofNullable(resources.get(request.uri()))
.map(r -> r.readHandler().apply(request))
.orElse(Mono.error(new McpError("Resource not found")));
};
}
资源管理使用了putIfAbsent,避免重复写入。读取时如果找不到资源的uri,同样会返回错误。整个链路都是异步响应式的。
// 1. 创建传输层
ServerMcpTransport transport = new StdioServerTransport();
// 2. 配置服务器
McpAsyncServer server = McpServer.async(transport)
.serverInfo("my-server", "1.0.0")
.capabilities(new ServerCapabilities(...))
// 注册工具
.tool(new Tool("calculator", "计算器", schema),
args -> Mono.just(new CallToolResult(calculate(args))))
// 注册资源
.resources(new AsyncResourceRegistration(
new Resource("file://data"),
req -> Mono.just(new ReadResourceResult(readFile(req)))
))
.build();
// 3. 启动服务器
server.initialize().subscribe();
// 添加新工具
server.addTool(new AsyncToolRegistration(
new Tool("newTool", "新工具", schema),
args -> Mono.just(new CallToolResult(process(args)))
)).subscribe();
// 移除工具
server.removeTool("oldTool").subscribe();
// 添加资源
server.addResource(new AsyncResourceRegistration(
new Resource("db://users"),
req -> Mono.just(new ReadResourceResult(queryDb(req)))
)).subscribe();
动态注册和移除的能力很实用——服务器不需要重启,就能实时调整暴露的工具和资源。
// 发送日志通知
server.loggingNotification(new LoggingMessageNotification(
LoggingLevel.INFO,
"操作完成"
)).subscribe();
// 1. 配置服务器
McpAsyncServer server = configureServer();
// 2. 注册核心功能
registerCoreFeatures(server);
// 3. 启动服务器
server.initialize()
.doOnSuccess(() -> log.info("Server started"))
.doOnError(e -> log.error("Start failed", e))
.subscribe();
server.addTool(toolRegistration)
.doOnSuccess(() -> log.info("Tool added"))
.doOnError(McpError.class, e -> log.warn("Tool error", e))
.doOnError(Exception.class, e -> log.error("System error", e))
.onErrorResume(e -> Mono.empty())
.subscribe();
// 使用 try-with-resources
try (McpAsyncServer server = createServer()) {
server.initialize()
.then(server.addTools())
.then(server.addResources())
.block();
}
McpAsyncServer提供了一个功能完整的异步MCP服务器实现,适合构建高性能、可扩展的AI工具和资源服务。
最后来看McpSyncServer的实现原理和用法。
public class McpSyncServer {
private final McpAsyncServer asyncServer; // 委托异步服务器
}
这是一个典型的装饰器模式实现,将异步服务器包装成同步接口。说白了,就是内部跑着异步引擎,对外提供阻塞式的API。
// 添加工具
public void addTool(SyncToolRegistration toolHandler) {
// 转换为异步并阻塞等待
this.asyncServer.addTool(
AsyncToolRegistration.fromSync(toolHandler)
).block();
}
// 移除工具
public void removeTool(String toolName) {
this.asyncServer.removeTool(toolName).block();
}
// 添加资源
public void addResource(SyncResourceRegistration resourceHandler) {
this.asyncServer.addResource(
AsyncResourceRegistration.fromSync(resourceHandler)
).block();
}
// 移除资源
public void removeResource(String resourceUri) {
this.asyncServer.removeResource(resourceUri).block();
}
// 添加提示
public void addPrompt(SyncPromptRegistration promptRegistration) {
this.asyncServer.addPrompt(
AsyncPromptRegistration.fromSync(promptRegistration)
).block();
}
// 移除提示
public void removePrompt(String promptName) {
this.asyncServer.removePrompt(promptName).block();
}
核心模式就是一句话:把同步注册转换为异步操作,然后调用.block()阻塞等待结果。
// 1. 创建同步服务器
McpSyncServer server = McpServer.sync(transport)
.serverInfo("my-server", "1.0.0")
.capabilities(new ServerCapabilities(...))
// 注册工具
.tool(new Tool("calculator", "计算器", schema),
args -> new CallToolResult(calculate(args)))
// 注册资源
.resource(new Resource("file://data"),
req -> new ReadResourceResult(readFile(req)))
.build();
// 添加工具
server.addTool(new SyncToolRegistration(
new Tool("newTool", "新工具", schema),
args -> new CallToolResult(process(args))
));
// 添加资源
server.addResource(new SyncResourceRegistration(
new Resource("db://users"),
req -> new ReadResourceResult(queryDb(req))
));
// 发送通知
server.notifyToolsListChanged();
// 创建消息
CreateMessageResult result = server.createMessage(
new CreateMessageRequest(
"prompt",
Map.of("key", "value")
)
);
// 将异步操作转换为同步
public void someOperation(Parameters params) {
asyncServer.someOperation(params).block();
}
try {
server.addTool(toolRegistration);
} catch (McpError e) {
// 处理 MCP 错误
} catch (Exception e) {
// 处理其他错误
}
// 优雅关闭
public void closeGracefully() {
this.asyncServer.closeGracefully().block();
}
// 立即关闭
public void close() {
this.asyncServer.close();
}
// 获取服务器信息
ServerCapabilities capabilities = server.getServerCapabilities();
Implementation serverInfo = server.getServerInfo();
// 获取客户端信息
ClientCapabilities clientCaps = server.getClientCapabilities();
Implementation clientInfo = server.getClientInfo();
// 创建服务器
McpSyncServer server = createServer();
try {
// 添加核心功能
server.addTool(calculatorTool);
server.addResource(fileResource);
// 通知变更
server.notifyToolsListChanged();
server.notifyResourcesListChanged();
} catch (Exception e) {
server.close();
throw e;
}
try (AutoCloseable ignored = () -> server.closeGracefully()) {
// 使用服务器
server.addTool(tool);
server.addResource(resource);
} // 自动关闭
try {
server.addTool(tool);
} catch (McpError e) {
// 记录错误
log.error("Tool registration failed", e);
// 尝试恢复
server.notifyToolsListChanged();
}
McpSyncServer通过包装McpAsyncServer,为不需要异步编程的场景提供了更简单的同步API,同时保持了底层实现的一致性。说到底,选择哪种模式取决于你的业务场景和团队技术栈偏好。
《Off Campus》第二季官宣:这对CP还在,但不再是主角
和平精英如何做到压枪稳-和平精英怎样才能压枪稳
币安Binance虚拟货币交易平台 币安官方APP安卓苹果下载入口
客单价碾压宝马奥迪!极氪5月交付新车34377辆:连续4个月双增长
HBO 奇幻剧《龙之家族》第三季定档 6 月 22 日,最终预告片曝光喉道海战
帅到极致的网名女生霸气(精选100个)
帅气继父网名女生可爱英文(精选100个)
DOTA2 TI时隔七年重返上海!门票6月10日开抢,国服享受优先购买!
如何在夸克浏览器中开启网页视频的倍速播放功能?
韦一敏是什么梗
蒙古上单是什么梗
韩漫小少爷网名大全女生(精选100个)
网络热词聊污是什么意思
欧易OKX官方网站直达入口 2026欧易官方App安卓版v7.1.0下载安装
抖音最火沙雕男生网名(精选100个)
作家助手如何上传自制封面 作家助手如何设置小说的封面
折后价近千元 澳洲一店主将真老鼠缝到内裤上当时尚单品卖
阿里发布Qwen3.7-Max大模型,全球第五、国产第一
金铲铲之战s17六暗星卡莎阵容玩法构筑指南
三角洲行动卡战备怎么弄 三角洲行动卡战备攻略
手机号码测吉凶
本站所有软件,都由网友上传,如有侵犯你的版权,请发邮件haolingcc@hotmail.com 联系删除。 版权所有 Copyright@2012-2013 haoling.cc