Skip to content

Commit 76887be

Browse files
committed
feat: implemented send and recieve queue
1 parent df2b28a commit 76887be

1 file changed

Lines changed: 184 additions & 0 deletions

File tree

src/main.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
use futures_lite::stream::StreamExt;
2+
use lapin::{
3+
options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
4+
types::FieldTable,
5+
Channel, Connection,
6+
};
7+
use serde::{Deserialize, Serialize};
8+
use std::sync::Arc;
9+
use tokio::sync::Mutex;
10+
11+
#[derive(Serialize, Deserialize)]
12+
enum MessageType {
13+
ExecuteFlow,
14+
TestExecuteFlow,
15+
}
16+
17+
#[derive(Serialize, Deserialize)]
18+
struct Sender {
19+
name: String,
20+
protocol: String,
21+
version: String,
22+
}
23+
24+
#[derive(Serialize, Deserialize)]
25+
struct Message {
26+
message_type: MessageType,
27+
sender: Sender,
28+
timestamp: i64,
29+
telegram_id: String,
30+
body: String,
31+
}
32+
33+
async fn build_connection(rabbitmq_url: &str) -> Connection {
34+
match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
35+
Ok(env) => env,
36+
Err(error) => panic!(
37+
"Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
38+
error
39+
),
40+
}
41+
}
42+
43+
// Thread-safe wrapper for RabbitMQ channel
44+
struct RabbitmqClient {
45+
channel: Arc<Mutex<Channel>>,
46+
}
47+
48+
impl RabbitmqClient {
49+
// Create a new RabbitMQ client with channel
50+
async fn new(rabbitmq_url: &str) -> Self {
51+
let connection = build_connection(rabbitmq_url).await;
52+
let channel = connection.create_channel().await.unwrap();
53+
54+
// Declare the queue once during initialization
55+
channel
56+
.queue_declare(
57+
"send_queue",
58+
QueueDeclareOptions::default(),
59+
FieldTable::default(),
60+
)
61+
.await
62+
.unwrap();
63+
64+
channel
65+
.queue_declare(
66+
"recieve_queue",
67+
QueueDeclareOptions::default(),
68+
FieldTable::default(),
69+
)
70+
.await
71+
.unwrap();
72+
73+
RabbitmqClient {
74+
channel: Arc::new(Mutex::new(channel)),
75+
}
76+
}
77+
78+
// Send message to the queue
79+
async fn send_message(&self, message_json: String, queue_name: &str) {
80+
let channel = self.channel.lock().await;
81+
82+
channel
83+
.basic_publish(
84+
"", // exchange
85+
queue_name, // routing key (queue name)
86+
lapin::options::BasicPublishOptions::default(),
87+
message_json.as_bytes(),
88+
lapin::BasicProperties::default(),
89+
)
90+
.await
91+
.expect("TEST");
92+
}
93+
94+
// Receive messages from a queue
95+
async fn receive_messages(&self, queue_name: &str) -> Result<(), lapin::Error> {
96+
let mut consumer = {
97+
let channel = self.channel.lock().await;
98+
99+
let consumer_res = channel
100+
.basic_consume(
101+
queue_name,
102+
"consumer",
103+
BasicConsumeOptions::default(),
104+
FieldTable::default(),
105+
)
106+
.await;
107+
108+
match consumer_res {
109+
Ok(consumer) => consumer,
110+
Err(err) => panic!("{}", err),
111+
}
112+
};
113+
114+
println!("Starting to consume from {}", queue_name);
115+
116+
while let Some(delivery) = consumer.next().await {
117+
let delivery = match delivery {
118+
Ok(del) => del,
119+
Err(err) => {
120+
println!("Error receiving message: {}", err);
121+
return Err(err);
122+
}
123+
};
124+
125+
let data = &delivery.data;
126+
let message_str = match std::str::from_utf8(&data) {
127+
Ok(str) => {
128+
println!("Received message: {}", str);
129+
str
130+
}
131+
Err(err) => {
132+
println!("Error decoding message: {}", err);
133+
return Ok(());
134+
}
135+
};
136+
// Parse the message
137+
let message = match serde_json::from_str::<Message>(message_str) {
138+
Ok(mess) => {
139+
println!("Parsed message with telegram_id: {}", mess.telegram_id);
140+
mess
141+
}
142+
Err(err) => {
143+
println!("Error parsing message: {}", err);
144+
return Ok(());
145+
}
146+
};
147+
148+
// Process the message here
149+
let hello_world = Message {
150+
telegram_id: message.telegram_id,
151+
message_type: message.message_type,
152+
timestamp: message.timestamp,
153+
sender: message.sender,
154+
body: "{ \"text\": \"Hello, World!\" }".to_string(),
155+
};
156+
157+
let hello_world_json = serde_json::to_string(&hello_world).unwrap();
158+
159+
println!("{}", hello_world_json);
160+
161+
{
162+
self.send_message(hello_world_json, "recieve_queue").await;
163+
}
164+
165+
// Acknowledge the message
166+
delivery
167+
.ack(BasicAckOptions::default())
168+
.await
169+
.expect("Failed to acknowledge message");
170+
}
171+
172+
Ok(())
173+
}
174+
}
175+
176+
#[tokio::main]
177+
async fn main() {
178+
let rabbitmq_client = Arc::new(RabbitmqClient::new("amqp://localhost:5672").await);
179+
180+
// Receive messages from the send_queue
181+
if let Err(e) = rabbitmq_client.receive_messages("send_queue").await {
182+
eprintln!("Failed to receive messages: {}", e);
183+
}
184+
}

0 commit comments

Comments
 (0)