"""
Robust file editing engine for AstrBot, inspired by opencode's multi-strategy replacer chain.

Implements 9 fallback replacers to handle LLM-generated edits that may have:
- indentation drift
- whitespace normalization issues
- escape sequence mismatches (\\n vs actual newline)
- trailing/leading whitespace differences
- block-level fuzzy matching via Levenshtein similarity

"""

import asyncio
import difflib
import os
import weakref
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

# A replacer receives (content, find) and yields candidate substrings of
# ``content`` that should be treated as the text to replace.
Replacer = Callable[[str, str], Iterator[str]]

# File-level locks to prevent concurrent edits on the same file.
# Use WeakValueDictionary so locks that nobody holds can be garbage-collected.
_locks: "weakref.WeakValueDictionary[str, asyncio.Lock]" = (
    weakref.WeakValueDictionary()
)


def get_file_lock(path: str) -> asyncio.Lock:
    """Get or create an asyncio.Lock for the given file path.

    The path is resolved first, so different spellings of the same location
    (``"a.txt"`` vs ``"./a.txt"``) share one lock. The registry only holds weak
    references: callers must keep the returned lock alive while they use it.
    """
    resolved = str(Path(path).resolve())
    lock = _locks.get(resolved)
    if lock is None:
        lock = asyncio.Lock()
        _locks[resolved] = lock
    return lock


# ---------------------------------------------------------------------------
# Line-ending / BOM helpers (mirrors opencode src/tool/edit.ts)
# ---------------------------------------------------------------------------


def _normalize_line_endings(text: str) -> str:
    """
    Normalize actual CRLF line endings to LF.

    ONLY handles real carriage-return + newline sequences (\\r\\n bytes).
    Does NOT interpret escape sequences -- literal \\n in file content
    (e.g. Python string literals) must be preserved as-is.

    Escape sequence handling for search strings is done by the
    _escape_normalized_replacer in the replacer chain.
    """
    return text.replace("\r\n", "\n")


def _detect_line_ending(text: str) -> Literal["\n", "\r\n"]:
    """Return CRLF if *text* contains at least one CRLF sequence, else LF."""
    return "\r\n" if "\r\n" in text else "\n"


def _convert_to_line_ending(text: str, ending: Literal["\n", "\r\n"]) -> str:
    """Convert the line endings of *text* to *ending*.

    For LF the text is returned untouched. For CRLF any existing CRLF is
    collapsed to LF first so that it is not turned into ``\\r\\r\\n``.
    """
    if ending == "\n":
        return text
    return text.replace("\r\n", "\n").replace("\n", "\r\n")


# ---------------------------------------------------------------------------
# Levenshtein distance (for BlockAnchorReplacer)
# ---------------------------------------------------------------------------


def _levenshtein(a: str, b: str) -> int:
    """Return the edit distance (insert / delete / substitute) between a and b."""
    if a == "" or b == "":
        return max(len(a), len(b))
    # Single-row dynamic programming: only the previous row is kept.
    prev = list(range(len(b) + 1))
    for i in range(1, len(a) + 1):
        curr = [i]
        ai = a[i - 1]
        for j in range(1, len(b) + 1):
            cost = 0 if ai == b[j - 1] else 1
            curr.append(min(curr[-1] + 1, prev[j] + 1, prev[j - 1] + cost))
        prev = curr
    return prev[len(b)]


# ---------------------------------------------------------------------------
# Escape helpers
# ---------------------------------------------------------------------------

# Backslash + key  ->  replacement character.
_SIMPLE_ESCAPES = {
    "n": "\n",
    "t": "\t",
    "r": "\r",
    "b": "\b",
    "f": "\f",
    "v": "\v",
    "'": "'",
    '"': '"',
    "`": "`",
    "\\": "\\",
    "$": "$",
}

# Escape letter -> number of hex digits that must follow it.
_HEX_ESCAPE_WIDTHS = {"x": 2, "u": 4}

