本文为 MCP 系列的第九篇文章。本文我们从开发的角度,介绍如何通过 Sampling,赋予 MCP 服务端智能能力(网上介绍 MCP 的文章很多,但关于 MCP Samping 的机制、编程实现的文章较少)。

本文以数据智能查询应用场景为例,该应用基于 LLM、MCP 实现以下功能:

  1. 用户以自然语言的形式提问,LLM 识别用户提问意图,并通过 MCP 查询对应的数据库数据,结合数据查询结果回答用户问题。

  2. 通过 MCP 的 Sampling,赋予 MCP 服务端智能能力:服务端在查询微博数据时,分析并返回每条微博内容的情感倾向。

关注我,后台回复:MCP,获取代码、样例数据。

实体对象关系

假设应用场景有三类数据:中国省份信息、电影数据、微博数据。

主要对象关系如上图所示(与前文MCP:编程实战,手把手教你实现数学运算智能问答应用、FastMCP,构建 MCP 的 python 框架,比官方 SDK 更好用!类似),区别在于此处 MCP 服务端暴露的是资源 resources 服务,提供数据库数据查询服务):

  • 会话管理:

    • 处理用户输入,返回 LLM 的响应;

    • 维护对话历史记录;

    • 协调LLM和MCP客户端之间的交互;

    • 解析LLM响应中的数据资源调用请求;

    • 将MCP返回的数据资源结果反馈给LLM;

  • 模型客户端:与大语言模型API通信的客户端。主要职责:封装与 LLM 服务的API交互;发送对话历史到LLM并获取响应。

  • MCP客户端:与MCP服务端通信的客户端

  • MCP服务端:提供数据资源服务的后端服务器,可提供中国地区数据、电影数据、微博数据三类数据。

利用 Sampling 实现微博内容的情感分析

关于 MCP 的 Sampling ,可阅读前文:一文读懂 MCP 的 Sampling(采样),赋予 MCP 服务端智能能力!。通过 Sampling,MCP 服务端在提供微博数据查询服务时,可以借助 LLM 能力,实现对微博内容的情感分析,并将分析结果返回给用户。

通过 Sampling 实现微博内容情感分析流程如下:

基于 FastMCP 的 Sampling 实现:

  • MCP 服务端获取 Context 上下文对象,调用 Context 对象的 sample 方法,发起 Samping 请求并获取结果。

  • 在定义 MCP 服务端方法时,Context 对象是通过依赖注入的方式自动注入的(当定义资源处理函数时,如果参数中包含 ctx: Context,FastMCP 会自动注入 Context 实例,不需要手动创建 Context 对象)。

  • sampling_handler 作为一个回调函数,注册到 MCP Client 中。sampling_handler 用于处理向 LLM 的 Sampling 请求。

Sampling 的交互流程:

 这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】

编码实现

服务端

中国地区数据、电影数据查询服务:

@mcp.resource("db://province_info/data/{province_name}")
asyncdef get_province_data(province_name: str) -> str:
    """中国省份(含直辖市)信息

    参数:
    province_name: 中国省份名称或直辖市,如广东省,北京市

    返回:
    以 json 格式返回某省份信息
    """
    sql = '''
            select region_code,
                   province_name,
                   capital_city,
                   area,
                   population,
                   abbreviation 
            from public.chinese_provinces 
            where province_name = '{province_name}'
    '''.format(province_name=province_name)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return json.dumps(list(rows), default=str)


@mcp.resource("db://movie_info/data/{movie_name}")
asyncdef get_movie_data(movie_name: str) -> str:
    """电影信息

    参数:
    movie_name: 电影名称

    返回:
    以 json 格式返回某电影信息
    """
    sql = '''
    select movie_type, 
           main_actors, 
           region, 
           director, 
           features, 
           rating, 
           movie_name
    from public.chinese_movie_ratings
    where movie_name = '{movie_name}'
    '''.format(movie_name=movie_name)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            return json.dumps(list(rows), default=str)

微博数据查询(调用上下文对象的 sample 方法,获取每条微博的情感倾向):

