diff --git a/.env.example b/.env.example index 2b69e2d..50d77e1 100644 --- a/.env.example +++ b/.env.example @@ -96,3 +96,89 @@ CLK_TELEGRAM_ENABLED=false CLK_TELEGRAM_SKIP=false # Default workspace the bot operates on when /workspace has not been set. CLK_TELEGRAM_WORKSPACE= + +# ---------------------------------------------------------------------- +# Robustness loops (see README "Robustness loops" section) +# ---------------------------------------------------------------------- +# Every layer below has an off-switch so you can dial cost. All values +# are forwarded into .clk/config/clk.config.json by kickoff.sh; you can +# also edit that file directly to override per project. + +# Auto-consensus fan-out: every "careful" dispatch fans into N parallel +# stochastic samples + chief coalescing, instead of a single dispatch. +# off — only PROPOSE_CONSENSUS triggers fan-out (legacy) +# on_careful — fan out stages marked careful=true (default) +# always — fan out every non-chief dispatch (×N cost) +# Cost: ~×consensus.max_samples per affected dispatch. +CLK_ROBUSTNESS_AUTO_CONSENSUS=on_careful + +# Critic-judge refinement: draft -> critic -> revise inner loop. +# off — only explicit refine: blocks in workflow YAML +# careful_only — careful=true stages get one critic pass (default) +# all — every non-chief, non-qa stage gets one critic pass +# Cost: +1 critic dispatch + up to refine_max_rounds-1 worker revisions +# per affected stage. +CLK_ROBUSTNESS_AUTO_REFINE=careful_only + +# After every dispatch the response is scored for emptiness, malformed +# ACTION/POST blocks, missing declared outputs, and low confidence; +# recoverable failures re-dispatch with a repair preamble, escalating +# to consensus on the final retry. Set to 0 to disable. +CLK_ROBUSTNESS_MAX_QUALITY_RETRIES=2 + +# Responses shorter than this many characters are treated as suspect-empty. +CLK_ROBUSTNESS_MIN_RESPONSE_CHARS=40 + +# Critic-judge inner loop bounds. 
+CLK_ROBUSTNESS_REFINE_MAX_ROUNDS=3 +CLK_ROBUSTNESS_REFINE_ACCEPT_THRESHOLD=0.8 + +# Inter-agent Q&A protocol: POST: question TO: <agent> URGENCY: blocking. +# qa_parallel_judges optionally dispatches multiple peers in parallel. +# max_qa_depth caps the chain length (peer A asks B asks C ...). +CLK_ROBUSTNESS_QA_PARALLEL_JUDGES=1 +CLK_ROBUSTNESS_MAX_QA_DEPTH=3 + +# Ralph / autoresearch plateau detection. After plateau_window +# consecutive iterations without improvement, the loop escalates + +# reframes; failing that, it terminates gracefully with done.md. +# off — never auto-terminate (legacy behavior) +# escalate_only — force consensus on next iterations only +# reframe_only — only ask chief to re-cast +# escalate_then_reframe — both, then terminate (default) +CLK_ROBUSTNESS_PLATEAU_WINDOW=3 +CLK_ROBUSTNESS_PLATEAU_ACTION=escalate_then_reframe + +# ---------------------------------------------------------------------- +# Prior-knob reference (already supported; documented here for parity) +# ---------------------------------------------------------------------- +# Provider invocation timeout, seconds. 0 = use harness default (300s). +CLK_PROVIDER_TIMEOUT_S=0 +# Per-provider-invocation no-output watchdog (kill if no stdout for N s). +CLK_PROVIDER_NO_OUTPUT_TIMEOUT_S=0 +# Provider-level retry policy (transient errors: rate limits, timeouts, ...). +CLK_PROVIDER_RETRY_MAX_RETRIES=10 +CLK_PROVIDER_RETRY_BACKOFF_S=5 +# Stage-level retry policy (whole stage repeats with a larger backoff). +CLK_PROVIDER_RETRY_STAGE_MAX_RETRIES=10 +CLK_PROVIDER_RETRY_STAGE_BACKOFF_S=30 +# Chief supervise loop: cap on cycles before warning and stopping. +CLK_SUPERVISE_MAX_CYCLES=20 +# Stochastic consensus caps (same scope as the auto-consensus knob). +CLK_CONSENSUS_MAX_SAMPLES=6 +CLK_CONSENSUS_MAX_PARALLEL=4 +# Dynamic-roles cap (chief cannot mint more than this many dynamic agents). +CLK_CASTING_MAX_DYNAMIC_ROLES=12 +# Auto-commit per agent-run after successful ACTION application. 
+CLK_AUTO_COMMIT=true +# File-action batch caps (per agent response). +CLK_VALIDATION_MAX_FILES_PER_BATCH=25 +CLK_VALIDATION_WARN_FILES_PER_BATCH=5 +# Meta-prompt drafting (chief tightens worker prompts before dispatch). +# off | careful_only (default) | always +CLK_META_PROMPT_DISPATCH=careful_only +CLK_META_PROMPT_ROLE=careful_only +# Per-stage checkpoint (chief CONTINUE/REDIRECT/ABORT after every stage). +CLK_REVIEW_PER_STAGE=false +# Recovery on unmet deps: chief recovery passes per stage. +CLK_RECOVERY_MAX_PER_STAGE=3 diff --git a/README.md b/README.md index 71ce09b..9e56a0d 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,21 @@ committed automatically. If you've used CLK before, the highlights of this release: +- **Robustness loops by default.** Every meaningful dispatch is now + scored after the provider returns; empty / malformed / contract- + violating / low-confidence responses are re-dispatched with a repair + preamble, escalating to a stochastic consensus fan-out on the final + retry. Stages marked `careful: true` fan into N parallel samples + proactively (configurable via `robustness.auto_consensus`). The + critic-judge inner loop (`refine:` stage attribute, or default-on + for careful stages) drives draft → critic → revise until the critic + signs off. Ralph and autoresearch detect plateau / regression and + escalate-then-reframe instead of burning the full iteration budget. + Agents can ask peers directed clarifying questions via + `POST: question TO: <agent> URGENCY: blocking` and the harness + routes the answer inline. Everything is gated by + `clk.config.json::robustness.*` (or `CLK_ROBUSTNESS_*` env vars) so + you can throttle cost — see **Robustness loops** below. - **The setup wizard explains itself.** `kickoff.sh --setup` is now a series of explain-then-ask blocks (provider, loop settings, tool detection, telegram, GitHub, git identity) — every question is @@ -325,6 +340,12 @@ container has a terminal. 
If no `.env` is present it will prompt for provider and settings before launching. Pass your idea as the first argument to skip the prompt and go straight to the engineering workflow. +> **`install_local.sh` is not needed inside Docker.** The `Dockerfile` runs +> `pip install -e .` at image-build time, so all Python dependencies are +> already present. Keep `CLK_RUN_INSTALL=false` (the default) — setting it to +> `true` in a Docker environment would redundantly re-create a `.clk/venv` +> that the container doesn't need. + All examples below assume the image is tagged `clk` locally — either build it from source or pull a prebuilt image and re-tag it (see the next two sections). @@ -391,7 +412,12 @@ what the value does before asking for it, modeled on the `codex`, `gemini`, `pi`, `ollama`, `openwebui`). One-liner per choice. 2. **Loop settings** — max iterations, project name, install flag, - TUI/no-TUI. + TUI/no-TUI. The **install flag** (`CLK_RUN_INSTALL`) controls + whether `scripts/install_local.sh` runs inside each kickoff + directory to create a local `.clk/venv`. **Leave it `false` + (the default) when running in Docker** — the image already has + all Python dependencies installed at build time, so the local + venv step is unnecessary. 3. **Auth mode** — only for CLI providers; `cli` reuses your local `claude login` / `codex login` / `gemini login`, `apikey` prompts for a key directly. @@ -938,6 +964,46 @@ Or per model: provider is eating the budget. Updated lazily from the same numbers the title bar shows. +### Robustness-loop multipliers + +The robustness loops (see **Robustness loops**) trade tokens for +quality. 
Use this table to pick a regime: + +| Knob | Worst-case multiplier per affected dispatch | Recommended starting point | +|----------------------------------------|--------------------------------------------------------------------------|----------------------------| +| `robustness.auto_consensus` | `off` → ×1; `on_careful` → ×(N+1) on careful stages only; `always` → ×(N+1) on every dispatch (where N = `consensus.max_samples`, default 6) | `on_careful` (default) | +| `robustness.auto_refine` | `off` → ×1; `careful_only` → ×(1 + 1 worker revision + 1 critic) on careful stages; `all` → that on every stage | `careful_only` (default) | +| `robustness.max_quality_retries` | At most this many extra dispatches when a response fails the quality check; 0 disables | 2 (default) | +| `robustness.refine_max_rounds` | Cap on critic↔worker round-trips inside a refine loop | 3 (default) | +| `robustness.max_qa_depth` | Cap on inter-agent Q&A chain depth (each peer answer can ask one peer) | 3 (default) | +| `robustness.plateau_window` | How many no-improvement Ralph/autoresearch iterations before escalation | 3 (default) | +| `robustness.plateau_action` | `off` disables adaptive loop termination entirely | `escalate_then_reframe` | + +Cost-minimal regime (closest to legacy CLK behavior, no extra tokens): + +```jsonc +"robustness": { + "auto_consensus": "off", + "auto_refine": "off", + "max_quality_retries": 0, + "plateau_action": "off" +} +``` + +Cost-maximal "lean into the loop" regime (every dispatch fans out, +critic gates every careful stage, plateau detection on, Q&A protocol +fully open): + +```jsonc +"robustness": { + "auto_consensus": "always", + "auto_refine": "all", + "max_quality_retries": 3, + "refine_max_rounds": 4, + "plateau_action": "escalate_then_reframe" +} +``` + ## Pi extension A native [pi.dev](https://pi.dev) extension that brings the full CLK @@ -1300,6 +1366,14 @@ Type `/cast` in the TUI to force a re-cast at any time, or run `clk cast` from the CLI. 
To inspect or edit by hand: `clk roles list|add --name X --role "..."|remove --name X`. +Agents communicate via a **blackboard** at `.clk/blackboard/` — short +markdown POST blocks each agent emits at the end of its run, filtered +into peers' prompts based on each stage's `inputs:` selectors. +Directed clarifying questions are a special POST type +(`POST: question TO: <agent> URGENCY: blocking`) routed inline by the +harness — see **Robustness loops** for the protocol details and depth +caps. + ## Action protocol Agents drive real changes by emitting `ACTION:` blocks the harness @@ -1327,6 +1401,11 @@ emit `ACTION` blocks that fix the upstream failure, or `PROPOSE_ROLE` a specialist that can. Capped at 3 recovery passes per stage (configurable via `clk.config.json::recovery::max_per_stage`). +This section is about *dependency* failures. *Content* failures — +empty, malformed, or low-confidence agent output that nonetheless +returned `ok=True` — are handled by the response-quality re-dispatch +loop documented in **Robustness loops** below. + ## Workflows YAML workflows live in `.clk/config/workflows/`. The default @@ -1369,7 +1448,260 @@ or forced via `/loop`): a small experiment, and records the learning regardless of pass/fail. Both modes respect `max_iterations` and stop early when -`.clk/state/done.md` is created. +`.clk/state/done.md` is created. Both also auto-detect plateau and +regression and adapt — see **Robustness loops** below. + +## Robustness loops + +CLK leans into the loop: every dispatch is wrapped in self-correcting +behavior so the harness does not just accept the first thing a +sub-agent returns. This section is a single index of every loop the +harness runs — old and new — with the config knob that tunes each +one and the activity-log event you can grep for in `.clk/logs/`. + +All knobs live under `clk.config.json::robustness.*` (and the +parallel `CLK_ROBUSTNESS_*` env-var family — see `.env.example`). 
+Every layer has an off-switch so you can throttle cost. + +### 1. Provider retry (existing) + +Transient provider errors (rate limits, timeouts, "no endpoints +available", HTTP 429) are retried with exponential backoff before the +response surfaces at the workflow layer. + +- Code: `clk_harness/orchestration/agent.py::AgentRunner._should_retry_provider` +- Config: `clk.config.json::provider_retry.{max_retries, backoff_s}` +- Logged events: `provider_attempt`, `provider_retry` +- Kill switch: set `provider_retry.max_retries: 0` + +### 2. Stage retry (existing) + +When a workflow stage fails with a retryable provider error after the +inner provider-retry budget is exhausted, the workflow runner retries +the entire stage with a larger backoff before giving up on the stage. + +- Code: `workflow.py::WorkflowRunner._is_retryable_stage_error` +- Config: `clk.config.json::provider_retry.{stage_max_retries, stage_backoff_s}` +- Logged events: `workflow_stage_retry` +- Kill switch: set `provider_retry.stage_max_retries: 0` + +### 3. Supervise cycles (existing) + +The chief's `supervise` stage decides whether the user's prompt has +been fully addressed; if not, it emits a `PROPOSE_WORKFLOW` and the +whole workflow re-runs. See **Chief supervisor loop** for the full +description. + +- Config: `clk.config.json::supervise.max_cycles` (default 20) +- Kill switch: set `supervise.max_cycles: 1` + +### 4. Recovery on unmet deps (existing) + +When a stage's dependencies fail, the chief is dispatched in recovery +mode to re-cast, remediate, or accept the gap. See **Self-healing on +unmet deps**. This handles *dependency* failures; *content* failures +are handled by Layer 6 below. + +- Config: `clk.config.json::recovery.max_per_stage` (default 3) + +### 5. Review & checkpoint stages (existing) + +Stages marked `phase: review` automatically receive a chief-authored +review prompt containing the upstream stages' POST blocks, and the +chief emits a verdict (continue / redirect / abort). 
Stages marked +`careful: true` add a post-stage checkpoint and (when configured) +trigger meta-prompt drafting on dispatch. + +Example: + +```yaml +- id: design_spec + agent: architect + careful: true + outputs: [design_brief] + objective: Draft the API contract. +- id: review_design + agent: chief + phase: review + depends_on: [design_spec] +``` + +- Config: `clk.config.json::review.per_stage` (apply to *every* stage) +- Logged events: `workflow_checkpoint`, `consensus_coalesced` + +### 6. Auto-quality re-dispatch (new) + +After every dispatch, the response is scored against +`response_quality`: + +- empty / sub-threshold text +- malformed `ACTION:` or `POST:` blocks +- missing declared `outputs` (the stage's contract keys) +- self-reported low confidence (`CONFIDENCE: <0..1>` parsed from the + response) +- refusal patterns (treated as not-recoverable — surfaces to the + chief instead of retrying blindly) + +Recoverable failures are re-dispatched with a repair preamble that +quotes the specific reasons back to the worker, up to +`robustness.max_quality_retries`. On the final retry, when +`auto_consensus` is not `"off"`, the dispatch escalates to a +stochastic consensus fan-out rather than another single-shot retry. + +- Code: `orchestration/response_quality.py`, `agent.py::_dispatch_with_quality_loop` +- Config: `robustness.{max_quality_retries, min_response_chars}` +- Logged events: `agent_quality_retry`, `agent_quality_final` +- Kill switch: `robustness.max_quality_retries: 0` + +### 7. Stochastic consensus, opt-in + automatic (existing + new) + +Any agent can emit `PROPOSE_CONSENSUS` to fan a question into N +independent samples; the harness runs them in parallel, logs them, +and dispatches the chief to coalesce. New in this release: +`robustness.auto_consensus` makes the fan-out automatic. 
+ +| `auto_consensus` | Behavior | +|--------------------------|--------------------------------------------------------------------------| +| `off` | Only `PROPOSE_CONSENSUS` triggers fan-out (legacy behavior). | +| `on_careful` *(default)* | Stages marked `careful: true` fan out automatically. | +| `always` | Every non-chief dispatch fans out (×N samples — most expensive setting). | + +Cost: a fan-out costs roughly N + 1 dispatches (N samples + 1 chief +coalescing). Caps at `consensus.max_samples` (default 6) and +`consensus.max_parallel` (default 4). + +- Logged events: `consensus_started`, `consensus_sample_dispatch`, + `consensus_samples_completed`, `consensus_coalesced` +- Kill switch: `robustness.auto_consensus: "off"` + +### 8. Inter-agent clarifying Q&A (new) + +Agents emit: + +``` +POST: question +TO: architect +URGENCY: blocking +BODY: +Are user IDs opaque strings or integers? +END_POST +``` + +With `URGENCY: blocking`, the harness dispatches the target peer +immediately to answer; the peer's `POST: answer` lists the question +id in its `CONSUMES`, and the asker sees the answer in the next +blackboard digest. `URGENCY: async` records the question for the +chief to schedule in a later cycle. + +Chain depth is capped at `robustness.max_qa_depth` (default 3) so a +question can't trigger an unbounded chain of clarifications. + +- Code: `agent.py::_route_blocking_questions`, `blackboard.py` +- Config: `robustness.{max_qa_depth, qa_parallel_judges}` +- Logged events: `qa_dispatch`, `qa_chain_capped`, `qa_chain_cycle`, + `qa_target_unknown` +- Kill switch: omit the `TO:` field in your `POST: question` blocks; + no protocol-level off-switch (Q&A is opt-in per post). + +### 9. Critic-judge refinement (new) + +Stages may declare a refinement loop that threads a critic between +worker rounds. The critic scores the worker's output 0..1; if below +the accept threshold, the worker is re-dispatched with the critic's +feedback until accept or `max_rounds` is reached. 
+ +```yaml +- id: design_spec + agent: architect + refine: + critic: critic + max_rounds: 4 + accept_threshold: 0.8 + objective: Draft the spec. +``` + +When the stage has no explicit `refine:` block, `robustness.auto_refine` +decides whether one round runs anyway: + +| `auto_refine` | Behavior | +|----------------------------|---------------------------------------------------------| +| `off` | Only stages with `refine:` use the inner loop. | +| `careful_only` *(default)* | Stages marked `careful: true` get one critic pass. | +| `all` | Every non-chief, non-qa, non-critic stage gets one pass.| + +The critic's last two lines must be: + +``` +VERDICT: accept # or `revise` +SCORE: <0..1> +``` + +- Code: `workflow.py::WorkflowRunner._refine_loop` +- Config: `robustness.{auto_refine, refine_max_rounds, + refine_accept_threshold}` +- Logged events: `refine_critic_verdict` +- Kill switch: `robustness.auto_refine: "off"` AND remove any + `refine:` blocks from your workflow YAML. + +### 10. Adaptive Ralph & autoresearch (new) + +Both loops record every iteration's outcome to +`.clk/state/experiments.jsonl`. After `robustness.plateau_window` +consecutive iterations without measurable improvement, the loop: + +1. **Escalates** — the next iteration's dispatches carry + `careful=true` in their extra, which (via Layer 7) fans them into + stochastic consensus. +2. **Reframes** — the chief is dispatched with a "plateau dispatch" + prompt asking it to re-cast roles or re-author the workflow with a + qualitatively different approach (new metric, new experiment + family) rather than another marginal tweak. +3. **Terminates gracefully** — if escalation + reframe fail to break + the plateau across two more iterations, `done.md` is written with + reason "plateau" rather than burning the full iteration budget. 
+ +Regression (last iteration failed after at least one earlier success +in the window) triggers an additional critic dispatch on the failing +diff before the next plan, so the next iteration starts from an +informed view of what broke. + +Autoresearch additionally gains an evaluator gate (previously only in +Ralph): if the analyst's writes break the build, the working tree is +reverted rather than committed. + +Both loops also short-circuit when a planner or surveyor returns +empty / unrecoverable output; rather than commit garbage, the +iteration is recorded with `improved=False`. + +- Code: `ralph_loop.py::RalphLoop`, `autoresearch_loop.py::AutoresearchLoop` +- Config: `robustness.{plateau_window, plateau_action}` + (`escalate_then_reframe` | `escalate_only` | `reframe_only` | `off`) +- Logged events: `ralph_plateau_detected`, `ralph_plateau_escalate`, + `ralph_plateau_terminated`, `ralph_regression_detected`, + `ralph_iteration_skipped_low_quality`, + `autoresearch_step_skipped_low_quality`, `autoresearch_revert` +- Kill switch: `robustness.plateau_action: "off"` + +### Putting it together + +A typical "careful" engineering stage now runs: + +1. Stage dispatched with `careful: true`. +2. `auto_consensus=on_careful` → N samples fan out in parallel. +3. Chief coalesces the samples. +4. `auto_refine=careful_only` → critic scores the coalesced output; + the worker is revised until critic accepts or `max_rounds`. +5. Stage validation runs. +6. Checkpoint (if enabled) — chief CONTINUE / REDIRECT / ABORT + verdict. +7. Outputs contract check; warn if any declared key was not posted. + +Tracing this in `.clk/logs/`: + +``` +grep -E '^(consensus_|refine_|workflow_checkpoint|agent_quality_)' \ + .clk/logs/activity.jsonl | jq . 
+``` ## Completion criteria diff --git a/clk_harness/config.py b/clk_harness/config.py index 3625391..74ba9ef 100644 --- a/clk_harness/config.py +++ b/clk_harness/config.py @@ -194,6 +194,28 @@ def save_json(path: Path, data: Dict[str, Any], *, backup: bool = True) -> None: "max_dynamic_roles": 12, "auto_cast_on_idea": True, }, + "robustness": { + # Sub-sub-agent fan-out on dispatch. + # off | on_careful (fan out only stages marked careful=true) | always. + "auto_consensus": "on_careful", + # Critic-judge inner refinement loop. + # off | careful_only (default) | all + "auto_refine": "careful_only", + # Cap on automatic re-dispatch attempts after a quality failure. + "max_quality_retries": 2, + # Below this, responses are treated as suspect and may be re-run. + "min_response_chars": 40, + # Critic-judge loop bounds. + "refine_max_rounds": 3, + "refine_accept_threshold": 0.8, + # Inter-agent Q&A bounds. + "qa_parallel_judges": 1, + "max_qa_depth": 3, + # Ralph / autoresearch plateau detection. + "plateau_window": 3, + # escalate_then_reframe | escalate_only | reframe_only | off + "plateau_action": "escalate_then_reframe", + }, } DEFAULT_PROVIDERS: Dict[str, Any] = { diff --git a/clk_harness/orchestration/agent.py b/clk_harness/orchestration/agent.py index 6122036..81a2e1e 100644 --- a/clk_harness/orchestration/agent.py +++ b/clk_harness/orchestration/agent.py @@ -27,6 +27,7 @@ from . import casting as _casting from . import actions as _actions from . import blackboard as _blackboard +from . import response_quality as _response_quality def _read_recent_casting_rejections(paths: Paths, *, limit: int = 8) -> str: @@ -195,6 +196,22 @@ def render_prompt(self, agent: AgentSpec, objective: str, extra: Optional[Dict[s log_exception("orchestration.agent.render_prompt", exc) return objective + # Phases whose dispatches must never re-trigger the auto-consensus or + # quality-retry layers. 
Otherwise consensus coalescing, checkpoint + # verdicts, recovery dispatches, and the critic-judge inner loop + # would all recurse into themselves. + _META_PHASES = frozenset({ + "consensus_sample", + "consensus", + "checkpoint", + "recovery", + "draft_dispatch_prompt", + "draft_role_prompt", + "qa_answer", + "refine_critic", + "refine_worker", + }) + def run( self, agent_name: str, @@ -203,6 +220,253 @@ def run( extra: Optional[Dict[str, Any]] = None, dry_run: Optional[bool] = None, ) -> AgentRun: + """Public dispatch entry point. + + Wraps :meth:`_dispatch_once` with two robustness layers: + + * **Proactive auto-consensus** (`robustness.auto_consensus`) — + stages marked ``careful: true`` (or all stages, when set to + ``"always"``) fan into N stochastic samples and a chief + coalescing pass instead of a single dispatch. + * **Quality-driven re-dispatch** — after a normal dispatch, the + response is scored against ``response_quality``; recoverable + failures (empty, malformed, contract-missing, low-confidence) + trigger a re-run with a repair preamble, escalating to + consensus on the final retry. + + Both layers are gated by ``clk.config.json::robustness`` and + bypassed for dispatches whose ``extra.phase`` indicates a + meta-path (consensus coalescing, recovery, checkpoint, etc.) so + the harness never loops on itself. 
+ """ + extra_dict: Dict[str, Any] = dict(extra or {}) + phase = str(extra_dict.get("phase") or "") + in_meta = phase in self._META_PHASES + is_dry = self.clk_cfg.get("dry_run", False) if dry_run is None else dry_run + + if not in_meta and not is_dry and self._should_auto_consensus(agent_name, extra_dict): + return self._dispatch_auto_consensus( + agent_name, + objective, + extra=extra_dict, + dry_run=dry_run, + reason="auto_consensus_proactive", + ) + + if in_meta or is_dry: + return self._dispatch_once(agent_name, objective, extra=extra_dict, dry_run=dry_run) + + return self._dispatch_with_quality_loop( + agent_name, objective, extra=extra_dict, dry_run=dry_run + ) + + def _dispatch_with_quality_loop( + self, + agent_name: str, + objective: str, + *, + extra: Dict[str, Any], + dry_run: Optional[bool], + ) -> AgentRun: + """Quality-validated dispatch wrapper. + + Runs :meth:`_dispatch_once`, scores the response, and re-runs + the worker with a repair preamble when the verdict is + recoverable. Escalates to ``_dispatch_auto_consensus`` on the + final retry when ``auto_consensus`` is not ``"off"``. 
+ """ + cfg = self.clk_cfg.get("robustness") or {} + max_retries = int(cfg.get("max_quality_retries") or 0) + min_chars = int(cfg.get("min_response_chars") or 40) + auto_consensus_mode = str(cfg.get("auto_consensus") or "off").lower() + expected_outputs = list(extra.get("stage_outputs") or []) + + attempt = 0 + current_objective = objective + last_run: Optional[AgentRun] = None + while True: + attempt += 1 + attempt_extra = dict(extra) + attempt_extra["quality_attempt"] = attempt + run = self._dispatch_once( + agent_name, current_objective, extra=attempt_extra, dry_run=dry_run + ) + last_run = run + if not run.response.ok: + return run + try: + q = _response_quality.score( + run.response.text, + min_chars=min_chars, + expected_outputs=expected_outputs, + ) + except Exception as exc: + log_exception("orchestration.agent._dispatch_with_quality_loop.score", exc) + return run + if q.ok or not q.recoverable or attempt > max_retries: + if not q.ok: + log_event( + self.paths, + "agent_quality_final", + agent=agent_name, + attempt=attempt, + ok=q.ok, + recoverable=q.recoverable, + flags=list(q.flags), + reasons=list(q.reasons), + score=q.score, + confidence=q.confidence, + needs_review=q.needs_review, + ) + return run + log_event( + self.paths, + "agent_quality_retry", + agent=agent_name, + attempt=attempt, + next_attempt=attempt + 1, + max_attempts=max_retries + 1, + flags=list(q.flags), + reasons=list(q.reasons), + score=q.score, + confidence=q.confidence, + needs_review=q.needs_review, + ) + self._observer_log( + f"quality :: {agent_name} :: retry {attempt}/{max_retries} " + f"flags={','.join(q.flags) or '?'} score={q.score:.2f}" + ) + # On the final retry, optionally escalate to a consensus + # fan-out rather than another single-shot retry — that way + # we get sub-sub-agents on actually-shaky outputs even when + # the stage isn't marked careful. 
+ if attempt == max_retries and auto_consensus_mode != "off": + return self._dispatch_auto_consensus( + agent_name, + objective, + extra=extra, + dry_run=dry_run, + reason=f"quality_escalation:{','.join(q.flags)}", + ) + current_objective = q.repair_hint() + "\n\nOriginal objective:\n" + objective + return last_run # unreachable + + def _should_auto_consensus(self, agent_name: str, extra: Dict[str, Any]) -> bool: + """Proactive auto-consensus trigger check.""" + cfg = self.clk_cfg.get("robustness") or {} + mode = str(cfg.get("auto_consensus") or "off").lower() + if mode in ("", "off", "false", "0"): + return False + # Never fan-out the chief on its own meta-paths. + if agent_name == "chief": + return False + if mode == "always": + return True + # on_careful: only when the stage explicitly opted in. + if mode == "on_careful": + return bool(extra.get("careful")) + return False + + def _dispatch_auto_consensus( + self, + agent_name: str, + objective: str, + *, + extra: Dict[str, Any], + dry_run: Optional[bool], + reason: str = "auto_consensus", + ) -> AgentRun: + """Fan-out a single dispatch into N stochastic samples + coalesce. + + Reuses :meth:`_run_consensus_sample` (same code path as + ``PROPOSE_CONSENSUS``) so the sampling, logging, and parallelism + behavior is identical. The chief is invoked to coalesce. 
+ """ + cfg = self.clk_cfg.get("consensus") or {} + sample_count = max(1, min(int(cfg.get("max_samples") or 3), 6)) + max_parallel = max(1, int(cfg.get("max_parallel") or 4)) + name = f"auto_{agent_name}_{datetime.now().strftime('%H%M%S%f')}" + log_event( + self.paths, + "consensus_started", + agent=agent_name, + name=name, + objective=objective, + agents=[agent_name] * sample_count, + samples=sample_count, + max_parallel=max_parallel, + trigger=reason, + ) + self._observer_log( + f"consensus :: auto/{agent_name} :: starting {sample_count} samples " + f"(reason={reason})" + ) + results: List[Dict[str, Any]] = [] + with ThreadPoolExecutor(max_workers=min(max_parallel, sample_count)) as pool: + futs = { + pool.submit(self._run_consensus_sample, name, idx + 1, agent_name, objective): idx + 1 + for idx in range(sample_count) + } + for fut in as_completed(futs): + idx = futs[fut] + try: + results.append(fut.result()) + except Exception as exc: + log_exception("orchestration.agent._dispatch_auto_consensus.sample", exc) + results.append({ + "sample": idx, "agent": agent_name, "ok": False, + "error": str(exc), "text": "", + }) + results.sort(key=lambda r: int(r.get("sample") or 0)) + log_event( + self.paths, + "consensus_samples_completed", + agent=agent_name, + name=name, + results=results, + trigger=reason, + ) + coalesce_objective = self._consensus_coalesce_objective(name, objective, results) + coalesced = self._dispatch_once( + "chief", + coalesce_objective, + extra={ + "phase": "consensus", + "consensus_name": name, + "consensus_trigger": reason, + "stage_id": extra.get("stage_id"), + "workflow": extra.get("workflow"), + }, + dry_run=dry_run, + ) + log_event( + self.paths, + "consensus_coalesced", + agent="chief", + name=name, + ok=coalesced.response.ok, + response_text=coalesced.response.text or "", + error=coalesced.response.error, + trigger=reason, + ) + # Re-label so downstream logging shows the auto path, not "chief". 
+ coalesced.agent = agent_name + return coalesced + + def _dispatch_once( + self, + agent_name: str, + objective: str, + *, + extra: Optional[Dict[str, Any]] = None, + dry_run: Optional[bool] = None, + ) -> AgentRun: + """Single provider dispatch with provider-level retry only. + + This was the body of :meth:`run` before the robustness layers + wrapped it. Keep it self-contained so consensus / refine / + recovery paths can call it without re-entering the wrappers. + """ agent = self.get_agent(agent_name) provider = self.get_provider(agent.provider) prompt = self.render_prompt(agent, objective, extra) @@ -789,6 +1053,13 @@ def _apply_posts(self, run: AgentRun, extra: Dict[str, Any]) -> None: Stage_id / workflow are taken from ``extra`` so a post made during a workflow stage records its provenance and the workflow runner can verify the stage's declared outputs against it. + + When a question post carries a ``target_agent`` and + ``urgency=blocking``, the harness dispatches the named agent to + answer the question synchronously before this run returns. + That makes the asker's worker effectively block on the answer, + which gets posted back to the blackboard with ``post_type: + answer`` and ``consumes: []``. """ text = (run.response.text or "") if not text or "POST:" not in text: @@ -807,6 +1078,109 @@ def _apply_posts(self, run: AgentRun, extra: Dict[str, Any]) -> None: for p in posted: if p.id and p.id not in run.posts: run.posts.append(p.id) + # Route blocking questions: dispatch the target agent inline so + # the asker effectively sees the answer in subsequent rounds. + try: + self._route_blocking_questions(run, posted, extra) + except Exception as exc: + log_exception("orchestration.agent._apply_posts.route_qa", exc) + + def _route_blocking_questions( + self, + run: AgentRun, + posted: List["_blackboard.Post"], + extra: Dict[str, Any], + ) -> None: + """Dispatch peer agents to answer ``POST: question`` blocks. 
+ + Skipped entirely when there are no question posts targeted at a + peer, when we're already inside a Q&A chain that has exhausted + its depth budget, or when the dispatcher is itself in a meta- + phase (consensus, recovery, etc.). + """ + questions = [ + p for p in posted + if (p.post_type or "").lower() == "question" + and (p.target_agent or "").strip() + and (p.urgency or "blocking").lower() == "blocking" + ] + if not questions: + return + if str(extra.get("phase") or "") in self._META_PHASES: + return + cfg = self.clk_cfg.get("robustness") or {} + max_depth = int(cfg.get("max_qa_depth") or 3) + chain: List[str] = list(extra.get("qa_chain") or []) + if len(chain) >= max_depth: + log_event( + self.paths, + "qa_chain_capped", + agent=run.agent, + depth=len(chain), + max_depth=max_depth, + chain=list(chain), + ) + return + agents_known = set((self.agents_cfg.get("agents") or {}).keys()) + for q in questions: + target = q.target_agent.strip() + if not target or target not in agents_known: + log_event( + self.paths, + "qa_target_unknown", + agent=run.agent, + target=target, + question_id=q.id, + ) + continue + if target == run.agent: + # Self-questions don't need routing. + continue + if target in chain: + log_event( + self.paths, + "qa_chain_cycle", + agent=run.agent, + target=target, + chain=list(chain), + ) + continue + next_chain = chain + [run.agent] + answer_objective = ( + f"Peer question routed by the harness.\n\n" + f"Asker: `{run.agent}` (stage `{extra.get('stage_id') or '?'}`)\n" + f"Question id: `{q.id}`\n\n" + f"Question:\n{q.body}\n\n" + "Answer this directly. Emit a POST: answer block whose\n" + f"CONSUMES list contains `{q.id}`. Keep the body focused " + "on what the asker needs to make progress — do not start " + "a new sub-thread of questions of your own." 
+ ) + log_event( + self.paths, + "qa_dispatch", + agent=run.agent, + target=target, + question_id=q.id, + chain=next_chain, + urgency=q.urgency or "blocking", + ) + self._observer_log( + f"qa :: {run.agent} → {target} :: {q.id[:32]}" + ) + self._dispatch_once( + target, + answer_objective, + extra={ + "phase": "qa_answer", + "qa_chain": next_chain, + "qa_question_id": q.id, + "qa_asker": run.agent, + "stage_id": extra.get("stage_id"), + "workflow": extra.get("workflow"), + }, + dry_run=self.clk_cfg.get("dry_run", False), + ) def _apply_actions(self, run: AgentRun) -> None: """Execute ACTION blocks; merge harness-written files back into the run.""" diff --git a/clk_harness/orchestration/autoresearch_loop.py b/clk_harness/orchestration/autoresearch_loop.py index 20a8efe..b8af2fd 100644 --- a/clk_harness/orchestration/autoresearch_loop.py +++ b/clk_harness/orchestration/autoresearch_loop.py @@ -20,11 +20,13 @@ from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import List, Optional +from typing import Any, Dict, List, Optional from ..config import Paths -from ..git_ops import add_all, commit as git_commit, has_changes +from ..git_ops import add_all, commit as git_commit, has_changes, head_sha, revert_to +from ..utils.activity_log import log_event from ..utils.logging_utils import log, log_exception +from . 
import response_quality as _response_quality from .agent import AgentRunner from .evaluator import Evaluator @@ -63,14 +65,42 @@ def run(self, *, dry_run: bool = False) -> List[Experiment]: break return out + def _robustness_cfg(self) -> Dict[str, Any]: + return dict(self.runner.clk_cfg.get("robustness") or {}) + def _step(self, idx: int, *, dry_run: bool) -> Experiment: started = datetime.now().isoformat(timespec="seconds") + before = head_sha(self.paths.root) + min_chars = int(self._robustness_cfg().get("min_response_chars") or 40) survey = self.runner.run( "ralph", f"Autoresearch step #{idx}: survey state and propose next experiment.", extra={"iteration": idx, "loop": "autoresearch"}, dry_run=dry_run, ) + survey_quality = _response_quality.score(survey.response.text, min_chars=min_chars) + if not survey_quality.ok and not survey_quality.recoverable: + log_event( + self.paths, + "autoresearch_step_skipped_low_quality", + agent="ralph", + iteration=idx, + survey_quality=survey_quality.summary(), + flags=list(survey_quality.flags), + ) + log( + f"autoresearch #{idx}: skipping — survey returned no usable text", + level="WARN", + ) + finished = datetime.now().isoformat(timespec="seconds") + return Experiment( + index=idx, + started_at=started, + finished_at=finished, + question="(survey produced no question; step skipped)", + finding="", + committed=False, + ) question_lines = (survey.response.text or "").strip().splitlines() question = next( (l for l in question_lines if l.strip().startswith(("Q:", "Question:", "Hypothesis:"))), @@ -92,17 +122,38 @@ def _step(self, idx: int, *, dry_run: bool) -> Experiment: finding_preview = (analyst.response.text or "")[:400] committed = False + # Evaluator gate: if the analyst's writes broke the build, + # revert to the pre-step HEAD rather than committing a broken + # state. Same protocol Ralph already uses. 
if not dry_run and has_changes(self.paths.root) and analyst.response.ok: - if add_all(self.paths.root): - committed = git_commit( - self.paths.root, + eval_result = self.evaluator.run() + if eval_result.ok: + if add_all(self.paths.root): + committed = git_commit( + self.paths.root, + agent="autoresearch", + objective=question, + files_changed=analyst.files_written, + validation=f"critic ok={critic.response.ok}; " + f"eval={eval_result.summary()[:200]}", + next_step="select next question", + body_extra=finding_preview, + ) + elif before: + log_event( + self.paths, + "autoresearch_revert", agent="autoresearch", - objective=question, - files_changed=analyst.files_written, - validation=f"critic ok={critic.response.ok}", - next_step="select next question", - body_extra=finding_preview, + iteration=idx, + eval_summary=eval_result.summary()[:400], + sha_before=before, + ) + log( + f"autoresearch #{idx}: evaluator failed; reverting to " + f"{before[:8]}", + level="WARN", ) + revert_to(self.paths.root, before) finished = datetime.now().isoformat(timespec="seconds") return Experiment( index=idx, diff --git a/clk_harness/orchestration/blackboard.py b/clk_harness/orchestration/blackboard.py index 4a8b078..18e316c 100644 --- a/clk_harness/orchestration/blackboard.py +++ b/clk_harness/orchestration/blackboard.py @@ -87,6 +87,13 @@ class Post: stage_id: str = "" workflow: str = "" ts: str = "" + # Inter-agent Q&A routing. When ``target_agent`` is set on a + # ``post_type: "question"`` post, the harness dispatches the named + # agent to answer it before the asker's run returns (when + # ``urgency == "blocking"``) or surfaces it to the chief on the + # next supervise cycle (when ``urgency == "async"``). 
def find_unanswered_questions(
    paths: Paths,
    *,
    target_agent: Optional[str] = None,
) -> List[Post]:
    """Return question posts that no answer post consumes.

    A post of type ``question`` is treated as answered when *any* post
    of type ``answer`` returned by ``list_posts`` lists the question's
    id in its ``consumes``. Ordering and timestamps are not consulted,
    so an answer does not have to be newer than the question.

    Post types are compared case-insensitively and ``target_agent``
    values are compared after stripping whitespace, matching how the
    Q&A router normalises the same fields.

    Args:
        paths: Project paths handed to ``list_posts``.
        target_agent: When non-blank, only questions addressed to this
            agent are returned. ``None`` or a blank string disables the
            filter.

    Returns:
        Unanswered question posts, in the order ``list_posts`` yields
        them.
    """
    posts = list_posts(paths)

    def kind_of(post: Post) -> str:
        # Normalise once so "Question" / "question " are both recognised.
        return (post.post_type or "").strip().lower()

    # Every id consumed by any answer post, stringified so ids compare
    # consistently with ``Post.id``.
    answered_ids = {
        str(qid)
        for post in posts
        if kind_of(post) == "answer"
        for qid in (post.consumes or [])
    }
    wanted = (target_agent or "").strip()
    return [
        post
        for post in posts
        if kind_of(post) == "question"
        and (not wanted or (post.target_agent or "").strip() == wanted)
        and post.id not in answered_ids
    ]
validation runs 5. if validation passes, the iteration is committed 6. otherwise the working tree is reset to the pre-iteration HEAD + +When ``robustness.plateau_window`` consecutive iterations stop showing +improvement, the loop escalates (forces consensus fan-out on the next +plan/engineer/qa via ``careful=true`` in extra), then reframes (asks the +chief to re-author the workflow), and finally terminates gracefully +rather than burning the full iteration budget. Same protocol applies +to regression — a failing iteration that follows a passing run +triggers a critic dispatch before the next plan. """ from __future__ import annotations @@ -17,7 +25,7 @@ from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import List, Optional +from typing import Any, Dict, List, Optional from ..config import Paths from ..git_ops import ( @@ -27,7 +35,9 @@ head_sha, revert_to, ) +from ..utils.activity_log import log_event from ..utils.logging_utils import log, log_exception +from . import response_quality as _response_quality from .agent import AgentRunner from .evaluator import Evaluator, EvalResult @@ -61,35 +71,239 @@ def __init__( def run(self, *, dry_run: bool = False) -> List[IterationOutcome]: outcomes: List[IterationOutcome] = [] + plateau_streak = 0 for i in range(1, self.max_iterations + 1): - outcome = self._iterate(i, dry_run=dry_run) + adaptive_extra = self._adaptive_extra(outcomes) + outcome = self._iterate(i, dry_run=dry_run, adaptive_extra=adaptive_extra) outcomes.append(outcome) self._record(outcome) if self._is_done(): log("ralph: completion criteria met; stopping") break + # Plateau / regression detection runs only when we have + # enough history; on the first few iterations we let the + # loop warm up. 
+ verdict = self._progress_verdict(outcomes) + if verdict == "plateau": + plateau_streak += 1 + self._handle_plateau(i, plateau_streak, dry_run=dry_run) + if self._should_terminate_for_plateau(plateau_streak): + self._write_plateau_done(i, plateau_streak) + break + elif verdict == "regressing": + plateau_streak = 0 + self._handle_regression(i, dry_run=dry_run) + else: + plateau_streak = 0 return outcomes - def _iterate(self, idx: int, *, dry_run: bool) -> IterationOutcome: + # -- adaptive helpers ------------------------------------------------ + + def _robustness_cfg(self) -> Dict[str, Any]: + return dict(self.runner.clk_cfg.get("robustness") or {}) + + def _plateau_window(self) -> int: + try: + return max(2, int(self._robustness_cfg().get("plateau_window") or 3)) + except (TypeError, ValueError): + return 3 + + def _plateau_action(self) -> str: + action = str(self._robustness_cfg().get("plateau_action") or "escalate_then_reframe").lower() + return action + + def _progress_verdict(self, outcomes: List[IterationOutcome]) -> str: + """Return one of ``"improving" | "plateau" | "regressing"``.""" + window = self._plateau_window() + if len(outcomes) < window: + return "improving" + recent = outcomes[-window:] + improvements = sum(1 for o in recent if o.improved) + if improvements == 0: + return "plateau" + # Regression: last iteration failed after at least one prior + # iteration in the window passed. + if not recent[-1].improved and any(o.improved for o in recent[:-1]): + return "regressing" + return "improving" + + def _adaptive_extra(self, outcomes: List[IterationOutcome]) -> Dict[str, Any]: + """Extra fields injected into the next iteration's dispatches. + + Right now this is just the ``careful`` flag: when we're stuck, + force consensus fan-out (the wrapper in + :class:`AgentRunner.run` treats ``careful=true`` as a trigger + for ``auto_consensus`` and ``auto_refine``). 
+ """ + if not outcomes or self._progress_verdict(outcomes) == "improving": + return {} + return {"careful": True, "loop_adaptive": True} + + def _handle_plateau(self, idx: int, streak: int, *, dry_run: bool) -> None: + action = self._plateau_action() + log_event( + self.paths, + "ralph_plateau_detected", + agent="ralph", + iteration=idx, + streak=streak, + action=action, + ) + log( + f"ralph: plateau detected at iteration {idx} (streak={streak}); " + f"action={action}", + level="WARN", + ) + if action in ("escalate_only", "escalate_then_reframe"): + # Escalation is already wired via _adaptive_extra setting + # careful=true on the next iteration; just record the intent. + log_event(self.paths, "ralph_plateau_escalate", agent="ralph", iteration=idx) + if action in ("reframe_only", "escalate_then_reframe") and streak >= 2 and not dry_run: + try: + self.runner.run( + "chief", + ( + f"Plateau dispatch — Ralph has run {streak} consecutive " + "iterations without measurable improvement. Re-cast roles " + "or re-author the engineering workflow with a " + "PROPOSE_WORKFLOW so the next iterations attempt a " + "qualitatively different approach (new metric, new " + "experiment family). Avoid another marginal tweak." + ), + extra={ + "phase": "recovery", + "loop": "ralph", + "iteration": idx, + "plateau_streak": streak, + }, + dry_run=dry_run, + ) + except Exception as exc: + log_exception("orchestration.ralph_loop._handle_plateau.reframe", exc) + + def _handle_regression(self, idx: int, *, dry_run: bool) -> None: + log_event(self.paths, "ralph_regression_detected", agent="ralph", iteration=idx) + log(f"ralph: regression detected at iteration {idx}", level="WARN") + if dry_run: + return + # Ask the critic to look at what just broke before we plan the + # next iteration. The critic posts to the blackboard and + # subsequent ralph runs will see it via $blackboard_digest. 
+ try: + self.runner.run( + "critic", + ( + f"Ralph regression check — the iteration before #{idx + 1} " + "regressed (it ran but did not improve the metric, after a " + "prior passing run). Read the latest commit and the " + "evaluator output, identify what broke, and post a brief " + "POST: critique block so the next ralph iteration avoids " + "the same trap." + ), + extra={"phase": "recovery", "loop": "ralph", "iteration": idx}, + dry_run=dry_run, + ) + except Exception as exc: + log_exception("orchestration.ralph_loop._handle_regression", exc) + + def _should_terminate_for_plateau(self, streak: int) -> bool: + action = self._plateau_action() + # Off → never terminate from plateau detection. + if action in ("off", "false", "0", ""): + return False + # After the escalate + reframe attempts plus two more chances to + # break out, give up gracefully. + return streak >= max(2, self._plateau_window()) + 2 + + def _write_plateau_done(self, idx: int, streak: int) -> None: + try: + self.paths.state.mkdir(parents=True, exist_ok=True) + (self.paths.state / "done.md").write_text( + f"# Ralph plateau termination\n\n" + f"Stopped at iteration {idx} after {streak} consecutive " + "iterations without measurable improvement.\n" + "Escalation + reframe attempts did not break the plateau, " + "so the loop terminated gracefully rather than burning the " + "remaining iteration budget.\n", + encoding="utf-8", + ) + log_event( + self.paths, + "ralph_plateau_terminated", + agent="ralph", + iteration=idx, + streak=streak, + ) + except Exception as exc: + log_exception("orchestration.ralph_loop._write_plateau_done", exc) + + def _iterate( + self, + idx: int, + *, + dry_run: bool, + adaptive_extra: Optional[Dict[str, Any]] = None, + ) -> IterationOutcome: started = datetime.now().isoformat(timespec="seconds") before = head_sha(self.paths.root) objective = f"Ralph iteration #{idx}: select and execute one measurable improvement." 
+ base_extra: Dict[str, Any] = {"iteration": idx, "loop": "ralph"} + base_extra.update(adaptive_extra or {}) # 1. Plan with Ralph plan = self.runner.run( "ralph", objective, - extra={"iteration": idx, "loop": "ralph"}, + extra=base_extra, dry_run=dry_run, ) + # Quality guard: a Ralph that returned empty / malformed + # planner output can't drive a productive iteration, so we + # short-circuit rather than re-using whatever stray line happens + # to be at index 0. + plan_quality = _response_quality.score( + plan.response.text, + min_chars=int((self._robustness_cfg().get("min_response_chars") or 40)), + ) + eng_obj_lines = (plan.response.text or "").strip().splitlines() + if not plan_quality.ok and not plan_quality.recoverable: + eng_obj_text = "" + else: + eng_obj_text = eng_obj_lines[0] if eng_obj_lines else "" + if not eng_obj_text: + log_event( + self.paths, + "ralph_iteration_skipped_low_quality", + agent="ralph", + iteration=idx, + plan_quality=plan_quality.summary(), + flags=list(plan_quality.flags), + ) + log( + f"ralph #{idx}: skipping — planner returned no usable objective", + level="WARN", + ) + finished = datetime.now().isoformat(timespec="seconds") + return IterationOutcome( + index=idx, + started_at=started, + finished_at=finished, + objective="(planner produced no objective; iteration skipped)", + improved=False, + committed=False, + eval_summary="skipped: planner low quality", + sha_before=before, + sha_after=before, + ) + # 2. 
Engineer one slice - eng_obj = (plan.response.text or "").strip().splitlines()[:1] - eng_obj_text = eng_obj[0] if eng_obj else "Implement the next improvement" + engineer_extra = dict(base_extra) + engineer_extra["from"] = "ralph" engineer = self.runner.run( "engineer", eng_obj_text, - extra={"iteration": idx, "loop": "ralph", "from": "ralph"}, + extra=engineer_extra, dry_run=dry_run, ) @@ -97,7 +311,7 @@ def _iterate(self, idx: int, *, dry_run: bool) -> IterationOutcome: qa = self.runner.run( "qa", f"Audit changes from iteration #{idx} and validate.", - extra={"iteration": idx, "loop": "ralph"}, + extra=base_extra, dry_run=dry_run, ) diff --git a/clk_harness/orchestration/response_quality.py b/clk_harness/orchestration/response_quality.py new file mode 100644 index 0000000..c9fff5f --- /dev/null +++ b/clk_harness/orchestration/response_quality.py @@ -0,0 +1,308 @@ +"""Response-quality validator for agent outputs. + +Layer 1 of the robustness loops. Every dispatch in +:class:`~clk_harness.orchestration.agent.AgentRunner` is gated through +:func:`score` after the provider returns. When ``ResponseQuality.ok`` is +false the runner can re-dispatch with a repair preamble or escalate to +stochastic consensus, instead of accepting weak / malformed output. + +The validator is intentionally cheap: regex + string checks, no provider +calls, no I/O beyond what's already passed in. Specific things it +detects: + +* empty / sub-threshold text +* malformed ACTION blocks (header without END_ACTION, etc.) +* malformed POST blocks +* missing declared `outputs` POST keys (when ``expected_outputs`` is + passed in) +* self-reported low confidence (``CONFIDENCE: <0..1>``, + ``NEEDS_REVIEW: true``) +* refusal patterns ("I cannot", "I'm sorry, but ...") for ordinary tasks + +Each flag carries a short reason string so the re-dispatch preamble can +quote it back to the worker. 
# ---------------------------------------------------------------------------
# Public dataclass
# ---------------------------------------------------------------------------


@dataclass
class ResponseQuality:
    """Verdict on a single agent response.

    ``ok`` is False whenever any flag in :attr:`flags` is set; the caller
    decides whether to retry based on :attr:`recoverable`. ``score`` is a
    rough 0..1 indicator: :func:`score` computes it as 1.0 minus a fixed
    deduction per flag, floored at 0.
    """

    ok: bool = True
    score: float = 1.0
    # Machine-readable flag names, in detection order.
    flags: List[str] = field(default_factory=list)
    # One model-readable sentence per flag; quoted back by repair_hint().
    reasons: List[str] = field(default_factory=list)
    # When False the caller should give up on this run rather than retry
    # (e.g. an explicit refusal). When True, a repair preamble is worth
    # spending tokens on.
    recoverable: bool = True
    # The CONFIDENCE / NEEDS_REVIEW values as parsed from the response,
    # surfaced separately so callers can log them without re-parsing.
    confidence: Optional[float] = None
    needs_review: Optional[bool] = None

    def summary(self) -> str:
        """Return a one-line, log-friendly rendering of the verdict."""
        if self.ok:
            return f"ok score={self.score:.2f}"
        return f"flags={','.join(self.flags) or '?'} score={self.score:.2f}"

    def repair_hint(self) -> str:
        """Compact, model-readable description of what went wrong.

        Intended as a preamble for a re-dispatch objective so the worker
        fixes the specific issues rather than re-rolling at random.
        Returns an empty string when the verdict is ok or carries no
        reasons.
        """
        if self.ok or not self.reasons:
            return ""
        bullets = "\n".join(f"- {r}" for r in self.reasons)
        return (
            "Your previous response was rejected by the harness for the "
            "following reasons:\n"
            f"{bullets}\n"
            "Re-emit a complete response that fixes every item above."
        )


# ---------------------------------------------------------------------------
# Detectors
# ---------------------------------------------------------------------------


# Group 1 is the number; group 2 is an optional ``%`` suffix.
_CONFIDENCE_RE = re.compile(
    r"^\s*CONFIDENCE\s*:\s*([0-9]*\.?[0-9]+)\s*(%?)\s*$",
    re.IGNORECASE | re.MULTILINE,
)
_NEEDS_REVIEW_RE = re.compile(
    r"^\s*NEEDS_REVIEW\s*:\s*(true|yes|y|1|false|no|n|0)\s*$",
    re.IGNORECASE | re.MULTILINE,
)
_REFUSAL_RES: Sequence[re.Pattern] = tuple(
    re.compile(pat, re.IGNORECASE)
    for pat in (
        r"\bi\s+cannot\b",
        r"\bi\s+can'?t\b\s+(?:help|assist|do|comply)",
        r"\bi\s+(?:am|'m)\s+(?:sorry|unable)\b.*\b(?:cannot|can'?t|won'?t)\b",
        r"\bas\s+an\s+ai\s+(?:language\s+)?model\b",
        r"\bI\s+do\s+not\s+have\s+the\s+ability\b",
    )
)
# NOTE: despite its name this matches ``ACTION: <name>`` header lines.
_HEADERLESS_ACTION_RE = re.compile(r"^\s*ACTION\s*:\s*([A-Za-z]+)", re.IGNORECASE | re.MULTILINE)
_END_ACTION_RE = re.compile(r"^\s*END_ACTION\s*$", re.IGNORECASE | re.MULTILINE)
# Accepts hyphens in the post type, like the blackboard parser's header
# pattern, so ``POST: design-note`` headers are counted too.
_POST_HEAD_RE = re.compile(r"^\s*POST\s*:\s*([A-Za-z][A-Za-z0-9_\-]*)\s*$", re.IGNORECASE | re.MULTILINE)
_POST_END_RE = re.compile(r"^\s*END_POST\s*$", re.IGNORECASE | re.MULTILINE)

# Fixed score deduction per flag; unknown flags cost 0.2.
_FLAG_DEDUCTIONS = {
    "empty": 0.6,
    "refusal": 0.5,
    "malformed_action": 0.4,
    "malformed_post": 0.3,
    "outputs_missing": 0.4,
    "low_confidence": 0.3,
    "needs_review_self": 0.2,
    "confidence_missing": 0.1,
}


def _parse_confidence(text: str) -> Optional[float]:
    """Parse the first ``CONFIDENCE:`` line into a value in 0..1.

    Returns ``None`` when no such line exists. A value written with a
    ``%`` suffix, or any bare value above 1, is read on a 0..100 scale.
    The result is capped at 1.0. The pattern only matches unsigned
    numbers, so the result is never negative.
    """
    m = _CONFIDENCE_RE.search(text or "")
    if not m:
        return None
    try:
        v = float(m.group(1))
    except ValueError:
        return None
    if m.group(2) or v > 1:
        # Explicit percent sign, or tolerate a bare 0..100 scale.
        v /= 100.0
    return min(1.0, v)


def _parse_needs_review(text: str) -> Optional[bool]:
    """Parse the first ``NEEDS_REVIEW:`` line; ``None`` when absent."""
    m = _NEEDS_REVIEW_RE.search(text or "")
    if not m:
        return None
    return m.group(1).lower() in {"true", "yes", "y", "1"}


def _detect_refusal(text: str) -> bool:
    """Return True when any refusal pattern occurs anywhere in ``text``."""
    t = text or ""
    return any(pat.search(t) for pat in _REFUSAL_RES)


def _action_block_balance(text: str) -> int:
    """Count of ACTION headers minus count of END_ACTION lines.

    Returns 0 when there are no ACTION headers at all. Positive when
    there are more headers than END_ACTION markers (malformed). The
    comparison is count-based; it does not pair headers with markers.
    """
    heads = len(_HEADERLESS_ACTION_RE.findall(text or ""))
    if heads == 0:
        return 0
    return heads - len(_END_ACTION_RE.findall(text or ""))


def _post_block_balance(text: str) -> int:
    """Count of POST headers minus count of END_POST lines (0 if no headers)."""
    heads = len(_POST_HEAD_RE.findall(text or ""))
    if heads == 0:
        return 0
    return heads - len(_POST_END_RE.findall(text or ""))


def _missing_outputs(text: str, expected: Sequence[str]) -> List[str]:
    """Return the subset of ``expected`` keys not present in any POST
    block's ``PRODUCES:`` list.

    Best-effort: if parsing the POST blocks raises, the response is
    treated as producing nothing, so every expected key is reported.
    """
    if not expected:
        return []
    try:
        blocks = _blackboard.parse_post_blocks(text or "")
    except Exception:
        blocks = []
    produced = {str(key) for b in blocks for key in (b.get("produces") or [])}
    return [k for k in expected if k not in produced]


# ---------------------------------------------------------------------------
# Top-level scoring
# ---------------------------------------------------------------------------


def score(
    text: Optional[str],
    *,
    min_chars: int = 40,
    expected_outputs: Optional[Sequence[str]] = None,
    require_confidence: bool = False,
) -> ResponseQuality:
    """Score a single response text against the harness's quality rules.

    Parameters
    ----------
    text:
        The full response text emitted by the worker. ``None`` is
        treated as an empty string.
    min_chars:
        Threshold below which the stripped response is treated as
        suspect-empty. Values below 1 behave like 1.
    expected_outputs:
        When set, every key must appear in some POST block's PRODUCES list
        for the response to pass.
    require_confidence:
        When True, missing the ``CONFIDENCE:`` line itself counts as a flag.
        Off by default so existing agents that have not been re-prompted
        yet aren't penalised.

    Returns
    -------
    ResponseQuality
        ``ok`` is True only when no flag was raised. ``recoverable`` is
        False only when a refusal was detected.
    """
    raw = text or ""
    body = raw.strip()
    q = ResponseQuality()
    q.confidence = _parse_confidence(raw)
    q.needs_review = _parse_needs_review(raw)

    def flag(name: str, reason: str) -> None:
        # Flags and reasons are appended in lockstep, in detection order.
        q.flags.append(name)
        q.reasons.append(reason)

    # 1. Empty / near-empty
    if not body or len(body) < max(1, int(min_chars)):
        flag(
            "empty",
            f"Response body was {len(body)} chars (minimum {min_chars}). "
            "Re-emit a substantive response.",
        )

    # 2. Refusal — not recoverable; harness should escalate to chief instead
    if _detect_refusal(raw):
        flag(
            "refusal",
            "Response looked like a refusal. The task is in-scope for this "
            "harness; respond directly or, if blocked, post a POST: question "
            "to the chief explaining the obstacle.",
        )
        q.recoverable = False

    # 3. Malformed ACTION blocks
    act_balance = _action_block_balance(raw)
    if act_balance > 0:
        flag(
            "malformed_action",
            f"{act_balance} ACTION header(s) had no matching END_ACTION. "
            "Every ACTION block must terminate with a line `END_ACTION`.",
        )

    # 4. Malformed POST blocks
    post_balance = _post_block_balance(raw)
    if post_balance > 0:
        flag(
            "malformed_post",
            f"{post_balance} POST header(s) had no matching END_POST. "
            "Every POST block must terminate with a line `END_POST`.",
        )

    # 5. Missing declared outputs
    missing = _missing_outputs(raw, list(expected_outputs or []))
    if missing:
        flag(
            "outputs_missing",
            "Declared output contract keys not satisfied: "
            f"{', '.join(missing)}. Each key must appear in some "
            "POST block's PRODUCES: list.",
        )

    # 6. Self-reported low confidence
    if q.confidence is not None and q.confidence < 0.5:
        flag(
            "low_confidence",
            f"You reported CONFIDENCE: {q.confidence:.2f}. Either "
            "improve the response or escalate via POST: question.",
        )
    if q.needs_review is True:
        flag(
            "needs_review_self",
            "You set NEEDS_REVIEW: true. Sharpen the answer or call out "
            "the specific uncertainty so a peer can resolve it.",
        )

    if require_confidence and q.confidence is None:
        flag(
            "confidence_missing",
            "Response did not include a CONFIDENCE: <0..1> line. Emit "
            "one final line stating your confidence so the harness can "
            "decide whether to re-sample.",
        )

    # Final aggregation: 1.0 minus fixed deductions per flag, floored at 0.
    q.ok = not q.flags
    remaining = 1.0
    for name in q.flags:
        remaining -= _FLAG_DEDUCTIONS.get(name, 0.2)
    q.score = max(0.0, round(remaining, 3))
    return q


def is_recoverable(q: ResponseQuality) -> bool:
    """Convenience: is the quality verdict worth retrying?"""
    return (not q.ok) and q.recoverable
+ refine: Optional[Dict[str, Any]] = None metadata: Dict[str, Any] = field(default_factory=dict) @@ -331,6 +338,13 @@ def load_workflow(path: Path) -> Workflow: rounds = int(raw.get("rounds") or 1) except (TypeError, ValueError): rounds = 1 + refine_raw = raw.get("refine") + if isinstance(refine_raw, dict): + refine_cfg: Optional[Dict[str, Any]] = dict(refine_raw) + elif refine_raw in (True, "true", "yes", 1): + refine_cfg = {} + else: + refine_cfg = None stages.append( WorkflowStage( id=str(raw.get("id") or raw.get("agent") or "stage"), @@ -344,6 +358,7 @@ def load_workflow(path: Path) -> Workflow: phase=str(raw.get("phase") or "").strip().lower(), rounds=max(1, rounds), careful=bool(raw.get("careful") or False), + refine=refine_cfg, metadata=dict(raw.get("metadata") or {}), ) ) @@ -700,6 +715,18 @@ def _run_stage( break assert run is not None # the loop runs at least once + + # Critic-judge refinement loop. When the stage opts in + # (explicit ``refine:`` block or careful=true under the default + # auto_refine policy), dispatch a critic agent to score the + # response; if the critic says revise, re-dispatch the worker + # with the critic's feedback until accept or max_rounds. + if not dry_run and run.response.ok and self._refine_enabled(stage): + try: + run = self._refine_loop(workflow, stage, run, cycle_context, dry_run) + except Exception as exc: + log_exception("orchestration.workflow._run_stage.refine", exc) + ok = run.response.ok if dry_run: v_ok, v_out = True, "(dry-run: validation skipped)" @@ -871,6 +898,201 @@ def _meta_dispatch_enabled(self, stage: WorkflowStage) -> bool: # default mode "careful_only" return bool(stage.careful) + # -- critic-judge refinement (Layer 3 robustness loop) --------------- + + def _refine_enabled(self, stage: WorkflowStage) -> bool: + """Decide whether the critic-judge refinement loop should run. + + Explicit ``refine:`` on the stage always wins. 
Otherwise we
+        fall back to ``robustness.auto_refine`` (off | careful_only |
+        all). ``chief`` and ``qa`` agents are skipped to avoid the
+        critic critiquing its own coalescing output or the validator.
+        """
+        if stage.agent in ("chief", "qa", "critic"):
+            return False
+        if stage.refine is not None:
+            return True
+        cfg = (self.runner.clk_cfg.get("robustness") or {})
+        mode = str(cfg.get("auto_refine") or "off").lower()
+        if mode in ("", "off", "false", "0"):
+            return False
+        if mode == "all":
+            return True
+        # default mode "careful_only"
+        return bool(stage.careful)
+
+    def _refine_loop(
+        self,
+        workflow: "Workflow",
+        stage: WorkflowStage,
+        first_run: AgentRun,
+        cycle_context: str,
+        dry_run: Optional[bool],
+    ) -> AgentRun:
+        """Run draft → critic → revise until accept or max_rounds.
+
+        Reuses the runner's existing dispatch path for both the critic
+        and the revised worker. The critic is dispatched in a ``phase:
+        refine_critic`` extra so the wrapper's auto-consensus and
+        quality-retry layers don't recurse.
+
+        Bounds come from the stage's ``refine:`` mapping first
+        (``critic``, ``max_rounds``, ``accept_threshold``), then from the
+        ``robustness`` config block (``refine_max_rounds``,
+        ``refine_accept_threshold``), then from the literals 3 and 0.8.
+        Falsy values (missing, ``0``, ``""``) fall through to the next
+        source because the lookups are chained with ``or``.
+
+        Returns the final worker run — either the revised one or the
+        original when the critic accepts immediately. ``first_run`` is
+        also returned untouched when no usable critic is in the roster.
+        A revision whose response is not ``ok`` is returned as-is and
+        ends the loop.
+        """
+        defaults = (self.runner.clk_cfg.get("robustness") or {})
+        cfg = dict(stage.refine or {})
+        critic_name = str(cfg.get("critic") or "critic")
+        try:
+            max_rounds = int(cfg.get("max_rounds") or defaults.get("refine_max_rounds") or 3)
+        except (TypeError, ValueError):
+            max_rounds = 3
+        try:
+            threshold = float(cfg.get("accept_threshold") or defaults.get("refine_accept_threshold") or 0.8)
+        except (TypeError, ValueError):
+            threshold = 0.8
+
+        # If the named critic isn't in the roster, fall back to the
+        # `critic` baseline; if even that is missing, skip silently.
+        agents_cfg = (self.runner.agents_cfg.get("agents") or {})
+        if critic_name not in agents_cfg:
+            critic_name = "critic" if "critic" in agents_cfg else ""
+        if not critic_name:
+            return first_run
+
+        current_run = first_run
+        for round_idx in range(1, max_rounds + 1):
+            verdict, judge_score, feedback = self._dispatch_critic(
+                workflow, stage, current_run, critic_name, round_idx, max_rounds, dry_run,
+            )
+            log_event(
+                self.paths,
+                "refine_critic_verdict",
+                agent=stage.agent,
+                critic=critic_name,
+                workflow=workflow.name,
+                stage_id=stage.id,
+                round=round_idx,
+                max_rounds=max_rounds,
+                verdict=verdict,
+                score=judge_score,
+                accept_threshold=threshold,
+            )
+            self.runner._observer_log(
+                f"refine :: {stage.id} :: round {round_idx}/{max_rounds} "
+                f"{critic_name}→ verdict={verdict} score={judge_score:.2f}"
+            )
+            # Either signal is enough: an explicit accept, or a score at
+            # or above the threshold even when the verdict says revise.
+            if verdict == "accept" or judge_score >= threshold:
+                return current_run
+            if round_idx == max_rounds:
+                # Out of budget — keep the latest worker output even
+                # though the critic isn't satisfied.
+                return current_run
+            revise_objective = (
+                f"Refinement round {round_idx + 1}/{max_rounds} of stage "
+                f"`{stage.id}`. The critic (`{critic_name}`) scored your "
+                f"previous response {judge_score:.2f}/1.0 and asked for "
+                "revisions:\n\n"
+                f"{feedback}\n\n"
+                "Revise the response so the critic's points are addressed. "
+                "Keep what already works; rewrite only what was flagged. "
+                "Re-emit POST and ACTION blocks the same way you did the "
+                "first time so the harness can record the updated work.\n\n"
+                f"Original objective:\n{stage.objective}"
+            )
+            current_run = self.runner.run(
+                stage.agent,
+                revise_objective,
+                extra={
+                    "phase": "refine_worker",
+                    "stage_id": stage.id,
+                    "workflow": workflow.name,
+                    "cycle_context": cycle_context,
+                    "blackboard_inputs": list(stage.inputs),
+                    "stage_outputs": list(stage.outputs),
+                    "refine_round": round_idx + 1,
+                    "refine_max_rounds": max_rounds,
+                },
+                dry_run=dry_run,
+            )
+            # A failed revision cannot be critiqued; hand it back so the
+            # caller sees the failure.
+            if not current_run.response.ok:
+                return current_run
+        return current_run
+
+    _REFINE_VERDICT_RE = re.compile(
+        r"^\s*VERDICT\s*:\s*(accept|revise|reject)\b", re.IGNORECASE | re.MULTILINE,
+    )
+    # Group 1 is the score; optional group 2 is an explicit denominator
+    # ("SCORE: 7/10") so a fractional answer is not read as a bare "7".
+    _REFINE_SCORE_RE = re.compile(
+        r"^\s*SCORE\s*:\s*([0-9]*\.?[0-9]+)(?:\s*/\s*([0-9]*\.?[0-9]+))?",
+        re.IGNORECASE | re.MULTILINE,
+    )
+
+    def _dispatch_critic(
+        self,
+        workflow: "Workflow",
+        stage: WorkflowStage,
+        worker_run: AgentRun,
+        critic_name: str,
+        round_idx: int,
+        max_rounds: int,
+        dry_run: Optional[bool],
+    ) -> Tuple[str, float, str]:
+        """Run one critic pass; return ``(verdict, score, feedback)``.
+
+        ``verdict`` is normalised to ``"accept"`` or ``"revise"``; a
+        missing verdict line and ``reject`` both become ``"revise"``.
+        ``score`` is parsed from the critic's ``SCORE:`` line and always
+        lands in 0..1: ``n/d`` is divided out, a bare value above 1 is
+        read as a percentage (``85`` → 0.85), and the result is clamped.
+        When the line is missing the score is 1.0 for an ``accept``
+        verdict and 0.4 otherwise.
+        ``feedback`` is the critic's full response text, used verbatim
+        in the revision objective.
+        """
+        worker_text = (worker_run.response.text or "").strip()
+        # Cap what the critic has to read; the marker tells it the tail
+        # was cut rather than missing.
+        if len(worker_text) > 4000:
+            worker_text = worker_text[:4000].rstrip() + "\n…(truncated)"
+        outputs_text = (
+            ", ".join(stage.outputs) if stage.outputs else "(no declared outputs)"
+        )
+        critic_objective = (
+            f"Refinement-loop critic pass for workflow `{workflow.name}` "
+            f"stage `{stage.id}` (round {round_idx}/{max_rounds}).\n\n"
+            f"Worker: `{stage.agent}`\n"
+            f"Worker's objective:\n{stage.objective}\n\n"
+            f"Declared output contract keys: {outputs_text}\n\n"
+            f"Worker's response:\n---\n{worker_text}\n---\n\n"
+            "Score the response 0..1 against the objective and the "
+            "declared output contract. List concrete, specific "
+            "revisions the worker should make. Be brief — three to six "
+            "bullets is plenty. End your response with exactly two "
+            "lines:\n"
+            "VERDICT: accept # or `revise` if any item must change\n"
+            "SCORE: <0..1>\n"
+        )
+        critic_run = self.runner.run(
+            critic_name,
+            critic_objective,
+            extra={
+                "phase": "refine_critic",
+                "stage_id": stage.id,
+                "workflow": workflow.name,
+                "refine_round": round_idx,
+            },
+            dry_run=dry_run,
+        )
+        text = critic_run.response.text or ""
+        verdict_m = self._REFINE_VERDICT_RE.search(text)
+        verdict = (verdict_m.group(1).lower() if verdict_m else "revise")
+        if verdict not in ("accept", "revise"):
+            verdict = "revise"
+        score_m = self._REFINE_SCORE_RE.search(text)
+        if score_m is None:
+            # When the critic accepted but didn't post a score, treat it as
+            # a confident pass; when it asked to revise but didn't score,
+            # treat as a moderate-low score so the loop continues.
+            score_val = 1.0 if verdict == "accept" else 0.4
+        else:
+            try:
+                score_val = float(score_m.group(1))
+                denom = float(score_m.group(2)) if score_m.group(2) else 0.0
+            except (TypeError, ValueError):
+                score_val, denom = 0.0, 0.0
+            if denom > 0.0:
+                # Explicit scale, e.g. "7/10" or "0.9/1.0".
+                score_val /= denom
+            elif score_val > 1.0:
+                # Bare value above 1 is a 0-100 answer. Clamping it to 1.0
+                # would auto-accept a "SCORE: 60" despite VERDICT: revise.
+                score_val /= 100.0
+            score_val = max(0.0, min(1.0, score_val))
+        return verdict, score_val, text.strip()
+ def _dispatch_checkpoint( self, workflow: Workflow, diff --git a/clk_harness/templates/prompts.py b/clk_harness/templates/prompts.py index 7d33283..b3cf707 100644 --- a/clk_harness/templates/prompts.py +++ b/clk_harness/templates/prompts.py @@ -17,7 +17,21 @@ from typing import Dict -_BASE_FOOTER = """ +_CONFIDENCE_BLOCK = """ +Self-assessment footer (read by the harness's response-quality loop) +End your response with exactly two lines: + + CONFIDENCE: <0..1> # how confident you are this response is right + NEEDS_REVIEW: # set true when a peer should re-check before commit + +The harness uses these to decide whether to auto-trigger a stochastic +consensus re-run on your response. Be honest — low confidence is +useful signal, not a failure. If your CONFIDENCE is < 0.5, the harness +will re-dispatch you (or fan out a consensus) with a repair preamble. +""" + + +_BASE_FOOTER = _CONFIDENCE_BLOCK + """ Blackboard (shared context with peer agents) $blackboard_digest @@ -106,6 +120,8 @@ POST: TITLE: # optional + TO: # optional, only for post_type: question + URGENCY: blocking|async # optional, only for post_type: question PRODUCES: # optional, satisfies stage outputs CONSUMES: # optional, links provenance BODY: @@ -123,6 +139,25 @@ your stage's declared `inputs` (see PROPOSE_WORKFLOW). If a stage declares `outputs`, your POST blocks must include each declared key in their PRODUCES list — otherwise the runner warns the contract is unmet. + +Inter-agent Q&A (when you genuinely need a peer's input mid-task): + + POST: question + TO: # required for directed Q&A + URGENCY: blocking # the harness will dispatch the peer NOW + BODY: + + END_POST + +With `URGENCY: blocking`, the harness dispatches `` to +answer before your run is finalised; the peer posts a `POST: answer` +that lists your question id in CONSUMES, and you see it in the next +$$blackboard_digest.
Use this sparingly — only when an answer +materially changes your work. Default urgency is `async`, in which case +the question is recorded for the chief to schedule later. + +The harness caps Q&A chains at clk.config.json::robustness.max_qa_depth +(default 3) so peers cannot start a runaway chain of clarifications. """ @@ -635,6 +670,21 @@ 6. One iteration = one question answered. The next ralph invocation reads progress.md, skips closed questions, and picks the next open one. + +Plateau & regression awareness +- The harness records every iteration's outcome and detects plateau + (no `improved=True` outcomes in the last `plateau_window` iterations) + and regression (last iteration failed after at least one earlier + success in the window). +- When a plateau is signalled in your dispatch context (look for + `careful=true` or `loop_adaptive=true` in extra metadata), DO NOT + propose another marginal tweak. Propose a qualitatively different + approach: a new metric, a different experiment family, or a switch + from refinement to autoresearch mode. The harness will fan you out + into a consensus of samples on plateau dispatches. +- When a regression is signalled, the harness has already dispatched + the critic to identify what broke; read the critic's most recent + POST: critique on the blackboard before choosing the next move. """ + _BASE_FOOTER, } diff --git a/kickoff.sh b/kickoff.sh index c62cb82..07e53a9 100644 --- a/kickoff.sh +++ b/kickoff.sh @@ -176,8 +176,10 @@ The provider is the AI that actually writes your code each cycle. autoresearch loops can run. \`project name\` becomes the title of the captured idea and (optionally) the GitHub repo name. The \`run install\` flag triggers .clk/harness/scripts/install_local.sh inside each kickoff -dir so providers like pi can find PyYAML and other deps. \`no TUI\` -switches to a non-interactive pipeline — handy for CI." 
+dir so providers like pi can find PyYAML and other deps — leave it +\`false\` (the default) when running inside Docker, because the image +already has all Python dependencies installed at build time. +\`no TUI\` switches to a non-interactive pipeline — handy for CI." max_iter="$(_sv_read "Max loop iterations" "${CLK_MAX_ITERATIONS:-10}")" proj_name="$(_sv_read "Project name" "${CLK_PROJECT_NAME:-clk-app}")" run_install="$(_sv_read "Run install_local.sh in each kickoff (true|false)" "${CLK_RUN_INSTALL:-false}")" @@ -920,6 +922,112 @@ fi echo "[kickoff] clk init" "$CLK" init --name "$CLK_PROJECT_NAME" +# --------------------------------------------------------------------------- +# Apply CLK_* env-var overrides into .clk/config/clk.config.json. +# +# The harness ships sane defaults in DEFAULT_CLK_CONFIG (see +# clk_harness/config.py). This block lets the user override any of them +# from the environment without hand-editing the JSON. Recognised vars: +# +# Robustness loops (see README "Robustness loops" section): +# CLK_ROBUSTNESS_AUTO_CONSENSUS off | on_careful | always +# CLK_ROBUSTNESS_AUTO_REFINE off | careful_only | all +# CLK_ROBUSTNESS_MAX_QUALITY_RETRIES int >= 0 +# CLK_ROBUSTNESS_MIN_RESPONSE_CHARS int > 0 +# CLK_ROBUSTNESS_REFINE_MAX_ROUNDS int >= 1 +# CLK_ROBUSTNESS_REFINE_ACCEPT_THRESHOLD float 0..1 +# CLK_ROBUSTNESS_QA_PARALLEL_JUDGES int >= 1 +# CLK_ROBUSTNESS_MAX_QA_DEPTH int >= 1 +# CLK_ROBUSTNESS_PLATEAU_WINDOW int >= 2 +# CLK_ROBUSTNESS_PLATEAU_ACTION off | escalate_only | +# reframe_only | escalate_then_reframe +# +# Prior knobs (already supported, surfaced here for parity): +# CLK_PROVIDER_TIMEOUT_S int seconds, 0 = harness default +# CLK_PROVIDER_NO_OUTPUT_TIMEOUT_S int seconds +# CLK_PROVIDER_RETRY_MAX_RETRIES int +# CLK_PROVIDER_RETRY_BACKOFF_S float seconds +# CLK_PROVIDER_RETRY_STAGE_MAX_RETRIES int +# CLK_PROVIDER_RETRY_STAGE_BACKOFF_S float seconds +# CLK_SUPERVISE_MAX_CYCLES int +# CLK_CONSENSUS_MAX_SAMPLES int +# 
CLK_CONSENSUS_MAX_PARALLEL int +# CLK_CASTING_MAX_DYNAMIC_ROLES int +# CLK_AUTO_COMMIT true | false +# CLK_VALIDATION_MAX_FILES_PER_BATCH int +# CLK_VALIDATION_WARN_FILES_PER_BATCH int +# CLK_META_PROMPT_DISPATCH off | careful_only | always +# CLK_META_PROMPT_ROLE off | careful_only | always +# CLK_REVIEW_PER_STAGE true | false +# CLK_RECOVERY_MAX_PER_STAGE int +# +# Any unset variable falls through to the harness's default. We don't +# touch keys we didn't see — so a partially-set env still gets the rest +# from DEFAULT_CLK_CONFIG. +# --------------------------------------------------------------------------- +echo "[kickoff] applying CLK_* env-var overrides to .clk/config/clk.config.json" +python3 - <<'PY' +import json, os +from pathlib import Path + +p = Path(".clk/config/clk.config.json") +if not p.exists(): + raise SystemExit(0) # nothing to override +cfg = json.loads(p.read_text(encoding="utf-8")) + + +def _set(env_var, *path, cast=str): + raw = os.environ.get(env_var) + if raw is None or raw == "": + return + try: + val = cast(raw) + except (TypeError, ValueError): + return + cur = cfg + for key in path[:-1]: + cur = cur.setdefault(key, {}) + cur[path[-1]] = val + + +def _bool(s): + return str(s).strip().lower() in {"1", "true", "yes", "y", "on"} + + +# Robustness block +_set("CLK_ROBUSTNESS_AUTO_CONSENSUS", "robustness", "auto_consensus") +_set("CLK_ROBUSTNESS_AUTO_REFINE", "robustness", "auto_refine") +_set("CLK_ROBUSTNESS_MAX_QUALITY_RETRIES", "robustness", "max_quality_retries", cast=int) +_set("CLK_ROBUSTNESS_MIN_RESPONSE_CHARS", "robustness", "min_response_chars", cast=int) +_set("CLK_ROBUSTNESS_REFINE_MAX_ROUNDS", "robustness", "refine_max_rounds", cast=int) +_set("CLK_ROBUSTNESS_REFINE_ACCEPT_THRESHOLD", "robustness", "refine_accept_threshold", cast=float) +_set("CLK_ROBUSTNESS_QA_PARALLEL_JUDGES", "robustness", "qa_parallel_judges", cast=int) +_set("CLK_ROBUSTNESS_MAX_QA_DEPTH", "robustness", "max_qa_depth", cast=int) 
+_set("CLK_ROBUSTNESS_PLATEAU_WINDOW", "robustness", "plateau_window", cast=int) +_set("CLK_ROBUSTNESS_PLATEAU_ACTION", "robustness", "plateau_action") + +# Prior knobs +_set("CLK_PROVIDER_TIMEOUT_S", "provider_timeout_s", cast=int) +_set("CLK_PROVIDER_NO_OUTPUT_TIMEOUT_S", "provider_no_output_timeout_s", cast=int) +_set("CLK_PROVIDER_RETRY_MAX_RETRIES", "provider_retry", "max_retries", cast=int) +_set("CLK_PROVIDER_RETRY_BACKOFF_S", "provider_retry", "backoff_s", cast=float) +_set("CLK_PROVIDER_RETRY_STAGE_MAX_RETRIES", "provider_retry", "stage_max_retries", cast=int) +_set("CLK_PROVIDER_RETRY_STAGE_BACKOFF_S", "provider_retry", "stage_backoff_s", cast=float) +_set("CLK_SUPERVISE_MAX_CYCLES", "supervise", "max_cycles", cast=int) +_set("CLK_CONSENSUS_MAX_SAMPLES", "consensus", "max_samples", cast=int) +_set("CLK_CONSENSUS_MAX_PARALLEL", "consensus", "max_parallel", cast=int) +_set("CLK_CASTING_MAX_DYNAMIC_ROLES", "casting", "max_dynamic_roles", cast=int) +_set("CLK_AUTO_COMMIT", "auto_commit", cast=_bool) +_set("CLK_VALIDATION_MAX_FILES_PER_BATCH", "validation", "max_files_per_batch", cast=int) +_set("CLK_VALIDATION_WARN_FILES_PER_BATCH", "validation", "warn_files_per_batch", cast=int) +_set("CLK_META_PROMPT_DISPATCH", "meta_prompt", "dispatch") +_set("CLK_META_PROMPT_ROLE", "meta_prompt", "role") +_set("CLK_REVIEW_PER_STAGE", "review", "per_stage", cast=_bool) +_set("CLK_RECOVERY_MAX_PER_STAGE", "recovery", "max_per_stage", cast=int) + +p.write_text(json.dumps(cfg, indent=2, sort_keys=True) + "\n", encoding="utf-8") +PY + echo "[kickoff] activating provider: $CLK_PROVIDER" CLK_PROVIDER="$CLK_PROVIDER" \ CLK_OLLAMA_ENDPOINT="${CLK_OLLAMA_ENDPOINT:-http://localhost:11434}" \ diff --git a/scripts/clk b/scripts/clk index 46a4a10..29234a4 100644 --- a/scripts/clk +++ b/scripts/clk @@ -1,5 +1,10 @@ #!/usr/bin/env bash -# CLK launcher. +# CLK launcher shim. 
+# +# This is the thin wrapper that resolves where the CLK harness is +# installed and invokes `python -m clk_harness.cli` against the right +# interpreter. For the canonical install procedure, see +# scripts/install_local.sh (which is what populates .clk/venv). # # Resolves two anchors: # * HARNESS_HOME - directory holding clk_harness/, scripts/, pyproject.toml @@ -12,6 +17,12 @@ # # Prefers the project's local .clk/venv interpreter if present, falls back # to the system python3. Always invokes clk_harness.cli as a module. +# +# Related entry points: +# * scripts/install_local.sh — creates .clk/venv (canonical install) +# * scripts/install_tool.sh — installs provider CLIs on demand +# * scripts/run_loop.sh — convenience wrapper around `clk loop` +# * kickoff.sh — top-level project bootstrap set -euo pipefail diff --git a/scripts/install_local.sh b/scripts/install_local.sh index 20524f9..e6d7627 100644 --- a/scripts/install_local.sh +++ b/scripts/install_local.sh @@ -1,19 +1,83 @@ #!/usr/bin/env bash # Local-only install for CLK. # -# Strategy (in order): -# 1. Create `.clk/venv` with pip and `pip install -e .` into it. This is the -# preferred path - it picks up every dep declared in pyproject.toml plus -# any extras and exposes the `clk` console script inside the venv. -# 2. If the venv has no pip (ensurepip missing), install just the runtime -# dependencies parsed from pyproject.toml into `.clk/site-packages` via -# system pip's `--target`. The launcher (`scripts/clk`) adds that dir to -# PYTHONPATH; the package itself runs from the source tree. -# 3. If neither works, print the apt one-liner and exit cleanly. The -# harness still runs via its mini-YAML fallback. +# WHAT THIS SCRIPT DOES +# Installs the Python harness into a project-local virtual environment +# so the `clk` CLI and the in-process FastAPI / TUI work without +# polluting the user's system Python. 
After it finishes, the +# project's `.clk/venv/bin/clk` is the canonical entry point (and +# `scripts/clk` is a shim that finds it). # -# Optional first arg picks an extras group from pyproject.toml: -# ./scripts/install_local.sh dev # also installs pytest +# The script is idempotent — re-running it upgrades pip and +# reinstalls the package in editable mode. It does NOT delete the +# existing venv so cached deps persist between runs. +# +# DIRECTORY LAYOUT (in a kickoff'd project) +# / ← your code + the harness state +# .clk/ ← all harness state (recoverable) +# harness/ ← copy of the CLK source tree +# scripts/install_local.sh ← THIS SCRIPT +# pyproject.toml ← deps declared here +# venv/ ← preferred install path (1) +# bin/clk ← the console script callers use +# bin/python ← matched against pyproject.toml deps +# site-packages/ ← fallback when venv has no pip (2) +# config/ ← clk.config.json, providers.json, … +# state/ ← agent memory, casting log, … +# logs/, runs/, backups/, blackboard/ +# +# `CLK_PROJECT_ROOT` (set by the kickoff shim) lets this script find +# the project root from `.clk/harness/`. When unset, this script +# assumes it's running from a plain checkout and uses its own parent. +# +# INSTALL STRATEGY (tried in order) +# 1. Create `.clk/venv` with pip and `pip install -e .` into it. This +# is the preferred path — it picks up every dep declared in +# pyproject.toml plus any extras and exposes the `clk` console +# script inside the venv. The launcher `scripts/clk` picks up +# `.clk/venv/bin/clk` automatically. +# 2. If the venv has no pip (ensurepip missing — common on +# stripped-down distros), install just the runtime dependencies +# parsed from pyproject.toml into `.clk/site-packages` via +# system pip's `--target`. The launcher then adds that directory +# to PYTHONPATH; the package itself runs from the source tree. +# 3. If neither works, print the apt one-liner and exit cleanly. 
+# The harness still runs via its mini-YAML fallback (no PyYAML) +# but lacks the optional FastAPI / Telegram extras. +# +# OPTIONAL EXTRAS +# The first positional argument picks an extras group from +# pyproject.toml: +# ./scripts/install_local.sh # runtime deps only +# ./scripts/install_local.sh dev # adds pytest, pytest-asyncio +# ./scripts/install_local.sh api # adds FastAPI + uvicorn for REST API +# ./scripts/install_local.sh "api,dev" # both +# +# WHAT THIS SCRIPT DOES *NOT* INSTALL +# * Provider CLIs (claude, codex, gemini, pi) — those are installed by +# `kickoff.sh --setup` and by `/install` from inside the TUI; see +# the README "Provider and authentication" section. +# * Telegram-bot dependencies — handled by +# scripts/telegram_setup_wizard.sh; see README "Telegram Bot". +# * Docker — the test orchestrator at scripts/run_all_tests.sh uses +# Docker if available but falls back to --local mode; see README +# "Testing" for the breakdown. +# * GitHub integration — handled by kickoff.sh's GitHub block; see +# README "GitHub integration". +# +# RELATED ENTRY POINTS +# * scripts/clk — launcher shim that resolves `.clk/venv` +# or `.clk/site-packages` automatically +# * scripts/install_tool.sh — installs a provider CLI on demand +# (claude, codex, gemini, ollama, pi) +# * scripts/run_loop.sh — convenience wrapper around `clk loop` +# * scripts/run_all_tests.sh — full test orchestrator (Docker or local) +# * kickoff.sh — top-level project bootstrap; calls this +# script when CLK_RUN_INSTALL=true +# +# See the README "Robustness loops" and "Cost guardrails" sections for +# the runtime config knobs (CLK_ROBUSTNESS_*, provider retry, etc.) +# this script writes — they're tuned via `.env` / kickoff.sh, not here. 
set -euo pipefail diff --git a/scripts/install_tool.sh b/scripts/install_tool.sh index 2124e9c..d8d4921 100755 --- a/scripts/install_tool.sh +++ b/scripts/install_tool.sh @@ -2,6 +2,11 @@ # scripts/install_tool.sh — single source of truth for "install or # configure provider CLI X." # +# This script is about *provider CLIs* (claude, codex, gemini, pi, +# ollama, openwebui — the LLM backends CLK dispatches to). For +# installing the CLK harness itself (the Python deps that make the +# `clk` CLI work) see scripts/install_local.sh. +# # Used by: # * kickoff.sh --setup (after the user picks a provider) # * the TUI's /install and /configure commands @@ -20,6 +25,12 @@ # scripts/install_tool.sh check NAME # scripts/install_tool.sh install NAME [--auto|--prompt|--print-only] # scripts/install_tool.sh configure NAME +# +# Related entry points: +# * scripts/install_local.sh — install the CLK harness itself +# * scripts/clk — launcher shim for the `clk` CLI +# * kickoff.sh — top-level project bootstrap +# * README "Provider and authentication" — user-facing reference set -euo pipefail diff --git a/scripts/run_loop.sh b/scripts/run_loop.sh index d54e652..99f4559 100644 --- a/scripts/run_loop.sh +++ b/scripts/run_loop.sh @@ -1,6 +1,22 @@ #!/usr/bin/env bash # Convenience wrapper: run the Ralph loop with a sensible default budget. # +# This is a thin shim around `clk loop`. The loop's behavior is +# documented in the README sections: +# +# * "Loops" — the two Ralph modes (refinement, +# autoresearch) and how they pick what to do +# each iteration. +# * "Robustness loops" — what happens around each iteration: +# response-quality re-dispatch, auto-consensus +# fan-out, critic refinement, plateau detection +# with escalate-then-reframe. +# +# Both loops respect `clk.config.json::max_iterations` (overridable via +# `--max-iterations N`) and stop early when `.clk/state/done.md` +# exists. 
Plateau termination writes done.md automatically when +# `robustness.plateau_action` is enabled (default). +# # Usage: # scripts/run_loop.sh # 20 iterations, ralph mode # scripts/run_loop.sh --mode autoresearch diff --git a/tests/test_docs_consistency.py b/tests/test_docs_consistency.py new file mode 100644 index 0000000..e02bcac --- /dev/null +++ b/tests/test_docs_consistency.py @@ -0,0 +1,211 @@ +"""Documentation-consistency tests. + +These assertions exist so that future edits don't let the README, +``.env.example``, the ``DEFAULT_CLK_CONFIG`` schema, and ``kickoff.sh``'s +env-var mapping drift apart. They run against the literal file +contents — no harness behavior is exercised. + +When a knob is added or renamed, the change must touch four places: + +1. ``clk_harness/config.py::DEFAULT_CLK_CONFIG`` — the schema. +2. ``.env.example`` — the user-facing override. +3. ``kickoff.sh`` — translation from env var → JSON key. +4. ``README.md`` — the Robustness-loops or Cost-guardrails section. + +The tests below check (1) ↔ (2) ↔ (4) for the new ``robustness`` block +specifically, and that (3) sees the same env-var family. The "prior +knobs" block in ``.env.example`` is checked for the env-var names +documented there to ensure the docs-parity pass covers them too. +""" + +from __future__ import annotations + +import re +from pathlib import Path + +import pytest + +from clk_harness.config import DEFAULT_CLK_CONFIG + + +REPO = Path(__file__).resolve().parent.parent +ENV_EXAMPLE = (REPO / ".env.example").read_text(encoding="utf-8") +KICKOFF = (REPO / "kickoff.sh").read_text(encoding="utf-8") +README = (REPO / "README.md").read_text(encoding="utf-8") + + +# --------------------------------------------------------------------------- +# robustness block: round-trip schema ↔ env vars ↔ README +# --------------------------------------------------------------------------- + + +# Some keys are introspected at runtime by name (e.g. 
"plateau_action") +# while others are knobs the user only sees on a coarser axis (the README +# explains plateau as a single concept rather than enumerating +# `plateau_window` and `plateau_action` separately). Keys listed here are +# excluded from the README-mention check below; everything else must +# appear by name in the Robustness-loops or Cost-guardrails section. +_README_OPTIONAL_KEYS: frozenset = frozenset({ + "qa_parallel_judges", # internal cap, documented as part of the Q&A protocol +}) + + +def test_robustness_defaults_exist() -> None: + """DEFAULT_CLK_CONFIG must carry a ``robustness`` block.""" + assert "robustness" in DEFAULT_CLK_CONFIG + block = DEFAULT_CLK_CONFIG["robustness"] + # Minimum set of keys the layers require to function. + required = { + "auto_consensus", + "auto_refine", + "max_quality_retries", + "min_response_chars", + "refine_max_rounds", + "refine_accept_threshold", + "max_qa_depth", + "plateau_window", + "plateau_action", + } + missing = required - set(block.keys()) + assert not missing, f"DEFAULT_CLK_CONFIG['robustness'] missing keys: {missing}" + + +def _env_var_for(key: str) -> str: + return "CLK_ROBUSTNESS_" + key.upper() + + +def test_env_example_documents_every_robustness_key() -> None: + """Every robustness key in DEFAULT_CLK_CONFIG must appear as a + CLK_ROBUSTNESS_* line in .env.example so users can override it.""" + block = DEFAULT_CLK_CONFIG["robustness"] + for key in block.keys(): + var = _env_var_for(key) + assert var in ENV_EXAMPLE, ( + f"{var} not documented in .env.example " + f"(needed because DEFAULT_CLK_CONFIG['robustness']['{key}'] " + "is now a public knob)" + ) + + +def test_kickoff_sh_maps_every_robustness_key() -> None: + """kickoff.sh's env-var override block must recognise every + CLK_ROBUSTNESS_* variable.""" + block = DEFAULT_CLK_CONFIG["robustness"] + for key in block.keys(): + var = _env_var_for(key) + assert var in KICKOFF, ( + f"{var} not handled in kickoff.sh's env→config mapping " + f"(needed to 
make the .env.example override actually take effect)" + ) + + +def test_readme_documents_robustness_keys() -> None: + """The README's Robustness-loops or Cost-guardrails section must + mention every robustness knob by name.""" + # Slice off the relevant chunk so we don't count incidental mentions + # elsewhere (e.g. in the changelog "What's new" block which is + # already lossier on purpose). + rl_start = README.find("## Robustness loops") + rl_end_marker = "## Completion criteria" + rl_end = README.find(rl_end_marker, rl_start) if rl_start != -1 else -1 + assert rl_start != -1 and rl_end != -1, ( + "README is missing the ## Robustness loops section " + "(must live between ## Loops and ## Completion criteria)" + ) + rl_section = README[rl_start:rl_end] + + cost_start = README.find("### Robustness-loop multipliers") + cost_end_marker = "## Pi extension" + cost_end = README.find(cost_end_marker, cost_start) if cost_start != -1 else -1 + assert cost_start != -1 and cost_end != -1, ( + "README is missing the ### Robustness-loop multipliers section " + "under ## Cost guardrails" + ) + cost_section = README[cost_start:cost_end] + combined = rl_section + "\n" + cost_section + + block = DEFAULT_CLK_CONFIG["robustness"] + for key in block.keys(): + if key in _README_OPTIONAL_KEYS: + continue + assert key in combined, ( + f"robustness key '{key}' not mentioned in README " + "(Robustness-loops section or Cost-guardrails table)" + ) + + +# --------------------------------------------------------------------------- +# Prior-knob parity (just an inventory check) +# --------------------------------------------------------------------------- + + +_PRIOR_KNOBS = ( + "CLK_PROVIDER_TIMEOUT_S", + "CLK_PROVIDER_NO_OUTPUT_TIMEOUT_S", + "CLK_PROVIDER_RETRY_MAX_RETRIES", + "CLK_PROVIDER_RETRY_BACKOFF_S", + "CLK_PROVIDER_RETRY_STAGE_MAX_RETRIES", + "CLK_PROVIDER_RETRY_STAGE_BACKOFF_S", + "CLK_SUPERVISE_MAX_CYCLES", + "CLK_CONSENSUS_MAX_SAMPLES", + "CLK_CONSENSUS_MAX_PARALLEL", + 
"CLK_CASTING_MAX_DYNAMIC_ROLES", + "CLK_AUTO_COMMIT", + "CLK_VALIDATION_MAX_FILES_PER_BATCH", + "CLK_VALIDATION_WARN_FILES_PER_BATCH", + "CLK_META_PROMPT_DISPATCH", + "CLK_META_PROMPT_ROLE", + "CLK_REVIEW_PER_STAGE", + "CLK_RECOVERY_MAX_PER_STAGE", +) + + +def test_env_example_documents_prior_knobs() -> None: + """The 'Prior-knob reference' block in .env.example must enumerate + every legacy CLK_* knob the kickoff mapper supports.""" + for var in _PRIOR_KNOBS: + assert var in ENV_EXAMPLE, ( + f"{var} should appear in .env.example so users can see " + "the full set of supported overrides in one place." + ) + + +def test_kickoff_handles_prior_knobs() -> None: + """kickoff.sh's env-var override block must recognise every prior + CLK_* knob too — the docs claim parity, so the script must honor it.""" + for var in _PRIOR_KNOBS: + assert var in KICKOFF, ( + f"{var} listed in .env.example but not handled by kickoff.sh — " + "the override would silently no-op." + ) + + +# --------------------------------------------------------------------------- +# Install-script narration: spot-check the doc comment is present +# --------------------------------------------------------------------------- + + +def test_install_local_header_mentions_layout() -> None: + """The install_local.sh header should describe the layout the script + creates — that's the only place that documentation lives.""" + text = (REPO / "scripts" / "install_local.sh").read_text(encoding="utf-8") + for needle in ( + ".clk/venv", + ".clk/site-packages", + "CLK_PROJECT_ROOT", + "pyproject.toml", + "WHAT THIS SCRIPT DOES", + ): + assert needle in text, ( + f"scripts/install_local.sh header lost reference to '{needle}'" + ) + + +def test_run_loop_header_links_robustness_section() -> None: + """scripts/run_loop.sh should point users at the README section so + the wrapper isn't a black box.""" + text = (REPO / "scripts" / "run_loop.sh").read_text(encoding="utf-8") + assert "Robustness loops" in text or "robustness" in 
text.lower(), ( + "scripts/run_loop.sh should cross-reference the README's " + "Robustness-loops section so callers know what wraps each iteration." + ) diff --git a/tests/test_response_quality.py b/tests/test_response_quality.py new file mode 100644 index 0000000..00984f9 --- /dev/null +++ b/tests/test_response_quality.py @@ -0,0 +1,174 @@ +"""Unit tests for clk_harness.orchestration.response_quality.""" + +from __future__ import annotations + +import pytest + +from clk_harness.orchestration import response_quality as rq + + +def test_empty_response_flagged() -> None: + q = rq.score("") + assert not q.ok + assert "empty" in q.flags + assert q.recoverable is True + + +def test_short_response_flagged() -> None: + q = rq.score("ok", min_chars=40) + assert not q.ok + assert "empty" in q.flags + + +def test_substantive_response_passes() -> None: + text = ( + "Here is a substantive answer covering the points raised. " + "It includes a recommendation and rationale." + ) + q = rq.score(text) + assert q.ok, q.flags + assert q.score == 1.0 + + +def test_malformed_action_block_flagged() -> None: + text = ( + "Doing the work now.\n\n" + "ACTION: write\n" + "PATH: foo.py\n" + "CONTENT:\n" + "print('hello')\n" + # Missing END_ACTION + ) + q = rq.score(text) + assert "malformed_action" in q.flags + assert not q.ok + + +def test_balanced_action_block_passes() -> None: + text = ( + "Doing the work now.\n\n" + "ACTION: write\n" + "PATH: foo.py\n" + "CONTENT:\n" + "print('hello')\n" + "END_ACTION\n" + "\n" + "Wrapped up." 
+    )
+    q = rq.score(text)
+    assert "malformed_action" not in q.flags
+
+
+def test_malformed_post_block_flagged() -> None:
+    text = (
+        "POST: finding\n"
+        "BODY:\n"
+        "important result\n"
+        # Missing END_POST
+        "More text\n"
+    )
+    q = rq.score(text)
+    assert "malformed_post" in q.flags
+    assert not q.ok
+
+
+def test_missing_outputs_flagged() -> None:
+    text = (
+        "POST: finding\n"
+        "PRODUCES: alpha\n"
+        "BODY:\n"
+        "the alpha result\n"
+        "END_POST\n"
+    )
+    q = rq.score(text, expected_outputs=["alpha", "beta"])
+    assert "outputs_missing" in q.flags
+    assert any("beta" in r for r in q.reasons), q.reasons  # no [0] index: empty reasons must fail, not raise
+
+
+def test_satisfied_outputs_pass() -> None:
+    text = (
+        "POST: finding\n"
+        "PRODUCES: alpha beta\n"
+        "BODY:\n"
+        "covered both\n"
+        "END_POST\n"
+        "Continuing with analysis to ensure the response is substantive."
+    )
+    q = rq.score(text, expected_outputs=["alpha", "beta"])
+    assert "outputs_missing" not in q.flags
+
+
+def test_low_confidence_flagged() -> None:
+    text = (
+        "Here is a long enough answer that includes uncertainty markers.\n"
+        "CONFIDENCE: 0.2\n"
+    )
+    q = rq.score(text)
+    assert q.confidence == pytest.approx(0.2)
+    assert "low_confidence" in q.flags
+
+
+def test_high_confidence_passes() -> None:
+    text = (
+        "Here is a long enough answer including a confidence line.\n"
+        "CONFIDENCE: 0.9\n"
+    )
+    q = rq.score(text)
+    assert q.confidence == pytest.approx(0.9)
+    assert "low_confidence" not in q.flags
+    assert q.ok
+
+
+def test_needs_review_flagged() -> None:
+    text = (
+        "Here is a long enough answer including a review marker.\n"
+        "NEEDS_REVIEW: true\n"
+    )
+    q = rq.score(text)
+    assert q.needs_review is True
+    assert "needs_review_self" in q.flags
+
+
+def test_refusal_marks_not_recoverable() -> None:
+    text = (
+        "I cannot help with that request, sorry."
+ ) + q = rq.score(text) + assert "refusal" in q.flags + assert not q.ok + assert not q.recoverable + assert not rq.is_recoverable(q) + + +def test_repair_hint_lists_reasons() -> None: + text = "" + q = rq.score(text) + hint = q.repair_hint() + assert hint + assert "empty" in hint.lower() or "minimum" in hint.lower() + + +def test_repair_hint_empty_for_ok() -> None: + text = "Long enough to clear the empty threshold; this is fine." + q = rq.score(text) + assert q.ok + assert q.repair_hint() == "" + + +def test_confidence_on_0_100_scale_normalised() -> None: + text = "Long answer.\nCONFIDENCE: 85" + q = rq.score(text) + assert q.confidence == pytest.approx(0.85) + + +def test_require_confidence_flag() -> None: + text = "Long enough answer without a CONFIDENCE line at all here." + q = rq.score(text, require_confidence=True) + assert "confidence_missing" in q.flags + + +def test_require_confidence_off_by_default() -> None: + text = "Long enough answer without a CONFIDENCE line at all here." + q = rq.score(text) + assert "confidence_missing" not in q.flags + assert q.ok diff --git a/tests/test_robustness_integration.py b/tests/test_robustness_integration.py new file mode 100644 index 0000000..2833e9b --- /dev/null +++ b/tests/test_robustness_integration.py @@ -0,0 +1,280 @@ +"""Integration tests for the robustness loops. + +Covers the cross-module wiring added in Layer 1-4: + +* ``POST: question`` blocks parse target_agent + urgency. +* Workflow YAML parses ``refine:`` blocks. +* ``find_unanswered_questions`` filters answered questions correctly. +* The :class:`AgentRunner` chokepoint wrappers skip recursion when + invoked under a meta-phase, and proactive auto-consensus fires on + careful stages. 
+""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, List + +import pytest + +from clk_harness.config import Paths, DEFAULT_CLK_CONFIG +from clk_harness.orchestration import blackboard as bb +from clk_harness.orchestration import workflow as wf +from clk_harness.orchestration.agent import AgentRunner +from clk_harness.providers.base import AgentResponse + + +@pytest.fixture +def paths(tmp_path: Path) -> Paths: + p = Paths(root=tmp_path) + p.ensure() + return p + + +# --------------------------------------------------------------------------- +# Layer 1: response_quality is exercised in test_response_quality.py. +# --------------------------------------------------------------------------- + + +# --------------------------------------------------------------------------- +# Layer 2: blackboard Q&A protocol +# --------------------------------------------------------------------------- + + +def test_post_question_with_to_and_urgency(paths: Paths) -> None: + text = ( + "POST: question\n" + "TO: architect\n" + "URGENCY: blocking\n" + "BODY:\n" + "Are user IDs opaque strings or ints?\n" + "END_POST\n" + ) + posted = bb.apply_post_blocks(paths, text, author="researcher", stage_id="s1") + assert len(posted) == 1 + q = posted[0] + assert q.target_agent == "architect" + assert q.urgency == "blocking" + assert q.post_type == "question" + + +def test_find_unanswered_questions(paths: Paths) -> None: + q = bb.post( + paths, + author="researcher", + body="What is the data model?", + post_type="question", + target_agent="architect", + urgency="blocking", + ) + assert q.id in [p.id for p in bb.find_unanswered_questions(paths)] + # Answer it. 
+ bb.post( + paths, + author="architect", + body="Use UUID strings.", + post_type="answer", + consumes=[q.id], + ) + assert q.id not in [p.id for p in bb.find_unanswered_questions(paths)] + + +def test_find_unanswered_filters_by_target(paths: Paths) -> None: + bb.post( + paths, author="a", body="q1", post_type="question", + target_agent="architect", urgency="blocking", + ) + bb.post( + paths, author="a", body="q2", post_type="question", + target_agent="qa", urgency="blocking", + ) + only_arch = bb.find_unanswered_questions(paths, target_agent="architect") + assert len(only_arch) == 1 + assert only_arch[0].target_agent == "architect" + + +def test_async_urgency_parsed(paths: Paths) -> None: + text = ( + "POST: question\n" + "TO: engineer\n" + "URGENCY: async\n" + "BODY:\n" + "When you next touch the auth module, consider X.\n" + "END_POST\n" + ) + posted = bb.apply_post_blocks(paths, text, author="critic", stage_id="s1") + assert posted[0].urgency == "async" + + +# --------------------------------------------------------------------------- +# Layer 3: workflow stage `refine:` field +# --------------------------------------------------------------------------- + + +def test_workflow_stage_parses_refine_dict(tmp_path: Path) -> None: + yaml_path = tmp_path / "engineering.yaml" + yaml_path.write_text( + """ +name: engineering +description: test +stages: + - id: design_spec + agent: architect + objective: Draft the spec. + refine: + critic: critic + max_rounds: 4 + accept_threshold: 0.75 +""", + encoding="utf-8", + ) + flow = wf.load_workflow(yaml_path) + stage = flow.stages[0] + assert stage.refine == {"critic": "critic", "max_rounds": 4, "accept_threshold": 0.75} + + +def test_workflow_stage_refine_absent_is_none(tmp_path: Path) -> None: + yaml_path = tmp_path / "engineering.yaml" + yaml_path.write_text( + """ +name: engineering +description: test +stages: + - id: plain + agent: engineer + objective: Just do it. 
+""", + encoding="utf-8", + ) + flow = wf.load_workflow(yaml_path) + assert flow.stages[0].refine is None + + +# --------------------------------------------------------------------------- +# Layer 1: AgentRunner chokepoint dispatch dispatch +# --------------------------------------------------------------------------- + + +class _FakeProvider: + """Provider stub yielding pre-canned responses in order. + + Used to test the chokepoint without spawning real subprocesses. + """ + + def __init__(self, responses: List[AgentResponse]) -> None: + self._responses = list(responses) + self.calls: List[Dict[str, Any]] = [] + + def describe(self) -> str: + return "fake" + + def invoke(self, req): # noqa: ANN001 + self.calls.append({"agent": req.agent, "prompt_len": len(req.prompt or "")}) + if not self._responses: + return AgentResponse(ok=True, text="(default)") + return self._responses.pop(0) + + +def _make_runner(paths: Paths, provider: _FakeProvider, clk_cfg_overrides: Dict[str, Any] | None = None) -> AgentRunner: + agents_cfg = { + "agents": { + "chief": {"prompt": "chief.md", "provider": None, "role": "casting director"}, + "qa": {"prompt": "qa.md", "provider": None, "role": "validator"}, + "ralph": {"prompt": "ralph.md", "provider": None, "role": "loop driver"}, + "engineer": {"prompt": "engineer.md", "provider": None, "role": "implementer"}, + "architect": {"prompt": "architect.md", "provider": None, "role": "design"}, + "critic": {"prompt": "critic.md", "provider": None, "role": "judge"}, + } + } + providers_cfg = {"active": "fake", "providers": {"fake": {"type": "fake"}}} + clk_cfg: Dict[str, Any] = json.loads(json.dumps(DEFAULT_CLK_CONFIG)) # deep copy + clk_cfg["dry_run"] = False + clk_cfg["provider_retry"] = {"max_retries": 0, "backoff_s": 0} + if clk_cfg_overrides: + clk_cfg.update(clk_cfg_overrides) + runner = AgentRunner(paths, agents_cfg, providers_cfg, clk_cfg) + # Override the provider loader to always return our stub. 
+ runner.get_provider = lambda name=None: provider # type: ignore[method-assign] + return runner + + +def test_quality_retry_fires_on_empty_response(paths: Paths) -> None: + # First call: empty (should trigger retry). Second call: substantive. + good = "Here is a substantive engineering plan with concrete steps." + provider = _FakeProvider([ + AgentResponse(ok=True, text=""), + AgentResponse(ok=True, text=good), + ]) + runner = _make_runner(paths, provider, { + "robustness": { + **DEFAULT_CLK_CONFIG["robustness"], + "auto_consensus": "off", + "auto_refine": "off", + "max_quality_retries": 1, + } + }) + run = runner.run("engineer", "Implement feature X.") + assert run.response.text == good + assert len(provider.calls) == 2, "expected one retry after empty first response" + + +def test_quality_retry_capped(paths: Paths) -> None: + # All four calls return empty; runner gives up after max_quality_retries+1. + provider = _FakeProvider([ + AgentResponse(ok=True, text="") for _ in range(10) + ]) + runner = _make_runner(paths, provider, { + "robustness": { + **DEFAULT_CLK_CONFIG["robustness"], + "auto_consensus": "off", + "auto_refine": "off", + "max_quality_retries": 2, + } + }) + runner.run("engineer", "Implement feature X.") + # Initial + 2 retries = 3 attempts. (No escalation since auto_consensus=off.) + assert len(provider.calls) == 3 + + +def test_meta_phase_bypasses_quality_loop(paths: Paths) -> None: + # An empty response inside a meta phase should NOT retry. 
+ provider = _FakeProvider([AgentResponse(ok=True, text="")]) + runner = _make_runner(paths, provider, { + "robustness": { + **DEFAULT_CLK_CONFIG["robustness"], + "auto_consensus": "off", + "auto_refine": "off", + "max_quality_retries": 3, + } + }) + runner.run("engineer", "noop", extra={"phase": "refine_critic"}) + assert len(provider.calls) == 1 + + +def test_should_auto_consensus_on_careful(paths: Paths) -> None: + provider = _FakeProvider([]) + runner = _make_runner(paths, provider, { + "robustness": {**DEFAULT_CLK_CONFIG["robustness"], "auto_consensus": "on_careful"} + }) + assert runner._should_auto_consensus("engineer", {"careful": True}) + assert not runner._should_auto_consensus("engineer", {}) + assert not runner._should_auto_consensus("chief", {"careful": True}) + + +def test_should_auto_consensus_off(paths: Paths) -> None: + provider = _FakeProvider([]) + runner = _make_runner(paths, provider, { + "robustness": {**DEFAULT_CLK_CONFIG["robustness"], "auto_consensus": "off"} + }) + assert not runner._should_auto_consensus("engineer", {"careful": True}) + + +def test_should_auto_consensus_always(paths: Paths) -> None: + provider = _FakeProvider([]) + runner = _make_runner(paths, provider, { + "robustness": {**DEFAULT_CLK_CONFIG["robustness"], "auto_consensus": "always"} + }) + assert runner._should_auto_consensus("engineer", {}) + assert runner._should_auto_consensus("ralph", {}) + assert not runner._should_auto_consensus("chief", {})