_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def _unescape(s: str) -> str:
    """
    Unescape common escape sequences in a string.

    Handles: \\n, \\t, \\r, \\b, \\f, \\v, \\\\, \\", \\', \\`, \\$
    Also handles \\xNN hex and \\uNNNN unicode escapes.

    Hex escapes are decoded only when followed by exactly the required number
    of hexadecimal digits. (``int(text, 16)`` alone is not a sufficient check:
    it also accepts signs, surrounding whitespace and underscores.) Unknown,
    malformed or truncated escapes are preserved verbatim, as is a trailing
    lone backslash.
    """
    result: list[str] = []
    i = 0
    n = len(s)
    while i < n:
        ch = s[i]
        if ch != "\\" or i + 1 >= n:
            result.append(ch)
            i += 1
            continue

        nxt = s[i + 1]
        simple = _SIMPLE_ESCAPES.get(nxt)
        if simple is not None:
            result.append(simple)
            i += 2
            continue

        width = _HEX_ESCAPE_WIDTHS.get(nxt)
        if width is not None:
            digits = s[i + 2 : i + 2 + width]
            if len(digits) == width and all(c in _HEX_DIGITS for c in digits):
                result.append(chr(int(digits, 16)))
                i += 2 + width
                continue

        # Unknown or malformed escape: preserve both characters.
        result.append(ch)
        result.append(nxt)
        i += 2
    return "".join(result)


# ---------------------------------------------------------------------------
# Replacer implementations
# ---------------------------------------------------------------------------


def _simple_replacer(content: str, find: str) -> Iterator[str]:
    """Exact match: yield *find* itself (the caller locates it in content)."""
    if not find:
        return
    yield find


def _escape_normalized_replacer(content: str, find: str) -> Iterator[str]:
    """
    Handle escaped sequences like \\n, \\t in the find string.

    This replacer tries two approaches:
    1. Unescape the find string and look for it in content
    2. If find contains literal backslash sequences, try matching content
       blocks after unescaping them

    Results are deduplicated to avoid yielding the same block twice.
    """
    if not find:
        return

    yielded = set()

    # Approach 1: unescape find and search in content
    unescaped_find = _unescape(find)
    if unescaped_find != find and unescaped_find in content:
        yielded.add(unescaped_find)
        yield unescaped_find

    # Approach 2: only meaningful when find carries literal backslashes
    if "\\" not in find:
        return
    lines = content.split("\n")
    window = unescaped_find.count("\n") + 1
    for i in range(len(lines) - window + 1):
        block = "\n".join(lines[i : i + window])
        if block not in yielded and _unescape(block) == unescaped_find:
            yielded.add(block)
            yield block


def _line_trimmed_replacer(content: str, find: str) -> Iterator[str]:
    """Match blocks where each line matches after trim.

    A single trailing empty line of *find* (i.e. a trailing newline) is ignored.
    Yields the original, untrimmed block from content.
    """
    if not find:
        return
    original_lines = content.split("\n")
    search_lines = find.split("\n")
    if search_lines[-1] == "":
        search_lines.pop()
    if not search_lines:
        return

    wanted = [ln.strip() for ln in search_lines]
    stripped = [ln.strip() for ln in original_lines]
    size = len(wanted)
    for i in range(len(original_lines) - size + 1):
        if stripped[i : i + size] == wanted:
            yield "\n".join(original_lines[i : i + size])


def _block_anchor_replacer(content: str, find: str) -> Iterator[str]:
    """
    Use first and last line as anchors, then use Levenshtein similarity on middle lines.

    Single candidate threshold: 0.0 (accept if anchors match)
    Multiple candidates threshold: 0.3 (pick best)

    Requires at least three search lines (after dropping one trailing empty
    line). Yields at most one block.
    """
    if not find:
        return
    original_lines = content.split("\n")
    search_lines = find.split("\n")
    if search_lines[-1] == "":
        search_lines.pop()
    if len(search_lines) < 3:
        return

    stripped_original = [ln.strip() for ln in original_lines]
    stripped_search = [ln.strip() for ln in search_lines]
    first_anchor = stripped_search[0]
    last_anchor = stripped_search[-1]
    search_block_size = len(search_lines)

    # Limit the search window for the last anchor to avoid matching unrelated
    # lines far away from the first anchor.
    max_window = max(search_block_size * 2, search_block_size + 10)
    candidates: list[tuple[int, int]] = []
    for i, line in enumerate(stripped_original):
        if line != first_anchor:
            continue
        for j in range(i + 2, min(len(original_lines), i + max_window)):
            if stripped_original[j] == last_anchor:
                candidates.append((i, j))
    if not candidates:
        return

    def _similarity(start: int, end: int) -> float:
        """Average normalized similarity of the middle lines, in [0, 1]."""
        lines_to_check = min(search_block_size - 2, end - start - 1)
        if lines_to_check <= 0:
            return 1.0
        total = 0.0
        for k in range(1, lines_to_check + 1):
            ol = stripped_original[start + k]
            sl = stripped_search[k]
            max_len = max(len(ol), len(sl))
            if max_len == 0:
                # Two blank lines contribute nothing (kept from the original).
                continue
            total += (1 - _levenshtein(ol, sl) / max_len) / lines_to_check
        return total

    if len(candidates) == 1:
        start, end = candidates[0]
        if _similarity(start, end) >= 0.0:
            yield "\n".join(original_lines[start : end + 1])
        return

    best_match = None
    max_sim = -1.0
    for start, end in candidates:
        sim = _similarity(start, end)
        if sim > max_sim:
            max_sim = sim
            best_match = (start, end)

    if max_sim >= 0.3 and best_match:
        start, end = best_match
        yield "\n".join(original_lines[start : end + 1])


