找回密码
 立即注册
首页 业界区 业界 张高兴的大模型开发实战:(三)使用 LangGraph 为对话 ...

张高兴的大模型开发实战:(三)使用 LangGraph 为对话添加历史记录

方子楠 5 天前
目录

  • 基础概念
  • 环境搭建与配置
  • 将对话历史存储至内存
  • 将对话历史存储至 PostgreSQL

在构建聊天机器人时,对话历史记录是提升用户体验的核心功能之一,用户希望机器人能够记住之前的对话内容,从而避免重复提问。LangGraph 是 LangChain 生态中一个工具,通过将应用逻辑组织成有向图(Graph)的形式,可以轻松实现对话历史的管理和复杂的对话流程。本文将通过一个示例,展示如何使用 LangGraph 实现这一功能。
在上一篇博客中提到,链(Chain)在 LangChain 中是一种基本的构建块,用于将多个 LLM 调用和工具调用链接在一起。然而,链在处理复杂、动态的对话流程时存在一些局限性,例如,链通常是线性的,这种线性结构只能按照预定义的顺序执行,限制了在对话中进行动态路由和条件分支的能力。LangGraph 的设计目标是提供一个更灵活、更强大的框架来构建复杂的智能体应用。
LangGraphLangChain核心设计循环图结构:支持条件分支、循环和反馈机制,适合复杂多步骤任务。线性流程(DAG):以链式结构为主,适合线性任务(如文档检索、文本生成)。控制能力高度可控:通过节点(Node)和边(Edge)精细控制流程,支持条件逻辑和动态修改。中等可控:依赖链式编排,灵活性较低,难以处理复杂循环或动态分支。持久化与状态管理内置持久化:支持状态检查点(Checkpoints),可中断/恢复任务,适合长期任务。基础记忆功能:依赖对话历史记录,但无法持久化复杂状态或跨会话共享。人在环(Human-in-the-Loop)深度支持:可在任意节点插入人工审核、干预,适合医疗、金融等需人工决策的场景。弱支持:需手动集成人工干预逻辑,流程中断后难以恢复。多代理(Multi-Agent)原生支持:通过共享状态实现多Agent协作,适合复杂任务拆分与协同。较弱:需手动协调多个链,难以实现动态任务分配。错误处理容错性强:支持失败节点跳转或重试,流程可恢复。基础重试:依赖单链重试,无法处理复杂流程中的错误传播。适用场景复杂多步骤任务、需人工干预的场景(如医疗诊断)、多Agent协作系统、长期任务(如持续对话)线性任务(文档检索、文本生成)、快速原型开发、简单对话系统开发复杂度中等:需定义节点、边和状态,但提供了灵活的编排能力。低:开箱即用的链式结构,适合快速开发。基础概念

LangGraph 的核心是 State Graph,它通过状态(State)、节点(Node)和边(Edge)的组合,定义对话的流程和逻辑。每个状态可以保存对话的上下文(如历史消息、总结等),节点定义了在不同状态下如何处理输入和生成输出,边定义了处理流程。

  • State(状态)
    用于存储对话中的临时数据,例如用户消息、模型响应、总结内容等。例如 class State(MessagesState): messages: str 表示一个状态,其中 messages 字段用于存储对话的具体信息。
  • Node(节点)
    定义了对话流程中的具体操作,通常是具体的函数,例如调用模型、判断是否需要总结、生成总结等。
  • Edge(边)
    用于连接不同的节点,定义了节点之间的关系和流程。边可以包含条件逻辑、循环、分支等,用于控制对话流程的走向。
我们来看一个最简单的示例,下图是一个 LangGraph 实现的聊天机器人。
1.png

起始节点为 __start__,结束节点为 __end__,chatbot 表示调用大模型处理对话。__start__ 节点存储了应用的 State 数据。节点之间带箭头的线段表示边,实线代表普通边 →,虚线代表条件边 ⇢,条件边根据当前的具体条件而选择哪一条边执行,选择不同的边,则到达的节点不同。
环境搭建与配置

在上一篇博客创建的 Python 虚拟环境中执行以下命令,安装需要的包:
  1. pip install langgraph langgraph-checkpoint-postgres psycopg[binary,pool]
复制代码
将对话历史存储至内存

在开始之前,先构建一个图,实现一个最简单的聊天机器人。
  1. from typing import Annotated
  2. from typing_extensions import TypedDict
  3. from langgraph.graph import StateGraph, START, END
  4. from langgraph.graph.message import add_messages
  5. from langchain_ollama import ChatOllama
  6. class State(TypedDict):
  7.     """存储对话状态信息"""
  8.     messages: Annotated[list, add_messages]
  9. def chatbot(state: State):
  10.     """调用模型处理对话"""
  11.     return {"messages": [llm.invoke(state["messages"])]}
  12. llm = ChatOllama(model="qwen2.5:1.5b")
  13. # 创建图
  14. graph_builder = StateGraph(State)
  15. graph_builder.add_node("chatbot", chatbot)  # 添加节点
  16. graph_builder.add_edge(START, "chatbot")    # 添加边
  17. graph_builder.add_edge("chatbot", END)
  18. graph = graph_builder.compile()
复制代码
使用下面的代码输出图的结构:
  1. png = graph.get_graph().draw_mermaid_png()
  2. with open("chatbot.png", "wb") as f:
  3.     f.write(png)
复制代码
2.png

接下来,使用 graph.stream() 方法执行图,即可开始对话。
  1. events = graph.stream({"messages": [{"role": "user", "content": "你可以做些什么?"}]})
  2. for event in events:
  3.     last_event = event
  4. print("AI: ", last_event["messages"][-1].content)
