A Workflow is a graph of @agent_task-decorated Python functions. You write ordinary Python; Motus infers the data-flow dependencies between your functions and runs the graph in parallel underneath. No DAG definitions, no edge declarations, no YAML.

Reach for Workflow when the steps are already known and you want stable, repeatable control over them: ETL pipelines, evaluation harnesses, content processing, batch LLM jobs. For open-ended problems where you want the model to decide what to do next, use a ReActAgent instead.

Both programming models run on the same scheduler. A ReActAgent’s model calls and tool calls are themselves @agent_tasks, which is how the two models share retries, timeouts, cancellation, and tracing, and how a workflow step can freely call a ReActAgent (and vice versa).

Basic example

from motus.runtime import agent_task, resolve


@agent_task
def add(a, b):
    return a + b


@agent_task
def multiply(x, y):
    return x * y


a = add(3, 4)        # returns AgentFuture, not 7
b = multiply(a, 10)  # depends on a, runs after a completes
print(resolve(b))    # 70
When you call add(3, 4), the runtime registers a task and hands you back an AgentFuture, not a concrete value. When you pass that future into multiply(a, 10), the runtime sees the dependency and schedules multiply to run after a resolves. You write straight-line Python; Motus figures out what can run in parallel and what has to wait.
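Dependency inference also buys you parallelism for free: calls that share no futures are eligible to run concurrently. A minimal sketch reusing the two tasks above — the two add calls are independent, so the scheduler may run them in parallel, while multiply waits for both:

```python
from motus.runtime import agent_task, resolve


@agent_task
def add(a, b):
    return a + b


@agent_task
def multiply(x, y):
    return x * y


# Neither call depends on the other, so both are eligible to run at once.
left = add(1, 2)
right = add(3, 4)

# multiply consumes both futures; it is dispatched once both resolve.
product = multiply(left, right)
print(resolve(product))  # 21
```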

Getting a result

Three ways to pull the value out of a future, depending on where you are.
from motus.runtime import resolve

# Sync code: block until resolved
value = resolve(future)

# Block on several futures at once and unpack
a, b, c = resolve([future_a, future_b, future_c])

# Block with a timeout
value = future.af_result(timeout=5)   # raises TimeoutError if exceeded
Inside an async context, await the future directly instead of blocking:
@agent_task
async def fetch_and_process(url):
    raw = await download(url)
    return await parse(raw)
Inside an async @agent_task, use await rather than resolve(). Async tasks run on the runtime’s own event loop, and calling resolve() there raises RuntimeError to protect you from a deadlock. Sync @agent_tasks run in the thread pool off the loop, so resolve() works fine inside them.
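The rule side by side, as a minimal sketch (both tasks and the incoming future argument are hypothetical):

```python
from motus.runtime import agent_task, resolve


@agent_task
def sync_step(future):
    # Sync tasks run in the thread pool, off the event loop,
    # so a blocking resolve() here is safe.
    return resolve(future) + 1


@agent_task
async def async_step(future):
    # Async tasks run on the runtime's event loop itself; blocking
    # there would deadlock, so await the future instead.
    value = await future
    return value + 1
```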

Retries and timeouts

Pass retry and timeout policy into the decorator. A task that raises (including TimeoutError) is re-queued with the same arguments until retries run out.
@agent_task(retries=3, timeout=10.0, retry_delay=1.0)
async def download(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
| Parameter | Type | Default | Purpose |
| --- | --- | --- | --- |
| retries | int | 0 | Number of retries after the first failure |
| timeout | float or None | None | Per-execution timeout in seconds. Exceeding it raises TimeoutError, which counts as a failure. |
| retry_delay | float | 0.0 | Seconds to wait between attempts |
If a task exhausts its retries, the final exception propagates out of the future. resolve() or await will re-raise it.
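At the call site, that means the terminal failure is handled with ordinary exception handling. A sketch reusing the download task above (the URL is a placeholder):

```python
from motus.runtime import resolve

future = download("https://api.example.com/flaky")

try:
    data = resolve(future)   # re-raises the final exception, if any
except TimeoutError:
    data = None              # the last attempt ran past its 10 s budget
except Exception as exc:
    print(f"download failed after all retries: {exc}")
    raise
```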

Multi-return

When a task produces multiple independent outputs, use num_returns to split them into separate futures.
@agent_task(num_returns=2)
def split(data):
    mid = len(data) // 2
    return data[:mid], data[mid:]


@agent_task
def sum_part(part):
    return sum(part)


left, right = split([1, 2, 3, 4])    # two separate AgentFutures
left_total = sum_part(left)          # scheduled as soon as `left` is ready
right_total = sum_part(right)        # scheduled as soon as `right` is ready
print(resolve(left_total + right_total))  # 10

Per-call policy overrides

Override the default policy for one invocation without touching the decorator.
result = download.policy(retries=5, timeout=30)("https://api.example.com")
.policy() gives you a one-off variant of the task with different settings. The original task is unchanged, so other call sites keep using its defaults. You can override retries, timeout, retry_delay, and num_returns this way.

Sync and async tasks

Both sync and async functions work with @agent_task. Sync tasks run in the runtime’s ThreadPoolExecutor; async tasks run directly on the runtime’s event loop. You can wrap any callable this way, whether or not you wrote it and whether or not it is async.
@agent_task
def cpu_bound(data):
    """Runs in a thread pool executor."""
    return heavy_computation(data)


@agent_task
async def io_bound(url):
    """Runs as a coroutine on the runtime event loop."""
    return await fetch(url)

Class methods

@agent_task implements the descriptor protocol, so it works on class methods: self is bound automatically when the method is accessed through an instance.
class DataProcessor:
    def __init__(self, multiplier):
        self.multiplier = multiplier

    @agent_task
    def process(self, value):
        return value * self.multiplier


proc = DataProcessor(3)
future = proc.process(10)   # AgentFuture, resolves to 30

Per-task hooks

Attach lifecycle callbacks directly in the decorator. They fire only for this task. For cross-cutting hooks (global, per task type, or per task name), see Tracing.
def on_start(event):
    print(f"Starting {event.name}")

def on_end(event):
    print(f"Finished {event.name}: {event.result}")

def on_error(event):
    print(f"Failed {event.name}: {event.error}")


@agent_task(on_start=on_start, on_end=on_end, on_error=on_error)
def important_task(data):
    return transform(data)

Cancellation

Cancel a future to stop its task and every task that depends on it downstream.
from motus.runtime import cancel, cancelled


future = long_running_task()
cancel(future)          # or future.af_cancel()

if cancelled(future):   # or future.af_cancelled()
    print("Cancelled.")
Cancellation is thread-safe and can be called from any thread, including from inside another @agent_task. If the future is already resolved, cancel() is a no-op and returns False.
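Downstream propagation, sketched with two hypothetical tasks: cancelling the upstream future also cancels anything built on top of it.

```python
import time

from motus.runtime import agent_task, cancel, cancelled


@agent_task
def produce():
    time.sleep(60)           # stand-in for slow work
    return 42


@agent_task
def consume(value):
    return value * 2


source = produce()
sink = consume(source)       # depends on source

cancel(source)               # stops source and, transitively, sink
print(cancelled(source), cancelled(sink))
```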

Lifecycle

You rarely need to manage the runtime yourself. It auto-initializes on the first @agent_task call and cleans up at interpreter exit. The following entry points exist for the cases where you need them:
from motus.runtime import init, shutdown, is_initialized

init()             # optional; pre-start the runtime thread and event loop
shutdown()         # force-stop: cancels in-flight tasks, poisons their futures
is_initialized()   # True if the runtime is currently running
shutdown() is a hard stop: it signals the event loop to exit, cancels any in-flight tasks, and poisons their futures with RuntimeError("Motus runtime is shutting down"). If you need tasks to finish, resolve() or await them before calling shutdown().
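A drain-then-stop sketch for scripts that manage the lifecycle explicitly (process and items are hypothetical):

```python
from motus.runtime import init, resolve, shutdown

init()                        # optional: pre-start the runtime

futures = [process(item) for item in items]
results = resolve(futures)    # drain: block until every task finishes

shutdown()                    # hard stop is now safe; nothing in flight
```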

Building the graph without blocking

Most Python operators on an AgentFuture return a new future that extends the graph, so you can keep composing without pulling values out.
x = add(10, 5)
total = x + 100            # AgentFuture, not blocking
doubled = x * 2            # AgentFuture, another graph node

data = make_data()
first = data["scores"][0]  # chained getitem, two new futures
top_score = data.scores    # attribute access is deferred too

get_scorer = find_scorer()
score = get_scorer(team, year)  # calling a future is deferred too
total = x + 100 does not wait for x to resolve. It creates a node that executes when x is ready. Arithmetic, __getitem__, __getattr__, __call__, and ordering comparisons (>, <, >=, <=) all return futures. Some operators force a blocking wait to return a concrete value. See Sync barriers below.

Sync barriers

A handful of Python operators have to return a concrete value rather than another future. On an AgentFuture these trigger a blocking wait to resolve the underlying value.
| Operator | Triggered by |
| --- | --- |
| `__bool__` | if future:, bool(future), not future |
| `__str__` | str(future), f"{future}" |
| `__len__` | len(future) |
| `__iter__` | for x in future:, unpacking a, b = future |
| `__int__` / `__float__` | int(future), float(future) |
| `__eq__` / `__ne__` | future == x, future != x |
| `__hash__` | putting a future in a set() or using it as a dict key |
| `__contains__` | x in future |
Motus warns every time a sync barrier fires so you can catch accidents. Set MOTUS_QUIET_SYNC=1 to silence the warnings once you have audited your code.
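A sketch of the most common accident, reusing the add task from the basic example, with the deferred alternative alongside:

```python
total = add(1, 2)

# Accidental barriers: each of these blocks until the future resolves,
# and Motus emits a warning when they fire.
print(str(total))        # __str__ forces a wait
if total == 3:           # __eq__ forces a wait
    pass

# Deferred alternative: keep composing futures, resolve once at the end.
shifted = total + 10     # still an AgentFuture; no blocking
print(resolve(shifted))  # 13
```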

How the runtime runs it

Under the hood, every @agent_task call is submitted to GraphScheduler, a small task scheduler that lives in its own thread and drives an async event loop. The scheduler tracks dependencies through AgentFuture objects and dispatches each task as soon as its prerequisites are ready. The same scheduler backs ReActAgent. Every model call and tool call inside a reasoning loop is wrapped as an @agent_task and submitted to GraphScheduler, which is why both programming models share retries, timeouts, cancellation, and tracing uniformly, and why ReActAgent’s independent tool calls run in parallel. You normally never touch GraphScheduler or AgentRuntime directly. The decorator is the API.