Building Multi-Step AI Agents with LangGraph

Modern AI agents use LangGraph to run cyclic workflows that need memory and self-correction. By framing your agent as a stateful graph, you move past simple linear prompts. You build autonomous systems that loop, branch on tool output, recover from failures, and save progress across hours or days of work.
This post walks LangGraph from core ideas to production deployment. You’ll learn how to design a state schema, set up self-correcting retry logic, build multi-agent patterns, and serve your agent through a production API. Working Python code runs throughout.
Prerequisites
You should be comfortable with Python 3.11+ and have some familiarity with LangChain's core ideas: LLMs, tools, prompts. The examples below use these package versions:
```bash
pip install langgraph==0.3 langchain-openai==0.2 langchain-core==0.3 pydantic==2.7
```

You’ll also need an OPENAI_API_KEY in your environment, or swap in any LangChain-compatible LLM. All code is tested against Python 3.11 and 3.12.
What Is LangGraph and Why It Replaced Chains
For most of LangChain’s early life, the main way to chain LLM calls was the Chain: a linear pipeline where output from one step flowed into the next. For simple, predictable tasks this worked fine. But the moment you added a tool that could fail, an output that needed checking, or a step that needed a retry with new parameters, the linear model broke. Chains have no native way to loop back, branch on a condition, or route to a different step based on what a tool returned. They are directed acyclic graphs (DAGs) in disguise, and DAGs can’t express the retry logic that makes agents reliable in the real world.
LangGraph fixes this by treating the agent as a proper stateful graph with cyclic edges. A cycle is just a directed edge that points to a node already visited. That one primitive unlocks retry logic, self-correction loops, approval flows, and multi-turn chat. Instead of a pipeline, you’re building a state machine. Its transitions are driven by the agent’s own outputs and tool results. This is the key shift: your LLM no longer just generates text at the end of a chain. It makes routing choices that drive the whole program.

LangGraph’s vocabulary is small but precise. A StateGraph is the container that holds everything. Nodes are plain Python functions, sync or async, that take the current state, do some work (call an LLM, invoke a tool, check a result), and return a partial state update. Edges are the transitions between nodes. They can be fixed (always go from A to B) or conditional (call a function that reads the state and returns the next node’s name). The State object is a typed dictionary that flows through every node, picking up updates as it goes. Here’s the simplest form:
```python
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]  # append-only list
    task_complete: bool

def my_node(state: AgentState) -> dict:
    # do work, return partial state update
    return {"task_complete": True}

graph = StateGraph(AgentState)
graph.add_node("worker", my_node)
graph.set_entry_point("worker")
graph.add_edge("worker", END)

app = graph.compile()
result = app.invoke({"messages": [], "task_complete": False})
```

That’s enough to run a single-node graph. Everything else (memory, self-correction, multi-agent patterns) builds on this base.
LangGraph vs. the Alternatives in 2026
Before you commit to LangGraph, it helps to see where it sits in a crowded field. The table below compares the four most popular agent frameworks in early 2026:
| Framework | Paradigm | Cyclic Graphs | Built-in Persistence | Multi-Agent | Best For |
|---|---|---|---|---|---|
| LangGraph | Stateful graph / state machine | Yes (first-class) | Yes (SQLite, Postgres, Redis) | Yes (via subgraphs) | Complex, production agents requiring reliability |
| CrewAI | Role-based crews | Limited | No (bring your own) | Yes (role abstraction) | Teams of specialized agents, rapid prototyping |
| AutoGen | Conversational agents | Via orchestration | Limited | Yes (agent conversations) | Research, multi-agent dialogue tasks |
| OpenAI Assistants API | Managed, cloud-hosted | No (managed by API) | Yes (cloud-managed threads) | Limited | Simplest path to a working assistant, OpenAI-only |
LangGraph is the right pick when you need fine-grained control over agent behavior, solid persistence, and the option to self-host. If you’re building a quick proof-of-concept with role-playing agents, CrewAI may get you there faster. But for production where you need to reason about every state change and handle failure well, LangGraph’s explicit graph wins. You can read the graph and know exactly what the agent will do in every case.
Designing Your State Schema
The State object is the single most important design choice in any LangGraph agent. Everything the agent knows at any moment lives in state: chat history, tool results, error counts, flags, and any data your app needs. Getting this right before you write a single node heads off the most common bugs in multi-step agents: state that grows forever, fields with fuzzy meanings, and schemas you can’t migrate when the spec shifts.
The first question is what belongs in state at all. A rule of thumb: if a node reads a value to make a choice, or writes a value for a later node, it belongs in state. If data is only used inside one node, keep it as a local variable. Don’t treat state as a scratchpad. Every field you add grows the bug surface, makes state more expensive to serialize, and eats context window whenever state is dumped into prompts.
For most agents, the state schema looks something like this:
```python
from typing import TypedDict, Annotated, Optional
from langgraph.graph.message import add_messages

class ResearchAgentState(TypedDict):
    # The conversation history - LangGraph's add_messages reducer
    # appends new messages rather than replacing the whole list
    messages: Annotated[list, add_messages]
    # URLs discovered during research, to be scraped
    urls_to_scrape: list[str]
    # Scraped content ready for summarization
    scraped_content: list[dict]
    # The final synthesized answer
    final_answer: Optional[str]
    # Error tracking for the self-correction loop
    error_count: int
    last_error: Optional[str]
    # Termination signal
    task_complete: bool
```

Notice the Annotated[list, add_messages] pattern on the messages field. LangGraph uses reducers to merge partial state updates from nodes. Without a reducer, returning {"messages": [new_message]} from a node would wipe the whole messages list. The add_messages reducer appends instead, which is almost always what you want for chat history. You can write custom reducers for any field. For example, one that caps a list at N items to stop unbounded growth.
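Here’s a minimal sketch of that last idea. The reducer name capped_append and the limit of 50 are illustrative, not a LangGraph built-in:

```python
from typing import Annotated, TypedDict

def capped_append(existing: list, new: list) -> list:
    # Append the new items, then keep only the most recent 50
    return (existing + new)[-50:]

class BoundedState(TypedDict):
    # The reducer runs on every partial update to this field
    scraped_content: Annotated[list, capped_append]
```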
TypedDict vs. Pydantic for State Validation
TypedDict is the default and it’s fast. Python doesn’t enforce the types at runtime, which means bad state can quietly spread through your graph. During development, use a Pydantic model instead:
```python
from pydantic import BaseModel, Field, field_validator
from typing import Optional

class StrictAgentState(BaseModel):
    messages: list = Field(default_factory=list)
    error_count: int = Field(default=0, ge=0, le=10)
    task_complete: bool = False
    final_answer: Optional[str] = None

    @field_validator("error_count")
    @classmethod
    def error_count_non_negative(cls, v: int) -> int:
        if v < 0:
            raise ValueError("error_count cannot be negative")
        return v
```

Pydantic validation runs each time a node returns a state update. You get fast, clear errors when a node returns the wrong type or an out-of-range value. The trade-off is a small runtime cost and the need to wire LangGraph to your Pydantic model. For any real agent this cost is tiny next to the price of LLM calls.
Avoiding State Bloat
One failure mode is worth calling out: agents that stuff large data into state. If your agent scrapes ten web pages, don’t store the full HTML in the state object. Instead, write the content to a temp file or external store (Redis, S3) and keep only the key or path in state. This keeps state serializable in milliseconds, stops context window overflow when state gets dumped into prompts, and makes checkpointing cheap. The state object should hold references to data, not the data itself, whenever documents top a few kilobytes.
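A minimal sketch of the reference pattern, assuming a hypothetical fetch_page helper and a local cache directory standing in for your real external store:

```python
import hashlib
from pathlib import Path

CACHE_DIR = Path("/tmp/agent_cache")  # illustrative; use S3/Redis in production
CACHE_DIR.mkdir(parents=True, exist_ok=True)

def scrape_node(state: ResearchAgentState) -> dict:
    refs = []
    for url in state["urls_to_scrape"]:
        html = fetch_page(url)  # hypothetical scraper helper
        key = hashlib.sha256(url.encode()).hexdigest()
        path = CACHE_DIR / f"{key}.html"
        path.write_text(html)
        # State carries only a small reference, never the raw HTML
        refs.append({"url": url, "path": str(path)})
    return {"scraped_content": refs}
```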
State Management and Long-Term Memory
One of LangGraph’s most useful features, and the clearest gap between it and simpler frameworks, is native persistent checkpointing. After every node runs, LangGraph can save the full graph state to a durable store. If the agent crashes, gets killed, or just needs to pause for human review, it resumes from the last checkpoint with no data loss.
LangGraph ships with two built-in checkpointers: SqliteSaver for dev and single-machine setups, and PostgresSaver for production multi-instance deployments. Adding persistence to any graph takes only a few lines:
```python
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# Create the checkpointer. Note: from_conn_string is a context manager
# in recent langgraph-checkpoint-sqlite releases, so for a long-lived
# process build the saver from an open connection instead.
conn = sqlite3.connect("agent_state.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)

# Compile the graph with the checkpointer attached
app = graph.compile(checkpointer=checkpointer)

# Each invocation uses a thread_id to identify the conversation
config = {"configurable": {"thread_id": "user-session-42"}}

# First invocation - agent starts working
result = app.invoke({"messages": [], "task_complete": False}, config=config)

# If the process crashes here, the state is safe in SQLite.
# Later: resume from the last checkpoint by invoking with the same thread_id
result = app.invoke(None, config=config)  # None resumes from checkpoint
```

The thread_id is the key concept. Every distinct agent session gets its own thread, and LangGraph uses this ID to read and write checkpoints. A single deployed agent can handle thousands of concurrent sessions, each with fully separate state, through this one feature.
Short-Term vs. Long-Term Memory
It helps to think about agent memory in two tiers. Short-term memory is the messages list in state: the rolling chat history that fits in the LLM’s context window and is open to every node in the current run. It’s fast, always available, and managed for you by LangGraph’s checkpointing. But it has a hard ceiling set by your model’s context window, and it’s scoped to a single thread, or session.
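One common way to live within that ceiling is to trim the window you send to the model without rewriting state. A sketch using langchain-core’s trim_messages; the 4,000-token budget is an assumption to tune per model:

```python
from langchain_core.messages import trim_messages
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def answer_node(state: ResearchAgentState) -> dict:
    # Trim only the prompt sent to the model; state keeps the full history
    window = trim_messages(
        state["messages"],
        max_tokens=4000,      # assumed budget
        token_counter=llm,    # a chat model instance can count tokens
        strategy="last",      # keep the most recent messages
        include_system=True,
    )
    response = llm.invoke(window)
    return {"messages": [response]}
```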
Long-term memory is everything outside the context window. A vector database holding the agent’s notes and lessons across all sessions, a SQL database tracking past choices and their outcomes, or a Redis cache for intermediate results too big for context. The agent reaches long-term memory through tool calls or dedicated retrieval nodes. Choosing what lives in each tier is one of the biggest design calls for production agents.
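As a minimal sketch of a retrieval node, here’s one backed by an in-memory vector store; for real cross-session memory you’d swap in a persistent store (Chroma, Pinecone, pgvector):

```python
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings

# In-memory store for illustration only; long-term memory needs a
# persistent vector DB shared across sessions
memory_store = InMemoryVectorStore(OpenAIEmbeddings())

def recall_node(state: ResearchAgentState) -> dict:
    """Pull relevant notes from past sessions into the current context."""
    query = state["messages"][-1].content
    docs = memory_store.similarity_search(query, k=3)
    notes = "\n".join(d.page_content for d in docs)
    return {"messages": [("system", f"Notes from long-term memory:\n{notes}")]}
```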
Human-in-the-Loop Checkpoints
Not every action an agent takes should be autonomous. Sending an email, shipping code to production, or running a financial transaction are all one-way doors where a human should have the final call. LangGraph handles this through interrupt points: checkpoints where the graph pauses and waits for outside input before moving on.
```python
import sqlite3
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver

# Assumes AgentState also carries a draft_email field, and that
# call_llm_to_draft / send_email are defined elsewhere
def draft_email_node(state):
    # Agent drafts an email
    draft = call_llm_to_draft(state["messages"])
    return {"draft_email": draft}

def send_email_node(state):
    # This node only runs after human approval
    send_email(state["draft_email"])
    return {"task_complete": True}

graph = StateGraph(AgentState)
graph.add_node("draft", draft_email_node)
graph.add_node("send", send_email_node)
graph.set_entry_point("draft")
graph.add_edge("draft", "send")
graph.add_edge("send", END)

checkpointer = SqliteSaver(sqlite3.connect("state.db", check_same_thread=False))

# interrupt_before causes the graph to pause BEFORE entering "send"
app = graph.compile(checkpointer=checkpointer, interrupt_before=["send"])

config = {"configurable": {"thread_id": "email-task-1"}}

# Graph runs "draft" then pauses - human reviews state["draft_email"]
app.invoke(initial_state, config=config)

# After human approves, resume - "send" now executes
app.invoke(None, config=config)
```

This pattern is a must for any agent working in a high-stakes domain. The agent does the thinking. The human makes the final call before any one-way side effect.
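While the graph is paused, the checkpointer also lets the reviewer inspect, and even amend, the saved state before approving. A sketch:

```python
# Inspect the paused run - the draft is sitting in checkpointed state
snapshot = app.get_state(config)
print(snapshot.values["draft_email"])

# Optionally, the reviewer edits the draft in place before approval
app.update_state(config, {"draft_email": "Revised draft text..."})

# Resume: "send" executes with the (possibly edited) draft
app.invoke(None, config=config)
```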
Advanced Error Handling: The Self-Correction Loop
Production agents fail constantly. APIs time out. LLMs return JSON that won’t parse. Web scrapers hit rate limits. Code execution tools hit sandbox limits. The gap between a brittle demo and a solid production agent is how it handles these failures. LangGraph’s conditional edges are what makes self-correction work.
The base pattern is the retry edge with a counter. Add an error_count field to your state. Bump it in the node that handles failures. Use a conditional edge to route back to the failing node for another try, or to a clean exit node after N attempts. Without the counter, a stuck failure spins forever. With it, the agent degrades cleanly.
```python
from typing import TypedDict, Annotated, Optional
from langgraph.graph import StateGraph, END
import operator

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    tool_result: Optional[str]
    error_count: int
    last_error: Optional[str]
    task_complete: bool

MAX_RETRIES = 3

# call_external_api is a placeholder for your real tool call
def call_api_node(state: AgentState) -> dict:
    try:
        result = call_external_api(state["messages"][-1].content)
        return {"tool_result": result, "error_count": 0}
    except Exception as e:
        return {
            "tool_result": None,
            "error_count": state["error_count"] + 1,
            "last_error": str(e),
        }

def route_after_api(state: AgentState) -> str:
    if state["tool_result"] is not None:
        return "process_result"
    elif state["error_count"] >= MAX_RETRIES:
        return "handle_failure"
    else:
        return "call_api"  # Loop back for retry

def process_result_node(state: AgentState) -> dict:
    # Stub: consume the successful result
    return {"task_complete": True}

def handle_failure_node(state: AgentState) -> dict:
    # Stub: exit cleanly after exhausting retries
    return {"task_complete": True}

graph = StateGraph(AgentState)
graph.add_node("call_api", call_api_node)
graph.add_node("process_result", process_result_node)
graph.add_node("handle_failure", handle_failure_node)
graph.set_entry_point("call_api")
graph.add_conditional_edges("call_api", route_after_api)
graph.add_edge("process_result", END)
graph.add_edge("handle_failure", END)

app = graph.compile()
```

The Reflexion Pattern: LLM-Guided Self-Correction
Simple retry loops work for flaky failures. But some failures need the agent to change its strategy. The Reflexion pattern routes the agent through a dedicated critique node after a failure. The LLM looks at what went wrong and suggests a new plan before the next try.
```python
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def critique_node(state: AgentState) -> dict:
    """
    After a failure, ask the LLM to reflect on what went wrong
    and generate a revised plan before the next attempt.
    """
    critique_prompt = f"""
    The previous attempt failed with this error:
    {state['last_error']}

    The original request was:
    {state['messages'][0].content}

    Analyze what went wrong and provide a revised approach.
    Be specific about what you will do differently.
    """
    response = llm.invoke([
        SystemMessage(content="You are a debugging assistant."),
        HumanMessage(content=critique_prompt),
    ])
    # The critique is appended to messages, so the next attempt
    # can see what the LLM learned from the failure
    return {
        "messages": [response],
        "error_count": state["error_count"],  # preserve count
    }

# In the graph: failure routes to critique, critique routes back to attempt
graph.add_node("critique", critique_node)
graph.add_edge("critique", "call_api")  # retry after critique
Structured Output Validation
Beyond flaky API failures, one of the most common failure modes is an LLM returning text that doesn’t fit the expected JSON shape. Catch this at the node boundary before bad output flows downstream:
```python
from pydantic import BaseModel, ValidationError
from langchain_openai import ChatOpenAI

class SearchQuery(BaseModel):
    query: str
    num_results: int
    filter_domain: str | None = None

llm = ChatOpenAI(model="gpt-4o")
structured_llm = llm.with_structured_output(SearchQuery)

def generate_search_query_node(state: AgentState) -> dict:
    try:
        query = structured_llm.invoke(state["messages"])
        # query is guaranteed to be a valid SearchQuery instance
        return {"tool_result": query.model_dump()}
    except ValidationError as e:
        return {
            "error_count": state["error_count"] + 1,
            "last_error": f"Structured output validation failed: {e}",
        }
```

Using with_structured_output with a Pydantic model moves validation out of your app logic and into the LangChain layer. The LLM is told to produce JSON matching the schema. Any drift raises a ValidationError that your error-handling edge can route around.
Multi-Agent Collaboration Patterns
Single agents are powerful. But the most capable LangGraph setups use several specialist agents working together. Each is tuned for a narrow task, supervised by an orchestrator that splits work and merges results. LangGraph supports this natively through its subgraph feature, where one StateGraph can call another as a node.
The Supervisor-Worker Pattern
The most broadly useful multi-agent pattern is the supervisor-worker setup. A supervisor agent takes a high-level task, breaks it into subtasks, hands each one to a specialist worker agent, and stitches the results back together. The supervisor doesn’t do the domain work itself. Its job is planning, delegation, and synthesis.
```python
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# --- Worker Agents (simplified) ---
class WorkerState(TypedDict):
    task: str
    result: str

def research_worker(state: WorkerState) -> dict:
    response = llm.invoke([HumanMessage(content=f"Research: {state['task']}")])
    return {"result": response.content}

def coder_worker(state: WorkerState) -> dict:
    response = llm.invoke([HumanMessage(content=f"Write Python code for: {state['task']}")])
    return {"result": response.content}

research_graph = StateGraph(WorkerState)
research_graph.add_node("research", research_worker)
research_graph.set_entry_point("research")
research_graph.add_edge("research", END)
research_app = research_graph.compile()

coder_graph = StateGraph(WorkerState)
coder_graph.add_node("code", coder_worker)
coder_graph.set_entry_point("code")
coder_graph.add_edge("code", END)
coder_app = coder_graph.compile()

# --- Supervisor ---
class SupervisorState(TypedDict):
    original_task: str
    subtasks: list[dict]  # [{"type": "research"|"code", "task": str}]
    results: Annotated[list, operator.add]
    final_answer: str

def plan_node(state: SupervisorState) -> dict:
    """Decompose the task into subtasks."""
    plan_prompt = f"""
    Decompose this task into subtasks. Return a JSON list where each item
    has "type" (either "research" or "code") and "task" (the subtask description).

    Task: {state['original_task']}
    """
    # In practice, use structured output here
    response = llm.invoke([HumanMessage(content=plan_prompt)])
    subtasks = parse_subtasks(response.content)  # placeholder JSON parser
    return {"subtasks": subtasks}

def delegate_node(state: SupervisorState) -> dict:
    """Execute all subtasks and collect results."""
    results = []
    for subtask in state["subtasks"]:
        worker = research_app if subtask["type"] == "research" else coder_app
        output = worker.invoke({"task": subtask["task"], "result": ""})
        results.append({"type": subtask["type"], "result": output["result"]})
    return {"results": results}

def synthesize_node(state: SupervisorState) -> dict:
    """Combine worker results into a final answer."""
    synthesis_prompt = f"""
    Original task: {state['original_task']}

    Worker results:
    {state['results']}

    Synthesize these into a comprehensive final answer.
    """
    response = llm.invoke([HumanMessage(content=synthesis_prompt)])
    return {"final_answer": response.content}

supervisor = StateGraph(SupervisorState)
supervisor.add_node("plan", plan_node)
supervisor.add_node("delegate", delegate_node)
supervisor.add_node("synthesize", synthesize_node)
supervisor.set_entry_point("plan")
supervisor.add_edge("plan", "delegate")
supervisor.add_edge("delegate", "synthesize")
supervisor.add_edge("synthesize", END)
supervisor_app = supervisor.compile()
```
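Note that delegate_node invokes the workers imperatively because the supervisor and workers use different state schemas. When a parent and child share state keys, LangGraph also lets you add a compiled graph directly as a node; a small sketch:

```python
# A compiled graph can itself be a node when the schemas are compatible
team = StateGraph(WorkerState)
team.add_node("research_team", research_app)  # subgraph as node
team.set_entry_point("research_team")
team.add_edge("research_team", END)
team_app = team.compile()
```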
The Coder-Reviewer Loop

A useful variant of multi-agent work is the coder-reviewer loop. One agent writes code. A second runs it in a sandbox and reviews the output. If the tests fail, the result goes back to the coder with the failure report attached. This mirrors the TDD (Test-Driven Development) flow at the agent level and yields far more reliable code than single-pass generation.
```python
def coder_node(state):
    """Generate or revise Python code based on the task and any prior failures."""
    context = (
        f"Prior attempt failed:\n{state['last_error']}"
        if state.get("last_error") else ""
    )
    prompt = f"Write Python code for: {state['task']}\n{context}"
    response = llm.invoke([HumanMessage(content=prompt)])
    code = extract_code_block(response.content)  # placeholder markdown parser
    return {"generated_code": code}

def reviewer_node(state):
    """Execute the code in a sandbox and capture output or errors."""
    try:
        # execute_in_sandbox is a placeholder for your sandboxed runner
        result = execute_in_sandbox(state["generated_code"], timeout=10)
        if result.tests_passed:
            return {"task_complete": True, "last_error": None}
        else:
            return {
                "error_count": state["error_count"] + 1,
                "last_error": result.test_output,
                "task_complete": False,
            }
    except TimeoutError:
        return {
            "error_count": state["error_count"] + 1,
            "last_error": "Code execution timed out (>10s)",
            "task_complete": False,
        }

def route_after_review(state) -> str:
    if state["task_complete"]:
        return END
    elif state["error_count"] >= MAX_RETRIES:
        return "handle_failure"
    return "coder"  # Loop back to coder with the error context
```

Avoiding Agent Storms
Multi-agent systems have a failure mode single agents don’t: agent storms. Agents delegate to each other in a loop, spawning a growing tree of subtasks. Prevention needs explicit limits at the architecture level:
- Set a max recursion depth in the supervisor’s state schema (delegation_depth: int) and enforce it in the routing function, as shown in the sketch after this list.
- Rate limit inter-agent calls with a token bucket, implemented as a shared counter in Redis.
- Log every agent decision to a structured event log, with the calling agent’s ID as a field. Without this, debugging a storm after the fact is near impossible.
- Design worker agents to be stateless where you can. Workers that don’t call other workers can’t trigger storms.
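A minimal sketch of the depth guard, assuming SupervisorState gains a delegation_depth field and a hypothetical limit of 3:

```python
MAX_DELEGATION_DEPTH = 3  # assumed limit; tune per workload

def route_delegation(state: SupervisorState) -> str:
    """Routing guard: refuse further fan-out past the depth limit."""
    if state["delegation_depth"] >= MAX_DELEGATION_DEPTH:
        return "synthesize"  # stop delegating; use the results gathered so far
    return "delegate"

# delegate_node must also bump the counter in its returned update, e.g.:
#   return {"results": results,
#           "delegation_depth": state["delegation_depth"] + 1}
```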
Deploying LangGraph Agents to Production

The gap between a working agent in a notebook and a solid agent in production is wide. Production brings concurrent users, long-running tasks that span process restarts, observability needs, and cost control. None of these show up during development.
Wrapping LangGraph in a FastAPI Service
The most portable self-hosted deployment wraps your compiled graph in an async FastAPI endpoint. LangGraph’s astream method lets you stream node outputs to the client in real time. That’s important for long-running tasks where the user would otherwise wait silently for minutes.
```python
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from psycopg import Connection
from psycopg.rows import dict_row
from langgraph.checkpoint.postgres import PostgresSaver

app = FastAPI()

# Use PostgresSaver for production (supports concurrent access).
# from_conn_string is a context manager in recent releases, so for a
# long-lived service build the saver from an open connection instead.
conn = Connection.connect(
    "postgresql://user:pass@localhost/agentdb",
    autocommit=True,
    row_factory=dict_row,
)
checkpointer = PostgresSaver(conn)
checkpointer.setup()  # create checkpoint tables on first run

agent_app = build_agent_graph().compile(checkpointer=checkpointer)

class TaskRequest(BaseModel):
    task: str
    thread_id: str

@app.post("/run-agent")
async def run_agent(request: TaskRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    initial_state = {
        "messages": [{"role": "user", "content": request.task}],
        "error_count": 0,
        "task_complete": False,
    }

    async def event_stream():
        async for event in agent_app.astream(initial_state, config=config):
            # Stream each node's output as a server-sent event;
            # default=str keeps message objects JSON-serializable
            yield f"data: {json.dumps(event, default=str)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

@app.get("/agent-state/{thread_id}")
async def get_state(thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    state = agent_app.get_state(config)
    return state.values
```

Observability with LangSmith
When an agent fails in production, you need to know which node it failed in, what the state looked like at that point, and what the LLM was prompted with. LangSmith gives you this for free in LangChain and LangGraph apps. Every node run, every LLM call, and every tool call is traced and stored.
```python
import os

# Enable LangSmith tracing via environment variables
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-production-agent"

# No code changes required - LangGraph integrates automatically
```

With tracing on, every production call produces a trace in the LangSmith UI. It shows the full path through the graph, latency per node, and the exact inputs and outputs of every LLM call. For debugging production incidents, this is a must-have.
LangGraph Platform vs. Self-Hosting
LangGraph Platform (LangChain’s managed offering) handles persistence, scaling, and the REST API layer for you, for a monthly fee. For teams without infra engineers, it’s a strong option. For teams that need full control over data residency, self-hosting with the FastAPI pattern above and PostgresSaver is simple, and it adds no per-request cost beyond the LLM API calls themselves.
Cost Management for Multi-Step Agents
A single task run by a multi-step agent can easily fire 20 to 50 LLM calls: the supervisor’s plan, each worker’s run, critique nodes, retry attempts, and the final synthesis. At GPT-4o prices (about $15 per million output tokens in early 2026), a complex task costs $0.30 to $1.50 per run. Before you deploy at scale, log token use per node and per thread. Find which nodes burn the most tokens. Ask whether a smaller, cheaper model would do for that node’s job. Routing simple classification through gpt-4o-mini instead of gpt-4o can cut overall costs by 40 to 60 percent with little quality loss. For teams with strict data-residency rules or high-volume workloads, running a local LLM on consumer hardware wipes out per-token API costs for lighter nodes.
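A sketch of both ideas, per-node token logging via usage_metadata and a cheaper model for low-stakes nodes; the node split and logger are illustrative assumptions:

```python
from langchain_openai import ChatOpenAI

strong_llm = ChatOpenAI(model="gpt-4o", temperature=0)
cheap_llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # for simple nodes

def invoke_and_log(llm, messages, node_name: str):
    """Invoke an LLM and log token usage attributed to a node (sketch)."""
    response = llm.invoke(messages)
    usage = response.usage_metadata or {}
    print(f"[{node_name}] in={usage.get('input_tokens', 0)} "
          f"out={usage.get('output_tokens', 0)}")
    return response

def classify_node(state: AgentState) -> dict:
    # Simple classification: the cheap model is usually enough here
    response = invoke_and_log(cheap_llm, state["messages"], "classify")
    return {"messages": [response]}
```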
Putting It All Together: A Complete Research Agent
Here’s a small but complete agent that shows all the patterns from this post: stateful graph, checkpointing, conditional routing with retry logic, and streaming deployment.
```python
import sqlite3
from typing import TypedDict, Annotated, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langchain_community.tools import DuckDuckGoSearchRun

llm = ChatOpenAI(model="gpt-4o", temperature=0)
search_tool = DuckDuckGoSearchRun()
MAX_RETRIES = 3

class ResearchState(TypedDict):
    messages: Annotated[list, add_messages]
    search_results: list[str]
    final_report: Optional[str]
    error_count: int
    last_error: Optional[str]
    task_complete: bool

def search_node(state: ResearchState) -> dict:
    """Perform a web search based on the latest user message."""
    query = state["messages"][-1].content
    try:
        results = search_tool.run(query)
        return {"search_results": [results], "error_count": 0, "last_error": None}
    except Exception as e:
        return {
            "search_results": [],
            "error_count": state["error_count"] + 1,
            "last_error": str(e),
        }

def synthesize_node(state: ResearchState) -> dict:
    """Synthesize search results into a structured report."""
    context = "\n\n".join(state["search_results"])
    prompt = f"""
    Based on these search results, write a concise, accurate research report.
    Address the original query: {state['messages'][0].content}

    Search results:
    {context}
    """
    response = llm.invoke([HumanMessage(content=prompt)])
    return {
        "final_report": response.content,
        "messages": [AIMessage(content=response.content)],
        "task_complete": True,
    }

def route_after_search(state: ResearchState) -> str:
    if state["search_results"]:
        return "synthesize"
    elif state["error_count"] >= MAX_RETRIES:
        return END
    return "search"  # Retry

# Build the graph
graph = StateGraph(ResearchState)
graph.add_node("search", search_node)
graph.add_node("synthesize", synthesize_node)
graph.set_entry_point("search")
graph.add_conditional_edges("search", route_after_search)
graph.add_edge("synthesize", END)

# Build the checkpointer from a connection (see the persistence section)
checkpointer = SqliteSaver(sqlite3.connect("research_agent.db", check_same_thread=False))
research_agent = graph.compile(checkpointer=checkpointer)

# Run the agent
config = {"configurable": {"thread_id": "research-001"}}
initial_state = {
    "messages": [HumanMessage(content="What are the latest developments in quantum computing?")],
    "search_results": [],
    "final_report": None,
    "error_count": 0,
    "last_error": None,
    "task_complete": False,
}
result = research_agent.invoke(initial_state, config=config)
print(result["final_report"])
```

What to Build Next
The patterns in this post (stateful graphs, checkpointed persistence, self-correction loops, and multi-agent supervision) are the building blocks of the most capable autonomous systems being built today. Once you have a working single-agent loop, the natural next step is to add a vector database for long-term memory so the agent learns from past tasks, connect it to real tools via the Model Context Protocol, and deploy it behind a streaming API your frontend can consume in real time.
LangGraph’s explicitness is its biggest strength. Unlike black-box agent frameworks, you can read the graph definition and know exactly what will happen in every case. That predictability is the gap between an agent that wows in a demo and one that earns trust in production.