def _whitespace_normalized_replacer(content: str, find: str) -> Iterator[str]:
    """Collapse all whitespace sequences to a single space before matching.

    Yields every single line, then (for multi-line *find*) every block of the
    same line count, whose normalized form equals the normalized *find*.
    """
    if not find:
        return

    def _norm(t: str) -> str:
        return " ".join(t.split())

    normalized_find = _norm(find)
    if not normalized_find:
        return

    lines = content.split("\n")

    # Single-line matches
    for line in lines:
        if _norm(line) == normalized_find:
            yield line

    # Multi-line block matches
    window = find.count("\n") + 1
    if window > 1:
        for i in range(len(lines) - window + 1):
            block = "\n".join(lines[i : i + window])
            if _norm(block) == normalized_find:
                yield block


def _indentation_flexible_replacer(content: str, find: str) -> Iterator[str]:
    """
    Match blocks where removing common indentation makes them equal to find.

    Important: yields the ORIGINAL block (with original indentation),
    not the de-indented version. This preserves the file's indentation
    during replacement.
    """
    if not find:
        return

    def _remove_indent(text: str) -> str:
        lines = text.split("\n")
        non_empty = [ln for ln in lines if ln.strip()]
        if not non_empty:
            return text
        min_indent = min(len(ln) - len(ln.lstrip()) for ln in non_empty)
        return "\n".join(ln[min_indent:] if ln.strip() else ln for ln in lines)

    normalized_find = _remove_indent(find)
    content_lines = content.split("\n")
    window = find.count("\n") + 1
    for i in range(len(content_lines) - window + 1):
        block = "\n".join(content_lines[i : i + window])
        if _remove_indent(block) == normalized_find:
            yield block


def _trimmed_boundary_replacer(content: str, find: str) -> Iterator[str]:
    """Match if the trimmed version of find exists in content.

    Does nothing when *find* has no leading/trailing whitespace to trim. May
    yield an empty string for a whitespace-only *find*; robust_replace()
    ignores empty candidates.
    """
    if not find:
        return
    trimmed = find.strip()
    if trimmed == find:
        return
    if trimmed in content:
        yield trimmed

    lines = content.split("\n")
    window = find.count("\n") + 1
    for i in range(len(lines) - window + 1):
        block = "\n".join(lines[i : i + window])
        if block.strip() == trimmed:
            yield block


def _context_aware_replacer(content: str, find: str) -> Iterator[str]:
    """
    Use first and last line as context anchors, accept if >= 50% of middle lines match.

    The candidate block must have exactly as many lines as *find* (after
    dropping one trailing empty line). Only the first accepted block is
    yielded.
    """
    if not find:
        return
    find_lines = find.split("\n")
    if find_lines[-1] == "":
        find_lines.pop()
    if len(find_lines) < 3:
        return

    size = len(find_lines)
    first_line = find_lines[0].strip()
    last_line = find_lines[-1].strip()
    middle_find = [ln.strip() for ln in find_lines[1:-1]]
    content_lines = content.split("\n")

    for i in range(len(content_lines) - size + 1):
        if content_lines[i].strip() != first_line:
            continue
        # Only a block of exactly ``size`` lines is admissible, so the end
        # anchor can only sit at this one index.
        j = i + size - 1
        if content_lines[j].strip() != last_line:
            continue
        matching = 0
        total_nonempty = 0
        for k, fl in enumerate(middle_find, start=1):
            bl = content_lines[i + k].strip()
            if bl or fl:
                total_nonempty += 1
                if bl == fl:
                    matching += 1
        if total_nonempty == 0 or matching / total_nonempty >= 0.5:
            yield "\n".join(content_lines[i : j + 1])
            return  # Only first match


