diff --git a/astrbot/core/tools/computer_tools/edit_engine.py b/astrbot/core/tools/computer_tools/edit_engine.py
new file mode 100644
index 0000000000..24c222d8fa
--- /dev/null
+++ b/astrbot/core/tools/computer_tools/edit_engine.py
@@ -0,0 +1,815 @@
+"""
+Robust file editing engine for AstrBot, inspired by opencode's multi-strategy replacer chain.
+
+Implements 9 fallback replacers to handle LLM-generated edits that may have:
+- indentation drift
+- whitespace normalization issues
+- escape sequence mismatches (\\n vs actual newline)
+- trailing/leading whitespace differences
+- block-level fuzzy matching via Levenshtein similarity
+
+"""
+
+from __future__ import annotations
+
+import asyncio
+import difflib
+import os
+import weakref
+from collections.abc import Callable, Iterator
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Literal
+
+Replacer = Callable[[str, str], Iterator[str]]
+
+# File-level locks to prevent concurrent edits on the same file.
+# Use WeakValueDictionary so locks for deleted files can be garbage-collected.
+_locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
+
+
+def get_file_lock(path: str) -> asyncio.Lock:
+ """Get or create an asyncio.Lock for the given file path."""
+ resolved = str(Path(path).resolve())
+ lock = _locks.get(resolved)
+ if lock is None:
+ lock = asyncio.Lock()
+ _locks[resolved] = lock
+ return lock
+
+
+# ---------------------------------------------------------------------------
+# Line-ending / BOM helpers (mirrors opencode src/tool/edit.ts)
+# ---------------------------------------------------------------------------
+
+
+def _normalize_line_endings(text: str) -> str:
+ """
+ Normalize actual CRLF line endings to LF.
+
+ ONLY handles real carriage-return + newline sequences (\\r\\n bytes).
+ Does NOT interpret escape sequences — literal \\n in file content
+ (e.g. Python string literals) must be preserved as-is.
+
+ Escape sequence handling for search strings is done by the
+ _escape_normalized_replacer in the replacer chain.
+ """
+ return text.replace("\r\n", "\n")
+
+
+def _detect_line_ending(text: str) -> Literal["\n", "\r\n"]:
+ return "\r\n" if "\r\n" in text else "\n"
+
+
+def _convert_to_line_ending(text: str, ending: Literal["\n", "\r\n"]) -> str:
+ if ending == "\n":
+ return text
+ # Convert standalone \n to \r\n, but avoid converting existing \r\n to \r\r\n
+ # by first normalizing any existing \r\n to \n, then converting all \n to \r\n
+ text = text.replace("\r\n", "\n")
+ return text.replace("\n", "\r\n")
+
+
+# ---------------------------------------------------------------------------
+# Levenshtein distance (for BlockAnchorReplacer)
+# ---------------------------------------------------------------------------
+
+
+def _levenshtein(a: str, b: str) -> int:
+ if a == "" or b == "":
+ return max(len(a), len(b))
+ # Use a single row DP to reduce memory
+ prev = list(range(len(b) + 1))
+ for i in range(1, len(a) + 1):
+ curr = [i]
+ ai = a[i - 1]
+ for j in range(1, len(b) + 1):
+ cost = 0 if ai == b[j - 1] else 1
+ curr.append(min(curr[-1] + 1, prev[j] + 1, prev[j - 1] + cost))
+ prev = curr
+ return prev[len(b)]
+
+
+# ---------------------------------------------------------------------------
+# Escape helpers
+# ---------------------------------------------------------------------------
+
+
+def _unescape(s: str) -> str:
+ """
+ Unescape common escape sequences in a string.
+
+ Handles: \\n, \\t, \\r, \\b, \\f, \\v, \\\\, \\", \\', \\`, \\$
+ Also handles \\xNN hex and \\uNNNN unicode escapes.
+ """
+ result = []
+ i = 0
+ while i < len(s):
+ if s[i] == "\\" and i + 1 < len(s):
+ nxt = s[i + 1]
+ if nxt == "n":
+ result.append("\n")
+ elif nxt == "t":
+ result.append("\t")
+ elif nxt == "r":
+ result.append("\r")
+ elif nxt == "b":
+ result.append("\b")
+ elif nxt == "f":
+ result.append("\f")
+ elif nxt == "v":
+ result.append("\v")
+ elif nxt == "x" and i + 3 < len(s):
+ # \xNN hex escape
+ try:
+ val = int(s[i + 2 : i + 4], 16)
+ result.append(chr(val))
+ i += 4
+ continue
+ except ValueError:
+ result.append(s[i])
+ result.append(nxt)
+ elif nxt == "u" and i + 5 < len(s):
+ # \uNNNN unicode escape
+ try:
+ val = int(s[i + 2 : i + 6], 16)
+ result.append(chr(val))
+ i += 6
+ continue
+ except ValueError:
+ result.append(s[i])
+ result.append(nxt)
+ elif nxt in ("'", '"', "`", "\\", "$"):
+ result.append(nxt)
+ else:
+ # Unknown escape: preserve both characters
+ result.append(s[i])
+ result.append(nxt)
+ i += 2
+ else:
+ result.append(s[i])
+ i += 1
+ return "".join(result)
+
+
+# ---------------------------------------------------------------------------
+# Replacer implementations
+# ---------------------------------------------------------------------------
+
+
+def _simple_replacer(content: str, find: str) -> Iterator[str]:
+ """Exact match."""
+ if not find:
+ return
+ yield find
+
+
+def _escape_normalized_replacer(content: str, find: str) -> Iterator[str]:
+ """
+ Handle escaped sequences like \\n, \\t in the find string.
+
+ This replacer tries two approaches:
+ 1. Unescape the find string and look for it in content
+ 2. If find contains literal backslash sequences, try matching content
+ blocks after unescaping them
+
+ Results are deduplicated to avoid yielding the same block twice.
+ """
+ if not find:
+ return
+
+ yielded = set()
+
+ # Approach 1: unescape find and search in content
+ unescaped_find = _unescape(find)
+ if unescaped_find != find and unescaped_find in content:
+ yielded.add(unescaped_find)
+ yield unescaped_find
+
+ # Approach 2: if find contains literal backslash sequences,
+ # try matching against content blocks after unescaping
+ if "\\" in find:
+ lines = content.split("\n")
+ find_lines = unescaped_find.split("\n")
+ for i in range(len(lines) - len(find_lines) + 1):
+ block = "\n".join(lines[i : i + len(find_lines)])
+ if block not in yielded and _unescape(block) == unescaped_find:
+ yielded.add(block)
+ yield block
+
+
+def _line_trimmed_replacer(content: str, find: str) -> Iterator[str]:
+ """Match blocks where each line matches after trim."""
+ if not find:
+ return
+ original_lines = content.split("\n")
+ search_lines = find.split("\n")
+ if search_lines and search_lines[-1] == "":
+ search_lines.pop()
+ if not search_lines:
+ return
+ for i in range(len(original_lines) - len(search_lines) + 1):
+ if all(
+ original_lines[i + j].strip() == search_lines[j].strip()
+ for j in range(len(search_lines))
+ ):
+ yield "\n".join(original_lines[i : i + len(search_lines)])
+
+
+def _block_anchor_replacer(content: str, find: str) -> Iterator[str]:
+ """
+ Use first and last line as anchors, then use Levenshtein similarity on middle lines.
+ Single candidate threshold: 0.0 (accept if anchors match)
+ Multiple candidates threshold: 0.3 (pick best)
+ """
+ if not find:
+ return
+ original_lines = content.split("\n")
+ search_lines = find.split("\n")
+ if len(search_lines) < 3:
+ return
+ if search_lines and search_lines[-1] == "":
+ search_lines.pop()
+ if len(search_lines) < 3:
+ return
+
+ first_anchor = search_lines[0].strip()
+ last_anchor = search_lines[-1].strip()
+ search_block_size = len(search_lines)
+
+ candidates: list[tuple[int, int]] = []
+ # Limit the search window for the last anchor to avoid matching unrelated lines far away
+ max_window = max(search_block_size * 2, search_block_size + 10)
+ for i, line in enumerate(original_lines):
+ if line.strip() != first_anchor:
+ continue
+ for j in range(i + 2, min(len(original_lines), i + max_window)):
+ if original_lines[j].strip() == last_anchor:
+ candidates.append((i, j))
+ if not candidates:
+ return
+
+ def _similarity(start: int, end: int) -> float:
+ actual_size = end - start + 1
+ lines_to_check = min(search_block_size - 2, actual_size - 2)
+ if lines_to_check <= 0:
+ return 1.0
+ sim = 0.0
+ for k in range(1, lines_to_check + 1):
+ if start + k >= len(original_lines) or k >= len(search_lines) - 1:
+ break
+ ol = original_lines[start + k].strip()
+ sl = search_lines[k].strip()
+ max_len = max(len(ol), len(sl))
+ if max_len == 0:
+ continue
+ dist = _levenshtein(ol, sl)
+ sim += (1 - dist / max_len) / lines_to_check
+ return sim
+
+ if len(candidates) == 1:
+ start, end = candidates[0]
+ if _similarity(start, end) >= 0.0:
+ yield "\n".join(original_lines[start : end + 1])
+ return
+
+ best_match: tuple[int, int] | None = None
+ max_sim = -1.0
+ for start, end in candidates:
+ sim = _similarity(start, end)
+ if sim > max_sim:
+ max_sim = sim
+ best_match = (start, end)
+
+ if max_sim >= 0.3 and best_match:
+ start, end = best_match
+ yield "\n".join(original_lines[start : end + 1])
+
+
+def _whitespace_normalized_replacer(content: str, find: str) -> Iterator[str]:
+ """Collapse all whitespace sequences to a single space before matching."""
+ if not find:
+ return
+
+ def _norm(t: str) -> str:
+ return " ".join(t.split())
+
+ normalized_find = _norm(find)
+ if not normalized_find:
+ return
+
+ # Single-line matches
+ for line in content.split("\n"):
+ if _norm(line) == normalized_find:
+ yield line
+
+ # Multi-line block matches
+ lines = content.split("\n")
+ find_lines = find.split("\n")
+ if len(find_lines) > 1:
+ for i in range(len(lines) - len(find_lines) + 1):
+ block = "\n".join(lines[i : i + len(find_lines)])
+ if _norm(block) == normalized_find:
+ yield block
+
+
+def _indentation_flexible_replacer(content: str, find: str) -> Iterator[str]:
+ """
+ Match blocks where removing common indentation makes them equal to find.
+
+ Important: yields the ORIGINAL block (with original indentation),
+ not the de-indented version. This preserves the file's indentation
+ during replacement.
+ """
+ if not find:
+ return
+
+ def _remove_indent(text: str) -> str:
+ lines = text.split("\n")
+ non_empty = [ln for ln in lines if ln.strip()]
+ if not non_empty:
+ return text
+ min_indent = min(len(ln) - len(ln.lstrip()) for ln in non_empty if ln.strip())
+ return "\n".join(ln[min_indent:] if ln.strip() else ln for ln in lines)
+
+ normalized_find = _remove_indent(find)
+ content_lines = content.split("\n")
+ find_lines = find.split("\n")
+ for i in range(len(content_lines) - len(find_lines) + 1):
+ block = "\n".join(content_lines[i : i + len(find_lines)])
+ if _remove_indent(block) == normalized_find:
+ yield block
+
+
+def _trimmed_boundary_replacer(content: str, find: str) -> Iterator[str]:
+ """Match if the trimmed version of find exists in content."""
+ if not find:
+ return
+ trimmed = find.strip()
+ if trimmed == find:
+ return
+ if trimmed in content:
+ yield trimmed
+
+ lines = content.split("\n")
+ find_lines = find.split("\n")
+ for i in range(len(lines) - len(find_lines) + 1):
+ block = "\n".join(lines[i : i + len(find_lines)])
+ if block.strip() == trimmed:
+ yield block
+
+
+def _context_aware_replacer(content: str, find: str) -> Iterator[str]:
+ """
+ Use first and last line as context anchors, accept if >= 50% of middle lines match.
+ """
+ if not find:
+ return
+ find_lines = find.split("\n")
+ if len(find_lines) < 3:
+ return
+ if find_lines and find_lines[-1] == "":
+ find_lines.pop()
+ if len(find_lines) < 3:
+ return
+
+ first_line = find_lines[0].strip()
+ last_line = find_lines[-1].strip()
+ content_lines = content.split("\n")
+
+ for i, line in enumerate(content_lines):
+ if line.strip() != first_line:
+ continue
+ for j in range(i + 2, len(content_lines)):
+ if content_lines[j].strip() != last_line:
+ continue
+ block_lines = content_lines[i : j + 1]
+ if len(block_lines) != len(find_lines):
+ continue
+ matching = 0
+ total_nonempty = 0
+ for k in range(1, len(block_lines) - 1):
+ bl = block_lines[k].strip()
+ fl = find_lines[k].strip()
+ if bl or fl:
+ total_nonempty += 1
+ if bl == fl:
+ matching += 1
+ if total_nonempty == 0 or matching / total_nonempty >= 0.5:
+ yield "\n".join(block_lines)
+ return # Only first match
+ break
+
+
+def _multi_occurrence_replacer(content: str, find: str) -> Iterator[str]:
+ """Yield all exact matches (used with replace_all)."""
+ if not find:
+ return
+ start = 0
+ while True:
+ idx = content.find(find, start)
+ if idx == -1:
+ break
+ yield find
+ start = idx + len(find)
+
+
+# Ordered chain: most specific first, most lenient last.
+# Escape-normalized is placed early because it handles a common LLM issue
+# (using \\n instead of actual newlines) before more aggressive fuzzy matchers.
+_REPLACERS: list[Replacer] = [
+ _simple_replacer,
+ _escape_normalized_replacer,
+ _line_trimmed_replacer,
+ _block_anchor_replacer,
+ _whitespace_normalized_replacer,
+ _indentation_flexible_replacer,
+ _trimmed_boundary_replacer,
+ _context_aware_replacer,
+ _multi_occurrence_replacer,
+]
+
+
+# ---------------------------------------------------------------------------
+# Indent adjustment helper
+# ---------------------------------------------------------------------------
+
+
+def _get_leading_whitespace(line: str) -> str:
+ """Return the leading whitespace of a line."""
+ return line[: len(line) - len(line.lstrip())]
+
+
+def _first_nonempty_line_indent(text: str) -> str:
+ """Get the leading whitespace of the first non-empty line in text."""
+ for line in text.split("\n"):
+ if line.strip():
+ return _get_leading_whitespace(line)
+ return ""
+
+
+def _adjust_replacement_indent(
+ new_string: str,
+ old_string: str,
+ matched_block: str,
+ content: str = "",
+ match_idx: int = -1,
+) -> str:
+ """
+ Adjust new_string's indentation to preserve the file's indent level.
+
+ When a fuzzy replacer (e.g. LineTrimmedReplacer, IndentationFlexibleReplacer)
+ matches a block whose indentation differs from old_string, this function
+ computes the indent delta and applies it to every non-empty line of
+ new_string so the replacement preserves the file's original indentation.
+
+ When *content* and *match_idx* are provided, the effective indent is
+ computed from the actual line in the file rather than from matched_block
+ alone. This correctly handles substring matches (e.g. SimpleReplacer
+ matching ``"print(x)"`` inside ``" print(x)"``).
+
+ Indent adjustment rules:
+ - If the match starts at the beginning of a line (match_idx == line_start),
+ all lines of new_string get the indent delta applied.
+ - If the match starts after a whitespace-only prefix (at the content
+ boundary of an indented line), the first line is NOT adjusted —
+ the content prefix already provides the indent. Lines 2+ get adjusted.
+ - If the match starts mid-line (after non-whitespace content), NO lines
+ are adjusted — the user wrote absolute indentation and the first line
+ inherits the prefix context.
+
+ For exact matches (old_string == matched_block and both span the full
+ line), delta is zero and this is a no-op.
+ """
+ old_indent = _first_nonempty_line_indent(old_string)
+
+ # Compute effective file indent from the actual line context when available.
+ # This handles substring matches where matched_block doesn't include
+ # the line's leading whitespace.
+ match_starts_at_line_beginning = True
+ adjust_subsequent_lines = True
+ if content and match_idx >= 0:
+ line_start = content.rfind("\n", 0, match_idx) + 1
+ effective_line = content[line_start:]
+ file_indent = _get_leading_whitespace(effective_line)
+ if match_idx == line_start:
+ # Match starts at the very beginning of the line
+ match_starts_at_line_beginning = True
+ adjust_subsequent_lines = True
+ else:
+ prefix = content[line_start:match_idx]
+ match_starts_at_line_beginning = False
+ if prefix.strip() == "":
+ # Match starts after whitespace-only prefix (content boundary)
+ # First line inherits prefix; subsequent lines get adjusted
+ adjust_subsequent_lines = True
+ else:
+ # Match starts after non-whitespace (truly mid-line)
+ # User wrote absolute indentation; no adjustment needed
+ adjust_subsequent_lines = False
+ else:
+ file_indent = _first_nonempty_line_indent(matched_block)
+
+ if file_indent == old_indent:
+ return new_string
+
+ # Determine indent character from the file's indentation
+ indent_char = " "
+ if file_indent:
+ indent_char = file_indent[0]
+ elif old_indent:
+ indent_char = old_indent[0]
+
+ delta = len(file_indent) - len(old_indent)
+
+ new_lines = new_string.split("\n")
+ adjusted: list[str] = []
+ for i, line in enumerate(new_lines):
+ if not line.strip():
+ # Empty or whitespace-only line: keep as-is
+ adjusted.append(line)
+ elif i == 0 and not match_starts_at_line_beginning:
+ # First line and match is not at line start: don't adjust.
+ # The content prefix already provides the indent context.
+ adjusted.append(line)
+ elif i > 0 and not adjust_subsequent_lines:
+ # Subsequent lines in a truly mid-line match: don't adjust.
+ # User wrote absolute indentation.
+ adjusted.append(line)
+ elif delta > 0:
+ # Need to add indentation to match file's deeper indent level
+ adjusted.append(indent_char * delta + line)
+ elif delta < 0:
+ # Need to remove indentation (file is less indented than old_string)
+ current_indent = _get_leading_whitespace(line)
+ remove = min(-delta, len(current_indent))
+ adjusted.append(line[remove:])
+ else:
+ adjusted.append(line)
+
+ return "\n".join(adjusted)
+
+
+# ---------------------------------------------------------------------------
+# Core replace function
+# ---------------------------------------------------------------------------
+
+
+def robust_replace(
+ content: str,
+ old_string: str,
+ new_string: str,
+ *,
+ replace_all: bool = False,
+) -> tuple[str, int]:
+ """
+ Replace old_string with new_string using the multi-strategy replacer chain.
+
+ Returns:
+ A tuple of (new_content, replacements_count).
+
+ Raises:
+ ValueError: If old_string cannot be found, or if multiple non-unique
+ matches are found (when replace_all=False).
+ """
+ if old_string == new_string:
+ raise ValueError(
+ "No changes to apply: old_string and new_string are identical."
+ )
+
+ not_found = True
+
+ for replacer in _REPLACERS:
+ matches = list(replacer(content, old_string))
+ if not matches:
+ continue
+
+ # Collect all unique match positions
+ match_positions: list[tuple[int, str]] = []
+ for match in matches:
+ start = 0
+ while True:
+ idx = content.find(match, start)
+ if idx == -1:
+ break
+ # Avoid duplicate positions from overlapping matches
+ if not any(pos <= idx < pos + len(m) for pos, m in match_positions):
+ match_positions.append((idx, match))
+ start = idx + 1
+
+ if not match_positions:
+ continue
+
+ not_found = False
+
+ if replace_all:
+ # Replace all occurrences, from end to start to preserve indices
+ new_content = content
+ replacements = 0
+ for idx, match in sorted(match_positions, key=lambda x: x[0], reverse=True):
+ adjusted_new = _adjust_replacement_indent(
+ new_string,
+ old_string,
+ match,
+ content,
+ idx,
+ )
+ new_content = (
+ new_content[:idx] + adjusted_new + new_content[idx + len(match) :]
+ )
+ replacements += 1
+ return new_content, replacements
+
+ # Single replacement mode: require exactly one match
+ if len(match_positions) == 1:
+ idx, match = match_positions[0]
+ adjusted_new = _adjust_replacement_indent(
+ new_string,
+ old_string,
+ match,
+ content,
+ idx,
+ )
+ return content[:idx] + adjusted_new + content[idx + len(match) :], 1
+
+ # Multiple matches found in single-replacement mode: continue to next replacer
+ # to try a more specific strategy
+ continue
+
+ if not_found:
+ raise ValueError(
+ "Could not find oldString in the file. It must match exactly, "
+ "including whitespace, indentation, and line endings. "
+ "Try providing more surrounding context to make the match unique."
+ )
+ raise ValueError(
+ "Found multiple matches for oldString. Provide more surrounding context "
+ "to make the match unique, or use replace_all=True to change every instance."
+ )
+
+
+# ---------------------------------------------------------------------------
+# Edit result model
+# ---------------------------------------------------------------------------
+
+
+@dataclass
+class EditResult:
+ success: bool
+ replacements: int = 0
+ diff: str = ""
+ error: str = ""
+ old_content: str = ""
+ new_content: str = ""
+
+
+# ---------------------------------------------------------------------------
+# Async file edit with locking and line-ending preservation (local runtime)
+# ---------------------------------------------------------------------------
+
+
+def bytes_edit_file(
+ raw_bytes: bytes,
+ old_string: str,
+ new_string: str,
+ *,
+ replace_all: bool = False,
+ encoding: str = "utf-8",
+) -> tuple[bytes, EditResult]:
+ """
+ Core line-ending-aware edit logic operating on raw bytes.
+
+ Performs:
+ 1. BOM detection and preservation
+ 2. Line-ending detection (CRLF vs LF)
+ 3. Normalize to LF for matching (replacer chain works on LF)
+ 4. Execute robust_replace()
+ 5. Convert back to original line endings
+ 6. Re-attach BOM if present
+
+ Returns:
+ (output_bytes, EditResult) — the caller is responsible for writing
+ *output_bytes* back to the file.
+
+ Raises:
+ ValueError: If old_string cannot be found or is not unique.
+ """
+ has_bom = raw_bytes.startswith(b"\xef\xbb\xbf")
+ if has_bom:
+ raw_bytes = raw_bytes[3:]
+
+ old_content = raw_bytes.decode(encoding)
+ original_ending = _detect_line_ending(old_content)
+
+ # Normalize for matching: ONLY normalize actual CRLF line endings.
+ # Escape sequence handling (\n vs actual newline, \t vs tab, etc.)
+ # is deferred to the _escape_normalized_replacer in the replacer chain.
+ normalized_old = _normalize_line_endings(old_string)
+ normalized_new = _normalize_line_endings(new_string)
+
+ # Normalize file content to LF for matching (replacers work on LF)
+ normalized_content = _normalize_line_endings(old_content)
+
+ # Perform replacement
+ new_content, replacements = robust_replace(
+ normalized_content,
+ normalized_old,
+ normalized_new,
+ replace_all=replace_all,
+ )
+
+ # Convert back to original line endings
+ if original_ending == "\r\n":
+ new_content = _convert_to_line_ending(new_content, "\r\n")
+
+ # Re-add BOM if present
+ write_bytes = b""
+ if has_bom:
+ write_bytes += b"\xef\xbb\xbf"
+ write_bytes += new_content.encode(encoding)
+
+ # Generate unified diff
+ diff = build_unified_diff("", old_content, new_content)
+
+ result = EditResult(
+ success=True,
+ replacements=replacements,
+ diff=diff,
+ old_content=old_content,
+ new_content=new_content,
+ )
+ return write_bytes, result
+
+
+async def edit_file(
+ path: str,
+ old_string: str,
+ new_string: str,
+ *,
+ replace_all: bool = False,
+ encoding: str = "utf-8",
+) -> EditResult:
+ """
+ Edit a file using the robust multi-strategy replacer.
+
+ Features:
+ - File-level asyncio lock prevents concurrent edits
+ - Preserves original line endings (\\n vs \\r\\n)
+ - Preserves BOM if present
+ - Returns unified diff of changes
+ """
+ lock = get_file_lock(path)
+ async with lock:
+ try:
+ # Read file in binary mode to preserve original line endings
+ raw_bytes = await asyncio.to_thread(_read_file_bytes, path)
+ write_bytes, result = bytes_edit_file(
+ raw_bytes,
+ old_string,
+ new_string,
+ replace_all=replace_all,
+ encoding=encoding,
+ )
+ # Write file in binary mode to preserve restored line endings
+ await asyncio.to_thread(_write_file_bytes, path, write_bytes)
+ return result
+ except Exception as exc:
+ return EditResult(
+ success=False,
+ error=str(exc),
+ )
+
+
+def _read_file_bytes(path: str) -> bytes:
+ with open(path, "rb") as f:
+ return f.read()
+
+
+def _write_file_bytes(path: str, data: bytes) -> None:
+ # Ensure parent directory exists
+ os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
+ with open(path, "wb") as f:
+ f.write(data)
+
+
+# ---------------------------------------------------------------------------
+# Public diff helper
+# ---------------------------------------------------------------------------
+
+
+def build_unified_diff(
+ old_path: str,
+ old_content: str,
+ new_content: str,
+) -> str:
+ """Generate a unified diff between two content strings."""
+ diff = list(
+ difflib.unified_diff(
+ old_content.splitlines(),
+ new_content.splitlines(),
+ fromfile=old_path,
+ tofile=old_path,
+ lineterm="",
+ )
+ )
+ return "\n".join(diff)
diff --git a/astrbot/core/tools/computer_tools/fs.py b/astrbot/core/tools/computer_tools/fs.py
index 5660022fd0..6b4a9b8709 100644
--- a/astrbot/core/tools/computer_tools/fs.py
+++ b/astrbot/core/tools/computer_tools/fs.py
@@ -33,6 +33,7 @@
- In sandbox runtime, relative paths are passed through unchanged.
"""
+import base64
import os
import uuid
from dataclasses import dataclass, field
@@ -55,6 +56,12 @@
from ..registry import builtin_tool
from . import util as computer_util
+from .edit_engine import (
+ EditResult,
+ bytes_edit_file,
+ edit_file,
+ get_file_lock,
+)
from .util import (
check_admin_permission,
is_local_runtime,
@@ -368,22 +375,150 @@ async def call(
return f"Error writing file: {exc}"
+async def _sandbox_read_bytes(sb, path: str) -> bytes:
+ """Read a file in binary mode inside the sandbox via sb.python.exec.
+
+ Returns the raw bytes so that original line endings (CRLF/LF) are preserved.
+ Raises IOError on read failure.
+ """
+ code = (
+ "import base64 as _b64, sys\n"
+ f"path = {path!r}\n"
+ "try:\n"
+ " with open(path, 'rb') as _f:\n"
+ " _data = _f.read()\n"
+ " print(_b64.b64encode(_data).decode(), end='')\n"
+ "except Exception as _e:\n"
+ " print('ERROR:' + str(_e), end='', file=sys.stderr)\n"
+ " sys.exit(1)\n"
+ )
+ result = await sb.python.exec(code, timeout=30)
+ if not result.get("success", False):
+ error = str(result.get("error", "") or "").strip()
+ if not error:
+ output = result.get("output", "")
+ if isinstance(output, dict):
+ error = str(output.get("error", "") or "").strip()
+ raise OSError(
+ f"Failed to read file in sandbox: {error or 'unknown read error'}"
+ )
+ output = result.get("output", "")
+ if isinstance(output, dict):
+ output = output.get("text", "")
+ b64_text = str(output).strip()
+ return base64.b64decode(b64_text)
+
+
+async def _sandbox_write_bytes(sb, path: str, data: bytes) -> None:
+ """Write raw bytes to a file inside the sandbox via sb.python.exec.
+
+ Raises IOError on write failure.
+ """
+ b64_data = base64.b64encode(data).decode()
+ code = (
+ "import base64 as _b64, sys\n"
+ f"path = {path!r}\n"
+ f"b64 = {b64_data!r}\n"
+ "try:\n"
+ " _raw = _b64.b64decode(b64)\n"
+ " with open(path, 'wb') as _f:\n"
+ " _f.write(_raw)\n"
+ " print('ok', end='')\n"
+ "except Exception as _e:\n"
+ " print('ERROR:' + str(_e), end='', file=sys.stderr)\n"
+ " sys.exit(1)\n"
+ )
+ result = await sb.python.exec(code, timeout=30)
+ if not result.get("success", False):
+ error = str(result.get("error", "") or "").strip()
+ if not error:
+ output = result.get("output", "")
+ if isinstance(output, dict):
+ error = str(output.get("error", "") or "").strip()
+ raise OSError(
+ f"Failed to write file in sandbox: {error or 'unknown write error'}"
+ )
+
+
+def _format_result(
+ path: str,
+ result: EditResult,
+ *,
+ replace_all: bool,
+) -> str:
+ """Build the human-readable / LLM-readable result string with optional diff."""
+ if not result.success:
+ return f"Error editing file: {result.error}"
+
+ mode_text = "all matches" if replace_all else "first match"
+
+ lines = [
+ f"Edited {path}.",
+ f"Replaced {result.replacements} occurrence(s) using {mode_text} mode.",
+ ]
+
+ if result.diff:
+ diff_preview = result.diff
+ if len(diff_preview) > 2000:
+ diff_preview = diff_preview[:2000] + "\n... (diff truncated)"
+ lines.append("")
+ lines.append("Diff:")
+ lines.append("```diff")
+ lines.append(diff_preview)
+ lines.append("```")
+
+ return "\n".join(lines)
+
+
@builtin_tool(config=_COMPUTER_RUNTIME_TOOL_CONFIG)
@dataclass
class FileEditTool(FunctionTool):
+ """
+ Enhanced file editing tool with robust fuzzy matching.
+
+ In local runtime it uses the full robust edit engine (BOM +
+ line-ending preservation, file locks); in sandbox runtimes it
+ mediates reads and writes through the booter's filesystem
+ abstraction while applying the same 9-strategy replacer chain.
+
+ This tool is designed to handle LLM-generated edits that may
+ have minor whitespace, indentation, or escape sequence
+ differences from the actual file content.
+ """
+
name: str = "astrbot_file_edit_tool"
- description: str = "Editing files."
+ description: str = (
+ "Editing files with robust fuzzy matching. "
+ "Supports exact match, escape-normalized match, line-trimmed match, block-anchor match, "
+ "whitespace-normalized match, indentation-flexible match, "
+ "trimmed-boundary match, context-aware match, "
+ "and multi-occurrence replacement. "
+ "When editing text from Read tool output, preserve the exact indentation "
+ "(tabs/spaces) as it appears AFTER the line number prefix. "
+ "The line number prefix format is: line number + colon + space (e.g., '1: '). "
+ "Everything after that space is the actual file content to match. "
+ "Never include any part of the line number prefix in oldString or newString. "
+ "The edit will FAIL if oldString is not found. "
+ "The edit will FAIL if oldString is found multiple times and replace_all is false. "
+ "Use replace_all for renaming variables or strings across the file."
+ )
parameters: dict = field(
default_factory=lambda: {
"type": "object",
"properties": {
"path": {
"type": "string",
- "description": "Path of the file to edit. If relative, will be in workspace root.",
+ "description": (
+ "Path of the file to edit. If relative, will be in workspace root."
+ ),
},
"old": {
"type": "string",
- "description": "The exact old text to replace.",
+ "description": (
+ "The text to replace. Must be an exact substring of the file content, "
+ "but the tool will try multiple matching strategies if exact match fails. "
+ "Include sufficient surrounding context (3-5 lines) to make the match unique."
+ ),
},
"new": {
"type": "string",
@@ -391,7 +526,10 @@ class FileEditTool(FunctionTool):
},
"replace_all": {
"type": "boolean",
- "description": "Whether to replace all matches. Defaults to false.",
+ "description": (
+ "Whether to replace all matches. Defaults to false. "
+ "Useful for renaming variables or strings across the file."
+ ),
},
},
"required": ["path", "old", "new"],
@@ -409,45 +547,67 @@ async def call(
umo = str(context.context.event.unified_msg_origin)
local_env = is_local_runtime(context)
restricted = _is_restricted_env(context)
+
try:
- normalized_path = (
- _normalize_rw_path(
+ if local_env:
+ normalized_path = _normalize_rw_path(
path,
restricted=restricted,
local_env=local_env,
umo=umo,
write=True,
)
- if local_env
- else path.strip()
- )
+ else:
+ normalized_path = path.strip()
+
if not normalized_path:
raise ValueError("`path` must be a non-empty string.")
- normalized_old = _decode_escaped_text(old)
- normalized_new = _decode_escaped_text(new)
- sb = await get_booter(
- context.context.context,
- context.context.event.unified_msg_origin,
- )
- result = await sb.fs.edit_file(
- path=normalized_path,
- old_string=normalized_old,
- new_string=normalized_new,
- replace_all=replace_all,
- encoding="utf-8",
- )
- if not result.get("success", False):
- error_detail = str(result.get("error", "") or "").strip()
- return (
- "Error editing file: "
- f"{error_detail or 'unknown filesystem edit error'}"
+
+ if local_env:
+ result = await edit_file(
+ path=normalized_path,
+ old_string=old,
+ new_string=new,
+ replace_all=replace_all,
+ encoding="utf-8",
+ )
+ else:
+ sb = await get_booter(
+ context.context.context,
+ umo,
)
- replacements = int(result.get("replacements", 0) or 0)
- mode_text = "all matches" if replace_all else "first match"
- return (
- f"Edited {normalized_path}. "
- f"Replaced {replacements} occurrence(s) using {mode_text} mode."
+ lock = get_file_lock(normalized_path)
+ async with lock:
+ # 1. Binary read — preserves original line endings (CRLF/LF)
+ try:
+ raw_bytes = await _sandbox_read_bytes(sb, normalized_path)
+ except OSError as exc:
+ return f"Error editing file: {exc}"
+
+ # 2. Line-ending-aware edit (reuses edit_engine core logic)
+ try:
+ write_bytes, result = bytes_edit_file(
+ raw_bytes,
+ old,
+ new,
+ replace_all=replace_all,
+ encoding="utf-8",
+ )
+ except ValueError as exc:
+ return f"Error editing file: {exc}"
+
+ # 3. Binary write — preserves restored line endings
+ try:
+ await _sandbox_write_bytes(sb, normalized_path, write_bytes)
+ except OSError as exc:
+ return f"Error editing file: {exc}"
+
+ return _format_result(
+ normalized_path,
+ result,
+ replace_all=replace_all,
)
+
except PermissionError as exc:
return f"Error: {exc}"
except Exception as exc:
diff --git a/dashboard/src/components/chat/ReasoningSidebar.vue b/dashboard/src/components/chat/ReasoningSidebar.vue
index a20a4bb896..e2d0d1f240 100644
--- a/dashboard/src/components/chat/ReasoningSidebar.vue
+++ b/dashboard/src/components/chat/ReasoningSidebar.vue
@@ -1,6 +1,17 @@
-
+
+
diff --git a/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue b/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue
index 902cc51113..608edae579 100644
--- a/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue
+++ b/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue
@@ -1,5 +1,12 @@
+
@@ -16,6 +23,7 @@
import { computed, inject, type Ref } from "vue";
import { MarkdownCodeBlockNode } from "markstream-vue";
import { useAttrs } from "vue";
+import DiffPreview from "@/components/chat/message_list_comps/DiffPreview.vue";
defineOptions({
inheritAttrs: false,
diff --git a/dashboard/src/utils/shikiLimitedBundle.js b/dashboard/src/utils/shikiLimitedBundle.js
index b0ece53879..89862f7cb4 100644
--- a/dashboard/src/utils/shikiLimitedBundle.js
+++ b/dashboard/src/utils/shikiLimitedBundle.js
@@ -1,9 +1,12 @@
import { createHighlighterCore } from "shiki/core";
import { createJavaScriptRegexEngine } from "shiki/engine/javascript";
import bash from "shiki/langs/bash.mjs";
+import c from "shiki/langs/c.mjs";
+import cpp from "shiki/langs/cpp.mjs";
import css from "shiki/langs/css.mjs";
import diff from "shiki/langs/diff.mjs";
import dockerfile from "shiki/langs/dockerfile.mjs";
+import go from "shiki/langs/go.mjs";
import html from "shiki/langs/html.mjs";
import ini from "shiki/langs/ini.mjs";
import java from "shiki/langs/java.mjs";
@@ -13,6 +16,7 @@ import jsx from "shiki/langs/jsx.mjs";
import markdown from "shiki/langs/markdown.mjs";
import powershell from "shiki/langs/powershell.mjs";
import python from "shiki/langs/python.mjs";
+import rust from "shiki/langs/rust.mjs";
import sql from "shiki/langs/sql.mjs";
import tsx from "shiki/langs/tsx.mjs";
import typescript from "shiki/langs/typescript.mjs";
@@ -26,9 +30,12 @@ import vitesseLight from "shiki/themes/vitesse-light.mjs";
export const LIMITED_SHIKI_LANGUAGES = [
...bash,
+ ...c,
+ ...cpp,
...css,
...diff,
...dockerfile,
+ ...go,
...html,
...ini,
...java,
@@ -38,6 +45,7 @@ export const LIMITED_SHIKI_LANGUAGES = [
...markdown,
...powershell,
...python,
+ ...rust,
...sql,
...tsx,
...typescript,
@@ -57,10 +65,16 @@ const BUILT_IN_LANGUAGES = ["text", "plaintext", "plain"];
export const LIMITED_SHIKI_LANGUAGE_ALIASES = {
bat: "powershell",
+ "c++": "cpp",
+ cc: "cpp",
cjs: "javascript",
console: "bash",
cts: "typescript",
+ cxx: "cpp",
docker: "dockerfile",
+ golang: "go",
+ h: "c",
+ hpp: "cpp",
htm: "html",
js: "javascript",
md: "markdown",
@@ -71,6 +85,7 @@ export const LIMITED_SHIKI_LANGUAGE_ALIASES = {
ps1: "powershell",
pwsh: "powershell",
py: "python",
+ rs: "rust",
shell: "bash",
shellscript: "bash",
sh: "bash",
diff --git a/dashboard/src/utils/systemNotice.ts b/dashboard/src/utils/systemNotice.ts
new file mode 100644
index 0000000000..b0497e76ef
--- /dev/null
+++ b/dashboard/src/utils/systemNotice.ts
@@ -0,0 +1,199 @@
+/**
+ * Utilities for detecting and extracting system-injected suffixes from
+ * tool call results. Two kinds of suffixes are recognised:
+ *
+ * 1. **[SYSTEM NOTICE]** — repeated-tool warnings, follow-up notices, etc.
+ * Always prefixed with the literal marker `[SYSTEM NOTICE]` followed by
+ * one of the known SYSTEM_NOTICE_PREFIXES.
+ *
+ * 2. **Tool-result overflow notice** — emitted when a tool output is too
+ * large and gets spilled to a file. The notice text starts with
+ * `"Truncated tool output preview shown above."` and does NOT carry the
+ * `[SYSTEM NOTICE]` marker (it is a plain-text suffix).
+ *
+ * Both suffixes are always appended at the very end of the tool result by
+ * the backend, possibly as a chain of multiple notices (e.g. repeated-tool
+ * warning + follow-up notice). When the tool output itself contains text
+ * that looks like a system notice (e.g. reading a log file), the algorithm
+ * must correctly ignore those false positives.
+ *
+ * **Key invariant**: the real system notice chain starts at the EARLIEST
+ * valid candidate where everything from that position to end-of-text is
+ * exclusively notice-chain content (no file content mixed in).
+ *
+ * Known message prefixes produced by the AstrBot system (not file content).
+ * Used to distinguish genuine system notices from text that happens to
+ * contain the literal string "[SYSTEM NOTICE]".
+ */
+export const SYSTEM_NOTICE_PREFIXES = [
+ "By the way,",
+ "Important:",
+ "User sent",
+];
+
+const MARKER = "[SYSTEM NOTICE]";
+
+/** Distinctive opening of the tool-result overflow notice. */
+const OVERFLOW_NOTICE_PREFIX = "Truncated tool output preview shown above.";
+
+// ── Helpers ─────────────────────────────────────────────────────────────
+
+/**
+ * Check whether the text starting at `idx` begins with a genuine
+ * `[SYSTEM NOTICE]` marker followed by one of the known prefixes.
+ */
+function isGenuineSystemNotice(text: string, idx: number): boolean {
+ const suffix = text.slice(idx).trim();
+ return SYSTEM_NOTICE_PREFIXES.some((p) =>
+ suffix.startsWith(`${MARKER} ${p}`),
+ );
+}
+
+/**
+ * Check whether the entire region from `fromPos` to end-of-text contains
+ * ONLY notice-chain content (no real file content).
+ *
+ * Allowed line types within the notice chain:
+ * - Blank lines (separators between notices)
+ * - Lines containing `[SYSTEM NOTICE]` (notice markers and their bodies)
+ * - Lines starting with the overflow notice prefix
+ * - Numbered list items (`N. …`) — but ONLY when preceded by a
+ * `[SYSTEM NOTICE] User sent` (follow-up notice) in the chain.
+ * This prevents file content like `1. item` from being mistaken for
+ * notice content.
+ * - Continuation lines of a notice paragraph (non-blank, directly after
+ * a notice marker line, before any blank line gap).
+ */
+function isNoticeChainRegion(text: string, fromPos: number): boolean {
+ const region = text.slice(fromPos);
+ const lines = region.split("\n");
+ let inFollowUp = false;
+ let sawBlankAfterNotice = false;
+
+ for (const rawLine of lines) {
+ const trimmed = rawLine.trim();
+
+ // Blank lines are separators between notices — always allowed.
+ if (trimmed === "") {
+ if (!inFollowUp) sawBlankAfterNotice = true;
+ continue;
+ }
+
+ // [SYSTEM NOTICE] marker — starts a new notice in the chain.
+ if (trimmed.includes(MARKER)) {
+ // Track whether this is a follow-up notice (allows numbered items).
+ const markerIdx = trimmed.indexOf(MARKER);
+ const afterMarker = trimmed.slice(markerIdx).trim();
+ if (afterMarker.startsWith(`${MARKER} User sent`)) {
+ inFollowUp = true;
+ }
+ sawBlankAfterNotice = false;
+ continue;
+ }
+
+ // Overflow notice prefix — part of the chain.
+ if (trimmed.startsWith(OVERFLOW_NOTICE_PREFIX)) {
+ inFollowUp = false;
+ sawBlankAfterNotice = false;
+ continue;
+ }
+
+ // Numbered list items — only valid after a "User sent" follow-up notice.
+ if (inFollowUp && /^\d+\.\s/.test(trimmed)) continue;
+
+ // Continuation lines:
+ // - In follow-up mode: free-form instruction text is part of the notice body.
+ // - Outside follow-up: continuation of an overflow notice paragraph
+ // (before any blank line gap).
+ if (inFollowUp || !sawBlankAfterNotice) {
+ continue;
+ }
+
+ // Anything else is real file content — not a valid notice chain.
+ return false;
+ }
+
+ return true;
+}
+
+// ── Public API ──────────────────────────────────────────────────────────
+
+/**
+ * Find the character index where the system notice suffix begins.
+ *
+ * Returns the EARLIEST position where a genuine notice candidate starts
+ * and everything from that position to end-of-text is exclusively
+ * notice-chain content. This ensures that when multiple notices are
+ * chained (e.g. repeated-tool warning + follow-up notice), ALL of them
+ * are captured in the suffix.
+ *
+ * For tools whose result may contain file content (e.g. astrbot_file_read_tool,
+ * astrbot_file_edit_tool), the raw result can include literal
+ * `[SYSTEM NOTICE]` strings or overflow-notice-like text that are part of
+ * the file being read/edited, not a system message. These are rejected
+ * because the region from the fake notice to end-of-text contains real
+ * file content.
+ *
+ * **Strategy** (first valid candidate wins):
+ * 1. Find all `[SYSTEM NOTICE]` markers in the text (in order).
+ * 2. For each genuine marker (earliest first), check if the region from
+ * that position to end-of-text is a valid notice chain.
+ * 3. Return the first position that passes.
+ * 4. If no `[SYSTEM NOTICE]` passes, check for overflow notice.
+ * 5. If nothing passes, return -1.
+ *
+ * @returns The character index where the notice suffix begins, or -1.
+ */
+export function findSystemNoticeIndex(text: string): number {
+ // ── Collect all candidates and sort by position ascending ─────────
+ const positions: { pos: number; type: "system" | "overflow" }[] = [];
+
+ let searchFrom = 0;
+ while (searchFrom < text.length) {
+ const idx = text.indexOf(MARKER, searchFrom);
+ if (idx < 0) break;
+ positions.push({ pos: idx, type: "system" });
+ searchFrom = idx + MARKER.length;
+ }
+
+ searchFrom = 0;
+ while (searchFrom < text.length) {
+ const idx = text.indexOf(OVERFLOW_NOTICE_PREFIX, searchFrom);
+ if (idx < 0) break;
+ positions.push({ pos: idx, type: "overflow" });
+ searchFrom = idx + OVERFLOW_NOTICE_PREFIX.length;
+ }
+
+ positions.sort((a, b) => a.pos - b.pos);
+
+ // ── Check each candidate (earliest first) ────────────────────────
+ for (const { pos, type } of positions) {
+ // Quick prefix check — skip non-genuine candidates.
+ if (type === "system" && !isGenuineSystemNotice(text, pos)) continue;
+
+ if (isNoticeChainRegion(text, pos)) {
+ return pos;
+ }
+ }
+
+ return -1;
+}
+
+/**
+ * Split tool result text into the main content and the optional
+ * system notice suffix.
+ *
+ * @param text The raw tool result text.
+ * @returns An object with `content` (the main result) and `notice`
+ * (the system notice string, or null if none).
+ */
+export function splitSystemNotice(
+ text: string,
+): { content: string; notice: string | null } {
+ const idx = findSystemNoticeIndex(text);
+ if (idx < 0) return { content: text, notice: null };
+ return {
+ content: text.slice(0, idx).trim(),
+ notice: text.slice(idx).trim(),
+ };
+}