@mcp.resource("db://weibo_data/data/?date={date}&limit={limit}")
asyncdef get_weibo_data(ctx: Context, date: str, limit: int = 10,) -> str:
    """微博数据

    参数:
    date: 日期,数据格式为 yyyymmdd
    limit: 要求返回的数据量

    返回:
    以 json 格式返回微博数据
    """
    sql = f'''
            select weibo_id,
                   user_name,
                   publish_date,
                   publish_time,
                   weibo_content 
            from public.t_weibo_ncov
            where publish_date = '{date}'
            limit {limit}
    '''.format(date=date, limit=limit)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()

            # 通过 Sampling,给每个微博内容增加情感倾向标签
            for weibo_data in rows:
                system_prompt = '请给出这条微博内容的情感倾向,标注分为三类的其中一个:积极,中性和消极'
                response = await ctx.sample(
                    messages=weibo_data['weibo_content'],
                    system_prompt=system_prompt
                )
                assert isinstance(response, TextContent)
                weibo_data['sentiment_tag'] = response.text
                print(weibo_data)
            return json.dumps(list(rows), default=str)


@mcp.resource("db://weibo_data/data/?weibo_user={weibo_user}&limit={limit}")
asyncdef get_weibo_user_data(ctx: Context, weibo_user: str, limit: int = 10,) -> str:
    """某具体微博用户名的微博内容

    参数:
    weibo_user: 微博用户名
    limit: 要求返回的数据量

    返回:
    以 json 格式返回微博数据
    """
    sql = '''
            select weibo_id,
                   user_name,
                   publish_date,
                   publish_time,
                   weibo_content 
            from public.t_weibo_ncov
            where user_name = '{weibo_user}'
            limit {limit}
    '''.format(weibo_user=weibo_user, limit=limit)
    with get_db_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchall()

            # 通过 Sampling,给每个微博内容增加情感倾向标签
            for weibo_data in rows:
                system_prompt = '请给出这条微博内容的情感倾向,标注分为三类的其中一个:积极,中性和消极'
                response = await ctx.sample(
                    messages=weibo_data['weibo_content'],
                    system_prompt=system_prompt
                )
                assert isinstance(response, TextContent)
                weibo_data['sentiment_tag'] = response.text
                print(weibo_data)
            return json.dumps(list(rows), default=str)

服务端设置与运行:

mcp = FastMCP("weibo data mcp server",
              debug=True,
              host="0.0.0.0",
              port=8002)

if __name__ == "__main__":
    mcp.run("sse")
客户端

会话管理:

class LLMClient:
    """大语言模型客户端,负责与LLM API进行通信"""

    def __init__(self, model_name: str, url: str, api_key: str) -> None:
        """
        初始化LLM客户端

        Args:
            model_name: 模型名称
            url: API基础URL
            api_key: API密钥
        """
        self.model_name: str = model_name
        self.url: str = url
        self.client = OpenAI(api_key=api_key, base_url=url)

    def get_response(self, messages: list[dict[str, str]]) -> str:
        """
        向LLM发送消息并获取响应

        Args:
            messages: 消息列表,每条消息包含role和content

        Returns:
            LLM的响应文本
        """
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            stream=False
        )
        return response.choices[0].message.content


