Skip to content

Commit 72ad08f

Browse files
committed
FEAT: WebTransport datagrams
1 parent b4a1eaf commit 72ad08f

7 files changed

Lines changed: 117 additions & 41 deletions

File tree

src/datas/docker.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use bollard::secret::ContainerSummary;
2+
use serde_json::json;
3+
4+
use super::EventDTO;
5+
6+
impl EventDTO for ContainerSummary {
7+
fn to_json(&self) -> String {
8+
json!({
9+
"id": self.id,
10+
"names": self.names,
11+
"image": self.image,
12+
"command": self.command,
13+
"created": self.created,
14+
"state": self.state,
15+
"status": self.status,
16+
"ports": self.ports,
17+
"labels": self.labels,
18+
}).to_string()
19+
}
20+
}
21+
22+
impl EventDTO for Vec<ContainerSummary> {
23+
fn to_json(&self) -> String {
24+
json!(self.iter().map(|f| f.to_json()).collect::<Vec<String>>()).to_string()
25+
}
26+
}

src/datas/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod docker;
2+
3+
pub trait EventDTO {
4+
fn to_json(&self) -> String;
5+
}

src/events/docker.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
#[derive(Serialize, Deserialize, Debug)]
4+
#[serde(tag = "type")]
5+
pub enum DockerEvent {
6+
DockerStatus,
7+
DockerContainersRestart { data: DockerContainersRestartData }
8+
}
9+
10+
#[derive(Serialize, Deserialize, Debug)]
11+
pub struct DockerContainersRestartData {
12+
#[serde(rename = "containerId")]
13+
pub container_id: String
14+
}

src/events/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use serde::{Deserialize, Serialize};
2+
use system::SystemEvent;
3+
use docker::DockerEvent;
4+
5+
pub mod system;
6+
pub mod docker;
7+
8+
#[derive(Serialize, Deserialize, Debug)]
9+
#[serde(untagged)]
10+
pub enum Event {
11+
System(SystemEvent),
12+
Docker(DockerEvent)
13+
}

src/events/system.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use serde::{Deserialize, Serialize};
2+
3+
#[derive(Serialize, Deserialize, Debug)]
4+
#[serde(tag = "type")]
5+
pub enum SystemEvent {
6+
SystemStatus
7+
}

src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
mod events;
2+
mod datas;
13
mod errors;
24
mod webtransport;
35
mod services;

src/webtransport/mod.rs

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
use std::{error::Error, sync::Arc};
1+
use std::error::Error;
22
use axum::Json;
3-
use tokio::sync::{broadcast, Mutex};
3+
use tokio::sync::broadcast;
44
use wtransport::{endpoint::IncomingSession, Connection, Endpoint, Identity, ServerConfig};
5+
use crate::datas::EventDTO;
6+
use crate::events::docker::DockerEvent;
7+
use crate::events::system::SystemEvent;
58
use crate::services::docker;
9+
use crate::events::Event;
610

711
pub async fn start_webtransport() -> Result<(), Box<dyn Error + Send + Sync>> {
812
let identity = match Identity::load_pemfiles("localhost.pem", "localhost-key.pem").await {
@@ -62,17 +66,18 @@ async fn handle_connection(incoming_session: IncomingSession, tx: broadcast::Sen
6266
}
6367
};
6468

65-
log::info!("Accepted connection from {:?}", connection.remote_address());
66-
67-
let datagram_handle = tokio::spawn(handle_datagram(connection.clone()));
69+
let datagram_handle = tokio::spawn(handle_datagram(connection.clone(), tx.clone()));
6870

6971
let bidirectional_handle = tokio::spawn(handle_bidirectionnal(connection, tx));
7072

7173
let _ = tokio::join!(datagram_handle, bidirectional_handle);
7274
Ok(())
7375
}
7476

