feat(adapters): opencode HTTP/SSE adapter (#588 phase 1)#591
Conversation
… vocabulary (#588) Drive opencode's headless server (POST /session, /session/:id/prompt_async, GET /event SSE) and map its events (message.part.delta field=text/reasoning, tool parts, session.idle, session.next.model.switched) onto the taOS reply kinds delta/final/tool_call/tool_result/reasoning/error. Matches the ACPAdapter contract: constructed with (config, sink); the sink receives reply dicts {kind, trace_id, content/error} (the body shape bridge_session.record_reply consumes). Pure map_opencode_event codec + session.idle ends the turn; the native model-switch event is recorded for reverse reconcile. 26 unit + integration tests.
📝 WalkthroughWalkthroughThis PR introduces a new ChangesOpencode Adapter
Possibly related issues
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| return [] | ||
| # Only act on parts that belong to our session (best-effort — part | ||
| # objects don't always carry sessionID; we accept if no filter hits). | ||
| part_session = part.get("sessionID") or props.get("sessionID") |
There was a problem hiding this comment.
WARNING: Inconsistent session filtering
The _our() helper returns True when our_session is None (accepting all events), but later checks like if our_session and part_session and part_session != our_session: behave differently. This could lead to processing events from other sessions before our session is established.
Consider making the filtering consistent by using _our() helper throughout or adjusting the logic.
| return [] | ||
|
|
||
| call_id = part.get("callID") or part.get("id", "") | ||
| tool_name = part.get("tool") or part.get("name", "") |
There was a problem hiding this comment.
WARNING: Potential missing tool identification
Using part.get("tool") or part.get("name", "") could result in an empty string if both fields are missing. This would create a tool_call/tool_result with an empty tool name, which might cause issues downstream.
Consider adding a validation check or providing a default tool name like "unknown".
| try: | ||
| # Ensure we have a session before opening the SSE stream. | ||
| await self.ensure_session() | ||
| state["session_id"] = self.session_id |
There was a problem hiding this comment.
SUGGESTION: Redundant assignment
The line state["session_id"] = self.session_id is redundant because state was initialized with {"session_id": self.session_id, ...} just a few lines above (line 284-290).
This assignment can be safely removed.
Code Review SummaryStatus: 3 Issues Found | Recommendation: Address before merge Overview
Issue Details (click to expand)WARNING
The Consider making the filtering consistent by using Using Consider adding a validation check or providing a default tool name like "unknown". | SUGGESTION
The line This assignment can be safely removed. | Files Reviewed (2 files)
Reviewed by nemotron-3-super-120b-a12b-20230311:free · 379,471 tokens |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tinyagentos/adapters/opencode_adapter.py`:
- Around line 195-205: Concurrent calls to ensure_session() and prompt() can
race on self.session_id and _turn_state causing mixed streams; protect the
adapter with an asyncio.Lock (e.g., add a self._prompt_lock created in __init__)
and acquire it at the start of prompt() (and any other entrypoints that mutate
session/turn state like ensure_session()) to serialize turns, or alternatively
detect and raise on concurrent prompts. Also, inside prompt() (and
ensure_session() where a session id is returned) capture the returned session id
into a local variable and use that local for the remainder of the turn instead
of repeatedly reading self.session_id; ensure _turn_state is only mutated while
holding the lock and reset in the finally block while still under lock.
- Around line 304-313: In prompt(), after entering the async with
client.stream(...) as stream: block, call stream.raise_for_status() and handle
the HTTPError so you don't start prompt_async for non-2xx responses;
specifically, invoke stream.raise_for_status() before launching prompt_async
(referencing client.stream and stream) and on exception either log/propagate the
error or set state["done"] so the existing session.idle-based completion logic
isn't relied on for HTTP failures; ensure prompt_async is only started when the
stream response is successful.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 473d8316-a160-4946-a71d-566e01677314
📒 Files selected for processing (2)
tests/test_opencode_adapter.pytinyagentos/adapters/opencode_adapter.py
| def __init__(self, config: OpenCodeConfig, sink) -> None: | ||
| self._cfg = config | ||
| # sink is called once per mapped taOS reply with a dict carrying at | ||
| # least ``kind`` and ``trace_id`` plus kind-specific fields, matching | ||
| # bridge_session.record_reply's body contract (mirrors ACPAdapter). | ||
| self._sink = sink | ||
| self.session_id: str | None = None | ||
| self._client: httpx.AsyncClient | None = None | ||
| self._stream_ctx = None # holds the active httpx stream context | ||
| # Per-turn mutable state, reset in finally block. | ||
| self._turn_state: dict = {} |
There was a problem hiding this comment.
Serialize turns and stop re-reading self.session_id.
ensure_session() and prompt() share mutable session state with no locking. On overlapping calls, both coroutines can create different server sessions, and Line 316 can POST to a newer self.session_id than the one copied into state["session_id"], so replies from separate turns can get mixed onto the same stream. Guard the adapter with an asyncio.Lock (or fail fast on concurrent prompts) and carry the returned session ID in a local variable for the whole turn.
Suggested fix
class OpenCodeAdapter:
def __init__(self, config: OpenCodeConfig, sink) -> None:
self._cfg = config
self._sink = sink
self.session_id: str | None = None
self._client: httpx.AsyncClient | None = None
self._stream_ctx = None # holds the active httpx stream context
+ self._session_lock = asyncio.Lock()
+ self._prompt_lock = asyncio.Lock()
self._turn_state: dict = {}
async def ensure_session(self) -> str:
- if self.session_id is not None:
- return self.session_id
-
- client = await self._get_client()
- resp = await client.post(
- f"{self._base}/session",
- json={"title": f"taOS-{self._cfg.agent or 'agent'}"},
- )
- resp.raise_for_status()
- data = resp.json()
- session_id = data.get("id") or (data.get("info") or {}).get("id") or ""
- if not session_id:
- raise ValueError(f"opencode /session returned no id: {data!r}")
- self.session_id = session_id
- logger.debug("opencode_adapter: created session %s", session_id)
- return session_id
+ if self.session_id is not None:
+ return self.session_id
+ async with self._session_lock:
+ if self.session_id is not None:
+ return self.session_id
+ client = await self._get_client()
+ resp = await client.post(
+ f"{self._base}/session",
+ json={"title": f"taOS-{self._cfg.agent or 'agent'}"},
+ )
+ resp.raise_for_status()
+ data = resp.json()
+ session_id = data.get("id") or (data.get("info") or {}).get("id") or ""
+ if not session_id:
+ raise ValueError(f"opencode /session returned no id: {data!r}")
+ self.session_id = session_id
+ logger.debug("opencode_adapter: created session %s", session_id)
+ return session_id
async def prompt(self, text: str, trace_id: str | None = None) -> None:
- try:
- await self.ensure_session()
- state["session_id"] = self.session_id
+ async with self._prompt_lock:
+ try:
+ session_id = await self.ensure_session()
+ state["session_id"] = session_id
client = await self._get_client()
async with client.stream(...) as stream:
prompt_resp = await client.post(
- f"{self._base}/session/{self.session_id}/prompt_async",
+ f"{self._base}/session/{session_id}/prompt_async",
...
)
- except Exception as exc:
- ...
- finally:
- self._turn_state = {}
+ except Exception as exc:
+ ...
+ finally:
+ self._turn_state = {}Also applies to: 229-250, 295-317
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tinyagentos/adapters/opencode_adapter.py` around lines 195 - 205, Concurrent
calls to ensure_session() and prompt() can race on self.session_id and
_turn_state causing mixed streams; protect the adapter with an asyncio.Lock
(e.g., add a self._prompt_lock created in __init__) and acquire it at the start
of prompt() (and any other entrypoints that mutate session/turn state like
ensure_session()) to serialize turns, or alternatively detect and raise on
concurrent prompts. Also, inside prompt() (and ensure_session() where a session
id is returned) capture the returned session id into a local variable and use
that local for the remainder of the turn instead of repeatedly reading
self.session_id; ensure _turn_state is only mutated while holding the lock and
reset in the finally block while still under lock.
| async with client.stream( | ||
| "GET", | ||
| f"{self._base}/event", | ||
| timeout=httpx.Timeout( | ||
| connect=self._cfg.connect_timeout, | ||
| read=self._cfg.read_timeout, | ||
| write=30.0, | ||
| pool=30.0, | ||
| ), | ||
| ) as stream: |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate the file and show the relevant region with line numbers
ls -la tinyagentos/adapters/opencode_adapter.py
sed -n '260,360p' tinyagentos/adapters/opencode_adapter.py | nl -ba | sed -n '1,130p'
# Search for "stream ended before session.idle" to see where it’s triggered
rg -n "stream ended before session\.idle" -S tinyagentos/adapters/opencode_adapter.py tinyagentos -g'*.py'
# Search for handling of prompt_async / prompt_resp around this code region
rg -n "prompt_async|prompt_resp|/event" tinyagentos/adapters/opencode_adapter.py
# Also check whether stream.raise_for_status() or raise_for_status is used elsewhere for similar patterns
rg -n "raise_for_status\(" tinyagentos/adapters/opencode_adapter.py tinyagentos -g'*.py'Repository: jaylfc/taOS
Length of output: 207
🏁 Script executed:
#!/bin/bash
set -euo pipefail
ls -la tinyagentos/adapters/opencode_adapter.py
sed -n '260,360p' tinyagentos/adapters/opencode_adapter.py | nl -ba | sed -n '1,130p'
rg -n "stream ended before session\.idle" -S tinyagentos/adapters/opencode_adapter.py tinyagentos -g'*.py' || true
rg -n "prompt_async|prompt_resp|/event" tinyagentos/adapters/opencode_adapter.py || true
rg -n "raise_for_status\(" tinyagentos/adapters/opencode_adapter.py tinyagentos -g'*.py' || trueRepository: jaylfc/taOS
Length of output: 207
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tinyagentos/adapters/opencode_adapter.py"
# Show the exact region around the referenced lines (304-313-ish) without using nl
# Use line numbers via awk for compatibility
awk 'NR>=260 && NR<=360 {printf "%d\t%s\n", NR, $0}' "$FILE" | sed -n '1,120p'
# Find where the "stream ended before session.idle" path is created
rg -n "stream ended before session\.idle" "$FILE" || true
# Check for /event and prompt_async/prompt_resp usage
rg -n "/event|prompt_async|prompt_resp" "$FILE" || true
# Confirm whether raise_for_status is already used anywhere on this file
rg -n "raise_for_status\(" "$FILE" || trueRepository: jaylfc/taOS
Length of output: 4905
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tinyagentos/adapters/opencode_adapter.py"
echo "---- context around resp.raise_for_status (near line 242) ----"
awk 'NR>=200 && NR<=270 {printf "%d\t%s\n", NR, $0}' "$FILE" | head -n 100
echo "---- context after the SSE loop (continue from ~355) ----"
awk 'NR>=330 && NR<=460 {printf "%d\t%s\n", NR, $0}' "$FILE" | head -n 160Repository: jaylfc/taOS
Length of output: 5213
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tinyagentos/adapters/opencode_adapter.py"
# Find where "done" is set / where "session.idle" is handled
rg -n "state\\.(get\\(\"done\"\\)|\\[\"done\"\\]|\\bdone\\b)" "$FILE"
rg -n "session\\.idle|idle" "$FILE"
# Find map_opencode_event implementation
rg -n "def map_opencode_event|map_opencode_event\\(" -S "$FILE"Repository: jaylfc/taOS
Length of output: 680
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tinyagentos/adapters/opencode_adapter.py"
# Show map_opencode_event + surrounding comments/docs
sed -n '1,230p' "$FILE" | nl -ba | head -n 260Repository: jaylfc/taOS
Length of output: 98
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="tinyagentos/adapters/opencode_adapter.py"
# Print the beginning of the file with line numbers for readability
awk 'NR>=1 && NR<=230 {printf "%d\t%s\n", NR, $0}' "$FILE"Repository: jaylfc/taOS
Length of output: 9835
Check the /event HTTP status before starting prompt_async.
prompt() enters async with client.stream(.../event) as stream: but never calls stream.raise_for_status(). If /event returns a non-2xx (e.g., 401/404), the code still fires prompt_async, then eventually emits the generic "opencode stream ended before session.idle" error because state["done"] is only set when a session.idle SSE arrives.
Suggested fix
async with client.stream(
"GET",
f"{self._base}/event",
timeout=httpx.Timeout(
connect=self._cfg.connect_timeout,
read=self._cfg.read_timeout,
write=30.0,
pool=30.0,
),
) as stream:
+ stream.raise_for_status()
# Now fire the async prompt.
prompt_resp = await client.post(
f"{self._base}/session/{self.session_id}/prompt_async",
json={
"model": {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async with client.stream( | |
| "GET", | |
| f"{self._base}/event", | |
| timeout=httpx.Timeout( | |
| connect=self._cfg.connect_timeout, | |
| read=self._cfg.read_timeout, | |
| write=30.0, | |
| pool=30.0, | |
| ), | |
| ) as stream: | |
| async with client.stream( | |
| "GET", | |
| f"{self._base}/event", | |
| timeout=httpx.Timeout( | |
| connect=self._cfg.connect_timeout, | |
| read=self._cfg.read_timeout, | |
| write=30.0, | |
| pool=30.0, | |
| ), | |
| ) as stream: | |
| stream.raise_for_status() | |
| # Now fire the async prompt. | |
| prompt_resp = await client.post( | |
| f"{self._base}/session/{self.session_id}/prompt_async", | |
| json={ | |
| "model": { |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tinyagentos/adapters/opencode_adapter.py` around lines 304 - 313, In
prompt(), after entering the async with client.stream(...) as stream: block,
call stream.raise_for_status() and handle the HTTPError so you don't start
prompt_async for non-2xx responses; specifically, invoke
stream.raise_for_status() before launching prompt_async (referencing
client.stream and stream) and on exception either log/propagate the error or set
state["done"] so the existing session.idle-based completion logic isn't relied
on for HTTP failures; ensure prompt_async is only started when the stream
response is successful.
First building block of the opencode taOS agent (#588). A host-side adapter that drives opencode's headless server and maps its event stream onto taOS's reply vocabulary.
What
tinyagentos/adapters/opencode_adapter.py:POST /session→POST /session/:id/prompt_async→ SSEGET /event. Optional HTTP Basic auth (OPENCODE_SERVER_PASSWORD).map_opencode_event(evt, state)keyed on the real event vocabulary (verified live on the Pi):message.part.delta{field:text|reasoning, delta} → delta/reasoning, tool parts → tool_call/tool_result,session.idle→ final,session.next.model.switched→ recorded for reverse-reconcile.(config, sink); the sink receives reply dicts{kind, trace_id, content/error}— the body shapebridge_session.record_replyconsumes.prompt()never raises (transport/server errors degrade to anerrorreply).Grounding
Built against opencode-ai
1.15.13running live on the Pi (drove a real turn through the LiteLLM proxy). Event shapes captured from the live/eventstream, not docs.Tests
tests/test_opencode_adapter.py— 26 unit + integration tests: the codec per event type (text/reasoning/tool/idle/model-switch/other-session/malformed), andprompt()over a mocked SSE stream asserting the sink reply-dict sequence (delta, delta, final), trace-id propagation, and error-on-transport-failure / stream-ends-early.Next (#588): the host opencode runtime + the taOS agent as an agent record with its own LiteLLM key, configurable in the Agents app.
Summary by CodeRabbit
New Features
Tests