Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 134 additions & 2 deletions terraphim_server/src/workflows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ pub struct WorkflowStatus {
pub result: Option<serde_json::Value>,
/// Error description; absent unless the workflow failed.
pub error: Option<String>,
/// Ordered list of steps recorded during execution.
pub steps: Vec<WorkflowStep>,
}

/// Lifecycle phases of a workflow run.
Expand All @@ -154,6 +156,35 @@ pub enum ExecutionStatus {
Cancelled,
}

/// Terminal state of a single workflow step.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
/// Step is currently executing.
Running,
/// Step finished without error.
Completed,
/// Step aborted due to an error.
Failed,
}

/// A recorded execution step within a workflow run.
#[derive(Debug, Clone, Serialize)]
pub struct WorkflowStep {
/// Step identifier, unique within a workflow run.
pub id: String,
/// Human-readable step name.
pub name: String,
/// Terminal lifecycle state of this step.
pub status: StepStatus,
/// UTC timestamp when this step started.
pub started_at: chrono::DateTime<chrono::Utc>,
/// UTC timestamp when this step finished; absent while the step is running.
pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
/// Output produced by this step; absent when not available.
pub output: Option<String>,
}

/// Event broadcast over WebSocket to connected clients.
#[derive(Debug, Clone, Serialize)]
pub struct WebSocketMessage {
Expand Down Expand Up @@ -225,11 +256,10 @@ async fn get_execution_trace(
let sessions = state.workflow_sessions.read().await;

if let Some(status) = sessions.get(&id) {
// Return detailed execution trace
let trace = serde_json::json!({
"workflow_id": id,
"status": status.status,
"steps": [], // TODO: Implement detailed step tracking
"steps": status.steps,
"timeline": {
"started_at": status.started_at,
"completed_at": status.completed_at
Expand All @@ -248,6 +278,107 @@ async fn get_execution_trace(
}
}

/// Appends a completed step record to an existing workflow run.
pub async fn record_workflow_step(
sessions: &WorkflowSessions,
workflow_id: &str,
step: WorkflowStep,
) {
let mut sessions = sessions.write().await;
if let Some(workflow) = sessions.get_mut(workflow_id) {
workflow.steps.push(step);
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn trace_includes_recorded_steps() {
let sessions = RwLock::new(HashMap::new());
let (broadcaster, _rx) = broadcast::channel(16);
let wf_id = "wf_test_123".to_string();

create_workflow_session(
&sessions,
&broadcaster,
wf_id.clone(),
"test_pattern".to_string(),
)
.await;

let step = WorkflowStep {
id: "step_1".to_string(),
name: "Parse Input".to_string(),
status: StepStatus::Completed,
started_at: chrono::Utc::now(),
completed_at: Some(chrono::Utc::now()),
output: Some("parsed successfully".to_string()),
};
record_workflow_step(&sessions, &wf_id, step).await;

let guard = sessions.read().await;
let wf = guard.get(&wf_id).expect("workflow should exist");
assert_eq!(wf.steps.len(), 1);
assert_eq!(wf.steps[0].name, "Parse Input");
assert_eq!(wf.steps[0].id, "step_1");
assert!(matches!(wf.steps[0].status, StepStatus::Completed));
}

#[tokio::test]
async fn trace_starts_with_empty_steps() {
let sessions = RwLock::new(HashMap::new());
let (broadcaster, _rx) = broadcast::channel(16);
let wf_id = "wf_empty_456".to_string();

create_workflow_session(
&sessions,
&broadcaster,
wf_id.clone(),
"test_pattern".to_string(),
)
.await;

let guard = sessions.read().await;
let wf = guard.get(&wf_id).expect("workflow should exist");
assert!(wf.steps.is_empty());
}

#[tokio::test]
async fn record_multiple_steps_preserves_order() {
let sessions = RwLock::new(HashMap::new());
let (broadcaster, _rx) = broadcast::channel(16);
let wf_id = "wf_multi_789".to_string();

create_workflow_session(
&sessions,
&broadcaster,
wf_id.clone(),
"test_pattern".to_string(),
)
.await;

for i in 1..=3u32 {
let step = WorkflowStep {
id: format!("step_{i}"),
name: format!("Step {i}"),
status: StepStatus::Completed,
started_at: chrono::Utc::now(),
completed_at: Some(chrono::Utc::now()),
output: None,
};
record_workflow_step(&sessions, &wf_id, step).await;
}

let guard = sessions.read().await;
let wf = guard.get(&wf_id).expect("workflow should exist");
assert_eq!(wf.steps.len(), 3);
assert_eq!(wf.steps[0].name, "Step 1");
assert_eq!(wf.steps[2].name, "Step 3");
}
}

async fn list_workflows(State(state): State<AppState>) -> Json<Vec<WorkflowStatus>> {
let sessions = state.workflow_sessions.read().await;
let workflows: Vec<WorkflowStatus> = sessions.values().cloned().collect();
Expand Down Expand Up @@ -312,6 +443,7 @@ pub async fn create_workflow_session(
completed_at: None,
result: None,
error: None,
steps: vec![],
};

sessions.write().await.insert(workflow_id.clone(), status);
Expand Down
1 change: 1 addition & 0 deletions terraphim_server/tests/simple_workflow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async fn test_workflow_system_basic() {
completed_at: None,
result: None,
error: None,
steps: vec![],
},
);
}
Expand Down