Closed
42 changes: 21 additions & 21 deletions .zed/rules
@@ -139,13 +139,13 @@ Examples:
src/orcapod/
types.py — Schema, ColumnConfig, ContentHash
system_constants.py — Column prefixes and separators
errors.py — InputValidationError, DuplicateTagError, FieldNotResolvableError
errors.py — InputValidationError, DuplicateKeyError, FieldNotResolvableError
config.py — Config dataclass
contexts/ — DataContext (semantic_hasher, arrow_hasher, type_converter)
protocols/
hashing_protocols.py — PipelineElementProtocol, ContentIdentifiableProtocol
core_protocols/ — StreamProtocol, PodProtocol, SourceProtocol,
DataFunctionProtocol, DatagramProtocol, TagProtocol,
DataFunctionProtocol, DatagramProtocol, KeyProtocol,
DataProtocol, TrackerProtocol
core/
base.py — ContentIdentifiableBase, PipelineElementBase, TraceableBase
@@ -156,7 +156,7 @@ src/orcapod/
tracker.py — Invocation tracking
datagrams/
datagram.py — Datagram (unified dict/Arrow backing, lazy conversion)
tag_data.py — Tag (+ system tags), Data (+ source info)
key_data.py — Key (+ system keys), Data (+ source info)
sources/
base.py — RootSource (abstract, no upstream)
arrow_table_source.py — Core source — all other sources delegate to it
@@ -173,15 +173,15 @@ src/orcapod/
merge_join.py — MergeJoin (binary, colliding cols → sorted list[T])
semijoin.py — SemiJoin (binary, non-commutative)
batch.py — Batch (group rows, types become list[T])
column_selection.py — Select/Drop Tag/Data columns
mappers.py — MapTags, MapData (rename columns)
column_selection.py — Select/Drop Key/Data columns
mappers.py — MapKeys, MapData (rename columns)
filters.py — PolarsFilter
hashing/
semantic_hashing/ — BaseSemanticHasher, type handlers
semantic_types/ — Type conversion (Python ↔ Arrow)
databases/ — ArrowDatabaseProtocol implementations (Delta Lake, in-memory)
utils/
arrow_data_utils.py — System tag manipulation, source info, column helpers
arrow_data_utils.py — System key manipulation, source info, column helpers
arrow_utils.py — Arrow table utilities
schema_utils.py — Schema extraction, union, intersection, compatibility
lazy_module.py — LazyModule for deferred heavy imports
@@ -208,26 +208,26 @@ See orcapod-design.md at the project root for the full design specification.

RootSource → ArrowTableStream → [Operator / FunctionPod] → ArrowTableStream → ...

Every stream is an immutable sequence of (Tag, Data) pairs backed by a PyArrow Table.
Tag columns are join keys and metadata; data columns are the data payload.
Every stream is an immutable sequence of (Key, Data) pairs backed by a PyArrow Table.
Key columns are join keys and metadata; data columns are the data payload.
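
To illustrate the (Key, Data) split described above, here is a minimal sketch that uses plain dicts rather than a PyArrow Table. The function `split_row` and the column names are hypothetical and are not part of the orcapod API; the two error cases only mirror the validation behavior listed under "Important implementation details".

```python
from typing import Any, Mapping


def split_row(
    row: Mapping[str, Any], key_columns: tuple[str, ...]
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Split one row into a (key, data) pair of plain dicts (illustrative only)."""
    missing = [c for c in key_columns if c not in row]
    if missing:
        # Assumption: mirrors the documented ValueError for unknown key columns.
        raise ValueError(f"key columns not in row: {missing}")
    key = {c: row[c] for c in key_columns}
    data = {c: v for c, v in row.items() if c not in key_columns}
    if not data:
        # Assumption: mirrors the documented "at least one data column" rule.
        raise ValueError("at least one data column is required")
    return key, data


key, data = split_row(
    {"subject": "s1", "session": 2, "age": 31}, ("subject", "session")
)
```

Here `key` holds the join and metadata columns and `data` holds the payload column.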

### Core abstractions

Datagram (core/datagrams/datagram.py) — immutable data container with lazy dict ↔ Arrow
conversion. Two specializations:
- Tag — metadata columns + hidden system tag columns for provenance tracking
- Key — metadata columns + hidden system key columns for provenance tracking
- Data — data columns + per-column source info provenance tokens

Stream (core/streams/arrow_table_stream.py) — immutable (Tag, Data) sequence.
Stream (core/streams/arrow_table_stream.py) — immutable (Key, Data) sequence.
Key methods: output_schema(), keys(), iter_data(), as_table().

Source (core/sources/) — produces a stream from external data. ArrowTableSource is the core
implementation; CSV/Delta/DataFrame/Dict/List sources all delegate to it internally. Each
source adds source-info columns and a system tag column. DerivedSource wraps a
source adds source-info columns and a system key column. DerivedSource wraps a
FunctionNode/OperatorNode's DB records as a new source.

Function Pod (core/function_pod.py) — wraps a DataFunction that transforms individual
data. Never inspects tags. Two execution models:
data. Never inspects keys. Two execution models:
- FunctionPod → FunctionPodStream: lazy, in-memory
- FunctionNode: DB-backed, two-phase (yield cached results first, then compute missing)
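
The two-phase model of FunctionNode can be sketched as follows. This is an assumption-laden illustration, not the library's implementation: a plain dict stands in for the database, records are looked up by the input value itself rather than by a content hash, and `two_phase` is a hypothetical name.

```python
from typing import Callable, Iterable, Iterator


def two_phase(
    inputs: Iterable[str],
    cache: dict[str, int],
    compute: Callable[[str], int],
) -> Iterator[tuple[str, int]]:
    # Phase 1: yield every cached record, including ones not in `inputs`.
    cached = list(cache.items())
    yield from cached
    seen = set(cache)
    # Phase 2: compute and store only the inputs that are not cached yet.
    for item in inputs:
        if item in seen:
            continue
        seen.add(item)
        cache[item] = compute(item)
        yield item, cache[item]


cache = {"a": 1, "zz": 2}  # "zz" was cached by an earlier run
results = list(two_phase(["a", "bbb"], cache, len))
```

Note that the cached record for `"zz"` is yielded even though it is not among the current inputs, and only `"bbb"` triggers a computation.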

@@ -242,8 +242,8 @@ FunctionNode.

### Strict operator / function pod boundary

Operators: inspect tags (never data content), can rename columns, cannot synthesize values.
Function Pods: inspect data content (never tags), synthesize new values, cached by content.
Operators: inspect keys (never data content), can rename columns, cannot synthesize values.
Function Pods: inspect data content (never keys), synthesize new values, cached by content.

### Two identity chains

@@ -253,25 +253,25 @@ Every pipeline element has two parallel hashes:
2. pipeline_hash() — schema + topology only. Ignores data content. Used for DB path scoping
so that different sources with identical schemas share database tables.

Base case: RootSource.pipeline_identity_structure() returns (tag_schema, data_schema).
Base case: RootSource.pipeline_identity_structure() returns (key_schema, data_schema).
Each downstream node's pipeline hash commits to its own identity plus upstream pipeline
hashes, forming a Merkle chain.
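
The Merkle-chain construction above can be sketched as follows. The helper name `pipeline_hash`, the JSON serialization, and the choice of SHA-256 are all assumptions made for illustration; the actual hashing scheme in orcapod may differ.

```python
import hashlib
import json


def pipeline_hash(identity: object, upstream_hashes: tuple[str, ...] = ()) -> str:
    """Hash a node's own identity together with its upstream pipeline hashes."""
    payload = json.dumps([identity, list(upstream_hashes)], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Base case: a root source is identified by (key_schema, data_schema) only.
source_a = pipeline_hash([{"subject": "str"}, {"age": "int"}])
source_b = pipeline_hash([{"subject": "str"}, {"age": "int"}])  # other data, same schemas

# A downstream node commits to its own identity plus its upstream hashes.
node = pipeline_hash("double_age", (source_a,))
```

Because only schemas enter the base case, `source_a` and `source_b` hash identically, which is what lets sources with identical schemas share database tables.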

### Column naming conventions

__ prefix — System metadata (ColumnConfig meta)
_source_ prefix — Source info provenance (ColumnConfig source)
_tag:: prefix — System tag (ColumnConfig system_tags)
_key:: prefix — System key (ColumnConfig system_keys)
_context_key — Data context (ColumnConfig context)

Prefixes are computed from SystemConstant in system_constants.py.
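
As a rough illustration of the naming conventions above, the sketch below classifies a column name by its prefix. The prefixes are hard-coded here only for readability (the text says they are computed from SystemConstant), and both `column_group` and the fallback label `"user"` are hypothetical.

```python
def column_group(name: str) -> str:
    """Classify a column name by the post-rename prefixes listed above."""
    if name == "_context_key":
        return "context"
    if name.startswith("_key::"):
        return "system_keys"
    if name.startswith("_source_"):
        return "source"
    if name.startswith("__"):
        return "meta"
    # Hypothetical label for ordinary key/data columns without a system prefix.
    return "user"


groups = {
    name: column_group(name)
    for name in ["__data_id", "_source_age", "_key::source:abc123", "_context_key", "age"]
}
```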

### System tag evolution rules
### System key evolution rules

1. Name-preserving — single-stream ops. Column name/value pass through unchanged.
2. Name-extending — multi-input ops. System tag column name gets
2. Name-extending — multi-input ops. System key column name gets
::{pipeline_hash}:{canonical_position} appended. Commutative operators sort by
pipeline_hash and sort system tag values per row.
pipeline_hash and sort system key values per row.
3. Type-evolving — aggregation ops. Column type changes from str to list[str].
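
Rule 2 (name-extending) can be sketched for a commutative operator as follows. The function name, the zero-based position, and the tie-break on column name are assumptions; only the `::{pipeline_hash}:{canonical_position}` suffix format and the sort by pipeline hash come from the text above.

```python
def extend_system_key_names(inputs: list[tuple[str, str]]) -> list[str]:
    """Rename one system key column per input of a commutative multi-input op.

    Each input is a (pipeline_hash, system_key_column_name) pair.
    """
    # Canonical order: sort by pipeline hash (then by name as a tie-break)
    # so that the order in which inputs were passed does not matter.
    ordered = sorted(inputs)
    return [
        f"{name}::{pipeline_hash}:{position}"
        for position, (pipeline_hash, name) in enumerate(ordered)
    ]


names = extend_system_key_names(
    [("bbb", "_key::source:x"), ("aaa", "_key::source:y")]
)
```

Swapping the two inputs yields the same list, which is the property a commutative operator needs for its predicted output schema.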

### Key patterns
@@ -285,12 +285,12 @@ Prefixes are computed from SystemConstant in system_constants.py.

### Important implementation details

- ArrowTableSource raises ValueError if any tag_columns are not in the table.
- ArrowTableSource raises ValueError if any key_columns are not in the table.
- ArrowTableStream requires at least one data column; raises ValueError otherwise.
- FunctionNode Phase 1 returns ALL records in the shared pipeline_path DB table.
Phase 2 skips inputs whose hash is already in the DB.
- Empty data → ArrowTableSource raises ValueError("Table is empty").
- DerivedSource before run() → raises ValueError (no computed records).
- Join requires non-overlapping data columns; raises InputValidationError on collision.
- MergeJoin requires colliding columns to have identical types; merges into sorted list[T].
- Operators predict output schema (including system tag names) without computation.
- Operators predict output schema (including system key names) without computation.
31 changes: 31 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,37 @@

### Breaking Changes

#### `tag` → `key` rename (hard break)

All identifiers containing `tag`/`tags`/`Tag` have been renamed to
`key`/`keys`/`Key`. No deprecation aliases. Pre-v0.1 artifacts will not load.

| Old name | New name |
|---|---|
| `Tag` | `Key` |
| `TagProtocol` | `KeyProtocol` |
| `TagValue` | `KeyValue` |
| `DuplicateTagError` | `DuplicateKeyError` |
| `SelectTagColumns` | `SelectKeyColumns` |
| `DropTagColumns` | `DropKeyColumns` |
| `MapTags` | `MapKeys` |
| `system_tags()` | `system_keys()` |
| `map_tags()` | `map_keys()` |
| `select_tag_columns()` | `select_key_columns()` |
| `drop_tag_columns()` | `drop_key_columns()` |
| `sort_by_tags` | `sort_by_keys` |
| `SYSTEM_TAG_PREFIX` | `SYSTEM_KEY_PREFIX` |
| `SYSTEM_TAG_PREFIX_NAME` (`"tag"`) | `SYSTEM_KEY_PREFIX_NAME` (`"key"`) |
| `SYSTEM_TAG_SOURCE_ID_PREFIX` | `SYSTEM_KEY_SOURCE_ID_PREFIX` |
| `SYSTEM_TAG_RECORD_ID_PREFIX` | `SYSTEM_KEY_RECORD_ID_PREFIX` |
| `SYSTEM_TAG_SOURCE_ID_FIELD` | `SYSTEM_KEY_SOURCE_ID_FIELD` |
| `SYSTEM_TAG_RECORD_ID_FIELD` | `SYSTEM_KEY_RECORD_ID_FIELD` |
| `ColumnConfig(system_tags=...)` | `ColumnConfig(system_keys=...)` |
| Column prefix `_tag_` | `_key_` (e.g. `_tag_source_id` → `_key_source_id`) |
| Column prefix `_tag::` | `_key::` (e.g. `_tag::source:abc` → `_key::source:abc`) |
| `src/orcapod/core/datagrams/tag_data.py` | `key_data.py` |
| `test-objective/unit/test_tag.py` | `test_key.py` |
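
The two column-prefix rows in the table above amount to a simple string mapping, sketched below. `migrate_column_name` is a hypothetical helper, not something this change ships: the rename is a hard break with no aliases, so this only illustrates how old column names correspond to new ones and does not make old artifacts loadable.

```python
def migrate_column_name(name: str) -> str:
    """Map a pre-rename system column name to its post-rename form."""
    for old, new in (("_tag::", "_key::"), ("_tag_", "_key_")):
        if name.startswith(old):
            return new + name[len(old):]
    # Names without a system tag prefix are left untouched.
    return name


renamed = [
    migrate_column_name(n)
    for n in ["_tag::source:abc", "_tag_source_id", "__data_id", "age"]
]
```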

#### `packets` → `data` rename (hard break)

All identifiers containing `packet`/`packets`/`Packet` have been renamed to
44 changes: 22 additions & 22 deletions CLAUDE.md
@@ -149,13 +149,13 @@ Examples:
src/orcapod/
├── types.py # Schema, ColumnConfig, ContentHash
├── system_constants.py # Column prefixes and separators
├── errors.py # InputValidationError, DuplicateTagError, FieldNotResolvableError
├── errors.py # InputValidationError, DuplicateKeyError, FieldNotResolvableError
├── config.py # Config dataclass
├── contexts/ # DataContext (semantic_hasher, arrow_hasher, type_converter)
├── protocols/
│ ├── hashing_protocols.py # PipelineElementProtocol, ContentIdentifiableProtocol
│ └── core_protocols/ # StreamProtocol, PodProtocol, SourceProtocol,
│ # DataFunctionProtocol, DatagramProtocol, TagProtocol,
│ # DataFunctionProtocol, DatagramProtocol, KeyProtocol,
│ # DataProtocol, TrackerProtocol
├── core/
│ ├── base.py # ContentIdentifiableBase, PipelineElementBase, TraceableBase
@@ -166,7 +166,7 @@ src/orcapod/
│ ├── tracker.py # Invocation tracking
│ ├── datagrams/
│ │ ├── datagram.py # Datagram (unified dict/Arrow backing, lazy conversion)
│ │ └── tag_data.py # Tag (+ system tags), Data (+ source info)
│ │ └── key_data.py # Key (+ system keys), Data (+ source info)
│ ├── sources/
│ │ ├── base.py # RootSource (abstract, no upstream)
│ │ ├── arrow_table_source.py # Core source — all other sources delegate to it
@@ -183,15 +183,15 @@ src/orcapod/
│ ├── merge_join.py # MergeJoin (binary, colliding cols → sorted list[T])
│ ├── semijoin.py # SemiJoin (binary, non-commutative)
│ ├── batch.py # Batch (group rows, types become list[T])
│ ├── column_selection.py # Select/Drop Tag/Data columns
│ ├── mappers.py # MapTags, MapData (rename columns)
│ ├── column_selection.py # Select/Drop Key/Data columns
│ ├── mappers.py # MapKeys, MapData (rename columns)
│ └── filters.py # PolarsFilter
├── hashing/
│ └── semantic_hashing/ # BaseSemanticHasher, type handlers
├── semantic_types/ # Type conversion (Python ↔ Arrow)
├── databases/ # ArrowDatabaseProtocol implementations (Delta Lake, in-memory)
└── utils/
├── arrow_data_utils.py # System tag manipulation, source info, column helpers
├── arrow_data_utils.py # System key manipulation, source info, column helpers
├── arrow_utils.py # Arrow table utilities
├── schema_utils.py # Schema extraction, union, intersection, compatibility
└── lazy_module.py # LazyModule for deferred heavy imports
@@ -221,26 +221,26 @@ See `orcapod-design.md` at the project root for the full design specification.
RootSource → ArrowTableStream → [Operator / FunctionPod] → ArrowTableStream → ...
```

Every stream is an immutable sequence of (Tag, Data) pairs backed by a PyArrow Table.
Tag columns are join keys and metadata; data columns are the data payload.
Every stream is an immutable sequence of (Key, Data) pairs backed by a PyArrow Table.
Key columns are join keys and metadata; data columns are the data payload.

### Core abstractions

**Datagram** (`core/datagrams/datagram.py`) — immutable data container with lazy dict ↔ Arrow
conversion. Two specializations:
- **Tag** — metadata columns + hidden system tag columns for provenance tracking
- **Key** — metadata columns + hidden system key columns for provenance tracking
- **Data** — data columns + per-column source info provenance tokens

**Stream** (`core/streams/arrow_table_stream.py`) — immutable (Tag, Data) sequence.
**Stream** (`core/streams/arrow_table_stream.py`) — immutable (Key, Data) sequence.
Key methods: `output_schema()`, `keys()`, `iter_data()`, `as_table()`.

**Source** (`core/sources/`) — produces a stream from external data. `ArrowTableSource` is the
core implementation; CSV/Delta/DataFrame/Dict/List sources all delegate to it internally. Each
source adds source-info columns and a system tag column. `DerivedSource` wraps a
source adds source-info columns and a system key column. `DerivedSource` wraps a
FunctionNode/OperatorNode's DB records as a new source.

**Function Pod** (`core/function_pod.py`) — wraps a `DataFunction` that transforms individual
data. Never inspects tags. Two execution models:
data. Never inspects keys. Two execution models:
- `FunctionPod` → `FunctionPodStream`: lazy, in-memory
- `FunctionNode`: DB-backed, two-phase (yield cached results first, then compute missing)

@@ -258,7 +258,7 @@ FunctionNode.
| | Operator | Function Pod |
|---|---|---|
| Inspects data content | Never | Yes |
| Inspects / uses tags | Yes | No |
| Inspects / uses keys | Yes | No |
| Can rename columns | Yes | No |
| Synthesizes new values | No | Yes |
| Stream arity | Configurable | Single in, single out |
@@ -272,7 +272,7 @@ Every pipeline element has two parallel hashes:
2. **`pipeline_hash()`** — schema + topology only. Ignores data content. Used for DB path
scoping so that different sources with identical schemas share database tables.

Base case: `RootSource.pipeline_identity_structure()` returns `(tag_schema, data_schema)`.
Base case: `RootSource.pipeline_identity_structure()` returns `(key_schema, data_schema)`.
Each downstream node's pipeline hash commits to its own identity plus the pipeline hashes of
its upstreams, forming a Merkle chain.

@@ -285,28 +285,28 @@ The pipeline hash uses a **resolver pattern** — `PipelineElementProtocol` obje
|--------|---------|---------|---------------|
| `__` | System metadata | `__data_id`, `__pod_version` | `ColumnConfig(meta=True)` |
| `_source_` | Source info provenance | `_source_age` | `ColumnConfig(source=True)` |
| `_tag::` | System tag | `_tag::source:abc123` | `ColumnConfig(system_tags=True)` |
| `_key::` | System key | `_key::source:abc123` | `ColumnConfig(system_keys=True)` |
| `_context_key` | Data context | `_context_key` | `ColumnConfig(context=True)` |

Prefixes are computed from `SystemConstant` in `system_constants.py`. The `constants` singleton
(with no global prefix) is used throughout.

### System tag evolution rules
### System key evolution rules

1. **Name-preserving** — single-stream ops (filter, select, map). Column name and value pass
through unchanged.
2. **Name-extending** — multi-input ops (join, merge join). Each input's system tag column
2. **Name-extending** — multi-input ops (join, merge join). Each input's system key column
name gets `::{pipeline_hash}:{canonical_position}` appended. Commutative operators
canonically order inputs by `pipeline_hash` and sort system tag values per row.
canonically order inputs by `pipeline_hash` and sort system key values per row.
3. **Type-evolving** — aggregation ops (batch). Column type changes from `str` to `list[str]`.

### Schema types and ColumnConfig

`Schema` (`types.py`) — immutable `Mapping[str, DataType]` with `optional_fields` support.
`output_schema()` always returns `(tag_schema, data_schema)` as a tuple of Schemas.
`output_schema()` always returns `(key_schema, data_schema)` as a tuple of Schemas.

`ColumnConfig` (`types.py`) — frozen dataclass controlling which column groups are included.
Fields: `meta`, `context`, `source`, `system_tags`, `content_hash`, `sort_by_tags`.
Fields: `meta`, `context`, `source`, `system_keys`, `content_hash`, `sort_by_keys`.
Normalize via `ColumnConfig.handle_config(columns, all_info)` at the top of `output_schema()`
and `as_table()` methods. `all_info=True` sets everything to True.

@@ -323,7 +323,7 @@ and `as_table()` methods. `all_info=True` sets everything to True.

### Important implementation details

- `ArrowTableSource.__init__` raises `ValueError` if any `tag_columns` are not in the table.
- `ArrowTableSource.__init__` raises `ValueError` if any `key_columns` are not in the table.
- `ArrowTableStream` requires at least one data column; raises `ValueError` otherwise.
- `FunctionNode.iter_data()` Phase 1 returns ALL records in the shared `pipeline_path`
DB table (not filtered to current inputs). Phase 2 skips inputs whose hash is already
@@ -333,5 +333,5 @@ and `as_table()` methods. `all_info=True` sets everything to True.
- Join requires non-overlapping data columns; raises `InputValidationError` on collision.
- MergeJoin requires colliding data columns to have identical types; merges into sorted
`list[T]` with source columns reordered to match.
- Operators predict their output schema (including system tag column names) without
- Operators predict their output schema (including system key column names) without
performing the actual computation.