backend / Jun 5, 2026 / 13 min
wiring LangGraph interrupt() and Command(resume=) into a FastAPI pipeline: a compiled graph singleton, a dedicated checkpoint pool, and resumable SSE streaming.
The content pipeline runs multi-phase LangGraph graphs that take 5 to 30 minutes per execution. At several points during a run, the pipeline needs a human decision before it can continue: which brand voice to use, which content goal to pursue, whether discovered case studies are accurate. The graph has to pause, surface a question, wait indefinitely for an answer, then resume exactly where it left off with all prior state intact.
LangGraph's interrupt() and Command(resume=) primitives handle that contract. Wiring them correctly into a FastAPI service without data loss across restarts requires a few specific decisions: how the graph object is initialized and shared, how the checkpointer is isolated from the application database pool, how interrupt detection works at the call site, and how an SSE stream reconnects safely after a client drop.
The first decision is to compile the graph exactly once at application startup, not once per request. A LangGraph CompiledStateGraph is stateless between invocations: all persistent state lives in the checkpointer, not in the compiled graph object. Compiling it once and reusing it across all invocations avoids rebuilding the node map and conditional edge functions on every request. More importantly, it makes the dependencies explicit at a single initialization point.
```python
# runtime/graph.py
from langgraph.graph import StateGraph, END
from langgraph.graph.state import CompiledStateGraph
from langgraph.store.base import BaseStore

from pipeline_api.runtime.state import PipelineState
from pipeline_api.runtime.nodes.router import route_phase
from pipeline_api.runtime.nodes.onboarding.node import run_onboarding
from pipeline_api.runtime.nodes.research import run_research
from pipeline_api.runtime.nodes.production.node import run_production
from pipeline_api.runtime.nodes.ask_user import ask_user_node
# ... other node imports

PHASES = [
    "onboarding", "research", "production",
    "learning", "distribution", "ask_user", "goal_selection",
]
ROUTING_MAP = {p: p for p in PHASES}
ROUTING_MAP[END] = END

_graph: CompiledStateGraph | None = None


def build_graph(store: BaseStore, checkpointer=None) -> CompiledStateGraph:
    builder = StateGraph(PipelineState)
    builder.add_node("onboarding", run_onboarding)
    builder.add_node("research", run_research)
    builder.add_node("production", run_production)
    builder.add_node("ask_user", ask_user_node)
    # ... other nodes
    builder.set_conditional_entry_point(route_phase, ROUTING_MAP)
    for node in PHASES:
        builder.add_conditional_edges(node, route_phase, ROUTING_MAP)
    return builder.compile(checkpointer=checkpointer, store=store)


def init_graph(store: BaseStore, checkpointer) -> None:
    global _graph
    _graph = build_graph(store=store, checkpointer=checkpointer)


def get_graph() -> CompiledStateGraph:
    assert _graph is not None, "Graph not initialized; call init_graph() at lifespan startup"
    return _graph
```

All routing is handled by a single function that reads state["phase"] and returns either a phase string or END. The same function serves as both the conditional entry point and the conditional edge out of every node. When a node wants to hand off to a different phase, it updates phase in its return dict. When it wants to stop execution, it sets phase = "done".
```python
# runtime/nodes/router.py
from langgraph.graph import END

from pipeline_api.runtime.state import PipelineState


def route_phase(state: PipelineState) -> str:
    phase = state.get("phase", "done")
    if phase == "done":
        return END
    return phase
```

The startup sequence in main.py wires everything together in dependency order. The application database pool opens first. The checkpointer then gets its own pool, and the vector store is initialized after that. Finally, the graph is compiled with both the store and the checkpointer passed in. The order matters because the lifespan fetches those objects through store_module.get_store() and checkpointer.get_saver() before handing them to init_graph, and both accessors assert that their object has already been initialized.
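Each startup step depends on the one before it. If a step fails partway through (for example, the store fails to initialize), a straight-line lifespan leaves the pools that were already opened without a matching shutdown. One way to tie each teardown to a successful setup is `contextlib.AsyncExitStack`. The sketch below is a minimal illustration under assumptions. It uses a hypothetical `Resource` class in place of db_pool, the checkpointer and the store. It also takes a list of resources instead of FastAPI's `app` argument, so it can run on its own:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from typing import AsyncIterator

log: list[str] = []  # records the order of setup/shutdown calls for illustration


class Resource:
    """Hypothetical stand-in for db_pool, the checkpointer, or the store."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail

    async def setup(self) -> None:
        if self.fail:
            raise RuntimeError(f"{self.name} failed to start")
        log.append(f"setup {self.name}")

    async def shutdown(self) -> None:
        log.append(f"shutdown {self.name}")


@asynccontextmanager
async def lifespan(resources: list[Resource]) -> AsyncIterator[None]:
    async with AsyncExitStack() as stack:
        for resource in resources:
            await resource.setup()
            # Registered only after setup succeeds; the stack runs callbacks
            # in reverse registration order on exit, including on errors.
            stack.push_async_callback(resource.shutdown)
        yield  # the application serves requests here


async def serve(resources: list[Resource]) -> None:
    async with lifespan(resources):
        log.append("serving")
```

Teardown therefore always mirrors the startup order, and a resource that never finished starting is never asked to shut down.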
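```python
# main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

# configure_litellm, db_pool, checkpointer, store_module, settings and
# init_graph come from the application's own modules (imports elided).


@asynccontextmanager
async def lifespan(app: FastAPI):
    configure_litellm()
    await db_pool.setup(settings.database_url)
    await checkpointer.setup(settings.database_url)
    await store_module.initialize()
    init_graph(
        store=store_module.get_store(),
        checkpointer=checkpointer.get_saver(),
    )
    try:
        yield
    finally:
        # Shut down in reverse order of setup, even if the app exits with an error.
        await store_module.shutdown()
        await checkpointer.shutdown()
        await db_pool.shutdown()


app = FastAPI(lifespan=lifespan)
```

LangGraph's AsyncPostgresSaver needs a database connection for every checkpoint read and write. If it shares the application's connection pool, checkpoint operations compete with application queries under load. More critically, the checkpointer's connections need settings the application pool doesn't use. The first is autocommit=True, which LangGraph's Postgres checkpointer documentation lists as required. The second is prepare_threshold=None, which turns off psycopg's server-side prepared statements. Both settings are incompatible with how the application pool is configured.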
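```python
# runtime/checkpointer.py
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg.rows import dict_row
from psycopg_pool import AsyncConnectionPool

_saver: AsyncPostgresSaver | None = None
_pool: AsyncConnectionPool | None = None


async def setup(database_url: str) -> None:
    global _saver, _pool
    _pool = AsyncConnectionPool(
        database_url,
        min_size=1,
        max_size=2,
        kwargs={
            "autocommit": True,         # each checkpoint statement commits on its own
            "prepare_threshold": None,  # never create server-side prepared statements
            # The langgraph-checkpoint-postgres README also lists dict_row for
            # hand-built connections; harmless if the saver sets it on its cursors.
            "row_factory": dict_row,
        },
        open=False,
    )
    await _pool.open(wait=True)
    _saver = AsyncPostgresSaver(_pool)
    await _saver.setup()  # creates or migrates the checkpoint tables


def get_saver() -> AsyncPostgresSaver:
    assert _saver is not None, "Checkpointer not initialized"
    return _saver


async def shutdown() -> None:
    """Close the checkpoint pool; called from the lifespan on shutdown."""
    global _saver, _pool
    if _pool is not None:
        await _pool.close()
    _saver, _pool = None, None
```

The pool ceiling of two connections is deliberate. Each run has exactly one active execution at a time. Within a run, checkpoint operations are sequential: the aget_state read that _execute_graph uses to detect interrupt status happens only after ainvoke has returned. The number of concurrent runs is bounded by the number of active organizations. Their checkpoint operations share the two connections and briefly wait on the pool when both are busy. Keeping the pool small prevents the checkpointer from holding idle connections that the application pool could otherwise use.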
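autocommit=True is not optional. LangGraph's Postgres checkpointer documentation lists it as required for connections you create yourself. The saver relies on each statement committing on its own. Under psycopg's default transaction mode, the table creation in setup() and the checkpoint writes can end up inside an implicit transaction that nothing commits. psycopg_pool rolls back any connection returned to the pool with a transaction still open, so those writes are lost, leaving nothing more than a pool warning in the logs.

prepare_threshold=None disables psycopg's server-side prepared statements entirely. That is a deployment choice rather than a checkpointer requirement: LangGraph's own example uses 0, which prepares every statement on first use. Turning prepared statements off is the usual fix when connections pass through a transaction-pooling proxy such as PgBouncer, where a statement prepared on one server connection may not exist on the next.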
The PipelineState TypedDict carries everything that moves between nodes. For human-in-the-loop (HITL) decisions specifically, three fields matter:

- pending_question holds the question text before the interrupt fires.
- decision_id holds the UUID of the database record that was created for that question.
- context accumulates answers and other cross-phase data as a free-form dict.
```python
# runtime/state.py
from typing import Annotated, Any, TypedDict

from langgraph.graph.message import add_messages


class PipelineState(TypedDict):
    run_id: str
    org_id: str
    phase: str
    messages: Annotated[list, add_messages]
    completed_phases: list[str]
    pending_question: str | None
    decision_id: str | None
    context: dict[str, Any]
    error: str | None
```

messages uses add_messages as its reducer, which appends rather than replaces on each node return. All other fields use last-write-wins: each node's return dict is merged into the checkpoint state. The context dict is intentionally a catch-all. It accumulates brand doc content, goal selections, user answers, and any other cross-phase data that doesn't warrant a dedicated typed field. Phase nodes read from context and write back into it by spreading the existing dict and adding new keys.
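Because context is last-write-wins, a node that returns a fresh dict instead of spreading the existing one silently drops every key it didn't include. The sketch below is a simplified model of the merge that shows both behaviors. It assumes plain lists for messages; the real add_messages also converts message types and replaces messages that share an ID. `merge_update` is a hypothetical helper, not a LangGraph API:

```python
from typing import Any


def merge_update(state: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Simplified model of applying a node's return dict to PipelineState.

    messages is appended to (like add_messages, minus its ID handling);
    every other key is last-write-wins.
    """
    merged = dict(state)
    for key, value in update.items():
        if key == "messages":
            merged["messages"] = [*state.get("messages", []), *value]
        else:
            merged[key] = value
    return merged


state = {"messages": ["m1"], "context": {"brand_doc": "..."}}

# Spread the existing context, then add the new key: brand_doc survives.
good = merge_update(
    state, {"messages": ["m2"], "context": {**state["context"], "goal": "awareness"}}
)
# A bare dict replaces context wholesale: brand_doc is gone.
bad = merge_update(state, {"context": {"goal": "awareness"}})
```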
The ask_user node is the dedicated HITL dispatch point. When the pipeline needs a human decision, a phase node sets pending_question and phase = "ask_user" in its return dict. The router sends execution to ask_user_node, which creates a database record for the question and calls interrupt().
```python
# runtime/nodes/ask_user.py
from langgraph.types import interrupt

from pipeline_api.db.repositories.decision_repo import create_decision
from pipeline_api.runtime.state import PipelineState


async def ask_user_node(state: PipelineState, **_) -> dict:
    question = state["pending_question"] or "What should the pipeline do next?"
    # On resume, LangGraph re-runs this node from the top, so this insert
    # executes again before interrupt() hands back the answer.
    decision = await create_decision(state["run_id"], question)
    # First pass: pauses the run. Resume pass: returns the Command(resume=...) value.
    answer = interrupt({"question": question, "decision_id": decision.id})
    return {
        "pending_question": None,
        "decision_id": None,
        "context": {**state["context"], "last_answer": answer},
    }
```

From the node's perspective, interrupt() pauses the run and later returns the value passed to Command(resume=...). The mechanics matter, though. On resume, LangGraph does not continue from the interrupt() line. It runs the node again from its first line, and this time interrupt() returns the resume value instead of pausing. Code after interrupt() runs only after a successful resume, while code before it runs on both passes. answer receives whatever value the caller passed to Command(resume=...). The node clears pending_question and decision_id from state and writes the answer into context["last_answer"] so the next phase node can read it.
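As shown, ask_user_node's return dict leaves phase untouched. After the answer arrives, phase is still "ask_user", so route_phase sends execution straight back into ask_user_node, which then asks its default question. One way to close that loop is to record the requesting phase when handing off and restore it once the answer is in. The sketch below assumes a hypothetical resume_phase field, which would also need to be declared in PipelineState. Both helper names are illustrative:

```python
from typing import Any


def request_decision(state: dict[str, Any], question: str) -> dict[str, Any]:
    """Update a phase node could return to hand off to ask_user (hypothetical)."""
    return {
        "pending_question": question,
        "phase": "ask_user",
        # Hypothetical field: where to go once the answer arrives.
        "resume_phase": state["phase"],
    }


def answer_update(state: dict[str, Any], answer: Any) -> dict[str, Any]:
    """Update ask_user_node could return after interrupt() yields the answer."""
    return {
        "pending_question": None,
        "decision_id": None,
        "context": {**state["context"], "last_answer": answer},
        # Route back to the requesting phase; stop if none was recorded.
        "phase": state.get("resume_phase") or "done",
        "resume_phase": None,
    }
```

The requesting phase node then runs again and finds the answer in context["last_answer"]. It has to check for that value before deciding to ask a second time.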
The database Decision record and the interrupt() call serve different purposes. interrupt() is the graph's pause mechanism. It raises GraphInterrupt (a subclass of GraphBubbleUp), which unwinds the node, and LangGraph records the pending interrupt in the checkpointer. The database record is what the frontend uses to render the question. The graph doesn't need the DB record to resume: Command(resume=answer) is all that's required. The record exists so the UI has something to show the user while the graph is waiting.
The onboarding node also calls interrupt() directly in multiple places via a present_decision helper, without routing through ask_user_node. Both approaches produce identical behavior at the LangGraph level. The ask_user_node route is used when a downstream phase node wants to delegate the full pause-and-wait cycle to a dedicated graph node. The inline approach is used inside the onboarding node itself, which contains a long multi-step HITL flow that would be awkward to break into separate graph nodes.
After await graph.ainvoke(input, config) returns, the graph is either complete or suspended at an interrupt point. The normal return does not tell you which. When a node calls interrupt(), LangGraph's top-level execution loop catches the GraphInterrupt, saves the checkpoint, and returns from ainvoke without raising. Recent versions also include the pending interrupts under an "__interrupt__" key in the returned dict. The reliable signal is the state snapshot: await graph.aget_state(config) returns a snapshot whose next tuple is non-empty while the graph is waiting for a resume. The execution wrapper checks that snapshot and keeps an except GraphBubbleUp clause only as a defensive fallback.
```python
# runtime/runner.py
from langgraph.errors import GraphBubbleUp

from pipeline_api.db.repositories.run_repo import update_run_status
from pipeline_api.runtime.graph import get_graph


async def _execute_graph(run_id: str, graph_input, store=None) -> None:
    # `store` is unused here: the compiled graph already holds it.
    config = {"configurable": {"thread_id": run_id}}
    is_interrupted = False
    graph = get_graph()
    try:
        await graph.ainvoke(graph_input, config=config)
    except GraphBubbleUp:
        # Defensive: a top-level interrupt normally returns instead of raising.
        is_interrupted = True
    except Exception:
        # Without this, a crashed run stays "running" and its SSE stream never ends.
        await update_run_status(run_id, "failed")
        raise
    if not is_interrupted:
        # A non-empty `next` means the graph is paused, waiting for Command(resume=...).
        state = await graph.aget_state(config)
        is_interrupted = bool(state.next)
    if is_interrupted:
        await update_run_status(run_id, "suspended")
    else:
        await update_run_status(run_id, "completed")
```

The thread ID is the run ID. LangGraph uses the thread ID as the primary key for the checkpoint, so every ainvoke call with the same thread ID picks up from the same checkpoint. For the initial run, the graph input is a PipelineState dict. For resumes, the graph input is a Command(resume=answer) object, and the checkpoint provides all state: the Command only carries the resume value.
When the human answers a question, the answer endpoint validates that the run is in suspended status and fires a background task that calls resume_graph. Checking status before resuming is important: calling ainvoke with Command(resume=...) on a run that isn't suspended would start a second concurrent execution of the same thread, which would corrupt the checkpoint.
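One gap remains. The status check runs in the endpoint, but resume_graph marks the run as running only once the background task starts. Two near-simultaneous POSTs can therefore both see suspended and both fire resume_graph. A single conditional UPDATE that performs both the check and the transition closes that window.

The sketch below uses sqlite3 as a stand-in for Postgres, and the runs table and claim_resume helper are hypothetical. The same `UPDATE ... WHERE status = 'suspended'` statement works in Postgres, where the affected-row count (or a RETURNING clause) tells the caller whether it won:

```python
import sqlite3


def claim_resume(conn: sqlite3.Connection, run_id: str) -> bool:
    """Atomically move a run from 'suspended' to 'running'.

    Only the caller whose UPDATE matches the row gets True; any later caller
    finds the status already changed and gets False (map that to a 409).
    """
    cur = conn.execute(
        "UPDATE runs SET status = 'running' WHERE id = ? AND status = 'suspended'",
        (run_id,),
    )
    conn.commit()
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runs (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.execute("INSERT INTO runs VALUES ('run-1', 'suspended')")
conn.commit()

first = claim_resume(conn, "run-1")   # wins the transition
second = claim_resume(conn, "run-1")  # loses: status is already 'running'
```

With this in place, the endpoint would claim the run before calling fire_task and return 409 on False. resume_graph would then no longer need its own status update.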
```python
# features/run/routes.py
from fastapi import HTTPException

# router, get_run, fire_task and resume_graph come from the application's
# own modules (imports elided).


@router.post("/runs/{run_id}/resume")
async def resume_run_endpoint(run_id: str, body: dict):
    run = await get_run(run_id)
    if not run:
        raise HTTPException(status_code=404, detail="Run not found")
    if run.status != "suspended":
        # Resuming a thread that is not paused would start a second execution.
        raise HTTPException(
            status_code=409, detail=f"Run is {run.status}, not suspended"
        )
    answer = body.get("answer", "skip")
    fire_task(resume_graph(run_id=run_id, answer=answer))
    return {"status": "resuming", "run_id": run_id, "answer": answer}
```

```python
# runtime/resume.py
from langgraph.types import Command

from pipeline_api.db.repositories.run_repo import update_run_status
from pipeline_api.runtime.runner import _execute_graph
from pipeline_api.runtime.store import get_store


async def resume_graph(run_id: str, answer: str) -> None:
    store = get_store()
    await update_run_status(run_id, "running")
    # The Command carries only the answer; all other state comes from the checkpoint.
    await _execute_graph(run_id, Command(resume=answer), store=store)
```

Command(resume=answer) is passed as the graph_input argument to _execute_graph, which passes it directly to ainvoke. LangGraph loads the checkpoint for the thread ID, finds the task that was interrupted, and runs that node again from its first line. When execution reaches interrupt(), the call returns answer instead of pausing. From there, ask_user_node completes its return dict and route_phase handles the next transition.
The answer can be any serializable Python value. In this pipeline, answers are always strings: approval text, rejection feedback, a numeric choice index, or a JSON payload for complex decisions like case study confirmation or author profile editing. Nodes receiving the answer handle parsing themselves. Complex answers get serialized as JSON strings passed through Command(resume=) and parsed with json.loads on the receiving end: the interrupt() / Command(resume=) contract passes through whatever the caller provides without any schema enforcement.
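Nothing enforces a schema on the resume value, so a node expecting a JSON payload benefits from validating it in one place. In the sketch below, parse_json_answer and the key names are hypothetical examples. The helper turns plain-text answers such as "skip", non-object JSON, and payloads with missing keys into a ValueError the node can handle, instead of a KeyError deep in the phase logic:

```python
import json
from typing import Any


def parse_json_answer(answer: Any, required: set[str]) -> dict[str, Any]:
    """Decode a JSON-object answer received through Command(resume=...)."""
    if isinstance(answer, dict):
        payload = answer  # already structured; nothing to decode
    else:
        try:
            payload = json.loads(answer)
        except (TypeError, json.JSONDecodeError) as exc:
            raise ValueError(f"answer is not JSON: {answer!r}") from exc
    if not isinstance(payload, dict):
        raise ValueError("answer must be a JSON object")
    missing = required - set(payload)
    if missing:
        raise ValueError(f"answer is missing keys: {sorted(missing)}")
    return payload
```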
Both the initial pipeline trigger and the resume call use fire_task() rather than await. The endpoint returns a 202 or a status dict immediately, and the actual graph execution runs as a background asyncio task. This is covered in depth in why fastapi background tasks disappear mid-execution. The relevant detail here is that the event loop keeps only a weak reference to the Task returned by asyncio.create_task(). If nothing else holds a strong reference, the garbage collector can destroy the task while it is still running. No exception reaches your code. The only trace is an easily missed "Task was destroyed but it is pending!" message from asyncio, and the pipeline simply stops.
```python
# utils/tasks.py
import asyncio
from typing import Any, Coroutine

# Strong references to in-flight tasks; the event loop alone keeps only weak ones.
_background_tasks: set[asyncio.Task] = set()


def fire_task(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task
```

The module-level _background_tasks set holds a strong reference to every in-flight task. The add_done_callback removes the task from the set when it completes, allowing the GC to collect it normally after it's done. The set never grows unbounded because tasks remove themselves on completion. This pattern appears everywhere the codebase fires a long-running background coroutine: pipeline triggers, resumes, retries, and post-run metric refreshes.
The frontend streams run progress via Server-Sent Events. The stream endpoint polls a run_events table every two seconds and yields each new event as an SSE frame. Because pipeline runs last minutes, the client may disconnect and reconnect at any time: a page reload, a mobile app backgrounding, a proxy timeout. The Last-Event-ID header lets the client resume from the last event it received rather than replaying the full history from the start. The browser's EventSource sends that header only on its own automatic reconnects. A fresh EventSource, such as one created after a page reload, starts without it, and the since query parameter covers that case.
```python
# features/run/routes.py
import asyncio
import json
import time
from datetime import datetime, timezone

from fastapi import Request
from fastapi.responses import StreamingResponse

# router, get_run, get_run_events_since and list_decisions_for_run come from
# the application's own modules (imports elided).


@router.get("/runs/{run_id}/stream")
async def stream_run(run_id: str, request: Request):
    # EventSource reconnects send Last-Event-ID; fresh connections can pass ?since=.
    last_event_id = (
        request.headers.get("last-event-id")
        or request.query_params.get("since")
    )
    initial_last_seen = None
    if last_event_id:
        try:
            initial_last_seen = datetime.fromisoformat(last_event_id)
            if initial_last_seen.tzinfo is None:
                initial_last_seen = initial_last_seen.replace(tzinfo=timezone.utc)
        except (ValueError, TypeError):
            pass  # malformed cursor: replay from the start

    async def event_generator():
        last_seen = initial_last_seen
        deadline = time.monotonic() + 3600  # caps one connection, not the run
        decisions = await list_decisions_for_run(run_id)
        answered_ids = {d.id for d in decisions if d.status == "answered"}
        while time.monotonic() < deadline:
            if await request.is_disconnected():
                return
            # Read the status before the events: if the run writes its last
            # events before marking itself terminal, this fetch includes them.
            run = await get_run(run_id)
            finished = run is None or run.status in ("completed", "failed")
            for event in await get_run_events_since(run_id, since=last_seen):
                payload = dict(event.payload)
                if event.type == "decision_card" and payload.get("decisionId") in answered_ids:
                    payload["answered"] = True
                ts = event.created_at.isoformat()
                data = json.dumps({"type": event.type, "timestamp": ts, **payload})
                yield f"id: {ts}\ndata: {data}\n\n"
                last_seen = event.created_at
            if finished:
                yield "data: [DONE]\n\n"
                return
            yield ": ping\n\n"  # SSE comment: keeps idle proxies from closing the stream
            await asyncio.sleep(2)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```

Event IDs are ISO 8601 timestamps. The client sends the most recent one back on reconnect, as Last-Event-ID or as since on a fresh connection. get_run_events_since(run_id, since=last_seen) filters by created_at > last_seen, so reconnecting clients skip everything they've already processed. An automatic EventSource reconnect needs no client-side bookkeeping at all. A client opening a new connection only has to remember the last ID it saw.
request.is_disconnected() at the top of each loop iteration is the disconnect guard. Depending on the Starlette version and the ASGI server, a StreamingResponse generator is not necessarily cancelled the moment the client drops. It can keep polling the database until a write to the closed connection fails. Checking explicitly exits the generator cleanly on the next poll cycle. The pipeline run is completely unaffected by the disconnect: it's running in a fire_task background coroutine with no reference to the SSE request.
The : ping\n\n frame is a comment in the SSE protocol, not a data event. EventSource never surfaces it to the page, but the bytes on the wire stop proxies and load balancers from closing the connection as idle. The one-hour deadline bounds a single stream connection, not the run. A suspended run can legitimately wait far longer than an hour for a human answer, so the deadline simply caps how long one connection holds a polling loop. When it passes, the generator returns without [DONE], and an EventSource client reconnects with Last-Event-ID and continues from its last event.
decision_card events get an answered field injected before they're sent. When a client reconnects after the human has already answered a decision, the card arrives with answered: true and the UI can render the correct post-answer state without a separate fetch. The answered_ids set is loaded once at stream open from the database. For reconnecting clients this is accurate: a client that answers a question and immediately reconnects will see the correct answered: true on the card because the database record is updated before the resume call is fired.
The full lifecycle ties together cleanly:

1. A run starts with trigger_run_for_org, which calls fire_task(_run_graph(...)) and returns a run ID.
2. The graph executes, phases update phase in their return dicts, and route_phase handles all transitions.
3. When a phase needs a human decision, it routes to ask_user_node, which creates a DB record and calls interrupt().
4. The pending interrupt is saved to the Postgres checkpoint and ainvoke returns. _execute_graph sees a non-empty state.next and sets the run status to suspended.
5. The frontend receives a decision_card event via SSE, renders the question, and waits.
6. When the human responds, POST /runs/{run_id}/resume fires a resume_graph background task that calls ainvoke with Command(resume=answer).
7. LangGraph loads the checkpoint and re-runs ask_user_node, where interrupt() now returns the answer. Execution continues through the remaining phases.
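If the FastAPI process restarts while a run is suspended, nothing is lost. The checkpoint is in Postgres, the run record shows suspended, and the next resume call to the new process instance reloads everything from the checkpoint and continues as if no restart occurred. A run that is mid-execution when the process stops is a different case. Its background task dies with the process, and the run record stays running until something invokes the graph again for that thread.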
why compile the graph once instead of rebuilding it per request?
A CompiledStateGraph in LangGraph is stateless between invocations: all persistent data lives in the checkpointer, not in the graph object. Compiling once avoids rebuilding the entire node map, conditional edge functions, and checkpointer bindings on every request. The graph object also captures a reference to the checkpointer and store at compile time. Compiling per-request would require passing those dependencies in at every invocation, which adds indirection without benefit. The compilation cost is non-trivial for larger graphs with many nodes and conditional edges. Paying it once at startup means every subsequent ainvoke call gets the pre-built object at no additional cost.
why does the checkpointer need a separate database connection pool?
Two reasons. First, the checkpointer's connections need autocommit=True, which LangGraph's Postgres checkpointer requires, and in this service also prepare_threshold=None. Those settings don't suit the application's main pool, which uses explicit transactions in several places and benefits from server-side prepared statements on its high-frequency queries. A shared pool would force one set of connection settings on both workloads, so the checkpointer gets its own. Second, checkpointer operations happen at specific points in graph execution, not in response to HTTP requests. A separate pool prevents checkpoint writes from competing with application database queries under load.
what is GraphBubbleUp and when does it get raised?
GraphBubbleUp is the base class for the exceptions LangGraph uses as control flow rather than as errors. GraphInterrupt, which interrupt() raises inside a node, is a subclass of it. Inside the graph, GraphInterrupt unwinds the running node and LangGraph records the pending interrupt in the checkpoint. At the top level, the graph's execution loop catches it, so ainvoke returns normally instead of raising to the caller. The exception propagates outward only when the interrupted graph is itself nested inside another graph. For that reason, the execution wrapper treats a non-empty state.next as the suspension signal and keeps except GraphBubbleUp only as a defensive fallback.
how does Command(resume=) know which interrupt() point to resume from?
LangGraph records the interrupt in the checkpoint: which task was running and the value passed to interrupt(). When ainvoke is called with Command(resume=answer) using the same thread_id, LangGraph loads the checkpoint and runs the interrupted node again from its first line. When execution reaches interrupt(), the call returns answer instead of pausing. You do not specify the interrupt point in the Command, because the checkpoint contains everything needed to resume correctly. If a node calls interrupt() several times, resume values are matched to the calls by their order within the node, so that order must stay the same every time the node runs.
what happens when the SSE client disconnects mid-run?
The event_generator detects disconnection via await request.is_disconnected() at the top of each poll iteration. Depending on the Starlette version and ASGI server, a StreamingResponse generator may keep running after the client disconnects, until it tries to write to the closed connection and gets an error. Checking is_disconnected() explicitly exits the generator cleanly without waiting for a write failure. The pipeline run itself is unaffected by the SSE disconnection: it runs in a background asyncio task via fire_task with no connection to the SSE stream. The client can reconnect at any time and resume from the last event it received. An automatic EventSource reconnect does this through the Last-Event-ID header, and a fresh connection does it through the since query parameter.
can the pipeline resume after a FastAPI process restart?
Yes, because all graph state is serialized to the Postgres checkpoint on every node transition. If the process restarts while a run is suspended at an interrupt point, the run record in the database still shows status=suspended and the checkpoint still exists in Postgres. When the human answers via POST /runs/{run_id}/resume, resume_graph loads the checkpoint using the run_id as the thread_id and calls ainvoke with Command(resume=answer). LangGraph restores the full PipelineState from the checkpoint and resumes execution exactly where it left off. The only state that would not survive a restart is anything held in Python memory that was not written into PipelineState and therefore not serialized to the checkpoint.