openarmature.checkpoint
openarmature.checkpoint — checkpointing capability.
Public surface: the typed Checkpointer Protocol, the
CheckpointRecord / NodePosition / CheckpointSummary shapes, the
checkpoint error categories, and two reference backends
(in-memory and SQLite).
Users register a backend at graph build time via
GraphBuilder.with_checkpointer(...). The engine then saves a
record on every completed event, for outermost-graph nodes and
subgraph-internal nodes alike, and invoke(resume_invocation=X)
loads that invocation's prior record and restores from it.
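The save-on-completion and resume flow described above can be illustrated with a toy runner. This is a minimal sketch, not the engine's implementation: MemoryStore, run, fail_at, and the dict-shaped record are all hypothetical stand-ins, and the real engine works on graphs rather than a flat node list.

```python
import asyncio

# Hypothetical stand-ins: none of these names come from openarmature.


class MemoryStore:
    """Toy checkpoint store keyed by invocation id."""

    def __init__(self) -> None:
        self.records = {}

    async def save(self, invocation_id, record) -> None:
        self.records[invocation_id] = record

    async def load(self, invocation_id):
        return self.records.get(invocation_id)


calls = []  # records which nodes actually executed


def make_node(name):
    async def node(state):
        calls.append(name)
        return {**state, name: True}

    return node


async def run(nodes, store, invocation_id, state, resume=False, fail_at=None):
    completed = []
    if resume:
        record = await store.load(invocation_id)
        if record is None:
            raise LookupError(f"no checkpoint for {invocation_id}")
        state = dict(record["state"])
        completed = list(record["completed"])
    for name, node in nodes:
        if name in completed:
            continue  # finished before the crash, so skip it
        if name == fail_at:
            raise RuntimeError("simulated crash")
        state = await node(state)
        completed.append(name)
        # Await the save before moving on to the next node.
        await store.save(
            invocation_id,
            {"state": dict(state), "completed": tuple(completed)},
        )
    return state


async def demo():
    store = MemoryStore()
    nodes = [(n, make_node(n)) for n in ("fetch", "parse", "store")]
    try:
        await run(nodes, store, "inv-1", {}, fail_at="parse")
    except RuntimeError:
        pass  # the first attempt dies before "parse" runs
    return await run(nodes, store, "inv-1", {}, resume=True)


final_state = asyncio.run(demo())
```

In this sketch the resumed run executes "parse" and "store" only; "fetch" is not repeated because its completion was saved before the simulated crash.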
InMemoryCheckpointer
Dict-backed Checkpointer.
Durability: none. Records live for the lifetime of this instance only; restarting the process loses everything. Appropriate for unit tests, the dev loop, and short-lived in-process pipelines that don't need crash recovery.
State shape: any. The record is held by reference, so the
Pydantic state instance the engine produces is exactly what comes
back from load, with no serialization round-trip. (This is
deliberate: tests can assert on the saved state's identity.)
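To make the held-by-reference behavior concrete, here is a minimal sketch of a dict-backed store. SketchRecord and DictCheckpointer are hypothetical stand-ins written for this example; they model the idea, not the shipped InMemoryCheckpointer.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class SketchRecord:
    """Hypothetical stand-in for a checkpoint record (two fields only)."""

    invocation_id: str
    state: Any


class DictCheckpointer:
    """Hypothetical dict-backed backend that stores records by reference."""

    def __init__(self) -> None:
        self._records: dict = {}

    async def save(self, invocation_id: str, record: SketchRecord) -> None:
        self._records[invocation_id] = record  # no copy, no serialization

    async def load(self, invocation_id: str) -> Optional[SketchRecord]:
        return self._records.get(invocation_id)


async def demo():
    backend = DictCheckpointer()
    state = {"messages": ["hello"]}
    await backend.save("inv-1", SketchRecord("inv-1", state))
    loaded = await backend.load("inv-1")
    # Identity check: the very same state object comes back.
    same_object = loaded is not None and loaded.state is state
    missing = await backend.load("inv-2")
    return same_object, missing


same_object, missing = asyncio.run(demo())
```

Because nothing is copied, a test can use an `is` comparison on the loaded state, which a serializing backend could not satisfy.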
SQLiteCheckpointer
SQLite Checkpointer with WAL-mode durability.
Retention: upsert — one row per invocation_id, overwritten
on every save. Saved records are NOT historical: only the most
recent save for any given invocation_id is retained.
Cross-language portability: depends on the serialization
constructor argument. "pickle" is Python-only; "json"
works across languages but is restricted to JSON-native state
shapes (the engine's Pydantic state must successfully
model_dump(mode="json")).
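The upsert retention policy can be sketched with the standard-library sqlite3 module. The table name, column names, and the save/load helpers below are assumptions made for illustration; the actual SQLiteCheckpointer schema is not documented here.

```python
import json
import os
import sqlite3
import tempfile

# Hypothetical schema and helpers; not the shipped backend's layout.
db_path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")  # request write-ahead logging
conn.execute(
    "CREATE TABLE IF NOT EXISTS checkpoints ("
    "invocation_id TEXT PRIMARY KEY, payload TEXT NOT NULL)"
)


def save(invocation_id: str, state: dict) -> None:
    # The primary key makes this an upsert: one row per invocation_id.
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints (invocation_id, payload) "
            "VALUES (?, ?)",
            (invocation_id, json.dumps(state)),
        )


def load(invocation_id: str):
    row = conn.execute(
        "SELECT payload FROM checkpoints WHERE invocation_id = ?",
        (invocation_id,),
    ).fetchone()
    return None if row is None else json.loads(row[0])


save("inv-1", {"step": 1})
save("inv-1", {"step": 2})  # overwrites the earlier save
row_count = conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]
latest = load("inv-1")
conn.close()
```

After two saves for the same invocation_id the table still holds a single row, and only the second state can be loaded. The JSON payload here mirrors the "json" serialization option; a "pickle" payload would be stored as bytes and read back only from Python.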
CheckpointError
Bases: Exception
Base for all checkpoint errors. Each subclass carries a
category class attribute matching its canonical category
string.
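As a rough illustration of the category class attribute pattern, the sketch below uses locally defined stand-in classes. The class names and the category strings are placeholders invented for this example; the canonical strings are not listed on this page.

```python
# Hypothetical stand-ins; the category strings below are placeholders.


class SketchCheckpointError(Exception):
    category = "checkpoint_error"


class SketchNotFound(SketchCheckpointError):
    category = "checkpoint_not_found"


class SketchSaveFailed(SketchCheckpointError):
    category = "checkpoint_save_failed"


def category_of(exc: Exception) -> str:
    """Read the category from the exception's class, if it has one."""
    if isinstance(exc, SketchCheckpointError):
        return type(exc).category
    return "unknown"


observed = []
for exc in (SketchNotFound("inv-1"), SketchSaveFailed("disk full"), ValueError()):
    try:
        raise exc
    except Exception as caught:  # one handler, routed by category
        observed.append(category_of(caught))
```

Keeping the category on the class rather than the instance lets callers branch on it without inspecting the message text.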
CheckpointNotFound
Bases: CheckpointError
Raised when invoke(resume_invocation=X) is called and
Checkpointer.load(X) returns None. Non-transient — the
record genuinely does not exist; retrying without changing the
invocation_id will never succeed.
CheckpointRecordInvalid
Bases: CheckpointError
Raised when Checkpointer.load(X) returns a record whose
schema is incompatible with the current graph (state shape
mismatch, missing required fields, or
schema_version mismatch). Non-transient — the persisted
record was written by an incompatible version of the engine.
CheckpointSaveFailed
Bases: CheckpointError
Raised when Checkpointer.save itself raises during a
completed event handler. Engine behavior on save failure is
implementation-defined; this implementation raises to the caller
of invoke() immediately and does NOT retry the save
(documented on CompiledGraph.invoke).
Checkpointer
Bases: Protocol
Persistence seam for graph invocations.
Implementations MUST be safe to share across concurrent
invocations of the same graph (the engine does not serialize
access). Each operation MUST be thread-safe (Python) /
task-coroutine-safe (asyncio); backends with synchronous I/O
typically wrap their work in asyncio.to_thread or equivalent.
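One way a backend with synchronous I/O might satisfy the concurrency requirement is to guard its storage with a lock and move the blocking work onto a worker thread. The sketch below assumes a hypothetical BlockingStoreBackend with string payloads; only asyncio.to_thread and threading.Lock are real library calls.

```python
import asyncio
import threading
from typing import Optional


class BlockingStoreBackend:
    """Hypothetical backend whose storage calls are synchronous."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._rows: dict = {}

    def _save_blocking(self, invocation_id: str, payload: str) -> None:
        with self._lock:  # serialize access from worker threads
            self._rows[invocation_id] = payload

    def _load_blocking(self, invocation_id: str) -> Optional[str]:
        with self._lock:
            return self._rows.get(invocation_id)

    async def save(self, invocation_id: str, payload: str) -> None:
        # Run the blocking call off the event loop thread.
        await asyncio.to_thread(self._save_blocking, invocation_id, payload)

    async def load(self, invocation_id: str) -> Optional[str]:
        return await asyncio.to_thread(self._load_blocking, invocation_id)


async def demo() -> list:
    backend = BlockingStoreBackend()
    # Twenty concurrent invocations share one backend instance.
    await asyncio.gather(
        *(backend.save(f"inv-{i}", f"payload-{i}") for i in range(20))
    )
    return [await backend.load(f"inv-{i}") for i in range(20)]


loaded_payloads = asyncio.run(demo())
```

The lock matters because asyncio.to_thread may run several saves on different threads at once; the event loop stays responsive while each one blocks.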
save (async)
save(invocation_id: str, record: CheckpointRecord) -> None
Persist record for invocation_id. After return the
record MUST be durable across process crashes for backends
that document durability (in-memory backends are not durable
and MUST document this). Synchronous-by-contract: the engine
awaits this call before continuing to the next node so a
crash immediately after a completed event cannot have
lost the corresponding save.
load (async)
load(invocation_id: str) -> CheckpointRecord | None
Return the most recent record for invocation_id or
None if no record exists. The returned record MUST be
structurally identical to what save last wrote for this
invocation_id (round-trip integrity).
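A serializing backend has to take some care to meet the round-trip requirement, because formats such as JSON do not preserve every Python type. The following sketch uses a hypothetical three-field SketchRecord and hand-written to_json / from_json helpers to show one such pitfall (tuples come back as lists) and how a backend might compensate.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class SketchRecord:
    """Hypothetical stand-in with a tuple field, like completed_positions."""

    invocation_id: str
    state: dict
    completed: tuple


def to_json(record: SketchRecord) -> str:
    return json.dumps(asdict(record))


def from_json(payload: str) -> SketchRecord:
    raw = json.loads(payload)
    return SketchRecord(
        invocation_id=raw["invocation_id"],
        state=raw["state"],
        # JSON has no tuple type, so restore it explicitly.
        completed=tuple(raw["completed"]),
    )


original = SketchRecord("inv-1", {"count": 3}, ("fetch", "parse"))
restored = from_json(to_json(original))
naive_completed = json.loads(to_json(original))["completed"]
```

Without the explicit tuple conversion the loaded record would carry a list and would no longer compare equal to what was saved.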
list (async)
list(
filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]
Enumerate saved invocations. The filter shape is
backend-defined; this implementation ships list_all and
list_by_correlation_id predicates.
delete (async)
Remove all records for invocation_id. MUST be a no-op
when the invocation_id has no record (no error).
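The no-op requirement amounts to making delete idempotent. A minimal sketch, assuming a hypothetical dict-backed SketchBackend and a delete method that takes only the invocation_id (the signature is not shown on this page):

```python
import asyncio


class SketchBackend:
    """Hypothetical dict-backed store used to illustrate idempotent delete."""

    def __init__(self) -> None:
        self._records: dict = {}

    async def save(self, invocation_id: str, record: dict) -> None:
        self._records[invocation_id] = record

    async def delete(self, invocation_id: str) -> None:
        # pop with a default never raises, so a missing id is a no-op.
        self._records.pop(invocation_id, None)

    def ids(self) -> list:
        return sorted(self._records)


async def demo() -> list:
    backend = SketchBackend()
    await backend.save("inv-1", {"step": 1})
    await backend.save("inv-2", {"step": 1})
    await backend.delete("inv-1")
    await backend.delete("inv-1")  # second call: nothing left to remove
    await backend.delete("never-saved")  # unknown id: still no error
    return backend.ids()


remaining_ids = asyncio.run(demo())
```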
CheckpointFilter (dataclass)
Predicate for Checkpointer.list. v1 ships two narrow
fields; richer query DSLs are deferred to follow-on work.
correlation_id: match records whose correlation_id equals the supplied value. None matches every record (the "list all" case).
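The matching rule for correlation_id can be sketched as a small predicate. SketchFilter, SketchSummary, and apply_filter are hypothetical names for this example, and only the correlation_id field described above is modeled.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical filter with the single documented field."""

    correlation_id: Optional[str] = None


@dataclass(frozen=True)
class SketchSummary:
    invocation_id: str
    correlation_id: str


def apply_filter(
    summaries: Iterable[SketchSummary], flt: Optional[SketchFilter]
) -> List[SketchSummary]:
    # No filter, or a filter with correlation_id=None, lists everything.
    if flt is None or flt.correlation_id is None:
        return list(summaries)
    return [s for s in summaries if s.correlation_id == flt.correlation_id]


rows = [
    SketchSummary("inv-1", "order-7"),
    SketchSummary("inv-2", "order-9"),
    SketchSummary("inv-3", "order-7"),
]
all_ids = [s.invocation_id for s in apply_filter(rows, SketchFilter())]
order_7_ids = [
    s.invocation_id for s in apply_filter(rows, SketchFilter("order-7"))
]
```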
CheckpointRecord (dataclass)
CheckpointRecord(
invocation_id: str,
correlation_id: str,
state: Any,
completed_positions: tuple[NodePosition, ...],
parent_states: tuple[Any, ...],
last_saved_at: float,
schema_version: str = CHECKPOINT_SCHEMA_VERSION,
fan_out_progress: None = None,
)
One invocation's progress at one save point.
Frozen — backends MUST treat the record as immutable; the engine
builds a fresh record per completed event rather than mutating
a shared one. The fan_out_progress field is reserved for a
future per-instance fan-out resume mode; in the shipping version
it is always None.
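The "fresh record per completed event" approach can be illustrated with a frozen dataclass and dataclasses.replace. SketchRecord is a hypothetical, trimmed-down stand-in, and its completed_positions holds plain strings here instead of NodePosition values.

```python
import time
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class SketchRecord:
    """Hypothetical stand-in carrying three of the documented fields."""

    invocation_id: str
    completed_positions: tuple
    last_saved_at: float


first = SketchRecord("inv-1", ("fetch",), time.time())

try:
    first.last_saved_at = 0.0  # frozen dataclasses reject assignment
    mutation_rejected = False
except FrozenInstanceError:
    mutation_rejected = True

# Build a new record for the next completed event; `first` is untouched.
second = replace(
    first,
    completed_positions=first.completed_positions + ("parse",),
    last_saved_at=time.time(),
)
```

Since each save receives its own immutable record, a backend that holds records by reference never sees them change underneath it.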
CheckpointSummary (dataclass)
CheckpointSummary(
invocation_id: str,
correlation_id: str,
last_saved_at: float,
completed_node_count: int,
)
Lightweight record-level metadata returned by
Checkpointer.list.
Implementations MAY add backend-specific fields; the four declared here are the cross-backend portable subset callers can rely on.
NodePosition (dataclass)
NodePosition(
namespace: tuple[str, ...],
node_name: str,
step: int,
attempt_index: int = 0,
fan_out_index: int | None = None,
)
A single completed-node coordinate in the resume map.
Frozen + automatically hashable (no mutable fields), so positions
can live in sets and dict keys — the engine's resume-entry
derivation relies on set membership to skip nodes that have
already completed.
Fields:
- namespace: chain of containing-graph node names from outermost down to (but not including) this node. Empty for outermost-graph nodes; one entry for subgraph-internal nodes; two entries when nested two deep, and so on. Distinct from NodeEvent.namespace, which includes the node's own name: NodeEvent.namespace == NodePosition.namespace + (NodePosition.node_name,).
- node_name: the node's local name in its containing graph.
- step: the monotonic step counter at the time the node completed (shared with NodeEvent.step).
- attempt_index: 0-based retry attempt index. The final successful attempt's index is what gets recorded.
- fan_out_index: populated only for events from inside a fan-out instance. Those events do NOT produce records in the shipping version; the field is part of the position shape so a future per-instance fan-out resume can populate it without a record-shape migration.
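The hashability and namespace relationship described above can be checked with a local re-declaration of the documented shape. SketchNodePosition copies the field list from the signature on this page; it is a stand-in, and the membership test only approximates how the engine might derive its resume entry point.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class SketchNodePosition:
    """Local stand-in mirroring the documented NodePosition fields."""

    namespace: Tuple[str, ...]
    node_name: str
    step: int
    attempt_index: int = 0
    fan_out_index: Optional[int] = None


# Frozen dataclasses with hashable fields can be stored in a set.
completed = {
    SketchNodePosition((), "fetch", 1),
    SketchNodePosition(("enrich",), "lookup", 2),
}

probe = SketchNodePosition(("enrich",), "lookup", 2)
already_done = probe in completed  # equal fields hash to the same entry

# The event-style namespace appends the node's own name.
event_namespace = probe.namespace + (probe.node_name,)
outer_event_namespace = () + ("fetch",)
```

Note that equality covers every field, so two positions for the same node with a different step or attempt_index are distinct set members.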