def _multi_occurrence_replacer(content: str, find: str) -> Iterator[str]:
    """Yield all exact matches (used with replace_all)."""
    if not find:
        return
    start = 0
    while True:
        idx = content.find(find, start)
        if idx == -1:
            break
        yield find
        start = idx + len(find)


# Ordered chain: most specific first, most lenient last.
# Escape-normalized is placed early because it handles a common LLM issue
# (using \\n instead of actual newlines) before more aggressive fuzzy matchers.
_REPLACERS: list[Replacer] = [
    _simple_replacer,
    _escape_normalized_replacer,
    _line_trimmed_replacer,
    _block_anchor_replacer,
    _whitespace_normalized_replacer,
    _indentation_flexible_replacer,
    _trimmed_boundary_replacer,
    _context_aware_replacer,
    _multi_occurrence_replacer,
]


# ---------------------------------------------------------------------------
# Indent adjustment helper
# ---------------------------------------------------------------------------


def _get_leading_whitespace(line: str) -> str:
    """Return the leading whitespace of a line."""
    return line[: len(line) - len(line.lstrip())]


def _first_nonempty_line_indent(text: str) -> str:
    """Get the leading whitespace of the first non-empty line in text."""
    for line in text.split("\n"):
        if line.strip():
            return _get_leading_whitespace(line)
    return ""


def _adjust_replacement_indent(
    new_string: str,
    old_string: str,
    matched_block: str,
    content: str = "",
    match_idx: int = -1,
) -> str:
    """
    Adjust new_string's indentation to preserve the file's indent level.

    When a fuzzy replacer (e.g. LineTrimmedReplacer, IndentationFlexibleReplacer)
    matches a block whose indentation differs from old_string, this function
    computes the indent delta and applies it to every non-empty line of
    new_string so the replacement preserves the file's original indentation.

    When *content* and *match_idx* are provided, the effective indent is
    computed from the actual lines in the file rather than from matched_block
    alone. This correctly handles substring matches (e.g. SimpleReplacer
    matching ``"print(x)"`` inside ``"    print(x)"``).

    The file indent is the indent of the first non-empty file line spanned by
    the match. This mirrors how the indent of old_string is measured and, in
    particular, never contains newline characters when the match begins on a
    blank line.

    Indent adjustment rules:
    - If the match starts at the beginning of a line (match_idx == line_start),
      all lines of new_string get the indent delta applied.
    - If the match starts after a whitespace-only prefix (at the content
      boundary of an indented line), the first line is NOT adjusted --
      the content prefix already provides the indent. Lines 2+ get adjusted.
    - If the match starts mid-line (after non-whitespace content), NO lines
      are adjusted -- the user wrote absolute indentation and the first line
      inherits the prefix context.

    For exact matches (old_string == matched_block and both span the full
    line), delta is zero and this is a no-op.
    """
    old_indent = _first_nonempty_line_indent(old_string)

    match_starts_at_line_beginning = True
    adjust_subsequent_lines = True
    if content and match_idx >= 0:
        line_start = content.rfind("\n", 0, match_idx) + 1
        # End of the file line that holds the last character of the match.
        last_char = max(match_idx, match_idx + len(matched_block) - 1)
        line_end = content.find("\n", last_char)
        if line_end == -1:
            line_end = len(content)
        file_indent = _first_nonempty_line_indent(content[line_start:line_end])

        if match_idx != line_start:
            match_starts_at_line_beginning = False
            # Whitespace-only prefix: lines 2+ still follow the file's indent.
            # Non-whitespace prefix: truly mid-line, leave everything alone.
            adjust_subsequent_lines = content[line_start:match_idx].strip() == ""
    else:
        file_indent = _first_nonempty_line_indent(matched_block)

    if file_indent == old_indent:
        return new_string

    # Determine indent character from the file's indentation
    indent_char = " "
    if file_indent:
        indent_char = file_indent[0]
    elif old_indent:
        indent_char = old_indent[0]

    delta = len(file_indent) - len(old_indent)

    adjusted: list[str] = []
    for i, line in enumerate(new_string.split("\n")):
        if not line.strip():
            # Empty or whitespace-only line: keep as-is
            adjusted.append(line)
        elif i == 0 and not match_starts_at_line_beginning:
            # The text before the match already provides the indent.
            adjusted.append(line)
        elif i > 0 and not adjust_subsequent_lines:
            # Truly mid-line match: the user wrote absolute indentation.
            adjusted.append(line)
        elif delta > 0:
            # File is indented deeper than old_string: add the difference.
            adjusted.append(indent_char * delta + line)
        elif delta < 0:
            # File is indented less: remove up to the difference.
            current_indent = _get_leading_whitespace(line)
            remove = min(-delta, len(current_indent))
            adjusted.append(line[remove:])
        else:
            adjusted.append(line)

    return "\n".join(adjusted)


