Orchestrator

techrevati.runtime.orchestrator

Orchestrator — single execution loop wiring the runtime primitives together.

Pairs an agent session with lifecycle tracking, usage accounting, circuit-breaker protection, automatic failure classification, permission gating, and policy evaluation. Use the primitives standalone or use Orchestrator.session() (sync) / Orchestrator.asession() (async) to get all of them wired in.

Example (sync):

from techrevati.runtime import Orchestrator, UsageSnapshot

orch = Orchestrator(role="writer", phase="draft", project_id=1)
with orch.session() as session:
    text, usage = session.run_turn(
        lambda: call_model(prompt),
        model="model-a",
        usage=UsageSnapshot(input_tokens=5000, output_tokens=1200),
    )
print(session.summary())

Example (async):

async with orch.asession() as session:
    text, usage = await session.arun_turn(
        lambda: acall_model(prompt),
        model="model-a",
        timeout=30.0,
    )

Orchestrator

Orchestrator(*args, **kwargs)

Bases: AgentSession

Deprecated alias for AgentSession. Will be removed in 0.3.0.

OrchestrationSession dataclass

OrchestrationSession(worker, tracker, recovery, registry, phase, role, project_id, permissions=None, policy_engine=None, quality_gate=None, budget_usd=None, enforce_budget=False, max_iterations=25, guardrails=list(), event_sink=NoopEventSink(), usage_sink=NoopUsageSink(), events=list(), thread_id=None, saver=None, provider_router=None, usage_limits=None, governance=None, circuit_breaker=None, rate_limiter=None)

Bases: _SessionBase

Single-agent sync execution context. Created by Orchestrator.session().

run_tool

run_tool(tool_name, fn)

Execute a tool with permission + guardrail + governance checks around it.

Order: permission → governance → pre-guardrails → fn() → post-guardrails.
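The check order above can be sketched with plain callables. This is a minimal illustration, not the library's implementation: run_tool_sketch, ToolBlocked, and the boolean-returning check signatures are all hypothetical stand-ins for the real PermissionEnforcer, governance, and guardrail objects.

```python
from typing import Any, Callable, Sequence


class ToolBlocked(Exception):
    """Hypothetical stand-in for whatever a failed check raises."""


def run_tool_sketch(
    tool_name: str,
    fn: Callable[[], Any],
    *,
    permission: Callable[[str], bool],
    governance: Callable[[str], bool],
    pre_guardrails: Sequence[Callable[[str], bool]] = (),
    post_guardrails: Sequence[Callable[[str, Any], bool]] = (),
) -> Any:
    # 1. Permission is checked first, so a denied tool triggers nothing else.
    if not permission(tool_name):
        raise ToolBlocked("permission")
    # 2. Governance runs only for tools that passed the permission check.
    if not governance(tool_name):
        raise ToolBlocked("governance")
    # 3. Pre-guardrails see the tool name before the tool body executes.
    for guardrail in pre_guardrails:
        if not guardrail(tool_name):
            raise ToolBlocked("pre-guardrail")
    # 4. The tool body runs only after every earlier stage has passed.
    result = fn()
    # 5. Post-guardrails inspect the result before it reaches the caller.
    for guardrail in post_guardrails:
        if not guardrail(tool_name, result):
            raise ToolBlocked("post-guardrail")
    return result
```

Putting the cheapest and most decisive check first means a denied tool never reaches the later stages or fn itself.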

run_turn

run_turn(fn, model='', usage=None, estimate_usage=None, timeout=None, idempotency_key=None)

Execute one model turn with circuit-breaker + recovery wiring.

When timeout is set, fn is dispatched to a single-worker ThreadPoolExecutor and waited on with that deadline. The executor is created per-turn (cheap) so there is no pool to manage across the session.
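A minimal sketch of the per-turn executor pattern described above, using only the standard library. call_with_deadline, TurnTimeout, and slow_call are hypothetical names for illustration; the real method may differ in how it shuts the executor down.

```python
import concurrent.futures
import time
from typing import Any, Callable, Optional


class TurnTimeout(Exception):
    """Hypothetical stand-in for TurnTimeoutError."""

    def __init__(self, timeout_seconds: float) -> None:
        super().__init__(f"turn exceeded {timeout_seconds}s")
        self.timeout_seconds = timeout_seconds


def call_with_deadline(fn: Callable[[], Any], timeout: Optional[float]) -> Any:
    if timeout is None:
        return fn()  # No deadline requested: call inline on the current thread.
    # One single-worker executor per turn, so nothing persists between turns.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(fn)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError as exc:
            raise TurnTimeout(timeout) from exc
    finally:
        # wait=False returns immediately instead of blocking on a running fn.
        executor.shutdown(wait=False)


def slow_call() -> str:
    time.sleep(0.3)  # Simulates a model call that outlives a short deadline.
    return "late"
```

One caveat of this pattern in general: a timed-out fn keeps running in its worker thread, because Python threads cannot be interrupted from outside. The deadline bounds how long the caller waits, not how long fn runs.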

On exception: classifies the failure, attempts recovery once, and re-raises so the caller decides whether to retry.
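The classify, recover once, re-raise flow might look like the sketch below. run_with_recovery and its classify / recover parameters are assumptions made for illustration; the source does not show the real classifier or recovery hooks.

```python
from typing import Any, Callable


def run_with_recovery(
    fn: Callable[[], Any],
    *,
    classify: Callable[[BaseException], str],
    recover: Callable[[str], None],
) -> Any:
    try:
        return fn()
    except Exception as exc:
        failure_class = classify(exc)  # Map the exception to a failure class.
        recover(failure_class)         # One recovery attempt; fn is not re-run here.
        raise                          # The caller decides whether to retry.
```

Because the exception is always re-raised, retry policy stays with the caller rather than being hidden inside the turn.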

If a saver + thread_id are configured and idempotency_key is supplied, this method first looks for a prior checkpoint with the same key on the same thread; on a hit it returns the cached (result, usage) without calling fn. After a successful execution, a new checkpoint is written so a restart can replay through it.
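The checkpoint lookup can be illustrated with an in-memory saver. InMemorySaver, its get / put methods, and run_turn_once are hypothetical; the actual CheckpointSaver protocol is not shown in this page and may look quite different.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional, Tuple


@dataclass
class InMemorySaver:
    """Hypothetical saver keyed by (thread_id, idempotency_key)."""

    _store: Dict[Tuple[str, str], Tuple[Any, Any]] = field(default_factory=dict)

    def get(self, thread_id: str, key: str) -> Optional[Tuple[Any, Any]]:
        return self._store.get((thread_id, key))

    def put(self, thread_id: str, key: str, value: Tuple[Any, Any]) -> None:
        self._store[(thread_id, key)] = value


def run_turn_once(
    fn: Callable[[], Any],
    *,
    saver: Optional[InMemorySaver],
    thread_id: Optional[str],
    idempotency_key: Optional[str],
    usage: Any = None,
) -> Tuple[Any, Any]:
    # Replay is only possible when all three pieces are present.
    replayable = (
        saver is not None and thread_id is not None and idempotency_key is not None
    )
    if replayable:
        cached = saver.get(thread_id, idempotency_key)
        if cached is not None:
            return cached  # Hit: return the stored (result, usage); fn is skipped.
    result = fn()
    if replayable:
        # Checkpoint only after success, so failed turns are never replayed.
        saver.put(thread_id, idempotency_key, (result, usage))
    return result, usage
```

In this sketch the key is scoped to the thread, so reusing the same idempotency_key on a different thread_id executes fn again.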

AsyncOrchestrationSession dataclass

AsyncOrchestrationSession(worker, tracker, recovery, registry, phase, role, project_id, permissions=None, policy_engine=None, quality_gate=None, budget_usd=None, enforce_budget=False, max_iterations=25, guardrails=list(), event_sink=NoopEventSink(), usage_sink=NoopUsageSink(), events=list(), thread_id=None, saver=None, provider_router=None, usage_limits=None, governance=None, circuit_breaker=None, async_rate_limiter=None)

Bases: _SessionBase

Single-agent async execution context. Created by Orchestrator.asession().

Sibling of OrchestrationSession. Sync helpers (authorize, evaluate_policy, evaluate_gate, summary, lifecycle methods) are inherited; only the execution path (arun_turn / arun_tool) and the human-in-the-loop pause are async.

arun_tool async

arun_tool(tool_name, coro_factory)

Async sibling of run_tool.

Permission + governance are sync; guardrails are awaited when they implement AsyncGuardrail and called inline when they are sync Guardrail instances.
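One way to run mixed sync and async guardrails in a single loop is sketched below. Note the swap: the text above dispatches on the guardrail's type (AsyncGuardrail vs Guardrail), while this sketch checks whether the call returned an awaitable. apply_guardrails and GuardrailTripped are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Iterable, Union

GuardrailFn = Callable[[Any], Union[bool, Awaitable[bool]]]


class GuardrailTripped(Exception):
    """Hypothetical error for a guardrail that rejects a payload."""


async def apply_guardrails(guardrails: Iterable[GuardrailFn], payload: Any) -> None:
    for guardrail in guardrails:
        verdict = guardrail(payload)      # Sync guardrails finish here, inline.
        if inspect.isawaitable(verdict):  # Async guardrails return an awaitable.
            verdict = await verdict
        if not verdict:
            raise GuardrailTripped(getattr(guardrail, "__name__", repr(guardrail)))
```

Sync guardrails called inline block the event loop for as long as they run, so anything slow is better written as an async guardrail.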

arun_turn async

arun_turn(coro_factory, model='', usage=None, estimate_usage=None, timeout=None, idempotency_key=None)

Execute one model turn with async circuit-breaker + recovery wiring.

timeout is enforced with asyncio.wait_for. Cancellation from outside (parent task) is propagated as CancelledError; an internal timeout becomes TurnTimeoutError.
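The distinction between an internal timeout and outside cancellation can be sketched as follows. arun_with_deadline and TurnTimeout are hypothetical names; only asyncio.wait_for and the exception behavior are standard-library facts.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class TurnTimeout(Exception):
    """Hypothetical stand-in for TurnTimeoutError."""

    def __init__(self, timeout_seconds: Optional[float]) -> None:
        super().__init__(f"turn exceeded {timeout_seconds}s")
        self.timeout_seconds = timeout_seconds


async def arun_with_deadline(
    coro_factory: Callable[[], Awaitable[Any]], timeout: Optional[float]
) -> Any:
    try:
        # timeout=None makes wait_for wait without any deadline.
        return await asyncio.wait_for(coro_factory(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # Only the internal deadline is translated into the session error type.
        raise TurnTimeout(timeout) from exc
    # asyncio.CancelledError derives from BaseException and is not caught above,
    # so cancellation by a parent task propagates unchanged.
```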

idempotency_key behaves the same as in run_turn: when the session has a saver + thread_id configured, a prior checkpoint with the same key short-circuits the call.

arun_parallel_tools async

arun_parallel_tools(coro_factories, *, timeout=None)

Run several tool calls concurrently with structured concurrency.

Uses asyncio.TaskGroup so any child failure cancels its siblings and surfaces as ExceptionGroup to the caller — no orphan tasks, no swallowed exceptions. timeout (if given) applies to the whole group via asyncio.timeout.

Returns results in input order. Each coro_factory is a zero-arg callable that returns an awaitable; this matches the contract used elsewhere in the session API and means callers can build the coroutine lazily inside the group.

pause_for_input async

pause_for_input(prompt)

Mark the worker WAITING_FOR_INPUT and await an external response.

Returns a future the caller resolves via session.provide_input(value) from elsewhere in the program. Use this to wire human-in-the-loop or out-of-band approvals without leaving the session machinery.

provide_input

provide_input(value)

Resolve the most recent pause_for_input future with value.
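The pause / provide pair can be sketched with a plain asyncio.Future. InputGate is a hypothetical class and the status strings are simplified; in this sketch pause_for_input awaits the future itself and returns the provided value, which is one possible reading of the description above.

```python
import asyncio
from typing import Any, Optional


class InputGate:
    """Hypothetical holder for the pending human-in-the-loop future."""

    def __init__(self) -> None:
        self.state = "RUNNING"
        self._pending: Optional[asyncio.Future] = None

    async def pause_for_input(self, prompt: str) -> Any:
        # prompt would be shown to the human; this sketch ignores it.
        self._pending = asyncio.get_running_loop().create_future()
        self.state = "WAITING_FOR_INPUT"
        try:
            return await self._pending  # Suspends until provide_input runs.
        finally:
            self.state = "RUNNING"
            self._pending = None

    def provide_input(self, value: Any) -> None:
        if self._pending is None or self._pending.done():
            raise RuntimeError("no pending pause_for_input call")
        self._pending.set_result(value)  # Wakes the paused coroutine.
```

asyncio futures are not thread-safe, so a provide_input built this way must be called from the event loop's own thread; code running in another thread would need loop.call_soon_threadsafe.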

AgentSession dataclass

AgentSession(role, phase, project_id=None, registry=AgentRegistry(), permissions=None, circuit_breaker=None, async_circuit_breaker=None, policy_engine=None, quality_gate=None, budget_usd=None, enforce_budget=False, max_iterations=25, guardrails=list(), event_sink=NoopEventSink(), usage_sink=NoopUsageSink(), saver=None, rate_limiter=None, async_rate_limiter=None, provider_router=None, usage_limits=None, governance=None)

Factory for sessions. Holds shared, long-lived components.

Components are optional; the simplest invocation is Orchestrator(role=..., phase=...). Provide circuit_breaker for sync sessions, async_circuit_breaker for async sessions, or both — they are independent.

To make sessions restart-resumable, pass saver (any object that satisfies the CheckpointSaver protocol) and a thread_id at session() / asession() time. The thread id is the durable handle a future process uses to pick up where this one left off.

session

session(*, thread_id=None)

Open a single-agent sync session.

On clean exit: worker → COMPLETED if still running. On exception: worker → FAILED with the error classified into an AgentFailureClass.
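These exit transitions map naturally onto a context manager. The sketch below uses hypothetical names (Worker, classify, session_sketch) and string statuses; the real AgentFailureClass mapping is not shown in the source, and re-raising the exception is an assumption.

```python
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class Worker:
    status: str = "RUNNING"
    failure_class: Optional[str] = None


def classify(exc: BaseException) -> str:
    """Hypothetical classifier standing in for AgentFailureClass."""
    return "TIMEOUT" if isinstance(exc, TimeoutError) else "UNKNOWN"


@contextmanager
def session_sketch() -> Iterator[Worker]:
    worker = Worker()
    try:
        yield worker
    except Exception as exc:
        worker.status = "FAILED"
        worker.failure_class = classify(exc)
        raise  # Assumption: the error still reaches the caller.
    else:
        # Respect any terminal status the body already set explicitly.
        if worker.status == "RUNNING":
            worker.status = "COMPLETED"
```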

If thread_id is supplied and the orchestrator has a saver, the session writes a checkpoint after each turn and an idempotency_key on run_turn makes that turn replay-safe.

asession async

asession(*, thread_id=None)

Open a single-agent async session.

Mirrors session() but uses async primitives. CancelledError from anywhere inside the async with body transitions the worker to CANCELLED instead of FAILED, and is re-raised.
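The CANCELLED versus FAILED distinction can be sketched with an async context manager. asession_sketch and Worker are hypothetical, and the status strings are simplified stand-ins for the real lifecycle states.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Worker:
    status: str = "RUNNING"


@asynccontextmanager
async def asession_sketch() -> AsyncIterator[Worker]:
    worker = Worker()
    try:
        yield worker
    except asyncio.CancelledError:
        # Cancellation is a control signal, not a failure; record it and re-raise.
        worker.status = "CANCELLED"
        raise
    except Exception:
        worker.status = "FAILED"
        raise
    else:
        if worker.status == "RUNNING":
            worker.status = "COMPLETED"
```

CancelledError derives from BaseException, so it needs its own except clause; a bare `except Exception` would never see it.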

The same thread_id / saver contract from session() applies; pair with arun_turn(..., idempotency_key=...) for replay-safe async turns.

PermissionDeniedError

PermissionDeniedError(outcome)

Bases: Exception

Raised when a tool is blocked by the configured PermissionEnforcer.

TurnTimeoutError

TurnTimeoutError(timeout_seconds)

Bases: Exception

Raised when a sync or async turn exceeds the configured timeout.

For sync callers, this wraps the underlying concurrent.futures.TimeoutError; for async callers, the original asyncio.TimeoutError is re-raised as this type to give a single error class across both code paths.

MaxIterationsExceededError

MaxIterationsExceededError(max_iterations)

Bases: Exception

Raised when a session attempts more turns than max_iterations allows.

The default cap of 25 matches the OpenAI Agents SDK convention and prevents runaway agent loops. Explicit stopping conditions are a common production-readiness requirement.
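A turn cap of this kind reduces to a counter checked before each turn. TurnCounter and MaxIterationsExceeded below are hypothetical names for illustration; where exactly the real session performs the check is not shown in the source.

```python
class MaxIterationsExceeded(Exception):
    """Hypothetical stand-in for MaxIterationsExceededError."""

    def __init__(self, max_iterations: int) -> None:
        super().__init__(f"exceeded {max_iterations} turns")
        self.max_iterations = max_iterations


class TurnCounter:
    def __init__(self, max_iterations: int = 25) -> None:
        self.max_iterations = max_iterations
        self.turns = 0

    def next_turn(self) -> int:
        # Check before counting, so exactly max_iterations turns are allowed.
        if self.turns >= self.max_iterations:
            raise MaxIterationsExceeded(self.max_iterations)
        self.turns += 1
        return self.turns
```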