openarmature.graph
Public API for the OpenArmature graph engine.
Re-exports the surface a user touches when building and running a graph: the state schema base, reducers, the builder/compiled pair, edge primitives and the END sentinel, the node/subgraph/projection seams, and the canonical compile-time and runtime error categories.
GraphBuilder
Mutable builder for a graph; call compile() to produce a CompiledGraph.
add_fan_out_node
add_fan_out_node(
name: str,
*,
subgraph: CompiledGraph[ChildT],
collect_field: str,
target_field: str,
items_field: str | None = None,
item_field: str | None = None,
count: int | CountResolver | None = None,
concurrency: int | ConcurrencyResolver | None = 10,
error_policy: str = "fail_fast",
on_empty: str = "raise",
count_field: str | None = None,
inputs: Mapping[str, str] | None = None,
extra_outputs: Mapping[str, str] | None = None,
instance_middleware: Iterable[Middleware] | None = None,
errors_field: str | None = None,
middleware: Iterable[Middleware] | None = None
) -> Self
Register a fan-out node.
Validates configuration at registration time:
- Exactly one of items_field or count MUST be specified (fan_out_count_mode_ambiguous otherwise).
- items_field MUST refer to a list-typed field on the parent state schema (fan_out_field_not_list otherwise).
- items_field mode requires item_field; count mode forbids item_field.
- on_empty and error_policy MUST be one of the permitted string literals ("raise"/"noop" and "fail_fast"/"collect" respectively).
- inputs, extra_outputs, and count_field field references go through the existing mapping_references_undeclared_field rule.
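The registration-time rules above can be sketched as a standalone check. This is an illustration, not the library's validator: the function name `check_fan_out_config`, the use of plain `ValueError`, and the `list_fields` parameter (standing in for the parent schema's list-typed fields) are hypothetical; only the category strings come from the rules listed here.

```python
from __future__ import annotations

from typing import Collection


def check_fan_out_config(
    *,
    items_field: str | None,
    item_field: str | None,
    count: object | None,
    error_policy: str,
    on_empty: str,
    list_fields: Collection[str],
) -> None:
    """Hypothetical sketch of the add_fan_out_node registration rules."""
    # Exactly one of items_field / count selects the fan-out mode.
    if (items_field is None) == (count is None):
        raise ValueError("fan_out_count_mode_ambiguous")
    if items_field is not None:
        # items_field mode: the field must be list-typed and item_field is required.
        if items_field not in list_fields:
            raise ValueError("fan_out_field_not_list")
        if item_field is None:
            raise ValueError("items_field mode requires item_field")
    elif item_field is not None:
        # count mode: item_field is forbidden.
        raise ValueError("count mode forbids item_field")
    if on_empty not in ("raise", "noop"):
        raise ValueError(f"invalid on_empty: {on_empty!r}")
    if error_policy not in ("fail_fast", "collect"):
        raise ValueError(f"invalid error_policy: {error_policy!r}")
```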
with_checkpointer
with_checkpointer(checkpointer: Checkpointer) -> Self
Register a Checkpointer for the compiled graph.
At most one Checkpointer per graph; calling
with_checkpointer again replaces the previously-stored
one. Call compile() and then CompiledGraph.invoke() on the
result as usual; the engine fires saves
at every completed event for outermost-graph and
subgraph-internal nodes.
add_middleware
add_middleware(middleware: Middleware) -> Self
Register a per-graph middleware applied to every node in this graph.
Per-graph middleware composes OUTSIDE per-node middleware.
Calling order is preserved (outer-to-inner) — earlier
add_middleware calls produce outer layers in the runtime
chain.
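A standalone sketch of the resulting layer order, runnable without the library. Middleware are modeled as plain async functions of `(state, next)` (the Middleware shape documented in this module); `compose`, `tracer`, and `demo` are hypothetical helpers. Per-graph entries are placed outside per-node entries, and within each list earlier entries wrap later ones.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping

State = Mapping[str, Any]
Next = Callable[[State], Awaitable[Mapping[str, Any]]]
Mw = Callable[[State, Next], Awaitable[Mapping[str, Any]]]


def compose(graph_mw: list[Mw], node_mw: list[Mw], node: Next) -> Next:
    """Hypothetical helper: build the runtime chain around a node."""
    chain = node
    # Per-graph layers sit outside per-node layers, and earlier entries are
    # outer, so wrap from the innermost entry outwards.
    for mw in reversed([*graph_mw, *node_mw]):
        chain = (lambda m, nxt: lambda state: m(state, nxt))(mw, chain)
    return chain


def tracer(label: str, log: list[str]) -> Mw:
    async def mw(state: State, next: Next) -> Mapping[str, Any]:
        log.append(f"enter {label}")
        result = await next(state)
        log.append(f"exit {label}")
        return result
    return mw


async def demo() -> list[str]:
    log: list[str] = []

    async def node(state: State) -> Mapping[str, Any]:
        log.append("node")
        return {"x": 1}

    chain = compose(
        [tracer("graph-1", log), tracer("graph-2", log)],  # add_middleware call order
        [tracer("node-1", log)],
        node,
    )
    await chain({})
    return log
```

Running `asyncio.run(demo())` logs the entries as graph-1, graph-2, node-1, then the node, then the exits in reverse order.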
CompiledGraph (dataclass)
CompiledGraph(
state_cls: type[StateT],
entry: str,
nodes: Mapping[str, Node[StateT]],
edges: Mapping[
str, StaticEdge | ConditionalEdge[StateT]
],
reducers: Mapping[str, Reducer],
middleware: tuple[Middleware, ...] = (),
_attached_observers: list[SubscribedObserver] = list[
SubscribedObserver
](),
_active_workers: set[Task[None]] = set[Task[None]](),
_checkpointer_slot: list[Checkpointer | None] = (
lambda: [None]
)(),
)
An immutable, executable graph produced by GraphBuilder.compile().
The compile-time topology (state class, entry, nodes, edges, reducers) is
immutable. Two mutable containers ride alongside for observer plumbing,
_attached_observers (a list) and _active_workers (a set); neither affects
the compiled topology, and both are scoped to the same instance.
checkpointer (property)
checkpointer: Checkpointer | None
Currently-registered Checkpointer, or None.
attach_observer
attach_observer(
observer: Observer,
*,
phases: Iterable[str] | None = None
) -> RemoveHandle
Register a graph-attached observer.
Graph-attached observers fire on every invocation of this
graph until removed — including when this graph runs as a
subgraph inside a parent. Returns a RemoveHandle whose
.remove() method detaches the observer; idempotent.
phases selects the phase strings ("started",
"completed") the observer subscribes to; default is both.
An empty phases set raises ValueError at registration
time.
Changes to the registered set during a graph run do NOT take effect until the next invocation. The set of observers delivering events for an in-flight invocation is fixed at the point the invocation begins.
attach_checkpointer
attach_checkpointer(
checkpointer: Checkpointer | None,
) -> None
Register a Checkpointer for this graph.
Pass None to clear a previously-registered backend.
Without a registered Checkpointer the engine never calls
save() and invoke(resume_invocation=...) raises
checkpoint_not_found.
At most one Checkpointer per graph. Calling
attach_checkpointer again replaces the previously-registered
one; multi-backend fan-out is the user's
responsibility (wrap two underlying Checkpointers behind a
custom protocol-conforming implementation if needed).
drain (async)
Await delivery of every observer event produced by prior invocations of this graph.
Callers running in short-lived processes (scripts, serverless functions, CLIs) MUST use drain to avoid losing observer events that were dispatched but not yet delivered.
Only events dispatched before this call are awaited; events from invocations started concurrently with drain may or may not be included. Subgraph events from active invocations are part of the parent invocation's worker and are covered automatically.
Unbounded by design. Drain blocks until every queued event has
been delivered to every subscribed observer. A slow, hung, or
misbehaving observer can therefore hold drain — and the calling
process — indefinitely. If you need a bounded wait, wrap the call
in asyncio.wait_for and accept that events still queued when the
deadline elapses will not be delivered::
    await asyncio.wait_for(compiled.drain(), timeout=5.0)
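A self-contained sketch of that bounded-wait pattern. `slow_drain` is a hypothetical stand-in for `compiled.drain()` held up by a hung observer, and `drain_with_deadline` is an illustrative wrapper. On timeout, `asyncio.wait_for` cancels the awaited coroutine and raises `asyncio.TimeoutError` (an alias of the builtin `TimeoutError` since Python 3.11), so the caller catches it and decides how to report the loss.

```python
import asyncio


async def slow_drain() -> None:
    # Hypothetical stand-in for compiled.drain() blocked by a hung observer.
    await asyncio.sleep(10)


async def drain_with_deadline(timeout: float) -> bool:
    """Return True if delivery finished in time, False if the deadline elapsed."""
    try:
        await asyncio.wait_for(slow_drain(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # Events still queued at this point are not guaranteed to be delivered.
        return False
```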
invoke (async)
invoke(
initial_state: StateT,
observers: (
Iterable[Observer | SubscribedObserver] | None
) = None,
*,
correlation_id: str | None = None,
resume_invocation: str | None = None
) -> StateT
Run the graph from initial_state to END and return the
final state.
Optional observers are invocation-scoped — they fire only
for this run, after all graph-attached observers (including
subgraph-attached ones for events originating in subgraphs).
Each entry in observers may be either a bare Observer
callable (subscribes to both phases) or a SubscribedObserver
wrapping an observer with an explicit phases set.
This method returns as soon as the graph execution loop
completes, regardless of whether the observer delivery queue
has finished processing every dispatched event. Use
await compiled.drain() if you need delivery-completion
guarantees.
Checkpointing:
- correlation_id is the per-invocation cross-backend join key: caller-supplied, or an auto-generated UUIDv4 when absent. It is preserved unchanged across resume_invocation.
- resume_invocation names a prior invocation_id to resume from. It requires a registered Checkpointer and raises CheckpointNotFound when the backend has no record for the supplied id, or CheckpointRecordInvalid when the loaded record's schema is incompatible. Resume mints a NEW invocation_id: each attempt is its own invocation in the observability sense, and the correlation_id is the cross-attempt join key.
- Save-failure policy. This implementation raises CheckpointSaveFailed to the caller of invoke() immediately when Checkpointer.save raises; saves are NOT retried by the engine. Wrap the Checkpointer in your own retry logic if transient backend failures should be reattempted.
Raises one of the runtime error categories on failure.
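Because the engine does not retry saves, retry logic can live in a wrapper around the backend. This sketch assumes, hypothetically, that a backend exposes an async `save(record)` method; the real Checkpointer protocol may use a different signature and has load-side methods, which the wrapper here simply forwards via `__getattr__`. `RetryingCheckpointer`, `attempts`, `delay`, and `FlakyBackend` are illustrative names.

```python
import asyncio
from typing import Any, Protocol


class SavesRecords(Protocol):
    # Hypothetical minimal shape; the real Checkpointer protocol may differ.
    async def save(self, record: Any) -> None: ...


class RetryingCheckpointer:
    """Wrap a backend and retry save() a bounded number of times."""

    def __init__(self, inner: SavesRecords, attempts: int = 3, delay: float = 0.1) -> None:
        self._inner = inner
        self._attempts = attempts
        self._delay = delay

    async def save(self, record: Any) -> None:
        for attempt in range(self._attempts):
            try:
                await self._inner.save(record)
                return
            except Exception:
                # Re-raise on the last attempt so the engine still sees the failure.
                if attempt == self._attempts - 1:
                    raise
                await asyncio.sleep(self._delay)

    def __getattr__(self, name: str) -> Any:
        # Forward everything else (e.g. load methods) to the wrapped backend.
        return getattr(self._inner, name)


class FlakyBackend:
    """Test double: fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.saved: list[Any] = []

    async def save(self, record: Any) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("transient")
        self.saved.append(record)
```

If the final attempt still fails, the exception propagates out of `save`, which the engine reports as CheckpointSaveFailed.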
ConditionalEdge (dataclass)
ConditionalEdge(
source: str, fn: Callable[[StateT], str | EndSentinel]
)
Routes from source to whichever node fn(state) returns. The function
MUST return either a declared node name or END; any other value raises
RoutingError at runtime.
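A sketch of a routing function and the runtime check described above. Everything here is a local stand-in so the example runs without the library: `END` is a local sentinel object, `RoutingError` a local exception, and `route_after_check` and `resolve_route` are hypothetical names. In a real graph you would import END from openarmature.graph and let the engine perform the check.

```python
from __future__ import annotations

from typing import Any, Callable, Mapping


class _End:
    """Local stand-in for the engine's EndSentinel."""

    def __repr__(self) -> str:
        return "END"


END = _End()


class RoutingError(Exception):
    """Local stand-in for the engine's RoutingError."""


def route_after_check(state: Mapping[str, Any]) -> str | _End:
    # Route back to "retry" while errors remain and the budget allows; else stop.
    return "retry" if state["errors"] and state["tries"] < 3 else END


def resolve_route(
    fn: Callable[[Mapping[str, Any]], object],
    state: Mapping[str, Any],
    declared: set[str],
) -> str | _End:
    """Hypothetical: accept only END or a declared node name."""
    target = fn(state)
    if target is END:
        return END
    if isinstance(target, str) and target in declared:
        return target
    raise RoutingError(f"invalid routing target: {target!r}")
```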
EndSentinel
Engine-provided sentinel routing target. Use the module-level END.
StaticEdge (dataclass)
StaticEdge(source: str, target: str | EndSentinel)
Always routes from source to target.
CompileError
FanOutCountModeAmbiguous
Bases: CompileError
Raised when a fan-out node specifies both items_field and
count, or neither. Exactly one is required.
FanOutEmpty
Bases: NodeException
Raised when a fan-out node resolves to zero instances while
its on_empty config is "raise" (the default).
Surfaces as a regular node_exception (so it integrates with
the existing error propagation and recoverable-state machinery)
but exposes an additional fan_out_category attribute so
callers can distinguish empty-fan-out from generic node failures.
FanOutFieldNotList
Bases: CompileError
Raised when a fan-out node's items_field does not refer to
a declared list-typed field on the parent state schema.
FanOutInvalidConcurrency
Bases: NodeException
Raised when a fan-out node's concurrency callable returns
zero or a negative integer at runtime. Same node-exception shape
as FanOutEmpty.
FanOutInvalidCount
Bases: NodeException
Raised when a fan-out node's count callable returns a
negative integer at runtime. Same node-exception shape as
FanOutEmpty, with
fan_out_category = "fan_out_invalid_count".
GraphError
Bases: Exception
Base for all graph-engine errors.
MappingReferencesUndeclaredField
Bases: CompileError
Raised when a subgraph-as-node inputs or outputs
mapping names a field that is not declared in the relevant state
schema.
RuntimeGraphError
Bases: GraphError
Base for runtime errors. The four non-validation categories carry a
recoverable_state attribute.
StateValidationError
Bases: RuntimeGraphError
State failed schema validation at a graph boundary.
Unlike the other runtime errors, this category does NOT carry
recoverable_state — at entry there is no prior state to
recover; at exit the failing state IS the final state.
NodeEvent (dataclass)
NodeEvent(
node_name: str,
namespace: tuple[str, ...],
step: int,
phase: Literal[
"started", "completed", "checkpoint_saved"
],
pre_state: State,
post_state: State | None,
error: RuntimeGraphError | None,
parent_states: tuple[State, ...],
attempt_index: int = 0,
fan_out_index: int | None = None,
fan_out_config: FanOutEventConfig | None = None,
)
A single node-boundary event delivered to observers.
- phase is "started" (dispatched before the node runs) or "completed" (dispatched after the node returns or raises and the merge runs or fails). Each node attempt produces exactly one of each, in that order. The engine ALSO dispatches a "checkpoint_saved" event with the same shape after a successful Checkpointer.save call; observers MUST opt in explicitly via phases={"checkpoint_saved"} to receive these (the default subscription is {"started", "completed"} only, so legacy observers don't see them).
- node_name is the name under which this node was registered in its immediate containing graph.
- namespace is an ordered sequence of node names from the outermost graph down to this node. For a node in the outermost graph, namespace is (node_name,). For nested subgraphs, the chain extends.
- step is a monotonically increasing counter starting at 0, scoped to a single outermost invocation. Subgraph-internal nodes increment the same counter. The started/completed pair for one attempt share the same step.
- pre_state is the state the node received, before reducer merge. Populated on both phases (identical across the pair).
- post_state is the state after the node's partial update merged successfully. Populated only on completed events that succeeded.
- error is the wrapped runtime error (NodeException, ReducerError, or StateValidationError) when the node failed. Populated only on completed events that failed.
- parent_states carries one state snapshot per containing graph, outermost first; for a node in the outermost graph it is an empty tuple. Invariant: len(parent_states) == len(namespace) - 1.
- attempt_index is the 0-based index of this attempt among any retries; 0 for nodes not wrapped by retry middleware.
- fan_out_index is the 0-based index of this fan-out instance among its siblings; None for nodes not inside a fan-out.
- fan_out_config carries the resolved fan-out configuration on events from a fan-out NODE itself (see FanOutEventConfig); None on every other event.
Invariants:
- On started events, post_state and error MUST both be None.
- On completed events, exactly one of post_state and error is populated.
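These invariants, together with the parent_states length rule, can be checked mechanically in tests that record events. This sketch runs over a reduced, hypothetical `Event` dataclass carrying only the fields the checks need, not the library's NodeEvent; `invariant_violations` is an illustrative name. It places no phase-specific constraint on "checkpoint_saved" events, since the invariants above cover only the other two phases.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Event:
    # Reduced stand-in for NodeEvent: only the fields the invariants mention.
    phase: str
    namespace: tuple[str, ...]
    parent_states: tuple[Any, ...]
    post_state: Any = None
    error: BaseException | None = None


def invariant_violations(event: Event) -> list[str]:
    problems: list[str] = []
    if len(event.parent_states) != len(event.namespace) - 1:
        problems.append("parent_states/namespace length mismatch")
    if event.phase == "started" and (event.post_state is not None or event.error is not None):
        problems.append("started event has post_state or error")
    # completed: exactly one of post_state / error must be populated.
    if event.phase == "completed" and (event.post_state is None) == (event.error is None):
        problems.append("completed event needs exactly one of post_state/error")
    return problems
```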
FanOutConfig (dataclass)
FanOutConfig(
subgraph: CompiledGraph[Any],
collect_field: str,
target_field: str,
items_field: str | None = None,
item_field: str | None = None,
count: int | CountResolver | None = None,
concurrency: int | ConcurrencyResolver | None = 10,
error_policy: Literal[
"fail_fast", "collect"
] = "fail_fast",
on_empty: Literal["raise", "noop"] = "raise",
count_field: str | None = None,
inputs: Mapping[str, str] = dict[str, str](),
extra_outputs: Mapping[str, str] = dict[str, str](),
instance_middleware: tuple[Middleware, ...] = (),
errors_field: str | None = None,
)
Frozen configuration for a FanOutNode.
Validation happens at builder compile time (see
GraphBuilder.add_fan_out_node); construction here is
unchecked beyond the obvious type-level constraints.
FanOutNode (dataclass)
FanOutNode(
name: str,
config: FanOutConfig,
middleware: tuple[Middleware, ...] = (),
)
A node that fans out into N concurrent subgraph instances.
The Node Protocol contract requires name, middleware, and
run; run here is unusual because it needs the engine
invocation context to descend properly. The engine recognizes this
type in _invoke and calls run_with_context (see
compiled.CompiledGraph._step_fan_out_node) rather than the
plain run(state) shape. run exists for Protocol conformance
only and raises if anyone calls it without context.
run_with_context (async)
run_with_context(
state: ParentT,
context: _InvocationContext,
*,
pre_resolved_count: int | None = None,
pre_resolved_concurrency: (
tuple[int | None] | None
) = None
) -> Mapping[str, Any]
Execute the fan-out and return the merged partial update.
Snapshot, resolve count + concurrency, build per-instance states, run concurrently with the configured error policy, fan-in collected/extra fields, write count_field and errors_field if configured.
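The concurrent core of that sequence (bounded concurrency plus the two error policies) can be sketched with plain asyncio. This illustrates the technique under stated assumptions, not the engine's code: `run_instances` and its parameters are hypothetical, `None` concurrency is read as unbounded, "fail_fast" cancels the remaining instances on the first error, and "collect" is modeled as returning each failure next to its index with an empty (`None`) result slot.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def run_instances(
    n: int,
    instance: Callable[[int], Awaitable[Any]],
    concurrency: int | None,
    error_policy: str = "fail_fast",
) -> tuple[list[Any], list[tuple[int, BaseException]]]:
    """Hypothetical sketch: run n instances; return (results, errors) by index."""
    sem = asyncio.Semaphore(concurrency) if concurrency is not None else None

    async def one(i: int) -> Any:
        if sem is None:  # None: unbounded
            return await instance(i)
        async with sem:  # at most `concurrency` instances in flight
            return await instance(i)

    tasks = [asyncio.create_task(one(i)) for i in range(n)]
    if error_policy == "fail_fast":
        try:
            return list(await asyncio.gather(*tasks)), []
        except BaseException:
            # Cancel still-running siblings, wait for them, then re-raise.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
    # "collect": let every instance finish and record failures by index.
    results: list[Any] = []
    errors: list[tuple[int, BaseException]] = []
    for i, outcome in enumerate(await asyncio.gather(*tasks, return_exceptions=True)):
        if isinstance(outcome, BaseException):
            errors.append((i, outcome))
            results.append(None)  # assumption: a failed slot is left empty
        else:
            results.append(outcome)
    return results, errors
```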
pre_resolved_count / pre_resolved_concurrency are the
proposal-0013 v0.10.0 hooks: when the engine has already
resolved the config eagerly to populate
NodeEvent.fan_out_config for the fan-out node's events,
it passes the resolved values in so callable resolvers
aren't invoked twice. pre_resolved_concurrency is wrapped
in a 1-tuple to disambiguate "caller passed None
(unbounded)" from "caller didn't pass anything."
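The 1-tuple convention is a general way to tell "explicitly passed None" apart from "not passed". A standalone sketch; `resolve_concurrency`, `configured`, and `pre_resolved` are hypothetical names, and the resolver is simplified to a zero-argument callable (the real ConcurrencyResolver signature is not shown here).

```python
from __future__ import annotations

from typing import Callable


def resolve_concurrency(
    configured: int | Callable[[], int | None] | None,
    pre_resolved: tuple[int | None] | None = None,
) -> int | None:
    # A 1-tuple means "already resolved", even when its payload is None (unbounded).
    if pre_resolved is not None:
        return pre_resolved[0]
    # Otherwise resolve now, calling the resolver if one was configured.
    return configured() if callable(configured) else configured
```

Passing `pre_resolved=(None,)` yields "unbounded" without invoking the resolver, which a bare `None` default could not express.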
Middleware
Bases: Protocol
An async callable that wraps the dispatch of a single node.
The shape is (state, next) -> partial_update. The middleware
MUST return a mapping of field names to values — same shape a
node returns. It may:
- Inspect or transform state before calling next(state).
- Inspect or transform the partial update returned from next.
- Short-circuit by NOT calling next and returning its own partial (the rest of the chain, meaning subsequent middleware and the wrapped node, does not execute).
- Catch exceptions raised by next(state) and either re-raise, transform, or recover (returning a partial update instead of raising).
- Call next more than once (e.g., retry middleware).
A middleware MUST NOT mutate the input state object — pass a new
state to next if a transformation is needed.
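Two of the behaviors listed above, written as plain async functions with the `(state, next) -> partial_update` shape. `cache_middleware` (short-circuits on a cache hit) and `fallback_middleware` (recovers from an exception with its own partial) are hypothetical examples, and plain dicts stand in for the library's immutable State. Neither mutates the input state.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Mapping

Next = Callable[[Mapping[str, Any]], Awaitable[Mapping[str, Any]]]


def cache_middleware(cache: dict[str, Mapping[str, Any]], key_field: str):
    async def mw(state: Mapping[str, Any], next: Next) -> Mapping[str, Any]:
        key = state[key_field]
        if key in cache:
            return cache[key]  # short-circuit: the rest of the chain never runs
        partial = await next(state)
        cache[key] = partial
        return partial
    return mw


async def fallback_middleware(state: Mapping[str, Any], next: Next) -> Mapping[str, Any]:
    try:
        return await next(state)
    except TimeoutError:
        # Recover by returning a partial update instead of raising.
        return {"answer": None, "degraded": True}
```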
RetryMiddleware
RetryMiddleware(
*,
max_attempts: int = 3,
classifier: Classifier | None = None,
backoff: BackoffStrategy | None = None,
on_retry: OnRetryCallback | None = None
)
Canonical retry middleware.
Configuration:
- max_attempts: total attempts including the first call. 1 disables retry. Default 3.
- classifier: predicate (exception, state) -> bool. Default default_classifier (matches category against TRANSIENT_CATEGORIES).
- backoff: callable (attempt_index) -> seconds. Default exponential_jitter_backoff (base 1s, cap 30s, full jitter).
- on_retry: optional async callback (exception, attempt_index) -> None. Fires before each sleep.
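How the four options interact can be seen in a standalone retry loop. This is a sketch of the described behavior, not RetryMiddleware's source: `retry_call` is hypothetical, the sleep function is injectable so the example runs instantly, and passing the index of the attempt that just failed to both `on_retry` and `backoff` is an assumption.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def retry_call(
    call: Callable[[], Awaitable[Any]],
    state: Any,
    *,
    max_attempts: int = 3,
    classifier: Callable[[BaseException, Any], bool] = lambda exc, state: True,
    backoff: Callable[[int], float] = lambda attempt_index: 0.0,
    on_retry: Callable[[BaseException, int], Awaitable[None]] | None = None,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    for attempt_index in range(max_attempts):
        try:
            return await call()
        except Exception as exc:
            # Give up on the last attempt or on errors the classifier rejects.
            if attempt_index == max_attempts - 1 or not classifier(exc, state):
                raise
            if on_retry is not None:
                await on_retry(exc, attempt_index)  # fires before each sleep
            await sleep(backoff(attempt_index))
```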
TimingMiddleware
TimingMiddleware(
*,
node_name: str,
on_complete: OnCompleteCallback,
clock: Callable[[], float] | None = None
)
Canonical timing middleware.
Records wall-clock duration of the wrapped chain via the host
language's monotonic clock (Python's time.monotonic). The
callback fires inline before the chain's result returns to the
caller — slow callbacks add to the apparent node duration, so
users SHOULD keep them fast (queue work, defer I/O).
Errors raised by on_complete propagate to the engine as a
node_exception.
TimingRecord (dataclass)
A single timing measurement produced by TimingMiddleware.
- node_name: the node this middleware was attached to (captured at registration; users supply it explicitly for per-node use).
- duration_ms: milliseconds from middleware entry to chain return-or-raise, measured with a monotonic clock.
- outcome: one of "success" or "exception".
- exception_category: when outcome == "exception" and the exception carries a category attribute, that string; otherwise None.
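A sketch of a timing wrapper that produces records of this shape. It is not TimingMiddleware itself: `Record` mirrors the four fields above, `timed` is a hypothetical helper, and the clock is injectable (defaulting to `time.monotonic`, as described for TimingMiddleware) so a test can use a fake clock. The callback runs inline, and an exception from the chain is re-raised after its record is emitted.

```python
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass(frozen=True)
class Record:
    node_name: str
    duration_ms: float
    outcome: str  # "success" or "exception"
    exception_category: str | None


async def timed(
    node_name: str,
    call: Callable[[], Awaitable[Any]],
    on_complete: Callable[[Record], None],
    clock: Callable[[], float] = time.monotonic,
) -> Any:
    start = clock()
    try:
        result = await call()
    except Exception as exc:
        # Use the exception's `category` attribute when it has one.
        category = getattr(exc, "category", None)
        on_complete(Record(node_name, (clock() - start) * 1000, "exception", category))
        raise
    on_complete(Record(node_name, (clock() - start) * 1000, "success", None))
    return result
```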
FunctionNode (dataclass)
FunctionNode(
name: str,
fn: Callable[[StateT], Awaitable[Mapping[str, Any]]],
middleware: tuple[Middleware, ...] = tuple[
Middleware, ...
](),
)
A node backed by an async callable.
Node
Bases: Protocol
A unit of work in a compiled graph.
middleware (property)
middleware: tuple[Middleware, ...]
Per-node middleware applied at this node's registration site, outer-to-inner. Composed inside any per-graph middleware.
run (async)
Execute against state and return a partial update to be merged via reducers.
Observer
Bases: Protocol
The shape of a callable that receives node-boundary events.
Observer is a structural Protocol — any async callable matching the
signature qualifies, no subclass required. Plain functions, bound
methods, and class instances with __call__ all work::
    async def log_observer(event: NodeEvent) -> None:
        print(event.node_name, event.phase)

    compiled.attach_observer(log_observer)
Contract:
- Observers MUST be async so the delivery queue can await each one and coordinate ordering. The graph itself never awaits observers.
- Observers MUST NOT alter state, routing, or any other aspect of the graph run — read-only side effects (logging, metrics, span emission) only.
The event parameter is positional-only (event, /) so structural
conformance doesn't pin you to that name — any of event, _event,
e, etc. matches.
Optional prepare_sync extension
An observer MAY additionally define a synchronous method::
    def prepare_sync(self, event: NodeEvent, /) -> None: ...
that the engine calls IN THE ENGINE TASK, BEFORE queueing the
event for the async __call__. This exists for observers that
need to set up state — e.g., open a span and stash a handle in
a ContextVar — that the engine itself must read synchronously
before running the node body (otherwise logs emitted on the
first line of the body wouldn't see the right span).
prepare_sync is opt-in via hasattr — no subclass or
Protocol method required. Observers that don't define it skip
the synchronous prep entirely; observers that do define it run
only for "started"-phase events, with errors warned-not-propagated
(same isolation contract as the async path).
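A sketch of the opt-in hook and the engine-side step it implies. Both halves are stand-ins: `SpanObserver` is a hypothetical observer that stashes a span name in a ContextVar, `dispatch_started` imitates only the synchronous part of what the engine is described as doing (hasattr check, "started" events only, errors warned rather than propagated), and events are reduced to a small dataclass.

```python
from __future__ import annotations

import contextvars
import warnings
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    node_name: str
    phase: str


current_span: contextvars.ContextVar[str | None] = contextvars.ContextVar("current_span", default=None)


class SpanObserver:
    def prepare_sync(self, event: Event, /) -> None:
        # Runs synchronously in the engine task, so the node body sees the span.
        current_span.set(f"span:{event.node_name}")

    async def __call__(self, event: Event, /) -> None:
        pass  # async delivery path (export, logging, ...) omitted


def dispatch_started(observers: list[object], event: Event) -> None:
    """Imitates the synchronous prep step for a "started" event."""
    if event.phase != "started":
        return
    for obs in observers:
        if not hasattr(obs, "prepare_sync"):
            continue  # opt-in: observers without the method skip sync prep
        try:
            obs.prepare_sync(event)
        except Exception as exc:
            warnings.warn(f"prepare_sync failed: {exc!r}")
```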
RemoveHandle (dataclass)
RemoveHandle(
_observers: list[SubscribedObserver],
_observer: SubscribedObserver,
)
Returned by CompiledGraph.attach_observer. Call
.remove() to detach the observer. Idempotent — calling
.remove() after the observer is already detached is a no-op.
Changes to the registered observer set during a graph run do NOT take effect until the next invocation.
SubscribedObserver (dataclass)
SubscribedObserver(
observer: Observer, phases: frozenset[str] = ALL_PHASES
)
An observer paired with its phase subscription set.
Observers register with an optional phases parameter naming
the phase strings they want to receive. The default is
ALL_PHASES — historically named when there were only two
phases, now meaning "the default subscription"
({"started", "completed"}). The "checkpoint_saved" phase
is opt-in: subscribe to it explicitly via
phases={"checkpoint_saved"} (or include it in a custom set).
KNOWN_PHASES is the full "every phase the engine can produce"
set used by the registration-time validator.
Empty phase sets are forbidden — passing one raises
ValueError at registration time so misconfiguration surfaces
immediately.
Construct one of these directly when handing phase-filtered
observers to CompiledGraph.invoke(observers=...). For the
single-observer attach_observer path, pass phases= as a
keyword argument and the engine wraps it for you.
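A sketch of the subscription rules described above: the default set, opt-in "checkpoint_saved", and rejection of empty phase sets. The constant names mirror the ones mentioned here, but their definitions, `validate_phases`, and `wants` are local assumptions rather than imports from the library; rejecting unknown phase strings is inferred from KNOWN_PHASES being used by the registration-time validator.

```python
from __future__ import annotations

from typing import Iterable

ALL_PHASES = frozenset({"started", "completed"})  # the default subscription
KNOWN_PHASES = ALL_PHASES | {"checkpoint_saved"}  # every phase the engine can emit


def validate_phases(phases: Iterable[str] | None) -> frozenset[str]:
    if phases is None:
        return ALL_PHASES
    chosen = frozenset(phases)
    if not chosen:
        raise ValueError("phases must not be empty")
    unknown = chosen - KNOWN_PHASES
    if unknown:
        raise ValueError(f"unknown phases: {sorted(unknown)}")
    return chosen


def wants(phases: frozenset[str], event_phase: str) -> bool:
    return event_phase in phases
```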
ExplicitMapping
ExplicitMapping(
*,
inputs: Mapping[str, str] | None = None,
outputs: Mapping[str, str] | None = None
)
Explicit input/output mapping between parent and subgraph state.
inputs: subgraph_field → parent_field. At entry, the named
parent field's current value is copied into the named subgraph
field. Subgraph fields not listed receive their schema-declared
defaults — there is NO field-name fallback (additive over the
default no-projection-in).
outputs: parent_field → subgraph_field. At exit, the named
subgraph field's value is merged into the named parent field via
the parent's reducer. Subgraph fields not listed are discarded —
outputs REPLACES field-name matching for projection-out.
The two directions are independent: pass either, both, or
neither. The outputs field distinguishes "absent" (default
applies) from "present but empty"; outputs=None means absent
(fall back to field-name matching), outputs={} means present
and empty (project nothing). For inputs the two defaults
coincide (no-projection-in either way), so the distinction is
only meaningful for outputs.
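The projection rules can be illustrated with plain dictionaries. This sketch models states as dicts and merges with plain assignment where a real graph would apply the parent's reducer; `project_in`, `project_out`, and the example field names are hypothetical, not ExplicitMapping's methods. Field-name matching is modeled as copying fields whose names appear in both states.

```python
from __future__ import annotations

from typing import Any, Mapping


def project_in(
    parent: Mapping[str, Any], defaults: Mapping[str, Any], inputs: Mapping[str, str] | None
) -> dict[str, Any]:
    # inputs maps subgraph_field -> parent_field; unlisted fields keep defaults.
    child = dict(defaults)
    for child_field, parent_field in (inputs or {}).items():
        child[child_field] = parent[parent_field]
    return child


def project_out(
    parent: Mapping[str, Any], child: Mapping[str, Any], outputs: Mapping[str, str] | None
) -> dict[str, Any]:
    merged = dict(parent)
    if outputs is None:
        # Absent: fall back to field-name matching.
        pairs = {name: name for name in child if name in parent}
    else:
        # Present (possibly empty): outputs replaces name matching entirely.
        pairs = dict(outputs)
    for parent_field, child_field in pairs.items():
        merged[parent_field] = child[child_field]  # real graphs merge via the reducer
    return merged
```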
FieldNameMatching
Default subgraph projection strategy.
Parameterized for protocol conformance under generics. ParentT
is not consumed (the default projection ignores parent state on
the way in), but carrying the type variable keeps the default
assignable to ProjectionStrategy[ParentT, ChildT] without type
gymnastics at the SubgraphNode default-factory site.
ProjectionStrategy
Bases: Protocol
Strategy for moving state across the parent ↔ subgraph boundary.
Two required methods plus one optional hook:
- project_in and project_out are required: the engine calls them on every subgraph step.
- validate(parent_cls, subgraph_state_cls) -> None is an optional compile-time validation hook. If a strategy defines it, the parent graph's compile() calls it once per SubgraphNode; the strategy may raise a CompileError subclass when its declarations don't match the supplied schemas. Declarative strategies like ExplicitMapping use this to catch field-name typos before any node runs. Imperative custom projections typically have nothing declarative to check and can simply omit the method; the engine uses duck typing (getattr) to find it.
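The duck-typed lookup of the optional hook can be sketched as follows. `Declarative`, `Imperative`, `run_validate_hooks`, reading declared fields from class `__annotations__`, and raising plain `ValueError` in place of a CompileError subclass are all assumptions for illustration; the point is that a strategy without `validate` is skipped rather than rejected.

```python
from __future__ import annotations

from typing import Any


class Declarative:
    def __init__(self, outputs: dict[str, str]) -> None:
        self.outputs = outputs  # parent_field -> subgraph_field

    def validate(self, parent_cls: type, subgraph_state_cls: type) -> None:
        parent_fields = getattr(parent_cls, "__annotations__", {})
        child_fields = getattr(subgraph_state_cls, "__annotations__", {})
        for parent_field, child_field in self.outputs.items():
            if parent_field not in parent_fields or child_field not in child_fields:
                raise ValueError(f"bad mapping {parent_field!r} <- {child_field!r}")


class Imperative:
    pass  # project_in/project_out omitted; no validate() to call


def run_validate_hooks(strategies: list[Any], parent_cls: type, child_cls: type) -> int:
    """Call validate() where defined; return how many strategies had it."""
    called = 0
    for strategy in strategies:
        hook = getattr(strategy, "validate", None)
        if callable(hook):
            hook(parent_cls, child_cls)
            called += 1
    return called
```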
Reducer
Base class for state-field reducers.
Each reducer carries a canonical name used in error messages and
introspection. Subclasses override __call__ to merge a node's partial
update for a single field into the prior value.
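A standalone sketch of that pattern: a base class carrying a canonical `name`, and a subclass overriding `__call__(prior, update)`. The local `Reducer` base, the `Append` subclass, and the `(prior, update)` argument order are assumptions for illustration; the library's own base class and built-in reducers may differ.

```python
from __future__ import annotations

from typing import Any


class Reducer:
    name: str = "reducer"  # canonical name used in error messages and introspection

    def __call__(self, prior: Any, update: Any) -> Any:
        raise NotImplementedError


class Append(Reducer):
    name = "append"

    def __call__(self, prior: list[Any], update: list[Any]) -> list[Any]:
        # Build a new list; state is immutable, so never extend `prior` in place.
        return [*prior, *update]
```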
State
Bases: BaseModel
Base for graph state schemas. Immutable; reducers attach via Annotated.
SubgraphNode (dataclass)
SubgraphNode(
name: str,
compiled: CompiledGraph[ChildT],
projection: ProjectionStrategy[
ParentT, ChildT
] = FieldNameMatching[ParentT, ChildT](),
middleware: tuple[Middleware, ...] = tuple[
Middleware, ...
](),
)
A node backed by a compiled subgraph.
The parent's per-node middleware on a SubgraphNode wraps the subgraph dispatch as a single atomic call — parent middleware does NOT cross into the subgraph's internal nodes (those are wrapped by the subgraph's own middleware independently).
run (async)
Execute the subgraph and project its result back into the parent.
When context is None (e.g., direct invocation in tests, or a parent
call that doesn't thread a context), the subgraph runs via its own
public invoke() — a fresh root invocation with no parent observer
chain.
When context is provided (the engine's normal path during
a parent run), the subgraph descends into a child context
that shares the parent's queue + step counter and extends the
namespace and parent-state stack. Observer events from inner
nodes bubble up to outer observers.
deterministic_backoff
Constant-N seconds backoff factory — for deterministic testing.
The conformance fixtures use this form via backoff: {type:
deterministic, seconds: N} so retry timing is reproducible across
runs.
exponential_jitter_backoff
Default backoff: random.uniform(0, min(cap, base * 2**attempt)).
Jitter is mandatory — fixed exponential backoff causes
synchronized retries from many concurrent callers, amplifying
rate-limit storms. base and cap are configurable; the
defaults are 1.0 and 30.0 seconds.
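The formula translates directly into a factory. This is a sketch, not the library's function: the name `make_jitter_backoff` and its keyword parameters are hypothetical, `attempt` is taken to be the 0-based attempt index, and the optional `rng` (a `random.Random`) is an addition so results are reproducible in tests.

```python
from __future__ import annotations

import random
from typing import Callable


def make_jitter_backoff(
    base: float = 1.0, cap: float = 30.0, rng: random.Random | None = None
) -> Callable[[int], float]:
    draw = rng or random.Random()

    def backoff(attempt: int) -> float:
        # Full jitter: uniform over [0, min(cap, base * 2**attempt)].
        return draw.uniform(0, min(cap, base * 2**attempt))

    return backoff
```

Capping the window before drawing keeps late retries bounded (30 s with the defaults), while the uniform draw spreads simultaneous callers across the whole window instead of synchronizing them.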