A powerful Python library for managing Polars LazyFrames with parent-child relationships, lazy evaluation, and FastAPI integration. Part of the LDaCA (Language Data Commons of Australia) ecosystem.
DocWorkspace provides a workspace-based approach to data analysis, where data transformations are tracked as nodes in a directed graph. This enables:
- Relationship Tracking: Understand data lineage and transformation history
- Lazy Evaluation: Optimize performance with Polars LazyFrames
- LazyFrame-Native Graphs: Node data is stored as Polars LazyFrames
- FastAPI Integration: Ready-to-use models and utilities for web APIs
- Serialization: Save and restore entire workspaces with their relationships
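The lazy-evaluation idea in the list above can be illustrated without Polars at all. The sketch below is a hypothetical toy. The LazyPlan class, its filter/sort/collect methods and the sample rows are illustrative assumptions, not DocWorkspace or Polars API. Each method only records a step. The data source is read, and every step applied, only when collect() is called.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable

Row = dict[str, object]
Step = Callable[[list[Row]], list[Row]]


@dataclass(frozen=True)
class LazyPlan:
    """Hypothetical toy plan: a data source plus an ordered tuple of deferred steps."""

    source: Callable[[], list[Row]]
    steps: tuple[Step, ...] = ()

    def filter(self, predicate: Callable[[Row], bool]) -> LazyPlan:
        def step(rows: list[Row]) -> list[Row]:
            return [row for row in rows if predicate(row)]
        return LazyPlan(self.source, self.steps + (step,))  # record, do not run

    def sort(self, key: str, descending: bool = False) -> LazyPlan:
        def step(rows: list[Row]) -> list[Row]:
            return sorted(rows, key=lambda row: row[key], reverse=descending)
        return LazyPlan(self.source, self.steps + (step,))

    def collect(self) -> list[Row]:
        # Only here is the source read and each recorded step applied in order.
        rows = self.source()
        for step in self.steps:
            rows = step(rows)
        return rows


calls: list[str] = []


def load_rows() -> list[Row]:
    calls.append("load")  # lets us observe when the source is actually read
    return [
        {"text": "Hello world", "score": 0.8},
        {"text": "Data science", "score": 0.9},
        {"text": "Python rocks", "score": 0.95},
    ]


plan = LazyPlan(load_rows).filter(lambda row: row["score"] > 0.85).sort("score", descending=True)
calls_before_collect = len(calls)  # building the plan has not read anything yet
result = plan.collect()
print(calls_before_collect, [row["text"] for row in result])
```

Polars LazyFrames go further than this toy. Before executing, the Polars query optimizer rewrites the recorded plan, for example by pushing filters down towards the data source. This sketch only defers execution.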
pip install "docworkspace>=0.2.0"
docworkspace is published on PyPI as a pure-Python package.
git clone https://github.com/Australian-Text-Analytics-Platform/docworkspace.git
cd docworkspace
uv sync --group dev
- Python ≥ 3.14
- polars
- polars-text >= 0.1.0
For FastAPI integration:
pip install pydantic
import polars as pl
from docworkspace import Node, Workspace
# Create a workspace
workspace = Workspace("my_analysis")
# Load data
df = pl.DataFrame({
"text": ["Hello world", "Data science", "Python rocks"],
"category": ["greeting", "tech", "programming"],
"score": [0.8, 0.9, 0.95]
})
# Convert to LazyFrame before adding data to the workspace
data_node = workspace.add_node(Node(df.lazy(), name="raw_data"))
# Apply transformations (creates new nodes automatically)
filtered = data_node.filter(pl.col("score") > 0.85)
grouped = filtered.group_by("category").agg(pl.col("score").mean())
# Check relationships
print(f"Total nodes: {len(workspace.nodes)}")
print(f"Root nodes: {len(workspace.get_root_nodes())}")
print(f"Leaf nodes: {len(workspace.get_leaf_nodes())}")
# Visualize the computation graph
print(workspace.visualize_graph())
A Node wraps a Polars LazyFrame and tracks relationships with other nodes. Nodes support:
- Transparent Data Access: All LazyFrame methods work directly on nodes
- Automatic Relationship Tracking: Operations create child nodes
- Lazy Evaluation: Maintains laziness for performance
- Metadata: Store operation descriptions and custom metadata
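The relationship-tracking behaviour listed above can be pictured with a small, self-contained model. Every operation returns a new child node, links it to its parent, records the operation name, and carries metadata such as the document column forward. The ToyNode class below is a hypothetical stand-in that uses plain lists of dicts instead of Polars LazyFrames. It is not DocWorkspace's implementation.

```python
from __future__ import annotations

from typing import Callable, Optional

Row = dict[str, object]


class ToyNode:
    """Hypothetical node over a list of rows that records parents, children and the operation."""

    def __init__(self, rows: list[Row], name: str, parents: Optional[list[ToyNode]] = None,
                 operation: Optional[str] = None, document: Optional[str] = None) -> None:
        self.rows = rows
        self.name = name
        self.parents: list[ToyNode] = list(parents or [])
        self.children: list[ToyNode] = []
        self.operation = operation
        self.document = document  # name of the text column, if one is tracked
        for parent in self.parents:
            parent.children.append(self)  # keep both directions of each edge in sync

    def _derive(self, rows: list[Row], operation: str) -> ToyNode:
        # Every transformation yields a new child that inherits the document column.
        return ToyNode(rows, f"{operation}_{self.name}", parents=[self],
                       operation=operation, document=self.document)

    def filter(self, predicate: Callable[[Row], bool]) -> ToyNode:
        return self._derive([row for row in self.rows if predicate(row)], "filter")

    def sort(self, key: str, descending: bool = False) -> ToyNode:
        return self._derive(sorted(self.rows, key=lambda row: row[key], reverse=descending), "sort")


corpus = ToyNode(
    [{"doc_id": "d1", "text": "Hello world", "value": 5},
     {"doc_id": "d2", "text": "Data science", "value": 20},
     {"doc_id": "d3", "text": "Python rocks", "value": 12}],
    name="corpus",
    document="text",
)
filtered = corpus.filter(lambda row: row["value"] > 10)
sorted_node = filtered.sort("value", descending=True)

print(len(corpus.children), len(sorted_node.parents), sorted_node.document)
print(sorted_node.name, [row["doc_id"] for row in sorted_node.rows])
```

Keeping both parent and child lists on every node makes it cheap to walk the graph in either direction, at the cost of having to update two lists whenever an edge is added.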
# Convert eager data before creating a node when needed
node = Node(df.lazy(), name="my_data")
# LazyFrame operations can be called directly on the node (assumes df has a numeric "value" column)
filtered_node = node.filter(pl.col("value") > 10)
sorted_node = filtered_node.sort("value", descending=True)
# Check relationships
print(f"Children of original node: {len(node.children)}")
print(f"Parents of sorted node: {len(sorted_node.parents)}")
A Workspace manages collections of nodes and provides graph operations:
- Node Management: Add, remove, and retrieve nodes
- Graph Operations: Find roots, leaves, descendants, ancestors
- Serialization: Save/load entire workspaces
- Visualization: Generate text-based and programmatic graph representations
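The graph operations listed above reduce to simple walks over parent and child links. The following plain-Python sketch uses a hypothetical parents mapping and helper functions, not the Workspace implementation. Roots have no parents and leaves have no children. Ancestors are everything reachable through parent links, and descendants are everything reachable through child links. A join gives a node two parents.

```python
from collections import deque

# Hypothetical lineage graph: node id -> ids of its direct parents.
parents: dict[str, list[str]] = {
    "dataset1": [],
    "dataset2": [],
    "joined": ["dataset1", "dataset2"],  # a join node has two parents
    "filtered": ["joined"],
    "summary": ["filtered"],
}


def children_of(parents: dict[str, list[str]]) -> dict[str, list[str]]:
    """Invert the parent map so each node lists its direct children."""
    children: dict[str, list[str]] = {node: [] for node in parents}
    for node, node_parents in parents.items():
        for parent in node_parents:
            children[parent].append(node)
    return children


def reachable(start: str, edges: dict[str, list[str]]) -> set[str]:
    """Breadth-first walk returning every node reachable from start, excluding start."""
    seen: set[str] = set()
    queue = deque(edges[start])
    while queue:
        node = queue.popleft()
        if node not in seen:
            seen.add(node)
            queue.extend(edges[node])
    return seen


root_nodes = [node for node, ps in parents.items() if not ps]
leaf_nodes = [node for node, cs in children_of(parents).items() if not cs]
ancestors = reachable("summary", parents)
descendants = reachable("dataset2", children_of(parents))

print("roots:", root_nodes)
print("leaves:", leaf_nodes)
print("ancestors of summary:", sorted(ancestors))
print("descendants of dataset2:", sorted(descendants))
```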
workspace = Workspace("analysis")
# Add nodes
node1 = workspace.add_node(Node(df1.lazy(), "dataset1"))
node2 = workspace.add_node(Node(df2.lazy(), "dataset2"))
# Join creates a new node with both parents
joined = node1.join(node2, on="id")
# Explore the graph
roots = workspace.get_root_nodes()
leaves = workspace.get_leaf_nodes()
DocWorkspace stores node data as Polars LazyFrame objects.
import polars as pl
# Convert an eager Polars DataFrame when needed
df = pl.DataFrame({"text": ["hello", "world"], "id": [1, 2]})
node1 = Node(df.lazy(), "eager_data")
# Polars LazyFrame (lazy)
lazy_df = pl.LazyFrame({"text": ["foo", "bar"], "id": [3, 4]})
node2 = Node(lazy_df, "lazy_data")
# Both nodes remain LazyFrames inside the workspace
workspace = Workspace("lazyframe_nodes")
for node in [node1, node2]:
    workspace.add_node(node)
DocWorkspace preserves Polars' lazy evaluation capabilities:
# Start with lazy data
lazy_df = pl.scan_csv("large_file.csv")
node = Node(lazy_df, "raw_data")
# Chain operations (all remain lazy)
filtered = node.filter(pl.col("value") > 100)
grouped = filtered.group_by("category").agg(pl.col("value").sum())
sorted_result = grouped.sort("value", descending=True)
# Only materialize when needed
final_result = sorted_result.collect()  # This creates a new materialized node
Understand your data lineage:
# Create a processing pipeline
raw_data = Node(df.lazy(), "raw")
cleaned = raw_data.filter(pl.col("value").is_not_null())
normalized = cleaned.with_columns((pl.col("value") / pl.col("value").max()).alias("normalized_value"))
final = normalized.select(["id", "normalized_value"])
# Explore relationships
print("Processing chain:")
current = final
while current.parents:
    parent = current.parents[0]
    print(f"{parent.name} -> {current.name} ({current.operation})")
    current = parent
Ready-to-use models for web APIs:
from docworkspace import FastAPIUtils, WorkspaceGraph, NodeSummary
# Convert workspace to FastAPI-compatible format
graph_data = FastAPIUtils.workspace_to_ui_graph_payload(workspace)
# Get node summaries
summaries = [FastAPIUtils.node_to_api_summary(node) for node in workspace.nodes.values()]
Save and restore complete workspaces:
# Save workspace with all nodes and relationships
workspace.serialize("my_workspace.json")
# Load workspace later
restored_workspace = Workspace.deserialize("my_workspace.json")
# All nodes and relationships are preserved
assert len(restored_workspace.nodes) == len(workspace.nodes)
Create custom operations that maintain relationships:
def custom_transform(node: Node, operation_name: str) -> Node:
    """Apply custom transformation and track the operation."""
    # Your custom logic here
    result_data = node.data.with_columns(pl.col("value") * 2)
    # Create new node with relationship tracking
    return Node(
        data=result_data,
        name=f"{operation_name}_{node.name}",
        workspace=node.workspace,
        parents=[node],
        operation=operation_name,
    )
# Use custom operation
transformed = custom_transform(original_node, "double_values")
Analyze your computation graph:
# Find all descendants of a node
descendants = workspace.get_descendants(node.id)
# Find all ancestors
ancestors = workspace.get_ancestors(node.id)
# Get topological ordering
ordered_nodes = workspace.get_topological_order()
# Check for cycles (shouldn't happen in normal usage)
has_cycles = workspace.has_cycles()
DocWorkspace tracks the text/document column via node metadata:
# Create a DataFrame with a text column
df = pl.DataFrame({
"doc_id": ["d1", "d2", "d3"],
"text": ["Hello world", "Data science", "Python rocks"],
"metadata": ["type1", "type2", "type1"]
})
node = Node(df.lazy(), "corpus")
node.document = "text"
# Document metadata is preserved across operations
filtered = node.filter(pl.col("metadata") == "type1")
print(f"Document column preserved: {filtered.document}")
Node(data, name=None, workspace=None, parents=None, operation=None)
- document: Optional[str] - Document column tracked in node metadata
- data: pl.LazyFrame - Underlying Polars LazyFrame
- collect() -> Node - Materialize lazy data (creates new node)
- materialize() -> Node - Alias for collect()
- info(json=False) -> Dict - Get node information
- json_schema() -> Dict[str, str] - Get JSON-compatible schema
All Polars LazyFrame operations are available directly:
- filter(condition) -> Node
- select(columns) -> Node
- with_columns(*exprs) -> Node
- group_by(*columns) -> Node
- sort(by, descending=False) -> Node
- join(other, on, how="inner") -> Node
- And many more...
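A common Python pattern for exposing every method of a wrapped object on a wrapper is __getattr__ delegation. The sketch below shows that pattern with a tiny stand-in frame class. Whether DocWorkspace uses this exact mechanism is an assumption. MiniFrame and ProxyNode are hypothetical names. Calls that return a frame are wrapped into child nodes that remember their parent, while other results pass through unchanged.

```python
from __future__ import annotations

from typing import Any, Optional


class MiniFrame:
    """Hypothetical stand-in for a LazyFrame: methods never mutate, they return new frames."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self.rows = rows

    def select(self, *columns: str) -> MiniFrame:
        return MiniFrame([{c: row[c] for c in columns} for row in self.rows])

    def head(self, n: int) -> MiniFrame:
        return MiniFrame(self.rows[:n])

    def row_count(self) -> int:
        return len(self.rows)


class ProxyNode:
    """Hypothetical node that forwards unknown attributes to its wrapped frame."""

    def __init__(self, data: MiniFrame, name: str,
                 parents: Optional[list[ProxyNode]] = None,
                 operation: Optional[str] = None) -> None:
        self.data = data
        self.name = name
        self.parents = parents or []
        self.operation = operation

    def __getattr__(self, attr: str) -> Any:
        # Python only calls __getattr__ after normal lookup fails, so the node's own
        # attributes (data, name, parents, operation) are never forwarded.
        if attr.startswith("__"):
            raise AttributeError(attr)
        target = getattr(self.data, attr)
        if not callable(target):
            return target

        def forward(*args: Any, **kwargs: Any) -> Any:
            result = target(*args, **kwargs)
            if isinstance(result, MiniFrame):
                # Frame-returning calls become child nodes linked to this node.
                return ProxyNode(result, f"{attr}_{self.name}", parents=[self], operation=attr)
            return result  # non-frame results (e.g. counts) pass through unchanged

        return forward


raw = ProxyNode(MiniFrame([{"id": 1, "text": "a"}, {"id": 2, "text": "b"}, {"id": 3, "text": "c"}]), "raw")
preview = raw.select("text").head(2)

print(preview.name, preview.operation, [p.name for p in preview.parents])
print(preview.data.rows, raw.row_count())
```

The appeal of this design is that new methods on the wrapped class become available on the node without any extra code; the trade-off is that IDEs and type checkers cannot see the forwarded methods.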
Workspace(name=None, data=None, data_name=None, csv_lazy=True, **csv_kwargs)
- id: str - Unique workspace identifier
- name: str - Human-readable name
- nodes: Dict[str, Node] - All nodes in the workspace
- add_node(node) -> Node - Add a node to the workspace
- remove_node(node_id, materialize_children=False) -> bool - Remove a node
- get_node(node_id) -> Optional[Node] - Get node by ID
- get_node_by_name(name) -> Optional[Node] - Get node by name
- list_nodes() -> List[Node] - Get all nodes
- get_root_nodes() -> List[Node] - Nodes with no parents
- get_leaf_nodes() -> List[Node] - Nodes with no children
- get_descendants(node_id) -> List[Node] - All descendant nodes
- get_ancestors(node_id) -> List[Node] - All ancestor nodes
- get_topological_order() -> List[Node] - Topologically sorted nodes
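Topological ordering and cycle detection are standard operations on a directed acyclic graph. A common way to get both at once is Kahn's algorithm. The plain-Python sketch below runs it over a hypothetical parents mapping. It illustrates the technique and is not necessarily how Workspace computes its ordering. A node is emitted once all of its parents have been emitted. If some nodes can never be emitted, the graph contains a cycle.

```python
from collections import deque


def topological_order(parents: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm: emit a node once all of its parents have been emitted.

    Raises ValueError if some nodes can never be emitted, which means a cycle exists.
    """
    remaining = {node: len(node_parents) for node, node_parents in parents.items()}
    children: dict[str, list[str]] = {node: [] for node in parents}
    for node, node_parents in parents.items():
        for parent in node_parents:
            children[parent].append(node)

    ready = deque(node for node, count in remaining.items() if count == 0)
    order: list[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    if len(order) != len(parents):
        raise ValueError("graph contains a cycle")
    return order


def has_cycles(parents: dict[str, list[str]]) -> bool:
    try:
        topological_order(parents)
    except ValueError:
        return True
    return False


# Hypothetical pipeline: two sources, a join, and a summary.
pipeline = {
    "raw": [],
    "cleaned": ["raw"],
    "lookup": [],
    "joined": ["cleaned", "lookup"],
    "summary": ["joined"],
}
order = topological_order(pipeline)
print(order)
print(has_cycles(pipeline), has_cycles({"a": ["b"], "b": ["a"]}))
```

For this pipeline the order is raw, lookup, cleaned, joined, summary: both sources come before the join, and the join comes before the summary.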
- visualize_graph() -> str - Text-based graph visualization
- graph() -> Dict - Generic graph structure
- to_react_flow_json() -> Dict - React Flow compatible format
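React Flow renders a graph from a list of nodes and a list of edges. Each node has an id, a position and a data payload. Each edge has an id, a source and a target. The sketch below turns a hypothetical parent map into that general shape with a simple column-per-depth layout. The exact payload returned by to_react_flow_json() is not documented here, so to_flow_payload, the spacing defaults and the label field are illustrative assumptions.

```python
import json


def to_flow_payload(parents: dict[str, list[str]], x_gap: int = 250, y_gap: int = 100) -> dict:
    """Build a React Flow style {"nodes": [...], "edges": [...]} dict from a parent map.

    Each node is placed in a column by its depth (longest path from a root) and
    stacked vertically within that column. Assumes the graph is acyclic.
    """
    depth: dict[str, int] = {}

    def depth_of(node: str) -> int:
        if node not in depth:
            node_parents = parents[node]
            depth[node] = 0 if not node_parents else 1 + max(depth_of(p) for p in node_parents)
        return depth[node]

    used_rows: dict[int, int] = {}
    nodes = []
    for node in parents:
        column = depth_of(node)
        row = used_rows.get(column, 0)
        used_rows[column] = row + 1
        nodes.append({
            "id": node,
            "position": {"x": column * x_gap, "y": row * y_gap},
            "data": {"label": node},
        })

    edges = [
        {"id": f"{parent}->{node}", "source": parent, "target": node}
        for node, node_parents in parents.items()
        for parent in node_parents
    ]
    return {"nodes": nodes, "edges": edges}


graph = {"raw": [], "filtered": ["raw"], "grouped": ["filtered"], "recent": ["filtered"]}
payload = to_flow_payload(graph)
print(json.dumps(payload, indent=2))
```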
- serialize(file_path) - Save workspace to JSON
- deserialize(file_path) -> Workspace - Load workspace from JSON
- from_dict(workspace_dict) -> Workspace - Create from dictionary
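Serializing a workspace graph comes down to writing each node's fields together with the ids of its parents. The links are then rebuilt after every node has been read back. The sketch below shows that round trip with a hypothetical SavedNode record and plain JSON. DocWorkspace's real file layout, and how it persists the LazyFrame data itself, are not described here and are not reproduced.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class SavedNode:
    """Hypothetical serializable record: node fields plus the ids of its parents."""
    id: str
    name: str
    operation: Optional[str] = None
    parent_ids: list[str] = field(default_factory=list)


def dump_workspace(name: str, nodes: list[SavedNode]) -> str:
    return json.dumps({"name": name, "nodes": [asdict(n) for n in nodes]}, indent=2)


def load_workspace(text: str) -> tuple[str, dict[str, SavedNode], dict[str, list[str]]]:
    payload = json.loads(text)
    nodes = {record["id"]: SavedNode(**record) for record in payload["nodes"]}
    # Rebuild child links only after every node exists, so file order does not matter.
    children: dict[str, list[str]] = {node_id: [] for node_id in nodes}
    for node in nodes.values():
        for parent_id in node.parent_ids:
            children[parent_id].append(node.id)
    return payload["name"], nodes, children


original = [
    SavedNode("n1", "raw"),
    SavedNode("n2", "filtered", "filter", ["n1"]),
    SavedNode("n3", "grouped", "group_by", ["n2"]),
]
name, restored, children = load_workspace(dump_workspace("my_analysis", original))
print(name, list(restored.values()) == original, children)
```

Storing only parent ids keeps the file free of duplicated edges; child lists are derived data and can always be recomputed on load.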
- get_metadata(key) -> Any - Get workspace metadata
- set_metadata(key, value) - Set workspace metadata
- summary() -> Dict - Get workspace summary
- info() -> Dict - Alias for summary()
- NodeSummary - API-friendly node representation
- WorkspaceGraph - React Flow compatible graph
- PaginatedData - Paginated data response
FastAPIUtils.node_to_api_summary(node) -> NodeSummary
FastAPIUtils.workspace_to_ui_graph_payload(workspace) -> WorkspaceGraph
import polars as pl
from docworkspace import Node, Workspace
# Sample text data
df = pl.DataFrame({
"doc_id": [f"doc_{i}" for i in range(100)],
"text": [f"Sample text content {i}" for i in range(100)],
"category": (["news", "blog", "academic"] * 34)[:100],  # trim 102 values to 100 rows
"year": [2020, 2021, 2022, 2023] * 25
})
# Create workspace
workspace = Workspace("text_analysis")
# Track the document column for text analysis
corpus = workspace.add_node(Node(df.lazy(), "full_corpus"))
corpus.document = "text"
# Filter by category
news_docs = corpus.filter(pl.col("category") == "news")
blog_docs = corpus.filter(pl.col("category") == "blog")
# Filter by recent years
recent_news = news_docs.filter(pl.col("year") >= 2022)
# Group analysis
year_stats = corpus.group_by(["category", "year"]).agg(
pl.len().alias("doc_count")
)
# Materialize results
final_stats = year_stats.collect()
# Analyze the computation graph
print(workspace.visualize_graph())
print(f"Total transformations: {len(workspace.nodes)}")
import polars as pl
from docworkspace import Workspace
# Create workspace with lazy CSV loading
workspace = Workspace(
"large_data_analysis",
data="large_dataset.csv", # Path to CSV
data_name="raw_data",
csv_lazy=True # Load as LazyFrame for performance
)
# Get the loaded node
raw_data = workspace.get_node_by_name("raw_data")
print(f"Is lazy: {isinstance(raw_data.data, pl.LazyFrame)}") # True
# Chain transformations (all remain lazy)
cleaned = raw_data.filter(pl.col("value").is_not_null())
normalized = cleaned.with_columns(
(pl.col("value") / pl.col("value").max()).alias("normalized")
)
aggregated = normalized.group_by("category").agg([
pl.col("normalized").mean().alias("avg_normalized"),
pl.len().alias("count")
])
# Still lazy until we collect (check underlying data type)
print(f"Aggregated is lazy: {isinstance(aggregated.data, pl.LazyFrame)}") # True
# Materialize only the final result
result = aggregated.collect()
print(f"Result is lazy: {isinstance(result.data, pl.LazyFrame)}")
# Save the entire workspace with lazy evaluation preserved
workspace.serialize("lazy_analysis.json")
import polars as pl
from docworkspace import Node, Workspace
workspace = Workspace("data_integration")
# Load data from multiple sources
sales_df = pl.DataFrame({
"customer_id": [1, 2, 3, 4],
"sales": [100, 200, 150, 300],
"region": ["North", "South", "East", "West"]
})
customer_df = pl.DataFrame({
"customer_id": [1, 2, 3, 4, 5],
"name": ["Alice", "Bob", "Charlie", "Diana", "Eve"],
"segment": ["Premium", "Regular", "Premium", "Regular", "Premium"]
})
# Add to workspace
sales_node = workspace.add_node(Node(sales_df.lazy(), "sales_data"))
customer_node = workspace.add_node(Node(customer_df.lazy(), "customer_data"))
# Join the datasets
combined = sales_node.join(customer_node, on="customer_id", how="inner")
# Analyze by segment
segment_analysis = combined.group_by("segment").agg([
pl.col("sales").sum().alias("total_sales"),
pl.col("sales").mean().alias("avg_sales"),
pl.len().alias("customer_count")
])
# Filter high-value segments
high_value = segment_analysis.filter(pl.col("total_sales") > 200)
print(f"Nodes in workspace: {len(workspace.nodes)}")
print("Data lineage:")
for node in workspace.get_leaf_nodes():
    print(f"Leaf node: {node.name}")
# Install development dependencies
uv sync --group dev
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=docworkspace
# Run specific test file
uv run pytest tests/test_workspace.py -v
uv build
This produces a universal wheel and source distribution suitable for PyPI.
- Fork the repository
- Create a feature branch: git checkout -b feature-name
- Make your changes and add tests
- Run the test suite: uv run pytest
- Submit a pull request
docworkspace/
├── .github/
│ └── workflows/ # CI and release automation
├── src/
│ └── docworkspace/
│ ├── __init__.py # Public package exports
│ ├── node/
│ │ ├── __init__.py
│ │ ├── core.py # Node implementation
│ │ └── io.py # Node serialization helpers
│ └── workspace/
│ ├── __init__.py
│ ├── core.py # Workspace implementation
│ ├── io.py # Workspace serialization helpers
│ └── analysis.py
├── tests/ # Test suite
│ ├── conftest.py
│ ├── test_fastapi_integration.py
│ ├── test_node.py
│ ├── test_node_io.py
│ ├── test_simple_operations.py
│ ├── test_workspace.py
│ ├── test_workspace_serialization_types.py
│ └── test_workspace_shim.py
├── PUBLISH.md # Release runbook
├── README.md # This file
└── pyproject.toml # Project configuration
Part of the LDaCA (Language Data Commons of Australia) ecosystem.
- Published on PyPI as docworkspace
- PyPI consumers can install the package directly instead of relying on a local workspace checkout
- Added release automation and publishing runbook for future releases
- Continued support for Polars data types, lazy evaluation, FastAPI integration, and serialization
- LDaCA Web App: Full-stack web application using DocWorkspace
- Polars: Fast DataFrame library with lazy evaluation