# ---------------------------------------------------------------------------
# Core replace function
# ---------------------------------------------------------------------------


def _collect_match_positions(
    content: str, matches: list[str]
) -> list[tuple[int, str]]:
    """Locate every occurrence of each candidate in content, without overlaps.

    Candidates are processed in the order given; an occurrence that overlaps an
    already accepted one in any way (starting inside it, or enclosing it) is
    dropped. Empty candidates are ignored, because an empty string "occurs" at
    every index.
    """
    positions: list[tuple[int, str]] = []
    for match in matches:
        if not match:
            continue
        start = 0
        while True:
            idx = content.find(match, start)
            if idx == -1:
                break
            end = idx + len(match)
            if not any(idx < pos + len(m) and pos < end for pos, m in positions):
                positions.append((idx, match))
            start = idx + 1
    return positions


def robust_replace(
    content: str,
    old_string: str,
    new_string: str,
    *,
    replace_all: bool = False,
) -> tuple[str, int]:
    """
    Replace old_string with new_string using the multi-strategy replacer chain.

    Replacers are tried in order. The first one that produces usable matches
    wins when replace_all is true; otherwise the first one that produces
    exactly one match wins and ambiguous replacers are skipped.

    Returns:
        A tuple of (new_content, replacements_count).

    Raises:
        ValueError: If old_string equals new_string, if old_string cannot be
            found, or if multiple non-unique matches are found (when
            replace_all=False).
    """
    if old_string == new_string:
        raise ValueError(
            "No changes to apply: old_string and new_string are identical."
        )

    not_found = True

    for replacer in _REPLACERS:
        matches = list(replacer(content, old_string))
        if not matches:
            continue

        match_positions = _collect_match_positions(content, matches)
        if not match_positions:
            continue

        not_found = False

        if replace_all:
            # Replace from end to start so earlier indices stay valid.
            new_content = content
            ordered = sorted(match_positions, key=lambda item: item[0], reverse=True)
            for idx, match in ordered:
                adjusted_new = _adjust_replacement_indent(
                    new_string,
                    old_string,
                    match,
                    content,
                    idx,
                )
                new_content = (
                    new_content[:idx] + adjusted_new + new_content[idx + len(match) :]
                )
            return new_content, len(ordered)

        # Single replacement mode: require exactly one match
        if len(match_positions) == 1:
            idx, match = match_positions[0]
            adjusted_new = _adjust_replacement_indent(
                new_string,
                old_string,
                match,
                content,
                idx,
            )
            return content[:idx] + adjusted_new + content[idx + len(match) :], 1

        # Ambiguous for this replacer: fall through to the next strategy.

    if not_found:
        raise ValueError(
            "Could not find oldString in the file. It must match exactly, "
            "including whitespace, indentation, and line endings. "
            "Try providing more surrounding context to make the match unique."
        )
    raise ValueError(
        "Found multiple matches for oldString. Provide more surrounding context "
        "to make the match unique, or use replace_all=True to change every instance."
    )


# ---------------------------------------------------------------------------
# Edit result model
# ---------------------------------------------------------------------------


@dataclass
class EditResult:
    """Outcome of an edit attempt."""

    # True when the replacement was applied.
    success: bool
    # Number of occurrences that were replaced.
    replacements: int = 0
    # Unified diff between old_content and new_content.
    diff: str = ""
    # Error message when success is False.
    error: str = ""
    # Decoded file text before the edit (BOM stripped, original line endings).
    old_content: str = ""
    # Decoded file text after the edit (BOM stripped, original line endings).
    new_content: str = ""


# ---------------------------------------------------------------------------
# Async file edit with locking and line-ending preservation (local runtime)
# ---------------------------------------------------------------------------


