4.1 引言:从“提供者”到“消费者”

在上一章,我们成功扮演了“能力提供者”的角色,构建了一个功能完备的MCP Server。然而,一个Server如果缺少了与之对话的另一半——Host,其价值便无法体现。Host是MCP生态中的“能力消费者”,它代表着最终用户的意图(例如,一个AI编码助手、一个研究助理Agent,或者一个简单的命令行工具),通过MCP协议与Server进行交互,从而获取上下文信息或执行特定任务。

本章,我们将转换视角,从Server的构建者转变为Host的构建者。我们将继续使用Python和mcp-sdk,创建一个简单的命令行Host。这个Host将能够:

  1. 连接到我们上一章创建的MCP Server。我们将演示如何通过标准输入/输出(stdio)与Server进程建立通信管道。
  2. 发现Server的能力。Host将主动查询Server支持哪些文件操作和自定义工具。
  3. 调用Server的能力。我们将编写代码来调用Server的fs/listDirectoryfs/readFile和我们自定义的project/countLines工具。
  4. 将结果呈现给用户。Host会将从Server获取的数据进行处理,并以友好的方式展示在命令行中。

通过构建这个Host,我们将打通MCP的整个通信链路,亲眼见证Host与Server之间如何基于标准协议进行“对话”和“协作”。这不仅是对上一章知识的巩固,更是理解MCP如何赋能AI应用的关键一步。


4.2 环境与项目设置

我们将继续在上一章创建的my-mcp-server项目目录中工作,因为我们的Host需要与该目录下的Server进行交互。

4.2.1 项目结构回顾

确保你的项目结构如下:

my-mcp-server/
├── .venv/                  # Python虚拟环境
├── workspace/              # Server的工作区目录
│   ├── welcome.txt
│   └── code_example.py
└── main.py                 # 上一章编写的Server代码

4.2.2 创建Host程序文件

my-mcp-server目录下,创建一个新文件,名为host.py。这将是我们的Host程序。

# 确保你在my-mcp-server目录下
touch host.py

我们的mcp-sdk库已经在虚拟环境中安装好了,所以不需要额外的安装步骤。确保你的虚拟环境仍然是激活状态。


4.3 编写Host主程序 (host.py)

我们的命令行Host将是一个交互式的程序,它会启动Server子进程,并通过管道与其通信,然后根据用户的输入来调用Server的功能。

4.3.1 完整代码概览

和上一章一样,我们先看完整的host.py代码,然后进行详细解析。

# host.py

import asyncio
import json
import base64
import logging
from typing import Dict, Any, Optional

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - HOST - %(levelname)s - %(message)s')

class StdioMcpHost:
    """一个通过stdio与MCP Server子进程通信的Host"""

    def __init__(self, server_command: str):
        self.server_command = server_command
        self.process: Optional[asyncio.subprocess.Process] = None
        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self._request_id_counter = 0
        self._pending_requests: Dict[int, asyncio.Future] = {}

    async def start(self):
        """启动Server子进程并建立通信管道"""
        logging.info(f"Starting server with command: '{self.server_command}'")
        self.process = await asyncio.create_subprocess_shell(
            self.server_command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE # 捕获错误输出以便调试
        )
        self.reader = self.process.stdout
        self.writer = self.process.stdin

        # 启动一个后台任务来读取Server的输出(响应和日志)
        asyncio.create_task(self._listen_to_server())
        # 启动一个后台任务来读取Server的错误输出
        asyncio.create_task(self._listen_to_stderr())
        logging.info("Host connected to server's stdio.")

    async def stop(self):
        """优雅地关闭Server"""
        if self.process and self.process.returncode is None:
            logging.info("Stopping server...")
            self.writer.close()
            await self.writer.wait_closed()
            self.process.terminate()
            await self.process.wait()
            logging.info("Server stopped.")

    async def _listen_to_server(self):
        """持续监听来自Server stdout的响应"""
        while self.process.returncode is None:
            try:
                line = await self.reader.readline()
                if not line:
                    break
                response = json.loads(line.decode('utf-8'))
                request_id = response.get('id')
                if request_id in self._pending_requests:
                    self._pending_requests.pop(request_id).set_result(response)
                else:
                    # 这可能是一个通知(Notification),我们这里只打印它
                    logging.info(f"Received notification from server: {response}")
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logging.error(f"Failed to decode server response: {line.strip()}, error: {e}")
            except Exception as e:
                logging.error(f"Error while listening to server: {e}", exc_info=True)
                break

    async def _listen_to_stderr(self):
        """持续监听来自Server stderr的日志/错误信息"""
        while self.process.returncode is None:
            line = await self.process.stderr.readline()
            if not line:
                break
            logging.warning(f"SERVER_STDERR: {line.decode('utf-8').strip()}")

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """向Server发送请求并等待响应"""
        self._request_id_counter += 1
        request_id = self._request_id_counter

        request_obj = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params
        }

        future = asyncio.get_running_loop().create_future()
        self._pending_requests[request_id] = future

        request_line = json.dumps(request_obj) + '\n'
        self.writer.write(request_line.encode('utf-8'))
        await self.writer.drain()
        logging.info(f"Sent request: {request_line.strip()}")

        # 等待响应
        response = await asyncio.wait_for(future, timeout=10.0)
        return response