复制代码
下面使用 MemorySaver 将对话历史存储在内存中。
  1. from langgraph.checkpoint.memory import MemorySaver
  2. checkpointer = MemorySaver()
  3. # 创建图
  4. # ...
  5. graph = graph_builder.compile(checkpointer=checkpointer)
复制代码
在对话时要记录对话历史,还需要在 graph.stream() 方法中传入 config 参数,thread_id 用于标识对话的唯一性,不同的对话 thread_id 不同。
  1. import uuid
  2. config = {"configurable": {"thread_id": uuid.uuid4().hex}}
  3. events = graph.stream({"messages": [{"role": "user", "content": "你好,我的名字是张三"}]}, config)
复制代码
最后,我们将对话的代码封装成 stream_graph_updates() 方法,通过对话检测一下历史信息是否被正确保存。
  1. def stream_graph_updates(user_input: str, config: dict):
  2.     """对话"""
  3.     events = graph.stream({"messages": [{"role": "user", "content": user_input}]}, config, stream_mode="values")
  4.     for event in events:
  5.         last_event = event
  6.     print("AI: ", last_event["messages"][-1].content)
  7. if __name__ == "__main__":
  8.     config = {"configurable": {"thread_id": uuid.uuid4().hex}}
  9.     while True:
  10.         user_input = input("User: ")    # 用户输入问题进行对话
  11.         if user_input.lower() in ["exit", "quit"]:
  12.             break
  13.         stream_graph_updates(user_input, config)
  14.     print("\nHistory: ")    # 输出对话历史
  15.     for message in graph.get_state(config).values["messages"]:
  16.         if isinstance(message, AIMessage):
  17.             prefix = "AI"
  18.         else:
  19.             prefix = "User"
  20.         print(f"{prefix}: {message.content}")
复制代码
  1. User: 你好,我的名字是张三
  2. AI:  你好!很高兴认识你。有什么可以帮忙的吗?
  3. User: 我叫什么名字
  4. AI:  你的名字确实是“张三”。很高兴认识你!有什么问题或需要帮助的地方吗?
复制代码
将对话历史存储至 PostgreSQL

对话历史存储至内存中,当应用关闭时,对话历史也会消失,有时无法满足持久化的需求。LangGraph 提供了一些数据库持久化方式,支持的数据库有 PostgreSQL、MongoDB、Redis。下面使用 PostgreSQL 数据库为例。在开始之前,执行以下命令创建一个 PostgreSQL 数据库:
  1. psql -U postgres -c "CREATE DATABASE llm"
复制代码
接着,在代码中替换 MemorySaver 为 PostgresSaver,连接并初始化数据库:
  1. from psycopg import Connection
  2. from langgraph.checkpoint.postgres import PostgresSaver
  3. DB_URI = "postgresql://postgres:YOUR_PASSW0RD@localhost:5432/llm"   # 记得替换数据库密码
  4. conn = Connection.connect(DB_URI)   # 连接数据库
  5. checkpointer = PostgresSaver(conn)
  6. checkpointer.setup()    # 初始化数据库
复制代码
使用数据库管理工具查看数据库,可以看到 LangGraph 在数据库初始化时帮我们创建了四张表:checkpoint、checkpoint_blobs、checkpoint_writes、checkpoint_migrations。
3.jpeg

完整的程序代码如下:
  1. import uuid
  2. from typing import Annotated
  3. from typing_extensions import TypedDict
  4. from langgraph.graph import StateGraph, START, END
  5. from langgraph.graph.message import add_messages
  6. from langchain_ollama import ChatOllama
  7. from langchain_core.messages import AIMessage, HumanMessage
  8. from psycopg import Connection
  9. from langgraph.checkpoint.postgres import PostgresSaver
  10. class State(TypedDict):
  11.     messages: Annotated[list, add_messages]
  12. def chatbot(state: State):
  13.     return {"messages": [llm.invoke(state["messages"])]}
  14. DB_URI = "postgresql://postgres:%40Passw0rd@localhost:5432/llm"
  15. llm = ChatOllama(model="qwen2.5:1.5b")
  16. conn = Connection.connect(DB_URI)
  17. checkpointer = PostgresSaver(conn)
  18. checkpointer.setup()
  19. graph_builder = StateGraph(State)
  20. graph_builder.add_node("chatbot", chatbot)
  21. graph_builder.add_edge(START, "chatbot")
  22. graph_builder.add_edge("chatbot", END)
  23. graph = graph_builder.compile(checkpointer=checkpointer)
  24. def stream_graph_updates(user_input: str, config: dict):
  25.     events = graph.stream({"messages": [{"role": "user", "content": user_input}]}, config, stream_mode="values")
  26.     for event in events:
  27.         last_event = event
  28.     print("AI: ", last_event["messages"][-1].content)
  29. if __name__ == "__main__":
  30.     config = {"configurable": {"thread_id": uuid.uuid4().hex}}
  31.     while True:
  32.         user_input = input("User: ")
  33.         if user_input.lower() in ["exit", "quit"]:
  34.             break
  35.         stream_graph_updates(user_input, config)
  36.     print("\nHistory: ")
  37.     for message in checkpointer.get(config)["channel_values"]["messages"]:
  38.         if isinstance(message, AIMessage):
  39.             prefix = "AI"
  40.         else:
  41.             prefix = "User"
  42.         print(f"{prefix}: {message.content}")
  43.     conn.close()
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册