找回密码
 立即注册
首页 业界区 业界 Flask集成MCP的AI Agent

Flask集成MCP的AI Agent

孔季雅 3 小时前
前言

近年来,大量新兴的 AI 相关第三方库都提供了异步接口,有些甚至出于性能考虑仅支持异步调用,例如 MCP SDK。伴随着 Python 异步编程的发展,FastAPI 等框架迅速流行,许多新项目倾向于采用 FastAPI。但实际上,Flask 自 2.0 版本起也开始支持异步方法,因此我们也能借助 Flask 参与到 MCP 的 AI Agent 浪潮中。
PS:需要注意的是,Flask 的异步性能表现不佳。正如 Flask 官方文档所指出的那样,每当收到请求时,Flask 都会在一个线程内启动一个新的事件循环来执行异步视图函数,然后返回结果。这种方式增加了额外的性能开销。经过个人实测,Flask 的异步视图性能确实不如传统的同步视图。因此,如果有异步需求,建议还是选用像 FastAPI 这样原生支持异步的框架。
Async functions require an event loop to run. Flask, as a WSGI application, uses one worker to handle one request/response cycle. When a request comes in to an async view, Flask will start an event loop in a thread, run the view function there, then return the result.
Each request still ties up one worker, even for async views. The upside is that you can run async code within a view, for example to make multiple concurrent database queries, HTTP requests to an external API, etc. However, the number of requests your application can handle at one time will remain the same.
Async is not inherently faster than sync code. Async is beneficial when performing concurrent IO-bound tasks, but will probably not improve CPU-bound tasks. Traditional Flask views will still be appropriate for most use cases, but Flask's async support enables writing and using code that wasn't possible natively before.
本文使用的 LLM 是阿里的通义千问,需要自行申请 API Key。其他兼容 OpenAI SDK 的模型理论上也可使用。
本文介绍的是如何在 Flask 中集成基于 MCP Client 和 MCP Server 的 AI Agent,并不仅仅是用 Flask 开发一个 MCP Server,所以只关注 Flask 实现 MCP Server 的看众可以关闭本文了。
安装 SDK

虽然 Flask 从 2.0 版本开始支持异步功能,但这部分功能需要额外安装相关依赖。这里我们将 MCP 和 LLM 相关的依赖一起安装。
  1. uv add 'flask[async]' fastmcp openai
复制代码
此外还需要安装 gunicorn 和 gevent,这两个是在生产环境中常用的部署工具。虽然是开发演示项目,但我们也会安装它们来验证异步功能的支持情况。
  1. uv add gunicorn gevent
复制代码
代码示例

代码结构

由于这只是个演示项目,所以代码结构相对简单。
  1. ├── aiagent  # aiagent 模块
  2. │   ├── mcp_client.py  # 封装 mcp 的 client
  3. │   └── mcp_servers  # mcp 的 servers
  4. │       ├── common.py  # 会被导入到 composition 的 mcp server 中
  5. │       └── composition.py  # composition 的 mcp server, client 只会连这个 mcp server
  6. ├── config.py  # 配置模块
  7. ├── log.py  # 日志模块
  8. ├── main.py  # 入口文件
  9. ├── pyproject.toml
  10. ├── README.md
  11. └── uv.lock
复制代码
配置模块

简单写写,能用就行,注意替换 API Key。config.py
  1. class Config:
  2.     @property
  3.     def llm_base_url(self) -> str:
  4.         return "https://dashscope.aliyuncs.com/compatible-mode/v1"
  5.    
  6.     @property
  7.     def llm_model(self) -> str:
  8.         return "qwen-plus"
  9.    
  10.     @property
  11.     def llm_api_key(self) -> str:
  12.         return "<your api key>"
  13.    
  14. cfg = Config()
复制代码
日志模块

简单写写,能用就行。log.py
  1. import logging
  2. import sys
  3. def setup_logger() -> logging.Logger:
  4.     level = logging.DEBUG
  5.     logger = logging.getLogger("flask-mcp")
  6.     logger.setLevel(level)
  7.     handler = logging.StreamHandler(sys.stdout)
  8.     handler.setLevel(level)
  9.     fmt = "%(asctime)s | %(name)s | %(levelname)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s"
  10.     datefmt = "%Y-%m-%d %H:%M:%S"
  11.     formatter = logging.Formatter(fmt, datefmt)
  12.     handler.setFormatter(formatter)
  13.     logger.addHandler(handler)
  14.     return logger
  15. logger = setup_logger()
复制代码
MCP Servers

common

我们可以将这个 common mcp server 视为子服务器,它将被导入到父级 composition server 中统一管理。
在 common mcp server 中,以下仅实现了一个用于获取当前时间的工具函数。
  1. from datetime import datetime
  2. from fastmcp import FastMCP
  3. mcp = FastMCP(name="common mcp server", instructions="Common MCP server for general tasks.")
  4. @mcp.tool
  5. async def get_current_datetime() -> str:
  6.     """Get the current date and time as a string. Format: YYYY-MM-DDTHH:MM:SS±hhmm"""
  7.     return datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")
  8. if __name__ == "__main__":
  9.     mcp.run(transport="stdio", show_banner=False)