async def interactive_cli(host: StdioMcpHost):
    """运行交互式命令行界面"""
    print("\n--- MCP Host Interactive CLI ---")
    print("Commands: ls <path>, cat <path>, countlines <path>, tools, exit")

    while True:
        try:
            user_input = await asyncio.to_thread(input, "> ")
            parts = user_input.strip().split()
            if not parts:
                continue

            command = parts[0].lower()

            if command == 'exit':
                break
            elif command == 'tools':
                response = await host.send_request("project/listTools", {})
                print(json.dumps(response, indent=2))
            elif command == 'ls' and len(parts) > 1:
                path = parts[1]
                response = await host.send_request("fs/listDirectory", {"path": path})
                print(json.dumps(response, indent=2))
            elif command == 'cat' and len(parts) > 1:
                path = parts[1]
                response = await host.send_request("fs/readFile", {"path": path})
                if 'result' in response and 'content' in response['result']:
                    decoded_content = base64.b64decode(response['result']['content']).decode('utf-8')
                    print("--- File Content ---")
                    print(decoded_content)
                    print("--------------------")
                else:
                    print(json.dumps(response, indent=2))
            elif command == 'countlines' and len(parts) > 1:
                path = parts[1]
                response = await host.send_request(
                    "project/executeTool",
                    {"name": "project/countLines", "parameters": {"path": path}}
                )
                print(json.dumps(response, indent=2))
            else:
                print(f"Unknown command: '{user_input}'. Try again.")

        except Exception as e:
            logging.error(f"An error occurred in CLI: {e}", exc_info=True)

async def main():
    # 这里的命令必须能找到你的Python解释器和Server脚本
    # 使用 'python3' 而不是 '.venv/bin/python' 可以使其更具可移植性
    server_command = "python3 main.py"
    host = StdioMcpHost(server_command)
    try:
        await host.start()
        # 等待一小段时间确保Server完全启动
        await asyncio.sleep(1)
        await interactive_cli(host)
    finally:
        await host.stop()

if __name__ == "__main__":
    asyncio.run(main())

4.3.2 代码逐段解析

1. StdioMcpHost

class StdioMcpHost:
    def __init__(self, server_command: str):
        self.server_command = server_command
        self.process: Optional[asyncio.subprocess.Process] = None
        ...
        self._pending_requests: Dict[int, asyncio.Future] = {}
  • 这个类封装了与Server子进程交互的所有逻辑。
  • __init__接收一个server_command,这是用于启动Server的shell命令(例如python3 main.py)。
  • self.process将持有asyncio.subprocess.Process对象,代表运行中的Server进程。
  • self.readerself.writer将分别对应Server进程的标准输出和标准输入流。
  • self._pending_requests是实现异步请求-响应匹配的关键。它是一个字典,将我们发送的每个请求的id映射到一个asyncio.Future对象。当收到响应时,我们用响应的id找到对应的Future,并设置其结果,从而唤醒等待该响应的协程。