class ChatSession:
    """聊天会话管理器,处理用户输入、LLM响应和资源访问"""

    def __init__(self, llm_client: LLMClient, mcp_client: Client) -> None:
        """
        初始化聊天会话

        Args:
            llm_client: LLM客户端实例
            mcp_client: MCP客户端实例,用于访问资源
        """
        self.mcp_client: Client = mcp_client
        self.llm_client: LLMClient = llm_client

    asyncdef process_llm_response(self, llm_response: str) -> str:
        """
        处理LLM的响应,解析资源URI调用或工具调用并执行

        Args:
            llm_response: LLM返回的响应文本

        Returns:
            处理后的响应文本,包含资源数据或工具执行结果
        """
        try:
            # 检查是否为资源URI调用(以db://开头)
            if llm_response.strip().startswith('db://'):
                uri = llm_response.strip()
                try:
                    # 执行资源读取
                    resource_data = await self.mcp_client.read_resource(uri=uri)
                    returnf"Resource data: {resource_data}"
                except Exception as e:
                    error_msg = f"Error reading resource: {str(e)}"
                    logging.error(error_msg)
                    return error_msg
            else:
                # 尝试解析为JSON工具调用(保留兼容性)
                if llm_response.startswith('```json'):
                    llm_response = llm_response.strip('```json').strip('```').strip()
                try:
                    tool_call = json.loads(llm_response)
                    if"tool"in tool_call and"arguments"in tool_call:
                        # 检查工具是否可用
                        available_tools = await self.mcp_client.list_tools()
                        if any(tool.name == tool_call["tool"] for tool in available_tools):
                            try:
                                # 执行工具调用
                                tool_result = await self.mcp_client.call_tool(
                                    tool_call["tool"], tool_call["arguments"]
                                )
                                returnf"Tool execution result: {tool_result}"
                            except Exception as e:
                                error_msg = f"Error executing tool: {str(e)}"
                                logging.error(error_msg)
                                return error_msg
                        returnf"Tool not found: {tool_call['tool']}"
                except json.JSONDecodeError:
                    pass
                # 如果不是JSON格式或工具调用,直接返回原始响应
                return llm_response
        except Exception as e:
            error_msg = f"Error processing LLM response: {str(e)}"
            logging.error(error_msg)
            return error_msg

    asyncdef start(self, system_message: str) -> None:
        """
        启动聊天会话的主循环

        Args:
            system_message: 系统提示消息,指导LLM的行为
        """
        messages = [{"role": "system", "content": system_message}]
        whileTrue:
            try:
                # 获取用户输入
                user_input = input("用户: ").strip().lower()
                if user_input in ["quit", "exit", "退出"]:
                    print('AI助手已退出')
                    break
                messages.append({"role": "user", "content": user_input})

                # 获取LLM的初始响应
                llm_response = self.llm_client.get_response(messages)
                print("助手: ", llm_response)

                # 处理可能的资源调用或工具调用
                result = await self.process_llm_response(llm_response)

                # 如果处理结果与原始响应不同,说明执行了资源调用或工具调用,需要进一步处理
                while result != llm_response:
                    messages.append({"role": "assistant", "content": llm_response})
                    messages.append({"role": "system", "content": result})

                    # 将资源数据或工具执行结果发送回LLM获取新响应
                    llm_response = self.llm_client.get_response(messages)
                    result = await self.process_llm_response(llm_response)
                    print("助手: ", llm_response)

                messages.append({"role": "assistant", "content": llm_response})

            except KeyboardInterrupt:
                print('AI助手已退出')
                break

Sampling:通过 marvin 框架,创建 Agent,负责处理向 LLM 的 Sampling 请求

# 获取DeepSeek API密钥
api_key = os.getenv('DEEPSEEK_API_KEY')

# 初始化DeepSeek模型
model = OpenAIModel(
    'deepseek-chat',
    provider=DeepSeekProvider(api_key=api_key),
)

# 创建Marvin智能助手
agent = marvin.Agent(
    model=model,
    name="智能助手"
)


asyncdef sampling_func(
        messages: list[SamplingMessage],
        params: SamplingParams,
        ctx: RequestContext,
) -> str:
    """
    采样函数,用于处理消息并获取LLM响应

    Args:
        messages: 消息列表
        params: 采样参数
        ctx: 请求上下文

    Returns:
        LLM的响应文本
    """
    returnawait marvin.say_async(
        message=[m.content.text for m in messages],
        instructions=params.systemPrompt,
        agent=agent
    )

main 函数:

async def main():
    """主函数,初始化客户端并启动聊天会话"""
    asyncwith Client("http://127.0.0.1:8002/sse", sampling_handler=sampling_func) as mcp_client:
        # 初始化LLM客户端,使用通义千问模型
        llm_client = LLMClient(
            model_name='qwen-plus-latest',
            api_key=os.getenv('DASHSCOPE_API_KEY'),
            url='https://dashscope.aliyuncs.com/compatible-mode/v1'
        )

        # 获取可用资源模板
        resource_templates = await mcp_client.list_resource_templates()
        template_dicts = [template.__dict__ for template in resource_templates]
        resources_description = json.dumps(template_dicts, ensure_ascii=False)

        # 系统提示,指导LLM如何使用资源模板和返回响应
        system_message = f'''
                你是一个智能助手,能够访问多种数据资源。严格遵循以下协议返回响应:

                可用资源模板:{resources_description}

                响应规则:
                1、当用户询问需要查询数据时,判断是否需要调用资源模板:
                 - 如果需要查询数据,返回对应的资源URI
                 - 如果是普通对话,直接回答用户问题

                2、资源调用格式:
                 - 返回纯净的URI字符串,不包含任何其他内容
                 - URI格式必须严格按照资源模板定义
                 - 根据用户需求填入合适的参数值
                 
                3、URI参数说明:
                 - date: 日期参数,格式如0101表示1月1日
                 - limit: 限制返回数据条数
                 - 其他参数根据资源模板定义填入

                4、非数据查询的响应:
                 - 对于普通对话,直接给出自然语言回答
                 - 不需要调用资源时,不要返回URI

                5、收到资源数据后:
                 - 将数据转化为用户友好的格式
                 - 突出显示关键信息
                 - 保持回复简洁清晰
                 - 根据用户问题提供相关分析
                '''
        # 启动聊天会话
        chat_session = ChatSession(llm_client=llm_client, mcp_client=mcp_client)
        await chat_session.start(system_message=system_message)

