From 032849284bcfcb7b064ff986f850b3c816d8da7e Mon Sep 17 00:00:00 2001 From: yumengv <534244420@qq.com> Date: Wed, 27 May 2026 09:13:33 +0800 Subject: [PATCH] fix(tui): preserve selection after Ctrl+C copy Keep selected text highlighted after Ctrl+C copies it. No-selection Ctrl+C still keeps the quick-clear then second-press exit behavior. --- frontends/tui_v3.py | 9217 +++++++++++++++++++++---------------------- 1 file changed, 4608 insertions(+), 4609 deletions(-) diff --git a/frontends/tui_v3.py b/frontends/tui_v3.py index 8b011ec9..c6d84216 100644 --- a/frontends/tui_v3.py +++ b/frontends/tui_v3.py @@ -1,4609 +1,4608 @@ -"""tui_v3 — scrollback-first TUI for GenericAgent, consolidated. - -Merged from frontends/tui/ (cjk, clipboard, renderer, protocol, core/sb) -into a single file so the v3 frontend ships as one drop-in module. -Run: `python -m frontends.tui_v3` or `python frontends/tui_v3.py`. -""" -from __future__ import annotations - -import asyncio, atexit, json, locale, logging, os, queue, random, re, select, shutil, signal, subprocess -import sys, tempfile, threading, time - -_IS_WINDOWS = os.name == 'nt' - - -# Make `frontends/` parent (project root) importable so `from agentmain import …` -# works whether this file is run as `python -m frontends.tui_v3` or directly -# via `python frontends/tui_v3.py`. -_proj_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -_front_dir = os.path.dirname(os.path.abspath(__file__)) -for _p in (_proj_root, _front_dir): - if _p not in sys.path: - sys.path.insert(0, _p) - -from agentmain import GeneraticAgent -from dataclasses import dataclass -from dataclasses import dataclass, field -from functools import lru_cache -from io import StringIO -from rich.cells import cell_len -from rich.console import Console -from rich.markdown import Markdown -from rich.text import Text -from rich.theme import Theme -from typing import Callable - -# ════════════════════════════════════════════════════════════════════════════ -# i18n — minimal dict-based zh/en translation layer (inlined; was tui_v3_i18n.py) -# -# - Single nested dict `_I18N[lang][key]` → format string. -# - `t(key, **fmt)` returns the formatted string for the current language; -# falls back to English, then to the key itself (so missing keys are visible). -# - Language detection: persisted user choice > system locale > 'en'. -# - Persistence: temp/tui_v3_settings.json (workspace-local, matches v2 pattern). -# -# Strings that intentionally stay single-language (English): -# - Spinner gerunds (Pondering/Brewing/...) — ported from v2. -# - Tech jargon embedded in zh strings: 'tokens', 'ctx', 'model', 'session', … -# ════════════════════════════════════════════════════════════════════════════ - -_SETTINGS_PATH = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "..", "temp", "tui_v3_settings.json" -) - -_SUPPORTED = ('zh', 'en') -_DEFAULT = 'en' - -_LANG: str = _DEFAULT - - -# ---------------- spinner gerunds (English only, from v2) ---------------- - -SPINNER_GERUNDS = ( - "Pondering", "Reticulating", "Sleuthing", "Hatching", "Pouncing", - "Brewing", "Sharpening", "Untangling", "Compiling", "Unraveling", - "Distilling", "Calibrating", "Marinating", "Conjuring", "Foraging", - "Spelunking", "Synthesizing", "Refactoring thoughts", "Tracing breadcrumbs", - "Following the rabbit hole", - "Routing", "Threading", "Polling", "Spinning", "Hooking", - "Patching", "Caching", "Yielding", "Hydrating", "Folding", - "Streaming", "Resolving", "Reaping", "Tuning", -) - - -# Language display names (always shown in their own script — never translated, -# so users always see them in a form they recognize). -LANG_LABELS = { - 'zh': '简体中文', - 'en': 'English', -} - - -# Rotating usage tips — one picked per launch, shown in the banner (v2 _TIPS). -# Only covers features v3 has actually adapted; the en/zh lists run parallel. -_TIPS = { - 'en': [ - "Tip: press / to open the command palette — arrow keys to pick, Enter drops it into the input box.", - "Tip: pasted images / files fold into [Image #N] / [File #N] placeholders; backspace deletes the whole block.", - "Tip: /btw lets a side-agent answer without interrupting the main task.", - "Tip: /rewind [n] rewinds the last n turns; /stop aborts the current task.", - "Tip: /continue lists past sessions — arrow keys to pick, Enter to restore.", - "Tip: Ctrl+J / Shift+Enter inserts a newline in multi-line input; Enter sends.", - "Tip: put [multi-select] in an ask_user prompt to switch to a multi-pick picker.", - "Tip: /cost shows token usage; /llm views / switches the model.", - "Tip: /new [name] starts a fresh session; /language switches the interface language.", - "Tip: /export clip copies the last reply to your system clipboard; /export all prints the log path.", - "Tip: Ctrl+O folds / unfolds all completed tool chips — each fold collapses to one line.", - ], - 'zh': [ - "Tip: 按 / 唤起命令面板 —— 方向键选择,Enter 落入输入框。", - "Tip: 粘贴图片 / 文件会折叠成 [Image #N] / [File #N] 占位符,退格可整块删除。", - "Tip: /btw <问题> 让 side-agent 回答而不打断主任务。", - "Tip: /rewind [n] 回退最近 n 轮对话;/stop 中止当前任务。", - "Tip: /continue 列出历史会话 —— 方向键选择,Enter 恢复。", - "Tip: 多行输入用 Ctrl+J / Shift+Enter 换行;Enter 直接发送。", - "Tip: ask_user 题目里写 [多选] 会自动切到多选 picker。", - "Tip: /cost 查看 token 用量;/llm 查看 / 切换模型。", - "Tip: /new [name] 新建会话;/language 切换界面语言。", - "Tip: /export clip 把最后回复复制到系统剪贴板;/export all 打印日志路径。", - "Tip: Ctrl+O 折叠 / 展开所有已完成的工具 chip —— 每个 chip 折叠成一行。", - ], -} - - -# ---------------- translations ---------------- -# Keys are dot-namespaced. Format placeholders use {name}. - -_I18N: dict[str, dict[str, str]] = { - 'en': { - # /help intro & rows — phrasing mirrors v2's COMMANDS table. - 'help.title': 'Commands:', - 'help.help': ' /help Show help', - 'help.status': ' /status View session status', - 'help.sessions': ' /sessions List all sessions', - 'help.llm': ' /llm [n] View / switch model', - 'help.btw': ' /btw Side question — does not interrupt main agent', - 'help.review': ' /review [request] In-session code review (report inline)', - 'help.rewind': ' /rewind [n] Rewind the last n rounds', - 'help.continue': ' /continue [n|name] List / restore historical sessions', - 'help.new': ' /new [name] Start a new session (clears the current one)', - 'help.rename': ' /rename Rename the current session', - 'help.clear': ' /clear Clear display (does not touch LLM history)', - 'help.cost': ' /cost Token usage for the current session', - 'help.verbose': ' /verbose Tool-call audit (↑↓ select · Enter switch · c copy · q quit)', - 'help.export': ' /export [sub] Export last reply: clip / file [name] / all', - 'help.stop': ' /stop Abort current task', - 'help.language': ' /language [code] View / switch interface language', - 'help.quit': ' /quit Quit', - 'help.esc': ' Esc Cancel ask · clear draft · stop task (no exit)', - 'help.cc': ' Ctrl+C × 2 Quit (when idle; only aborts the task while running)', - 'help.cl': ' Ctrl+L Force repaint (recover from sleep/wake)', - 'help.cz': ' Ctrl+Z / Ctrl+Y Undo / redo input edits', - 'help.shift_arrow': ' Shift+←→↑↓ Select text (Ctrl+C copy / Ctrl+X cut / Ctrl+A all)', - - # _CMDS palette entries — same wording as /help, condensed for one-line hint. - 'cmd.help.desc': 'Show help', - 'cmd.status.desc': 'View session status', - 'cmd.llm.desc': 'View / switch model', - 'cmd.btw.desc': 'Side question — does not interrupt main agent', - 'cmd.review.desc': 'In-session code review', - 'cmd.rewind.desc': 'Rewind the last n rounds', - 'cmd.continue.desc': 'List / restore historical sessions', - 'cmd.new.desc': 'Start a new session', - 'cmd.rename.desc': 'Rename the current session', - 'cmd.clear.desc': 'Clear display (LLM history untouched)', - 'cmd.cost.desc': 'Token usage for the current session', - 'cmd.verbose.desc': 'Tool-call audit', - 'cmd.export.desc': 'Export the last reply (clip/file/all)', - 'cmd.stop.desc': 'Abort current task', - 'cmd.language.desc': 'View / switch interface language', - 'cmd.quit.desc': 'Quit', - - # _CMDS arg hints — mirror v2 (lowercase n, full word "question"). - 'cmd.llm.arg': '[n]', - 'cmd.btw.arg': '', - 'cmd.review.arg': '[request]', - 'cmd.rewind.arg': '[n]', - 'cmd.continue.arg': '[n|name]', - 'cmd.new.arg': '[name]', - 'cmd.rename.arg': '', - 'cmd.export.arg': '[clip|file|all]', - 'cmd.language.arg': '[code]', - - # status line (one-liner above input box) - 'status.asking': '◉ waiting · Esc cancel', - 'status.running.tail': ' · Esc stop', - 'status.tps': ' · {rate:.0f} tok/s', - 'status.cc_confirm': 'Press Ctrl+C again to quit', - 'status.ready': '○ ready', - - # /status output rows - 'status.title': ' Session status', - 'status.label.model': 'model:', - 'status.label.state': 'state:', - 'status.label.rounds': 'rounds:', - 'status.label.context': 'context:', - 'status.label.cwd': 'cwd:', - 'status.state.running': 'running · {verb} {elapsed}', - 'status.state.waiting': 'waiting · ask_user pending', - 'status.state.idle': 'idle', - 'status.ctx.unknown': 'n/a', - 'status.ctx.fmt': '{used:,} / {cap:,} ctx', - - # banner - 'banner.label.model': 'model:', - 'banner.label.directory': 'directory:', - 'banner.label.session': 'session:', - 'banner.session.single': 'single · scrollback', - 'banner.llm_hint': '/llm switch', - - # messages — success / status - 'msg.ask_cancelled': '✗ ask cancelled · type freely or ask again', - 'msg.abort_requested': '⏹ abort requested · Esc', - 'msg.abort_done': '⏹ abort requested', - 'msg.idle_no_task': '(idle, no task)', - 'msg.cleared': 'new conversation · context cleared', - 'msg.new_session': 'new session · previous conversation cleared', - 'msg.new_session_named': 'new session "{name}" · previous conversation cleared', - 'msg.renamed': '✎ session renamed to "{name}"', - 'msg.rewind': '↩ rewound {n} turn(s) (removed {removed} history entries; scrollback is not editable)', - 'msg.no_rewindable': 'no rewindable turns', - 'msg.continue_loading': '┄┄ loaded {name}, full context above ┄┄', - 'msg.continue_ready': '┄┄ {msg} · continue typing ┄┄', - 'msg.llm_switched': 'LLM → {name}', - 'msg.export_done': 'exported: {path}', - 'msg.export_clipped': 'copied to clipboard ({n} chars)', - 'msg.export_clip_failed': '❌ copy failed: no clipboard tool found', - 'msg.export_all': 'full log:\n{path}', - 'msg.export_all_missing': 'no log file yet', - 'msg.review_empty': '(review produced no output)', - 'msg.no_export': '(no reply to export)', - 'msg.no_tracker': '(no stats yet)', - 'msg.no_history': ' no restorable historical sessions', - 'msg.no_llms': '(no LLMs available)', - 'msg.no_tools': ' (no tool-call records)', - 'msg.lang_current': 'Current language: {label} ({code})', - 'msg.lang_switched': 'Language → {label}', - 'msg.btw_no_answer': '(no answer)', - 'btw.title': 'side-questions · Esc to clear', - 'btw.querying': ' ⋯ querying…', - - # plan card - 'plan.header': 'Plan ({done}/{total})', - 'plan.complete': '✓ Plan complete ({n}/{n})', - 'plan.placeholder': 'Plan mode activated', - 'plan.waiting': 'waiting for {path} …', - 'plan.overflow': '+{n} more', - - # errors - 'err.running_blocked': 'busy — /stop before using this command', - 'err.continue_usage': 'usage: /continue or /continue N', - 'err.index_oob': '❌ index out of range (valid: 1-{max})', - 'err.btw_usage': 'usage: /btw (does not pollute main context)', - 'err.rewind_usage': 'usage: /rewind ', - 'err.rename_usage': 'usage: /rename ', - 'err.rewind_range': '❌ rewind failed: n must be 1-{max}', - 'err.lang_usage': 'usage: /language [code] (codes: {available})', - 'err.lang_unknown': 'unknown language code: {code} (available: {available})', - 'err.unknown_cmd': 'unknown command /{name} — /help to list', - 'err.multi_session': '/{name}: multi-session backend not wired yet; command reserved', - 'err.menu_cb': '❌ menu callback failed: {err}', - 'err.no_llm': 'No LLM configured — check mykey.py', - 'err.no_tty': 'tui_v3: needs a real TTY (run it in iTerm directly)', - 'err.dep_missing': 'Error: {name} is not installed.', - 'err.dep_install': 'Install with: pip install rich prompt_toolkit', - - # menu / picker / palette - 'menu.default_title': 'Pick', - 'menu.hint': '↑↓ pick · Enter confirm · Esc cancel', - 'ask.default_q': 'answer:', - 'ask.title': '◉ answer', - 'ask.pending': ' +{n} pending', - 'ask.hint.multi': '↳ ↑↓ move · Space toggle · Enter submit · Esc cancel', - 'ask.hint.single': '↳ ↑↓ navigate (options ⇄ input) · Enter confirm · Esc cancel', - 'ask.hint.freetext': '↳ ↑↓ back to options · type to input · Enter submit · Esc cancel', - - # continue picker - 'continue.title': 'Restore historical session', - 'continue.row.fmt': '{rel:>4} {rounds:>3}r {preview}', - 'continue.unit.round': 'r', - - # llm picker - 'llm.title': 'Switch LLM', - - # export picker - 'export.title': 'Export the last reply', - 'export.opt.clip': 'Copy to clipboard', - 'export.opt.file': 'Save to file (temp/)', - 'export.opt.all': 'Show full log path', - - # rewind picker - 'rewind.title': 'Rewind to which turn', - 'rewind.option': 'rewind {n} turn(s) · {preview}', - - # language picker - 'lang.title': 'Switch interface language', - - # verbose - 'verbose.title': ' Tool Trace', - 'verbose.hint': ' ↑↓ pick · PgUp/Dn scroll · Enter switch[{field}] · c copy · e export · q quit', - 'verbose.empty': '(empty)', - - # answer prefix when committing user reply to ask_user - 'msg.answer_prefix': '[ans] {text}', - }, - - 'zh': { - # /help intro & rows — 措辞与 v2 COMMANDS 表对齐。 - 'help.title': '命令:', - 'help.help': ' /help 显示帮助', - 'help.status': ' /status 查看会话状态', - 'help.sessions': ' /sessions 列出所有会话', - 'help.llm': ' /llm [n] 查看 / 切换模型', - 'help.btw': ' /btw 旁问 — 不打断主 agent', - 'help.review': ' /review [request] in-session 代码审查(直接输出报告)', - 'help.rewind': ' /rewind [n] 回退最近 n 轮', - 'help.continue': ' /continue [n|name] 列出 / 恢复历史会话', - 'help.new': ' /new [name] 新建会话(清空当前会话)', - 'help.rename': ' /rename 重命名当前会话', - 'help.clear': ' /clear 清空显示(不动 LLM 历史)', - 'help.cost': ' /cost 显示当前会话 token 用量', - 'help.verbose': ' /verbose 工具调用审计(↑↓ 选 · Enter 切换 · c 复制 · q 退)', - 'help.export': ' /export [sub] 导出最后回复:clip / file [name] / all', - 'help.stop': ' /stop 中止当前任务', - 'help.language': ' /language [code] 查看 / 切换界面语言', - 'help.quit': ' /quit 退出', - 'help.esc': ' Esc 撤回提问 · 清草稿 · 停任务(不退出)', - 'help.cc': ' Ctrl+C × 2 退出(空闲时;运行中只 abort 任务)', - 'help.cl': ' Ctrl+L 强制重画(睡眠唤醒后修复)', - 'help.cz': ' Ctrl+Z / Ctrl+Y 撤销 / 重做 输入框编辑', - 'help.shift_arrow': ' Shift+←→↑↓ 选中文字(Ctrl+C 复制 / Ctrl+X 剪切 / Ctrl+A 全选)', - - # _CMDS palette entries — 与 /help 同源,命令面板单行显示。 - 'cmd.help.desc': '显示帮助', - 'cmd.status.desc': '查看会话状态', - 'cmd.llm.desc': '查看 / 切换模型', - 'cmd.btw.desc': '旁问 — 不打断主 agent', - 'cmd.review.desc': 'in-session 代码审查', - 'cmd.rewind.desc': '回退最近 n 轮', - 'cmd.continue.desc': '列出 / 恢复历史会话', - 'cmd.new.desc': '新建会话', - 'cmd.rename.desc': '重命名当前会话', - 'cmd.clear.desc': '清空显示(不动 LLM 历史)', - 'cmd.cost.desc': '显示当前会话 token 用量', - 'cmd.verbose.desc': '工具调用审计', - 'cmd.export.desc': '导出最后回复(剪贴板/文件/日志路径)', - 'cmd.stop.desc': '中止当前任务', - 'cmd.language.desc': '查看 / 切换界面语言', - 'cmd.quit.desc': '退出', - - # arg hints — 与 v2 对齐:小写 n、完整的 question 等。 - 'cmd.llm.arg': '[n]', - 'cmd.btw.arg': '', - 'cmd.review.arg': '[request]', - 'cmd.rewind.arg': '[n]', - 'cmd.continue.arg': '[n|name]', - 'cmd.new.arg': '[name]', - 'cmd.rename.arg': '', - 'cmd.export.arg': '[clip|file|all]', - 'cmd.language.arg': '[code]', - - # status line - 'status.asking': '◉ 待答 · Esc 撤回提问', - 'status.running.tail': ' · Esc 停', - 'status.tps': ' · {rate:.0f} tok/s', - 'status.cc_confirm': '再按 Ctrl+C 退出', - 'status.ready': '○ 就绪', - - # /status - 'status.title': ' 会话状态', - 'status.label.model': 'model:', - 'status.label.state': 'state:', - 'status.label.rounds': 'rounds:', - 'status.label.context': 'context:', - 'status.label.cwd': 'cwd:', - 'status.state.running': 'running · {verb} {elapsed}', - 'status.state.waiting': 'waiting · ask_user pending', - 'status.state.idle': 'idle', - 'status.ctx.unknown': 'n/a', - 'status.ctx.fmt': '{used:,} / {cap:,} ctx', - - # banner - 'banner.label.model': 'model:', - 'banner.label.directory': 'directory:', - 'banner.label.session': 'session:', - 'banner.session.single': '单会话 · scrollback', - 'banner.llm_hint': '/llm 切换', - - # messages - 'msg.ask_cancelled': '✗ 已撤回提问 · 可直接输入或重新发问', - 'msg.abort_requested': '⏹ 已请求中止 · Esc', - 'msg.abort_done': '⏹ 已请求中止', - 'msg.idle_no_task': '(空闲,无任务)', - 'msg.cleared': '新对话 · 上下文已清空', - 'msg.new_session': '新会话 · 上一段对话已清空', - 'msg.new_session_named': '新会话「{name}」· 上一段对话已清空', - 'msg.renamed': '✎ 会话已重命名为「{name}」', - 'msg.rewind': '↩ 回退 {n} 轮(移除 {removed} 条历史;scrollback 不可改,以此为界)', - 'msg.no_rewindable': '没有可回退的轮次', - 'msg.continue_loading': '┄┄ 载入 {name},以下为完整上文 ┄┄', - 'msg.continue_ready': '┄┄ {msg} · 接着说即可 ┄┄', - 'msg.llm_switched': 'LLM → {name}', - 'msg.export_done': '已导出: {path}', - 'msg.export_clipped': '已复制到剪贴板({n} 字符)', - 'msg.export_clip_failed': '❌ 复制失败:未找到剪贴板工具', - 'msg.export_all': '完整日志:\n{path}', - 'msg.export_all_missing': '尚无日志文件', - 'msg.review_empty': '(review 无输出)', - 'msg.no_export': '(没有可导出的回答)', - 'msg.no_tracker': '(暂无统计)', - 'msg.no_history': ' 没有可恢复的历史会话', - 'msg.no_llms': '(无可用 LLM)', - 'msg.no_tools': ' (暂无工具调用记录)', - 'msg.lang_current': '当前界面语言:{label}({code})', - 'msg.lang_switched': '界面语言 → {label}', - 'msg.btw_no_answer': '(无回答)', - 'btw.title': '旁问 · Esc 清空', - 'btw.querying': ' ⋯ 查询中…', - - # plan card - 'plan.header': '计划 ({done}/{total})', - 'plan.complete': '✓ 计划完成 ({n}/{n})', - 'plan.placeholder': '计划模式已激活', - 'plan.waiting': '等待写入 {path} …', - 'plan.overflow': '还有 {n} 项', - - # errors - 'err.running_blocked': '运行中,先 /stop 再用该命令', - 'err.continue_usage': '用法: /continue 或 /continue N', - 'err.index_oob': '❌ 索引越界(有效 1-{max})', - 'err.btw_usage': '用法: /btw <旁问>(不污染主上下文)', - 'err.rewind_usage': '用法:/rewind ', - 'err.rename_usage': '用法:/rename ', - 'err.rewind_range': '❌ 回退失败:n 应在 1-{max}', - 'err.lang_usage': '用法:/language [code] (可选 code:{available})', - 'err.lang_unknown': '未知语言代码:{code} (可选:{available})', - 'err.unknown_cmd': '未知命令 /{name} — /help 看可用命令', - 'err.multi_session': '/{name}:多会话后端尚未接入,命令已预留但暂未实现', - 'err.menu_cb': '❌ 菜单回调失败: {err}', - 'err.no_llm': 'No LLM configured — check mykey.py', - 'err.no_tty': 'tui_v3: needs a real TTY (run it in iTerm directly)', - 'err.dep_missing': 'Error: {name} is not installed.', - 'err.dep_install': 'Install with: pip install rich prompt_toolkit', - - # menu / picker / palette - 'menu.default_title': '选择', - 'menu.hint': '↑↓ 选 · Enter 确认 · Esc 取消', - 'ask.default_q': '请回答:', - 'ask.title': '◉ 请回答', - 'ask.pending': ' +{n} 待答', - 'ask.hint.multi': '↳ ↑↓ 移动 · Space 标记 · Enter 提交 · Esc 撤回', - 'ask.hint.single': '↳ ↑↓ 切换(选项 ⇄ 输入框)· Enter 确认 · Esc 撤回', - 'ask.hint.freetext': '↳ ↑↓ 回到选项 · 输字符输入 · Enter 提交 · Esc 撤回', - - # continue picker - 'continue.title': '恢复历史会话', - 'continue.row.fmt': '{rel:>4} {rounds:>3}轮 {preview}', - 'continue.unit.round': '轮', - - # llm picker - 'llm.title': '切换 LLM', - - # export picker - 'export.title': '导出最后回复', - 'export.opt.clip': '复制到剪贴板', - 'export.opt.file': '保存到文件(temp/)', - 'export.opt.all': '显示完整日志路径', - - # rewind picker - 'rewind.title': '选择回退到的轮次', - 'rewind.option': '回退 {n} 轮 · {preview}', - - # language picker - 'lang.title': '切换界面语言', - - # verbose - 'verbose.title': ' Tool Trace', - 'verbose.hint': ' ↑↓ 选 · PgUp/Dn 滚 · Enter 切换[{field}] · c 复制 · e 导出 · q 退', - 'verbose.empty': '(空)', - - # answer prefix - 'msg.answer_prefix': '[答] {text}', - }, -} - - -def t(key: str, **fmt) -> str: - """Translate `key` to current language; fall back to en then to key itself. - Missing format fields raise KeyError — caller bug, not i18n bug.""" - val = _I18N.get(_LANG, {}).get(key) - if val is None and _LANG != 'en': - val = _I18N.get('en', {}).get(key) - if val is None: - val = key # visible breadcrumb for missing keys - if fmt: - try: - return val.format(**fmt) - except (KeyError, IndexError, ValueError): - return val - return val - - -def tip_count() -> int: - """Number of rotating banner tips (same for every language).""" - return len(_TIPS['en']) - - -def tip(idx: int) -> str: - """Banner tip at `idx`, resolved in the current language so a /language - switch relabels it on the next banner repaint.""" - pool = _TIPS.get(_LANG) or _TIPS['en'] - return pool[idx % len(pool)] if pool else '' - - -def get_lang() -> str: - return _LANG - - -def set_lang(code: str) -> bool: - """Switch active language and persist. Returns True on success.""" - global _LANG - if code not in _SUPPORTED: - return False - _LANG = code - _save_settings({'lang': code}) - return True - - -def supported() -> tuple[str, ...]: - return _SUPPORTED - - -def _detect_system_lang() -> str: - """System language: check LC_ALL → LC_MESSAGES → LANG → locale.getlocale(). - Prefix `zh` → 'zh', else 'en'.""" - for env in ('LC_ALL', 'LC_MESSAGES', 'LANG'): - v = os.environ.get(env) - if v: - if v.lower().startswith('zh'): - return 'zh' - return 'en' - try: - loc = locale.getlocale()[0] or '' - if loc.lower().startswith('zh'): - return 'zh' - except Exception: - pass - return _DEFAULT - - -def _load_settings() -> dict: - try: - with open(_SETTINGS_PATH, 'r', encoding='utf-8') as f: - data = json.load(f) - return data if isinstance(data, dict) else {} - except Exception: - return {} - - -def _save_settings(patch: dict) -> None: - cur = _load_settings() - cur.update(patch) - try: - os.makedirs(os.path.dirname(_SETTINGS_PATH), exist_ok=True) - with open(_SETTINGS_PATH, 'w', encoding='utf-8') as f: - json.dump(cur, f, ensure_ascii=False, indent=2) - except Exception: - pass - - -def init_lang() -> str: - """Resolve and install initial language: persisted > system > default. - Call once at startup; returns the resolved code.""" - global _LANG - saved = _load_settings().get('lang') - if saved in _SUPPORTED: - _LANG = saved - else: - _LANG = _detect_system_lang() - return _LANG - - -# `_t` alias kept so the hundreds of existing `_t(...)` call sites are untouched. -_t = t - -# Resolve language once on import so any module-level string (banner, _CMDS, -# /help) sees the right locale. -init_lang() -# ════════════════════════════════════════════════════════════════════════════ - - -# Module-level `clip` shim: keep sb.py-style `clip.copy(...)` calls -# working without a separate clipboard module — the underlying funcs -# (copy, paste, paste_image) are defined later in this same file. -class _Clip: - @staticmethod - def copy(text): return copy(text) - @staticmethod - def paste(): return paste() - @staticmethod - def paste_image(): return paste_image() -clip = _Clip() - - -def _enable_windows_vt_mode() -> None: - """Enable UTF-8 + ANSI escape processing on Windows consoles when possible.""" - if not _IS_WINDOWS: - return - try: - import ctypes - kernel32 = ctypes.windll.kernel32 - # Make classic conhost/cmd decode UTF-8 bytes written by _w(). This is - # harmless in mintty/Git Bash where these calls usually fail because the - # std handles are pipes/ptys rather than Win32 console handles. - kernel32.SetConsoleOutputCP(65001) - kernel32.SetConsoleCP(65001) - enable_vt = 0x0004 - for handle_id in (-11, -12): # STD_OUTPUT_HANDLE, STD_ERROR_HANDLE - handle = kernel32.GetStdHandle(handle_id) - mode = ctypes.c_uint32() - if kernel32.GetConsoleMode(handle, ctypes.byref(mode)): - kernel32.SetConsoleMode(handle, mode.value | enable_vt) - except Exception: - # Safe fallback: modern terminals usually already support ANSI/UTF-8; - # older conhost may render escape codes, but the TUI should not crash. - pass - - -def _enter_utf8_charset() -> None: - """Ask VT-compatible terminals to interpret subsequent bytes as UTF-8.""" - # ESC % G is the ISO-2022/VT sequence for selecting UTF-8. It fixes some - # mintty/Git-Bash launches where the child process inherits a legacy locale - # and mojibakes UTF-8 box drawing/CJK into CP936-looking text. - _w('\x1b%G') - - -def _ptk_keypress_to_bytes(kp) -> bytes: - """Map prompt_toolkit KeyPress objects to tui_v3's internal byte protocol. - - prompt_toolkit normalizes platform-specific console input (Win32 console, - ConPTY, mintty/msys pty) into symbolic Keys. Keep the editor core small by - translating those symbols back to the bytes already handled by _feed/_keys. - """ - try: - from prompt_toolkit.keys import Keys - except Exception: - Keys = None # type: ignore[assignment] - - key = getattr(kp, 'key', None) - data = getattr(kp, 'data', '') or '' - - # Printable text and paste chunks. PTK may deliver a multi-character data - # string for bracketed paste/typeahead; forwarding UTF-8 preserves CJK/emoji. - if isinstance(data, str) and data and (len(data) > 1 or (len(data) == 1 and ord(data) >= 0x20 and data != '\x7f')): - return data.encode('utf-8', 'replace') - - name = getattr(key, 'name', str(key)) - key_s = str(key) - norm = name.lower().replace('_', '-').replace('keys.', '') - norm_s = key_s.lower().replace('_', '-').replace('keys.', '') - aliases = {norm, norm_s} - - def has(*needles: str) -> bool: - return any(n in a for a in aliases for n in needles) - - # Enter submits; Ctrl+J / Shift+Enter insert newline when PTK can distinguish. - if has('controlm', 'c-m') or key_s in ('\r', '\n'): - return b'\r' - if has('controlj', 'c-j', 's-enter', 'shift-enter'): - return b'\n' - - # Navigation. Existing _keys() uses these small control bytes. - if has('up') and has('shift'): - return b'\x1c' - if has('down') and has('shift'): - return b'\x1d' - if has('left') and has('shift'): - return b'\x1e' - if has('right') and has('shift'): - return b'\x1f' - if has('up'): - return b'\x10' - if has('down'): - return b'\x0e' - if has('left'): - return b'\x02' - if has('right'): - return b'\x06' - if has('home'): - return b'\x01' - if has('end'): - return b'\x05' - if has('delete'): - return b'\x7f' - if has('backspace') or data == '\x7f' or data == '\x08': - return b'\x7f' - if has('escape') or data == '\x1b': - return b'\x1b' - - ctrl = { - 'controla': b'\x01', 'c-a': b'\x01', - 'controlb': b'\x02', 'c-b': b'\x02', - 'controlc': b'\x03', 'c-c': b'\x03', - 'controld': b'\x04', 'c-d': b'\x04', - 'controle': b'\x05', 'c-e': b'\x05', - 'controlf': b'\x06', 'c-f': b'\x06', - 'controlh': b'\x7f', 'c-h': b'\x7f', - 'controlj': b'\n', 'c-j': b'\n', - 'controlk': b'\x0b', 'c-k': b'\x0b', - 'controll': b'\x0c', 'c-l': b'\x0c', - 'controln': b'\x0e', 'c-n': b'\x0e', - 'controlp': b'\x10', 'c-p': b'\x10', - 'controlu': b'\x15', 'c-u': b'\x15', - 'controlv': b'\x16', 'c-v': b'\x16', - 'controlx': b'\x18', 'c-x': b'\x18', - 'controly': b'\x19', 'c-y': b'\x19', - 'controlz': b'\x1a', 'c-z': b'\x1a', - } - for a in aliases: - if a in ctrl: - return ctrl[a] - - if isinstance(data, str) and data: - return data.encode('utf-8', 'replace') - return b'' - - -# ──────────────────────────────────────────────────────────────────────────── -# cjk: CJK wrap monkey-patch for Rich -# ──────────────────────────────────────────────────────────────────────────── - -log = logging.getLogger(__name__) - -_CJK_RANGES = ( - (0x4E00, 0x9FFF), (0x3400, 0x4DBF), (0x20000, 0x2A6DF), - (0x2A700, 0x2B73F), (0x2B740, 0x2B81F), (0x2B820, 0x2CEAF), - (0x2CEB0, 0x2EBEF), (0xF900, 0xFAFF), (0x2F800, 0x2FA1F), - (0x3000, 0x303F), (0x3040, 0x309F), (0x30A0, 0x30FF), - (0x31F0, 0x31FF), (0xFF00, 0xFFEF), (0xAC00, 0xD7AF), -) - - -def _is_cjk(ch: str) -> bool: - cp = ord(ch) - return any(lo <= cp <= hi for lo, hi in _CJK_RANGES) - - -def _is_wide(ch: str) -> bool: - try: - from rich.cells import cell_len - return cell_len(ch) == 2 - except ImportError: - return _is_cjk(ch) - - -def install_cjk_wrap() -> bool: - """Monkey-patch Rich's word-wrap to handle CJK char-level breaks. - Returns True on success, False on fallback.""" - try: - import rich._wrap as wrap_mod - from rich.cells import cell_len - except (ImportError, AttributeError) as e: - log.warning("CJK patch skipped: %s", e) - return False - - orig_divide = getattr(wrap_mod, 'divide_line', None) - if orig_divide is None: - log.warning("CJK patch skipped: Rich lacks divide_line") - return False - - def _patched_divide_line(text, width, fold=True): - divides = set() - line_width = 0 - for i, ch in enumerate(text._text if hasattr(text, '_text') else str(text)): - char_w = cell_len(ch) if ch != '\n' else 0 - if line_width + char_w > width and line_width > 0: - if _is_wide(ch) or fold: - divides.add(i) - line_width = char_w - continue - line_width += char_w - if ch == '\n': - line_width = 0 - # Merge with original for non-CJK content - try: - orig_divides = orig_divide(text, width, fold) - divides.update(orig_divides) - except Exception: - pass - return sorted(divides) - - try: - wrap_mod.divide_line = _patched_divide_line - log.info("CJK wrap patch installed for Rich %s", _rich_version()) - return True - except Exception as e: - log.warning("CJK patch failed: %s", e) - return False - - -def _rich_version() -> str: - try: - from importlib.metadata import version - return version('rich') - except Exception: - return '?' - - -# ──────────────────────────────────────────────────────────────────────────── -# clipboard: cross-platform copy/paste via native tools -# ──────────────────────────────────────────────────────────────────────────── - -log = logging.getLogger(__name__) - -_TEMP_DIR = os.path.join(tempfile.gettempdir(), 'genericagent_tui') -_platform = sys.platform -_HAS_WAYLAND = bool(os.environ.get('WAYLAND_DISPLAY')) - - -def _run(cmd: list[str], input: bytes | None = None, timeout: float = 3.0) -> bytes | None: - try: - r = subprocess.run(cmd, input=input, capture_output=True, timeout=timeout) - return r.stdout if r.returncode == 0 else None - except (FileNotFoundError, subprocess.TimeoutExpired, OSError) as e: - log.debug("clipboard cmd %s failed: %s", cmd, e) - return None - - -def copy(text: str) -> bool: - data = text.encode('utf-8') - if _platform == 'darwin': - return _run(['pbcopy'], input=data) is not None - if _platform == 'win32': - return _run(['clip.exe'], input=data) is not None - if _HAS_WAYLAND and shutil.which('wl-copy'): - return _run(['wl-copy'], input=data) is not None - if shutil.which('xclip'): - return _run(['xclip', '-selection', 'clipboard'], input=data) is not None - if shutil.which('xsel'): - return _run(['xsel', '--clipboard', '--input'], input=data) is not None - log.warning("No clipboard tool found") - return False - - -def paste() -> str | None: - out: bytes | None = None - if _platform == 'darwin': - out = _run(['pbpaste']) - elif _platform == 'win32': - out = _run(['powershell', '-NoProfile', '-Command', 'Get-Clipboard']) - elif _HAS_WAYLAND and shutil.which('wl-paste'): - out = _run(['wl-paste', '--no-newline']) - elif shutil.which('xclip'): - out = _run(['xclip', '-selection', 'clipboard', '-o']) - elif shutil.which('xsel'): - out = _run(['xsel', '--clipboard', '--output']) - if out is not None: - return out.decode('utf-8', errors='replace') - return None - - -def paste_image() -> str | None: - """Save clipboard image to temp file, return path or None.""" - os.makedirs(_TEMP_DIR, exist_ok=True) - import time - path = os.path.join(_TEMP_DIR, f'clip_{int(time.time()*1000)}.png') - ok = False - if _platform == 'darwin': - script = ( - 'use framework "AppKit"\n' - 'set pb to current application\'s NSPasteboard\'s generalPasteboard()\n' - 'set imgData to pb\'s dataForType:"public.png"\n' - 'if imgData is missing value then error "no image"\n' - 'imgData\'s writeToFile:"' + path + '" atomically:true\n' - ) - ok = _run(['osascript', '-e', script], timeout=5.0) is not None - elif _HAS_WAYLAND and shutil.which('wl-paste'): - data = _run(['wl-paste', '-t', 'image/png']) - if data: - with open(path, 'wb') as f: - f.write(data) - ok = True - elif shutil.which('xclip'): - data = _run(['xclip', '-selection', 'clipboard', '-t', 'image/png', '-o']) - if data and len(data) > 8: - with open(path, 'wb') as f: - f.write(data) - ok = True - return path if ok and os.path.isfile(path) else None - - -def _grab_clipboard_file() -> tuple[str, bool] | None: - """Return (path, is_image) from the clipboard via PIL — ported from v2. - - PIL.ImageGrab.grabclipboard() is the one cross-platform path that also - works on Windows (osascript/xclip/wl-paste below don't). It handles two - shapes: a list of copied file paths, or a raw bitmap Image (saved to a - temp PNG). is_image distinguishes images (→ `[Image #N]`, sent to the - model) from any other file (→ `[File #N]`, expanded to its path).""" - try: - from PIL import ImageGrab, Image - data = ImageGrab.grabclipboard() - except Exception: - return None - if isinstance(data, list): - for item in data: - if isinstance(item, str) and os.path.isfile(item): - return (item, os.path.splitext(item)[1].lower() in _IMAGE_EXTS) - return None - if isinstance(data, Image.Image): - try: - os.makedirs(_TEMP_DIR, exist_ok=True) - path = os.path.join(_TEMP_DIR, f'clip_{int(time.time() * 1000)}.png') - data.save(path, 'PNG') - return (path, True) - except Exception: - return None - return None - - -def _cleanup(): - if os.path.isdir(_TEMP_DIR): - shutil.rmtree(_TEMP_DIR, ignore_errors=True) - -atexit.register(_cleanup) - - -# ──────────────────────────────────────────────────────────────────────────── -# renderer: markdown / ANSI sanitisation / fold -# ──────────────────────────────────────────────────────────────────────────── - -# Comprehensive ANSI sanitization — matches v2's thoroughness -_ANSI_INCOMPLETE_RE = re.compile(r'\x1b\[[0-9;]*$') -_ANSI_DEC_PRIVATE_RE = re.compile(r'\x1b\[\?[0-9;]*[a-zA-Z]') -_ANSI_OSC_RE = re.compile(r'\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)?') -_ANSI_MODE_SET_RE = re.compile(r'\x1b[=>][0-9]*') -# Keep SGR (color) codes, strip everything else -_ANSI_SGR_RE = re.compile(r'\x1b\[[0-9;]*m') - -_TURN_MARKER_RE = re.compile(r'\*\*LLM Running \(Turn (\d+)\).*?\*\*') -_META_TAG_RE = re.compile(r'<(?:thinking|summary|tool_use|file_content)>.*?', re.DOTALL) -_TOOL_USE_BLOCK_RE = re.compile(r'```json\s*\{[^}]*"tool_name"[^}]*\}\s*```', re.DOTALL) -_TOOL_USE_TAG_RE = re.compile(r'\s*\{.*?"tool_name"\s*:\s*"([^"]+)".*?\}\s*', re.DOTALL) -_SUMMARY_RE = re.compile(r'\s*(.*?)\s*', re.DOTALL) -_QUAD_BACKTICK_RE = re.compile(r'(`{4,})') -_ASK_USER_RE = re.compile(r'"tool_name"\s*:\s*"ask_user".*?"question"\s*:\s*"([^"]*)"', re.DOTALL) - -# v2 fold_turns helpers (tuiapp_v2.py:240-267). 4-fence stash keeps a tool -# result's `` ``` `` from being misread as a turn boundary; the per-turn -# `` regex uses a negative lookahead so two adjacent summaries don't -# merge; the title cleaner strips fenced code + thinking before extraction. -_FENCE4_STASH_RE = re.compile(r'^`{4,}.*?^`{4,}\n?', re.DOTALL | re.MULTILINE) -_TURN_SPLIT_FOLD_RE = re.compile(r'(\*\*LLM Running \(Turn \d+\) \.\.\.\*\*)') -_SUMMARY_PERTURN_RE = re.compile(r'\s*((?:(?!).)*?)\s*', re.DOTALL) -_TITLE_CLEAN_RE = re.compile(r'`{3,}.*?`{3,}|.*?', re.DOTALL) -_TITLE_ARGS_TAIL_RE = re.compile(r',?\s*args:.*$') - - -@dataclass -class FoldSegment: - title: str - body: str - turn: int - is_last: bool = False - - -@dataclass -class Block: - """A unit of finalized scrollback history, stored as SOURCE (not rendered - lines). Resize replays each block through its renderer at the new width, - so width-baked structures (chip boxes, banner box) reflow correctly. The - actual scrollback bytes above the viewport stay frozen at the old width — - that's a terminal physics constraint — but the viewport and everything - new flows correctly.""" - kind: str # 'user' | 'assistant' | 'plain' | 'banner' - source: str # source text (or '' for banner — regenerated on demand) - tool_n: int = 0 # tool count cached at last render (for stable tids) - - -def sanitize_ansi(text: str) -> str: - """Strip non-SGR ANSI escapes and incomplete sequences from streaming chunks.""" - text = _ANSI_DEC_PRIVATE_RE.sub('', text) - text = _ANSI_OSC_RE.sub('', text) - text = _ANSI_MODE_SET_RE.sub('', text) - text = _ANSI_INCOMPLETE_RE.sub('', text) - return text - - -def _render_checkboxes(text: str) -> str: - """Convert markdown task lists to visual checkboxes.""" - text = re.sub(r'^(\s*[-*+]\s)\[ \]', r'\1☐', text, flags=re.MULTILINE) - text = re.sub(r'^(\s*[-*+]\s)\[x\]', r'\1☑', text, flags=re.MULTILINE | re.IGNORECASE) - return text - - -def strip_meta_tags(text: str) -> str: - """Strip internal tags, render tool_use as readable summaries.""" - def _tool_replace(m): - name = m.group(1) - if name == 'ask_user': - q_match = _ASK_USER_RE.search(m.group(0)) - if q_match: - return f'> {q_match.group(1)}' - return f'🔧 {name}' - text = _TOOL_USE_TAG_RE.sub(_tool_replace, text) - text = _META_TAG_RE.sub('', text) - text = _TOOL_USE_BLOCK_RE.sub('', text) - text = _render_checkboxes(text) - return re.sub(r'\n{3,}', '\n\n', text).strip() - - -def _extract_title(text: str, max_len: int = 72) -> str: - m = _SUMMARY_RE.search(text) - if m: - title = m.group(1).strip() - else: - first = text.strip().split('\n')[0] if text.strip() else '' - title = re.sub(r'^[#*>\-\s]+', '', first).strip() - if len(title) > max_len: - title = title[:max_len - 1] + '…' - return title or '...' - - -def fold_segments(text: str) -> list[FoldSegment]: - """Split agent response into per-turn fold segments.""" - if not text: - return [] - cleaned = _QUAD_BACKTICK_RE.sub(lambda m: '~' * len(m.group(1)), text) - parts = _TURN_MARKER_RE.split(cleaned) - if len(parts) <= 1: - return [FoldSegment(title=_extract_title(text), body=text, turn=1, is_last=True)] - segments: list[FoldSegment] = [] - if parts[0].strip(): - segments.append(FoldSegment(title=_extract_title(parts[0]), body=parts[0], turn=0)) - for i in range(1, len(parts), 2): - turn_num = int(parts[i]) if i < len(parts) else len(segments) + 1 - body = parts[i + 1] if i + 1 < len(parts) else '' - body = body.replace('~' * 4, '````') - segments.append(FoldSegment(title=_extract_title(body), body=body, turn=turn_num)) - if segments: - segments[-1].is_last = True - return segments - - -# Render cache: (content_hash, width) -> rendered object -_render_cache: dict[tuple[int, int], object] = {} -_CACHE_MAX = 200 - - -class HardBreakMarkdown(Markdown): - """Markdown that treats softbreaks as hardbreaks, preserving code blocks.""" - def __init__(self, markup: str, **kwargs): - lines = [] - in_code = False - for line in markup.split('\n'): - stripped = line.strip() - if stripped.startswith('```'): - in_code = not in_code - if in_code: - lines.append(line) - else: - lines.append(line + ' ') - super().__init__('\n'.join(lines), **kwargs) - - -def _markdown_to_text(cleaned: str, width: int) -> Text: - """Render Markdown to a CONCRETE Text (v2 approach). A Textual Static holding - a live rich.markdown.Markdown does not re-composite reliably when scrolled - past the viewport (height measurement is unstable → frozen/blank scroll); - a pre-rendered Text has a fixed line count and scrolls correctly.""" - from io import StringIO - from rich.console import Console - buf = StringIO() - Console(file=buf, width=max(1, width), force_terminal=True, - color_system='truecolor', legacy_windows=False - ).print(HardBreakMarkdown(cleaned), end='') - return Text.from_ansi(buf.getvalue().rstrip('\n')) - - -def render_message(text: str, role: str = 'assistant', width: int = 0) -> Text: - """Render a message to a concrete Text. width<=0 → provisional plain text - (the widget re-renders via on_resize once its real width is known).""" - cleaned = strip_meta_tags(text) if role == 'assistant' else text - if not cleaned.strip(): - cleaned = '...' - if role == 'system': - return Text(cleaned, style='dim') - if width <= 0: - return Text(cleaned) - key = (hash(cleaned), width) - cached = _render_cache.get(key) - if cached is not None: - return cached - try: - result = _markdown_to_text(cleaned, width) - except Exception: - result = Text(cleaned) - if len(_render_cache) >= _CACHE_MAX: - for k in list(_render_cache.keys())[:_CACHE_MAX // 4]: - _render_cache.pop(k, None) - _render_cache[key] = result - return result - - -# ──────────────────────────────────────────────────────────────────────────── -# protocol: AgentBridge + typed events over display_queue -# ──────────────────────────────────────────────────────────────────────────── - -@dataclass(frozen=True) -class StreamEvent: - text: str - turn: int = 0 - source: str = "user" - -@dataclass(frozen=True) -class DoneEvent: - text: str - turn: int = 0 - source: str = "user" - outputs: list[str] = field(default_factory=list) - -@dataclass(frozen=True) -class AskUserEvent: - question: str - candidates: list[str] = field(default_factory=list) - -@dataclass(frozen=True) -class SystemEvent: - text: str - -@dataclass(frozen=True) -class ErrorEvent: - message: str - exception: Exception | None = None - -AgentEvent = StreamEvent | DoneEvent | AskUserEvent | SystemEvent | ErrorEvent - -_HOOK_KEY = '_tui_v3_ask_user' - - -def _extract_ask_user(ctx: dict | None) -> AskUserEvent | None: - er = (ctx or {}).get('exit_reason') or {} - if er.get('result') != 'EXITED': - return None - payload = er.get('data') or {} - if payload.get('status') != 'INTERRUPT' or payload.get('intent') != 'HUMAN_INTERVENTION': - return None - data = payload.get('data') or {} - return AskUserEvent( - question=data.get('question', ''), - candidates=data.get('candidates', []), - ) - - -class AgentBridge: - """Wraps GenericAgent for the TUI. One bridge per session.""" - - def __init__(self, llm_no: int = 0): - self.agent = GeneraticAgent() - self.agent.llm_no = llm_no - if llm_no and hasattr(self.agent, 'llmclients') and self.agent.llmclients: - self.agent.llmclient = self.agent.llmclients[llm_no % len(self.agent.llmclients)] - self.agent.inc_out = True - self.agent.verbose = True - self.ask_user_queue: queue.Queue[AskUserEvent] = queue.Queue() - self._install_hook() - self._healthy = True - self._init_error: str | None = None - if not getattr(self.agent, 'llmclient', None): - self._healthy = False - self._init_error = _t('err.no_llm') - self._runner = threading.Thread(target=self._run_safe, daemon=True, name=f'ga-tui-agent') - self._runner.start() - - def _run_safe(self): - try: - self.agent.run() - except Exception as e: - self._healthy = False - self._init_error = str(e) - - def _install_hook(self): - if not hasattr(self.agent, '_turn_end_hooks'): - self.agent._turn_end_hooks = {} - self.agent._turn_end_hooks[_HOOK_KEY] = self._on_turn_end - - def _on_turn_end(self, ctx: dict): - ev = _extract_ask_user(ctx) - if ev: - self.ask_user_queue.put(ev) - - def submit(self, query: str, images: list | None = None) -> queue.Queue: - return self.agent.put_task(query, source='user', images=images) - - def abort(self): - self.agent.abort() - - @property - def is_running(self) -> bool: - return self.agent.is_running - - @property - def llm_name(self) -> str: - try: - return self.agent.get_llm_name() - except Exception: - return '?' - - def list_llms(self) -> list[tuple[int, str, bool]]: - return self.agent.list_llms() - - def switch_llm(self, n: int): - self.agent.next_llm(n) - - def drain_display_queue(self, dq: queue.Queue, timeout: float = 0.25): - """Generator: yields typed events from a display_queue.""" - while True: - try: - item = dq.get(timeout=timeout) - except queue.Empty: - yield None - continue - if not isinstance(item, dict): - continue - if 'done' in item: - yield DoneEvent( - text=item['done'], - turn=item.get('turn', 0), - source=item.get('source', 'user'), - outputs=item.get('outputs', []), - ) - break - if 'next' in item: - yield StreamEvent( - text=item['next'], - turn=item.get('turn', 0), - source=item.get('source', 'user'), - ) - - -# ──────────────────────────────────────────────────────────────────────────── -# sb: scrollback-first TUI core (input, paint, flow, ask, /verbose, …) -# ──────────────────────────────────────────────────────────────────────────── - -# Prose hierarchy via ATTRIBUTES only (bold/italic/underline) — NO dim for body -# content (dim on white = unreadable grey). Keep normal prose inherited from the -# surrounding tile; avoid Rich's inline-code reverse without pinning prose dark. -_MD_THEME = Theme({ - 'markdown.h1': 'bold underline', 'markdown.h2': 'bold underline', - 'markdown.h3': 'bold', 'markdown.h4': 'bold', - 'markdown.h5': 'bold', 'markdown.h6': 'bold', - 'markdown.strong': 'bold', 'markdown.em': 'italic', - # Rich's default inline-code ``reverse`` can vanish on themed tiles; keep it - # bold but inherited so it stays readable on both light and dark surfaces. - 'markdown.code': 'bold', 'markdown.code_block': 'none', - 'markdown.block_quote': 'italic', 'markdown.hr': 'none', - 'markdown.link': 'underline', 'markdown.link_url': 'underline', - # Bullet inherits the surrounding foreground (a pinned dark hue vanished on - # dark terminals); bold alone keeps it visible on any background. - 'markdown.item.bullet': 'bold', -}, inherit=True) - - -PROMPT = '❯ ' -CONT = ' ' -# macOS Terminal.app quantises ALL truecolor escapes to their nearest 256-color -# slot, and the slot it picks for #5e6ad2 (iTerm lavender) is 62/#5f5fd7 — that's -# the "blue" border the user sees. It also renders \x1b[2m as a heavy 30%-opacity -# multiply instead of the gentle blend iTerm does — that's the "heavy shadow". -# Branching on TERM_PROGRAM lets iTerm keep its truecolor + dim look, while -# Apple_Terminal uses pinned 256-color slots that match iTerm's RENDERED result. -_IS_APPLE_TERMINAL = os.environ.get('TERM_PROGRAM') == 'Apple_Terminal' -_RST = '\x1b[0m' -if _IS_APPLE_TERMINAL: - _DIM = '\x1b[38;5;244m' # mid-gray — no \x1b[2m, no "shadow" - _ACCENT = '\x1b[38;5;105m' # 256-slot light purple, closest to iTerm rendered look - _BORDER = '\x1b[38;5;146m' # light lavender -else: - _DIM = '\x1b[2m' - _ACCENT = '\x1b[38;2;94;106;210m' # Linear lavender #5e6ad2 - _BORDER = '\x1b[38;5;146m' -_INK_U = '\x1b[38;5;234m' # user ink — near-black, strong (as requested) -# Linear surface ladder: user gets its own panel; AI = plain white surface. -_TILE_U = '\x1b[48;5;251m' + _INK_U # user panel (near-black ink) -_MARK = _ACCENT + '❯' + _RST # prompt mark — the single accent -_BG_TOK = {str(n) for n in list(range(40, 48)) + [49] + list(range(100, 108))} -_SGR_RE = re.compile(r'\x1b\[([0-9;]*)m') -_CSI_ERASE_RE = re.compile(r'\x1b\[[0-9;?]*[JK]') -_SGR_TOKEN_RE = re.compile(r'\x1b\[[0-9;]*m') - - -def _tile(s: str, style: str) -> str: - # re-assert style after every reset so muted-markdown \x1b[0m can't punch - # a hole in the block; \x1b[K fills to the edge (CJK-safe full-width tile). - return style + s.replace(_RST, _RST + style) + '\x1b[K' + _RST - - -def _border(left: str, right: str, width: int, style: str = _BORDER) -> str: - width = max(1, width) - if width == 1: - return style + left + _RST - return style + left + '─' * max(0, width - 2) + right + _RST - - -def _strip_bg(s: str) -> str: - """Drop only BACKGROUND SGR — keep foreground colour (curated syntax/diff - stays, Linear-style functional colour) but no ugly box behind code.""" - def repl(m: re.Match) -> str: - toks = m.group(1).split(';') if m.group(1) else ['0'] - out, i = [], 0 - while i < len(toks): - t = toks[i] - if t == '48': - i += 3 if (i + 1 < len(toks) and toks[i + 1] == '5') else \ - 5 if (i + 1 < len(toks) and toks[i + 1] == '2') else 1 - continue - if t in _BG_TOK: - i += 1; continue - out.append(t); i += 1 - return '\x1b[' + ';'.join(out) + 'm' if out else '\x1b[0m' - return _SGR_RE.sub(repl, s) -_ESC_RE = re.compile(rb'\x1b\[[0-9;?]*[ -/]*[@-~]|\x1b.') -_FILE_REF_RE = re.compile(r'@([\w./\-~]+)') -_PASTE_PH_RE = re.compile(r'\[Pasted text #(\d+) \+\d+ lines\]') -_FILE_PH_RE = re.compile(r'\[File #(\d+)\]') -_IMG_PH_RE = re.compile(r'\[Image #(\d+)\]') -# All paste placeholders — used for whole-block delete (v2 parity): backspace -# flush against any of these wipes the entire placeholder, not one char. -_PLACEHOLDER_RES = (_PASTE_PH_RE, _IMG_PH_RE, _FILE_PH_RE) -_IMAGE_EXTS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff', '.tif', '.ico'} -_TURN_MK_RE = re.compile( - r'\*\*LLM Running \(Turn \d+\)[^\n]*\*\*' # native live-format marker - r'|^[ \t]*Turn \d+\s*\.{3,}[ \t]*$', # plain subagent form on its own line - re.M) -_TOOL_RE = re.compile( - r'🛠️ Tool: `([^`]+)`[^\n]*\n' # 1 = name - r'(`{3,})[^\n]*\n(.*?)\n\2[ \t]*\n*' # 2 = fence delim, 3 = args body - r'(?:' - r'(`{5,})[^\n]*\n(.*?)\n\4[ \t]*\n*' # 4 = result fence (5-bt), 5 = body - r'|' # ─ OR ─ - r'(.*?)(?=^🛠️ Tool: `|^\*\*LLM Running|^|\Z)' # 6 = live exec trace - r')', - re.DOTALL | re.MULTILINE) -# Prompted-style tool wrappers GA models emit AS TEXT in saved logs (no -# structured tool_use block). Fold them into chips too so /continue replays -# match live mode. Whitelist = every name in assets/tools_schema.json + the -# native metadata wrappers; user HTML (
///

/