2. start()stop() 方法

    async def start(self):
        self.process = await asyncio.create_subprocess_shell(
            self.server_command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        ...
        asyncio.create_task(self._listen_to_server())
        asyncio.create_task(self._listen_to_stderr())
  • start()方法使用asyncio.create_subprocess_shell来异步地启动Server进程。
  • 关键参数stdin=PIPE, stdout=PIPE, stderr=PIPE告诉asyncio我们要接管子进程的标准IO流,以便通过管道进行通信。
  • 启动后,它立即创建了两个后台任务:_listen_to_server用于处理来自Server的JSON-RPC响应,_listen_to_stderr用于捕获并打印Server的日志和错误信息,这对于调试非常有用。
  • stop()方法则负责在程序退出时优雅地关闭子进程。

3. _listen_to_server() 方法

    async def _listen_to_server(self):
        while self.process.returncode is None:
            line = await self.reader.readline()
            ...
            response = json.loads(line.decode('utf-8'))
            request_id = response.get('id')
            if request_id in self._pending_requests:
                self._pending_requests.pop(request_id).set_result(response)
  • 这是Host的“耳朵”。它在一个循环中不断地从Server的stdout读取一行数据(因为我们的Server保证每个JSON响应占一行)。
  • 解析JSON后,它获取响应的id
  • 然后,它在_pending_requests字典中查找这个id。如果找到,就说明这是一个我们等待的响应。它调用future.set_result(response),这将立即唤醒之前调用send_requestawait这个future的那个协程,并将response作为返回值传递过去。

4. send_request() 方法

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        request_id = self._request_id_counter
        ...
        future = asyncio.get_running_loop().create_future()
        self._pending_requests[request_id] = future

        request_line = json.dumps(request_obj) + '\n'
        self.writer.write(request_line.encode('utf-8'))
        await self.writer.drain()

        response = await asyncio.wait_for(future, timeout=10.0)
        return response
  • 这是Host的“嘴巴”。它负责构建一个JSON-RPC请求对象。
  • 在发送请求之前,它先创建一个Future对象,并以请求id为键存入_pending_requests字典。这相当于立下了一个“凭证”。
  • 然后,它将请求序列化为JSON字符串,加上换行符\n(这非常重要,因为Server端是按行读取的),并通过writer写入到Server的stdin
  • 最后,它await future。此时,协程会暂停执行,直到_listen_to_server收到对应的响应并调用future.set_result()为止。一旦被唤醒,它就拿到了响应并返回。

5. interactive_cli()main()

  • interactive_cli函数提供了一个简单的用户交互界面。它在一个循环中等待用户输入命令(如ls .),然后解析命令并调用host.send_request()与Server通信。
  • 特别注意在处理cat命令的响应时,我们对返回的Base64内容进行了解码,以便显示原始文件内容。
  • main函数是整个Host程序的入口。它创建StdioMcpHost实例,启动它,运行交互式CLI,最后在退出时确保调用host.stop()来清理子进程。

4.4 联调与实践:打通端到端链路

现在,万事俱备。我们将同时运行Server和Host,见证它们之间的第一次“对话”。

操作步骤:

  1. 打开一个终端。确保你位于my-mcp-server目录下,并且虚拟环境已激活。

  2. 运行Host程序。在终端中执行:

    python3 host.py
    
  3. 观察初始输出。你应该会看到类似下面的日志,表明Host已成功启动Server子进程并连接:

    2023-10-27 11:00:00,123 - HOST - INFO - Starting server with command: 'python3 main.py'
    2023-10-27 11:00:00,125 - HOST - WARNING - SERVER_STDERR: 2023-10-27 11:00:00,124 - INFO - Server initialized with workspace root: /path/to/my-mcp-server/workspace
    2023-10-27 11:00:00,126 - HOST - WARNING - SERVER_STDERR: 2023-10-27 11:00:00,125 - INFO - Starting MCP Server...
    2023-10-27 11:00:00,127 - HOST - INFO - Host connected to server's stdio.
    
    --- MCP Host Interactive CLI ---
    Commands: ls <path>, cat <path>, countlines <path>, tools, exit
    > 
    

    注意SERVER_STDERR的输出,这正是我们的Host捕获到的Server日志,证明了通信管道是通的。

  4. 开始交互! 现在你可以在>提示符后输入命令了。

    • 查看可用工具

      > tools
      

      响应:

      {
        "jsonrpc": "2.0",
        "id": 1,
        "result": [
          {
            "name": "project/countLines",
            "description": "Counts the number of lines in a specified file within the workspace.",
            "parameters": [
              {
                "name": "path",
                "description": "The relative path to the file.",
                "type": "string",
                "required": true
              }
            ]
          }
        ]
      }
      
    • 列出工作区根目录

      > ls .
      

      响应:

      {
        "jsonrpc": "2.0",
        "id": 2,
        "result": [
          {
            "name": "welcome.txt",
            "type": "file"
          },
          {
            "name": "code_example.py",
            "type": "file"
          }
        ]
      }
      
    • 查看文件内容

      > cat welcome.txt
      

      响应:

      --- File Content ---
      Hello, MCP World!
      This is your first file.
      --------------------
      
    • 执行自定义工具

      > countlines code_example.py
      

      响应:

      {
        "jsonrpc": "2.0",
        "id": 4,
        "result": {
          "result": {
            "lineCount": 3
          },
          "stdout": "Successfully counted 3 lines in code_example.py.",
          "stderr": null
        }
      }
      
    • 退出

      > exit
      

      Host会优雅地关闭Server子进程并退出。


4.5 总结

在本章中,我们成功地构建了一个MCP Host,并将其与我们之前创建的Server连接起来,完成了一次端到端的交互实践。我们不仅看到了MCP如何工作,更亲手实现了它的工作过程。

我们学到了:

  • 如何使用asyncio.subprocess来管理子进程并通过stdio管道进行IPC(进程间通信)。
  • 实现异步请求-响应匹配的核心机制:使用Future对象作为“凭证”。
  • 如何构建一个健壮的Host,能够发送请求、接收响应,并处理Server的日志和错误。
  • 如何将MCP的能力集成到一个实际的应用(即使是一个简单的CLI)中,为用户提供价值。

至此,我们已经完整地走过了MCP的“Hello, World!”之旅。我们既能创建“能力提供者”(Server),也能创建“能力消费者”(Host)。我们已经掌握了构建更复杂、更强大的MCP应用所需的所有基础知识和技能。在接下来的“实践项目”章节中,我们将综合运用所学的一切,去挑战一个更宏大、更接近真实世界场景的目标。

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