from __future__ import annotations import contextlib import importlib.util import os import sys import tempfile import zlib from pathlib import Path from typing import Iterable, Iterator _SDK_PYTHON_DIR = Path(__file__).resolve().parents[1] _SDK_PYTHON_STR = str(_SDK_PYTHON_DIR) if _SDK_PYTHON_STR not in sys.path: sys.path.insert(0, _SDK_PYTHON_STR) from _runtime_setup import ensure_runtime_package_installed def _ensure_runtime_dependencies(sdk_python_dir: Path) -> None: if importlib.util.find_spec("pydantic") is not None: return python = sys.executable raise RuntimeError( "Missing required dependency: pydantic.\n" f"Interpreter: {python}\n" "Install dependencies with the same interpreter used to run this example:\n" f" {python} -m pip install -e {sdk_python_dir}\n" "If you installed with `pip` from another Python, reinstall using the command above." ) def ensure_local_sdk_src() -> Path: """Add sdk/python/src to sys.path so examples run without installing the package.""" sdk_python_dir = _SDK_PYTHON_DIR src_dir = sdk_python_dir / "src" package_dir = src_dir / "codex_app_server" if not package_dir.exists(): raise RuntimeError(f"Could not locate local SDK package at {package_dir}") _ensure_runtime_dependencies(sdk_python_dir) src_str = str(src_dir) if src_str not in sys.path: sys.path.insert(0, src_str) return src_dir def runtime_config(): """Return an example-friendly AppServerConfig for repo-source SDK usage.""" from codex_app_server import AppServerConfig ensure_runtime_package_installed(sys.executable, _SDK_PYTHON_DIR) return AppServerConfig() def _png_chunk(chunk_type: bytes, data: bytes) -> bytes: import struct payload = chunk_type + data checksum = zlib.crc32(payload) & 0xFFFFFFFF return struct.pack(">I", len(data)) + payload + struct.pack(">I", checksum) def _generated_sample_png_bytes() -> bytes: import struct width = 96 height = 96 top_left = (120, 180, 255) top_right = (255, 220, 90) bottom_left = (90, 180, 95) bottom_right = (180, 85, 85) rows = bytearray() for y in range(height): rows.append(0) for x in range(width): if y < height // 2 and x < width // 2: color = top_left elif y < height // 2: color = top_right elif x < width // 2: color = bottom_left else: color = bottom_right rows.extend(color) header = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0) return ( b"\x89PNG\r\n\x1a\n" + _png_chunk(b"IHDR", header) + _png_chunk(b"IDAT", zlib.compress(bytes(rows))) + _png_chunk(b"IEND", b"") ) @contextlib.contextmanager def temporary_sample_image_path() -> Iterator[Path]: with tempfile.TemporaryDirectory(prefix="codex-python-example-image-") as temp_root: image_path = Path(temp_root) / "generated_sample.png" image_path.write_bytes(_generated_sample_png_bytes()) yield image_path def server_label(metadata: object) -> str: server = getattr(metadata, "serverInfo", None) server_name = ((getattr(server, "name", None) or "") if server is not None else "").strip() server_version = ((getattr(server, "version", None) or "") if server is not None else "").strip() if server_name and server_version: return f"{server_name} {server_version}" user_agent = ((getattr(metadata, "userAgent", None) or "") if metadata is not None else "").strip() return user_agent or "unknown" def find_turn_by_id(turns: Iterable[object] | None, turn_id: str) -> object | None: for turn in turns or []: if getattr(turn, "id", None) == turn_id: return turn return None def assistant_text_from_turn(turn: object | None) -> str: if turn is None: return "" chunks: list[str] = [] for item in getattr(turn, "items", []) or []: raw_item = item.model_dump(mode="json") if hasattr(item, "model_dump") else item if not isinstance(raw_item, dict): continue item_type = raw_item.get("type") if item_type == "agentMessage": text = raw_item.get("text") if isinstance(text, str) and text: chunks.append(text) continue if item_type != "message" or raw_item.get("role") != "assistant": continue for content in raw_item.get("content") or []: if not isinstance(content, dict) or content.get("type") != "output_text": continue text = content.get("text") if isinstance(text, str) and text: chunks.append(text) return "".join(chunks)