
openarmature.graph

Public API for the OpenArmature graph engine.

Re-exports the surface a user touches when building and running a graph: the state schema base, reducers, the builder/compiled pair, edge primitives and the END sentinel, the node/subgraph/projection seams, and the canonical compile-time and runtime error categories.

GraphBuilder

GraphBuilder(state_cls: type[StateT])

Mutable builder for a graph; call compile() to produce a CompiledGraph.

add_fan_out_node

add_fan_out_node(
    name: str,
    *,
    subgraph: CompiledGraph[ChildT],
    collect_field: str,
    target_field: str,
    items_field: str | None = None,
    item_field: str | None = None,
    count: int | CountResolver | None = None,
    concurrency: int | ConcurrencyResolver | None = 10,
    error_policy: str = "fail_fast",
    on_empty: str = "raise",
    count_field: str | None = None,
    inputs: Mapping[str, str] | None = None,
    extra_outputs: Mapping[str, str] | None = None,
    instance_middleware: Iterable[Middleware] | None = None,
    errors_field: str | None = None,
    middleware: Iterable[Middleware] | None = None
) -> Self

Register a fan-out node.

Validates configuration at registration time:

  • Exactly one of items_field or count MUST be specified (fan_out_count_mode_ambiguous otherwise).
  • items_field MUST refer to a list-typed field on the parent state schema (fan_out_field_not_list otherwise).
  • items_field mode requires item_field; count mode forbids item_field.
  • on_empty and error_policy MUST be one of the permitted string literals ("raise" / "noop" and "fail_fast" / "collect" respectively).
  • inputs / extra_outputs / count_field field references go through the existing mapping_references_undeclared_field rule.
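The mode rules above (exactly one of items_field or count, and the item_field requirement) can be sketched as a standalone check. This is an illustration only, not the library's validator: check_fan_out_mode is a hypothetical helper, and it raises ValueError where the engine raises its own CompileError subclasses.

```python
from __future__ import annotations


def check_fan_out_mode(
    items_field: str | None,
    item_field: str | None,
    count: object | None,
) -> str:
    """Return "items" or "count", or raise ValueError (hypothetical stand-in)."""
    # Exactly one of items_field / count must be given.
    if (items_field is None) == (count is None):
        raise ValueError("fan_out_count_mode_ambiguous")
    if items_field is not None:
        # items_field mode requires item_field.
        if item_field is None:
            raise ValueError("items_field mode requires item_field")
        return "items"
    # count mode forbids item_field.
    if item_field is not None:
        raise ValueError("count mode forbids item_field")
    return "count"
```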

with_checkpointer

with_checkpointer(checkpointer: Checkpointer) -> Self

Register a Checkpointer for the compiled graph.

At most one Checkpointer per graph; calling with_checkpointer again replaces the previously stored one. Compile the builder and call CompiledGraph.invoke as usual; the engine saves a checkpoint at every completed event, for both outermost-graph and subgraph-internal nodes.

add_middleware

add_middleware(middleware: Middleware) -> Self

Register a per-graph middleware applied to every node in this graph.

Per-graph middleware composes OUTSIDE per-node middleware. Calling order is preserved (outer-to-inner) — earlier add_middleware calls produce outer layers in the runtime chain.
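The ordering can be sketched with plain async callables. This is a minimal illustration that assumes dict-based state and a hypothetical compose helper; it does not use the engine's own chain builder.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping

Next = Callable[[dict], Awaitable[Mapping[str, Any]]]
Middleware = Callable[[dict, Next], Awaitable[Mapping[str, Any]]]

calls: list[str] = []


def tag(label: str) -> Middleware:
    """Build a middleware that records when it is entered and exited."""
    async def middleware(state: dict, next_: Next) -> Mapping[str, Any]:
        calls.append(f"enter {label}")
        result = await next_(state)
        calls.append(f"exit {label}")
        return result
    return middleware


async def node(state: dict) -> Mapping[str, Any]:
    calls.append("node")
    return {"done": True}


def compose(chain: list[Middleware], inner: Next) -> Next:
    """Wrap inner so that chain[0] ends up as the outermost layer."""
    for mw in reversed(chain):
        inner = (lambda m, nxt: lambda state: m(state, nxt))(mw, inner)
    return inner


# Per-graph layers (in add_middleware call order) sit outside per-node layers.
per_graph = [tag("graph-1"), tag("graph-2")]
per_node = [tag("node-level")]
result = asyncio.run(compose(per_graph + per_node, node)({}))
```

After the run, calls lists the layers in entry order (graph-1, graph-2, node-level, node) followed by the exits in reverse.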

CompiledGraph dataclass

CompiledGraph(
    state_cls: type[StateT],
    entry: str,
    nodes: Mapping[str, Node[StateT]],
    edges: Mapping[
        str, StaticEdge | ConditionalEdge[StateT]
    ],
    reducers: Mapping[str, Reducer],
    middleware: tuple[Middleware, ...] = (),
    _attached_observers: list[SubscribedObserver] = list[
        SubscribedObserver
    ](),
    _active_workers: set[Task[None]] = set[Task[None]](),
    _checkpointer_slot: list[Checkpointer | None] = (
        lambda: [None]
    )(),
)

An immutable, executable graph produced by GraphBuilder.compile().

The compile-time topology (state class, entry, nodes, edges, reducers) is immutable. Two mutable collections ride alongside for observer plumbing (_attached_observers and _active_workers); neither affects the compiled topology, and both are scoped to the same instance.

checkpointer property

checkpointer: Checkpointer | None

Currently-registered Checkpointer, or None.

attach_observer

attach_observer(
    observer: Observer,
    *,
    phases: Iterable[str] | None = None
) -> RemoveHandle

Register a graph-attached observer.

Graph-attached observers fire on every invocation of this graph until removed — including when this graph runs as a subgraph inside a parent. Returns a RemoveHandle whose .remove() method detaches the observer; idempotent.

phases selects the phase strings ("started", "completed") the observer subscribes to; default is both. An empty phases set raises ValueError at registration time.

Changes to the registered set during a graph run do NOT take effect until the next invocation. The set of observers delivering events for an in-flight invocation is fixed at the point the invocation begins.

attach_checkpointer

attach_checkpointer(
    checkpointer: Checkpointer | None,
) -> None

Register a Checkpointer for this graph.

Pass None to clear a previously-registered backend. Without a registered Checkpointer the engine never calls save() and invoke(resume_invocation=...) raises checkpoint_not_found.

At most one Checkpointer per graph. Calling attach_checkpointer again replaces the previously registered one; multi-backend fan-out is the user's responsibility (wrap two underlying Checkpointers behind a custom protocol-conforming implementation if needed).

drain async

drain() -> None

Await delivery of every observer event produced by prior invocations of this graph.

Callers running in short-lived processes (scripts, serverless functions, CLIs) MUST use drain to avoid losing observer events that were dispatched but not yet delivered.

Only events dispatched before this call are awaited; events from invocations started concurrently with drain may or may not be included. Subgraph events from active invocations are part of the parent invocation's worker and are covered automatically.

Unbounded by design. Drain blocks until every queued event has been delivered to every subscribed observer. A slow, hung, or misbehaving observer can therefore hold drain, and the calling process, indefinitely. If you need a bounded wait, wrap the call in asyncio.wait_for and accept that events still queued when the deadline elapses will not be delivered:

await asyncio.wait_for(compiled.drain(), timeout=5.0)
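A slightly fuller sketch of the bounded wait, runnable without the library: slow_drain is a stand-in for compiled.drain() being held up by a slow observer, and drain_with_deadline is a hypothetical wrapper.

```python
import asyncio


async def slow_drain() -> None:
    """Stand-in for compiled.drain() held up by a slow observer."""
    await asyncio.sleep(10)


async def drain_with_deadline(timeout: float) -> bool:
    """Return True if delivery finished in time, False if the deadline hit."""
    try:
        await asyncio.wait_for(slow_drain(), timeout=timeout)
    except asyncio.TimeoutError:
        # The wait was cancelled; the caller decides whether to log or exit.
        return False
    return True


finished = asyncio.run(drain_with_deadline(0.05))
```

Returning a flag rather than letting the timeout propagate is one possible design; a short-lived process might prefer to log the timeout and exit.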

invoke async

invoke(
    initial_state: StateT,
    observers: (
        Iterable[Observer | SubscribedObserver] | None
    ) = None,
    *,
    correlation_id: str | None = None,
    resume_invocation: str | None = None
) -> StateT

Run the graph from initial_state to END and return the final state.

Optional observers are invocation-scoped — they fire only for this run, after all graph-attached observers (including subgraph-attached ones for events originating in subgraphs).

Each entry in observers may be either a bare Observer callable (subscribes to both phases) or a SubscribedObserver wrapping an observer with an explicit phases set.

This method returns as soon as the graph execution loop completes, regardless of whether the observer delivery queue has finished processing every dispatched event. Use await compiled.drain() if you need delivery-completion guarantees.

Checkpointing.

  • correlation_id is the per-invocation cross-backend join key. Caller-supplied or auto-generated UUIDv4 when absent. Preserved unchanged across resume_invocation.
  • resume_invocation names a prior invocation_id to resume from. Requires a registered Checkpointer; raises CheckpointNotFound when the backend has no record for the supplied id, CheckpointRecordInvalid when the loaded record's schema is incompatible. Resume mints a NEW invocation_id — each attempt is its own invocation in the observability sense; the correlation_id is the cross-attempt join key.
  • Save-failure policy. This implementation raises CheckpointSaveFailed to the caller of invoke() immediately when Checkpointer.save raises; saves are NOT retried by the engine. Wrap the Checkpointer in your own retry logic if transient backend failures should be reattempted.

Raises one of the runtime error categories on failure.

ConditionalEdge dataclass

ConditionalEdge(
    source: str, fn: Callable[[StateT], str | EndSentinel]
)

Routes from source to whichever node fn(state) returns. The function MUST return either a declared node name or END; any other value raises RoutingError at runtime.
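A routing function is an ordinary callable over state. The sketch below uses a stand-in END object and a plain dataclass for state so that it runs on its own; the node names "revise" and "escalate" are hypothetical.

```python
from dataclasses import dataclass


class _EndSentinel:
    """Stand-in for the engine's END sentinel (assumption for this sketch)."""


END = _EndSentinel()


@dataclass(frozen=True)
class ReviewState:
    attempts: int
    approved: bool


def route_after_review(state: ReviewState):
    """Return the next node name, or END when the run should stop."""
    if state.approved:
        return END
    if state.attempts >= 3:
        return "escalate"
    return "revise"
```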

EndSentinel

Engine-provided sentinel routing target. Use the module-level END.

StaticEdge dataclass

StaticEdge(source: str, target: str | EndSentinel)

Always routes from source to target.

CompileError

Bases: GraphError

Base for compile-time errors.

FanOutCountModeAmbiguous

FanOutCountModeAmbiguous(node_name: str, message: str)

Bases: CompileError

Raised when a fan-out node specifies both items_field and count, or neither. Exactly one is required.

FanOutEmpty

FanOutEmpty(node_name: str, recoverable_state: Any)

Bases: NodeException

Raised when a fan-out node resolves to zero instances while its on_empty config is "raise" (the default).

Surfaces as a regular node_exception (so it integrates with the existing error propagation and recoverable-state machinery) but exposes an additional fan_out_category attribute so callers can distinguish empty-fan-out from generic node failures.

FanOutFieldNotList

FanOutFieldNotList(node_name: str, field_name: str)

Bases: CompileError

Raised when a fan-out node's items_field does not refer to a declared list-typed field on the parent state schema.

FanOutInvalidConcurrency

FanOutInvalidConcurrency(
    node_name: str,
    returned: int | None,
    recoverable_state: Any,
)

Bases: NodeException

Raised when a fan-out node's concurrency callable returns zero or a negative integer at runtime. Same node-exception shape as FanOutEmpty.

FanOutInvalidCount

FanOutInvalidCount(
    node_name: str, returned: int, recoverable_state: Any
)

Bases: NodeException

Raised when a fan-out node's count callable returns a negative integer at runtime. Same node-exception shape as FanOutEmpty, with fan_out_category = "fan_out_invalid_count".

GraphError

Bases: Exception

Base for all graph-engine errors.

MappingReferencesUndeclaredField

MappingReferencesUndeclaredField(
    *, direction: str, side: str, field_name: str
)

Bases: CompileError

Raised when a subgraph-as-node inputs or outputs mapping names a field that is not declared in the relevant state schema.

RuntimeGraphError

Bases: GraphError

Base for runtime errors. The four non-validation categories carry a recoverable_state attribute.

StateValidationError

StateValidationError(
    message: str,
    fields: list[str],
    cause: BaseException | None = None,
)

Bases: RuntimeGraphError

State failed schema validation at a graph boundary.

Unlike the other runtime errors, this category does NOT carry recoverable_state — at entry there is no prior state to recover; at exit the failing state IS the final state.

NodeEvent dataclass

NodeEvent(
    node_name: str,
    namespace: tuple[str, ...],
    step: int,
    phase: Literal[
        "started", "completed", "checkpoint_saved"
    ],
    pre_state: State,
    post_state: State | None,
    error: RuntimeGraphError | None,
    parent_states: tuple[State, ...],
    attempt_index: int = 0,
    fan_out_index: int | None = None,
    fan_out_config: FanOutEventConfig | None = None,
)

A single node-boundary event delivered to observers.

  • phase is "started" (dispatched before the node runs) or "completed" (dispatched after the node returns or raises and the merge runs/fails). Each node attempt produces exactly one of each in that order. The engine ALSO dispatches a "checkpoint_saved" event on the same shape after a successful Checkpointer.save call — observers MUST opt in explicitly via phases={"checkpoint_saved"} to receive these (default subscription is {"started", "completed"} only, so legacy observers don't see them).
  • node_name is the name under which this node was registered in its immediate containing graph.
  • namespace is an ordered sequence of node names from the outermost graph down to this node. For a node in the outermost graph, namespace is (node_name,). For nested subgraphs, the chain extends.
  • step is a monotonically-increasing counter starting at 0, scoped to a single outermost invocation. Subgraph-internal nodes increment the same counter. The started/completed pair for one attempt share the same step.
  • pre_state is the state the node received, before reducer merge. Populated on both phases (identical across the pair).
  • post_state is the state after the node's partial update merged successfully. Populated only on completed events that succeeded.
  • error is the wrapped runtime error (NodeException, ReducerError, or StateValidationError) when the node failed. Populated only on completed events that failed.
  • parent_states carries one state snapshot per containing graph, outermost first; for a node in the outermost graph it's an empty tuple. Invariant: len(parent_states) == len(namespace) - 1.
  • attempt_index is the 0-based index of this attempt among any retries. 0 for nodes not wrapped by retry middleware.
  • fan_out_index is the 0-based index of this fan-out instance among its siblings. None for nodes not inside a fan-out.
  • fan_out_config carries resolved fan-out configuration on events from a fan-out NODE itself. See FanOutEventConfig. None on every other event.

Invariants:

  • On started events, post_state and error MUST both be None.
  • On completed events, exactly one of post_state and error is populated.
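These invariants, together with the parent_states length rule from the field list, can be written as a small checker. EventSketch is a reduced stand-in for NodeEvent (an assumption for this example), carrying only the fields the checks need.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class EventSketch:
    """Reduced stand-in for NodeEvent with only the fields checked here."""
    namespace: tuple[str, ...]
    phase: str
    post_state: Any | None
    error: Exception | None
    parent_states: tuple[Any, ...]


def check_invariants(event: EventSketch) -> None:
    """Assert the documented started/completed and namespace invariants."""
    assert len(event.parent_states) == len(event.namespace) - 1
    if event.phase == "started":
        assert event.post_state is None and event.error is None
    elif event.phase == "completed":
        # Exactly one of post_state / error is populated.
        assert (event.post_state is None) != (event.error is None)
```

A check like this could run inside a test observer to catch malformed events early.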

FanOutConfig dataclass

FanOutConfig(
    subgraph: CompiledGraph[Any],
    collect_field: str,
    target_field: str,
    items_field: str | None = None,
    item_field: str | None = None,
    count: int | CountResolver | None = None,
    concurrency: int | ConcurrencyResolver | None = 10,
    error_policy: Literal[
        "fail_fast", "collect"
    ] = "fail_fast",
    on_empty: Literal["raise", "noop"] = "raise",
    count_field: str | None = None,
    inputs: Mapping[str, str] = dict[str, str](),
    extra_outputs: Mapping[str, str] = dict[str, str](),
    instance_middleware: tuple[Middleware, ...] = (),
    errors_field: str | None = None,
)

Frozen configuration for a FanOutNode.

Validation happens in the builder, at registration time (see GraphBuilder.add_fan_out_node); construction here is unchecked beyond the obvious type-level constraints.

FanOutNode dataclass

FanOutNode(
    name: str,
    config: FanOutConfig,
    middleware: tuple[Middleware, ...] = (),
)

A node that fans out into N concurrent subgraph instances.

The Node Protocol contract requires name, middleware, and run; run here is unusual because it needs the engine invocation context to descend properly. The engine recognizes this type in _invoke and calls run_with_context (see compiled.CompiledGraph._step_fan_out_node) rather than the plain run(state) shape. run exists for Protocol conformance only and raises if anyone calls it without context.

run_with_context async

run_with_context(
    state: ParentT,
    context: _InvocationContext,
    *,
    pre_resolved_count: int | None = None,
    pre_resolved_concurrency: (
        tuple[int | None] | None
    ) = None
) -> Mapping[str, Any]

Execute the fan-out and return the merged partial update.

Snapshot, resolve count + concurrency, build per-instance states, run concurrently with the configured error policy, fan-in collected/extra fields, write count_field and errors_field if configured.

pre_resolved_count / pre_resolved_concurrency are the proposal-0013 v0.10.0 hooks: when the engine has already resolved the config eagerly to populate NodeEvent.fan_out_config for the fan-out node's events, it passes the resolved values in so callable resolvers aren't invoked twice. pre_resolved_concurrency is wrapped in a 1-tuple to disambiguate "caller passed None (unbounded)" from "caller didn't pass anything."
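The 1-tuple convention can be illustrated in isolation. resolve_concurrency is a hypothetical helper, not engine code; it only shows how "passed None" and "passed nothing" stay distinguishable.

```python
from __future__ import annotations


def resolve_concurrency(
    configured: int | None,
    pre_resolved: tuple[int | None] | None = None,
) -> int | None:
    """Prefer a pre-resolved value when one was passed, even if it is None."""
    if pre_resolved is not None:
        # The 1-tuple was supplied: unwrap it. Its content may be None (unbounded).
        (value,) = pre_resolved
        return value
    # Nothing was supplied: fall back to the configured value.
    return configured
```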

Middleware

Bases: Protocol

An async callable that wraps the dispatch of a single node.

The shape is (state, next) -> partial_update. The middleware MUST return a mapping of field names to values — same shape a node returns. It may:

  • Inspect or transform state before calling next(state).
  • Inspect or transform the partial update returned from next.
  • Short-circuit by NOT calling next and returning its own partial (the rest of the chain — subsequent middleware and the wrapped node — does not execute).
  • Catch exceptions raised by next(state) and either re-raise, transform, or recover (returning a partial update instead of raising).
  • Call next more than once (e.g., retry middleware).

A middleware MUST NOT mutate the input state object — pass a new state to next if a transformation is needed.
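A minimal sketch of the (state, next) -> partial_update shape, covering short-circuiting, passing a new state, and recovering from an exception. It assumes dict-based state for brevity (the engine's state is a State model), and the field names are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping

Next = Callable[[dict], Awaitable[Mapping[str, Any]]]


async def cached_or_fallback(state: dict, next_: Next) -> Mapping[str, Any]:
    """Short-circuit on a cache hit; recover from a failure with a fallback."""
    if "cached_answer" in state:
        # Short-circuit: next_ is never called, so the node does not run.
        return {"answer": state["cached_answer"]}
    try:
        # Pass a new dict rather than mutating the input state.
        return await next_({**state, "traced": True})
    except RuntimeError:
        # Recover by returning a partial update instead of raising.
        return {"answer": None, "failed": True}


async def flaky_node(state: dict) -> Mapping[str, Any]:
    raise RuntimeError("backend unavailable")


hit = asyncio.run(cached_or_fallback({"cached_answer": 42}, flaky_node))
miss = asyncio.run(cached_or_fallback({}, flaky_node))
```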

RetryMiddleware

RetryMiddleware(
    *,
    max_attempts: int = 3,
    classifier: Classifier | None = None,
    backoff: BackoffStrategy | None = None,
    on_retry: OnRetryCallback | None = None
)

Canonical retry middleware.

Configuration:

  • max_attempts: total attempts including the first call. 1 disables retry. Default 3.
  • classifier: predicate (exception, state) -> bool. Default: default_classifier, which matches the exception's category against TRANSIENT_CATEGORIES.
  • backoff: callable (attempt_index) -> seconds. Default: exponential_jitter_backoff (base 1s, cap 30s, full jitter).
  • on_retry: optional async callback (exception, attempt_index) -> None. Fires before each sleep.
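How the four options interact can be sketched as a retry loop. This is not the RetryMiddleware implementation: make_retry is a hypothetical factory, state is a plain dict, and the 0-based attempt index passed to backoff and on_retry is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping

Next = Callable[[dict], Awaitable[Mapping[str, Any]]]


def make_retry(max_attempts, classifier, backoff, on_retry=None):
    """Build a retry middleware from the four documented knobs (sketch only)."""
    async def retry(state: dict, next_: Next) -> Mapping[str, Any]:
        for attempt in range(max_attempts):
            try:
                return await next_(state)
            except Exception as exc:
                last = attempt == max_attempts - 1
                # Give up on the final attempt or on a non-retryable error.
                if last or not classifier(exc, state):
                    raise
                if on_retry is not None:
                    await on_retry(exc, attempt)  # fires before the sleep
                await asyncio.sleep(backoff(attempt))
        raise RuntimeError("max_attempts must be at least 1")
    return retry


attempts = []


async def fails_twice(state: dict) -> Mapping[str, Any]:
    attempts.append(len(attempts))
    if len(attempts) < 3:
        raise TimeoutError("transient")
    return {"ok": True}


retry = make_retry(
    max_attempts=3,
    classifier=lambda exc, state: isinstance(exc, TimeoutError),
    backoff=lambda attempt: 0.0,  # zero delay keeps the demo fast
)
outcome = asyncio.run(retry({}, fails_twice))
```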

TimingMiddleware

TimingMiddleware(
    *,
    node_name: str,
    on_complete: OnCompleteCallback,
    clock: Callable[[], float] | None = None
)

Canonical timing middleware.

Records wall-clock duration of the wrapped chain via the host language's monotonic clock (Python's time.monotonic). The callback fires inline before the chain's result returns to the caller — slow callbacks add to the apparent node duration, so users SHOULD keep them fast (queue work, defer I/O).

Errors raised by on_complete propagate to the engine as a node_exception.

TimingRecord dataclass

TimingRecord(
    node_name: str,
    duration_ms: float,
    outcome: str,
    exception_category: str | None,
)

A single timing measurement produced by TimingMiddleware.

  • node_name: the node this middleware was attached to (captured at registration; users supply it explicitly for per-node use).
  • duration_ms: milliseconds from middleware entry to chain return-or-raise, measured with a monotonic clock.
  • outcome: one of "success" or "exception".
  • exception_category: when outcome == "exception" and the exception carries a category attribute, that string; otherwise None.
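The measurement can be sketched with an injectable clock, which also makes the duration testable. TimingSketch and make_timer are hypothetical stand-ins, and the synchronous on_complete callback is an assumption about the callback's shape.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TimingSketch:
    """Stand-in with the same four fields as TimingRecord."""
    node_name: str
    duration_ms: float
    outcome: str
    exception_category: Optional[str]


def make_timer(node_name, on_complete, clock=time.monotonic):
    """Build a middleware that reports one TimingSketch per call."""
    async def timer(state, next_):
        start = clock()
        try:
            result = await next_(state)
        except Exception as exc:
            on_complete(TimingSketch(
                node_name, (clock() - start) * 1000.0, "exception",
                getattr(exc, "category", None),
            ))
            raise
        on_complete(TimingSketch(
            node_name, (clock() - start) * 1000.0, "success", None))
        return result
    return timer


records = []
ticks = iter([10.0, 10.25])  # fake clock: 0.25 s elapse between the two reads


async def node(state):
    return {"ok": True}


timer = make_timer("fetch", records.append, clock=lambda: next(ticks))
asyncio.run(timer({}, node))
```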

FunctionNode dataclass

FunctionNode(
    name: str,
    fn: Callable[[StateT], Awaitable[Mapping[str, Any]]],
    middleware: tuple[Middleware, ...] = tuple[
        Middleware, ...
    ](),
)

A node backed by an async callable.

Node

Bases: Protocol

A unit of work in a compiled graph.

name property

name: str

The name this node was registered under in its containing graph.

middleware property

middleware: tuple[Middleware, ...]

Per-node middleware applied at this node's registration site, outer-to-inner. Composed inside any per-graph middleware.

run async

run(state: StateT) -> Mapping[str, Any]

Execute against state and return a partial update to be merged via reducers.

Observer

Bases: Protocol

The shape of a callable that receives node-boundary events.

Observer is a structural Protocol: any async callable matching the signature qualifies, no subclass required. Plain functions, bound methods, and class instances with __call__ all work:

async def log_observer(event: NodeEvent) -> None:
    print(event.node_name, event.phase)

compiled.attach_observer(log_observer)

Contract:

  • Observers MUST be async so the delivery queue can await each one and coordinate ordering. The graph itself never awaits observers.
  • Observers MUST NOT alter state, routing, or any other aspect of the graph run — read-only side effects (logging, metrics, span emission) only.

The event parameter is positional-only (event, /) so structural conformance doesn't pin you to that name — any of event, _event, e, etc. matches.

Optional prepare_sync extension

An observer MAY additionally define a synchronous method:

def prepare_sync(self, event: NodeEvent, /) -> None: ...

that the engine calls IN THE ENGINE TASK, BEFORE queueing the event for the async __call__. This exists for observers that need to set up state — e.g., open a span and stash a handle in a ContextVar — that the engine itself must read synchronously before running the node body (otherwise logs emitted on the first line of the body wouldn't see the right span).

prepare_sync is opt-in via hasattr; no subclass or Protocol method is required. Observers that don't define it skip the synchronous prep entirely; observers that do define it run only for "started"-phase events, with errors warned, not propagated (same isolation contract as the async path).
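A sketch of an observer that uses the hook to publish a span handle through a ContextVar. The event is a plain dict here, and dispatch_started is a hypothetical stand-in for the engine-side hasattr check; neither reflects the engine's real types.

```python
import asyncio
from contextvars import ContextVar

current_span: ContextVar[str] = ContextVar("current_span", default="none")


class SpanObserver:
    """Observer with the optional synchronous hook."""

    def __init__(self):
        self.delivered = []

    def prepare_sync(self, event, /) -> None:
        # Runs before the node body, so the body can read the span.
        current_span.set(f"span:{event['node_name']}")

    async def __call__(self, event, /) -> None:
        self.delivered.append(event["phase"])


def dispatch_started(observer, event) -> None:
    """Hypothetical engine-side step: call the hook only if it is defined."""
    if hasattr(observer, "prepare_sync") and event["phase"] == "started":
        observer.prepare_sync(event)


observer = SpanObserver()
event = {"node_name": "fetch", "phase": "started"}
dispatch_started(observer, event)
span_seen_by_node = current_span.get()
asyncio.run(observer(event))
```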

RemoveHandle dataclass

RemoveHandle(
    _observers: list[SubscribedObserver],
    _observer: SubscribedObserver,
)

Returned by CompiledGraph.attach_observer. Call .remove() to detach the observer. Idempotent — calling .remove() after the observer is already detached is a no-op.

Changes to the registered observer set during a graph run do NOT take effect until the next invocation.

SubscribedObserver dataclass

SubscribedObserver(
    observer: Observer, phases: frozenset[str] = ALL_PHASES
)

An observer paired with its phase subscription set.

Observers register with an optional phases parameter naming the phase strings they want to receive. The default is ALL_PHASES — historically named when there were only two phases, now meaning "the default subscription" ({"started", "completed"}). The "checkpoint_saved" phase is opt-in: subscribe to it explicitly via phases={"checkpoint_saved"} (or include it in a custom set). KNOWN_PHASES is the full "every phase the engine can produce" set used by the registration-time validator.

Empty phase sets are forbidden — passing one raises ValueError at registration time so misconfiguration surfaces immediately.

Construct one of these directly when handing phase-filtered observers to CompiledGraph.invoke(observers=...). For the single-observer attach_observer path, pass phases= as a keyword argument and the engine wraps it for you.
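The subscription rules can be sketched as a normalizer. normalize_phases and DEFAULT_PHASES are hypothetical names, and rejecting unknown phase strings with ValueError is an assumption; the text above only confirms that empty sets raise ValueError.

```python
from __future__ import annotations

from typing import Iterable

DEFAULT_PHASES = frozenset({"started", "completed"})
KNOWN_PHASES = DEFAULT_PHASES | {"checkpoint_saved"}


def normalize_phases(phases: Iterable[str] | None) -> frozenset[str]:
    """Return the effective subscription set, or raise ValueError."""
    if phases is None:
        # No explicit choice: the default subscription applies.
        return DEFAULT_PHASES
    chosen = frozenset(phases)
    if not chosen:
        raise ValueError("phases must not be empty")
    unknown = chosen - KNOWN_PHASES
    if unknown:
        # Assumption: unknown phase strings are rejected at registration.
        raise ValueError(f"unknown phases: {sorted(unknown)}")
    return chosen
```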

ExplicitMapping

ExplicitMapping(
    *,
    inputs: Mapping[str, str] | None = None,
    outputs: Mapping[str, str] | None = None
)

Explicit input/output mapping between parent and subgraph state.

inputs: subgraph_field → parent_field. At entry, the named parent field's current value is copied into the named subgraph field. Subgraph fields not listed receive their schema-declared defaults — there is NO field-name fallback (additive over the default no-projection-in).

outputs: parent_field → subgraph_field. At exit, the named subgraph field's value is merged into the named parent field via the parent's reducer. Subgraph fields not listed are discarded — outputs REPLACES field-name matching for projection-out.

The two directions are independent: pass either, both, or neither. The outputs field distinguishes "absent" (default applies) from "present but empty"; outputs=None means absent (fall back to field-name matching), outputs={} means present and empty (project nothing). For inputs the two defaults coincide (no-projection-in either way), so the distinction is only meaningful for outputs.
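The two directions can be sketched over plain dicts. This is an illustration of the mapping semantics only: project_in and project_out here are free functions with hypothetical signatures, field-name matching is assumed to mean "same name on both sides", and reducer merging is left to the engine.

```python
from __future__ import annotations

from typing import Any, Mapping


def project_in(parent: Mapping[str, Any], defaults: Mapping[str, Any],
               inputs: Mapping[str, str] | None) -> dict[str, Any]:
    """inputs maps subgraph_field -> parent_field; others keep defaults."""
    child = dict(defaults)
    for sub_field, parent_field in (inputs or {}).items():
        child[sub_field] = parent[parent_field]
    return child


def project_out(child: Mapping[str, Any], parent_fields: set[str],
                outputs: Mapping[str, str] | None) -> dict[str, Any]:
    """outputs maps parent_field -> subgraph_field; None means name matching."""
    if outputs is None:
        # Absent: fall back to field-name matching.
        return {k: v for k, v in child.items() if k in parent_fields}
    # Present (possibly empty): project only what is listed.
    return {p: child[s] for p, s in outputs.items()}


parent = {"query": "q", "answer": ""}
defaults = {"question": "", "answer": "", "scratch": []}
child_start = project_in(parent, defaults, {"question": "query"})
child_final = {"question": "q", "answer": "a", "scratch": [1]}
by_name = project_out(child_final, set(parent), None)
nothing = project_out(child_final, set(parent), {})
explicit = project_out(child_final, set(parent), {"query": "question"})
```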

FieldNameMatching

Default subgraph projection strategy.

Parameterized for protocol conformance under generics. ParentT is not consumed (the default projection ignores parent state on the way in), but carrying the type variable keeps the default assignable to ProjectionStrategy[ParentT, ChildT] without type gymnastics at the SubgraphNode default-factory site.

ProjectionStrategy

Bases: Protocol

Strategy for moving state across the parent ↔ subgraph boundary.

Two required methods plus one optional hook:

  • project_in and project_out are required: the engine calls them on every subgraph step.
  • validate(parent_cls, subgraph_state_cls) -> None is an optional compile-time validation hook. If a strategy defines it, the parent graph's compile() calls it once per SubgraphNode; the strategy may raise a CompileError subclass when its declarations don't match the supplied schemas. Declarative strategies like ExplicitMapping use this to catch field-name typos before any node runs. Imperative custom projections typically have nothing declarative to check and can simply omit the method — the engine uses duck typing (getattr) to find it.

project_in

project_in(
    parent_state: ParentT, subgraph_state_cls: type[ChildT]
) -> ChildT

Build the subgraph's initial state at the moment it begins.

project_out

project_out(
    subgraph_final_state: ChildT,
    parent_state: ParentT,
    subgraph_state_cls: type[ChildT],
) -> Mapping[str, Any]

Project the subgraph's final state back to the parent as a partial update.

Reducer

Base class for state-field reducers.

Each reducer carries a canonical name used in error messages and introspection. Subclasses override __call__ to merge a node's partial update for a single field into the prior value.

State

Bases: BaseModel

Base for graph state schemas. Immutable; reducers attach via Annotated.

SubgraphNode dataclass

SubgraphNode(
    name: str,
    compiled: CompiledGraph[ChildT],
    projection: ProjectionStrategy[
        ParentT, ChildT
    ] = FieldNameMatching[ParentT, ChildT](),
    middleware: tuple[Middleware, ...] = tuple[
        Middleware, ...
    ](),
)

A node backed by a compiled subgraph.

The parent's per-node middleware on a SubgraphNode wraps the subgraph dispatch as a single atomic call — parent middleware does NOT cross into the subgraph's internal nodes (those are wrapped by the subgraph's own middleware independently).

run async

run(
    state: ParentT,
    context: _InvocationContext | None = None,
) -> Mapping[str, Any]

Execute the subgraph and project its result back into the parent.

When context is None (e.g., direct invocation in tests, or a parent call that doesn't thread a context), the subgraph runs via its own public invoke() — a fresh root invocation with no parent observer chain.

When context is provided (the engine's normal path during a parent run), the subgraph descends into a child context that shares the parent's queue + step counter and extends the namespace and parent-state stack. Observer events from inner nodes bubble up to outer observers.

deterministic_backoff

deterministic_backoff(
    seconds: float,
) -> Callable[[int], float]

Constant-N seconds backoff factory — for deterministic testing.

The conformance fixtures use this form via backoff: {type: deterministic, seconds: N} so retry timing is reproducible across runs.

exponential_jitter_backoff

exponential_jitter_backoff(
    attempt: int, *, base: float = 1.0, cap: float = 30.0
) -> float

Default backoff: random.uniform(0, min(cap, base * 2**attempt)).

Jitter is mandatory — fixed exponential backoff causes synchronized retries from many concurrent callers, amplifying rate-limit storms. base and cap are configurable; the defaults are 1.0 and 30.0 seconds.
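The formula can be reproduced directly. jitter_backoff mirrors the documented expression, and upper_bound is a hypothetical helper that exposes the non-random ceiling so the growth and the cap are easy to see.

```python
import random


def jitter_backoff(attempt: int, *, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter delay: uniform between 0 and min(cap, base * 2**attempt)."""
    return random.uniform(0, min(cap, base * 2**attempt))


def upper_bound(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Largest delay jitter_backoff can return for this attempt."""
    return min(cap, base * 2**attempt)


bounds = [upper_bound(a) for a in range(7)]
samples = [jitter_backoff(a) for a in range(7)]
```

With the defaults, the ceiling doubles from 1 second until it reaches the 30 second cap at attempt 5.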