抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

一、为什么需要持久化?

LangGraph 把图的每一步执行状态保存成 checkpoint(检查点),解锁了四个核心能力:

能力 说明
Human-in-the-loop 人类可以在任意步骤查看、暂停、修改状态后继续执行
Memory 同一个 thread 内的多轮对话自动保留上下文
Time travel 可以回放历史执行,也可以从任意检查点分叉探索不同路径
Fault-tolerance 节点失败时从上一个成功的 checkpoint 恢复,不重跑已成功的节点

二、核心概念

Thread(线程)

每次对话或任务运行对应一个 thread_id,所有 checkpoint 都挂在这个 ID 下。调用图时必须传:

1
2
config = {"configurable": {"thread_id": "1"}}
graph.invoke(input, config)

Super-step 与 Checkpoint

每个 super-step 是图的一次”滴答”——所有当前调度的节点(可并行)执行完毕后,保存一次 checkpoint。

对于 START → A → B → END 这样的线性图,会产生 4 个 checkpoint:

1
2
3
4
checkpoint 0: 空状态,        next = START
checkpoint 1: 用户输入, next = node_a
checkpoint 2: node_a 输出后,next = node_b
checkpoint 3: node_b 输出后,next = () ← 完成

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
foo: str
bar: Annotated[list[str], add] # add 是 reducer,bar 会累积而非覆盖

def node_a(state: State):
return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
return {"foo": "b", "bar": ["b"]}

workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": "", "bar": []}, config)

三、读写状态

获取最新状态

1
2
config = {"configurable": {"thread_id": "1"}}
snapshot = graph.get_state(config)

获取完整历史(最新的在最前面)

1
history = list(graph.get_state_history(config))

修改状态

1
2
# 创建新 checkpoint,不覆盖原来的
graph.update_state(config, {"foo": "new_value"}, as_node="node_a")

StateSnapshot 关键字段

字段 类型 说明
values dict 当前状态值
next tuple 下一步要执行的节点,空元组表示已完成
config dict 包含 thread_idcheckpoint_nscheckpoint_id
metadata.step int 当前是第几个 super-step
tasks tuple 如果有中断,这里包含中断信息
parent_config dict 上一个 checkpoint 的 config,第一个为 None

查找特定 checkpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
history = list(graph.get_state_history(config))

# 找 node_b 执行前的状态
before_node_b = next(s for s in history if s.next == ("node_b",))

# 按步骤号查找
step_2 = next(s for s in history if s.metadata["step"] == 2)

# 找所有通过 update_state 创建的 checkpoint
forks = [s for s in history if s.metadata["source"] == "update"]

# 找发生中断的 checkpoint
interrupted = next(
s for s in history
if s.tasks and any(t.interrupts for t in s.tasks)
)

四、两种存储的区别(重要)

1
2
3
4
5
6
Checkpointer(线程内记忆)        Store(跨线程记忆)
───────────────────── ──────────────────────
保存每步的图状态快照 保存任意自定义数据
作用域:单个 thread 作用域:跨所有 thread
用途:对话上下文、中断恢复 用途:用户偏好、长期记忆
key:thread_id + checkpoint_id key:namespace tuple + memory_id

Store 基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langgraph.store.memory import InMemoryStore
import uuid

store = InMemoryStore()

user_id = "1"
namespace = (user_id, "memories")

# 写入记忆
memory_id = str(uuid.uuid4())
store.put(namespace, memory_id, {"food_preference": "I like pizza"})

# 读取记忆
memories = store.search(namespace)
memories[-1].dict()
# {'value': {'food_preference': 'I like pizza'}, 'key': '...', 'namespace': ['1', 'memories'], ...}

语义搜索

1
2
3
4
5
6
7
8
9
10
11
12
from langchain.embeddings import init_embeddings

store = InMemoryStore(
index={
"embed": init_embeddings("openai:text-embedding-3-small"),
"dims": 1536,
"fields": ["food_preference", "$"]
}
)

# 用自然语言查找
memories = store.search(namespace, query="用户喜欢吃什么?", limit=3)

在节点中使用 Store(通过 Runtime 注入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langgraph.runtime import Runtime
from dataclasses import dataclass

@dataclass
class Context:
user_id: str

# 写入记忆
async def update_memory(state: MessagesState, runtime: Runtime[Context]):
user_id = runtime.context.user_id
namespace = (user_id, "memories")
memory_id = str(uuid.uuid4())
await runtime.store.aput(namespace, memory_id, {"memory": "用户喜欢披萨"})

# 读取记忆
async def call_model(state: MessagesState, runtime: Runtime[Context]):
user_id = runtime.context.user_id
namespace = (user_id, "memories")
memories = await runtime.store.asearch(
namespace,
query=state["messages"][-1].content,
limit=3
)
info = "\n".join([d.value["memory"] for d in memories])
# ... 将 info 注入到模型 prompt 中

编译图时同时挂载两者

1
2
3
4
5
6
7
8
9
10
11
12
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer, store=store)

# 调用时传 thread_id 和 context
config = {"configurable": {"thread_id": "1"}}
graph.stream(
{"messages": [{"role": "user", "content": "hi"}]},
config,
context=Context(user_id="1"),
)

五、Checkpointer 选型

实现 适用场景 是否内置
InMemorySaver 开发调试,进程退出数据丢失
SqliteSaver 本地工作流,单机持久化 需单独安装
PostgresSaver 生产环境,支持并发和持久化 需单独安装
CosmosDBSaver Azure 生产环境 需单独安装

加密存储(生产环境推荐)

1
2
3
4
5
6
7
from langgraph.checkpoint.serde.encrypted import EncryptedSerializer
from langgraph.checkpoint.postgres import PostgresSaver

# 读取环境变量 LANGGRAPH_AES_KEY
serde = EncryptedSerializer.from_pycryptodome_aes()
checkpointer = PostgresSaver.from_conn_string("postgresql://...", serde=serde)
checkpointer.setup()

Pickle 回退(处理特殊类型如 Pandas DataFrame)

1
2
3
4
5
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

graph.compile(
checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True))
)

六、总结

Checkpointer 是图的”自动存档”,Store 是图的”跨存档共享背包”。两者配合使用,才能构建真正有记忆、可恢复、支持人类干预的 agent 系统。

评论