热门搜索:和平精英 原神 街篮2 

您的位置:首页 > > 教程攻略 > ai资讯 >AI Agent的利器——MCP的java server实现原理

AI Agent的利器——MCP的java server实现原理

来源:互联网 更新时间:2026-06-20 15:50

先来看最核心的设计思路——MCP的Ja va服务器实现,本质上就是一个工厂接口,用来创建不同类型的MCP服务器实例。说实话,这块的设计在工程实践中相当值得借鉴,尤其适合那些需要在Ja va生态里对接AI Agent的场景。

AI Agent的利器——MCP的ja va server实现原理

让我分析一下 McpServer 的实现

1. 整体设计

McpServer是一个工厂接口,用于创建MCP服务器实例,支持两种模式:

public interface McpServer {
    // 1. 同步服务器构建器
    static SyncSpec sync(ServerMcpTransport transport)
    
    // 2. 异步服务器构建器
    static AsyncSpec async(ServerMcpTransport transport)
}

这里有个值得注意的点:同步模式和异步模式分别通过SyncSpec和AsyncSpec两个构建器来配置,这种设计让代码的分层和职责边界非常清晰。

2. 构建器模式实现

AsyncSpec (异步规范)

class AsyncSpec {
    private final ServerMcpTransport transport;
    private McpSchema.Implementation serverInfo;
    private McpSchema.ServerCapabilities serverCapabilities;
    
    // 功能注册容器
    private final List tools;
    private final Map resources;
    private final List resourceTemplates;
    private final Map prompts;
}

SyncSpec (同步规范)

class SyncSpec {
    private final ServerMcpTransport transport;
    private McpSchema.Implementation serverInfo;
    private McpSchema.ServerCapabilities serverCapabilities;
    
    // 功能注册容器
    private final List tools;
    private final Map resources;
    private final List resourceTemplates;
    private final Map prompts;
}

两种模式下的功能注册容器类型是不对称的——异步用Async前缀类型,同步用Sync前缀类型。这意味着底层对工具的调用和资源读写采用了不同的执行模式。

3. 主要功能

3.1 服务器信息配置

// 异步配置
AsyncSpec serverInfo(String name, String version) {
    this.serverInfo = new McpSchema.Implementation(name, version);
    return this;
}

// 同步配置
SyncSpec serverInfo(String name, String version) {
    this.serverInfo = new McpSchema.Implementation(name, version);
    return this;
}

3.2 工具注册

// 异步工具
AsyncSpec tool(McpSchema.Tool tool, 
    Function, Mono> handler) {
    this.tools.add(new AsyncToolRegistration(tool, handler));
    return this;
}

// 同步工具
SyncSpec tool(McpSchema.Tool tool, 
    Function, CallToolResult> handler) {
    this.tools.add(new SyncToolRegistration(tool, handler));
    return this;
}

工具注册是MCP的核心能力之一。这里可以看到,异步版的handler返回的是Mono,而同步版直接返回CallToolResult,差异非常直白。

3.3 资源管理

// 异步资源
AsyncSpec resources(Map resources) {
    this.resources.putAll(resources);
    return this;
}

// 同步资源
SyncSpec resources(Map resources) {
    this.resources.putAll(resources);
    return this;
}

3.4 提示模板

// 异步提示
AsyncSpec prompts(AsyncPromptRegistration... prompts) {
    for (AsyncPromptRegistration prompt : prompts) {
        this.prompts.put(prompt.prompt().name(), prompt);
    }
    return this;
}

// 同步提示
SyncSpec prompts(SyncPromptRegistration... prompts) {
    for (SyncPromptRegistration prompt : prompts) {
        this.prompts.put(prompt.prompt().name(), prompt);
    }
    return this;
}

注意到没有?构建器方法都返回自身,实现了链式调用。这是构建器模式的经典用法,但在这里做得尤其干净利落。

4. 使用示例

4.1 异步服务器

McpServer.async(transport)
    .serverInfo("async-server", "1.0.0")
    .capabilities(new ServerCapabilities(...))
    // 注册工具
    .tool(new Tool("calculator", "计算器", schema),
          args -> Mono.just(new CallToolResult(calculate(args))))
    // 注册资源
    .resources(Map.of(
        "file", new AsyncResourceRegistration(fileResource, 
            req -> Mono.just(new ReadResourceResult(readFile(req))))
    ))
    // 注册提示
    .prompts(new AsyncPromptRegistration(prompt,
        req -> Mono.just(new GetPromptResult(getPrompt(req)))))
    .build();