def bytes_edit_file(
    raw_bytes: bytes,
    old_string: str,
    new_string: str,
    *,
    replace_all: bool = False,
    encoding: str = "utf-8",
) -> tuple[bytes, EditResult]:
    """
    Core line-ending-aware edit logic operating on raw bytes.

    Performs:
    1. BOM detection and preservation (UTF-8 BOM)
    2. Line-ending detection (CRLF vs LF)
    3. Normalize to LF for matching (replacer chain works on LF)
    4. Execute robust_replace()
    5. Convert back to original line endings
    6. Re-attach BOM if present

    Returns:
        (output_bytes, EditResult) -- the caller is responsible for writing
        *output_bytes* back to the file.

    Raises:
        ValueError: If old_string cannot be found or is not unique. Decoding
            errors (UnicodeDecodeError, a ValueError subclass) propagate too.
    """
    bom = b"\xef\xbb\xbf"
    has_bom = raw_bytes.startswith(bom)
    if has_bom:
        raw_bytes = raw_bytes[len(bom) :]

    old_content = raw_bytes.decode(encoding)
    original_ending = _detect_line_ending(old_content)

    # ONLY real CRLF sequences are normalized here. Escape sequence handling
    # (\n vs actual newline, \t vs tab, etc.) is deferred to the
    # _escape_normalized_replacer in the replacer chain.
    new_content, replacements = robust_replace(
        _normalize_line_endings(old_content),
        _normalize_line_endings(old_string),
        _normalize_line_endings(new_string),
        replace_all=replace_all,
    )

    # Convert back to original line endings
    if original_ending == "\r\n":
        new_content = _convert_to_line_ending(new_content, "\r\n")

    write_bytes = (bom if has_bom else b"") + new_content.encode(encoding)

    result = EditResult(
        success=True,
        replacements=replacements,
        diff=build_unified_diff("", old_content, new_content),
        old_content=old_content,
        new_content=new_content,
    )
    return write_bytes, result


async def edit_file(
    path: str,
    old_string: str,
    new_string: str,
    *,
    replace_all: bool = False,
    encoding: str = "utf-8",
) -> EditResult:
    """
    Edit a file using the robust multi-strategy replacer.

    Features:
    - File-level asyncio lock prevents concurrent edits
    - Preserves original line endings (\\n vs \\r\\n)
    - Preserves BOM if present
    - Returns unified diff of changes

    Never raises for ordinary failures: any exception from reading, matching
    or writing is reported as ``EditResult(success=False, error=...)``.
    """
    lock = get_file_lock(path)
    async with lock:
        try:
            # Binary I/O keeps the original line endings intact.
            raw_bytes = await asyncio.to_thread(_read_file_bytes, path)
            write_bytes, result = bytes_edit_file(
                raw_bytes,
                old_string,
                new_string,
                replace_all=replace_all,
                encoding=encoding,
            )
            await asyncio.to_thread(_write_file_bytes, path, write_bytes)
            return result
        except Exception as exc:
            # Tool boundary: the caller expects a result object, not a raise.
            return EditResult(
                success=False,
                error=str(exc),
            )


def _read_file_bytes(path: str) -> bytes:
    """Return the raw bytes of the file at *path*."""
    with open(path, "rb") as f:
        return f.read()