复制代码
composition

其他的 mcp server 都会组合到这个 composition mcp server 中。一方面 mcp client 只需要连这一个 composition mcp server 即可,另一方面可以按功能组织 mcp server 的代码。
  1. import asyncio
  2. import sys
  3. from pathlib import Path
  4. sys.path.append(str(Path(__file__).parents[2]))
  5. from fastmcp import FastMCP
  6. from aiagent.mcp_servers.common import mcp as common_mcp
  7. composition_mcp = FastMCP(name="composition mcp server")
  8. async def compose():
  9.     await composition_mcp.import_server(common_mcp)
  10. if __name__ == "__main__":
  11.     asyncio.run(compose())
  12.     composition_mcp.run(transport="stdio", show_banner=False)
复制代码
MCP Client

aiagent/mcp_client.py 负责整合 LLM 与 MCP Server 的交互。示例中的对话记忆仅存储在实例变量中,在实际应用中应考虑持久化存储方案。
  1. import json
  2. from pathlib import Path
  3. from typing import cast
  4. from fastmcp.client import Client, StdioTransport
  5. from openai import AsyncOpenAI
  6. from openai.types.chat import ChatCompletionMessageFunctionToolCall
  7. from config import cfg
  8. from log import logger
  9. class MCPClient:
  10.     def __init__(self):
  11.         self.client = Client(StdioTransport(
  12.             command=str(Path(__file__).parent.parent / ".venv" / "bin" / "python"),
  13.             args=[str(Path(__file__).parent / "mcp_servers" / "composition.py")],
  14.             cwd=str(Path(__file__).parent / "mcp_servers"),
  15.         ))
  16.         self._llm = AsyncOpenAI(
  17.             base_url=cfg.llm_base_url,
  18.             api_key=cfg.llm_api_key,
  19.         )
  20.         self._temp_memories = []
  21.     async def close(self):
  22.         if self.client:
  23.             await self.client.close()
  24.     async def process(self, prompt: str, system_prompt: str = "") -> str:
  25.         if system_prompt:
  26.             self._temp_memories.append({"role": "system", "content": system_prompt})
  27.         
  28.         self._temp_memories.append({"role": "user", "content": prompt})
  29.         async with self.client:
  30.             tools = await self.client.list_tools()
  31.             available_tools = []
  32.             for tool in tools:
  33.                 available_tools.append({
  34.                     "type": "function",
  35.                     "function": {
  36.                         "name": tool.name,
  37.                         "description": tool.description,
  38.                         "parameters": tool.inputSchema,
  39.                     }
  40.                 })
  41.             logger.info(f"Available mcp tools: {[tool.name for tool in tools]}")
  42.             resp = await self._llm.chat.completions.create(
  43.                 model=cfg.llm_model,
  44.                 messages=self._temp_memories,
  45.                 tools=available_tools,
  46.                 temperature=0.3,
  47.             )
  48.             # 存储最终响应文本
  49.             final_text = []
  50.             # 获取 LLM 的首个响应消息
  51.             message = resp.choices[0].message
  52.             # 如果响应包含直接内容,则添加到结果中
  53.             if hasattr(message, "content") and message.content:
  54.                 final_text.append(message.content)
  55.             # 循环处理工具调用,直到没有更多工具调用为止
  56.             while message.tool_calls:
  57.                 # 遍历所有工具调用
  58.                 for tool_call in message.tool_calls:
  59.                     # 确保工具调用有函数信息
  60.                     if not hasattr(tool_call, "function"):
  61.                         continue
  62.                     # 类型转换以获取函数调用详情
  63.                     function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call)
  64.                     function = function_call.function
  65.                     tool_name = function.name
  66.                     # 解析函数参数
  67.                     tool_args = json.loads(function.arguments)
  68.                     # 检查 MCP 客户端是否已连接
  69.                     if not self.client.is_connected():
  70.                         raise RuntimeError("Session not initialized. Cannot call tool.")
  71.                     
  72.                     # 调用 MCP 服务器上的指定工具
  73.                     logger.info(f"Calling tool: {tool_name} with args: {tool_args}")
  74.                     result = await self.client.call_tool(tool_name, tool_args)
  75.                     # 将助手的工具调用添加到消息历史中
  76.                     self._temp_memories.append({
  77.                         "role": "assistant",
  78.                         "tool_calls": [
  79.                             {
  80.                                 "id": tool_call.id,
  81.                                 "type": "function",
  82.                                 "function": {
  83.                                     "name": function.name,
  84.                                     "arguments": function.arguments
  85.                                 }
  86.                             }
  87.                         ]
  88.                     })
  89.                     # 将工具调用结果添加到消息历史中
  90.                     self._temp_memories.append({
  91.                         "role": "tool",
  92.                         "tool_call_id":tool_call.id,
  93.                         "content": str(result.content) if result.content else ""
  94.                     })
  95.                
  96.                 # 基于工具调用结果再次调用 LLM
  97.                 final_resp = await self._llm.chat.completions.create(
  98.                     model=cfg.llm_model,
  99.                     messages=self._temp_memories,
  100.                     tools=available_tools,
  101.                     temperature=0.3,
  102.                 )
  103.                 # 更新消息为最新的 LLM 响应
  104.                 message = final_resp.choices[0].message
  105.                 # 如果响应包含内容,则添加到最终结果中
  106.                 if message.content:
  107.                     final_text.append(message.content)
  108.             # 返回连接后的完整响应
  109.             return "\n".join(final_text)
