
openarmature.checkpoint

openarmature.checkpoint — checkpointing capability.

Public surface: the typed Checkpointer Protocol; the CheckpointRecord, NodePosition, and CheckpointSummary shapes; the checkpoint error categories; and two reference backends (in-memory and SQLite).

Users register a backend at graph build time via GraphBuilder.with_checkpointer(...). The engine then saves a checkpoint on every completed event, for both outermost-graph nodes and subgraph-internal nodes, and invoke(resume_invocation=X) loads and restores from a prior record.

InMemoryCheckpointer

InMemoryCheckpointer()

Dict-backed Checkpointer.

Durability: none. Records live for the lifetime of this instance only; restarting the process loses everything. Appropriate for unit tests, the dev loop, and short-lived in-process pipelines that don't need crash recovery.

State shape: any. The record is held by reference, so load returns the same Pydantic state instance the engine produced, with no serialization round-trip. (This is deliberate: tests can assert on the saved state's identity.)
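The by-reference behavior can be illustrated with a small sketch. DictCheckpointer and the two-field Record below are hypothetical stand-ins, not the shipped InMemoryCheckpointer or CheckpointRecord; a plain dict plays the role of the Pydantic state instance. The point is that load hands back the very object that was saved, so an identity check (`is`) succeeds.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Record:
    # Simplified stand-in for CheckpointRecord: only two fields kept.
    invocation_id: str
    state: Any


class DictCheckpointer:
    """Hypothetical dict-backed store illustrating by-reference retention."""

    def __init__(self) -> None:
        self._records: dict[str, Record] = {}

    async def save(self, invocation_id: str, record: Record) -> None:
        # No copy and no serialization: the record object itself is stored.
        self._records[invocation_id] = record

    async def load(self, invocation_id: str) -> Record | None:
        return self._records.get(invocation_id)


async def main() -> tuple[bool, bool]:
    cp = DictCheckpointer()
    state = {"messages": ["hi"]}  # stands in for a Pydantic state instance
    await cp.save("inv-1", Record("inv-1", state))
    loaded = await cp.load("inv-1")
    missing = await cp.load("inv-unknown")
    # Identity (not just equality) holds because nothing was serialized.
    return loaded is not None and loaded.state is state, missing is None


same_object, missing_is_none = asyncio.run(main())
print(same_object, missing_is_none)  # True True
```

The flip side of this design is that later mutation of a mutable state object would also show up in the "saved" copy, which is one more reason the engine builds fresh records rather than mutating shared ones.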

SQLiteCheckpointer

SQLiteCheckpointer(
    path: str | Path,
    *,
    serialization: SerializationMode = "pickle"
)

SQLite Checkpointer with WAL-mode durability.

Retention: upsert — one row per invocation_id, overwritten on every save. Saved records are NOT historical: only the most recent save for any given invocation_id is retained.

Cross-language portability: depends on the serialization constructor argument. "pickle" is Python-only; "json" works across languages but is restricted to JSON-native state shapes (the engine's Pydantic state must successfully model_dump(mode="json")).
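The upsert retention and the two serialization modes can be sketched with the standard-library sqlite3 module. The table name, column names, and the helper functions are hypothetical and not the shipped backend's schema; the sketch only shows that a second save for the same invocation_id replaces the first row, that WAL is switched on via PRAGMA journal_mode, and that "json" mode rejects values that are not JSON-native.

```python
from __future__ import annotations

import json
import os
import pickle
import sqlite3
import tempfile
from typing import Any, Literal

Mode = Literal["pickle", "json"]


def open_store(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # write-ahead logging for file databases
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "invocation_id TEXT PRIMARY KEY, payload BLOB NOT NULL)"
    )
    return conn


def encode(state: Any, mode: Mode) -> bytes:
    if mode == "pickle":
        return pickle.dumps(state)  # Python-only format
    return json.dumps(state).encode("utf-8")  # portable, but JSON-native values only


def decode(blob: bytes, mode: Mode) -> Any:
    return pickle.loads(blob) if mode == "pickle" else json.loads(blob)


def save(conn: sqlite3.Connection, invocation_id: str, state: Any, mode: Mode) -> None:
    # Upsert on the primary key: a later save replaces the earlier row.
    with conn:  # commits the transaction on success
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints (invocation_id, payload) VALUES (?, ?)",
            (invocation_id, encode(state, mode)),
        )


def load(conn: sqlite3.Connection, invocation_id: str, mode: Mode) -> Any | None:
    row = conn.execute(
        "SELECT payload FROM checkpoints WHERE invocation_id = ?", (invocation_id,)
    ).fetchone()
    return None if row is None else decode(row[0], mode)


with tempfile.TemporaryDirectory() as tmp:
    conn = open_store(os.path.join(tmp, "checkpoints.db"))
    journal = conn.execute("PRAGMA journal_mode").fetchone()[0]
    save(conn, "inv-1", {"step": 1}, "json")
    save(conn, "inv-1", {"step": 2}, "json")  # overwrites the step-1 row
    rows = conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]
    latest = load(conn, "inv-1", "json")
    conn.close()

try:
    encode({"tags": {"a", "b"}}, "json")  # a set is not JSON-native
    json_rejects_set = False
except TypeError:
    json_rejects_set = True

print(journal, rows, latest, json_rejects_set)  # wal 1 {'step': 2} True
```

Only one row survives per invocation_id, which is why an earlier save point cannot be recovered from this backend once a later one has been written.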

CheckpointError

Bases: Exception

Base for all checkpoint errors. Each subclass carries a category class attribute matching its canonical category string.

CheckpointNotFound

CheckpointNotFound(invocation_id: str)

Bases: CheckpointError

Raised when invoke(resume_invocation=X) is called and Checkpointer.load(X) returns None. Non-transient — the record genuinely does not exist; retrying without changing the invocation_id will never succeed.

CheckpointRecordInvalid

CheckpointRecordInvalid(invocation_id: str, message: str)

Bases: CheckpointError

Raised when Checkpointer.load(X) returns a record whose schema is incompatible with the current graph (state shape mismatch, missing required fields, or schema_version mismatch). Non-transient: the persisted record was written by an incompatible engine version or for a different state schema, so retrying will not succeed.
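The resume-time checks behind these two errors can be sketched as follows. The exception class names mirror the documented ones, but their category strings, the SCHEMA_VERSION value, the Store class, and load_for_resume are all hypothetical; the sketch only checks schema_version, not the full state-shape validation described above.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any

SCHEMA_VERSION = "1"  # hypothetical stand-in for CHECKPOINT_SCHEMA_VERSION


class CheckpointError(Exception):
    category = "checkpoint_error"  # all category strings here are hypothetical


class CheckpointNotFound(CheckpointError):
    category = "checkpoint_not_found"

    def __init__(self, invocation_id: str) -> None:
        super().__init__(f"no checkpoint for invocation {invocation_id!r}")
        self.invocation_id = invocation_id


class CheckpointRecordInvalid(CheckpointError):
    category = "checkpoint_record_invalid"

    def __init__(self, invocation_id: str, message: str) -> None:
        super().__init__(f"invocation {invocation_id!r}: {message}")
        self.invocation_id = invocation_id


@dataclass(frozen=True)
class Record:
    invocation_id: str
    state: Any
    schema_version: str = SCHEMA_VERSION


class Store:
    def __init__(self, records: dict[str, Record]) -> None:
        self._records = records

    async def load(self, invocation_id: str) -> Record | None:
        return self._records.get(invocation_id)


async def load_for_resume(store: Store, invocation_id: str) -> Record:
    record = await store.load(invocation_id)
    if record is None:
        raise CheckpointNotFound(invocation_id)
    if record.schema_version != SCHEMA_VERSION:
        raise CheckpointRecordInvalid(
            invocation_id,
            f"schema_version {record.schema_version!r}, expected {SCHEMA_VERSION!r}",
        )
    return record


async def main() -> list[str]:
    store = Store({"ok": Record("ok", {}), "old": Record("old", {}, schema_version="0")})
    outcomes: list[str] = []
    for invocation_id in ("ok", "missing", "old"):
        try:
            await load_for_resume(store, invocation_id)
            outcomes.append("resumed")
        except CheckpointError as exc:
            # Both failures are non-transient: retrying the same id cannot succeed.
            outcomes.append(exc.category)
    return outcomes


outcomes = asyncio.run(main())
print(outcomes)  # ['resumed', 'checkpoint_not_found', 'checkpoint_record_invalid']
```

Catching the common base class and branching on category lets a caller treat both failures uniformly as "do not retry".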

CheckpointSaveFailed

CheckpointSaveFailed(
    invocation_id: str, cause: BaseException
)

Bases: CheckpointError

Raised when Checkpointer.save itself raises during a completed event handler. Engine behavior on save failure is implementation-defined; this implementation raises to the caller of invoke() immediately and does NOT retry the save (documented on CompiledGraph.invoke).
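A minimal sketch of the "wrap once, do not retry" policy, assuming a hypothetical FlakyStore backend and a hypothetical save_or_raise helper (neither is part of the library). The category string is likewise an illustrative placeholder. The original exception is kept as the cause and chained with `raise ... from`.

```python
from __future__ import annotations

import asyncio


class CheckpointError(Exception):
    category = "checkpoint_error"  # hypothetical category string


class CheckpointSaveFailed(CheckpointError):
    category = "checkpoint_save_failed"  # hypothetical category string

    def __init__(self, invocation_id: str, cause: BaseException) -> None:
        super().__init__(f"save failed for {invocation_id!r}: {cause!r}")
        self.invocation_id = invocation_id
        self.cause = cause


class FlakyStore:
    """Hypothetical backend whose save always fails."""

    def __init__(self) -> None:
        self.calls = 0

    async def save(self, invocation_id: str, record: object) -> None:
        self.calls += 1
        raise OSError("disk full")


async def save_or_raise(store: FlakyStore, invocation_id: str, record: object) -> None:
    # One attempt per completed event; there is deliberately no retry loop.
    try:
        await store.save(invocation_id, record)
    except Exception as exc:  # Exception, so task cancellation still propagates
        raise CheckpointSaveFailed(invocation_id, exc) from exc


async def main() -> tuple[str, int, type]:
    store = FlakyStore()
    try:
        await save_or_raise(store, "inv-1", object())
    except CheckpointSaveFailed as err:
        return err.category, store.calls, type(err.cause)
    raise AssertionError("expected CheckpointSaveFailed")


category, calls, cause_type = asyncio.run(main())
print(category, calls, cause_type.__name__)  # checkpoint_save_failed 1 OSError
```

Leaving retries to the caller keeps the policy (backoff, limits, whether to retry at all) out of the engine.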

Checkpointer

Bases: Protocol

Persistence seam for graph invocations.

Implementations MUST be safe to share across concurrent invocations of the same graph (the engine does not serialize access). Each operation MUST be thread-safe and safe to call from concurrently running asyncio tasks; backends with synchronous I/O typically wrap their work in asyncio.to_thread or equivalent.
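The asyncio.to_thread pattern mentioned above might look like this in a backend whose storage calls block. ThreadedStore and its private methods are hypothetical; a dict write stands in for real disk I/O. Because to_thread runs the blocking function in a worker thread pool, the shared state is guarded with a threading.Lock rather than an asyncio.Lock.

```python
from __future__ import annotations

import asyncio
import threading
from typing import Any


class ThreadedStore:
    """Hypothetical backend with blocking I/O, offloaded to worker threads."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = threading.Lock()  # guards _data across worker threads

    def _save_blocking(self, invocation_id: str, record: Any) -> None:
        with self._lock:
            self._data[invocation_id] = record  # a real backend would do disk I/O here

    def _load_blocking(self, invocation_id: str) -> Any | None:
        with self._lock:
            return self._data.get(invocation_id)

    async def save(self, invocation_id: str, record: Any) -> None:
        # Keep the event loop responsive while the blocking work runs.
        await asyncio.to_thread(self._save_blocking, invocation_id, record)

    async def load(self, invocation_id: str) -> Any | None:
        return await asyncio.to_thread(self._load_blocking, invocation_id)


async def main() -> list[Any]:
    store = ThreadedStore()  # one instance shared by many concurrent invocations
    await asyncio.gather(*(store.save(f"inv-{i}", i) for i in range(50)))
    return await asyncio.gather(*(store.load(f"inv-{i}") for i in range(50)))


results = asyncio.run(main())
print(results == list(range(50)))  # True
```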

save async

save(invocation_id: str, record: CheckpointRecord) -> None

Persist record for invocation_id. Once save returns, the record MUST be durable across process crashes for backends that document durability (in-memory backends are not durable and MUST document this). Synchronous by contract: the engine awaits this call before continuing to the next node, so a crash immediately after a completed event cannot lose the corresponding save.
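The ordering guarantee can be sketched with a toy linear runner. run_graph and RecordingStore are hypothetical and far simpler than the real engine; the sketch only shows that each node's save is awaited before the next node starts, so the log strictly alternates between running and saving.

```python
from __future__ import annotations

import asyncio
from typing import Callable

Node = Callable[[dict], dict]


class RecordingStore:
    """Hypothetical checkpointer that logs when each save completes."""

    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def save(self, invocation_id: str, record: dict) -> None:
        await asyncio.sleep(0)  # simulated I/O; yields to the event loop
        self.log.append(f"saved after {record['last_node']}")


async def run_graph(nodes: list[tuple[str, Node]], store: RecordingStore, invocation_id: str) -> dict:
    """Toy linear runner, not the real engine: run a node, save, then advance."""
    state: dict = {}
    for name, fn in nodes:
        store.log.append(f"run {name}")
        state = fn(state)
        # This node's "completed" event: block until the save has returned.
        await store.save(invocation_id, {"state": state, "last_node": name})
    return state


log: list[str] = []
nodes = [("a", lambda s: {**s, "a": 1}), ("b", lambda s: {**s, "b": 2})]
final = asyncio.run(run_graph(nodes, RecordingStore(log), "inv-1"))
print(log)    # ['run a', 'saved after a', 'run b', 'saved after b']
print(final)  # {'a': 1, 'b': 2}
```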

load async

load(invocation_id: str) -> CheckpointRecord | None

Return the most recent record for invocation_id or None if no record exists. The returned record MUST be structurally identical to what save last wrote for this invocation_id (round-trip integrity).

list async

list(
    filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]

Enumerate saved invocations. The filter shape is backend-defined; this implementation ships list_all and list_by_correlation_id predicates.

delete async

delete(invocation_id: str) -> None

Remove all records for invocation_id. MUST be a no-op when the invocation_id has no record (no error).
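The four methods above can be mirrored as a typing.Protocol to show structural conformance. CheckpointerLike, MinimalBackend, and the two-field Summary are hypothetical simplifications (record and filter types are loosened to Any), not the library's definitions. Note that runtime_checkable isinstance checks only verify that the methods exist, not their signatures. The delete implementation uses dict.pop with a default so that deleting an unknown id is a silent no-op.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Iterable, Protocol, runtime_checkable


@dataclass(frozen=True)
class Summary:
    invocation_id: str
    correlation_id: str


@runtime_checkable
class CheckpointerLike(Protocol):
    """Simplified mirror of the four-method seam (record and filter types loosened)."""

    async def save(self, invocation_id: str, record: Any) -> None: ...
    async def load(self, invocation_id: str) -> Any | None: ...
    async def list(self, filter: Any | None = None) -> Iterable[Summary]: ...
    async def delete(self, invocation_id: str) -> None: ...


class MinimalBackend:
    """Hypothetical backend; satisfies the protocol structurally, without inheriting it."""

    def __init__(self) -> None:
        self._records: dict[str, dict] = {}

    async def save(self, invocation_id: str, record: dict) -> None:
        self._records[invocation_id] = record

    async def load(self, invocation_id: str) -> dict | None:
        return self._records.get(invocation_id)

    async def list(self, filter: Any | None = None) -> Iterable[Summary]:
        return [Summary(inv, rec.get("correlation_id", "")) for inv, rec in self._records.items()]

    async def delete(self, invocation_id: str) -> None:
        # pop() with a default makes deleting an unknown id a silent no-op.
        self._records.pop(invocation_id, None)


async def main() -> tuple[bool, Any, list[Summary]]:
    backend = MinimalBackend()
    conforms = isinstance(backend, CheckpointerLike)  # checks method presence only
    await backend.save("inv-1", {"correlation_id": "req-a"})
    await backend.save("inv-2", {"correlation_id": "req-b"})
    await backend.delete("inv-1")
    await backend.delete("inv-1")  # already gone: still no error
    return conforms, await backend.load("inv-1"), list(await backend.list())


conforms, after_delete, remaining = asyncio.run(main())
print(conforms, after_delete, remaining)
# True None [Summary(invocation_id='inv-2', correlation_id='req-b')]
```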

CheckpointFilter dataclass

CheckpointFilter(correlation_id: str | None = None)

Predicate for Checkpointer.list. v1 ships a deliberately narrow filter (a single field, covering the list-all and list-by-correlation-ID cases); richer query DSLs are deferred to follow-on work.

  • correlation_id — match records whose correlation_id equals the supplied value. None matches every record (the "list all" case).
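The matching rule can be sketched as a small predicate. The matches function and the two-field Summary are hypothetical; a backend would apply equivalent logic when answering list. Passing no filter at all, or a filter whose correlation_id is None, selects every record.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CheckpointFilter:
    correlation_id: str | None = None


@dataclass(frozen=True)
class Summary:
    invocation_id: str
    correlation_id: str


def matches(f: CheckpointFilter | None, s: Summary) -> bool:
    # No filter, or a filter with correlation_id=None, matches every record.
    if f is None or f.correlation_id is None:
        return True
    return s.correlation_id == f.correlation_id


summaries = [Summary("inv-1", "req-a"), Summary("inv-2", "req-b"), Summary("inv-3", "req-a")]
all_ids = [s.invocation_id for s in summaries if matches(None, s)]
req_a = [s.invocation_id for s in summaries if matches(CheckpointFilter("req-a"), s)]
print(all_ids)  # ['inv-1', 'inv-2', 'inv-3']
print(req_a)    # ['inv-1', 'inv-3']
```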

CheckpointRecord dataclass

CheckpointRecord(
    invocation_id: str,
    correlation_id: str,
    state: Any,
    completed_positions: tuple[NodePosition, ...],
    parent_states: tuple[Any, ...],
    last_saved_at: float,
    schema_version: str = CHECKPOINT_SCHEMA_VERSION,
    fan_out_progress: None = None,
)

One invocation's progress at one save point.

Frozen — backends MUST treat the record as immutable; the engine builds a fresh record per completed event rather than mutating a shared one. The fan_out_progress field is reserved for a future per-instance fan-out resume mode; in the shipping version it is always None.
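The "fresh record per completed event" approach maps naturally onto dataclasses.replace. The Record below is a hypothetical simplification of CheckpointRecord (positions reduced to node names). Assigning to a field of a frozen dataclass raises FrozenInstanceError, while replace builds a new instance and leaves the earlier one untouched.

```python
from __future__ import annotations

import dataclasses
import time
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Record:
    # Simplified CheckpointRecord: positions reduced to node names.
    invocation_id: str
    state: Any
    completed: tuple[str, ...]
    last_saved_at: float
    fan_out_progress: None = None  # reserved; always None here


first = Record("inv-1", {"n": 1}, ("a",), time.time())

mutation_blocked = False
try:
    first.completed = ("a", "b")  # type: ignore[misc]
except dataclasses.FrozenInstanceError:
    mutation_blocked = True

# Build a fresh record for the next completed event instead of mutating.
second = dataclasses.replace(
    first, state={"n": 2}, completed=first.completed + ("b",), last_saved_at=time.time()
)
print(mutation_blocked, first.completed, second.completed)  # True ('a',) ('a', 'b')
```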

CheckpointSummary dataclass

CheckpointSummary(
    invocation_id: str,
    correlation_id: str,
    last_saved_at: float,
    completed_node_count: int,
)

Lightweight record-level metadata returned by Checkpointer.list.

Implementations MAY add backend-specific fields; the four declared here are the cross-backend portable subset callers can rely on.
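The following sketch shows how a backend might extend the summary while a caller relies only on the portable four fields. SQLiteSummary, its serialization field, the simplified Record, and the summarize and describe helpers are all hypothetical. Deriving completed_node_count as the number of completed positions is an assumption, not something the reference states.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class CheckpointSummary:
    invocation_id: str
    correlation_id: str
    last_saved_at: float
    completed_node_count: int


@dataclass(frozen=True)
class SQLiteSummary(CheckpointSummary):
    serialization: str = "pickle"  # hypothetical backend-specific extra field


@dataclass(frozen=True)
class Record:
    # Simplified CheckpointRecord: positions reduced to node names.
    invocation_id: str
    correlation_id: str
    completed_positions: tuple[str, ...]
    last_saved_at: float
    state: Any = None


def summarize(record: Record) -> SQLiteSummary:
    # Assumption: the count is the number of completed positions.
    return SQLiteSummary(
        record.invocation_id,
        record.correlation_id,
        record.last_saved_at,
        len(record.completed_positions),
        serialization="json",
    )


def describe(summary: CheckpointSummary) -> str:
    # Portable caller: reads only the four declared fields.
    return f"{summary.invocation_id} ({summary.correlation_id}): {summary.completed_node_count} nodes"


summary = summarize(Record("inv-1", "req-a", ("fetch", "parse"), 1700000000.0))
print(describe(summary))  # inv-1 (req-a): 2 nodes
```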

NodePosition dataclass

NodePosition(
    namespace: tuple[str, ...],
    node_name: str,
    step: int,
    attempt_index: int = 0,
    fan_out_index: int | None = None,
)

A single completed-node coordinate in the resume map.

Frozen, and therefore automatically hashable (every field is immutable), so positions can be set members and dict keys. The engine's resume-entry derivation relies on set membership to skip nodes that have already completed.
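Hashability can be checked directly by rebuilding the dataclass from the signature above. A position constructed independently with the same field values is found in the set. The resume-planning part keys on (namespace, node_name) purely for illustration; the engine's actual membership key is not specified here, so treat that step as a hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class NodePosition:
    namespace: tuple[str, ...]
    node_name: str
    step: int
    attempt_index: int = 0
    fan_out_index: int | None = None


completed = {
    NodePosition((), "fetch", step=1),
    NodePosition((), "parse", step=2, attempt_index=1),  # succeeded on the retry
}

# Equal field values give equal hashes, so an independently built position is found.
is_member = NodePosition((), "fetch", step=1) in completed

# Hypothetical resume planning keyed on (namespace, node_name).
plan = [((), "fetch"), ((), "parse"), ((), "summarize")]
done = {(p.namespace, p.node_name) for p in completed}
remaining = [name for ns, name in plan if (ns, name) not in done]
print(is_member, remaining)  # True ['summarize']
```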

Fields:

  • namespace — chain of containing-graph node names from outermost down to (but not including) this node. Empty for outermost-graph nodes; one entry for subgraph-internal nodes; two entries when nested two deep, and so on. Distinct from NodeEvent.namespace which includes the node's own name — NodeEvent.namespace == NodePosition.namespace + (NodePosition.node_name,).
  • node_name — the node's local name in its containing graph.
  • step — the monotonic step counter at the time the node completed (shared with NodeEvent.step).
  • attempt_index — 0-based retry attempt index. The final successful attempt's index is what gets recorded.
  • fan_out_index — populated only for events from inside a fan-out instance. Those events do NOT produce records in the shipping version; the field is part of the position shape so a future per-instance fan-out resume can populate it without a record-shape migration.
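The namespace relationship between NodeEvent and NodePosition can be expressed as two small conversions. event_namespace and position_from_event are hypothetical helpers written from the stated identity NodeEvent.namespace == NodePosition.namespace + (NodePosition.node_name,); they are not library functions.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class NodePosition:
    namespace: tuple[str, ...]
    node_name: str
    step: int
    attempt_index: int = 0
    fan_out_index: int | None = None


def event_namespace(pos: NodePosition) -> tuple[str, ...]:
    # NodeEvent.namespace == NodePosition.namespace + (NodePosition.node_name,)
    return pos.namespace + (pos.node_name,)


def position_from_event(namespace: tuple[str, ...], step: int, attempt_index: int = 0) -> NodePosition:
    # Split the event namespace: the last entry is the node, the rest are its containers.
    *containers, node_name = namespace
    return NodePosition(tuple(containers), node_name, step, attempt_index)


top = NodePosition((), "router", step=3)
nested = NodePosition(("research", "search"), "fetch", step=7, attempt_index=2)

top_ns = event_namespace(top)
nested_ns = event_namespace(nested)
round_trips = position_from_event(nested_ns, step=7, attempt_index=2) == nested
print(top_ns, nested_ns, round_trips)
# ('router',) ('research', 'search', 'fetch') True
```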