运行效果

1 运行服务端:python weibo_data_server.py

2 运行客户端的 main 函数,进入会话,用户提问:

    • 介绍一下北京市

    • 介绍一下电影我不是药神

    • 获取1月1日前两条微博

    • 获取微博用户“是快乐小张张”的微博

下面的运行结果显示,LLM 可根据用户的提问,转换为相应的数据查询(通过 MCP 查询外部数据资源),并根据查询结果,回答用户的问题。微博数据的查询返回,提供了每条微博的情感倾向(利用 MCP Sampling 实现微博内容的情感分析):

D:\python_project\mcp_learning\.venv\Scripts\python.exe D:\python_project\mcp_learning\sample\weibo_data_client.py 
用户: 介绍一下北京市
助手:  db://province_info/data/北京市
助手:  北京市是中国的首都,位于华北地区。以下是一些关于北京市的关键信息:

- **区域代码**:110000
- **省份名称**:北京市
- **省会城市**:北京市
- **面积**:16,410平方公里
- **人口**:约2189.3万人
- **简称**:京

北京不仅是政治和文化中心,也是世界著名古都之一,拥有众多历史遗迹和现代化建筑,如故宫、颐和园和鸟巢等。
用户: 介绍一下电影我不是药神
助手:  db://movie_info/data/我不是药神
助手:  电影《我不是药神》是一部感人至深的剧情片,以下是该电影的关键信息:

- **电影名称**:我不是药神  
- **电影类型**:剧情  
- **地区**:中国大陆  
- **导演**:文牧野  
- **主演**:徐峥、王传君、周一围、谭卓、章宇  
- **影片特点**:经典  
- **评分**:8.9分(豆瓣高分)  

这部电影改编自真实事件,讲述了一位普通药店老板程勇为白血病患者走私低价救命药的故事。影片深刻反映了医疗问题与人性挣扎,获得了广泛赞誉和多项奖项。
用户: 获取1月1日前两条微博
助手:  db://weibo_data/data/?date=0101&limit=2
助手:  以下是2020年1月1日的前两条微博内容:

1. **用户:** 小-fairy  
   **时间:** 00:00  
   **内容:** "2019年过去了,所有不好的都忘记吧……2020年已经来了,所有愿望都许在新年这一天吧……2020中考顺利,家人身体健康,国家越来越强大?"  
   **情绪标签:** 积极  

2. **用户:** 黄大喵的金油瓶  
   **时间:** 00:42  
   **内容:** "这新年的祝福,手长的我先给利利送到~@齐木心美冲鸭(因为我预测零点的服务器会崩溃hhhhh~)本号的第一条原创!!献给你!!!味~说正祝可爱的利利同学在新的一年里也要好好加油哦~回想起与利利认识的第一天;和利利交谈的第一句话;和利利坐在一节课堂上……我都记不得了~⊙_?展开全文c"  
   **情绪标签:** 积极  

这两条微博展现了人们在新年伊始时对未来的美好祝愿以及分享生活点滴的喜悦心情。
用户: 获取微博用户“是快乐小张张”的微博
助手:  db://weibo_data/data/?weibo_user=是快乐小张张&limit=10
助手:  以下是微博用户“是快乐小张张”最近的一条微博内容:

- **发布时间**:02月03日 17:52  
- **微博内容**:终于哭了不再为自己的小小世界。我与疫情的关系是不想延迟开学的我,希望它快快过去,痛恨它阻碍了我的感情。在这段时间里,外面的世界纷纷扰扰,我在小小世界伸手不见五指,我甚至羡慕那些为了疫情新闻义愤填膺的普通人,至少还有心情关注外面的世界。孔维的视频我看到亲情:采访时一个顶天立地医生他想…展开全文c  
- **情绪标签**:消极  

这条微博反映了作者在疫情期间的情绪困扰,既有对个人生活和感情受阻的无奈,也有对外界疫情动态的关注与感慨。
用户: quit
AI助手已退出

