第四章:构建MCP Host与实践——连接与对话
本文介绍了如何从MCP Server构建者转变为Host构建者,创建一个命令行Host与Server交互。主要内容包括:1) Host作为MCP生态的"能力消费者",通过MCP协议与Server通信;2) 在现有项目目录中创建host.py文件;3) 编写StdioMcpHost类,实现与Server子进程的stdio通信,包括启动/停止服务、监听响应和发送请求等功能。该Hos
4.1 引言:从“提供者”到“消费者”
在上一章,我们成功扮演了“能力提供者”的角色,构建了一个功能完备的MCP Server。然而,一个Server如果缺少了与之对话的另一半——Host,其价值便无法体现。Host是MCP生态中的“能力消费者”,它代表着最终用户的意图(例如,一个AI编码助手、一个研究助理Agent,或者一个简单的命令行工具),通过MCP协议与Server进行交互,从而获取上下文信息或执行特定任务。
本章,我们将转换视角,从Server的构建者转变为Host的构建者。我们将继续使用Python和mcp-sdk,创建一个简单的命令行Host。这个Host将能够:
- 连接到我们上一章创建的MCP Server。我们将演示如何通过标准输入/输出(stdio)与Server进程建立通信管道。
- 发现Server的能力。Host将主动查询Server支持哪些文件操作和自定义工具。
- 调用Server的能力。我们将编写代码来调用Server的
fs/listDirectory、fs/readFile和我们自定义的project/countLines工具。 - 将结果呈现给用户。Host会将从Server获取的数据进行处理,并以友好的方式展示在命令行中。
通过构建这个Host,我们将打通MCP的整个通信链路,亲眼见证Host与Server之间如何基于标准协议进行“对话”和“协作”。这不仅是对上一章知识的巩固,更是理解MCP如何赋能AI应用的关键一步。
4.2 环境与项目设置
我们将继续在上一章创建的my-mcp-server项目目录中工作,因为我们的Host需要与该目录下的Server进行交互。
4.2.1 项目结构回顾
确保你的项目结构如下:
my-mcp-server/
├── .venv/ # Python虚拟环境
├── workspace/ # Server的工作区目录
│ ├── welcome.txt
│ └── code_example.py
└── main.py # 上一章编写的Server代码
4.2.2 创建Host程序文件
在my-mcp-server目录下,创建一个新文件,名为host.py。这将是我们的Host程序。
# 确保你在my-mcp-server目录下
touch host.py
我们的mcp-sdk库已经在虚拟环境中安装好了,所以不需要额外的安装步骤。确保你的虚拟环境仍然是激活状态。
4.3 编写Host主程序 (host.py)
我们的命令行Host将是一个交互式的程序,它会启动Server子进程,并通过管道与其通信,然后根据用户的输入来调用Server的功能。
4.3.1 完整代码概览
和上一章一样,我们先看完整的host.py代码,然后进行详细解析。
# host.py
import asyncio
import json
import base64
import logging
from typing import Dict, Any, Optional
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - HOST - %(levelname)s - %(message)s')
class StdioMcpHost:
"""一个通过stdio与MCP Server子进程通信的Host"""
def __init__(self, server_command: str):
self.server_command = server_command
self.process: Optional[asyncio.subprocess.Process] = None
self.reader: Optional[asyncio.StreamReader] = None
self.writer: Optional[asyncio.StreamWriter] = None
self._request_id_counter = 0
self._pending_requests: Dict[int, asyncio.Future] = {}
async def start(self):
"""启动Server子进程并建立通信管道"""
logging.info(f"Starting server with command: '{self.server_command}'")
self.process = await asyncio.create_subprocess_shell(
self.server_command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE # 捕获错误输出以便调试
)
self.reader = self.process.stdout
self.writer = self.process.stdin
# 启动一个后台任务来读取Server的输出(响应和日志)
asyncio.create_task(self._listen_to_server())
# 启动一个后台任务来读取Server的错误输出
asyncio.create_task(self._listen_to_stderr())
logging.info("Host connected to server's stdio.")
async def stop(self):
"""优雅地关闭Server"""
if self.process and self.process.returncode is None:
logging.info("Stopping server...")
self.writer.close()
await self.writer.wait_closed()
self.process.terminate()
await self.process.wait()
logging.info("Server stopped.")
async def _listen_to_server(self):
"""持续监听来自Server stdout的响应"""
while self.process.returncode is None:
try:
line = await self.reader.readline()
if not line:
break
response = json.loads(line.decode('utf-8'))
request_id = response.get('id')
if request_id in self._pending_requests:
self._pending_requests.pop(request_id).set_result(response)
else:
# 这可能是一个通知(Notification),我们这里只打印它
logging.info(f"Received notification from server: {response}")
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logging.error(f"Failed to decode server response: {line.strip()}, error: {e}")
except Exception as e:
logging.error(f"Error while listening to server: {e}", exc_info=True)
break
async def _listen_to_stderr(self):
"""持续监听来自Server stderr的日志/错误信息"""
while self.process.returncode is None:
line = await self.process.stderr.readline()
if not line:
break
logging.warning(f"SERVER_STDERR: {line.decode('utf-8').strip()}")
async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""向Server发送请求并等待响应"""
self._request_id_counter += 1
request_id = self._request_id_counter
request_obj = {
"jsonrpc": "2.0",
"id": request_id,
"method": method,
"params": params
}
future = asyncio.get_running_loop().create_future()
self._pending_requests[request_id] = future
request_line = json.dumps(request_obj) + '\n'
self.writer.write(request_line.encode('utf-8'))
await self.writer.drain()
logging.info(f"Sent request: {request_line.strip()}")
# 等待响应
response = await asyncio.wait_for(future, timeout=10.0)
return response
async def interactive_cli(host: StdioMcpHost):
"""运行交互式命令行界面"""
print("\n--- MCP Host Interactive CLI ---")
print("Commands: ls <path>, cat <path>, countlines <path>, tools, exit")
while True:
try:
user_input = await asyncio.to_thread(input, "> ")
parts = user_input.strip().split()
if not parts:
continue
command = parts[0].lower()
if command == 'exit':
break
elif command == 'tools':
response = await host.send_request("project/listTools", {})
print(json.dumps(response, indent=2))
elif command == 'ls' and len(parts) > 1:
path = parts[1]
response = await host.send_request("fs/listDirectory", {"path": path})
print(json.dumps(response, indent=2))
elif command == 'cat' and len(parts) > 1:
path = parts[1]
response = await host.send_request("fs/readFile", {"path": path})
if 'result' in response and 'content' in response['result']:
decoded_content = base64.b64decode(response['result']['content']).decode('utf-8')
print("--- File Content ---")
print(decoded_content)
print("--------------------")
else:
print(json.dumps(response, indent=2))
elif command == 'countlines' and len(parts) > 1:
path = parts[1]
response = await host.send_request(
"project/executeTool",
{"name": "project/countLines", "parameters": {"path": path}}
)
print(json.dumps(response, indent=2))
else:
print(f"Unknown command: '{user_input}'. Try again.")
except Exception as e:
logging.error(f"An error occurred in CLI: {e}", exc_info=True)
async def main():
# 这里的命令必须能找到你的Python解释器和Server脚本
# 使用 'python3' 而不是 '.venv/bin/python' 可以使其更具可移植性
server_command = "python3 main.py"
host = StdioMcpHost(server_command)
try:
await host.start()
# 等待一小段时间确保Server完全启动
await asyncio.sleep(1)
await interactive_cli(host)
finally:
await host.stop()
if __name__ == "__main__":
asyncio.run(main())
4.3.2 代码逐段解析
1. StdioMcpHost 类
class StdioMcpHost:
def __init__(self, server_command: str):
self.server_command = server_command
self.process: Optional[asyncio.subprocess.Process] = None
...
self._pending_requests: Dict[int, asyncio.Future] = {}
- 这个类封装了与Server子进程交互的所有逻辑。
__init__接收一个server_command,这是用于启动Server的shell命令(例如python3 main.py)。self.process将持有asyncio.subprocess.Process对象,代表运行中的Server进程。self.reader和self.writer将分别对应Server进程的标准输出和标准输入流。self._pending_requests是实现异步请求-响应匹配的关键。它是一个字典,将我们发送的每个请求的id映射到一个asyncio.Future对象。当收到响应时,我们用响应的id找到对应的Future,并设置其结果,从而唤醒等待该响应的协程。
2. start() 和 stop() 方法
async def start(self):
self.process = await asyncio.create_subprocess_shell(
self.server_command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
...
asyncio.create_task(self._listen_to_server())
asyncio.create_task(self._listen_to_stderr())
start()方法使用asyncio.create_subprocess_shell来异步地启动Server进程。- 关键参数
stdin=PIPE,stdout=PIPE,stderr=PIPE告诉asyncio我们要接管子进程的标准IO流,以便通过管道进行通信。 - 启动后,它立即创建了两个后台任务:
_listen_to_server用于处理来自Server的JSON-RPC响应,_listen_to_stderr用于捕获并打印Server的日志和错误信息,这对于调试非常有用。 stop()方法则负责在程序退出时优雅地关闭子进程。
3. _listen_to_server() 方法
async def _listen_to_server(self):
while self.process.returncode is None:
line = await self.reader.readline()
...
response = json.loads(line.decode('utf-8'))
request_id = response.get('id')
if request_id in self._pending_requests:
self._pending_requests.pop(request_id).set_result(response)
- 这是Host的“耳朵”。它在一个循环中不断地从Server的
stdout读取一行数据(因为我们的Server保证每个JSON响应占一行)。 - 解析JSON后,它获取响应的
id。 - 然后,它在
_pending_requests字典中查找这个id。如果找到,就说明这是一个我们等待的响应。它调用future.set_result(response),这将立即唤醒之前调用send_request并await这个future的那个协程,并将response作为返回值传递过去。
4. send_request() 方法
async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
request_id = self._request_id_counter
...
future = asyncio.get_running_loop().create_future()
self._pending_requests[request_id] = future
request_line = json.dumps(request_obj) + '\n'
self.writer.write(request_line.encode('utf-8'))
await self.writer.drain()
response = await asyncio.wait_for(future, timeout=10.0)
return response
- 这是Host的“嘴巴”。它负责构建一个JSON-RPC请求对象。
- 在发送请求之前,它先创建一个
Future对象,并以请求id为键存入_pending_requests字典。这相当于立下了一个“凭证”。 - 然后,它将请求序列化为JSON字符串,加上换行符
\n(这非常重要,因为Server端是按行读取的),并通过writer写入到Server的stdin。 - 最后,它
await future。此时,协程会暂停执行,直到_listen_to_server收到对应的响应并调用future.set_result()为止。一旦被唤醒,它就拿到了响应并返回。
5. interactive_cli() 和 main()
interactive_cli函数提供了一个简单的用户交互界面。它在一个循环中等待用户输入命令(如ls .),然后解析命令并调用host.send_request()与Server通信。- 特别注意在处理
cat命令的响应时,我们对返回的Base64内容进行了解码,以便显示原始文件内容。 main函数是整个Host程序的入口。它创建StdioMcpHost实例,启动它,运行交互式CLI,最后在退出时确保调用host.stop()来清理子进程。
4.4 联调与实践:打通端到端链路
现在,万事俱备。我们将同时运行Server和Host,见证它们之间的第一次“对话”。
操作步骤:
-
打开一个终端。确保你位于
my-mcp-server目录下,并且虚拟环境已激活。 -
运行Host程序。在终端中执行:
python3 host.py -
观察初始输出。你应该会看到类似下面的日志,表明Host已成功启动Server子进程并连接:
2023-10-27 11:00:00,123 - HOST - INFO - Starting server with command: 'python3 main.py' 2023-10-27 11:00:00,125 - HOST - WARNING - SERVER_STDERR: 2023-10-27 11:00:00,124 - INFO - Server initialized with workspace root: /path/to/my-mcp-server/workspace 2023-10-27 11:00:00,126 - HOST - WARNING - SERVER_STDERR: 2023-10-27 11:00:00,125 - INFO - Starting MCP Server... 2023-10-27 11:00:00,127 - HOST - INFO - Host connected to server's stdio. --- MCP Host Interactive CLI --- Commands: ls <path>, cat <path>, countlines <path>, tools, exit >注意
SERVER_STDERR的输出,这正是我们的Host捕获到的Server日志,证明了通信管道是通的。 -
开始交互! 现在你可以在
>提示符后输入命令了。-
查看可用工具
> tools响应:
{ "jsonrpc": "2.0", "id": 1, "result": [ { "name": "project/countLines", "description": "Counts the number of lines in a specified file within the workspace.", "parameters": [ { "name": "path", "description": "The relative path to the file.", "type": "string", "required": true } ] } ] } -
列出工作区根目录
> ls .响应:
{ "jsonrpc": "2.0", "id": 2, "result": [ { "name": "welcome.txt", "type": "file" }, { "name": "code_example.py", "type": "file" } ] } -
查看文件内容
> cat welcome.txt响应:
--- File Content --- Hello, MCP World! This is your first file. -------------------- -
执行自定义工具
> countlines code_example.py响应:
{ "jsonrpc": "2.0", "id": 4, "result": { "result": { "lineCount": 3 }, "stdout": "Successfully counted 3 lines in code_example.py.", "stderr": null } } -
退出
> exitHost会优雅地关闭Server子进程并退出。
-
4.5 总结
在本章中,我们成功地构建了一个MCP Host,并将其与我们之前创建的Server连接起来,完成了一次端到端的交互实践。我们不仅看到了MCP如何工作,更亲手实现了它的工作过程。
我们学到了:
- 如何使用
asyncio.subprocess来管理子进程并通过stdio管道进行IPC(进程间通信)。 - 实现异步请求-响应匹配的核心机制:使用Future对象作为“凭证”。
- 如何构建一个健壮的Host,能够发送请求、接收响应,并处理Server的日志和错误。
- 如何将MCP的能力集成到一个实际的应用(即使是一个简单的CLI)中,为用户提供价值。
至此,我们已经完整地走过了MCP的“Hello, World!”之旅。我们既能创建“能力提供者”(Server),也能创建“能力消费者”(Host)。我们已经掌握了构建更复杂、更强大的MCP应用所需的所有基础知识和技能。在接下来的“实践项目”章节中,我们将综合运用所学的一切,去挑战一个更宏大、更接近真实世界场景的目标。
火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。
更多推荐
所有评论(0)