# Pipeline Examples

This directory contains comprehensive examples demonstrating various pipeline patterns and capabilities.
## Setup

```bash
# Set up environment
cp .env.example .env
# Edit .env with your Azure credentials

# Install dependencies
pip install -r ../requirements.txt
```

## Examples

### 01-simple

Basic pipeline configuration and execution:
- Minimal configuration
- Simple document processing
- Event streaming
- Perfect starting point for beginners
### 02-batch-processing

Efficient batch processing of multiple documents:
- BatchSplitterExecutor usage
- BatchAggregatorExecutor for result merging
- Handling large document collections
- Resource optimization
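Conceptually, the split/process/aggregate flow looks like the plain-Python sketch below. This is an illustration only; the function names here are hypothetical and do not reflect the `BatchSplitterExecutor`/`BatchAggregatorExecutor` API.

```python
import asyncio


async def process_one(doc: str) -> str:
    # Stand-in for a per-document pipeline step
    await asyncio.sleep(0)
    return doc.upper()


def split_into_batches(docs: list[str], batch_size: int) -> list[list[str]]:
    # Splitter: break a large collection into fixed-size batches
    return [docs[i:i + batch_size] for i in range(0, len(docs), batch_size)]


async def run_batches(docs: list[str], batch_size: int = 2) -> list[str]:
    results: list[str] = []
    for batch in split_into_batches(docs, batch_size):
        # Process each batch concurrently, then aggregate results in order
        results.extend(await asyncio.gather(*(process_one(d) for d in batch)))
    return results


processed = asyncio.run(run_batches(["a", "b", "c", "d", "e"]))
print(processed)  # ['A', 'B', 'C', 'D', 'E']
```

Bounding the batch size keeps memory use flat for large collections, which is the resource-optimization point the example demonstrates.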
### 03-concurrent-processing

Parallel processing of document sections:
- ParallelExecutor configuration
- Worker pool management
- Timeout handling
- Concurrent event tracking
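A worker pool with a timeout can be sketched in plain asyncio (illustrative only, not the `ParallelExecutor` API; names are hypothetical):

```python
import asyncio


async def process_section(section: str, sem: asyncio.Semaphore) -> str:
    # A semaphore caps how many sections run at once (a simple worker pool)
    async with sem:
        await asyncio.sleep(0.01)
        return f"done:{section}"


async def run_sections(sections: list[str],
                       workers: int = 3,
                       timeout: float = 5.0) -> list[str]:
    sem = asyncio.Semaphore(workers)
    tasks = [process_section(s, sem) for s in sections]
    # wait_for bounds the whole parallel run with a single timeout
    return await asyncio.wait_for(asyncio.gather(*tasks), timeout=timeout)


results = asyncio.run(run_sections(["intro", "body", "appendix"]))
print(results)  # ['done:intro', 'done:body', 'done:appendix']
```

`asyncio.gather` preserves input order regardless of completion order, which makes tracking per-section results straightforward.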
### 04-conditional-routing

Dynamic routing based on document properties:
- Conditional edges
- Type-specific processing
- Metadata-based decisions
- Fallback handling
### 05-parallel-processing

Parallel execution paths with result merging:
- Multiple concurrent branches
- Independent task execution
- Result aggregation from parallel paths
- Performance optimization
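The fan-out/fan-in shape can be sketched in plain Python (a conceptual illustration; the branch functions are hypothetical, not part of the library):

```python
import asyncio


async def extract_entities(doc: str) -> dict:
    # Branch 1: independent task
    await asyncio.sleep(0)
    return {"entities": doc.split()}


async def summarize(doc: str) -> dict:
    # Branch 2: independent task
    await asyncio.sleep(0)
    return {"summary": doc[:10]}


async def fan_out_fan_in(doc: str) -> dict:
    # Run both branches concurrently, then merge their partial results
    branch_results = await asyncio.gather(extract_entities(doc), summarize(doc))
    merged: dict = {}
    for partial in branch_results:
        merged.update(partial)
    return merged


merged = asyncio.run(fan_out_fan_in("hello parallel world"))
print(merged)  # {'entities': ['hello', 'parallel', 'world'], 'summary': 'hello para'}
```

Because the branches share no state, they can run fully concurrently; only the merge step is sequential.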
### 06-subpipeline-processing

Nested pipelines for hierarchical processing:
- SubworkflowExecutor usage
- Page-level processing
- Chunk-level processing
- Multi-level pipeline composition
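The document → page → chunk hierarchy can be sketched as nested functions, each level delegating to the one below (illustrative only, not the `SubworkflowExecutor` API):

```python
import asyncio


async def chunk_pipeline(chunk: str) -> str:
    # Innermost (chunk-level) sub-pipeline
    return chunk.strip().lower()


async def page_pipeline(page: str) -> list[str]:
    # Page-level pipeline: split into chunks, delegate to the chunk pipeline
    return [await chunk_pipeline(c) for c in page.split(".") if c.strip()]


async def document_pipeline(pages: list[str]) -> list[list[str]]:
    # Outer pipeline delegates each page to the page-level sub-pipeline
    return [await page_pipeline(p) for p in pages]


out = asyncio.run(document_pipeline(["First. Second.", "Third."]))
print(out)  # [['first', 'second'], ['third']]
```

Each level only knows about the one directly beneath it, which is what makes multi-level composition testable in isolation.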
## Running the Examples

1. Choose a sample:

   ```bash
   cd 01-simple
   ```

2. Run the example:

   ```bash
   python simple_example.py
   ```
## Learning Path

- Beginner: 01-simple → 02-batch-processing
- Intermediate: 03-concurrent-processing → 04-conditional-routing
- Advanced: 05-parallel-processing → 06-subpipeline-processing
## Example Details

### 01-simple

Demonstrates basic sequential execution using existing pipeline configurations.

### 04-conditional-routing

Key concepts:

- Document classification
- Switch-case conditional routing
- Type-specific processing paths
- Default fallback handling

Run:

```bash
python conditional_routing.py
```

## Event Streaming

All examples use event streaming to observe workflow execution:
```python
from agent_framework import (
    ExecutorInvokedEvent,
    ExecutorCompletedEvent,
    WorkflowOutputEvent,
)

async for event in workflow.run_stream(input_data):
    if isinstance(event, ExecutorInvokedEvent):
        print(f"Executor {event.executor_id} started")
    elif isinstance(event, ExecutorCompletedEvent):
        print(f"Executor {event.executor_id} completed")
    elif isinstance(event, WorkflowOutputEvent):
        print(f"Final output: {event.data}")
```

## Custom Executors

Examples show how to create custom document processors:
```python
from doc_proc_workflow.executors import DocumentExecutor


class MyProcessor(DocumentExecutor):
    def __init__(self):
        super().__init__(id="my_processor")

    async def process_document(self, document, ctx):
        # Your processing logic
        document.summary_data["processed"] = True
        return document
```

## Workflow Patterns

Sequential:
```python
workflow = (
    WorkflowBuilder()
    .add_edge(step1, step2)
    .add_edge(step2, step3)
    .set_start_executor(step1)
    .build()
)
```

Concurrent:
```python
workflow = (
    ConcurrentBuilder()
    .participants([proc1, proc2, proc3])
    .build()
)
```

Conditional:
```python
workflow = (
    WorkflowBuilder()
    .add_switch_case_edge_group(
        classifier,
        [
            Case(condition=lambda x: x.type == "pdf", target=pdf_proc),
            Case(condition=lambda x: x.type == "image", target=img_proc),
            Default(target=default_proc),
        ],
    )
    .build()
)
```

## Next Steps

- Explore the Agent Framework documentation
- Review workflow samples
- Check out advanced patterns like:
  - Loops and iterations
  - Human-in-the-loop
  - Checkpointing and resume
  - Sub-workflows
  - State management
## Troubleshooting

**ImportError: agent_framework not found**

```bash
pip install agent-framework-azure-ai --pre
```

**Configuration files not found**

Ensure you're running from the examples directory and that doc-proc-lib configuration files exist.

**Module import errors**

Make sure doc-proc-lib is in your Python path or installed.