一、概述
MCP服务端当前支持两种与客户端的数据通信方式:标准输入输出(stdio) 和 基于Http的服务器推送事件(http sse)
1.1 标准输入输出(stdio)
原理: 标准输入输出是一种用于本地通信的传输方式。在这种模式下,MCP 客户端会将服务器程序作为子进程启动,双方通过约定的标准输入和标准输出(可能是通过共享文件等方法)进行数据交换。具体而言,客户端通过标准输入发送请求,服务器通过标准输出返回响应。。
适用场景: 标准输入输出方式适用于客户端和服务器在同一台机器上运行的场景(本地自行编写服务端或将别人编写的服务端代码pull到本地执行),确保了高效、低延迟的通信。这种直接的数据传输方式减少了网络延迟和传输开销,适合需要快速响应的本地应用。
1.2 基于Http的服务器推送事件(http sse)
原理: 客户端和服务端通过 HTTP 协议进行通信,利用 SSE 实现服务端向客户端的实时数据推送,服务端定义了/see与/messages接口用于推送与接收数据。这里要注意SSE协议和WebSocket协议的区别,SSE协议是单向的,客户端和服务端建立连接后,只能由服务端向客户端进行消息推送。而WebSocket协议客户端和服务端建立连接后,客户端可以通过send向服务端发送数据,并通过onmessage事件接收服务端传过来的数据。
适用场景: 适用于客户端和服务端位于不同物理位置的场景,尤其是对于分布式或远程部署的场景,基于 HTTP 和 SSE 的传输方式更合适。
二、MCP开发应用
MCP Server应用平台
主要有以下这些:1.GitHub - modelcontextprotocol/servers: Model Context Protocol Servers
2.PulseMCP | Keep up-to-date with MCP
3.Awesome MCP Servers
4.Smithery - Model Context Protocol Registry
5.Open-Source MCP servers | Glama
6.Cursor Directory - Cursor Rules & MCP Servers
7.MCP.SO
8.List of all MCP Servers (42) | Portkey
9.Cline/MCP-MarketPlace
10.Reddit/MCP
这里面有很多已经开发好的mcp应用,可以拿来直接使用即可。Cherry Studio客户端推荐的mcp平台为:https://mcp.so 但是大家可以发现一个问题,就是MCP Server应用平台,发布的应用,80%以上,运行方式都是stdio,这种方式适合本地开发。但是想在dify里面使用,一般都是sse方式,因为适合远程调用。即使是不同命名空间,不同的工作流依然可以调用,只要网络能联通,就没问题。而且sse方式,只需要运行一次,就可以让成千上万个dify应用调用,还是很方便的。 MCP应用开发
以mysql为例子,进入网站https://mcp.so,搜索mysql,找到Mysql_mcp_server_pro,链接如下:https://mcp.so/server/mysql_mcp_server_pro/wenb1n-dev(xwb602625136)?tab=content
github地址如下:https://github.com/wenb1n-dev/mysql_mcp_server_pro
这个应用只支持mysql 5.6,由于我的mysql版本是8.0,需要修改对应的源码才行。 新建空目录Mysql_mcp_server_pro,新建2个文件.evn,server.py.env文件内容如下:- # MySQL数据库配置
- MYSQL_HOST=192.168.20.128
- MYSQL_PORT=3306
- MYSQL_USER=root
- MYSQL_PASSWORD=abcd@1234
- MYSQL_DATABASE=test
复制代码 这个是mysql连接信息
server.py- import os
- import uvicorn
- from mcp.server.sse import SseServerTransport
- from mysql.connector import connect, Error
- from mcp.server import Server
- from mcp.types import Tool, TextContent
- from starlette.applications import Starlette
- from starlette.routing import Route, Mount
- from dotenv import load_dotenv
- def get_db_config():
- """从环境变量获取数据库配置信息
- 返回:
- dict: 包含数据库连接所需的配置信息
- - host: 数据库主机地址
- - port: 数据库端口
- - user: 数据库用户名
- - password: 数据库密码
- - database: 数据库名称
- 异常:
- ValueError: 当必需的配置信息缺失时抛出
- """
- # 加载.env文件
- load_dotenv()
- config = {
- "host": os.getenv("MYSQL_HOST", "localhost"),
- "port": int(os.getenv("MYSQL_PORT", "3306")),
- "user": os.getenv("MYSQL_USER"),
- "password": os.getenv("MYSQL_PASSWORD"),
- "database": os.getenv("MYSQL_DATABASE"),
- }
- print(config)
- if not all([config["user"], config["password"], config["database"]]):
- raise ValueError("缺少必需的数据库配置")
- return config
- def execute_sql(query: str) -> list[TextContent]:
- """执行SQL查询语句
- 参数:
- query (str): 要执行的SQL语句,支持多条语句以分号分隔
- 返回:
- list[TextContent]: 包含查询结果的TextContent列表
- - 对于SELECT查询:返回CSV格式的结果,包含列名和数据
- - 对于SHOW TABLES:返回数据库中的所有表名
- - 对于其他查询:返回执行状态和影响行数
- - 多条语句的结果以"---"分隔
- 异常:
- Error: 当数据库连接或查询执行失败时抛出
- """
- config = get_db_config()
- try:
- with connect(**config) as conn:
- with conn.cursor() as cursor:
- statements = [stmt.strip() for stmt in query.split(";") if stmt.strip()]
- results = []
- for statement in statements:
- try:
- cursor.execute(statement)
- # 检查语句是否返回了结果集 (SELECT, SHOW, EXPLAIN, etc.)
- if cursor.description:
- columns = [desc[0] for desc in cursor.description]
- rows = cursor.fetchall()
- # 将每一行的数据转换为字符串,特殊处理None值
- formatted_rows = []
- for row in rows:
- formatted_row = [
- "NULL" if value is None else str(value)
- for value in row
- ]
- formatted_rows.append(",".join(formatted_row))
- # 将列名和数据合并为CSV格式
- results.append(
- "\n".join([",".join(columns)] + formatted_rows)
- )
- # 如果语句没有返回结果集 (INSERT, UPDATE, DELETE, etc.)
- else:
- conn.commit() # 只有在非查询语句时才提交
- results.append(f"查询执行成功。影响行数: {cursor.rowcount}")
- except Error as stmt_error:
- # 单条语句执行出错时,记录错误并继续执行
- results.append(
- f"执行语句 '{statement}' 出错: {str(stmt_error)}"
- )
- # 可以在这里选择是否继续执行后续语句,目前是继续
- return [TextContent(type="text", text="\n---\n".join(results))]
- except Error as e:
- print(f"执行SQL '{query}' 时出错: {e}")
- return [TextContent(type="text", text=f"执行查询时出错: {str(e)}")]
- def get_table_name(text: str) -> list[TextContent]:
- """根据表的中文注释搜索数据库中的表名
- 参数:
- text (str): 要搜索的表中文注释关键词
- 返回:
- list[TextContent]: 包含查询结果的TextContent列表
- - 返回匹配的表名、数据库名和表注释信息
- - 结果以CSV格式返回,包含列名和数据
- """
- config = get_db_config()
- sql = "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_COMMENT "
- sql += f"FROM information_schema.TABLES WHERE TABLE_SCHEMA = '{config['database']}' AND TABLE_COMMENT LIKE '%{text}%';"
- return execute_sql(sql)
- def get_table_desc(text: str) -> list[TextContent]:
- """获取指定表的字段结构信息
- 参数:
- text (str): 要查询的表名,多个表名以逗号分隔
- 返回:
- list[TextContent]: 包含查询结果的TextContent列表
- - 返回表的字段名、字段注释等信息
- - 结果按表名和字段顺序排序
- - 结果以CSV格式返回,包含列名和数据
- """
- config = get_db_config()
- # 将输入的表名按逗号分割成列表
- table_names = [name.strip() for name in text.split(",")]
- # 构建IN条件
- table_condition = "','".join(table_names)
- sql = "SELECT TABLE_NAME, COLUMN_NAME, COLUMN_COMMENT "
- sql += (
- f"FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '{config['database']}' "
- )
- sql += f"AND TABLE_NAME IN ('{table_condition}') ORDER BY TABLE_NAME, ORDINAL_POSITION;"
- return execute_sql(sql)
- def get_lock_tables() -> list[TextContent]:
- sql = """SELECT
- p2.`HOST` AS 被阻塞方host,
- p2.`USER` AS 被阻塞方用户,
- r.trx_id AS 被阻塞方事务id,
- r.trx_mysql_thread_id AS 被阻塞方线程号,
- TIMESTAMPDIFF(SECOND, r.trx_wait_started, CURRENT_TIMESTAMP) AS 等待时间,
- r.trx_query AS 被阻塞的查询,
- l.OBJECT_NAME AS 阻塞方锁住的表,
- m.LOCK_MODE AS 被阻塞方的锁模式,
- m.LOCK_TYPE AS '被阻塞方的锁类型(表锁还是行锁)',
- m.INDEX_NAME AS 被阻塞方锁住的索引,
- m.OBJECT_SCHEMA AS 被阻塞方锁对象的数据库名,
- m.OBJECT_NAME AS 被阻塞方锁对象的表名,
- m.LOCK_DATA AS 被阻塞方事务锁定记录的主键值,
- p.`HOST` AS 阻塞方主机,
- p.`USER` AS 阻塞方用户,
- b.trx_id AS 阻塞方事务id,
- b.trx_mysql_thread_id AS 阻塞方线程号,
- b.trx_query AS 阻塞方查询,
- l.LOCK_MODE AS 阻塞方的锁模式,
- l.LOCK_TYPE AS '阻塞方的锁类型(表锁还是行锁)',
- l.INDEX_NAME AS 阻塞方锁住的索引,
- l.OBJECT_SCHEMA AS 阻塞方锁对象的数据库名,
- l.OBJECT_NAME AS 阻塞方锁对象的表名,
- l.LOCK_DATA AS 阻塞方事务锁定记录的主键值,
- IF(p.COMMAND = 'Sleep', CONCAT(p.TIME, ' 秒'), 0) AS 阻塞方事务空闲的时间
- FROM performance_schema.data_lock_waits w
- INNER JOIN performance_schema.data_locks l ON w.BLOCKING_ENGINE_LOCK_ID = l.ENGINE_LOCK_ID
- INNER JOIN performance_schema.data_locks m ON w.REQUESTING_ENGINE_LOCK_ID = m.ENGINE_LOCK_ID
- INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
- INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID
- INNER JOIN information_schema.PROCESSLIST p ON p.ID = b.trx_mysql_thread_id
- INNER JOIN information_schema.PROCESSLIST p2 ON p2.ID = r.trx_mysql_thread_id
- ORDER BY 等待时间 DESC;"""
- return execute_sql(sql)
- # 初始化服务器
- app = Server("operateMysql")
- @app.list_tools()
- async def list_tools() -> list[Tool]:
- """列出可用的MySQL工具
- 返回:
- list[Tool]: 工具列表,当前仅包含execute_sql工具
- """
- return [
- Tool(
- name="execute_sql",
- description="在MySQL8.0数据库上执行SQL",
- inputSchema={
- "type": "object",
- "properties": {
- "query": {"type": "string", "description": "要执行的SQL语句"}
- },
- "required": ["query"],
- },
- ),
- Tool(
- name="get_table_name",
- description="根据表中文名搜索数据库中对应的表名",
- inputSchema={
- "type": "object",
- "properties": {
- "text": {"type": "string", "description": "要搜索的表中文名"}
- },
- "required": ["text"],
- },
- ),
- Tool(
- name="get_table_desc",
- description="根据表名搜索数据库中对应的表结构,支持多表查询",
- inputSchema={
- "type": "object",
- "properties": {
- "text": {"type": "string", "description": "要搜索的表名"}
- },
- "required": ["text"],
- },
- ),
- Tool(
- name="get_lock_tables",
- description="获取当前mysql服务器InnoDB 的行级锁",
- inputSchema={"type": "object", "properties": {}},
- ),
- ]
- @app.call_tool()
- async def call_tool(name: str, arguments: dict) -> list[TextContent]:
- if name == "execute_sql":
- query = arguments.get("query")
- if not query:
- raise ValueError("缺少查询语句")
- return execute_sql(query)
- elif name == "get_table_name":
- text = arguments.get("text")
- if not text:
- raise ValueError("缺少表信息")
- return get_table_name(text)
- elif name == "get_table_desc":
- text = arguments.get("text")
- if not text:
- raise ValueError("缺少表信息")
- return get_table_desc(text)
- elif name == "get_lock_tables":
- return get_lock_tables()
- raise ValueError(f"未知的工具: {name}")
- sse = SseServerTransport("/messages/")
- # Handler for SSE connections
- async def handle_sse(request):
- async with sse.connect_sse(
- request.scope, request.receive, request._send
- ) as streams:
- await app.run(streams[0], streams[1], app.create_initialization_options())
- # Create Starlette app with routes
- starlette_app = Starlette(
- debug=True,
- routes=[
- Route("/sse", endpoint=handle_sse),
- Mount("/messages/", app=sse.handle_post_message),
- ],
- )
- if __name__ == "__main__":
- uvicorn.run(starlette_app, host="0.0.0.0", port=9000)
复制代码 这里面,主要提供了4个工具方法,分别是:- execute_sql
- get_table_name
- get_table_desc
- get_lock_tables
复制代码
安装python依赖- pip install mcp
- pip install mysql-connector-python
- pip install uvicorn
- pip install python-dotenv
- pip install starlette
复制代码
启动应用输出:- INFO: Started server process [23756]
- INFO: Waiting for application startup.
- INFO: Application startup complete.
- INFO: Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)
- INFO: 127.0.0.1:60896 - "GET /sse HTTP/1.1" 200 OK
复制代码
访问页面:http://127.0.0.1:9000/sse
效果如下:
三、测试MCP应用
客户端添加MCP
以Cherry Studio客户端为例子,注意:必须是Cherry Studio最新版本,才有MCP设置。
添加MCP服务器
输入名称:mysql_mcp_server_pro
类型:sse
URL:http://127.0.0.1:9000/sse
点击保存
保存成功后,就可以看到工具列表了,只有4个
测试MCP应用
返回主页,点击新建助手,选择模型
在输入框,找到MCP服务器
开启MCP
先来看mysql的数据表score,内容如下:
查询成绩表score的内容
查询成绩表的表名是什么
查询成绩表的表结构
获取当前mysql的锁
总结一下,AI模型调用MCP,还是很方便的。
有些时候AI模型做不到的,你可以自己写一个MCP应用。比如上面提到的查询mysql表数据,还有很多呢。
比如查询内部CRM系统,gitlab信息,内部业务系统,处理特定格式excel文件等等,都可以的。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |