模型上下文协议(MCP)开发实战——构建LangChain代理客户端

360影视 日韩动漫 2025-04-01 18:27 2

摘要:“MCP是一种开放协议,它标准化了应用程序向LLM提供上下文的方式。可以将MCP视为AI应用程序的USB-C端口。正如USB-C提供了一种将你的设备连接到各种外围设备和配件的标准化方式一样,MCP提供了一种将AI模型连接到不同数据源和工具的标准化方式。”

本文首先介绍模型上下文协议(MCP)的系统构架,然后基于MCP+LangChain+LangGraph组合框架开发了一个比较完整的基础代理客户端。

什么是模型上下文协议(Model Context Protocol)?让我们深入了解MCP背后的概念。以下是官方MCP文档对MCP的介绍:

“MCP是一种开放协议,它标准化了应用程序向LLM提供上下文的方式。可以将MCP视为AI应用程序的USB-C端口。正如USB-C提供了一种将你的设备连接到各种外围设备和配件的标准化方式一样,MCP提供了一种将AI模型连接到不同数据源和工具的标准化方式。”

让我来解释一下。假设你正在构建与不同语言模型和AI系统配合使用的AI代理,其中每个模型对工具的理解方式都不同。你已经编写了代码来使你的AI代理能够针对一个特定的AI模型进行构建,假设你的系统构架如下图所示:

假设你将来可能希望切换到具有不同架构和工具定义方法的另一个AI模型,那么你必须回去重新编写工具以适应这种新的AI模型架构和方法,假设新架构如下图所示:

我想,作为程序员,你已经看到了这里的问题——这是不可扩展的。如果我们可以编写一次工具,然后能够将其与任何AI模型架构连接起来,而不必担心这个AI模型架构在后台如何工作,那会怎样呢?

这会为我们省去很多麻烦,不是吗?是的,它不仅可扩展,我们还可以连接任何我们想要的AI模型!

你可能会想,我们刚刚引入了另一层(MCP层),另一层就意味着更复杂吗?是的,但增加这一层的好处远远大于坏处。下面是官方文档的说明:

“MCP可帮助你在LLM之上构建代理和复杂的工作流。LLM通常需要与数据和工具集成,而MCP可提供:

你的LLM可以直接插入不断增加的预构建集成列表在LLM提供商和供应商之间切换的灵活性保护基础架构内数据的最佳实践”

总体来说,MCP架构遵循客户端-服务器架构。我们可以让一个客户端连接到多个服务器(MCP服务器)。

现在,让我们来分析一下上面的图形架构:

MCP主机:顶部的“主机(代理、工具)”框代表想要通过模型上下文协议访问数据的程序。MCP客户端:通过MCP协议直接与MCP服务器(A、B、C)连接的客户端。MCP服务器:用三个框表示(MCP服务器A、B、C),每个框连接到不同的服务。本地数据源:文件系统和本地Postgres数据库。远程服务:虚拟私有云之外的Postgres存储。该架构显示了一个包含MCP基础设施的VPC(虚拟私有云)。其中,主机与多个MCP服务器通信,每个服务器处理特定的服务集成。

对于我们的第一个MCP服务器,我想直接在官方文档上创建一个天气MCP服务器,这只是为了让我们更迅速地了解MCP的方式。然后,我们将此服务器连接到一个LangChain代理。

如果愿意,你可以按照官方文档中的说明进行操作,我也会在本文中提供相关步骤。

在本文案例中,我们选择使用uv包管理器,它是推荐的包管理器,而且速度非常快,所以我会坚持使用它。通过运行下面的命令来安装它:

复制

curl -LsSf https://astral.sh/uv/install.sh | sh1.

至此,我已经成功地将它安装在我的机器上。如果这是你第一次安装它,你可能需要重新启动你的终端。

简言之,我正在使用Ma/Linux命令。如果你使用的是Windows,你可以按照官方文档中的Powershell命令进行操作。

复制

# 为我们的项目创建一个新目录uv init weather cd weather # 创建虚拟环境并激活它uv venv source .venv/bin/activate # 安装依赖项uv add "mcp[cli]" httpx # 创建我们的服务器文件touch weather.py1.2.3.4.5.6.7.8.9.10.

完成后,你可以在你最喜欢的IDE中打开目录。我将使用VSCode;如果你愿意,也可以使用Cursor或者任何其他IDE。

复制

编写服务器端代码

对于本文中的代码,我将使用官方文档中的代码。在此,非常感谢MCP团队提供的代码。

首先,我们将实例化FastMCP类,这有助于大多数工具创建逻辑,例如来自工具函数的文档字符串的工具描述以及函数类型提示。

复制

from typing import Anyimport httpxfrom mcp.server.fastmcp import FastMCP#初始化FastMCP服务器mcp = FastMCP("weather")# 指定常量NWS_API_BASE = "https://api.weather.gov"USER_AGENT = "weather-app/1.0"1.2.3.4.5.6.7.8.辅助函数

我们还将创建几个辅助函数,用于帮助格式化来自API的数据。

复制

async def make_nws_request(url: str) -> dict[str, Any] | None: """使用恰当的错误处理方式向NWS API提出请求。""" headers = { "User-Agent": USER_AGENT, "Accept": "application/geo+json" } async with httpx.AsyncClient as client: try: response = await client.get(url, headers=headers, timeout=30.0) response.raise_for_status return response.json except Exception: return Nonedef format_alert(feature: dict) -> str: """将报警功能格式化为可读的字符串。""" props = feature["properties"] return f"""Event: {props.get('event', 'Unknown')}Area: {props.get('areaDesc', 'Unknown')}Severity: {props.get('severity', 'Unknown')}Description: {props.get('description', 'No description available')}Instructions: {props.get('instruction', 'No specific instructions provided')}"""1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.创建工具

现在,我们将使用Python中的装饰器在MCP服务器下创建实际的工具mcp.tool。

复制

@mcp.toolasync def get_alerts(state: str) -> str: """获取美国一个州的天气警报。 参数: state: 两个字母的美国州代码(例如CA,NY) """ url = f"{NWS_API_BASE}/alerts/active/area/{state}" data = await make_nws_request(url) if not data or "features" not in data: return "Unable to fetch alerts or no alerts found." if not data["features"]: return "No active alerts for this state." alerts = [format_alert(feature) for feature in data["features"]] return "\n---\n".join(alerts)@mcp.toolasync def get_forecast(latitude: float, longitude: float) -> str: """获取一个地点的天气预报。 参数: latitude: 位置的纬度 longitude: 位置的经度 """ # 首先获取预测网格端点 points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}" points_data = await make_nws_request(points_url) if not points_data: return "Unable to fetch forecast data for this location." # 从端点响应中获取预测的URL forecast_url = points_data["properties"]["forecast"] forecast_data = await make_nws_request(forecast_url) if not forecast_data: return "Unable to fetch detailed forecast." # 将时间范围格式化为可读的预测 periods = forecast_data["properties"]["periods"] forecasts = for period in periods[:5]: # 只显示未来5个时段 forecast = f"""{period['name']}:Temperature: {period['temperature']}°{period['temperatureUnit']}Wind: {period['windSpeed']} {period['windDirection']}Forecast: {period['detailedForecast']}""" forecasts.append(forecast) return "\n---\n".join(forecasts)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.

一旦我们完成上面所有这些工作,我们就可以在脚本中添加入口点来执行MCP服务器。现在,在脚本文件weather.py的底部添加以下代码:

复制

if __name__ == "__main__" : # 初始化并运行服务器 mcp.run(transport= 'stdio' )1.2.3.

从上面的代码中我们指定了stdio,这是什么意思?

HTTP中的STDIO(标准输入/输出)是指使用HTTP连接时输入和输出数据的标准流。在Web服务器和HTTP环境中:

标准输入(stdin):用于接收发送到服务器的数据,如POST请求数据。标准输出(stdout):用于将响应数据发送回客户端。标准错误(stderr):用于记录错误和调试信息在构建使用命令行界面的HTTP服务器或服务时,STDIO提供了一种通过标准Unix风格流传输HTTP请求/响应数据的方法,允许与其他命令行工具和进程集成。
我们还可以指定SSE通信方式:HTTP技术允许服务器将更新推送到客户端。单向通信(仅限服务器到客户端)。保持连接畅通以获取实时更新。比WebSocket更简单。用于通知、数据馈送和流更新。

完成后,导航到weather.py脚本所在的位置并在终端中运行以下命令:

复制

uv run weather.py1.

除了看不到输出内容之外,这表明服务器正在运行,或者你可以更新脚本以显示某些内容(如果你愿意)。

我想创建一个自定义LangChain代理来连接我们正在运行的MCP服务器。为此,我们必须安装langchain-mcp-adapters。你可以运行以下命令。

首先,停止天气脚本并运行以下命令:

复制

uv add ipykernel1.

原因是我将在VScode中使用一个笔记本文件作为LangChain代理。

安装完成后,继续再次运行天气MCP服务器脚本:

复制

我还在与我们的文件weather.py相同的目录中继续创建另一个文件client.ipynb。

然后,你可以运行下面的命令来安装LangChain MCP适配器:

复制

!uv add langchain-mcp-adapters1.

安装完成后,我们可以安装langchain-anthropic和LangGraph客户端。

复制

!uv add langgraph langchain-anthropic python-dotenv1.

首先,我们需要一个Anthropic API密钥。

