Intelligent, scalable content processing pipelines powered by Azure AI and orchestrated by Microsoft Agent Framework
ContentFlow is an enterprise-grade document and content processing platform that transforms unstructured content into intelligent, actionable data. It combines:
- 🔄 Orchestrated Workflows - YAML-based pipeline definitions with conditional routing and parallel execution
- 🤖 AI-Powered Processing - Integration with Azure AI services for document intelligence, embeddings, and analysis
- 📦 Modular Executors - 40+ pre-built processors for PDF, Word, Excel, PowerPoint, and more
- 🌐 Cloud-Native Architecture - Deployed on Azure Container Apps with distributed processing
- 💻 Intuitive Web UI - React-based interface for pipeline design and monitoring
- ⚡ Scalable & Distributed - Multi-worker architecture for processing at scale
- Multi-Format Support: PDF, Word, Excel, PowerPoint, plain text, web content, audio, video
- OCR & Layout Analysis: Extract text from scanned documents with layout preservation
- Intelligent Extraction: Tables, images, metadata, document structure
- Content Understanding: Chunking, embedding generation, semantic analysis
- Knowledge Graphs: Extract and build relationships between entities
- Conditional Routing: Dynamic paths based on document properties
- Parallel Processing: Fan-out/fan-in patterns with result aggregation
- Batch Operations: Efficient processing of large document collections
- Sub-Pipelines: Hierarchical workflow composition for complex scenarios
- Error Handling: Automatic retry logic and graceful degradation
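To illustrate the fan-out/fan-in and conditional-routing patterns above, here is a hedged sketch of how such a stage might be declared in pipeline YAML. The executor ids and the `errors` field are hypothetical, modeled on the sample pipeline shown later in this README:

```yaml
# Hypothetical sketch: fan one document out to two extractors in parallel,
# then fan back in to a single aggregation step (ids are illustrative).
edges:
  - from: split_document
    to: extract_tables          # branch 1
  - from: split_document
    to: extract_images          # branch 2
  - from: extract_tables
    to: aggregate_results       # fan-in
  - from: extract_images
    to: aggregate_results
  - from: aggregate_results
    to: store_results
    condition: "output.errors == 0"   # conditional routing
```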
- Document Intelligence: Extract text, tables, key-value pairs from documents
- Embeddings: Generate semantic vectors for similarity search and RAG
- Content Analysis: Sentiment, entity extraction, topic classification
- Web Scraping: Dynamic content extraction with Playwright
- Azure AI Landing Zone Integration: Secure deployment within enterprise environments
- RBAC & Identity: Managed identities and role-based access control
- Audit & Monitoring: Comprehensive logging and Application Insights
- Data Isolation: Blob storage and Cosmos DB for persistent data management
- Azure subscription with necessary services configured
- Python 3.12+
- Docker (for running locally)
- Node.js 18+ (for web UI development)
Deployment supports two modes:
- Basic mode: quick setup for development and testing
- Azure AI Landing Zone integrated mode: enterprise-grade deployment within an existing landing zone
➡️ View deployment docs for more details
```bash
git clone https://github.com/Azure/contentflow
cd contentflow

# One-command deployment
azd up

# This will:
# 1. Provision Azure infrastructure (Container Apps, Storage, Cosmos DB, etc.)
# 2. Build and push container images
# 3. Deploy services
# 4. Configure post-deployment settings
# 5. Output service URLs
```

```bash
# API service
cd contentflow-api
pip install -r requirements.txt
python main.py

# Worker service
cd contentflow-worker
pip install -r requirements.txt
python main.py

# Web UI
cd contentflow-web
npm install
npm run dev
```

Scenario: Enterprise needs to digitize and catalog thousands of historical documents
Input Documents → PDF Extraction → OCR & Layout Analysis → Metadata Extraction → Full-Text Indexing → Archive Storage
Benefits: Searchable digital archives, compliance automation, instant retrieval
Scenario: Build a knowledge base from company documents for AI-powered Q&A
Documents → Chunking → Embedding Generation → Vector Search Indexing → LLM Query Augmentation
ContentFlow Powers: Batch processing thousands of documents, generating embeddings, storing in vector DB
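As a minimal sketch of the chunking step in this flow, here is a fixed-size splitter with overlap, a common pre-embedding strategy. This is illustrative Python only, not the implementation of ContentFlow's chunking executor:

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list[str]:
    """Split text into overlapping fixed-size chunks (illustrative only)."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks

# Each chunk would then be embedded and stored in a vector index.
chunks = chunk_text("x" * 500, chunk_size=200, overlap=50)
```

Overlap preserves context across chunk boundaries so that sentences split between chunks remain retrievable from either side.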
Scenario: Extract financial data from quarterly reports, earnings calls, and regulatory filings
Financial Documents → Extract Tables → Parse Key Metrics → Classify Document Type → Store in Data Warehouse
Smart Features:
- Conditional routing based on document type
- Parallel processing of multiple sections
- Automatic retry on extraction failure
Scenario: Process product descriptions, images, and specifications across multiple formats
Product Files (PDF, DOC, XLSX) → Content Extraction → Image Processing → Standardization → Catalog Upload
Powered By: Batch operations, format-specific extractors, validation logic
Scenario: Convert paper records and scanned documents into structured patient data
Scanned Records → OCR → Medical Entity Extraction → HIPAA Compliance Validation → EHR Integration
Enterprise Features: Audit logging, encryption, RBAC, data isolation
Scenario: Crawl websites and aggregate news articles with AI analysis
Web URLs → Web Scraping → Content Extraction → Sentiment Analysis → Topic Classification → Distribution
Automation: Parallel scraping, conditional routing, scheduled execution
Web Dashboard (contentflow-web)
- Modern React application with Vite
- Visual pipeline designer with React Flow
- Real-time execution monitoring
- Results viewer with syntax highlighting
- Responsive Tailwind CSS design
API Service (contentflow-api)
- FastAPI REST endpoints for pipeline operations
- AsyncIO-based for high concurrency
- WebSocket support for real-time events
- Integration with Azure Key Vault for secrets
- CORS configured for web UI
Core Library (contentflow-lib)
- Pipeline Factory: Compiles YAML to execution graphs
- Executor Framework: Base classes and 40+ implementations
- Content Models: Strongly-typed data structures
- Event System: Real-time pipeline execution tracking
- Plugin Architecture: Easy extension with custom executors
Worker Service (contentflow-worker)
- Multi-threaded content processing engine
- Queue-based job distribution
- Automatic scaling based on load
- Health monitoring and graceful shutdown
- Error handling with exponential backoff
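The retry behavior described above can be sketched as a generic exponential-backoff helper. This is not ContentFlow's actual worker code; the function and names are illustrative:

```python
import time

def retry_with_backoff(fn, max_attempts: int = 4, base_delay: float = 0.01):
    """Call fn(), retrying on exception with exponentially growing delays.

    Illustrative only; a real worker would also cap the delay, add jitter,
    and route permanently failing jobs to a dead-letter queue.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # surface the error only after the final attempt
            time.sleep(base_delay * (2 ** attempt))  # 1x, 2x, 4x, ...

# Example: a job that fails twice before succeeding
calls = {"n": 0}
def flaky_job():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = retry_with_backoff(flaky_job)
```

Doubling the delay between attempts gives transient faults (throttling, network blips) time to clear without hammering the downstream service.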
```yaml
pipeline:
  name: document_processing
  description: "Process documents with intelligence"

  executors:
    - id: get_content
      type: azure_blob_input_discovery
      settings:
        blob_storage_account: "${STORAGE_ACCOUNT}"
        blob_container_name: "documents"
        file_extensions: ".pdf,.docx"

    - id: extract_text
      type: azure_document_intelligence_extractor
      settings:
        doc_intelligence_endpoint: "${DOC_INT_ENDPOINT}"

    - id: generate_embeddings
      type: embeddings_executor
      settings:
        model: "text-embedding-3-large"

    - id: store_results
      type: cosmos_db_writer
      settings:
        database_name: "contentflow"
        container_name: "documents"

  # Execution sequence with conditional routing
  edges:
    - from: get_content
      to: extract_text
    - from: extract_text
      to: generate_embeddings
      condition: "output.pages > 0"
    - from: generate_embeddings
      to: store_results
```

```python
import asyncio

from contentflow.pipeline import PipelineExecutor
from contentflow.models import Content, ContentIdentifier


async def main() -> None:
    async with PipelineExecutor.from_config_file(
        config_path="my_pipeline.yaml",
        pipeline_name="document_processing"
    ) as executor:
        # Create content to process
        document = Content(
            id=ContentIdentifier(
                canonical_id="doc_001",
                unique_id="doc_001",
                source_name="azure_blob",
                source_type="pdf",
                path="documents/report.pdf"
            )
        )

        # Execute pipeline
        result = await executor.execute(document)

        # Check results
        print(f"Status: {result.status}")
        print(f"Duration: {result.duration_seconds}s")
        for event in result.events:
            print(f"  {event.executor_id}: {event.message}")


asyncio.run(main())
```

⇢ Follow the Creating Custom Executors guide to build custom executors and extend ContentFlow's capabilities.
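To give a flavor of what a custom executor might look like, here is a self-contained sketch. The `Executor` base class and `run` signature below are assumptions made so the example can stand alone; consult the Creating Custom Executors guide for ContentFlow's actual base classes:

```python
# Hypothetical sketch of a custom executor. ContentFlow's real base class and
# method names may differ -- see the Creating Custom Executors guide.
import asyncio


class Executor:
    """Stand-in base class (an assumption, so this example is runnable)."""
    async def run(self, content: dict) -> dict:
        raise NotImplementedError


class WordCountExecutor(Executor):
    """Adds a word_count field to the content's metadata."""
    async def run(self, content: dict) -> dict:
        text = content.get("text", "")
        content.setdefault("metadata", {})["word_count"] = len(text.split())
        return content


result = asyncio.run(WordCountExecutor().run({"text": "hello content flow"}))
```

The pattern to note is that an executor receives a content object, enriches or transforms it, and returns it for the next node in the execution graph.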
```
contentflow/
├── contentflow-api/ # FastAPI REST service
│ ├── app/
│ │ ├── routers/ # API endpoint definitions
│ │ ├── services/ # Business logic
│ │ ├── dependencies.py # Dependency injection
│ │ └── settings.py # Configuration
│ ├── main.py # Application entry
│ └── Dockerfile
│
├── contentflow-lib/ # Core processing library
│ ├── contentflow/
│ │ ├── pipeline/ # Pipeline execution engine
│ │ ├── executors/ # 40+ executor implementations
│ │ ├── connectors/ # Data source connectors
│ │ ├── models/ # Data models
│ │ └── utils/ # Utilities
│ ├── samples/ # 20+ example pipelines
│ ├── executor_catalog.yaml # Executor registry
│ └── pyproject.toml
│
├── contentflow-web/ # React web dashboard
│ ├── src/
│ │ ├── components/ # Reusable UI components
│ │ ├── pages/ # Page components
│ │ ├── hooks/ # Custom React hooks
│ │ └── lib/ # Utilities & helpers
│ ├── vite.config.ts # Build configuration
│ └── Dockerfile
│
├── contentflow-worker/ # Processing worker service
│ ├── app/
│ │ ├── engine.py # Worker engine
│ │ ├── api.py # Health/status endpoints
│ │ └── settings.py # Configuration
│ ├── main.py # Entry point
│ └── Dockerfile
│
└── infra/ # Infrastructure as Code
    ├── bicep/
    │ ├── main.bicep # Main template
    │ └── modules/ # Reusable Bicep modules
    └── scripts/ # Deployment automation
```
✅ Zero-Trust Architecture - No exposed endpoints
✅ Managed Identity Authentication - No exposed credentials
✅ Azure Key Vault Integration - Secure secret storage
✅ RBAC & Access Control - Fine-grained permissions
✅ Encrypted Communication - TLS for all endpoints
✅ Audit & Logging - Full audit trail with Application Insights
✅ Data Isolation - Separate storage containers per tenant/environment
| Metric | Capability |
|---|---|
| Throughput | 100+ documents/hour per worker |
| Concurrency | Unlimited parallel pipelines |
| Scaling | Auto-scale Container Apps based on queue depth |
| Latency | <1s for simple operations, <30s for complex AI |
| Reliability | Automatic retry, fault tolerance, graceful degradation |
| Storage | Unlimited with Blob Storage + Cosmos DB |
- Infrastructure Guide - Deploy to Azure
- API Documentation - REST endpoints
- Sample Pipelines - Learn by example
- Web UI Guide - Dashboard features
We welcome contributions! Please:
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Issues: Report bugs and request features on GitHub Issues
- Discussions: Ask questions and share ideas in Discussions
- Documentation: Check our comprehensive docs
- Examples: Explore sample pipelines
Deploy to Azure · View Samples · API Reference · Report Issue
Made with ❤️ using Microsoft Azure & Agent Framework