import asyncio
from mcp import ClientSession
from contextlib import AsyncExitStack

class MCPClient:
    def __init__(self):
        """初始化 MCP 客户端"""
        self.session = None
        self.exit_stack = AsyncExitStack()
    async def connect_to_mock_server(self):
        """模拟 MCP 服务器的连接(暂不连接真实服务器)"""
        print("✅ MCP 客户端已初始化,但未连接到服务器")
    async def chat_loop(self):
        """运行交互式聊天循环"""
        print("\nMCP 客户端已启动!输入 'quit' 退出")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                print(f"\n🤖 [Mock Response] 你说的是:{query}")
            except Exception as e:
                print(f"\n⚠️ 发生错误: {str(e)}")
    async def cleanup(self):
        """清理资源"""
        await self.exit_stack.aclose()
        
async def main():
    client = MCPClient()
    """
    这段代码能够初始化 MCP 客户端(但不连接服务器),并提供一个 交互式 CLI,可以输入查询(但只返
    回模拟回复),通过输入 quit 退出程序。需要注意的是,此时客户端没有关联任何大模型,因此只会
    重复用户的输入。
    2.4 MCP客户端基本代码结构
    以下是 client.py 代码详解,代码核心功能:
    初始化 MCP 客户端
    *提供一个命令行交互界面
    模拟 MCP 服务器连接
    支持用户输入查询并返回「模拟回复」
    支持安全退出
    代码具体解释如下:首先是导入必要的库
    """
    try:
        await client.connect_to_mock_server()
        await client.chat_loop()
    finally:
        await client.cleanup()
        
if __name__ == "__main__":
    asyncio.run(main())
    
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