【MindLab】ゼロから本番運用まで:LangGraph で作る状態管理可能なマルチエージェント Workflow 完全ガイド
TL; DR LangGraph は「Graph = 状態機械」という設計思想で、分岐・ループ・エラー回復を
コードとして明示的に制御できる Agent オーケストレーションフレームワーク。本記事では StateGraph の基本構造から、実戦で即使えるエラー回復パターン 3 選、再利用可能な Graph テンプレート 2 選、そして Postgres Checkpointer / LangSmith によるコスト可視化まで、すべてコピペで動くコード付きで解説する。
対象バージョン:
langgraph>=1.0.0(2025年10月 GA)、
langchain-openai>=0.3.0LangGraph は API 変更が頻繁なライブラリ。記事中のコードは 2026 年 5 月時点で動作確認済みだが、バージョンが合わない場合は
pip show langgraphで確認を。
この記事の想定読者
- LangChain の Chain や
create_react_agent
は触ったことがある - マルチエージェント構成で「ループ」「条件分岐」「エラー回復」が必要になった
- 本番環境に投入するために Checkpointer や Observability を真剣に検討している
入門レベルの StateGraph の説明は Zenn にも良い記事がすでにある。本記事はそこから一歩先、「本番で壊れないマルチエージェント Graph をどう設計するか」 に焦点を当てる。
Chain で書いたエージェントが本番で壊れて、
ログを追っても「どのステップで死んだか」が全くわからなかった——
そこから LangGraph に移行した。
なぜ LangGraph か──Chain / Prompt の限界
分岐・ループ・エラー回復で普通の Chain が崩れる理由
LangChain の Chain は
入力 → 変換 → 出力という一方向パイプラインとして設計されている。シンプルな RAG や一問一答には十分だが、以下のような要件が出た瞬間に構造的に苦しくなる。
❌ Chain が対応しにくいパターン 1. 検索結果が不十分なら再検索、3回失敗したら別経路へ(条件分岐 + ループ) 2. ツール呼び出し → 結果検証 → 不合格なら再試行(自己修正ループ) 3. 外部 API がタイムアウトしたらフォールバックモデルに切り替え(graceful degradation) 4. 高リスク操作の前に人間が承認してから実行(Human-in-the-loop)
Chain でこれらを実装しようとすると、Python の
whileループで Chain を外から何度も呼んだり、
try/exceptで分岐を手書きしたりすることになる。結果として生まれるのは、どこで何が起きているか追えないスパゲッティコードだ。
実際にやってみるとわかるが、最大の問題は実行途中の状態が保存できないこと。Chain が途中で落ちたとき「どのステップまで完了したか」を知る手段がなく、最初からやり直すしかない。
LangChain が「重すぎ」、CrewAI が「ブラックボックス」な理由
ここを整理しておくと技術選定で迷わなくなる。
┌─────────────────────┬────────────────────────────────────────────────┐ │ フレームワーク │ 特性 │ ├─────────────────────┼────────────────────────────────────────────────┤ │ LangChain │ 部品ライブラリとしては優秀。 │ │ (AgentExecutor) │ ただし AgentExecutor は実行フローが内部に隠蔽 │ │ │ されており、カスタムの分岐・ループ・エラー処理が │ │ │ やりにくい。「どこで何が起きてるか追えない」問題。 │ ├─────────────────────┼────────────────────────────────────────────────┤ │ CrewAI │ 宣言的にマルチエージェントを記述できて立ち上げが │ │ │ 速い。ただしエージェント間通信・状態管理・エラー │ │ │ 処理がフレームワーク内部に隠れている。 │ │ │ 「このエージェントが失敗したら別経路を通す」 │ │ │ ような制御を自分で書く手段が限られる。 │ ├─────────────────────┼────────────────────────────────────────────────┤ │ LangGraph │ 低レベルのオーケストレーションフレームワーク。 │ │ │ 実行フローを State / Node / Edge で明示的に定義。 │ │ │ 学習コストは高いが、制御力は最も高い。 │ │ │ Durable Execution / HITL / Checkpointer が │ │ │ ファーストクラスでサポートされている。 │ └─────────────────────┴────────────────────────────────────────────────┘
LangChain の State of Agent Engineering Survey(2025)によると、AI エンジニアの 57.3% がすでに Agent を本番運用しているという数字がある。本番に出すなら「どこで何が起きているか」を掌握できる LangGraph が現時点で最も信頼できる選択肢だと考えている。
:::message 補足: LangChain と LangGraph は排他ではない。LangChain のモデル統合・ツール定義は LangGraph 内で普通に使える。「LangChain = 部品、LangGraph = ワークフローエンジン」という棲み分けが 2026 年現在の公式見解。 :::
状態機械(State Machine)として Graph を設計する思想
LangGraph の核心は、Workflow を有限状態機械として明示的にモデル化するという設計思想にある。
┌──────────────────────────────────────────────────┐ │ LangGraph の 3 要素 │ │ │ │ State = アプリの「今の状況」を表す構造体 │ │ (TypedDict / Pydantic / dataclass) │ │ │ │ Node = State を受け取り、State を返す純粋関数 │ │ (副作用は内部に閉じ込める) │ │ │ │ Edge = ノード間の遷移ルール │ │ (無条件 / 条件付き / Command) │ └──────────────────────────────────────────────────┘
この 3 つを組み合わせれば、どんな複雑なフローも「コード上で見える形」で構築できる。実行フローがそのまま設計図になるため、テストも書きやすく、LangSmith でのトレースもノード単位で追える。
StateGraph の基本構造(最小動作サンプル付き)
まず動くコードを見てほしい。「検索 → 回答生成 → 品質チェック → 不合格なら再検索」というループ付きの最小構成。
# requirements:
# langgraph>=1.0.0
# langchain-openai>=0.3.0
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# ── 1. State 定義 ─────────────────────────────────────────────────
class ResearchState(TypedDict):
query: str
search_results: list[str]
answer: str
quality_score: float
retry_count: int
# ── 2. LLM ────────────────────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# ── 3. ノード定義(State → 部分 State を返す)────────────────────
def search_node(state: ResearchState) -> dict:
"""検索をシミュレート(実運用では Tavily / SerpAPI に差し替え)"""
results = [f"Result {i}: {state['query']} に関する情報" for i in range(3)]
return {"search_results": results}
def generate_node(state: ResearchState) -> dict:
"""検索結果をもとに回答を生成"""
context = "\n".join(state["search_results"])
prompt = f"以下の情報をもとに質問に答えてください。\n質問: {state['query']}\n情報:\n{context}"
response = llm.invoke(prompt)
return {"answer": response.content}
def evaluate_node(state: ResearchState) -> dict:
"""回答の品質スコアを算出(0.0〜1.0)"""
prompt = f"次の回答の品質を 0.0〜1.0 で数値のみ返してください。\n回答: {state['answer']}"
response = llm.invoke(prompt)
try:
score = float(response.content.strip())
except ValueError:
score = 0.5
return {"quality_score": score, "retry_count": state.get("retry_count", 0) + 1}
# ── 4. ルーティング関数 ──────────────────────────────────────────
def route_after_evaluate(state: ResearchState) -> Literal["search", "__end__"]:
if state["quality_score"] >= 0.7:
return "__end__"
if state.get("retry_count", 0) >= 3:
return "__end__" # 上限到達 → 強制終了
return "search"
# ── 5. Graph 組み立て ────────────────────────────────────────────
graph = StateGraph(ResearchState)
graph.add_node("search", search_node)
graph.add_node("generate", generate_node)
graph.add_node("evaluate", evaluate_node)
graph.add_edge(START, "search")
graph.add_edge("search", "generate")
graph.add_edge("generate", "evaluate")
graph.add_conditional_edges(
"evaluate",
route_after_evaluate,
{"search": "search", "__end__": END},
)
app = graph.compile()
# ── 6. 実行 ──────────────────────────────────────────────────────
if __name__ == "__main__":
result = app.invoke({
"query": "LangGraph と CrewAI の設計思想の違い",
"search_results": [],
"answer": "",
"quality_score": 0.0,
"retry_count": 0,
})
print(f"最終回答: {result['answer'][:200]}...")
print(f"品質スコア: {result['quality_score']}")
print(f"リトライ回数: {result['retry_count']}")
このコードのポイントは 3 つ。
① ノードは「部分 State」を返す。
{**state, "answer": ...} のように全 State を返すのではなく、変更したいキーだけを含む dict を返す。LangGraph が自動マージしてくれるので、ノード側で全体を気にする必要がない。② ルーティング関数は State だけを見る純粋関数。外部依存がないため
pytestで普通にユニットテストが書ける。
③ ループの終了条件が明示的。
retry_count >= 3でガードしているため、品質スコアが永遠に上がらなくても無限ループにはならない。
State・Node・Edge の三角形と TypedDict スキーマ定義
State の設計が Graph 全体の品質を決める。実運用で重要になるのは Reducer(リデューサー) の概念。
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage
import operator
class AgentState(TypedDict):
# 標準フィールド: 上書きマージ(デフォルト動作)
current_step: str
error_message: str | None
# Reducer: operator.add → ノードが list を返すと「追記」になる
tool_outputs: Annotated[list[str], operator.add]
# Reducer: add_messages → メッセージ ID で重複排除しながらマージ
messages: Annotated[list[BaseMessage], add_messages]
# カウンター系は上書きで十分
retry_count: int
Reducer を使い分けるルールは単純で、**「複数ノードから同じキーに書き込む可能性があるか?」**で判断する。
- ある →
Annotated[list, operator.add]
かadd_messages
- ない → 素の型で十分(上書きマージ)
Conditional Edge で分岐ロジックを書く
Conditional Edge は「ルーティング関数」+「マッピング辞書」のペアで定義する。
from typing import Literal
def route_by_intent(state: AgentState) -> Literal["tool", "answer", "error"]:
"""状態を見てルーティングを決める純粋関数"""
if state.get("error_message"):
return "error"
if state["current_step"] == "needs_tool":
return "tool"
return "answer"
graph.add_conditional_edges(
"classifier", # このノードの出力後に分岐
route_by_intent, # ルーティング関数
{ # 戻り値 → 遷移先ノードのマッピング
"tool": "tool_node",
"answer": "answer_node",
"error": "error_node",
},
)
ルーティング関数の戻り値は
Literalで型を明示しておくと、マッピング辞書との不整合をエディタが検出してくれる。地味だが、Graph が大きくなったときのバグ防止に効く。
サブグラフによるモジュール化
Graph が 10 ノードを超えたあたりから、サブグラフへの分割を検討すべきだ。サブグラフ自体が
CompiledGraphオブジェクトなので、親 Graph のノードとしてそのまま登録できる。
from langgraph.graph import StateGraph, START, END
def build_research_subgraph():
"""検索に特化したサブグラフ"""
sub = StateGraph(ResearchState)
sub.add_node("fetch", fetch_node)
sub.add_node("parse", parse_node)
sub.add_node("validate", validate_node)
sub.add_edge(START, "fetch")
sub.add_edge("fetch", "parse")
sub.add_edge("parse", "validate")
sub.add_edge("validate", END)
return sub.compile()
# 親グラフにサブグラフをノードとして登録
parent = StateGraph(PipelineState)
parent.add_node("research", build_research_subgraph())
parent.add_node("write", write_node)
parent.add_edge(START, "research")
parent.add_edge("research", "write")
parent.add_edge("write", END)
サブグラフにすると LangSmith のトレースでも折りたたみ表示される。デバッグ時に「検索フェーズの内部」と「パイプライン全体」を切り替えて確認できるので、可視性が大きく向上する。
:::message alert 注意: サブグラフの State スキーマは親と互換性が必要。親の State にないキーをサブグラフが要求すると
compile()時にエラーになる。入出力スキーマを分離したい場合は
input_schema/
output_schemaパラメータを使う。 :::
エラー回復パターン 3 選(コピペ可テンプレ)
Zenn で見かける LangGraph 記事の多くは Happy Path しか書いていない。本番で一番詰まるのはエラー処理。以下の 3 パターンをそのまま使えるテンプレートとして提示する。
① Retry Node──一時失敗を自動リトライ
外部 API のタイムアウトや LLM のレート制限に対して、指数バックオフで自動リトライするパターン。
import time
import random
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
class RetryState(TypedDict):
task: str
result: str | None
error: str | None
retry_count: int
max_retries: int
def api_call_node(state: RetryState) -> dict:
"""失敗する可能性のある外部 API 呼び出し"""
try:
# ── 実運用ではここに実際の API 呼び出しを書く ──
if random.random() < 0.5: # デモ用: 50% で失敗
raise TimeoutError("API timeout")
return {"result": f"成功: {state['task']}", "error": None}
except Exception as e:
return {"result": None, "error": str(e)}
def retry_wait_node(state: RetryState) -> dict:
"""指数バックオフで待機"""
count = state["retry_count"] + 1
wait_sec = min(2 ** (count - 1), 8) # 1s → 2s → 4s → 8s
print(f" [Retry {count}/{state['max_retries']}] {wait_sec}s 待機...")
time.sleep(wait_sec)
return {"retry_count": count, "error": None}
def give_up_node(state: RetryState) -> dict:
"""リトライ上限到達時のフォールバック"""
return {"result": f"[フォールバック] {state['task']} の処理をスキップ"}
def route_after_api(state: RetryState) -> Literal["done", "retry", "give_up"]:
if state.get("result") is not None:
return "done"
if state["retry_count"] >= state["max_retries"]:
return "give_up"
return "retry"
# ── Graph 組み立て ──────────────────────────────────────────────
def build_retry_graph():
g = StateGraph(RetryState)
g.add_node("api_call", api_call_node)
g.add_node("retry_wait", retry_wait_node)
g.add_node("give_up", give_up_node)
g.add_edge(START, "api_call")
g.add_conditional_edges(
"api_call", route_after_api,
{"done": END, "retry": "retry_wait", "give_up": "give_up"},
)
g.add_edge("retry_wait", "api_call") # リトライループ
g.add_edge("give_up", END)
return g.compile()
# ── 実行 ─────────────────────────────────────────────────────────
if __name__ == "__main__":
app = build_retry_graph()
result = app.invoke({
"task": "外部データ取得",
"result": None,
"error": None,
"retry_count": 0,
"max_retries": 3,
})
print(f"結果: {result['result']}")
実行フロー:
START → api_call ──成功──→ END
│
└─失敗─→ retry_wait → api_call(ループ)
│
└─上限到達→ give_up → END
② Fallback Branch──代替パスへ graceful に切り替え
主系モデル(GPT-4o)が完全に使えない状態のとき、軽量モデル(GPT-4o-mini)に切り替えてサービスを継続するパターン。
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
class FallbackState(TypedDict):
query: str
primary_result: str | None
fallback_result: str | None
final_answer: str
used_fallback: bool
def primary_node(state: FallbackState) -> dict:
"""高精度モデルで処理。失敗したら result = None"""
try:
llm = ChatOpenAI(model="gpt-4o", temperature=0, request_timeout=10)
resp = llm.invoke(state["query"])
return {"primary_result": resp.content}
except Exception as e:
print(f" [Primary 失敗] {e}")
return {"primary_result": None}
def fallback_node(state: FallbackState) -> dict:
"""軽量モデルで処理。こちらは安定性重視"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, request_timeout=30)
resp = llm.invoke(state["query"])
return {"fallback_result": resp.content, "used_fallback": True}
def merge_node(state: FallbackState) -> dict:
"""結果をマージして最終回答を生成"""
answer = state.get("primary_result") or state.get("fallback_result") or "回答を取得できませんでした"
return {"final_answer": answer}
def route_after_primary(state: FallbackState) -> Literal["merge", "fallback"]:
return "merge" if state.get("primary_result") else "fallback"
def build_fallback_graph():
g = StateGraph(FallbackState)
g.add_node("primary", primary_node)
g.add_node("fallback", fallback_node)
g.add_node("merge", merge_node)
g.add_edge(START, "primary")
g.add_conditional_edges(
"primary", route_after_primary,
{"merge": "merge", "fallback": "fallback"},
)
g.add_edge("fallback", "merge")
g.add_edge("merge", END)
return g.compile()
実行フロー:
START → primary ──成功──→ merge → END
│
└─失敗──→ fallback → merge → END
実運用では
merge_nodeで「フォールバックを使った」というメタデータをログに残しておくと、後から主系モデルの障害頻度を追跡できる。
③ Human-in-the-loop Gate──高リスク操作の手動承認
「AI が本番 DB を更新する」「大量メール送信を実行する」のような高リスク操作の前に、人間の承認を挟むパターン。LangGraph の
interrupt()を使う。
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
class GateState(TypedDict):
action: str
risk_level: str # "low" | "high"
approved: bool | None
result: str | None
def assess_risk(state: GateState) -> dict:
"""操作のリスクレベルを判定"""
dangerous = ["delete", "drop", "truncate", "本番", "production", "send_all"]
risk = "high" if any(kw in state["action"].lower() for kw in dangerous) else "low"
return {"risk_level": risk}
def human_gate(state: GateState) -> dict:
"""
interrupt() で Graph が一時停止。
人間が approve / reject を Command で送るまで待機する。
"""
decision = interrupt({
"message": "以下の操作を実行してよいですか?",
"action": state["action"],
"risk_level": state["risk_level"],
"options": ["approve", "reject"],
})
return {"approved": (decision == "approve")}
def execute_action(state: GateState) -> dict:
"""承認された操作を実行"""
# 実運用ではここに実際の操作を書く
return {"result": f"[完了] {state['action']}"}
def reject_action(state: GateState) -> dict:
return {"result": f"[却下] {state['action']} は実行されませんでした"}
def route_by_risk(state: GateState) -> Literal["gate", "execute"]:
return "gate" if state["risk_level"] == "high" else "execute"
def route_by_approval(state: GateState) -> Literal["execute", "reject"]:
return "execute" if state.get("approved") else "reject"
def build_gate_graph():
# Human-in-the-loop には Checkpointer が必須
checkpointer = InMemorySaver()
g = StateGraph(GateState)
g.add_node("assess", assess_risk)
g.add_node("gate", human_gate)
g.add_node("execute", execute_action)
g.add_node("reject", reject_action)
g.add_edge(START, "assess")
g.add_conditional_edges("assess", route_by_risk, {"gate": "gate", "execute": "execute"})
g.add_conditional_edges("gate", route_by_approval, {"execute": "execute", "reject": "reject"})
g.add_edge("execute", END)
g.add_edge("reject", END)
return g.compile(checkpointer=checkpointer)
# ── 使い方 ──────────────────────────────────────────────────────
if __name__ == "__main__":
app = build_gate_graph()
config = {"configurable": {"thread_id": "session-001"}}
# 1回目: interrupt で停止する
result = app.invoke(
{"action": "DROP TABLE users", "risk_level": "", "approved": None, "result": None},
config=config,
)
print("承認待ち状態になりました")
# 2回目: 人間が承認して再開
final = app.invoke(Command(resume="approve"), config=config)
print(f"結果: {final['result']}")
実行フロー:
START → assess ─low risk──→ execute → END
│
└─high risk─→ gate(interrupt) ──approve──→ execute → END
│
└─reject───→ reject → END
:::message interrupt の制約:
interrupt()を使うノードがあるグラフは、必ず Checkpointer 付きで
compile()する必要がある。Checkpointer なしだと
interrupt呼び出し時に例外が飛ぶ。 :::
再利用可能 Graph テンプレート 2 選
Research → Write → Review パイプライン
ドキュメント生成・技術記事作成・レポート生成に使える汎用テンプレート。Review ノードで「APPROVED」が返るまで Write → Review をループする。
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
class ContentState(TypedDict):
topic: str
research_notes: list[str]
draft: str
review_feedback: str
final_content: str
revision_count: int
max_revisions: int
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
def research(state: ContentState) -> dict:
resp = llm.invoke(
f"'{state['topic']}' について重要なポイントを5〜7項目で箇条書きにしてください。"
)
notes = [line.strip() for line in resp.content.split("\n") if line.strip()]
return {"research_notes": notes}
def write(state: ContentState) -> dict:
notes = "\n".join(state["research_notes"])
feedback = state.get("review_feedback", "")
prompt = f"以下の調査メモをもとに '{state['topic']}' についての記事を書いてください。"
if feedback:
prompt += f"\n\n【前回のレビュー指摘】\n{feedback}\n上記を反映して改善してください。"
prompt += f"\n\n【調査メモ】\n{notes}"
resp = llm.invoke(prompt)
return {"draft": resp.content}
def review(state: ContentState) -> dict:
prompt = (
"以下の草稿をレビューしてください。改善点があれば具体的に指摘してください。"
"問題がなければ 'APPROVED' とだけ返してください。\n\n"
f"{state['draft']}"
)
resp = llm.invoke(prompt)
return {
"review_feedback": resp.content,
"revision_count": state["revision_count"] + 1,
}
def finalize(state: ContentState) -> dict:
return {"final_content": state["draft"]}
def route_review(state: ContentState) -> Literal["write", "finalize"]:
if "APPROVED" in state["review_feedback"]:
return "finalize"
if state["revision_count"] >= state["max_revisions"]:
return "finalize" # 上限到達 → 現状の draft を最終版とする
return "write"
def build_content_pipeline():
g = StateGraph(ContentState)
g.add_node("research", research)
g.add_node("write", write)
g.add_node("review", review)
g.add_node("finalize", finalize)
g.add_edge(START, "research")
g.add_edge("research", "write")
g.add_edge("write", "review")
g.add_conditional_edges(
"review", route_review,
{"write": "write", "finalize": "finalize"},
)
g.add_edge("finalize", END)
return g.compile()
# ── 実行 ─────────────────────────────────────────────────────────
if __name__ == "__main__":
app = build_content_pipeline()
result = app.invoke({
"topic": "LangGraph のエラー回復パターン",
"research_notes": [], "draft": "", "review_feedback": "",
"final_content": "", "revision_count": 0, "max_revisions": 3,
})
print(f"修正回数: {result['revision_count']}")
print(f"最終コンテンツ:\n{result['final_content'][:300]}...")
実行フロー:
START → research → write → review ──APPROVED──→ finalize → END
↑ │
└───改善指摘─────┘ (max_revisions までループ)
Tool-Call ループ + 自己検証ノード
LLM がツールを呼び出し、その結果を自分で検証してから次のアクションを決める ReAct 的パターン。
create_react_agentよりも粒度の細かい制御が必要な場面で使う。
import json
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# ── ツール定義 ──────────────────────────────────────────────────
TOOLS = {
"calculator": lambda expr: str(eval(expr)),
"get_weather": lambda city: f"{city}: 晴れ 23℃",
"search_web": lambda q: f"検索結果: {q} に関する最新情報",
}
class ToolLoopState(TypedDict):
user_request: str
tool_results: list[str]
final_answer: str
iteration: int
max_iterations: int
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def decide_node(state: ToolLoopState) -> dict:
"""LLM がツールを使うか最終回答を返すか判断"""
history = "\n".join(state["tool_results"]) if state["tool_results"] else "(なし)"
prompt = f"""ユーザーの要求: {state['user_request']}
過去のツール結果: {history}
利用可能なツール: {list(TOOLS.keys())}
ツールが必要なら {{"tool": "名前", "args": "引数"}} の JSON を返してください。
最終回答ができるなら "FINAL: 回答" の形式で返してください。"""
resp = llm.invoke(prompt)
content = resp.content.strip()
if content.startswith("FINAL:"):
return {"final_answer": content[6:].strip()}
try:
call = json.loads(content)
tool_name = call.get("tool", "")
args = call.get("args", "")
if tool_name in TOOLS:
result = TOOLS[tool_name](args)
else:
result = f"[Error] ツール '{tool_name}' は存在しません"
return {
"tool_results": state["tool_results"] + [f"[{tool_name}({args})] → {result}"],
"iteration": state["iteration"] + 1,
}
except (json.JSONDecodeError, Exception):
return {"final_answer": content}
def route_decide(state: ToolLoopState) -> Literal["loop", "end", "force_end"]:
if state.get("final_answer"):
return "end"
if state["iteration"] >= state["max_iterations"]:
return "force_end"
return "loop"
def force_end_node(state: ToolLoopState) -> dict:
summary = "\n".join(state["tool_results"])
return {"final_answer": f"[最大試行到達]\nツール結果:\n{summary}"}
def build_tool_loop():
g = StateGraph(ToolLoopState)
g.add_node("decide", decide_node)
g.add_node("force_end", force_end_node)
g.add_edge(START, "decide")
g.add_conditional_edges(
"decide", route_decide,
{"loop": "decide", "end": END, "force_end": "force_end"},
)
g.add_edge("force_end", END)
return g.compile()
if __name__ == "__main__":
app = build_tool_loop()
result = app.invoke({
"user_request": "東京の天気を調べて、気温を華氏に変換して",
"tool_results": [], "final_answer": "", "iteration": 0, "max_iterations": 5,
})
print(f"回答: {result['final_answer']}")
実行フロー:
START → decide ──ツール呼出──→ decide(ループ)
│ │
├─最終回答─→ END ├─最大試行到達→ force_end → END
本番デプロイ:Checkpointer・LangSmith・コスト管理
Postgres Checkpointer で実行状態を永続化
InMemorySaverはプロセスが落ちると状態が消える。ローカル開発用。本番では Postgres Checkpointer を使う。
pip install langgraph-checkpoint-postgres "psycopg[binary,pool]"
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:password@localhost:5432/langgraph_db"
# コネクションプール + Checkpointer の初期化
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup() # 初回のみ。テーブルを自動作成
graph = StateGraph(ContentState)
# ... ノードとエッジの定義 ...
app = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-12345-session-001"}}
# 途中でプロセスが落ちても、同じ thread_id で再開できる
result = app.invoke(initial_state, config=config)
# 過去の実行ステップを取得(タイムトラベル)
for snapshot in app.get_state_history(config):
print(f"Step: {snapshot.metadata}")
:::message Checkpointer 選定基準
- ローカル開発 →
InMemorySaver
- ステージング →
SqliteSaver
(langgraph-checkpoint-sqlite
) - 本番 →
PostgresSaver
(langgraph-checkpoint-postgres
) - 副作用のあるノードは 冪等(idempotent)に実装すること。再開時はそのノードの先頭から再実行される。 :::
LangSmith でトレースとコストを可視化
# .env に設定するだけで自動トレースが有効になる LANGCHAIN_TRACING_V2=true LANGCHAIN_API_KEY=ls__xxxxxxxxxxxx LANGCHAIN_PROJECT=my-langgraph-prod
# Graph 実行時に metadata を付与 → LangSmith でフィルタ / コスト按分に使える
result = app.invoke(
initial_state,
config={
"configurable": {"thread_id": "session-001"},
"metadata": {
"user_id": "user-12345",
"pipeline": "content_generation",
"environment": "production",
},
},
)
LangSmith のダッシュボードで確認できること:
- ノード単位のレイテンシ: どのノードがボトルネックか
- ノード単位のトークン消費量: コストの内訳
- エラーが発生したノードの特定: スタックトレースとその時点の State
- 実行パスの可視化: 条件分岐でどちらの経路を通ったか
LangSmith を使わない選択肢としては Langfuse(OSS / self-hosted)もある。自社サーバーで運用したい場合はそちらも検討に値する。
MAX_ITERATIONS で無限ループを防ぐ
ループ構造を持つ Graph は 必ず 終了条件を実装する。3 重のガードを入れておくのがベストプラクティス。
# ── ガード 1: State 内の iteration カウンター(前述のパターン)──────
def route_with_guard(state: MyState) -> Literal["continue", "end"]:
if state["iteration"] >= state["max_iterations"]:
return "end"
return "continue"
# ── ガード 2: LangGraph の recursion_limit ──────────────────────
result = app.invoke(
initial_state,
config={
"recursion_limit": 30, # デフォルト 25。超えると GraphRecursionError
},
)
# ── ガード 3: 外側でのタイムアウト ──────────────────────────────
import asyncio
async def invoke_with_timeout(app, state, config, timeout_sec=120):
"""全体のタイムアウトを設定"""
try:
result = await asyncio.wait_for(
app.ainvoke(state, config=config),
timeout=timeout_sec,
)
return result
except asyncio.TimeoutError:
print(f"[Timeout] {timeout_sec}s で強制停止")
return None
:::message alert **
recursion_limit の落とし穴**: サブグラフの内部ステップも親のカウントに含まれる。サブグラフが深い構成では、予想より早く上限に達することがある。本番投入前に意図したループ回数で正常終了することを必ず確認すること。 :::
まとめ──「自分でフローを掌握する」ための設計思想
LangGraph を使ってみて一番変わったのは、「Agent が何をしているかわからない」という不安がなくなったことだ。
Graph を書くということは、実行フローをコードとして明示するということ。どのノードが何をするか、どの条件でどこへ遷移するか、失敗したときどう回復するか──すべてが読める。
実装する順番としては、以下が個人的にやりやすかった。
① State の TypedDict を先に設計する どの情報が必要かを整理するだけで、Graph の構造が見えてくる。
② ノードを純粋関数として書く State の部分更新だけを返す関数にしておけば、テストもしやすい。
③ ルーティング関数のユニットテストを先に書く 入力と出力が明確なので、
pytestで普通にテストできる。ここが壊れると Graph 全体が壊れるため、重点的にカバーする。
④ Checkpointer は最初からつけておく 後から追加すると State スキーマの変更が必要になることがある。開発時は
InMemorySaver、本番で差し替えればいい。
⑤ LangSmith のトレースを見ながらチューニングする どこで時間がかかっているか、コストが何に使われているかを確認して、ボトルネックから潰す。
Chain や CrewAI を完全に捨てる必要はない。シンプルな一方向の処理なら Chain で十分だし、CrewAI の宣言的な記法が向いているケースもある。ただ「フローを自分で握りたい」「エラー処理を真剣に考えたい」「本番で安心して動かしたい」というなら、LangGraph は現時点で最も手堅い選択肢だと考えている。
LangGraph が教えてくれるのは、「記憶のないエージェントは、本当の意味でインテリジェントではない」ということ。StateGraph で状態を管理し、Checkpointer で文脈を保持し、エラーから回復する——それだけの設計コストをかけて、私たちが本当に作りたいのは何だろう。