SIGN IN SIGN UP

A lightweight, powerful framework for multi-agent workflows

0 0 162 Python
import asyncio
from collections.abc import Mapping
from datetime import datetime
from pydantic import BaseModel
from agents import Agent, ModelSettings, Runner, gen_trace_id, trace
# This tool is still in experimental phase and the details could be changed until being GAed.
from agents.extensions.experimental.codex import (
CodexToolStreamEvent,
ThreadErrorEvent,
ThreadOptions,
ThreadStartedEvent,
TurnCompletedEvent,
TurnFailedEvent,
TurnStartedEvent,
codex_tool,
)
# Derived from codex_tool(name="codex_engineer") when run_context_thread_id_key is omitted.
THREAD_ID_KEY = "codex_thread_id_engineer"
async def on_codex_stream(payload: CodexToolStreamEvent) -> None:
event = payload.event
if isinstance(event, ThreadStartedEvent):
log(f"codex thread started: {event.thread_id}")
return
if isinstance(event, TurnStartedEvent):
log("codex turn started")
return
if isinstance(event, TurnCompletedEvent):
log(f"codex turn completed, usage: {event.usage}")
return
if isinstance(event, TurnFailedEvent):
log(f"codex turn failed: {event.error.message}")
return
if isinstance(event, ThreadErrorEvent):
log(f"codex stream error: {event.message}")
def _timestamp() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def log(message: str) -> None:
timestamp = _timestamp()
lines = str(message).splitlines() or [""]
for line in lines:
print(f"{timestamp} {line}")
def read_context_value(context: Mapping[str, str] | BaseModel, key: str) -> str | None:
# either dict or pydantic model
if isinstance(context, Mapping):
return context.get(key)
return getattr(context, key, None)
async def main() -> None:
agent = Agent(
name="Codex Agent (same thread)",
instructions=(
2026-03-06 17:59:35 +09:00
"Always use the Codex tool to inspect the local workspace and answer the user's "
"question. Treat the workspace as read-only and answer concisely."
),
tools=[
codex_tool(
# Give each Codex tool a unique `codex_` name when you run multiple tools in one agent.
# Name-based defaults keep their run-context thread IDs separated.
name="codex_engineer",
2026-03-06 17:14:32 +09:00
sandbox_mode="read-only",
default_thread_options=ThreadOptions(
model="gpt-5.4",
model_reasoning_effort="low",
network_access_enabled=True,
web_search_enabled=False,
approval_policy="never",
),
on_stream=on_codex_stream,
# Reuse the same Codex thread across runs that share this context object.
use_run_context_thread_id=True,
)
],
model_settings=ModelSettings(tool_choice="required"),
)
class MyContext(BaseModel):
something: str | None = None
# the default is "codex_thread_id"; missing this works as well
codex_thread_id_engineer: str | None = None # aligns with run_context_thread_id_key
context = MyContext()
# Simple dict object works as well:
# context: dict[str, str] = {}
trace_id = gen_trace_id()
log(f"View trace: https://platform.openai.com/traces/trace?trace_id={trace_id}")
with trace("Codex same thread example", trace_id=trace_id):
2026-03-06 17:59:35 +09:00
log("Turn 1: inspect AGENTS.md with the Codex tool.")
first_prompt = (
"Use the Codex tool to inspect AGENTS.md in this repository and list the mandatory "
"local verification commands. Do not modify any files."
)
first_result = await Runner.run(agent, first_prompt, context=context)
first_thread_id = read_context_value(context, THREAD_ID_KEY)
log(first_result.final_output)
log(f"thread id after turn 1: {first_thread_id}")
2026-03-06 17:59:35 +09:00
if first_thread_id is None:
log("thread id after turn 1 is unavailable; turn 2 may start a new Codex thread.")
log("Turn 2: continue with the same Codex thread.")
2026-03-06 17:59:35 +09:00
second_prompt = (
"Continue from the same Codex thread. Rewrite that verification workflow as a single "
"short sentence. Do not modify any files."
)
second_result = await Runner.run(agent, second_prompt, context=context)
second_thread_id = read_context_value(context, THREAD_ID_KEY)
log(second_result.final_output)
log(f"thread id after turn 2: {second_thread_id}")
log(
"same thread reused: "
+ str(first_thread_id is not None and first_thread_id == second_thread_id)
)
if __name__ == "__main__":
asyncio.run(main())