75-
async fn handle_datagram(connection: Connection) -> Result<(), Box<dyn Error + Send + Sync>> {
77+
async fn handle_datagram(connection: Connection, tx: broadcast::Sender<String>) -> Result<(), Box<dyn Error + Send + Sync>> {
78+
log::info!("Accepted datagram connection from {:?}", connection.remote_address());
79+
let mut rx = tx.subscribe();
80+
7681
loop {
7782
let datagram = match connection.receive_datagram().await {
7883
Ok(datagram) => datagram,
@@ -82,59 +87,63 @@ async fn handle_datagram(connection: Connection) -> Result<(), Box<dyn Error + S
8287
}
8388
};
8489
let received_message = String::from_utf8_lossy(&datagram);
85-
log::info!("Received message: {:?}", received_message);
90+
log::info!("Received datagram message: {:?}", received_message);
8691

87-
let response = b"Hello from server via datagram!";
88-
match connection.send_datagram(response) {
89-
Ok(_) => {},
92+
let event: Event = match serde_json::from_str(&received_message) {
93+
Ok(event) => event,
9094
Err(e) => {
91-
log::error!("Failed to send datagram: {:?}", e);
92-
return Err(Box::new(e));
95+
log::error!("Failed to parse event: {:?}", e);
96+
continue;
9397
}
9498
};
99+
100+
match &event {
101+
Event::Docker(docker_event) => {
102+
match docker_event {
103+
DockerEvent::DockerStatus => {
104+
let event = "DockerStatus".to_string();
105+
connection.send_datagram(event.as_bytes())?;
106+
},
107+
DockerEvent::DockerContainersRestart { data } => {
108+
let containers = docker::get_containers().await?;
109+
log::info!("Restarting container: {:?}", containers.to_json().as_bytes().len());
110+
if let Err(error) = connection.send_datagram(containers.to_json()) {
111+
log::error!("Failed to send event: {:?}", error);
112+
}
113+
}
114+
}
115+
},
116+
Event::System(system_event) => {
117+
match system_event {
118+
SystemEvent::SystemStatus => {
119+
let event = "SystemStatus".to_string();
120+
connection.send_datagram(event.as_bytes())?;
121+
}
122+
}
123+
},
124+
}
95125
}
96126
}
97127

98128
async fn handle_bidirectionnal(connection: Connection, tx: broadcast::Sender<String>) -> Result<(), Box<dyn Error + Send + Sync>> {
99-
log::info!("Accepted connection from {:?}", connection.remote_address());
129+
log::info!("Accepted bidirectional connection from {:?}", connection.remote_address());
100130

101-
while let Ok((send_stream, mut recv_stream)) = connection.accept_bi().await {
131+
while let Ok((mut send_stream, mut recv_stream)) = connection.accept_bi().await {
102132
log::trace!("Accepted bidirectional stream");
103133

104134
let mut rx = tx.subscribe();
105-
let send_stream = Arc::new(Mutex::new(send_stream));
106135

107-
{
108-
let send_stream = Arc::clone(&send_stream);
109-
tokio::spawn(async move {
110-
let mut buffer = vec![0; 1024];
111-
112-
while let Ok(Some(bytes_read)) = recv_stream.read(&mut buffer).await {
113-
let received_message = String::from_utf8_lossy(&buffer[..bytes_read]);
114-
log::info!("Received message: {:?}", received_message);
115-
116-
let mut send_stream = send_stream.lock().await;
117-
118-
let _ = send_stream.write_all(b"Hello from server!").await;
119-
}
120-
});
121-
}
122-
123-
{
124-
let send_stream = Arc::clone(&send_stream);
125-
tokio::spawn(async move {
126-
while let Ok(event) = rx.recv().await {
127-
let mut send_stream = send_stream.lock().await;
128-
let _ = send_stream.write_all(event.as_bytes()).await;
129-
}
130-
});
131-
}
136+
tokio::spawn(async move {
137+
while let Ok(event) = rx.recv().await {
138+
let _ = send_stream.write_all(event.as_bytes()).await;
139+
}
140+
});
132141
}
133142

134143
Ok(())
135144
}
136145

137-
pub fn handle_event(event: String, data: Json<String>) {
146+
fn handle_event(event: String, data: Json<String>) {
138147
log::info!("Received event: {:?}", event);
139148
log::info!("Received data: {:?}", data);
140149

0 commit comments

Comments
 (0)