4.2 同步服务器

McpServer.sync(transport)
    .serverInfo("sync-server", "1.0.0")
    .capabilities(new ServerCapabilities(...))
    // 注册工具
    .tool(new Tool("calculator", "计算器", schema),
          args -> new CallToolResult(calculate(args)))
    // 注册资源
    .resources(Map.of(
        "file", new SyncResourceRegistration(fileResource, 
            req -> new ReadResourceResult(readFile(req)))
    ))
    // 注册提示
    .prompts(new SyncPromptRegistration(prompt,
        req -> new GetPromptResult(getPrompt(req))))
    .build();

这段代码其实很能说明问题:同步版本和异步版本的使用体验几乎一样,但底层执行模型大不相同。

5. 主要特点

  1. 双模式支持

    :同步模式适合简单场景;异步模式适合高并发场景。
  2. 流式构建器

    :链式调用、参数验证、类型安全。
  3. 功能模块化

    :工具注册、资源管理、提示模板、根目录变更通知。
  4. 扩展性设计

    :支持自定义传输层、可扩展的功能注册、灵活的配置选项。
  5. 错误处理

    :参数验证、异常传播、资源管理。

这个设计提供了一个灵活且强大的MCP服务器构建框架,可以根据不同需求选择同步或异步模式,并通过构建器模式提供清晰的配置接口。

McpAsyncServer

接下来深入分析McpAsyncServer的实现原理和用法。

1. 基本架构

public class McpAsyncServer {
    private final DefaultMcpSession mcpSession;        // MCP 会话管理
    private final ServerMcpTransport transport;        // 传输层
    private final ServerCapabilities serverCapabilities; // 服务器能力
    
    // 线程安全的功能注册容器
    private final CopyOnWriteArrayList tools;
    private final ConcurrentHashMap resources;
    private final ConcurrentHashMap prompts;
}

这里容器使用了CopyOnWriteArrayList和ConcurrentHashMap,目的很明确——保证高并发环境下的线程安全。

2. 主要功能模块

2.1 初始化处理

private DefaultMcpSession.RequestHandler asyncInitializeRequestHandler() {
    return params -> {
        // 1. 解析初始化请求
        InitializeRequest request = transport.unmarshalFrom(params, 
            new TypeReference() {});
            
        // 2. 保存客户端信息
        this.clientCapabilities = request.capabilities();
        this.clientInfo = request.clientInfo();
        
        // 3. 协议版本协商
        String serverVersion = negotiateProtocolVersion(request.protocolVersion());
        
        // 4. 返回服务器信息
        return Mono.just(new InitializeResult(
            serverVersion,
            this.serverCapabilities,
            this.serverInfo,
            null
        ));
    };
}

初始化流程做了几件事:解析请求、保存客户端能力、协商协议版本、返回服务器信息。整个过程都是响应式的,返回Mono。

2.2 工具管理

// 添加工具
public Mono addTool(AsyncToolRegistration toolRegistration) {
    return Mono.defer(() -> {
        // 检查重复
        if (toolExists(toolRegistration.tool().name())) {
            return Mono.error(new McpError("Tool already exists"));
        }
        
        // 添加工具
        tools.add(toolRegistration);
        
        // 通知客户端
        return notifyToolsListChanged();
    });
}

// 工具调用处理
private RequestHandler toolsCallRequestHandler() {
    return params -> {
        CallToolRequest request = unmarshalRequest(params);
        return findTool(request.name())
            .map(tool -> tool.call().apply(request.arguments()))
            .orElse(Mono.error(new McpError("Tool not found")));
    };
}

工具添加时会检查重复,如果工具已存在就抛出错误;成功添加后还会主动通知客户端列表变更。这个设计让客户端和服务端的状态保持同步。

2.3 资源管理

// 添加资源
public Mono addResource(AsyncResourceRegistration resource) {
    return Mono.defer(() -> {
        resources.putIfAbsent(resource.resource().uri(), resource);
        return notifyResourcesListChanged();
    });
}

