Skip to content

Commit 14d5f78

Browse files
committed
feat: first node handling functionality
1 parent 130230f commit 14d5f78

4 files changed

Lines changed: 216 additions & 12 deletions

File tree

src/context/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ pub struct ContextReference {
1212
pub tertiary_level: Option<i32>,
1313
}
1414

15+
pub struct ContextLayer {
16+
is_finished: bool,
17+
result: Option<Result<Value, RuntimeError>>,
18+
}
19+
1520
#[derive(Debug)]
1621
pub struct Context {
1722
/// A stack of environments: layer 0 is the outermost.

src/error/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
fmt::{Display, Formatter},
44
};
55

6-
#[derive(Debug)]
6+
#[derive(Debug, Default)]
77
pub struct RuntimeError {}
88

99
impl Error for RuntimeError {}

src/main.rs

Lines changed: 209 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,139 @@
1-
pub mod locale;
21
pub mod context;
32
pub mod error;
3+
pub mod implementation;
4+
pub mod locale;
45
pub mod registry;
56

7+
use std::sync::Arc;
8+
69
use code0_flow::flow_queue::service::{Message, RabbitmqClient};
10+
use context::Context;
11+
use error::RuntimeError;
12+
use futures_lite::StreamExt;
13+
use lapin::{options::BasicConsumeOptions, types::FieldTable};
714
use locale::locale::Locale;
8-
use std::sync::Arc;
15+
use registry::FunctionStore;
16+
use tucana::shared::{Flow, NodeFunction, Value};
17+
18+
fn handle_node_function(
19+
function: NodeFunction,
20+
store: &FunctionStore,
21+
context: &mut Context,
22+
) -> Result<Value, RuntimeError> {
23+
if let Some(definition) = function.definition {
24+
let runtime_function = match store.get(definition.runtime_function_id.as_str()) {
25+
Some(fc) => fc,
26+
None => todo!("Retrun if no funtion is present"),
27+
};
28+
29+
let mut parameter_collection: Vec<Value> = vec![];
30+
31+
for parameter in function.parameters {
32+
if let Some(value) = parameter.value {
33+
match value {
34+
// Its just a normal value, directly a paramter
35+
tucana::shared::node_parameter::Value::LiteralValue(v) => {
36+
parameter_collection.push(v)
37+
}
38+
39+
// Its a reference to an already executed function that returns value is the parameter of this function
40+
tucana::shared::node_parameter::Value::ReferenceValue(reference) => {
41+
let optional_value = context.get(&reference);
42+
43+
// Look if its even present
44+
let context_result = match optional_value {
45+
Some(context_result) => context_result,
46+
None => {
47+
todo!("Required function that holds the parameter wasnt executed")
48+
}
49+
};
50+
51+
match context_result {
52+
Ok(v) => {
53+
parameter_collection.push(v.clone());
54+
}
55+
Err(_) => {
56+
todo!(
57+
"Reqired function that holds the paramter failed in execution"
58+
)
59+
}
60+
}
61+
}
62+
63+
// Its another function, that result is a direct parameter to this function
64+
tucana::shared::node_parameter::Value::FunctionValue(another_node_function) => {
65+
let function_result =
66+
handle_node_function(another_node_function, &store, context);
67+
68+
match function_result {
69+
Ok(v) => {
70+
parameter_collection.push(v.clone());
71+
}
72+
Err(_) => {
73+
todo!(
74+
"Reqired function that holds the paramter failed in execution"
75+
)
76+
}
77+
}
78+
}
79+
}
80+
}
81+
}
82+
83+
let result = runtime_function(&parameter_collection, context);
84+
85+
if let Some(ref next_node) = function.next_node {
86+
let next: NodeFunction = (**next_node).clone();
87+
let _ = handle_node_function(next, store, context);
88+
todo!()
89+
};
90+
91+
return result;
92+
};
93+
94+
Err(RuntimeError::default())
95+
}
96+
97+
fn handle_message(
98+
message: Message,
99+
store: &FunctionStore,
100+
context: &mut Context,
101+
) -> Result<Message, lapin::Error> {
102+
let flow: Flow = match serde_json::from_str(&message.body) {
103+
Ok(flow) => flow,
104+
Err(_) => {
105+
todo!()
106+
}
107+
};
108+
109+
if let Some(node) = flow.starting_node {
110+
match handle_node_function(node, store, context) {
111+
Ok(result) => match serde_json::to_string(&result) {
112+
Ok(res) => {
113+
return Ok(Message {
114+
message_id: message.message_id,
115+
message_type: message.message_type,
116+
timestamp: message.timestamp,
117+
sender: message.sender,
118+
body: res,
119+
})
120+
}
121+
Err(_) => {
122+
todo!("")
123+
}
124+
},
125+
Err(runtime_error) => {
126+
return Ok(Message {
127+
message_id: message.message_id,
128+
message_type: message.message_type,
129+
timestamp: message.timestamp,
130+
sender: message.sender,
131+
body: runtime_error.to_string(),
132+
})
133+
}
134+
}
135+
};
9136

10-
fn handle_message(message: Message) -> Result<Message, lapin::Error> {
11137
Ok(Message {
12138
message_id: message.message_id,
13139
message_type: message.message_type,
@@ -19,14 +145,87 @@ fn handle_message(message: Message) -> Result<Message, lapin::Error> {
19145

20146
#[tokio::main]
21147
async fn main() {
22-
let locale = Locale::default();
148+
let _locale = Locale::default();
149+
let store = FunctionStore::new();
150+
let mut context = Context::new();
151+
23152
let rabbitmq_client = Arc::new(RabbitmqClient::new("amqp://localhost:5672").await);
24153

25-
// Receive messages from the send_queue
26-
if let Err(e) = rabbitmq_client
27-
.receive_messages("send_queue", handle_message)
28-
.await
29-
{
30-
eprintln!("Failed to receive messages: {}", e);
154+
let mut consumer = {
155+
let channel = rabbitmq_client.channel.lock().await;
156+
157+
let consumer_res = channel
158+
.basic_consume(
159+
"send_queue",
160+
"consumer",
161+
BasicConsumeOptions::default(),
162+
FieldTable::default(),
163+
)
164+
.await;
165+
166+
match consumer_res {
167+
Ok(consumer) => consumer,
168+
Err(err) => panic!("Cannot consume messages: {}", err),
169+
}
170+
};
171+
172+
log::debug!("Starting to consume from send_queue");
173+
174+
while let Some(delivery) = consumer.next().await {
175+
let delivery = match delivery {
176+
Ok(del) => del,
177+
Err(err) => {
178+
log::error!("Error receiving message: {}", err);
179+
return;
180+
}
181+
};
182+
183+
let data = &delivery.data;
184+
let message_str = match std::str::from_utf8(&data) {
185+
Ok(str) => {
186+
log::info!("Received message: {}", str);
187+
str
188+
}
189+
Err(err) => {
190+
log::error!("Error decoding message: {}", err);
191+
return;
192+
}
193+
};
194+
// Parse the messagey
195+
let inc_message = match serde_json::from_str::<Message>(message_str) {
196+
Ok(mess) => mess,
197+
Err(err) => {
198+
log::error!("Error parsing message: {}", err);
199+
return;
200+
}
201+
};
202+
203+
let message = match handle_message(inc_message, &store, &mut context) {
204+
Ok(mess) => mess,
205+
Err(err) => {
206+
log::error!("Error handling message: {}", err);
207+
return;
208+
}
209+
};
210+
211+
let message_json = match serde_json::to_string(&message) {
212+
Ok(json) => json,
213+
Err(err) => {
214+
log::error!("Error serializing message: {}", err);
215+
return;
216+
}
217+
};
218+
219+
{
220+
let _ = rabbitmq_client
221+
.send_message(message_json, "recieve_queue")
222+
.await;
223+
}
224+
225+
// Acknowledge the message
226+
delivery
227+
.ack(lapin::options::BasicAckOptions::default())
228+
.await
229+
.expect("Failed to acknowledge message");
31230
}
32231
}

src/registry/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl FunctionStore {
2323
}
2424

2525
/// Execute all the registration closures to populate the map.
26-
fn populate(&mut self, regs: Vec<(&'static str, HandlerFn)>) {
26+
pub fn populate(&mut self, regs: Vec<(&'static str, HandlerFn)>) {
2727
for (id, func) in regs {
2828
self.functions.insert(id.to_string(), func);
2929
}

0 commit comments

Comments
 (0)