复制代码
main

main.py 是应用程序的入口文件,其中实现了健康检查的同步视图和处理聊天请求的异步视图。此外还自定义了响应类,确保 Flask 能正确处理非英文字符。
  1. import json
  2. from http import HTTPStatus
  3. from typing import Optional
  4. from flask import Flask, Response, request
  5. from aiagent.mcp_client import MCPClient
  6. app = Flask(__name__)
  7. class APIResponse(Response):
  8.     def __init__(self, data: Optional[dict] = None, code: HTTPStatus = HTTPStatus.OK, msg: str = "success"):
  9.         headers = dict({"Content-Type": "application/json; charset=utf-8"})
  10.         response = json.dumps({
  11.             "code": code.value,
  12.             "msg": msg,
  13.             "data": data,
  14.         }, ensure_ascii=False, default=str)
  15.         super().__init__(response=response, status=code.value, headers=headers)
  16. @app.get("/health")
  17. def health_check():
  18.     return APIResponse(data={"status": "ok"})
  19. @app.post("/chat")
  20. async def post_chat():
  21.     try:
  22.         req_body: Optional[dict] = request.get_json()
  23.         if not req_body or "prompt" not in req_body:
  24.             return APIResponse(code=HTTPStatus.BAD_REQUEST, msg="Missing 'prompt' in request body")
  25.         prompt = req_body.get("prompt")
  26.         if not isinstance(prompt, str):
  27.             return APIResponse(code=HTTPStatus.BAD_REQUEST, msg="'prompt' must be a string")
  28.     except Exception as e:
  29.         return APIResponse(code=HTTPStatus.BAD_REQUEST, msg=str(e))
  30.    
  31.     mcp_client = MCPClient()
  32.     try:
  33.         resp = await mcp_client.process(prompt=prompt)
  34.         resp_body = {
  35.             "content": resp
  36.         }
  37.         return APIResponse(data=resp_body)
  38.     except Exception as e:
  39.         return APIResponse(code=HTTPStatus.INTERNAL_SERVER_ERROR, msg=str(e))
  40.     finally:
  41.         await mcp_client.close()
  42. if __name__ == "__main__":
  43.     app.run(host="127.0.0.1", port=8000)
复制代码
运行测试


  • 首先启动服务端应用
  1. python main.py
复制代码

  • 使用 curl 发起请求进行测试。可以看到接口成功返回了答案。通过服务端控制台日志可以看出 mcp client 成功调用了 get_current_datetime 工具。
  1. $ curl --request POST \
  2. --url http://127.0.0.1:8000/chat \
  3. --header 'content-type: application/json' \
  4. --data '{
  5. "prompt": "今天的日期是什么"
  6. }'
  7. {"code": 200, "msg": "success", "data": {"content": "今天的日期是 2025 年 12 月 8 日。"}}
  8. # Server 端控制台日志
  9. 2025-12-08 21:55:50 | flask-mcp | INFO | mcp_client.py:51 | process | Available mcp tools: ['get_current_datetime']
  10. 2025-12-08 21:55:51 | flask-mcp | INFO | mcp_client.py:88 | process | Calling tool: get_current_datetime with args: {}
复制代码

  • 使用 gunicorn 启动应用,测试 gunicorn 对异步方法的支持情况。
  1. gunicorn main:app -n 127.0.0.1:8000 -w 4 -k gevent --worker-connections 1000
复制代码

  • 再次使用 curl 测试。测试依然正常。
  1. $ curl --request POST \
  2.   --url http://127.0.0.1:8000/chat \
  3.   --header 'content-type: application/json' \
  4.   --data '{
  5.   "prompt": "现在是什么时候?"
  6. }'
  7. {"code": 200, "msg": "success", "data": {"content": "现在是 2025 年 12 月 8 日 22 时 37 分 50 秒。"}}
复制代码
小结

通过上述示例可以看出,Flask 仍然能够胜任基于 MCP 的 AI Agent 应用开发任务。而且 Flask 2.0 之后的版本与之前版本保持良好的兼容性,因此可以考虑将旧项目升级到新版。不过需要再次强调的是,Flask 的异步视图性能并不理想,对于新的 AI Agent 项目,建议优先选择原生支持异步的框架。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册