Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cecli/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def get_parser(default_config_files, git_root):
)
group.add_argument(
"--model-overrides",
"--model-settings",
metavar="MODEL_OVERRIDES_JSON",
help=(
'Specify model tag overrides directly as JSON/YAML string (e.g., \'{"gpt-4o": {"high":'
Expand Down
38 changes: 33 additions & 5 deletions cecli/helpers/hashline.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,18 +327,46 @@ def get_hashline_diff(
elif operation == "insert":
find_text = ""
# For insert operations, we need to calculate hashlines for the text to insert
# The text should be hashed starting at the line after the end line
# with surrounding context for proper neighborhood-based hashing
if text:
# Insert after the end line, so start hashline at found_end + 2 (1-indexed)
replace_text = hashline(text, start_line=found_end + 2)
original_lines = original_content.splitlines()
text_lines = text.splitlines()
# Get up to 3 lines of context before (ending at found_end) and after the insertion point
ctx_before = original_lines[max(0, found_end - 2) : found_end + 1]
ctx_after = original_lines[found_end + 1 : min(len(original_lines), found_end + 4)]
# Build a mini document with context so HashPos computes correct neighborhood hashes
mini_lines = ctx_before + text_lines + ctx_after
mini_text = "\n".join(mini_lines)
hashed_mini = hashline(mini_text)
hashed_mini_lines = hashed_mini.splitlines(keepends=True)
# Extract only the replacement text portion's hashlines
replace_lines_hashed = hashed_mini_lines[
len(ctx_before) : len(ctx_before) + len(text_lines)
]
replace_text = "".join(replace_lines_hashed)
else:
replace_text = ""
# For replace operation, we're replacing the range
elif operation == "replace":
find_text = original_range_content
# For replace operations, the replacement text should be hashed starting at the start line
# For replace operations, the replacement text should be hashed
# with surrounding context for proper neighborhood-based hashing
if text:
replace_text = hashline(text, start_line=found_start + 1)
original_lines = original_content.splitlines()
text_lines = text.splitlines()
# Get up to 3 lines of context before and after the range
ctx_before = original_lines[max(0, found_start - 3) : found_start]
ctx_after = original_lines[found_end + 1 : min(len(original_lines), found_end + 4)]
# Build a mini document with context so HashPos computes correct neighborhood hashes
mini_lines = ctx_before + text_lines + ctx_after
mini_text = "\n".join(mini_lines)
hashed_mini = hashline(mini_text)
hashed_mini_lines = hashed_mini.splitlines(keepends=True)
# Extract only the replacement text portion's hashlines
replace_lines_hashed = hashed_mini_lines[
len(ctx_before) : len(ctx_before) + len(text_lines)
]
replace_text = "".join(replace_lines_hashed)
else:
replace_text = ""
else:
Expand Down
150 changes: 70 additions & 80 deletions cecli/helpers/hashpos/hashpos.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

class HashPos:
B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789~_"
# The actual coprime period (64 * 63)
PERIOD = 4032
# Regex pattern for HashPos format: {4-char-hash}::
HASH_PREFIX_RE = re.compile(r"^([0-9a-zA-Z\~_@]{4})::")
# Regex for normalization: 4 hash chars optionally followed by '::'
Expand All @@ -18,70 +16,53 @@ def __init__(self, source_text: str = ""):
self.lines = source_text.splitlines()
self.total = len(self.lines)

def _get_content_bits(self, text: str) -> int:
return xxhash.xxh3_64_intdigest(text.encode("utf-8")) & 0xFFF

def _get_anchor_bits(self, line_idx: int) -> int:
a1 = (line_idx * 53 + 13) % 64
a2 = (line_idx * 59 + 31) % 63
return (a1 << 6) | a2

def _spread_bits(self, x: int) -> int:
def _get_region_bits(self, line_idx: int) -> tuple[int, int]:
"""
Spreads 12 bits of x into 24 bits by inserting a 0 between each bit.
Input: 000000000000abcdefghijkl (12 bits)
Output: 0a0b0c0d0e0f0g0h0i0j0k0l (24 bits)
Uses line_idx modulo 16 (4 bits) to get two 2-bit flags (b1, b2).
This guarantees up to 16 consecutive repeating lines get unique spatial anchors.
"""
x &= 0xFFF # Ensure we only have 12 bits
# Shift bits by 8, mask keeps the blocks separated
# x starts: 000000000000 abcdefgh ijkl
x = (x | (x << 8)) & 0x00FF00FF # 0000abcd efgh0000 00000000 ijkl...
# Shift by 4, then 2, then 1 to create 1-bit gaps
x = (x | (x << 4)) & 0x0F0F0F0F
x = (x | (x << 2)) & 0x33333333
x = (x | (x << 1)) & 0x55555555 # Result: 0a0b0c0d0e0f0g0h0i0j0k0l
return x
mod_val = line_idx % 16

# Split the 4-bit modulo value into two separate 2-bit flags
b1 = (mod_val >> 2) & 3 # Top 2 bits (mask with 0b11)
b2 = mod_val & 3 # Bottom 2 bits
return b1, b2

def _compact_bits(self, x: int) -> int:
def _get_neighborhood_hash(self, line_idx: int) -> int:
"""
The inverse of spread: pulls every other bit back together.
Input: 0a0b0c0d0e0f0g0h0i0j0k0l (24 bits)
Output: 000000000000abcdefghijkl (12 bits)
Creates a 20-bit digest using the current line and the 3 lines
before and after it.
"""
x &= 0x55555555 # Mask to ensure we only look at the "active" bits
x = (x | (x >> 1)) & 0x33333333
x = (x | (x >> 2)) & 0x0F0F0F0F
x = (x | (x >> 4)) & 0x00FF00FF
x = (x | (x >> 8)) & 0x0000FFFF # Result: abcdefghijkl
return x
start = max(0, line_idx - 3)
end = min(self.total, line_idx + 4)

context_window = "\n".join(self.lines[start:end])
full_hash = xxhash.xxh3_64_intdigest(context_window.encode("utf-8"))

# Isolate exactly 20 bits
return full_hash & 0xFFFFF

def _interleave(self, content: int, anchor: int) -> int:
def generate_private_id(self, text: str) -> str:
"""
Weaves content and anchor bits together.
Content bits occupy the 'odd' positions, Anchor bits occupy the 'even'.
Generates a fast 12-bit (3 hex chars) hash based purely on the line text.
"""
# Spread content bits and shift by 1 to put them in positions 1, 3, 5...
# Spread anchor bits and leave them in positions 0, 2, 4...
return (self._spread_bits(content) << 1) | self._spread_bits(anchor)
bits = xxhash.xxh3_64_intdigest(text.encode("utf-8")) & 0xFFF
return f"{bits:03x}"

def _deinterleave(self, mixed: int) -> tuple[int, int]:
def generate_public_id(self, text: str, line_idx: int) -> str:
"""
Extracts content and anchor bits from a 24-bit interleaved integer.
Generates a 4-char Base64 ID combining modulo buckets and context hash.
Layout: [2-bit b1] [10-bit Hash A] [2-bit b2] [10-bit Hash B]
"""
# To get content: shift right by 1, then compact
content = self._compact_bits(mixed >> 1)
# To get anchor: just compact (the mask inside _compact_bits handles the rest)
anchor = self._compact_bits(mixed)
return content, anchor
b1, b2 = self._get_region_bits(line_idx)
neighborhood_hash = self._get_neighborhood_hash(line_idx)

def generate_private_id(self, text: str) -> str:
bits = self._get_content_bits(text)
return f"{bits:03x}"
# Split the 20-bit hash into two 10-bit halves
hash_a = (neighborhood_hash >> 10) & 0x3FF
hash_b = neighborhood_hash & 0x3FF

def generate_public_id(self, text: str, line_idx: int) -> str:
content_bits = self._get_content_bits(text)
anchor_bits = self._get_anchor_bits(line_idx)
packed = self._interleave(content_bits, anchor_bits)
# Construct the mixed 24-bit integer
packed = (b1 << 22) | (hash_a << 12) | (b2 << 10) | hash_b

res = ""
for _ in range(4):
Expand All @@ -90,56 +71,69 @@ def generate_public_id(self, text: str, line_idx: int) -> str:
return res

def unpack_public_id(self, public_id: str) -> tuple[int, int]:
"""
Reverses the Public ID back into its (Modulo 16, Neighborhood Hash) values.
"""
packed = 0
for i, char in enumerate(public_id):
packed |= self.B64.index(char) << (6 * i)

return self._deinterleave(packed)
b1 = (packed >> 22) & 3
hash_a = (packed >> 12) & 0x3FF
b2 = (packed >> 10) & 3
hash_b = packed & 0x3FF

mod_val = (b1 << 2) | b2
neighborhood_hash = (hash_a << 10) | hash_b

return mod_val, neighborhood_hash

def format_content(self, use_private_ids: bool = False, start_line: int = 1) -> str:
formatted_lines = []
for i, line in enumerate(self.lines):
prefix = (
self.generate_private_id(line)
if use_private_ids
else self.generate_public_id(line, i + start_line)
else self.generate_public_id(line, i)
)
formatted_lines.append(f"{prefix}::{line}")
return "\n".join(formatted_lines)

def resolve_to_lines(self, public_id: str, start_line: int = 1) -> list[int]:
target_content, target_anchor = self.unpack_public_id(public_id)
content_matches = []
perfect_matches = []
target_mod, target_hash = self.unpack_public_id(public_id)
matches = []

# Find all lines whose neighborhood hash matches our target
for i, line in enumerate(self.lines):
if self._get_content_bits(line) == target_content:
current_anchor = self._get_anchor_bits(i + start_line)
if current_anchor == target_anchor:
perfect_matches.append(i)
else:
dist = abs(current_anchor - target_anchor)
# Use the actual coprime period for the circular logic
dist = min(dist, self.PERIOD - dist)
if self._get_neighborhood_hash(i) == target_hash:
matches.append(i)

if not matches:
return []

# If perfectly unique, return it immediately
if len(matches) == 1:
return matches

# ~1% chance of collision around 10 items
if dist <= 1:
content_matches.append((dist, i))
# Distance Heuristic: If multiple matches exist (e.g. repeated code blocks),
# prioritize the one whose modulo is closest to the target modulo.
# We use circular distance since mod 16 wraps around (0 is adjacent to 15).
def modulo_distance(idx: int) -> int:
current_mod = idx % 16
dist = abs(current_mod - target_mod)
return min(dist, 16 - dist)

if perfect_matches:
return perfect_matches
matches.sort(key=modulo_distance)

content_matches.sort(key=lambda x: x[0])
return [match[1] for match in content_matches]
return matches

def resolve_range(self, start_id: str, end_id: str) -> tuple[int, int]:
"""
Resolves a block range from two Public IDs.

Logic:
1. Resolve all candidates for both IDs.
2. Find the pair of (start, end) that are logically ordered and
have the lowest combined distance score.
1. Resolve all candidates for both IDs (sorted by best match).
2. Find the pair of (start, end) that are logically ordered.
3. Returns (start_index, end_index)
"""
starts = self.resolve_to_lines(start_id)
Expand All @@ -148,13 +142,9 @@ def resolve_range(self, start_id: str, end_id: str) -> tuple[int, int]:
if not starts or not ends:
raise ValueError(f"Could not resolve IDs: {start_id}..{end_id}")

# If both have 'perfect' matches that are logically ordered, use them immediately
# Note: resolve_to_lines returns perfect matches first.
for s in starts:
for e in ends:
if s <= e:
# Return the first logical pair found
# (This prioritizes perfect matches or closest heuristics)
return s, e

raise ValueError(
Expand Down
Loading
Loading