Process finished with exit code 0

以上是基于 MCP 的数据智能查询应用实现。

  这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】

一、大模型风口已至:月薪30K+的AI岗正在批量诞生

2025年大模型应用呈现爆发式增长,根据工信部最新数据:

国内大模型相关岗位缺口达47万

初级工程师平均薪资28K

70%企业存在"能用模型不会调优"的痛点

真实案例:某二本机械专业学员,通过4个月系统学习,成功拿到某AI医疗公司大模型优化岗offer,薪资直接翻3倍!

二、如何学习大模型 AI ?


🔥AI取代的不是人类,而是不会用AI的人!麦肯锡最新报告显示:掌握AI工具的从业者生产效率提升47%,薪资溢价达34%!🚀

由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。

但是具体到个人,只能说是:

“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。

这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。

我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。

我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。

1️⃣ 提示词工程:把ChatGPT从玩具变成生产工具
2️⃣ RAG系统:让大模型精准输出行业知识
3️⃣ 智能体开发:用AutoGPT打造24小时数字员工

📦熬了三个大夜整理的《AI进化工具包》送你:
✔️ 大厂内部LLM落地手册(含58个真实案例)
✔️ 提示词设计模板库(覆盖12大应用场景)
✔️ 私藏学习路径图(0基础到项目实战仅需90天)

 

第一阶段(10天):初阶应用

该阶段让大家对大模型 AI有一个最前沿的认识,对大模型 AI 的理解超过 95% 的人,可以在相关讨论时发表高级、不跟风、又接地气的见解,别人只会和 AI 聊天,而你能调教 AI,并能用代码将大模型和业务衔接。

*   大模型 AI 能干什么?
*   大模型是怎样获得「智能」的?
*   用好 AI 的核心心法
*   大模型应用业务架构
*   大模型应用技术架构
*   代码示例:向 GPT-3.5 灌入新知识
*   提示工程的意义和核心思想
*   Prompt 典型构成
*   指令调优方法论
*   思维链和思维树
*   Prompt 攻击和防范
*   …

第二阶段(30天):高阶应用

该阶段我们正式进入大模型 AI 进阶实战学习,学会构造私有知识库,扩展 AI 的能力。快速开发一个完整的基于 agent 对话机器人。掌握功能最强的大模型开发框架,抓住最新的技术进展,适合 Python 和 JavaScript 程序员。

*   为什么要做 RAG
*   搭建一个简单的 ChatPDF
*   检索的基础概念
*   什么是向量表示(Embeddings)
*   向量数据库与向量检索
*   基于向量检索的 RAG
*   搭建 RAG 系统的扩展知识
*   混合检索与 RAG-Fusion 简介
*   向量模型本地部署
*   …

第三阶段(30天):模型训练

恭喜你,如果学到这里,你基本可以找到一份大模型 AI相关的工作,自己也能训练 GPT 了!通过微调,训练自己的垂直大模型,能独立训练开源多模态大模型,掌握更多技术方案。

到此为止,大概2个月的时间。你已经成为了一名“AI小子”。那么你还想往下探索吗?

*   为什么要做 RAG
*   什么是模型
*   什么是模型训练
*   求解器 & 损失函数简介
*   小实验2:手写一个简单的神经网络并训练它
*   什么是训练/预训练/微调/轻量化微调
*   Transformer结构简介
*   轻量化微调
*   实验数据集的构建
*   …

第四阶段(20天):商业闭环

对全球大模型从性能、吞吐量、成本等方面有一定的认知,可以在云端和本地等多种环境下部署大模型,找到适合自己的项目/创业方向,做一名被 AI 武装的产品经理。

*   硬件选型
*   带你了解全球大模型
*   使用国产大模型服务
*   搭建 OpenAI 代理
*   热身:基于阿里云 PAI 部署 Stable Diffusion
*   在本地计算机运行大模型
*   大模型的私有化部署
*   基于 vLLM 部署大模型
*   案例:如何优雅地在阿里云私有部署开源大模型
*   部署一套开源 LLM 项目
*   内容安全
*   互联网信息服务算法备案
*   …

学习是一个过程,只要学习就会有挑战。天道酬勤,你越努力,就会成为越优秀的自己。

如果你能在15天内完成所有的任务,那你堪称天才。然而,如果你能完成 60-70% 的内容,你就已经开始具备成为一名大模型 AI 的正确特征了。

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