Orchestrator ¶
techrevati.runtime.orchestrator ¶
Orchestrator: a single execution loop that wires the runtime primitives together.
Pairs an agent session with lifecycle tracking, usage accounting,
circuit-breaker protection, automatic failure classification, permission
gating, and policy evaluation. Use the primitives standalone or use
Orchestrator.session() (sync) / Orchestrator.asession() (async)
to get all of them wired in.
Example (sync):
    from techrevati.runtime import Orchestrator, UsageSnapshot

    orch = Orchestrator(role="writer", phase="draft", project_id=1)
    with orch.session() as session:
        text, usage = session.run_turn(
            lambda: call_model(prompt),
            model="model-a",
            usage=UsageSnapshot(input_tokens=5000, output_tokens=1200),
        )
        print(session.summary())
Example (async):
    async with orch.asession() as session:
        text, usage = await session.arun_turn(
            lambda: acall_model(prompt),
            model="model-a",
            timeout=30.0,
        )
Orchestrator ¶
OrchestrationSession dataclass ¶
OrchestrationSession(worker, tracker, recovery, registry, phase, role, project_id, permissions=None, policy_engine=None, quality_gate=None, budget_usd=None, enforce_budget=False, max_iterations=25, guardrails=list(), event_sink=NoopEventSink(), usage_sink=NoopUsageSink(), events=list(), thread_id=None, saver=None, provider_router=None, usage_limits=None, governance=None, circuit_breaker=None, rate_limiter=None)
Bases: _SessionBase
Single-agent sync execution context. Created by Orchestrator.session().
run_tool ¶
Execute a tool with permission + guardrail + governance checks around it.
Order: permission → governance → pre-guardrails → fn() → post-guardrails.
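The check order above can be sketched with plain callables. This is a minimal illustration, not the library's implementation: `run_tool_sketch`, `ToolBlocked`, the `trace` list, and the boolean check signatures are all hypothetical names chosen for the example.

```python
from typing import Any, Callable, List


class ToolBlocked(Exception):
    """Hypothetical error raised when any check rejects the call."""


def run_tool_sketch(
    tool_name: str,
    fn: Callable[[], Any],
    permission: Callable[[str], bool],
    governance: Callable[[str], bool],
    pre_guardrails: List[Callable[[str], bool]],
    post_guardrails: List[Callable[[Any], bool]],
    trace: List[str],
) -> Any:
    # 1. Permission runs first; a denial stops before any other check.
    trace.append("permission")
    if not permission(tool_name):
        raise ToolBlocked(f"permission denied for {tool_name}")

    # 2. Governance is consulted only for permitted tools.
    trace.append("governance")
    if not governance(tool_name):
        raise ToolBlocked(f"governance rejected {tool_name}")

    # 3. Pre-guardrails inspect the request before the tool executes.
    for guard in pre_guardrails:
        trace.append("pre-guardrail")
        if not guard(tool_name):
            raise ToolBlocked(f"pre-guardrail rejected {tool_name}")

    # 4. The tool itself runs only after every earlier check passed.
    trace.append("fn")
    result = fn()

    # 5. Post-guardrails inspect the result before it is returned.
    for guard in post_guardrails:
        trace.append("post-guardrail")
        if not guard(result):
            raise ToolBlocked(f"post-guardrail rejected result of {tool_name}")
    return result
```

Because each stage raises before the next one starts, a denied permission means governance, guardrails, and the tool function are never reached.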
run_turn ¶
Execute one model turn with circuit-breaker + recovery wiring.
When timeout is set, fn is dispatched to a single-worker
ThreadPoolExecutor and waited on with that deadline. The
executor is created per-turn (cheap) so there is no pool to
manage across the session.
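The per-turn executor pattern described above can be sketched with the standard library alone. This is an assumption-laden illustration rather than the library's code: `run_with_timeout` is a hypothetical helper, and `TurnTimeoutError` here is a local stand-in class, not an import from the package.

```python
import concurrent.futures
import time
from typing import Any, Callable, Optional


class TurnTimeoutError(Exception):
    """Local stand-in for the library's error type (illustration only)."""


def run_with_timeout(fn: Callable[[], Any], timeout: Optional[float] = None) -> Any:
    # Without a timeout, call fn directly on the current thread.
    if timeout is None:
        return fn()

    # One single-worker executor per call, so no pool outlives the turn.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(fn)
        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError as exc:
            # Wrap the stdlib timeout in a single, caller-facing error type.
            raise TurnTimeoutError(f"turn exceeded {timeout}s") from exc
    finally:
        # wait=False returns immediately. The worker thread cannot be
        # killed, so a timed-out fn keeps running in the background.
        executor.shutdown(wait=False)


def slow_call() -> str:
    time.sleep(0.3)
    return "late"
```

One design consequence worth noting: a thread-based timeout only stops the caller from waiting. The underlying call continues until it returns on its own, so `fn` should ideally carry its own network or I/O deadline as well.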
On exception: classifies the failure, attempts recovery once, and re-raises so the caller decides whether to retry.
If a saver + thread_id are configured and
idempotency_key is supplied, this method first looks for a
prior checkpoint with the same key on the same thread; on a hit
it returns the cached (result, usage) without calling
fn. After a successful execution, a new checkpoint is
written so a restart can replay through it.
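The replay behaviour can be illustrated with an in-memory stand-in. Everything named here is hypothetical: `InMemorySaver`, its `get`/`put` methods, and `run_turn_sketch` are assumptions made for the example and do not reflect the actual CheckpointSaver protocol. As a simplification, the sketch only writes a checkpoint when an idempotency key is supplied.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class InMemorySaver:
    """Hypothetical saver keyed by (thread_id, idempotency_key)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], Tuple[Any, Any]] = {}

    def get(self, thread_id: str, key: str) -> Optional[Tuple[Any, Any]]:
        return self._store.get((thread_id, key))

    def put(self, thread_id: str, key: str, value: Tuple[Any, Any]) -> None:
        self._store[(thread_id, key)] = value


def run_turn_sketch(
    fn: Callable[[], Any],
    usage: Any = None,
    saver: Optional[InMemorySaver] = None,
    thread_id: Optional[str] = None,
    idempotency_key: Optional[str] = None,
) -> Tuple[Any, Any]:
    # Replay is only possible when saver, thread_id, and key are all present.
    replayable = (
        saver is not None and thread_id is not None and idempotency_key is not None
    )
    if replayable:
        cached = saver.get(thread_id, idempotency_key)
        if cached is not None:
            # Hit: return the stored (result, usage) without calling fn.
            return cached

    result = fn()

    if replayable:
        # Written only after fn succeeded, so failed turns are never cached.
        saver.put(thread_id, idempotency_key, (result, usage))
    return result, usage
```

A restarted process that reuses the same thread id and key would get the stored pair back, which is what makes the turn safe to replay.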
AsyncOrchestrationSession dataclass ¶
AsyncOrchestrationSession(worker, tracker, recovery, registry, phase, role, project_id, permissions=None, policy_engine=None, quality_gate=None, budget_usd=None, enforce_budget=False, max_iterations=25, guardrails=list(), event_sink=NoopEventSink(), usage_sink=NoopUsageSink(), events=list(), thread_id=None, saver=None, provider_router=None, usage_limits=None, governance=None, circuit_breaker=None, async_rate_limiter=None)
Bases: _SessionBase
Single-agent async execution context. Created by Orchestrator.asession().
Sibling of OrchestrationSession. Sync helpers (authorize, evaluate_policy,
evaluate_gate, summary, lifecycle methods) are inherited; only the
execution path (arun_turn / arun_tool) and the human-in-the-loop
pause are async.
arun_tool async ¶
Async sibling of run_tool.
Permission + governance are sync; guardrails are awaited when
they implement AsyncGuardrail and called inline when they
are sync Guardrail instances.
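A minimal sketch of running a mixed list of sync and async guardrails follows. It assumes each guardrail exposes a `check(payload)` method and detects async ones by testing whether the return value is awaitable; the library is described as dispatching on the AsyncGuardrail type instead, so treat the detection strategy, the class names, and the method name as illustrative assumptions.

```python
import asyncio
import inspect
from typing import Any, Sequence


class SyncLengthGuard:
    """Hypothetical sync guardrail: rejects very long payloads."""

    def check(self, payload: str) -> bool:
        return len(payload) < 100


class AsyncDenyListGuard:
    """Hypothetical async guardrail: rejects a deny-listed command."""

    async def check(self, payload: str) -> bool:
        await asyncio.sleep(0)  # stands in for real async I/O
        return "rm -rf" not in payload


async def run_guardrails(guardrails: Sequence[Any], payload: str) -> bool:
    for guard in guardrails:
        outcome = guard.check(payload)
        # Async guardrails return an awaitable; sync ones return the verdict.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        if not outcome:
            return False
    return True
```

Calling sync guardrails inline keeps them cheap, but a slow sync guardrail would block the event loop for its full duration.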
arun_turn async ¶
arun_turn(coro_factory, model='', usage=None, estimate_usage=None, timeout=None, idempotency_key=None)
Execute one model turn with async circuit-breaker + recovery wiring.
timeout is enforced with asyncio.wait_for. Cancellation
from outside (parent task) is propagated as CancelledError; an
internal timeout becomes TurnTimeoutError.
idempotency_key behaves the same as in run_turn: when the
session has a saver + thread_id configured, a prior checkpoint
with the same key short-circuits the call.
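The timeout and cancellation semantics can be demonstrated with `asyncio.wait_for` directly. `arun_with_timeout` is a hypothetical helper, and `TurnTimeoutError` is a local stand-in for the library's class; only the standard-library behaviour shown is certain.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class TurnTimeoutError(Exception):
    """Local stand-in for the library's error type (illustration only)."""


async def arun_with_timeout(
    coro_factory: Callable[[], Awaitable[Any]],
    timeout: Optional[float] = None,
) -> Any:
    try:
        # timeout=None means wait_for simply awaits the coroutine.
        return await asyncio.wait_for(coro_factory(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # Internal deadline expired: translate into the session's error type.
        raise TurnTimeoutError(f"turn exceeded {timeout}s") from exc
    # asyncio.CancelledError derives from BaseException, so it is not
    # caught above and propagates unchanged when a parent task cancels us.


async def slow_turn() -> str:
    await asyncio.sleep(1)
    return "late"


async def cancel_from_outside() -> str:
    task = asyncio.create_task(arun_with_timeout(slow_turn, timeout=5.0))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "not cancelled"
```

Keeping the two outcomes distinct lets callers retry on a timeout while still honouring cancellation requested by a parent task.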
arun_parallel_tools async ¶
Run several tool calls concurrently with structured concurrency.
Uses asyncio.TaskGroup so any child failure cancels its
siblings and surfaces to the caller as an ExceptionGroup, leaving
no orphan tasks and no swallowed exceptions. timeout (if
given) applies to the whole group via asyncio.timeout.
Returns results in input order. Each coro_factory is a
zero-arg callable that returns an awaitable; this matches the
contract used elsewhere in the session API and means callers
can build the coroutine lazily inside the group.
pause_for_input async ¶
Mark the worker WAITING_FOR_INPUT and await an external response.
Returns a future the caller resolves via
session.provide_input(value) from elsewhere in the program.
Use this to wire human-in-the-loop or out-of-band approvals
without leaving the session machinery.
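A future-based pause can be sketched as follows. `InputGate` and its string states are hypothetical. As one reading of the description, this sketch awaits the future inside `pause_for_input` and returns the supplied value; the real method may hand the future back to the caller instead.

```python
import asyncio
from typing import Any, Optional, Tuple


class InputGate:
    """Hypothetical stand-in for the session's pause/resume machinery."""

    def __init__(self) -> None:
        self.state = "RUNNING"
        self._pending: Optional[asyncio.Future] = None

    async def pause_for_input(self) -> Any:
        # Create a future on the running loop and park until it resolves.
        self._pending = asyncio.get_running_loop().create_future()
        self.state = "WAITING_FOR_INPUT"
        try:
            return await self._pending
        finally:
            self.state = "RUNNING"
            self._pending = None

    def provide_input(self, value: Any) -> None:
        # Called from elsewhere in the program, on the same event loop.
        if self._pending is None or self._pending.done():
            raise RuntimeError("no pending input request")
        self._pending.set_result(value)


async def demo() -> Tuple[str, Any, str]:
    gate = InputGate()
    waiter = asyncio.create_task(gate.pause_for_input())
    await asyncio.sleep(0)  # let the waiter reach its await point
    state_while_paused = gate.state
    gate.provide_input("approved")
    value = await waiter
    return state_while_paused, value, gate.state
```

`Future.set_result` is not thread-safe, so an approval arriving from another thread would need to go through `loop.call_soon_threadsafe`.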
AgentSession dataclass ¶
AgentSession(role, phase, project_id=None, registry=AgentRegistry(), permissions=None, circuit_breaker=None, async_circuit_breaker=None, policy_engine=None, quality_gate=None, budget_usd=None, enforce_budget=False, max_iterations=25, guardrails=list(), event_sink=NoopEventSink(), usage_sink=NoopUsageSink(), saver=None, rate_limiter=None, async_rate_limiter=None, provider_router=None, usage_limits=None, governance=None)
Factory for sessions. Holds shared, long-lived components.
Components are optional; the simplest invocation is
Orchestrator(role=..., phase=...). Provide circuit_breaker
for sync sessions, async_circuit_breaker for async sessions, or
both; the two breakers are independent.
To make sessions restart-resumable, pass saver (any object that
satisfies the CheckpointSaver protocol) and a thread_id at
session() / asession() time. The thread id is the durable
handle a future process uses to pick up where this one left off.
session ¶
Open a single-agent sync session.
On clean exit: worker → COMPLETED if still running. On exception: worker → FAILED with the error classified into an AgentFailureClass.
If thread_id is supplied and the orchestrator has a saver,
the session writes a checkpoint after each turn and an
idempotency_key on run_turn makes that turn replay-safe.
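The exit behaviour described above maps naturally onto a context manager. In this sketch `Worker`, `classify`, the status strings, and the failure labels are hypothetical; the real session tracks considerably more state and uses AgentFailureClass rather than plain strings.

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class Worker:
    """Hypothetical worker record with a status and a failure label."""

    def __init__(self) -> None:
        self.status = "RUNNING"
        self.failure_class: Optional[str] = None


def classify(exc: Exception) -> str:
    # Stand-in for failure classification; the labels are illustrative.
    if isinstance(exc, TimeoutError):
        return "timeout"
    if isinstance(exc, PermissionError):
        return "permission"
    return "unknown"


@contextmanager
def session_sketch(worker: Worker) -> Iterator[Worker]:
    try:
        yield worker
    except Exception as exc:
        # On exception: mark FAILED, record the class, then re-raise.
        worker.status = "FAILED"
        worker.failure_class = classify(exc)
        raise
    else:
        # On clean exit: complete only if nothing else changed the status.
        if worker.status == "RUNNING":
            worker.status = "COMPLETED"
```

The `if worker.status == "RUNNING"` guard mirrors the "if still running" condition: a status set explicitly inside the block is left untouched on exit.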
asession async ¶
Open a single-agent async session.
Mirrors session() but uses async primitives. CancelledError
from anywhere inside the async with body transitions the
worker to CANCELLED instead of FAILED, and is re-raised.
The same thread_id / saver contract from session()
applies; pair with arun_turn(..., idempotency_key=...) for
replay-safe async turns.
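The distinction between cancellation and failure can be sketched with an async context manager. `asession_sketch` and the dictionary-based worker record are hypothetical; the point is only the separate `except` branch for `asyncio.CancelledError`, which derives from `BaseException` and would not be caught by `except Exception`.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict


@asynccontextmanager
async def asession_sketch(worker: Dict[str, str]) -> AsyncIterator[Dict[str, str]]:
    worker["status"] = "RUNNING"
    try:
        yield worker
    except asyncio.CancelledError:
        # Cancellation is recorded as CANCELLED and then re-raised.
        worker["status"] = "CANCELLED"
        raise
    except Exception:
        # Any ordinary error is recorded as FAILED and re-raised.
        worker["status"] = "FAILED"
        raise
    else:
        if worker["status"] == "RUNNING":
            worker["status"] = "COMPLETED"


async def cancelled_run() -> str:
    worker: Dict[str, str] = {}

    async def body() -> None:
        async with asession_sketch(worker):
            await asyncio.sleep(10)

    task = asyncio.create_task(body())
    await asyncio.sleep(0.01)  # let the body enter the session
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return worker["status"]
```

Re-raising `CancelledError` matters: swallowing it would leave the parent task believing the child finished normally.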
PermissionDeniedError ¶
Bases: Exception
Raised when a tool is blocked by the configured PermissionEnforcer.
TurnTimeoutError ¶
Bases: Exception
Raised when a sync or async turn exceeds the configured timeout.
For sync callers, this wraps the underlying concurrent.futures.TimeoutError;
for async callers, the original asyncio.TimeoutError is re-raised
as this type to give a single error class across both code paths.
MaxIterationsExceededError ¶
Bases: Exception
Raised when a session attempts more turns than max_iterations allows.
The default cap of 25 matches the OpenAI Agents SDK convention and prevents runaway agent loops; an explicit stopping condition is a standard production-readiness requirement for agent systems.
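An iteration cap of this kind amounts to a counter checked before each turn. The sketch below is illustrative only: `TurnCounter` and `begin_turn` are hypothetical names, and `MaxIterationsExceededError` is redefined locally as a stand-in rather than imported from the package.

```python
class MaxIterationsExceededError(Exception):
    """Local stand-in for the library's error type (illustration only)."""


class TurnCounter:
    """Hypothetical per-session counter enforcing a turn cap."""

    def __init__(self, max_iterations: int = 25) -> None:
        self.max_iterations = max_iterations
        self.turns = 0

    def begin_turn(self) -> int:
        # Check before incrementing so exactly max_iterations turns are allowed.
        if self.turns >= self.max_iterations:
            raise MaxIterationsExceededError(
                f"session exceeded max_iterations={self.max_iterations}"
            )
        self.turns += 1
        return self.turns
```

Checking at the start of a turn, rather than after it, means the rejected turn never reaches the model and incurs no cost.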