Motus lets you compose multiple agents into larger workflows. The primary patterns are: wrapping a specialist agent as a tool for a supervisor, forking an agent to create independent conversation branches, and running sub-agents in parallel using the task runtime.

Agent as tool

agent.as_tool() wraps any agent as a callable tool that another agent can invoke. This is the core building block for supervisor/specialist patterns: the supervisor delegates subtasks to specialists by calling their tools.
```python
from motus.agent import ReActAgent
from motus.models import OpenAIChatClient

client = OpenAIChatClient(api_key="sk-...")

researcher = ReActAgent(
    client=client,
    model_name="gpt-4o",
    name="researcher",
    system_prompt="You research topics thoroughly and return detailed findings.",
)

supervisor = ReActAgent(
    client=client,
    model_name="gpt-4o",
    system_prompt="You coordinate research tasks and synthesize results.",
    tools=[researcher.as_tool(description="Research a topic in depth")],
)

response = await supervisor("What are the latest advances in fusion energy?")
```
The supervisor calls the researcher tool as needed, passing subtask prompts. The researcher runs independently and returns its result, which the supervisor incorporates into its final response.
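
To make the delegation mechanics concrete, here is a minimal sketch of the agent-as-tool idea in plain Python. This is an illustration of the pattern, not Motus internals; the `Tool` dataclass and `as_tool` helper below are hypothetical stand-ins.

```python
# Sketch of the agent-as-tool pattern (not Motus internals): a sub-agent is
# just an async callable wrapped with a name and description the supervisor's
# LLM can see and choose to invoke.
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class Tool:
    name: str
    description: str
    call: Callable[[str], Awaitable[str]]


async def specialist(prompt: str) -> str:
    # Stand-in for a real sub-agent run (which would call an LLM).
    return f"findings for: {prompt}"


def as_tool(agent: Callable[[str], Awaitable[str]], name: str, description: str) -> Tool:
    # Wrapping changes nothing about the agent itself; it only attaches the
    # metadata a supervisor needs to route subtasks to it.
    return Tool(name=name, description=description, call=agent)


tool = as_tool(specialist, "researcher", "Research a topic in depth")
result = asyncio.run(tool.call("fusion energy"))
print(result)  # findings for: fusion energy
```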

as_tool() parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str \| None` | agent's name | Tool name exposed to the supervisor |
| `description` | `str \| None` | `None` | Tool description sent to the supervisor's LLM |
| `output_extractor` | `Callable \| None` | `None` | Function to post-process the sub-agent's response before returning it to the supervisor |
| `stateful` | `bool` | `False` | Whether the sub-agent preserves memory across multiple calls within the same parent run |
When stateful=True, the sub-agent’s conversation history accumulates across all calls during the parent run. Use this when the supervisor needs the specialist to remember earlier context — for example, a researcher that builds on its own previous findings. With the default stateful=False, each call to the tool starts from a clean memory state.
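
An output_extractor is just a function applied to the sub-agent's response before the supervisor sees it. A hedged sketch, assuming the response object exposes a `.content` string (the actual attribute may differ in your Motus version; `FakeResponse` below is a stand-in):

```python
# Hypothetical response shape -- check your Motus version for the real attributes.
class FakeResponse:
    def __init__(self, content: str):
        self.content = content


def last_paragraph(response) -> str:
    """Keep only the final paragraph so the supervisor sees a compact answer."""
    paragraphs = [p.strip() for p in response.content.split("\n\n") if p.strip()]
    return paragraphs[-1] if paragraphs else ""


extracted = last_paragraph(
    FakeResponse("Background on tokamaks...\n\nConclusion: fusion is advancing.")
)
print(extracted)  # Conclusion: fusion is advancing.
```

You would pass such a function as `researcher.as_tool(output_extractor=last_paragraph)` to trim what flows back into the supervisor's context.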

Forking

agent.fork() creates an independent copy of the agent with the same configuration and a forked copy of the conversation history. Changes to the fork’s memory never affect the original.
```python
import asyncio
from motus.agent import ReActAgent
from motus.models import OpenAIChatClient

client = OpenAIChatClient(api_key="sk-...")
agent = ReActAgent(client=client, model_name="gpt-4o")

async def main():
    await agent("Summarize the pros and cons of microservices.")

    # Create two independent branches from the same starting point
    branch_a = agent.fork()
    branch_b = agent.fork()

    response_a = await branch_a("Now argue strongly in favor.")
    response_b = await branch_b("Now argue strongly against.")

    # original agent is unchanged
    original = await agent("What did you just summarize?")

asyncio.run(main())
```
Forking is useful for A/B comparisons, parallel exploratory conversations, and any case where you want to branch from a known conversation state without modifying the original.
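
The isolation guarantee can be sketched in a few lines of plain Python. This is a conceptual model of fork semantics, not Motus's implementation; `MiniAgent` is a hypothetical stand-in whose fork deep-copies the conversation history:

```python
# Sketch of fork semantics: a fork gets its own copy of the history,
# so mutations on a branch never leak back into the original.
import copy


class MiniAgent:
    def __init__(self, history=None):
        self.history = history if history is not None else []

    def fork(self) -> "MiniAgent":
        # Deep copy so nested message dicts are independent too.
        return MiniAgent(history=copy.deepcopy(self.history))


agent = MiniAgent()
agent.history.append({"role": "user", "content": "summarize microservices"})

branch = agent.fork()
branch.history.append({"role": "user", "content": "argue in favor"})

print(len(agent.history))   # 1 -- original unchanged by the branch
print(len(branch.history))  # 2
```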

Parallel workflows with @agent_task

For fully parallel execution, use the @agent_task decorator from the Motus runtime. Sub-agent calls submitted as tasks are scheduled concurrently by the task graph engine — no explicit asyncio.gather needed.
```python
import asyncio
from motus.agent import ReActAgent
from motus.models import OpenAIChatClient
from motus.runtime import init, resolve
from motus.runtime.agent_task import agent_task

client = OpenAIChatClient(api_key="sk-...")

analyst = ReActAgent(
    client=client,
    model_name="gpt-4o",
    name="analyst",
    system_prompt="You analyze data and report key findings.",
)

writer = ReActAgent(
    client=client,
    model_name="gpt-4o",
    name="writer",
    system_prompt="You write clear, concise summaries.",
)

@agent_task
async def run_analyst(topic: str) -> str:
    return await analyst(f"Analyze: {topic}")

@agent_task
async def run_writer(topic: str) -> str:
    return await writer(f"Write a one-paragraph summary of: {topic}")

async def main():
    rt = init()

    # Both tasks are submitted and run concurrently
    analysis_future = run_analyst("global chip supply chains")
    summary_future = run_writer("global chip supply chains")

    analysis = resolve(analysis_future, timeout=60)
    summary = resolve(summary_future, timeout=60)

    print("Analysis:", analysis)
    print("Summary:", summary)

asyncio.run(main())
```
The runtime builds a dependency DAG from the submitted tasks. Tasks with no dependencies on each other — like run_analyst and run_writer above — execute in parallel automatically. Use resolve() to retrieve results after submission.
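
The submit-then-resolve pattern can be approximated with plain asyncio to see why the two tasks overlap. This sketch uses `asyncio.create_task` in place of the Motus scheduler; `run_analyst` and `run_writer` here are simulated stand-ins, not real agent calls:

```python
# Illustration of submit-then-resolve with plain asyncio (not the Motus
# task graph): creating a task schedules it immediately, so independent
# tasks run concurrently before either result is awaited.
import asyncio


async def run_analyst(topic: str) -> str:
    await asyncio.sleep(0.01)  # simulated model call
    return f"analysis of {topic}"


async def run_writer(topic: str) -> str:
    await asyncio.sleep(0.01)  # simulated model call
    return f"summary of {topic}"


async def main() -> tuple[str, str]:
    # "Submit": both coroutines are scheduled as soon as tasks are created.
    analysis_task = asyncio.create_task(run_analyst("chips"))
    summary_task = asyncio.create_task(run_writer("chips"))
    # "Resolve": awaiting retrieves each result; the work already overlapped.
    return await analysis_task, await summary_task


analysis, summary = asyncio.run(main())
print(analysis)  # analysis of chips
print(summary)   # summary of chips
```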