|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import contextlib
|
||
|
|
import importlib.util
|
||
|
|
import os
|
||
|
|
import sys
|
||
|
|
import tempfile
|
||
|
|
import zlib
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Iterable, Iterator
|
||
|
|
|
||
|
|
_SDK_PYTHON_DIR = Path(__file__).resolve().parents[1]
|
||
|
|
_SDK_PYTHON_STR = str(_SDK_PYTHON_DIR)
|
||
|
|
if _SDK_PYTHON_STR not in sys.path:
|
||
|
|
sys.path.insert(0, _SDK_PYTHON_STR)
|
||
|
|
|
||
|
|
from _runtime_setup import ensure_runtime_package_installed
|
||
|
|
|
||
|
|
|
||
|
|
def _ensure_runtime_dependencies(sdk_python_dir: Path) -> None:
|
||
|
|
if importlib.util.find_spec("pydantic") is not None:
|
||
|
|
return
|
||
|
|
|
||
|
|
python = sys.executable
|
||
|
|
raise RuntimeError(
|
||
|
|
"Missing required dependency: pydantic.\n"
|
||
|
|
f"Interpreter: {python}\n"
|
||
|
|
"Install dependencies with the same interpreter used to run this example:\n"
|
||
|
|
f" {python} -m pip install -e {sdk_python_dir}\n"
|
||
|
|
"If you installed with `pip` from another Python, reinstall using the command above."
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def ensure_local_sdk_src() -> Path:
|
||
|
|
"""Add sdk/python/src to sys.path so examples run without installing the package."""
|
||
|
|
sdk_python_dir = _SDK_PYTHON_DIR
|
||
|
|
src_dir = sdk_python_dir / "src"
|
||
|
|
package_dir = src_dir / "codex_app_server"
|
||
|
|
if not package_dir.exists():
|
||
|
|
raise RuntimeError(f"Could not locate local SDK package at {package_dir}")
|
||
|
|
|
||
|
|
_ensure_runtime_dependencies(sdk_python_dir)
|
||
|
|
|
||
|
|
src_str = str(src_dir)
|
||
|
|
if src_str not in sys.path:
|
||
|
|
sys.path.insert(0, src_str)
|
||
|
|
return src_dir
|
||
|
|
|
||
|
|
|
||
|
|
def runtime_config():
|
||
|
|
"""Return an example-friendly AppServerConfig for repo-source SDK usage."""
|
||
|
|
from codex_app_server import AppServerConfig
|
||
|
|
|
||
|
|
ensure_runtime_package_installed(sys.executable, _SDK_PYTHON_DIR)
|
||
|
|
return AppServerConfig()
|
||
|
|
|
||
|
|
|
||
|
|
def _png_chunk(chunk_type: bytes, data: bytes) -> bytes:
|
||
|
|
import struct
|
||
|
|
|
||
|
|
payload = chunk_type + data
|
||
|
|
checksum = zlib.crc32(payload) & 0xFFFFFFFF
|
||
|
|
return struct.pack(">I", len(data)) + payload + struct.pack(">I", checksum)
|
||
|
|
|
||
|
|
|
||
|
|
def _generated_sample_png_bytes() -> bytes:
|
||
|
|
import struct
|
||
|
|
|
||
|
|
width = 96
|
||
|
|
height = 96
|
||
|
|
top_left = (120, 180, 255)
|
||
|
|
top_right = (255, 220, 90)
|
||
|
|
bottom_left = (90, 180, 95)
|
||
|
|
bottom_right = (180, 85, 85)
|
||
|
|
|
||
|
|
rows = bytearray()
|
||
|
|
for y in range(height):
|
||
|
|
rows.append(0)
|
||
|
|
for x in range(width):
|
||
|
|
if y < height // 2 and x < width // 2:
|
||
|
|
color = top_left
|
||
|
|
elif y < height // 2:
|
||
|
|
color = top_right
|
||
|
|
elif x < width // 2:
|
||
|
|
color = bottom_left
|
||
|
|
else:
|
||
|
|
color = bottom_right
|
||
|
|
rows.extend(color)
|
||
|
|
|
||
|
|
header = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
|
||
|
|
return (
|
||
|
|
b"\x89PNG\r\n\x1a\n"
|
||
|
|
+ _png_chunk(b"IHDR", header)
|
||
|
|
+ _png_chunk(b"IDAT", zlib.compress(bytes(rows)))
|
||
|
|
+ _png_chunk(b"IEND", b"")
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
@contextlib.contextmanager
|
||
|
|
def temporary_sample_image_path() -> Iterator[Path]:
|
||
|
|
with tempfile.TemporaryDirectory(prefix="codex-python-example-image-") as temp_root:
|
||
|
|
image_path = Path(temp_root) / "generated_sample.png"
|
||
|
|
image_path.write_bytes(_generated_sample_png_bytes())
|
||
|
|
yield image_path
|
||
|
|
|
||
|
|
|
||
|
|
def server_label(metadata: object) -> str:
|
||
|
|
server = getattr(metadata, "serverInfo", None)
|
||
|
|
server_name = ((getattr(server, "name", None) or "") if server is not None else "").strip()
|
||
|
|
server_version = ((getattr(server, "version", None) or "") if server is not None else "").strip()
|
||
|
|
if server_name and server_version:
|
||
|
|
return f"{server_name} {server_version}"
|
||
|
|
|
||
|
|
user_agent = ((getattr(metadata, "userAgent", None) or "") if metadata is not None else "").strip()
|
||
|
|
return user_agent or "unknown"
|
||
|
|
|
||
|
|
|
||
|
|
def find_turn_by_id(turns: Iterable[object] | None, turn_id: str) -> object | None:
|
||
|
|
for turn in turns or []:
|
||
|
|
if getattr(turn, "id", None) == turn_id:
|
||
|
|
return turn
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def assistant_text_from_turn(turn: object | None) -> str:
|
||
|
|
if turn is None:
|
||
|
|
return ""
|
||
|
|
|
||
|
|
chunks: list[str] = []
|
||
|
|
for item in getattr(turn, "items", []) or []:
|
||
|
|
raw_item = item.model_dump(mode="json") if hasattr(item, "model_dump") else item
|
||
|
|
if not isinstance(raw_item, dict):
|
||
|
|
continue
|
||
|
|
|
||
|
|
item_type = raw_item.get("type")
|
||
|
|
if item_type == "agentMessage":
|
||
|
|
text = raw_item.get("text")
|
||
|
|
if isinstance(text, str) and text:
|
||
|
|
chunks.append(text)
|
||
|
|
continue
|
||
|
|
|
||
|
|
if item_type != "message" or raw_item.get("role") != "assistant":
|
||
|
|
continue
|
||
|
|
|
||
|
|
for content in raw_item.get("content") or []:
|
||
|
|
if not isinstance(content, dict) or content.get("type") != "output_text":
|
||
|
|
continue
|
||
|
|
text = content.get("text")
|
||
|
|
if isinstance(text, str) and text:
|
||
|
|
chunks.append(text)
|
||
|
|
|
||
|
|
return "".join(chunks)
|