Checkpointing¶
Save state at every node boundary; resume a crashed run from the last
saved point on a subsequent invoke(). Without a checkpointer, the
engine holds no state across invocations — a crash means start-from-entry.
Wiring a checkpointer¶
Register at build time via with_checkpointer:
from openarmature.checkpoint import SQLiteCheckpointer
checkpointer = SQLiteCheckpointer(db_path="./checkpoints.db")
graph = (
    GraphBuilder(MyState)
    .add_node("step_a", step_a)
    .add_node("step_b", step_b)
    .add_edge("step_a", "step_b")
    .add_edge("step_b", END)
    .set_entry("step_a")
    .with_checkpointer(checkpointer)
    .compile()
)
The engine writes a record at every completed event for outermost-graph
nodes and for subgraph-internal nodes. Events internal to a fan-out
instance are NOT saved in the shipping version; atomic restart is the
fan-out contract.
Saves are synchronous-by-contract¶
The engine awaits every Checkpointer.save before continuing to
the next node. This is the load-bearing property that makes
checkpointing useful at all: a crash immediately
after a completed event cannot have lost the corresponding save,
because the save resolves before the next node runs.
The corollary: slow backends throttle execution. Wrapping a high-
latency persistence layer in a checkpointer makes the whole graph
run at its latency. Plan accordingly — async writes inside the
backend (e.g., asyncio.to_thread around a sync driver) are fine;
fire-and-forget patterns that return before durability is established
violate the contract.
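To make the contract concrete, here is a minimal sketch of a file-backed save that is durable before the coroutine resolves. The class name, file layout, and the simplified record type are all assumptions for illustration; only the shape of save (awaited, durable-before-return, sync I/O pushed to a worker thread via asyncio.to_thread) reflects the contract described above.

```python
import asyncio
import json
import os
import tempfile
from dataclasses import asdict, dataclass


# Hypothetical stand-in for the library's CheckpointRecord;
# the real record carries more fields.
@dataclass(frozen=True)
class Record:
    invocation_id: str
    state: dict


class JsonFileCheckpointer:
    """Sketch: save() resolves only after the bytes are on disk.

    The sync write (including fsync) runs in a worker thread so the
    event loop is not blocked, but the coroutine still awaits it --
    no fire-and-forget.
    """

    def __init__(self, directory: str) -> None:
        self._dir = directory

    def _write_sync(self, path: str, payload: bytes) -> None:
        # Write to a temp file, fsync, then atomically rename into place.
        fd, tmp = tempfile.mkstemp(dir=self._dir)
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(payload)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, path)
        finally:
            if os.path.exists(tmp):
                os.remove(tmp)

    async def save(self, invocation_id: str, record: Record) -> None:
        payload = json.dumps(asdict(record)).encode()
        path = os.path.join(self._dir, f"{invocation_id}.json")
        # Awaited: durability is established before this returns.
        await asyncio.to_thread(self._write_sync, path, payload)


async def demo() -> dict:
    directory = tempfile.mkdtemp()
    cp = JsonFileCheckpointer(directory)
    await cp.save("inv-1", Record("inv-1", {"count": 3}))
    with open(os.path.join(directory, "inv-1.json")) as f:
        return json.load(f)


print(asyncio.run(demo()))
```

A fire-and-forget variant would schedule `_write_sync` with `asyncio.create_task` and return immediately; that keeps the graph fast but breaks the crash-cannot-lose-a-save guarantee.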
Resuming¶
Pass resume_invocation to invoke():
- If a record exists for that invocation_id, the engine restores state from record.state (or the parent_states chain for subgraph-internal resumes), reconstructs the completed-node set from record.completed_positions, and continues from the first not-yet-completed node.
- If no record exists, the engine raises CheckpointNotFound. It does NOT silently start a fresh run; the user must explicitly handle the not-found case (typically: drop the resume_invocation= argument and re-invoke without it for a fresh start).
CheckpointRecordInvalid surfaces when a record's schema_version
doesn't match the current CHECKPOINT_SCHEMA_VERSION, or when its
persisted state can't be re-validated against the current state class
(state-shape mismatch after a refactor).
What a CheckpointRecord carries¶
from dataclasses import dataclass, field
from typing import Any

@dataclass(frozen=True)
class CheckpointRecord:
    invocation_id: str
    correlation_id: str
    state: Any
    completed_positions: tuple[NodePosition, ...]
    parent_states: tuple[Any, ...]
    last_saved_at: float
    schema_version: str = CHECKPOINT_SCHEMA_VERSION
    fan_out_progress: None = field(default=None)
Field framing worth getting right:
- completed_positions is history, not "next." It's the list of NodePositions that have already completed. Resume works by replaying that list to derive the next node, not by reading a pointer to the next node. This is why the field is plural and why the framing matters: every saved node contributes a position, and resume walks the graph skipping every position that's already there.
- correlation_id ≠ invocation_id. invocation_id identifies this graph run uniquely. correlation_id is a cross-system identifier propagated via ContextVar: multiple invocations related by a higher-level request can share one correlation_id while each has its own invocation_id. See Observability for how correlation_id threads through logs and spans.
- parent_states is the chain of containing-graph snapshots, outermost first; empty for an outer-level save. Inner-node saves populate it so resume can re-enter a subgraph from the right depth without re-projecting.
- fan_out_progress: None is reserved for a future per-instance fan-out resume mode (planned, not yet shipped). In the shipping version it's always None.
The Checkpointer Protocol¶
Four methods:
from typing import Iterable, Protocol

class Checkpointer(Protocol):
    async def save(self, invocation_id: str, record: CheckpointRecord) -> None: ...
    async def load(self, invocation_id: str) -> CheckpointRecord | None: ...
    async def list(self, filter: CheckpointFilter | None = None) -> Iterable[CheckpointSummary]: ...
    async def delete(self, invocation_id: str) -> None: ...
- save — persist the record under invocation_id. Durable for any backend that documents durability. Synchronous-by-contract per the section above.
- load — return the most recent record for invocation_id, or None. Round-trip-stable with what save wrote.
- list — enumerate saved invocations, optionally filtered by CheckpointFilter (currently a single correlation_id field; v1 ships intentionally narrow).
- delete — remove all records for invocation_id. No-op if the invocation has no record (no error).
Backends MUST be safe to share across concurrent invocations; the
engine doesn't serialize access. For backends with sync I/O, the
standard pattern is asyncio.to_thread around the actual driver
call.
Two built-in backends¶
- InMemoryCheckpointer — backed by a dict in process memory. Loses everything on process exit. Useful for tests and short-lived contexts that want the API surface without disk overhead.
- SQLiteCheckpointer — backed by a SQLite database file. Survives process exit. Reasonable default for any non-trivial use.
Custom backends just implement the four-method Protocol. Targets that make sense: Redis (ephemeral, network-shared), Postgres (durable, multi-process), S3 (cross-region durability). For event-sourced runtimes (Temporal, DBOS, Restate, Inngest) the Protocol is the adapter layer.
When NOT to use checkpointing¶
- Pure pipelines that complete in seconds. Restart-from-entry is cheap; checkpoints are pure overhead.
- Pipelines whose external side effects can't safely be replayed. If node A sends an email, resuming from after A means the email has already been sent — fine if your downstream is idempotent, surprising if it isn't. Reason explicitly about replay semantics before turning on resume.
What checkpointing is NOT¶
- Not a database. It's a serialization/deserialization seam for state, not a query layer. Don't drive analytics off saved records; emit observability events instead.
- Not human-in-the-loop. Pausing for human input is a separate capability; checkpointing is just "save and resume," not "pause and wait."
- Not a workflow orchestrator. Long-running, multi-process, cross-system orchestration belongs at a higher layer (Temporal, Airflow). Checkpointing is for crash-recovery and resumability within one logical run.