The SourceFlow.Cloud.AWS extension provides distributed command and event processing using AWS cloud services. This document describes the architecture, implementation patterns, and design decisions for AWS cloud integration.
Target Audience: Developers implementing AWS cloud integration for distributed SourceFlow applications.
- AWS Services Integration
- Bus Configuration System
- Command Routing Architecture
- Event Routing Architecture
- Idempotency Service Architecture
- Bootstrapper Resource Provisioning
- Message Serialization
- Security and Encryption
- Observability and Monitoring
- Performance Optimizations
SourceFlow.Cloud.AWS integrates with three primary AWS services:
Amazon SQS (Simple Queue Service)
Purpose: Command dispatching and queuing
Features Used:
- Standard queues for high-throughput, at-least-once delivery
- FIFO queues for ordered, exactly-once processing per entity
- Dead letter queues for failed message handling
- Long polling for efficient message retrieval
Use Cases:
- Distributing commands across multiple application instances
- Ensuring ordered command processing per entity (FIFO)
- Decoupling command producers from consumers
Amazon SNS (Simple Notification Service)
Purpose: Event publishing and fan-out messaging
Features Used:
- Topics for publish-subscribe patterns
- SQS subscriptions for reliable event delivery
- Message filtering (future enhancement)
- Fan-out to multiple subscribers
Use Cases:
- Broadcasting events to multiple consumers
- Cross-service event notifications
- Decoupling event producers from consumers
AWS KMS (Key Management Service)
Purpose: Message encryption for sensitive data
Features Used:
- Symmetric encryption keys
- Automatic key rotation
- IAM-based access control
- Envelope encryption pattern
Use Cases:
- Encrypting sensitive command/event payloads
- Protecting PII and confidential business data
- Compliance with data protection regulations
The Bus Configuration System provides a fluent API for configuring AWS message routing without hardcoding queue URLs or topic ARNs.
User Configuration (Short Names)
↓
BusConfiguration (Type-Safe Routing)
↓
AwsBusBootstrapper (Name Resolution)
↓
AWS Resources (Full URLs/ARNs)
services.UseSourceFlowAws(
    options => { options.Region = RegionEndpoint.USEast1; },
    bus => bus
        .Send
            .Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
        .Raise
            .Event<OrderCreatedEvent>(t => t.Topic("order-events"))
        .Listen.To
            .CommandQueue("orders.fifo")
        .Subscribe.To
            .Topic("order-events"));

Purpose: Store type-safe routing configuration
Structure:
public class BusConfiguration
{
    // Command Type → Queue Name mapping
    public Dictionary<Type, string> CommandRoutes { get; } = new();

    // Event Type → Topic Name mapping
    public Dictionary<Type, string> EventRoutes { get; } = new();

    // Queue names to listen on for commands
    public List<string> CommandQueues { get; } = new();

    // Topic names to subscribe to for events
    public List<string> EventTopics { get; } = new();
}

Purpose: Fluent API for building configuration
Sections:
- Send: Configure command routing
- Raise: Configure event routing
- Listen.To: Configure command queue listeners
- Subscribe.To: Configure event topic subscriptions
Command Published
↓
CommandBus (assigns sequence number)
↓
AwsSqsCommandDispatcher (checks routing)
↓
SQS Queue (message persisted)
↓
AwsSqsCommandListener (polls queue)
↓
CommandBus.Publish (local processing)
↓
Saga Handles Command
Purpose: Route commands to SQS queues based on configuration
Key Responsibilities:
- Check if command type is configured for AWS routing
- Serialize command to JSON
- Set message attributes (CommandType, EntityId, SequenceNo)
- Send to configured SQS queue
- Handle FIFO queue requirements (MessageGroupId, MessageDeduplicationId)
FIFO Queue Handling:
// For queues ending with .fifo
MessageGroupId = command.Entity.Id.ToString(); // Ensures ordering per entity
MessageDeduplicationId = GenerateDeduplicationId(command); // Content-based

Purpose: Poll SQS queues and process commands locally
Key Responsibilities:
- Long-poll configured SQS queues
- Deserialize messages to commands
- Check idempotency (prevent duplicate processing)
- Publish to local CommandBus
- Delete message from queue after successful processing
- Handle errors and dead letter queue routing
Concurrency:
- Configurable MaxConcurrentCalls for parallel processing
- Each message processed in a separate scope for isolation
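
The FIFO queue handling described above can be sketched as follows. This is a minimal illustration, not the extension's actual code: `FifoMessageAttributes` and its method names are hypothetical, the entity ID is assumed to be an `int`, and hashing the serialized body with SHA-256 is only one plausible way a content-based deduplication ID could be produced.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper for illustration; not part of SourceFlow.Cloud.AWS.
public static class FifoMessageAttributes
{
    // SQS requires FIFO queue names to end with ".fifo".
    public static bool IsFifoQueue(string queueName) =>
        queueName.EndsWith(".fifo", StringComparison.Ordinal);

    // One message group per entity keeps that entity's commands in order.
    public static string GroupIdFor(int entityId) =>
        entityId.ToString(CultureInfo.InvariantCulture);

    // Content-based ID: lowercase hex SHA-256 of the serialized body (64 characters).
    public static string DeduplicationIdFor(string messageBody)
    {
        byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(messageBody));
        return Convert.ToHexString(hash).ToLowerInvariant();
    }
}
```

Identical bodies yield identical deduplication IDs, so a command that is retried unchanged inside the SQS deduplication window would be dropped by the queue rather than delivered twice.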
Event Published
↓
EventQueue (enqueues event)
↓
AwsSnsEventDispatcher (checks routing)
↓
SNS Topic (message published)
↓
SQS Queue (subscribed to topic)
↓
AwsSqsCommandListener (polls queue)
↓
EventQueue.Enqueue (local processing)
↓
Aggregates/Views Handle Event
Purpose: Publish events to SNS topics based on configuration
Key Responsibilities:
- Check if event type is configured for AWS routing
- Serialize event to JSON
- Set message attributes (EventType, EntityId, SequenceNo)
- Publish to configured SNS topic
Architecture:
SNS Topic (order-events)
↓
SQS Subscription (fwd-to-orders)
↓
SQS Queue (orders.fifo)
↓
AwsSqsCommandListener
Benefits:
- Reliable delivery (SQS persistence)
- Ordered processing (FIFO queues)
- Dead letter queue support
- Decoupling of publishers and subscribers
Purpose: Prevent duplicate message processing in distributed systems, where at-least-once delivery can produce duplicate messages.
Implementation: InMemoryIdempotencyService
Structure:
ConcurrentDictionary<string, DateTime> processedMessages
Use Case: Single-instance deployments or local development
Limitations: Not shared across instances
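
A minimal sketch of how such an in-memory service might work is shown below. It assumes the dictionary value holds the record's expiry time (the source does not say what the `DateTime` represents) and reuses the `HasProcessedAsync` and `MarkAsProcessedAsync` method names that appear in the listener example; the class name `InMemoryIdempotencySketch` is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch; the real InMemoryIdempotencyService may differ.
public sealed class InMemoryIdempotencySketch
{
    // Idempotency key -> expiry time in UTC (assumed meaning of the DateTime value).
    private readonly ConcurrentDictionary<string, DateTime> _processed = new();

    public Task<bool> HasProcessedAsync(string key)
    {
        if (_processed.TryGetValue(key, out DateTime expiresAt))
        {
            if (expiresAt > DateTime.UtcNow)
                return Task.FromResult(true);

            // Expired: remove the entry only if it still holds the expired value.
            _processed.TryRemove(new KeyValuePair<string, DateTime>(key, expiresAt));
        }
        return Task.FromResult(false);
    }

    public Task MarkAsProcessedAsync(string key, TimeSpan ttl)
    {
        _processed[key] = DateTime.UtcNow.Add(ttl);
        return Task.CompletedTask;
    }
}
```

Because the dictionary lives in process memory, two application instances would each keep their own copy, which is the limitation noted above.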
Implementation: EfIdempotencyService
Database Table:
CREATE TABLE IdempotencyRecords (
    IdempotencyKey NVARCHAR(500) PRIMARY KEY,
    ProcessedAt DATETIME2 NOT NULL,
    ExpiresAt DATETIME2 NOT NULL,
    MessageType NVARCHAR(500) NULL,
    CloudProvider NVARCHAR(50) NULL
);
CREATE INDEX IX_IdempotencyRecords_ExpiresAt
    ON IdempotencyRecords(ExpiresAt);

Use Case: Multi-instance deployments requiring shared state
Features:
- Distributed duplicate detection
- Automatic cleanup of expired records
- Configurable TTL per message
Format: {CloudProvider}:{MessageType}:{MessageId}
Example: AWS:CreateOrderCommand:abc123-def456
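
The key format above could be built by a small helper like the one below. `IdempotencyKeys.Create` is a hypothetical name used only for illustration; the source's `GenerateIdempotencyKey` is not shown, so this is a sketch of the format rather than the actual implementation.

```csharp
using System;

// Hypothetical helper illustrating the {CloudProvider}:{MessageType}:{MessageId} format.
public static class IdempotencyKeys
{
    public static string Create(string cloudProvider, string messageType, string messageId)
    {
        // A key with a missing part could collide with unrelated messages, so reject it early.
        if (string.IsNullOrWhiteSpace(cloudProvider)) throw new ArgumentException("Required.", nameof(cloudProvider));
        if (string.IsNullOrWhiteSpace(messageType)) throw new ArgumentException("Required.", nameof(messageType));
        if (string.IsNullOrWhiteSpace(messageId)) throw new ArgumentException("Required.", nameof(messageId));

        return $"{cloudProvider}:{messageType}:{messageId}";
    }
}
```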
// In AwsSqsCommandListener
var idempotencyKey = GenerateIdempotencyKey(message);
if (await idempotencyService.HasProcessedAsync(idempotencyKey))
{
    // Duplicate detected - skip processing
    await DeleteMessage(message);
    return;
}
// Process message
await commandBus.Publish(command);
// Mark as processed
await idempotencyService.MarkAsProcessedAsync(idempotencyKey, ttl);

Purpose: Automatically provision AWS resources at application startup
Lifecycle: Runs as IHostedService before listeners start
// Resolve the account ID of the current credentials via STS
var identity = await stsClient.GetCallerIdentityAsync(new GetCallerIdentityRequest());
var accountId = identity.Account;

// Short name: "orders.fifo"
// Resolved URL: "https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo"
var queueUrl = $"https://sqs.{region}.amazonaws.com/{accountId}/{queueName}";

// Short name: "order-events"
// Resolved ARN: "arn:aws:sns:us-east-1:123456789012:order-events"
var topicArn = $"arn:aws:sns:{region}:{accountId}:{topicName}";

SQS Queues:
// Standard queue
await sqsClient.CreateQueueAsync(new CreateQueueRequest
{
    QueueName = "notifications",
    Attributes = new Dictionary<string, string>
    {
        { "MessageRetentionPeriod", "1209600" }, // 14 days
        { "VisibilityTimeout", "30" }
    }
});
// FIFO queue (detected by .fifo suffix)
await sqsClient.CreateQueueAsync(new CreateQueueRequest
{
    QueueName = "orders.fifo",
    Attributes = new Dictionary<string, string>
    {
        { "FifoQueue", "true" },
        { "ContentBasedDeduplication", "true" },
        { "MessageRetentionPeriod", "1209600" },
        { "VisibilityTimeout", "30" }
    }
});

SNS Topics:
await snsClient.CreateTopicAsync(new CreateTopicRequest
{
    Name = "order-events",
    Attributes = new Dictionary<string, string>
    {
        { "DisplayName", "Order Events Topic" }
    }
});

SNS Subscriptions:
// Subscribe queue to topic
await snsClient.SubscribeAsync(new SubscribeRequest
{
    TopicArn = "arn:aws:sns:us-east-1:123456789012:order-events",
    Protocol = "sqs",
    Endpoint = "arn:aws:sqs:us-east-1:123456789012:orders.fifo",
    Attributes = new Dictionary<string, string>
    {
        { "RawMessageDelivery", "true" }
    }
});

All resource creation operations are idempotent:
- Creating a queue that already exists with the same attributes returns the existing queue URL
- Creating a topic that already exists returns the existing topic ARN
- Subscribing an endpoint that is already subscribed returns the existing subscription
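
The short-name resolution used by the bootstrapper can be sketched as a set of pure functions. `AwsNameResolver` is a hypothetical name, and passing through values that already look like a full URL or ARN is an assumption added here for illustration, not behavior confirmed by the source.

```csharp
using System;

// Hypothetical resolver for illustration; the bootstrapper's real implementation may differ.
public static class AwsNameResolver
{
    public static string QueueUrl(string region, string accountId, string queueName) =>
        // Assumption: a value that is already a URL is returned unchanged.
        queueName.StartsWith("https://", StringComparison.OrdinalIgnoreCase)
            ? queueName
            : $"https://sqs.{region}.amazonaws.com/{accountId}/{queueName}";

    public static string TopicArn(string region, string accountId, string topicName) =>
        // Assumption: a value that is already an ARN is returned unchanged.
        topicName.StartsWith("arn:", StringComparison.Ordinal)
            ? topicName
            : $"arn:aws:sns:{region}:{accountId}:{topicName}";

    // The queue ARN is what an SNS subscription expects as its SQS endpoint.
    public static string QueueArn(string region, string accountId, string queueName) =>
        $"arn:aws:sqs:{region}:{accountId}:{queueName}";
}
```

Keeping resolution free of SDK calls makes it easy to unit test; only the account ID lookup needs a live STS call.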
Purpose: Serialize/deserialize commands and events for AWS messaging
Command Serialization:
{
  "Entity": {
    "Id": 123
  },
  "Payload": {
    "CustomerId": 456,
    "OrderDate": "2026-03-04T10:00:00Z"
  },
  "Metadata": {
    "SequenceNo": 1,
    "Timestamp": "2026-03-04T10:00:00Z",
    "CorrelationId": "abc123"
  }
}

Message Attributes:
- CommandType: Full assembly-qualified type name
- EntityId: Entity reference for FIFO ordering
- SequenceNo: Event sourcing sequence number
Purpose: Handle polymorphic command payloads
Strategy: Serialize payload separately with type information
Purpose: Serialize EntityRef objects
Strategy: Simple ID-based serialization
Purpose: Serialize command/event metadata
Strategy: Dictionary-based serialization with type preservation
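
As an illustration of the ID-based strategy for entity references, the sketch below uses a `System.Text.Json` custom converter. `EntityRefSketch` is a hypothetical stand-in for the real `EntityRef` type, and the converter is an assumption about how such serialization could be done, producing the same `{"Id": ...}` shape as the command example above.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical stand-in for SourceFlow's EntityRef type.
public sealed record EntityRefSketch(int Id);

// Writes an entity reference as {"Id": <number>} and reads it back.
public sealed class EntityRefSketchConverter : JsonConverter<EntityRefSketch>
{
    public override EntityRefSketch Read(
        ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        // Parse the current JSON value and pull out only the Id property.
        using JsonDocument document = JsonDocument.ParseValue(ref reader);
        return new EntityRefSketch(document.RootElement.GetProperty("Id").GetInt32());
    }

    public override void Write(
        Utf8JsonWriter writer, EntityRefSketch value, JsonSerializerOptions options)
    {
        writer.WriteStartObject();
        writer.WriteNumber("Id", value.Id);
        writer.WriteEndObject();
    }
}
```

The converter would be registered through `JsonSerializerOptions.Converters` before calling `JsonSerializer.Serialize` or `JsonSerializer.Deserialize`.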
Purpose: Encrypt sensitive message content using AWS KMS
Plaintext Message
↓
Generate Data Key (KMS)
↓
Encrypt Message (Data Key)
↓
Encrypt Data Key (KMS Master Key)
↓
Store: Encrypted Message + Encrypted Data Key

Retrieve: Encrypted Message + Encrypted Data Key
↓
Decrypt Data Key (KMS Master Key)
↓
Decrypt Message (Data Key)
↓
Plaintext Message
services.UseSourceFlowAws(
    options =>
    {
        options.EnableEncryption = true;
        options.KmsKeyId = "alias/sourceflow-key";
    },
    bus => ...);

Encryption applies to:
- Command payloads
- Event payloads
- Message metadata (optional)
Key Management:
- Use KMS key aliases for easier rotation
- Enable automatic key rotation in KMS
- Use separate keys per environment
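
The envelope encryption pattern can be demonstrated locally without calling KMS. In the sketch below a local AES key stands in for the KMS master key, so the wrap and unwrap steps that KMS would perform happen in process; all type and method names are hypothetical, and the code assumes .NET 8 or later for the `AesGcm` constructor that takes a tag size.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Everything needed to decrypt later, except the master key itself.
public sealed record Envelope(
    byte[] EncryptedDataKey, byte[] KeyNonce, byte[] KeyTag,
    byte[] Ciphertext, byte[] Nonce, byte[] Tag);

// Local stand-in for KMS: in production the master key never leaves KMS.
public static class EnvelopeEncryptionSketch
{
    private const int TagSize = 16;
    private const int NonceSize = 12;

    public static Envelope Encrypt(byte[] masterKey, string plaintext)
    {
        byte[] dataKey = RandomNumberGenerator.GetBytes(32); // fresh data key per message
        var (ciphertext, nonce, tag) = Seal(dataKey, Encoding.UTF8.GetBytes(plaintext));
        var (wrappedKey, keyNonce, keyTag) = Seal(masterKey, dataKey); // wrap the data key
        CryptographicOperations.ZeroMemory(dataKey); // do not keep the plaintext key around
        return new Envelope(wrappedKey, keyNonce, keyTag, ciphertext, nonce, tag);
    }

    public static string Decrypt(byte[] masterKey, Envelope envelope)
    {
        byte[] dataKey = Open(masterKey, envelope.EncryptedDataKey, envelope.KeyNonce, envelope.KeyTag);
        byte[] plaintext = Open(dataKey, envelope.Ciphertext, envelope.Nonce, envelope.Tag);
        return Encoding.UTF8.GetString(plaintext);
    }

    private static (byte[] Ciphertext, byte[] Nonce, byte[] Tag) Seal(byte[] key, byte[] data)
    {
        byte[] nonce = RandomNumberGenerator.GetBytes(NonceSize);
        byte[] ciphertext = new byte[data.Length];
        byte[] tag = new byte[TagSize];
        using var aes = new AesGcm(key, TagSize);
        aes.Encrypt(nonce, data, ciphertext, tag);
        return (ciphertext, nonce, tag);
    }

    private static byte[] Open(byte[] key, byte[] ciphertext, byte[] nonce, byte[] tag)
    {
        byte[] plaintext = new byte[ciphertext.Length];
        using var aes = new AesGcm(key, TagSize);
        aes.Decrypt(nonce, ciphertext, tag, plaintext); // throws if the data was tampered with
        return plaintext;
    }
}
```

Only the small data key is wrapped by the master key, so large payloads never have to be sent to the key service.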
Minimum Required for Bootstrapper and Runtime:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SQSQueueManagement",
"Effect": "Allow",
"Action": [
"sqs:CreateQueue",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue"
],
"Resource": "arn:aws:sqs:*:*:*"
},
{
"Sid": "SQSMessageOperations",
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility"
],
"Resource": "arn:aws:sqs:*:*:*"
},
{
"Sid": "SNSTopicManagement",
"Effect": "Allow",
"Action": [
"sns:CreateTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:TagResource"
],
"Resource": "arn:aws:sns:*:*:*"
},
{
"Sid": "SNSPublishAndSubscribe",
"Effect": "Allow",
"Action": [
"sns:Subscribe",
"sns:Unsubscribe",
"sns:Publish"
],
"Resource": "arn:aws:sns:*:*:*"
},
{
"Sid": "STSGetCallerIdentity",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
},
{
"Sid": "KMSEncryption",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:Encrypt",
"kms:GenerateDataKey",
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:*:*:key/*"
}
]
}

Production Best Practice - Restrict to Specific Resources:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "SQSSpecificQueues",
"Effect": "Allow",
"Action": [
"sqs:CreateQueue",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:us-east-1:123456789012:orders.fifo",
"arn:aws:sqs:us-east-1:123456789012:payments.fifo",
"arn:aws:sqs:us-east-1:123456789012:inventory.fifo"
]
},
{
"Sid": "SNSSpecificTopics",
"Effect": "Allow",
"Action": [
"sns:CreateTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:TagResource",
"sns:Subscribe",
"sns:Unsubscribe",
"sns:Publish"
],
"Resource": [
"arn:aws:sns:us-east-1:123456789012:order-events",
"arn:aws:sns:us-east-1:123456789012:payment-events"
]
},
{
"Sid": "STSGetCallerIdentity",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
},
{
"Sid": "KMSSpecificKey",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:Encrypt",
"kms:GenerateDataKey",
"kms:DescribeKey"
],
"Resource": "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"
}
]
}

Purpose: AWS-specific metrics and tracing
Command Dispatching:
- sourceflow.aws.command.dispatched - Commands sent to SQS
- sourceflow.aws.command.dispatch_duration - Dispatch latency
- sourceflow.aws.command.dispatch_error - Dispatch failures
Event Publishing:
- sourceflow.aws.event.published - Events published to SNS
- sourceflow.aws.event.publish_duration - Publish latency
- sourceflow.aws.event.publish_error - Publish failures
Message Processing:
- sourceflow.aws.message.received - Messages received from SQS
- sourceflow.aws.message.processed - Messages successfully processed
- sourceflow.aws.message.processing_duration - Processing latency
- sourceflow.aws.message.processing_error - Processing failures
Activity Source: SourceFlow.Cloud.AWS
Spans Created:
- AwsSqsCommandDispatcher.Dispatch - Command dispatch to SQS
- AwsSnsEventDispatcher.Dispatch - Event publish to SNS
- AwsSqsCommandListener.ProcessMessage - Message processing
Trace Context Propagation:
- Correlation IDs passed via message attributes
- Parent span context preserved across service boundaries
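
One way the dispatch metrics and span listed above could be emitted is with the built-in `System.Diagnostics` APIs, as in the sketch below. The instrument and span names come from this document; the meter name, the `command.type` tag, the millisecond unit, and the `AwsTelemetrySketch` class are assumptions made for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;

// Hypothetical telemetry wrapper; the extension's real instrumentation may differ.
public static class AwsTelemetrySketch
{
    public static readonly ActivitySource Source = new("SourceFlow.Cloud.AWS");

    // Assumption: the meter shares the activity source's name.
    private static readonly Meter AwsMeter = new("SourceFlow.Cloud.AWS");

    private static readonly Counter<long> Dispatched =
        AwsMeter.CreateCounter<long>("sourceflow.aws.command.dispatched");
    private static readonly Counter<long> DispatchErrors =
        AwsMeter.CreateCounter<long>("sourceflow.aws.command.dispatch_error");
    private static readonly Histogram<double> DispatchDuration =
        AwsMeter.CreateHistogram<double>("sourceflow.aws.command.dispatch_duration", unit: "ms");

    // Wraps a send operation with a span, a success/error counter, and a latency measurement.
    public static void RecordDispatch(string commandType, Action send)
    {
        // StartActivity returns null when nothing is listening, hence the null-conditional calls.
        using var activity = Source.StartActivity("AwsSqsCommandDispatcher.Dispatch", ActivityKind.Producer);
        activity?.SetTag("command.type", commandType);

        var stopwatch = Stopwatch.StartNew();
        try
        {
            send();
            Dispatched.Add(1);
        }
        catch (Exception ex)
        {
            DispatchErrors.Add(1);
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
        finally
        {
            DispatchDuration.Record(stopwatch.Elapsed.TotalMilliseconds);
        }
    }
}
```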
AwsHealthCheck:
- Validates SQS connectivity
- Validates SNS connectivity
- Validates KMS access (if encryption enabled)
- Checks queue/topic existence
SqsClientFactory:
- Singleton AWS SDK clients
- Connection pooling
- Regional optimization
SnsClientFactory:
- Singleton AWS SDK clients
- Connection pooling
- Regional optimization
SQS Batch Operations:
- Receive up to 10 messages per request
- Delete messages in batches
- Reduces API calls and improves throughput
Concurrent Message Handling:
// Configurable concurrency
options.MaxConcurrentCalls = 10;
// Each message processed in parallel
await Task.WhenAll(messages.Select(ProcessMessage));

Long Polling:
// Wait up to 20 seconds for messages
WaitTimeSeconds = 20

Benefits:
- Reduces empty responses
- Lowers API costs
- Improves latency
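
The source does not show how `MaxConcurrentCalls` limits the parallel processing step. One common approach, sketched below under that caveat, is to gate each message behind a `SemaphoreSlim`; `BoundedProcessorSketch` and its parameter names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of capping parallel message handling at a configured limit.
public static class BoundedProcessorSketch
{
    public static async Task ProcessBatchAsync<T>(
        IEnumerable<T> messages, int maxConcurrentCalls, Func<T, Task> processMessage)
    {
        using var gate = new SemaphoreSlim(maxConcurrentCalls);

        // Start one task per message; each waits for a free slot before doing work.
        List<Task> tasks = messages.Select(async message =>
        {
            await gate.WaitAsync();
            try
            {
                await processMessage(message);
            }
            finally
            {
                gate.Release(); // free the slot even if processing failed
            }
        }).ToList();

        await Task.WhenAll(tasks);
    }
}
```

With a receive batch of at most 10 messages, a limit of 10 behaves like the unbounded `Task.WhenAll` call, while a lower limit trades throughput for less pressure on downstream resources.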
┌─────────────┐
│ Client │
└──────┬──────┘
│ Publish Command
▼
┌─────────────────┐
│ CommandBus │
└──────┬──────────┘
│ Dispatch
▼
┌──────────────────────┐
│ AwsSqsCommand │
│ Dispatcher │
└──────┬───────────────┘
│ SendMessage
▼
┌──────────────────────┐
│ SQS Queue │
│ (orders.fifo) │
└──────┬───────────────┘
│ ReceiveMessage
▼
┌──────────────────────┐
│ AwsSqsCommand │
│ Listener │
└──────┬───────────────┘
│ Publish (local)
▼
┌─────────────────┐
│ CommandBus │
└──────┬──────────┘
│ Dispatch
▼
┌─────────────────┐
│ Saga │
└─────────────────┘

┌─────────────┐
│ Saga │
└──────┬──────┘
│ PublishEvent
▼
┌─────────────────┐
│ EventQueue │
└──────┬──────────┘
│ Dispatch
▼
┌──────────────────────┐
│ AwsSnsEvent │
│ Dispatcher │
└──────┬───────────────┘
│ Publish
▼
┌──────────────────────┐
│ SNS Topic │
│ (order-events) │
└──────┬───────────────┘
│ Fan-out
▼
┌──────────────────────┐
│ SQS Queue │
│ (orders.fifo) │
└──────┬───────────────┘
│ ReceiveMessage
▼
┌──────────────────────┐
│ AwsSqsCommand │
│ Listener │
└──────┬───────────────┘
│ Enqueue (local)
▼
┌─────────────────┐
│ EventQueue │
└──────┬──────────┘
│ Dispatch
▼
┌─────────────────┐
│ Aggregate/View │
└─────────────────┘
The AWS Cloud Architecture provides:
✅ Distributed Command Processing - SQS-based command routing
✅ Event Fan-Out - SNS-based event publishing
✅ Message Encryption - KMS-based sensitive data protection
✅ Idempotency - Duplicate message detection
✅ Auto-Provisioning - Bootstrapper creates AWS resources
✅ Type-Safe Configuration - Fluent API for routing
✅ Observability - Metrics, tracing, and health checks
✅ Performance - Connection pooling, batching, parallel processing
Key Design Principles:
- Zero core modifications required
- Plugin architecture via ICommandDispatcher/IEventDispatcher
- Configuration over convention
- Fail-fast with clear error messages
- Production-ready with comprehensive testing
- SourceFlow Core Architecture
- Cloud Core Consolidation
- AWS Cloud Extension Package
- Cloud Integration Testing
- Cloud Message Idempotency Guide
Document Version: 1.0
Last Updated: 2026-03-04
Status: Complete