// 资源读取处理
private RequestHandler resourcesReadRequestHandler() {
    return params -> {
        ReadResourceRequest request = unmarshalRequest(params);
        return Optional.ofNullable(resources.get(request.uri()))
            .map(r -> r.readHandler().apply(request))
            .orElse(Mono.error(new McpError("Resource not found")));
    };
}

资源管理使用了putIfAbsent,避免重复写入。读取时如果找不到资源的uri,同样会返回错误。整个链路都是异步响应式的。

3. 使用示例

3.1 创建服务器

// 1. 创建传输层
ServerMcpTransport transport = new StdioServerTransport();

// 2. 配置服务器
McpAsyncServer server = McpServer.async(transport)
    .serverInfo("my-server", "1.0.0")
    .capabilities(new ServerCapabilities(...))
    // 注册工具
    .tool(new Tool("calculator", "计算器", schema),
          args -> Mono.just(new CallToolResult(calculate(args))))
    // 注册资源
    .resources(new AsyncResourceRegistration(
        new Resource("file://data"),
        req -> Mono.just(new ReadResourceResult(readFile(req)))
    ))
    .build();

// 3. 启动服务器
server.initialize().subscribe();

3.2 动态管理功能

// 添加新工具
server.addTool(new AsyncToolRegistration(
    new Tool("newTool", "新工具", schema),
    args -> Mono.just(new CallToolResult(process(args)))
)).subscribe();

// 移除工具
server.removeTool("oldTool").subscribe();

// 添加资源
server.addResource(new AsyncResourceRegistration(
    new Resource("db://users"),
    req -> Mono.just(new ReadResourceResult(queryDb(req)))
)).subscribe();

动态注册和移除的能力很实用——服务器不需要重启,就能实时调整暴露的工具和资源。

3.3 日志管理

// 发送日志通知
server.loggingNotification(new LoggingMessageNotification(
    LoggingLevel.INFO,
    "操作完成"
)).subscribe();

4. 核心特点

  1. 异步响应式设计

    :基于Project Reactor,非阻塞操作,响应式流处理。
  2. 线程安全

    :使用CopyOnWriteArrayList和ConcurrentHashMap,原子操作支持。
  3. 可扩展性

    :动态工具注册、动态资源管理、灵活的提示模板。
  4. 事件通知

    :工具列表变更、资源列表变更、日志消息通知。
  5. 错误处理

    :统一错误模型、异常传播、优雅降级。

5. 最佳实践

  1. 初始化顺序

// 1. 配置服务器
McpAsyncServer server = configureServer();

// 2. 注册核心功能
registerCoreFeatures(server);

// 3. 启动服务器
server.initialize()
    .doOnSuccess(() -> log.info("Server started"))
    .doOnError(e -> log.error("Start failed", e))
    .subscribe();
  1. 错误处理

server.addTool(toolRegistration)
    .doOnSuccess(() -> log.info("Tool added"))
    .doOnError(McpError.class, e -> log.warn("Tool error", e))
    .doOnError(Exception.class, e -> log.error("System error", e))
    .onErrorResume(e -> Mono.empty())
    .subscribe();
  1. 资源管理

// 使用 try-with-resources
try (McpAsyncServer server = createServer()) {
    server.initialize()
        .then(server.addTools())
        .then(server.addResources())
        .block();
}

McpAsyncServer提供了一个功能完整的异步MCP服务器实现,适合构建高性能、可扩展的AI工具和资源服务。

McpSyncServer

最后来看McpSyncServer的实现原理和用法。

1. 基本设计

public class McpSyncServer {
    private final McpAsyncServer asyncServer;  // 委托异步服务器
}

这是一个典型的装饰器模式实现,将异步服务器包装成同步接口。说白了,就是内部跑着异步引擎,对外提供阻塞式的API。

2. 核心功能实现

2.1 工具管理

// 添加工具
public void addTool(SyncToolRegistration toolHandler) {
    // 转换为异步并阻塞等待
    this.asyncServer.addTool(
        AsyncToolRegistration.fromSync(toolHandler)
    ).block();
}

// 移除工具
public void removeTool(String toolName) {
    this.asyncServer.removeTool(toolName).block();
}

2.2 资源管理

// 添加资源
public void addResource(SyncResourceRegistration resourceHandler) {
    this.asyncServer.addResource(
        AsyncResourceRegistration.fromSync(resourceHandler)
    ).block();
}

