Skip to content

Commit f03eb33

Browse files
committed
FIX: Message format
1 parent 2494445 commit f03eb33

8 files changed

Lines changed: 54 additions & 51 deletions

File tree

src/datas/mod.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
use serde_json::{json, Value};
2+
use tokio::sync::broadcast;
23

34
use crate::events::Event;
45

56
pub mod docker;
67

7-
pub fn create_event_dto(event: Event, data: Value) -> String {
8-
let mut event_json = serde_json::to_value(event).unwrap_or_else(|_| json!({}));
9-
if let Value::Object(ref mut map) = event_json {
10-
map.insert("data".to_string(), data);
11-
}
12-
event_json.to_string()
8+
pub fn create_event_dto(event: Event) -> String {
9+
serde_json::to_string(&event).unwrap_or_else(|_| "".to_string())
1310
}
1411

1512
pub trait EventDTO {
@@ -20,4 +17,24 @@ impl EventDTO for i8 {
2017
fn to_json(&self) -> Value {
2118
json!(*self)
2219
}
20+
}
21+
22+
pub trait SendEvent {
23+
async fn send_event(&mut self, event: Event);
24+
}
25+
26+
impl SendEvent for wtransport::SendStream {
27+
async fn send_event(&mut self, event: Event) {
28+
if let Err(error) = self.write_all(create_event_dto(event).as_bytes()).await {
29+
log::error!("Failed to send event: {:?}", error);
30+
}
31+
}
32+
}
33+
34+
impl SendEvent for broadcast::Sender<String> {
35+
async fn send_event(&mut self, event: Event) {
36+
if let Err(error) = self.send(create_event_dto(event)) {
37+
log::error!("Failed to send event: {:?}", error);
38+
}
39+
}
2340
}

src/errors/error_status.rs

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/errors/mod.rs

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/events/docker.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@ use serde::{Deserialize, Serialize};
33
#[derive(Serialize, Deserialize, Debug)]
44
#[serde(tag = "type")]
55
pub enum DockerEvent {
6-
DockerStatus,
6+
DockerStatus { data: DockerStatusData },
77
DockerContainersRestart { data: DockerContainersRestartData }
88
}
99

10+
#[derive(Serialize, Deserialize, Debug)]
11+
pub struct DockerStatusData {
12+
pub status: Option<i8>
13+
}
14+
1015
#[derive(Serialize, Deserialize, Debug)]
1116
pub struct DockerContainersRestartData {
1217
#[serde(rename = "containerId")]
13-
pub container_id: String
18+
pub container_id: Option<String>
1419
}

src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
mod events;
22
mod datas;
3-
mod errors;
43
mod webtransport;
54
mod services;
65

src/services/docker.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,25 @@ use futures::StreamExt;
55
use serde_json::json;
66
use tokio::sync::broadcast;
77

8+
use crate::{datas::SendEvent, events::{docker::{DockerEvent, DockerStatusData}, Event}};
9+
810
const INTERVAL: Duration = Duration::from_secs(10);
911

1012
pub fn get_docker_client() -> Result<Docker, Error> {
1113
Docker::connect_with_socket_defaults()
1214
}
1315

14-
pub async fn listen_docker_events(tx: broadcast::Sender<String>) {
16+
pub async fn listen_docker_events(mut tx: broadcast::Sender<String>) {
1517
let docker = loop {
1618
match get_docker_client() {
1719
Ok(client) => break client,
1820
Err(error) => {
1921
log::error!("Failed to connect to Docker, retrying in {:?}: {:?}", INTERVAL, error);
22+
tx.send_event(Event::Docker(DockerEvent::DockerStatus {
23+
data: DockerStatusData {
24+
status: Some(0)
25+
}
26+
})).await;
2027
sleep(INTERVAL);
2128
}
2229
}
@@ -29,7 +36,6 @@ pub async fn listen_docker_events(tx: broadcast::Sender<String>) {
2936
match event {
3037
Ok(event) => {
3138
let event_str = json!(event).to_string();
32-
log::info!("Docker event: [{:?}:{:?}]", event.action, event.typ);
3339

3440
match tx.send(event_str){
3541
Ok(_) => {},

src/webtransport/docker.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1-
use crate::{datas::{create_event_dto, EventDTO}, events::{docker::DockerEvent, Event}, services::docker};
1+
use crate::{datas::SendEvent, events::{docker::{DockerContainersRestartData, DockerEvent, DockerStatusData}, Event}, services::docker};
22

33
pub async fn handle_message(send_stream: &mut wtransport::SendStream, event: &DockerEvent) {
44
match event {
5-
DockerEvent::DockerStatus => {
6-
if let Err(error) = send_stream.write_all(create_event_dto(Event::Docker(DockerEvent::DockerStatus), docker::ping().await.to_json()).as_bytes()).await {
7-
log::error!("Failed to send event: {:?}", error);
8-
};
5+
DockerEvent::DockerStatus { .. } => {
6+
send_stream.send_event(Event::Docker(DockerEvent::DockerStatus {
7+
data: DockerStatusData {
8+
status: Some(docker::ping().await)
9+
}
10+
})).await;
11+
log::info!("{:?}", Event::Docker(DockerEvent::DockerContainersRestart {
12+
data: DockerContainersRestartData {
13+
container_id: Some("container_id".to_string()),
14+
}
15+
}));
916
},
1017
DockerEvent::DockerContainersRestart { data } => {
1118
log::info!("Restarting containers: {:?}", data);

src/webtransport/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ use std::sync::Arc;
33
use tokio::sync::{broadcast, Mutex};
44
use wtransport::{endpoint::IncomingSession, Connection, Endpoint, Identity, ServerConfig};
55
use crate::services;
6-
use crate::events::{Event, system::SystemEvent};
6+
use crate::events::Event;
77

8-
mod system;
9-
mod docker;
8+
pub mod system;
9+
pub mod docker;
1010

1111
pub async fn start_webtransport() -> Result<(), Box<dyn Error + Send + Sync>> {
1212
let identity = match Identity::load_pemfiles("localhost.pem", "localhost-key.pem").await {
@@ -30,10 +30,9 @@ pub async fn start_webtransport() -> Result<(), Box<dyn Error + Send + Sync>> {
3030
}
3131
};
3232

33-
log::info!("Listening on port 4433");
34-
3533
let (tx, _rx) = broadcast::channel::<String>(100);
3634

35+
//TODO handle errors
3736
tokio::spawn(services::docker::listen_docker_events(tx.clone()));
3837

3938
loop {

0 commit comments

Comments
 (0)