def _write_file_bytes(path: str, data: bytes) -> None:
    """Write *data* to *path*, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as f:
        f.write(data)


# ---------------------------------------------------------------------------
# Public diff helper
# ---------------------------------------------------------------------------


def build_unified_diff(
    old_path: str,
    old_content: str,
    new_content: str,
) -> str:
    """Generate a unified diff between two content strings.

    *old_path* is used for both the "from" and "to" file headers. Lines are
    compared without their terminators; the result is joined with LF and is an
    empty string when there is no difference.
    """
    diff = difflib.unified_diff(
        old_content.splitlines(),
        new_content.splitlines(),
        fromfile=old_path,
        tofile=old_path,
        lineterm="",
    )
    return "\n".join(diff)
""" +import base64 import os import uuid from dataclasses import dataclass, field @@ -55,6 +56,12 @@ from ..registry import builtin_tool from . import util as computer_util +from .edit_engine import ( + EditResult, + bytes_edit_file, + edit_file, + get_file_lock, +) from .util import ( check_admin_permission, is_local_runtime, @@ -368,22 +375,150 @@ async def call( return f"Error writing file: {exc}" +async def _sandbox_read_bytes(sb, path: str) -> bytes: + """Read a file in binary mode inside the sandbox via sb.python.exec. + + Returns the raw bytes so that original line endings (CRLF/LF) are preserved. + Raises IOError on read failure. + """ + code = ( + "import base64 as _b64, sys\n" + f"path = {path!r}\n" + "try:\n" + " with open(path, 'rb') as _f:\n" + " _data = _f.read()\n" + " print(_b64.b64encode(_data).decode(), end='')\n" + "except Exception as _e:\n" + " print('ERROR:' + str(_e), end='', file=sys.stderr)\n" + " sys.exit(1)\n" + ) + result = await sb.python.exec(code, timeout=30) + if not result.get("success", False): + error = str(result.get("error", "") or "").strip() + if not error: + output = result.get("output", "") + if isinstance(output, dict): + error = str(output.get("error", "") or "").strip() + raise OSError( + f"Failed to read file in sandbox: {error or 'unknown read error'}" + ) + output = result.get("output", "") + if isinstance(output, dict): + output = output.get("text", "") + b64_text = str(output).strip() + return base64.b64decode(b64_text) + + +async def _sandbox_write_bytes(sb, path: str, data: bytes) -> None: + """Write raw bytes to a file inside the sandbox via sb.python.exec. + + Raises IOError on write failure. 
+ """ + b64_data = base64.b64encode(data).decode() + code = ( + "import base64 as _b64, sys\n" + f"path = {path!r}\n" + f"b64 = {b64_data!r}\n" + "try:\n" + " _raw = _b64.b64decode(b64)\n" + " with open(path, 'wb') as _f:\n" + " _f.write(_raw)\n" + " print('ok', end='')\n" + "except Exception as _e:\n" + " print('ERROR:' + str(_e), end='', file=sys.stderr)\n" + " sys.exit(1)\n" + ) + result = await sb.python.exec(code, timeout=30) + if not result.get("success", False): + error = str(result.get("error", "") or "").strip() + if not error: + output = result.get("output", "") + if isinstance(output, dict): + error = str(output.get("error", "") or "").strip() + raise OSError( + f"Failed to write file in sandbox: {error or 'unknown write error'}" + ) + + +def _format_result( + path: str, + result: EditResult, + *, + replace_all: bool, +) -> str: + """Build the human-readable / LLM-readable result string with optional diff.""" + if not result.success: + return f"Error editing file: {result.error}" + + mode_text = "all matches" if replace_all else "first match" + + lines = [ + f"Edited {path}.", + f"Replaced {result.replacements} occurrence(s) using {mode_text} mode.", + ] + + if result.diff: + diff_preview = result.diff + if len(diff_preview) > 2000: + diff_preview = diff_preview[:2000] + "\n... (diff truncated)" + lines.append("") + lines.append("Diff:") + lines.append("```diff") + lines.append(diff_preview) + lines.append("```") + + return "\n".join(lines) + + @builtin_tool(config=_COMPUTER_RUNTIME_TOOL_CONFIG) @dataclass class FileEditTool(FunctionTool): + """ + Enhanced file editing tool with robust fuzzy matching. + + In local runtime it uses the full robust edit engine (BOM + + line-ending preservation, file locks); in sandbox runtimes it + mediates reads and writes through the booter's filesystem + abstraction while applying the same 9-strategy replacer chain. 
+ + This tool is designed to handle LLM-generated edits that may + have minor whitespace, indentation, or escape sequence + differences from the actual file content. + """ + name: str = "astrbot_file_edit_tool" - description: str = "Editing files." + description: str = ( + "Editing files with robust fuzzy matching. " + "Supports exact match, escape-normalized match, line-trimmed match, block-anchor match, " + "whitespace-normalized match, indentation-flexible match, " + "trimmed-boundary match, context-aware match, " + "and multi-occurrence replacement. " + "When editing text from Read tool output, preserve the exact indentation " + "(tabs/spaces) as it appears AFTER the line number prefix. " + "The line number prefix format is: line number + colon + space (e.g., '1: '). " + "Everything after that space is the actual file content to match. " + "Never include any part of the line number prefix in oldString or newString. " + "The edit will FAIL if oldString is not found. " + "The edit will FAIL if oldString is found multiple times and replace_all is false. " + "Use replace_all for renaming variables or strings across the file." + ) parameters: dict = field( default_factory=lambda: { "type": "object", "properties": { "path": { "type": "string", - "description": "Path of the file to edit. If relative, will be in workspace root.", + "description": ( + "Path of the file to edit. If relative, will be in workspace root." + ), }, "old": { "type": "string", - "description": "The exact old text to replace.", + "description": ( + "The text to replace. Must be an exact substring of the file content, " + "but the tool will try multiple matching strategies if exact match fails. " + "Include sufficient surrounding context (3-5 lines) to make the match unique." + ), }, "new": { "type": "string", @@ -391,7 +526,10 @@ class FileEditTool(FunctionTool): }, "replace_all": { "type": "boolean", - "description": "Whether to replace all matches. 
Defaults to false.", + "description": ( + "Whether to replace all matches. Defaults to false. " + "Useful for renaming variables or strings across the file." + ), }, }, "required": ["path", "old", "new"], @@ -409,45 +547,67 @@ async def call( umo = str(context.context.event.unified_msg_origin) local_env = is_local_runtime(context) restricted = _is_restricted_env(context) + try: - normalized_path = ( - _normalize_rw_path( + if local_env: + normalized_path = _normalize_rw_path( path, restricted=restricted, local_env=local_env, umo=umo, write=True, ) - if local_env - else path.strip() - ) + else: + normalized_path = path.strip() + if not normalized_path: raise ValueError("`path` must be a non-empty string.") - normalized_old = _decode_escaped_text(old) - normalized_new = _decode_escaped_text(new) - sb = await get_booter( - context.context.context, - context.context.event.unified_msg_origin, - ) - result = await sb.fs.edit_file( - path=normalized_path, - old_string=normalized_old, - new_string=normalized_new, - replace_all=replace_all, - encoding="utf-8", - ) - if not result.get("success", False): - error_detail = str(result.get("error", "") or "").strip() - return ( - "Error editing file: " - f"{error_detail or 'unknown filesystem edit error'}" + + if local_env: + result = await edit_file( + path=normalized_path, + old_string=old, + new_string=new, + replace_all=replace_all, + encoding="utf-8", + ) + else: + sb = await get_booter( + context.context.context, + umo, ) - replacements = int(result.get("replacements", 0) or 0) - mode_text = "all matches" if replace_all else "first match" - return ( - f"Edited {normalized_path}. " - f"Replaced {replacements} occurrence(s) using {mode_text} mode." + lock = get_file_lock(normalized_path) + async with lock: + # 1. Binary read — preserves original line endings (CRLF/LF) + try: + raw_bytes = await _sandbox_read_bytes(sb, normalized_path) + except OSError as exc: + return f"Error editing file: {exc}" + + # 2. 
Line-ending-aware edit (reuses edit_engine core logic) + try: + write_bytes, result = bytes_edit_file( + raw_bytes, + old, + new, + replace_all=replace_all, + encoding="utf-8", + ) + except ValueError as exc: + return f"Error editing file: {exc}" + + # 3. Binary write — preserves restored line endings + try: + await _sandbox_write_bytes(sb, normalized_path, write_bytes) + except OSError as exc: + return f"Error editing file: {exc}" + + return _format_result( + normalized_path, + result, + replace_all=replace_all, ) + except PermissionError as exc: return f"Error: {exc}" except Exception as exc: diff --git a/dashboard/src/components/chat/ReasoningSidebar.vue b/dashboard/src/components/chat/ReasoningSidebar.vue index a20a4bb896..e2d0d1f240 100644 --- a/dashboard/src/components/chat/ReasoningSidebar.vue +++ b/dashboard/src/components/chat/ReasoningSidebar.vue @@ -1,6 +1,17 @@ diff --git a/dashboard/src/components/chat/message_list_comps/IPythonToolBlock.vue b/dashboard/src/components/chat/message_list_comps/IPythonToolBlock.vue index 716d8d1b0b..edb72a4f9d 100644 --- a/dashboard/src/components/chat/message_list_comps/IPythonToolBlock.vue +++ b/dashboard/src/components/chat/message_list_comps/IPythonToolBlock.vue @@ -1,21 +1,23 @@ + + diff --git a/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue b/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue index 902cc51113..608edae579 100644 --- a/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue +++ b/dashboard/src/components/shared/ThemeAwareMarkdownCodeBlock.vue @@ -1,5 +1,12 @@