一旦你获得Anthropic API密钥,你就可以添加.env文件,该文件应该位于你的项目的根目录中。

复制

ANTHROPIC_API_KEY =sk-xxxxxxxx1.

请确保用实际的API密钥替换上面的占位符。

接下来,我们可以使用以下方式加载API密钥:

复制

from dotenv import load_dotenvload_dotenvimport osapi_key=os.environ.get("ANTHROPIC_API_KEY")1.2.3.4.

现在,我们可以创建与我们正在运行的MCP服务器的stdio连接服务器参数。

复制

from mcp import ClientSession, StdioServerParametersfrom mcp.client.stdio import stdio_clientfrom langchain_mcp_adapters.tools import load_mcp_toolsfrom langgraph.prebuilt import create_react_agentfrom langchain_anthropic import ChatAnthropic1.2.3.4.5.6.7.

让我们继续创建模型,我将使用前面提到的Anthropic。我们再次提及了如何使用多个LLM提供商的问题。

复制

model = ChatAnthropic(model="claude-3-5-sonnet-20241022", api_key=api_key)1.

你可以在下面的链接处找到有关Anthropic聊天模型的更多信息:

All models overview - Anthropic

复制

server_params = StdioServerParameters( command= "python" , # 确保更新为math_server.py文件的完整绝对路径 args=[ "./weather.py" ], ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # 初始化连接 await session.initialize # 获取工具 tools = await load_mcp_tools(session) # 创建并运行代理 agent = create_react_agent(model, tools) agent_response = await agent.ainvoke({ "messages" : "加州目前的天气怎么样" })1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.

然后,运行下面命令:

复制

agent_response1.

现在,让我们让输出看起来更美观一点:

复制

from IPython.display import display, Markdownfrom langchain_core.messages import HumanMessage, ToolMessage, AIMessagefor response in agent_response["messages"]: user = "" if isinstance(response, HumanMessage): user = "**User**" elif isinstance(response, ToolMessage): user = "**Tool**" elif isinstance(response, AIMessage): user = "**AI**" if isinstance(response.content, list): display(Markdown(f'{user}: {response.content[0].get("text", "")}')) continue display(Markdown(f"{user}: {response.content}"))1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.

我希望能够流式传输响应,为此我们需要将trasportMCP的类型设置为sse。

为此,停止服务器(MCP服务器)并更改这部分代码:

复制

if __name__ == "__main__" : # 初始化并运行服务器 mcp.run(transport= 'sse' )1.2.3.

一旦完成,请使用以下命令再次运行代码:

复制

返回笔记本文件中,添加以下语句:

复制

from langchain_mcp_adapters.client import MultiServerMCPClient1.

要测试它,你可以使用:

复制

async with MultiServerMCPClient( { "weather": { "url": "http://localhost:8000/sse", "transport": "sse", } }) as client: agent = create_react_agent(model, client.get_tools) agent_response = await agent.ainvoke({"messages": "what is the weather in nyc?"})for response in agent_response["messages"]: user = "" if isinstance(response, HumanMessage): user = "**User**" elif isinstance(response, ToolMessage): user = "**Tool**" elif isinstance(response, AIMessage): user = "**AI**" if isinstance(response.content, list): display(Markdown(f'{user}: {response.content[0].get("text", "")}')) continue display(Markdown(f"{user}: {response.content}"))1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.

我希望能够实时流式传输响应。为此,让我们编写以下代码行:

复制

async with MultiServerMCPClient( { "weather" : { "url" : "http://localhost:8000/sse" , "transport" : "sse" , } } ) as client: agent = create_react_agent(model, client.get_tools) # 流式传输响应块 async for chunk in agent.astream({ "messages" : "what is the weather in nyc!" }): # 从AddableUpdatesDict结构中提取消息内容 if 'agent' in chunk and 'messages' in chunk[ 'agent' ]: for message in chunk[ 'agent' ][ 'messages' ]: if isinstance (message, AIMessage): # 处理不同的内容格式 if isinstance (message.content, list ): # 对于带有文本和工具使用的结构化内容 for item in message.content: if isinstance (item, dict ) and 'text' in item: display(Markdown( f"**AI**: {item[ 'text' ]} " )) else : # 对于简单文本内容 display(Markdown( f"**AI**: {message.content} " )) elif 'tools' in chunk and 'messages' in chunk[ 'tools' ]: for message in chunk[ 'tools' ][ 'messages' ]: if hasattr (message, 'name' ) and hasattr (message, 'content' ): # 显示工具响应 display(Markdown( f"**Tool ( {message.name} )**: {message.content} " ))1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.

运行此代码将逐行输出内容:

朱先忠,51CTO社区编辑,51CTO专家博客、讲师,潍坊一所高校计算机教师,自由编程界老兵一枚。

来源:51CTO

相关推荐