"""Thin async client for opencode's HTTP + SSE API.

Talks to `opencode serve` (default 127.0.0.1:4096). Auth is off unless
OPENCODE_SERVER_PASSWORD is set, matching upstream defaults.

The single SSE endpoint is `GET /event`; per-session streams don't exist
(sst/opencode#7451), so callers filter by sessionID client-side.
"""

from __future__ import annotations

import base64
import os
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote

import httpx
from httpx_sse import aconnect_sse


@dataclass(frozen=True)
class Event:
    """One decoded message from the `/event` SSE stream.

    The field notes below describe how `OpenCodeClient.stream_events`
    populates an instance.
    """

    # Payload's "type" value, or "" when the key is missing.
    type: str
    # Payload's "properties" value, or {} when missing or falsy.
    properties: dict[str, Any]
    # The complete decoded JSON object the event was built from.
    raw: dict[str, Any]


def _auth_header() -> dict[str, str]:
    """Build the HTTP Basic `Authorization` header from the environment.

    Returns:
        An empty dict when OPENCODE_SERVER_PASSWORD is unset or empty.
        Otherwise a single-entry dict holding the Basic credentials, with
        the username taken from OPENCODE_SERVER_USERNAME (default
        "opencode").
    """
    pw = os.environ.get("OPENCODE_SERVER_PASSWORD", "")
    if not pw:
        return {}
    user = os.environ.get("OPENCODE_SERVER_USERNAME", "opencode")
    token = base64.b64encode(f"{user}:{pw}".encode()).decode()
    return {"Authorization": f"Basic {token}"}


class OpenCodeClient:
    """Async client for an opencode server's REST endpoints and event stream.

    Every public call opens its own `httpx.AsyncClient` and closes it when
    the call (or the event iteration) ends.
    """

    def __init__(
        self,
        base_url: str | None = None,
        *,
        timeout: float = 30.0,
    ) -> None:
        """Resolve the server URL, timeouts and auth header.

        Args:
            base_url: Server root URL. When falsy, the OPENCODE_URL
                environment variable is used, then
                "http://127.0.0.1:4096". Trailing slashes are stripped.
            timeout: Seconds passed to `httpx.Timeout` for REST calls. The
                event stream uses the same value but with the read timeout
                disabled.
        """
        self.base_url = (
            base_url or os.environ.get("OPENCODE_URL") or "http://127.0.0.1:4096"
        ).rstrip("/")
        # SSE needs no read timeout; REST calls cap at `timeout`.
        self._sse_timeout = httpx.Timeout(timeout, read=None)
        self._rest_timeout = httpx.Timeout(timeout)
        # Read once here; later environment changes do not affect this client.
        self._headers = _auth_header()

    def _client(self, timeout: httpx.Timeout) -> httpx.AsyncClient:
        """Return a new AsyncClient bound to this server's URL and headers."""
        return httpx.AsyncClient(
            base_url=self.base_url,
            headers=self._headers,
            timeout=timeout,
        )

    async def list_sessions(
        self, *, scope: str = "project", limit: int = 50
    ) -> list[dict[str, Any]]:
        """Fetch sessions via `GET /session`.

        Args:
            scope: Sent as the `scope` query parameter.
            limit: Sent as the `limit` query parameter.

        Returns:
            The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: The server answered with a 4xx/5xx status.
        """
        async with self._client(self._rest_timeout) as c:
            r = await c.get("/session", params={"scope": scope, "limit": limit})
            r.raise_for_status()
            return r.json()

    async def get_session_messages(self, session_id: str) -> list[dict[str, Any]]:
        """Fetch a session's messages via `GET /session/{session_id}/message`.

        Args:
            session_id: Session identifier. It is percent-encoded before
                being placed in the path.

        Returns:
            The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: The server answered with a 4xx/5xx status.
        """
        # Encode every reserved character (including "/") so an unusual id
        # cannot change which endpoint is requested.
        segment = quote(session_id, safe="")
        async with self._client(self._rest_timeout) as c:
            r = await c.get(f"/session/{segment}/message")
            r.raise_for_status()
            return r.json()

    async def stream_events(self) -> AsyncIterator[Event]:
        """Yield events from /event. Caller handles reconnect.

        Frames with empty data, and frames whose JSON is not an object, are
        skipped. Any error raised while connecting, reading or decoding a
        frame propagates to the caller.

        Yields:
            One `Event` per usable frame.
        """
        async with self._client(self._sse_timeout) as c:
            async with aconnect_sse(c, "GET", "/event") as src:
                async for sse in src.aiter_sse():
                    if not sse.data:
                        continue
                    payload = sse.json()
                    if not isinstance(payload, dict):
                        # A null/list/scalar frame has no "type" to dispatch
                        # on; skip it rather than fail on `.get`.
                        continue
                    yield Event(
                        type=payload.get("type", ""),
                        properties=payload.get("properties", {}) or {},
                        raw=payload,
                    )