fix: concurrency hardening — idempotency cache reservation + agent token store (#452) #493
hognek wants to merge 8 commits into
📝 Walkthrough
This PR extends the agent management system with request deduplication, persistent per-agent token storage with multi-worker concurrency control, and automatic model archive promotion on worker join. It also replaces the activity endpoint with a real-time event stream backed by a ring buffer, served via SSE alongside a live-updating HTML UI.
Changes
Agent Token Persistence, Request Idempotency, Model Promotion, and Activity Streaming
Sequence Diagram(s)
sequenceDiagram
participant C1 as Client A
participant C2 as Client B
participant Handler as Handler
participant Cache as Idempotency<br/>Cache
participant Store as Agent<br/>TokensStore
participant DB as SQLite
C1->>Cache: POST /api/agents<br/>Idempotency-Key: key1
Cache-->>C1: try_reserve → proceed
C2->>Cache: POST /api/agents<br/>Idempotency-Key: key1
Cache-->>C2: try_reserve → wait
C1->>Handler: proceed with creation
Handler->>Store: issue(agent_name)
Store->>DB: BEGIN IMMEDIATE
Store->>DB: INSERT token
DB-->>Store: success
Store->>Handler: token row
Handler->>Cache: set(key1, result)
Cache->>C2: event fires
C2->>Cache: get(key1)
Cache-->>C2: cached result
Handler-->>C1: created response
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 14
🧹 Nitpick comments (2)
tests/test_agent_tokens_store.py (1)
68-129: ⚡ Quick win
Cover the issue→revoke interleaving too.
The concurrency cases here exercise duplicate writers and lock contention, but not the window after issue() commits and before it re-reads the active row. A regression test for that interleaving would catch the false-500 path in issue().
tests/test_agent_idempotency.py (1)
9-96: ⚡ Quick win
Add endpoint-level idempotency regressions.
This covers the cache primitive, but the bug is in the handlers' early-return paths. The suite still passes while a duplicate request hangs forever on 400/409/202 branches, so please add route-level tests that send the same Idempotency-Key through those exits and assert the waiter completes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tinyagentos/agent_tokens_store.py`:
- Around line 80-99: The current issue() commits then re-reads the active token
by agent_name, allowing a concurrent revoke() to remove it before the read and
cause a RuntimeError; instead, capture the inserted row inside the same
transaction by selecting the row by its primary key (e.g., using SQLite's
last_insert_rowid() or the inserted ID returned) before committing, convert it
with _row_to_dict, then commit and return that captured row; update the logic in
issue() to perform the SELECT by primary key inside the transaction and avoid
re-querying "WHERE agent_name = ? AND revoked_at IS NULL".
In `@tinyagentos/cluster/manager.py`:
- Around line 96-106: The inline await of promote_compatible_models in
register_worker blocks the registration call; instead schedule promotion as a
background task so registration returns immediately. Replace the awaited call to
promote_compatible_models(worker_hardware=info.hardware, worker_name=info.name,
notifications=self._notifications) with an asynchronous background task (e.g.,
asyncio.create_task or your service's task runner) that invokes
promote_compatible_models with the same args, and ensure the background task
captures and logs exceptions so failures don't propagate to the registration
flow; keep the worker marked online before scheduling this task.
In `@tinyagentos/cluster/model_archive.py`:
- Around line 135-139: The current check in model_archive.py lets a model
requiring a specific arch pass when the worker's cpu.arch is missing; update the
logic where req_arch is read (variable req_arch) and worker_arch is obtained
from hw_cpu to treat a missing worker_arch as incompatible—i.e., change the
condition to return False when worker_arch is falsy or does not equal req_arch
(replace the existing if worker_arch and worker_arch != req_arch: return False
with an if not worker_arch or worker_arch != req_arch: return False).
- Around line 119-125: The gpu compatibility branch in model_archive.py (the
req_gpu_accel checks) misses handling for req_gpu_accel == "mlx", so models
requiring "mlx" are not validated; update the same block (the req_gpu_accel
checks) to include a condition that returns False when req_gpu_accel == "mlx"
and hw_gpu.get("mlx") is falsy (mirroring the existing "cuda"/"rocm"/"vulkan"
checks), ensuring mlx-required archives are only allowed on workers advertising
hw_gpu["mlx"].
- Around line 244-251: The emit_event() call inside promote_compatible_models()
can raise and abort the whole promotion loop; wrap the
notifications.emit_event(...) call in a try/except that catches Exception (do
not re-raise), log the error using the module/class logger (e.g., logger or
process_logger) with context including model_id and worker_name, and continue so
later compatible models are still promoted; ensure the except block does not
change control flow other than logging.
In `@tinyagentos/routes/activity.py`:
- Around line 355-357: The client-side events array (events) is unbounded: after
pushing new items with unshift() you should trim it to MAX_VISIBLE to prevent
memory growth and slow rendering; update the code in the handler that uses
events.unshift(...) (and the similar block around lines 495-500) to increment
eventCount as you do, then if events.length > MAX_VISIBLE call events.length =
MAX_VISIBLE (or splice) to drop the oldest entries so only the most recent
MAX_VISIBLE remain; keep using the existing MAX_VISIBLE, eventCount, and events
identifiers so behavior and counts stay consistent.
- Around line 507-516: The current setConnected(live) function conflates
activity-timer with actual SSE socket health: change it so the 30s timeout only
marks activity as "Idle/Stale" rather than "Disconnected" and track socket
health separately (e.g., add a socketLive boolean updated from
EventSource.onopen/onerror/onclose or a new function setSocketConnected). Update
UI using both states (for example, statusText = socketLive ? (activityAlive ?
"Connected" : "Idle") : "Disconnected — retrying…") and rename setConnected to
setActivityAlive or split into setActivityAlive and setSocketConnected, keep
references to statusDot, statusText, and staleTimer when implementing this.
- Around line 75-80: subscribe() currently registers the new queue before
capturing the replay snapshot, causing races where an event published between
snapshot capture and registration is delivered twice; fix by capturing the
current snapshot and registering the queue under the same lock atomically:
inside async def subscribe(), create q, then async with self._lock: copy the
snapshot (e.g. list(self._snapshot) or call self.snapshot()), add q to
self._subscribers, release the lock, and only after releasing the lock enqueue
the copied snapshot events into q; reference symbols: subscribe, self._lock,
self._subscribers, and the snapshot storage/method (e.g. self._snapshot or
snapshot()) so the snapshot+registration happen in one critical section.
- Around line 194-205: After parsing JSON in the activity route, ensure the
decoded body is a mapping before calling body.get; replace the unchecked use of
body.get in the handler that currently does "body = await request.json()" and
then "event_type = body.get(...)" with an explicit type check (e.g., if not
isinstance(body, dict): return JSONResponse({"error": "invalid JSON body:
expected object"}, status_code=400)), then proceed to retrieve event_type and
validate against VALID_EVENT_TYPES; update references in this route to use the
validated mapping to avoid raising an exception when the JSON is an array or
scalar.
- Around line 96-106: get_buffer currently returns the module-level _buffer
before checking the incoming request's app state, which causes multiple app
instances in one process to share the first app's ActivityBuffer; change
get_buffer so it first inspects request.app.state.activity_buffer and returns or
initializes that buffer (ActivityBuffer) on the request's app, and only then
fall back to setting/using the module singleton _buffer if needed; update
references in publish_event (and any callers of get_buffer) to ensure they
operate on the request app's buffer rather than the stale module-level _buffer.
In `@tinyagentos/routes/agents.py`:
- Around line 165-168: The idempotency cache uses the raw Idempotency-Key header
so the same key can incorrectly collide across endpoints; change the key fed to
idempotency_cache.try_reserve to be scoped by operation (e.g., include
request.url.path and/or request.method and a payload fingerprint) rather than
the raw header alone. Locate the usage around
request.headers.get("Idempotency-Key") and idempotency_cache.try_reserve and
build a composite key (route+method+header and optionally a body hash) before
calling try_reserve; apply the same change to the other occurrence referenced
(lines 546-549) so each endpoint/payload is isolated.
- Around line 164-171: The idempotency reservation created by
idempotency_cache.try_reserve (using idempotency_key and event) must always be
released so waiting callers don't hang; wrap the handler logic that runs after
try_reserve in a try/finally (or ensure all early returns/exceptions) and in the
finally call the cache API to resolve the reservation (e.g., set the cached
result and notify the event or call the cache's release/resolve method) so
event.wait() unblocks; update the blocks around idempotency_cache.try_reserve /
event.wait() (and analogous spots at the other occurrences you noted) to always
clear or resolve the reserved key on every exit path.
- Around line 88-91: The current set() stores a fresh asyncio.Event into
self._entries without signaling it, so subsequent try_reserve() returns ("wait",
event) and waits forever; update the set() implementation that inserts
(asyncio.Event(), result) to immediately mark that event resolved by calling
event.set() (or store a pre-resolved event) so any waiting try_reserve()/get()
callers will not block; reference the set(), try_reserve(), get(), and
self._entries symbols when locating and changing the code.
In `@tinyagentos/routes/cluster.py`:
- Around line 661-712: The route decorator for promote_archived_models is using
router.get but performs side effects; change the decorator from
router.get("/api/cluster/promote-archived") to
router.post("/api/cluster/promote-archived") in the promote_archived_models
handler, update any callers/tests/CLI that invoke this endpoint to use POST
instead of GET, and regenerate/update any API docs or client code that reference
the old GET method so callers continue to work.
---
Nitpick comments:
In `@tests/test_agent_idempotency.py`:
- Around line 9-96: The idempotency primitive tests are fine but handlers'
early-return branches (HTTP responses 400/409/202) cause duplicate requests to
hang; add route-level tests that exercise those endpoint-level early exits to
ensure waiter requests complete. Create async tests (e.g., alongside
TestIdempotencyCache) that: 1) send an initial request to the real route with an
Idempotency-Key that triggers an early-return branch (400/409/202), 2)
concurrently send a second request with the same Idempotency-Key and assert it
does not hang but receives a completed response (or the same cached result), and
3) assert the waiter path finishes (await the second request/response) and that
subsequent cache.get/endpoint call returns the expected cached result; target
the actual HTTP route handlers responsible for those branches so the regression
is caught.
In `@tests/test_agent_tokens_store.py`:
- Around line 68-129: Add a new concurrency test (e.g.,
test_issue_handles_revoke_interleaving) that reproduces the window between the
store.issue() INSERT/commit and its subsequent re-read: use two connections (one
via AgentTokensStore.issue and one raw aiosqlite connection) and coordinate them
with BEGIN IMMEDIATE + a scheduled task (like delayed_issue) and sleeps so the
raw connection performs a revoke/INSERT or DELETE that changes the active row
immediately after the store's transaction commits but before the store re-reads;
assert the final outcome is the expected AgentTokenExistsError or correct token
state and close both connections; use the same helpers/patterns from
test_begin_immediate_blocks_concurrent_writer (raw.execute, BEGIN IMMEDIATE,
asyncio.create_task, await asyncio.sleep) and reference AgentTokensStore.issue
and AgentTokenExistsError to locate where to force the interleaving.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: cc4e9c7f-6537-46f7-8200-02e31114df4f
📒 Files selected for processing (11)
tests/test_agent_idempotency.py
tests/test_agent_tokens_store.py
tests/test_model_archive.py
tinyagentos/agent_tokens_store.py
tinyagentos/app.py
tinyagentos/channel_hub/adapter_manager.py
tinyagentos/cluster/manager.py
tinyagentos/cluster/model_archive.py
tinyagentos/routes/activity.py
tinyagentos/routes/agents.py
tinyagentos/routes/cluster.py
    await self._db.commit()
except aiosqlite.IntegrityError:
    await self._db.execute("ROLLBACK")
    raise AgentTokenExistsError(agent_name) from None
except Exception:
    await self._db.execute("ROLLBACK")
    raise

row = await (
    await self._db.execute(
        "SELECT id, agent_name, token, created_at, revoked_at "
        "FROM agent_tokens WHERE agent_name = ? AND revoked_at IS NULL",
        (agent_name,),
    )
).fetchone()

if row is None:
    raise RuntimeError(f"Token for '{agent_name}' not found after issue")

return _row_to_dict(row)
Return the inserted row before another writer can revoke it.
issue() commits and then re-reads WHERE agent_name = ? AND revoked_at IS NULL. A concurrent revoke() can run between those awaits, so this method can raise RuntimeError even though the insert already succeeded. Capture the inserted row by primary key inside the issuing transaction instead of re-querying by "still active".
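The fix described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: it uses the synchronous standard-library sqlite3 module rather than aiosqlite, and the table layout, the partial unique index, and the open_store helper are assumptions made for the example. Only the column names come from the SELECT shown in the review.

```python
import secrets
import sqlite3
import time


class AgentTokenExistsError(Exception):
    """Raised when the agent already has an active (non-revoked) token."""


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    # Hypothetical helper. isolation_level=None gives autocommit mode, so
    # BEGIN IMMEDIATE / COMMIT / ROLLBACK are issued explicitly below.
    db = sqlite3.connect(path, isolation_level=None)
    db.execute(
        "CREATE TABLE IF NOT EXISTS agent_tokens ("
        "id INTEGER PRIMARY KEY, agent_name TEXT NOT NULL, "
        "token TEXT NOT NULL, created_at REAL NOT NULL, revoked_at REAL)"
    )
    # Assumed constraint: at most one active token per agent.
    db.execute(
        "CREATE UNIQUE INDEX IF NOT EXISTS one_active_token "
        "ON agent_tokens(agent_name) WHERE revoked_at IS NULL"
    )
    return db


def issue(db: sqlite3.Connection, agent_name: str) -> dict:
    db.execute("BEGIN IMMEDIATE")  # take the write lock up front
    try:
        cur = db.execute(
            "INSERT INTO agent_tokens (agent_name, token, created_at) "
            "VALUES (?, ?, ?)",
            (agent_name, secrets.token_hex(16), time.time()),
        )
        # Read the row back by primary key while the write lock is still
        # held, so a concurrent revoke cannot slip in before the read.
        row = db.execute(
            "SELECT id, agent_name, token, created_at, revoked_at "
            "FROM agent_tokens WHERE id = ?",
            (cur.lastrowid,),
        ).fetchone()
        db.execute("COMMIT")
    except sqlite3.IntegrityError:
        db.execute("ROLLBACK")
        raise AgentTokenExistsError(agent_name) from None
    except Exception:
        db.execute("ROLLBACK")
        raise
    keys = ("id", "agent_name", "token", "created_at", "revoked_at")
    return dict(zip(keys, row))
```

Because the returned dict is captured before the commit, a revoke that lands immediately afterwards changes the stored row but cannot turn a successful issue into a RuntimeError.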
# Promote any archived models this worker can now run
try:
    from tinyagentos.cluster.model_archive import (
        promote_compatible_models,
    )

    await promote_compatible_models(
        worker_hardware=info.hardware,
        worker_name=info.name,
        notifications=self._notifications,
    )
Decouple worker registration from archive promotion.
register_worker() now waits for the full scan/move pass before returning. Since promotion does blocking filesystem work and may copy large directories across volumes, a worker join can become a long-running request even though the worker is already marked online.
Consider scheduling promotion as a background task after registration succeeds instead of awaiting it inline.
if req_gpu_accel:
    if req_gpu_accel == "cuda" and not hw_gpu.get("cuda"):
        return False
    if req_gpu_accel == "rocm" and not hw_gpu.get("rocm"):
        return False
    if req_gpu_accel == "vulkan" and not hw_gpu.get("vulkan"):
        return False
Honor gpu_accel="mlx" during compatibility checks.
This branch never validates the "mlx" value documented above, so an Apple-only archived model can be promoted on any worker that happens to satisfy the other requirements.
Suggested fix
 if req_gpu_accel:
     if req_gpu_accel == "cuda" and not hw_gpu.get("cuda"):
         return False
     if req_gpu_accel == "rocm" and not hw_gpu.get("rocm"):
         return False
     if req_gpu_accel == "vulkan" and not hw_gpu.get("vulkan"):
         return False
+    if req_gpu_accel == "mlx" and worker_gpu_type != "apple":
+        return False
req_arch = requirements.get("arch")
if req_arch:
    worker_arch = hw_cpu.get("arch", "")
    if worker_arch and worker_arch != req_arch:
        return False
Treat unknown CPU architecture as incompatible when a manifest requires one.
With the current condition, a model requiring "arch": "x86_64" is still promotable when the worker reports no cpu.arch at all.
Suggested fix
 req_arch = requirements.get("arch")
 if req_arch:
     worker_arch = hw_cpu.get("arch", "")
-    if worker_arch and worker_arch != req_arch:
+    if not worker_arch or worker_arch != req_arch:
         return False
if notifications:
    await notifications.emit_event(
        "model.promoted",
        f"Archived model '{model_id}' promoted",
        f"Worker '{worker_name}' can now run '{model_id}'. "
        f"Moved from archive to active models.",
        level="info",
    )
Keep notification failures from aborting the promotion loop.
A single emit_event() exception bubbles out of promote_compatible_models(), so later compatible models stay archived even though the caller treats promotion as best-effort.
Suggested fix
 if promote_model(model):
     promoted.append(model_id)
     if notifications:
-        await notifications.emit_event(
-            "model.promoted",
-            f"Archived model '{model_id}' promoted",
-            f"Worker '{worker_name}' can now run '{model_id}'. "
-            f"Moved from archive to active models.",
-            level="info",
-        )
+        try:
+            await notifications.emit_event(
+                "model.promoted",
+                f"Archived model '{model_id}' promoted",
+                f"Worker '{worker_name}' can now run '{model_id}'. "
+                f"Moved from archive to active models.",
+                level="info",
+            )
+        except Exception:
+            logger.exception(
+                "model_archive: notification emit failed for promoted model '%s'",
+                model_id,
+            )
function setConnected(live) {
  if (live) {
    statusDot.className = "status-dot live";
    statusText.textContent = "Connected";
    if (staleTimer) { clearTimeout(staleTimer); staleTimer = null; }
    staleTimer = setTimeout(() => setConnected(false), 30_000);
  } else {
    statusDot.className = "status-dot stale";
    statusText.textContent = "Disconnected — retrying\u2026";
  }
This status label is measuring activity, not connection health.
After the first event, Lines 511-512 always flip the UI to “Disconnected” if no more events arrive for 30 seconds. A quiet but healthy stream will therefore be marked disconnected even when the SSE socket is still fine. Either track socket state separately, or rename this state to idle/stale instead of disconnected.
entry = self._entries.get(key)
if entry is None:
    self._entries[key] = (asyncio.Event(), result)
    return
Set cold cache entries to a resolved state.
set() creates a fresh asyncio.Event here but never signals it. A later try_reserve() on the same key will return ("wait", event) and block forever even though get() already has a cached result.
Proposed fix
 entry = self._entries.get(key)
 if entry is None:
-    self._entries[key] = (asyncio.Event(), result)
+    event = asyncio.Event()
+    event.set()
+    self._entries[key] = (event, result)
     return
# --- Idempotency guard ---
idempotency_cache = getattr(request.app.state, "idempotency_cache", None)
idempotency_key = request.headers.get("Idempotency-Key")
if idempotency_key and idempotency_cache is not None:
    mode, event = idempotency_cache.try_reserve(idempotency_key)
    if mode == "wait":
        await event.wait()
        return idempotency_cache.get(idempotency_key)
Always release the reservation, even when the request exits early.
Both handlers reserve before validation, but only resolve the key on the happy-path 200 response. Any 400/404/409/202 return or exception leaves the event unset, so later callers with the same key wait forever.
Also applies to: 195-198, 545-552, 859-862
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tinyagentos/routes/agents.py` around lines 164 - 171, The idempotency
reservation created by idempotency_cache.try_reserve (using idempotency_key and
event) must always be released so waiting callers don't hang; wrap the handler
logic that runs after try_reserve in a try/finally (or ensure all early
returns/exceptions) and in the finally call the cache API to resolve the
reservation (e.g., set the cached result and notify the event or call the
cache's release/resolve method) so event.wait() unblocks; update the blocks
around idempotency_cache.try_reserve / event.wait() (and analogous spots at the
other occurrences you noted) to always clear or resolve the reserved key on
every exit path.
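One way to guarantee release on every exit path is a `try`/`finally` around the handler body. The sketch below is illustrative only: `DemoCache` and `handle()` are hypothetical stand-ins, and caching the error response (rather than dropping the reservation so a retry can run) is a design choice the PR author would need to make.

```python
import asyncio


class DemoCache:
    """Hypothetical stand-in for the PR's idempotency cache."""

    def __init__(self):
        self._entries = {}

    def try_reserve(self, key):
        if key in self._entries:
            return "wait", self._entries[key][0]
        event = asyncio.Event()
        self._entries[key] = (event, None)
        return "proceed", event

    def set(self, key, result):
        event = self._entries[key][0]
        self._entries[key] = (event, result)
        event.set()

    def get(self, key):
        return self._entries[key][1]


async def handle(cache, key, payload):
    mode, event = cache.try_reserve(key)
    if mode == "wait":
        await event.wait()
        return cache.get(key)
    # Fallback stored if the body raises before producing a response.
    result = {"status": 500, "detail": "internal error"}
    try:
        if "name" not in payload:
            result = {"status": 400, "detail": "name required"}
            return result
        await asyncio.sleep(0)  # stands in for the real work
        result = {"status": 200, "name": payload["name"]}
        return result
    finally:
        # Runs on early returns and exceptions alike, so waiters always wake.
        cache.set(key, result)


async def demo():
    cache = DemoCache()
    return await asyncio.gather(handle(cache, "k", {}), handle(cache, "k", {}))
```

Here both concurrent calls receive the same 400 response, and the second one returns instead of hanging on an unsignalled event.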
idempotency_cache = getattr(request.app.state, "idempotency_cache", None)
idempotency_key = request.headers.get("Idempotency-Key")
if idempotency_key and idempotency_cache is not None:
    mode, event = idempotency_cache.try_reserve(idempotency_key)
Scope idempotency by operation, not just the raw header.
app.state.idempotency_cache is shared across both endpoints, and the cache key is just the header value. Reusing the same Idempotency-Key on /api/agents and /api/agents/deploy — or on the same route with a different payload — can return the wrong cached result and skip the second action entirely.
Also applies to: 546-549
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tinyagentos/routes/agents.py` around lines 165 - 168, The idempotency cache
uses the raw Idempotency-Key header so the same key can incorrectly collide
across endpoints; change the key fed to idempotency_cache.try_reserve to be
scoped by operation (e.g., include request.url.path and/or request.method and a
payload fingerprint) rather than the raw header alone. Locate the usage around
request.headers.get("Idempotency-Key") and idempotency_cache.try_reserve and
build a composite key (route+method+header and optionally a body hash) before
calling try_reserve; apply the same change to the other occurrence referenced
(lines 546-549) so each endpoint/payload is isolated.
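A composite key along the lines suggested above could be built as follows. This is a sketch under assumptions: `scoped_idempotency_key` is a hypothetical helper, and whether to include a body digest (versus rejecting a reused key with a different payload) is left open by the review.

```python
import hashlib


def scoped_idempotency_key(method: str, path: str, header_value: str,
                           body: bytes = b"") -> tuple:
    """Return a hashable cache key scoped to one operation and payload.

    A tuple avoids the ambiguity of joining parts with a separator that
    could also appear inside the client-supplied header value.
    """
    body_digest = hashlib.sha256(body).hexdigest()
    return (method.upper(), path, header_value, body_digest)
```

The handler would then pass this tuple to `try_reserve()`, `set()`, and `get()` in place of the raw header value, so the same `Idempotency-Key` sent to `/api/agents` and `/api/agents/deploy` no longer collides.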
@router.get("/api/cluster/promote-archived")
async def promote_archived_models(request: Request):
    """Manual trigger: scan all online workers and promote any archived
    models that are now compatible with cluster hardware.

    Called by the user from the Cluster page or admin CLI. Safe to call
    repeatedly — already-promoted models are skipped.
    """
    cluster = request.app.state.cluster_manager
    notifications = getattr(request.app.state, "notifications", None)

    workers = cluster.get_workers()
    online = [w for w in workers if w.status == "online"]

    from tinyagentos.cluster.model_archive import (
        find_promotable,
        promote_model,
    )

    promoted_by_worker: dict[str, list[str]] = {}
    total = 0

    for w in online:
        promotable = find_promotable(
            worker_hardware=w.hardware,
            worker_name=w.name,
        )
        for model in promotable:
            model_id = model.get("model_id", "?")
            if promote_model(model):
                promoted_by_worker.setdefault(w.name, []).append(model_id)
                total += 1
                if notifications:
                    try:
                        await notifications.emit_event(
                            "model.promoted",
                            f"Archived model '{model_id}' promoted",
                            f"Worker '{w.name}' can now run '{model_id}'. "
                            f"Moved from archive to active models.",
                            level="info",
                        )
                    except Exception:
                        logger.exception(
                            "notification emit failed for model promotion %s",
                            model_id,
                        )

    return {
        "promoted": total,
        "by_worker": promoted_by_worker,
        "workers_scanned": len(online),
    }
Make the manual promotion trigger a POST, not a GET.
This handler performs side effects by moving files and emitting events, so exposing it as GET makes accidental promotion possible through prefetching or other safe-method assumptions.
Suggested fix
-@router.get("/api/cluster/promote-archived")
+@router.post("/api/cluster/promote-archived")
 async def promote_archived_models(request: Request):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tinyagentos/routes/cluster.py` around lines 661 - 712, The route decorator
for promote_archived_models is using router.get but performs side effects;
change the decorator from router.get("/api/cluster/promote-archived") to
router.post("/api/cluster/promote-archived") in the promote_archived_models
handler, update any callers/tests/CLI that invoke this endpoint to use POST
instead of GET, and regenerate/update any API docs or client code that reference
the old GET method so callers continue to work.
…ken store (jaylfc#452)
- Add IdempotencyCache with try_reserve()/set() pattern using asyncio.Event sentinels to close the get-then-set race in add_agent and deploy_agent_endpoint
- Add AgentTokensStore with BEGIN IMMEDIATE for multi-worker-safe token issuance and clean AgentTokenExistsError on unique constraint violations
- Register idempotency_cache on app.state in both app factory paths
- Resolve pre-existing merge conflict in adapter_manager.py
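For readers unfamiliar with the `BEGIN IMMEDIATE` pattern mentioned in the commit message, a rough sketch with the standard `sqlite3` module follows. The table layout, `open_store()`, and `issue_token()` are assumptions for illustration; only the `AgentTokenExistsError` name and the use of `BEGIN IMMEDIATE` come from the commit message.

```python
import secrets
import sqlite3


class AgentTokenExistsError(Exception):
    """Raised when the agent already has a token (name from the commit message)."""


def open_store(path):
    # isolation_level=None selects autocommit mode, so transactions are
    # controlled explicitly with BEGIN/COMMIT/ROLLBACK below.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS agent_tokens ("
        "agent_id TEXT PRIMARY KEY, token TEXT NOT NULL)"
    )
    return conn


def issue_token(conn, agent_id):
    token = secrets.token_hex(16)
    # BEGIN IMMEDIATE takes the write lock up front, so two workers cannot
    # both pass a read-then-write check before either one writes.
    conn.execute("BEGIN IMMEDIATE")
    try:
        conn.execute(
            "INSERT INTO agent_tokens (agent_id, token) VALUES (?, ?)",
            (agent_id, token),
        )
        conn.execute("COMMIT")
    except sqlite3.IntegrityError as exc:
        conn.execute("ROLLBACK")
        # Translate the unique-constraint violation into a domain error.
        raise AgentTokenExistsError(agent_id) from exc
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return token
```

With multiple worker processes sharing one database file, a second `BEGIN IMMEDIATE` waits (up to the connection's busy timeout) until the first transaction finishes, then fails cleanly on the primary-key constraint.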
b8baedf to ffb986c
The SSE event feed rewrite was unrelated to the concurrency/idempotency feature (jaylfc#452). Restored the original JSON hardware monitoring endpoint so the PR stays focused on one issue.
Summary
Closes #452
Summary by CodeRabbit
New Features
Idempotency-Key header; duplicate requests return cached responses.
Tests