// 移除资源
public void removeResource(String resourceUri) {
    this.asyncServer.removeResource(resourceUri).block();
}

2.3 提示管理

// 添加提示
public void addPrompt(SyncPromptRegistration promptRegistration) {
    this.asyncServer.addPrompt(
        AsyncPromptRegistration.fromSync(promptRegistration)
    ).block();
}

// 移除提示
public void removePrompt(String promptName) {
    this.asyncServer.removePrompt(promptName).block();
}

核心模式就是一句话:把同步注册转换为异步操作,然后调用.block()阻塞等待结果。

3. 使用示例

3.1 创建服务器

// 1. 创建同步服务器
McpSyncServer server = McpServer.sync(transport)
    .serverInfo("my-server", "1.0.0")
    .capabilities(new ServerCapabilities(...))
    // 注册工具
    .tool(new Tool("calculator", "计算器", schema),
          args -> new CallToolResult(calculate(args)))
    // 注册资源
    .resource(new Resource("file://data"),
              req -> new ReadResourceResult(readFile(req)))
    .build();

3.2 功能管理

// 添加工具
server.addTool(new SyncToolRegistration(
    new Tool("newTool", "新工具", schema),
    args -> new CallToolResult(process(args))
));

// 添加资源
server.addResource(new SyncResourceRegistration(
    new Resource("db://users"),
    req -> new ReadResourceResult(queryDb(req))
));

// 发送通知
server.notifyToolsListChanged();

3.3 消息创建

// 创建消息
CreateMessageResult result = server.createMessage(
    new CreateMessageRequest(
        "prompt",
        Map.of("key", "value")
    )
);

4. 主要特点

  1. 同步包装

// 将异步操作转换为同步
public void someOperation(Parameters params) {
    asyncServer.someOperation(params).block();
}
  1. 错误处理

try {
    server.addTool(toolRegistration);
} catch (McpError e) {
    // 处理 MCP 错误
} catch (Exception e) {
    // 处理其他错误
}
  1. 资源管理

// 优雅关闭
public void closeGracefully() {
    this.asyncServer.closeGracefully().block();
}

// 立即关闭
public void close() {
    this.asyncServer.close();
}
  1. 状态访问

// 获取服务器信息
ServerCapabilities capabilities = server.getServerCapabilities();
Implementation serverInfo = server.getServerInfo();

// 获取客户端信息
ClientCapabilities clientCaps = server.getClientCapabilities();
Implementation clientInfo = server.getClientInfo();

5. 最佳实践

  1. 初始化流程

// 创建服务器
McpSyncServer server = createServer();

try {
    // 添加核心功能
    server.addTool(calculatorTool);
    server.addResource(fileResource);
    
    // 通知变更
    server.notifyToolsListChanged();
    server.notifyResourcesListChanged();
} catch (Exception e) {
    server.close();
    throw e;
}
  1. 资源清理

try (AutoCloseable ignored = () -> server.closeGracefully()) {
    // 使用服务器
    server.addTool(tool);
    server.addResource(resource);
} // 自动关闭
  1. 错误恢复

try {
    server.addTool(tool);
} catch (McpError e) {
    // 记录错误
    log.error("Tool registration failed", e);
    // 尝试恢复
    server.notifyToolsListChanged();
}

6. 与异步版本的区别

  1. API 风格

    :同步版本直接返回结果;异步版本返回Mono/Flux。
  2. 错误处理

    :同步版本用try-catch;异步版本用onError操作符。
  3. 资源管理

    :同步版本直接阻塞;异步版本响应式流。
  4. 使用场景

    :同步版本适合简单应用和测试;异步版本适合高并发和响应式应用。

McpSyncServer通过包装McpAsyncServer,为不需要异步编程的场景提供了更简单的同步API,同时保持了底层实现的一致性。说到底,选择哪种模式取决于你的业务场景和团队技术栈偏好。

AI自动绘画大师
AI自动绘画大师

类型:益智休闲

大小:5.72MB

语言:简体中文

平台:互联网

游戏下载

热门手游

手机号码测吉凶
本站所有软件,都由网友上传,如有侵犯你的版权,请发邮件haolingcc@hotmail.com 联系删除。 版权所有 Copyright@2012-2013 haoling.cc