Skip to content

Commit fad9b4b

Browse files
committed
Some fixes and optimizations
1 parent 310f819 commit fad9b4b

8 files changed

Lines changed: 73 additions & 25 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ async-trait = "0.1.83"
4242
tmpdir = "1.0.0"
4343
num_cpus = "1.16.0"
4444
tokio-stream = "0.1.16"
45+
bytes = "1.7.2"
4546

4647
[target.'cfg(target_os = "android")'.dependencies]
4748
jni = "0.21.1"

src/error.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use actix_web::{HttpResponse, ResponseError};
1+
use actix_web::{HttpResponse, ResponseError, error::BlockingError};
22
use anyhow::anyhow;
33
use anyhow::Error as AnyhowError;
44
use base64_url::base64;
@@ -66,5 +66,11 @@ impl From<ErrReport> for AppError {
6666
}
6767
}
6868

69+
impl From<BlockingError> for AppError {
70+
fn from(error: BlockingError) -> Self {
71+
AppError(AnyhowError::new(error).context("An error occurred during a blocking operation"))
72+
}
73+
}
74+
6975
// Shortcut for Result<T, AppError>
7076
pub type AppResult<T> = std::result::Result<T, AppError>;

src/groups.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub fn scope() -> actix_web::Scope {
2323
)
2424
}
2525

26+
// This doesn't seem to be the way to delete a group.
27+
//
2628
#[delete("")]
2729
async fn delete_group(group_id: web::Path<String>) -> AppResult<impl Responder> {
2830
let backend = get_backend().await?;

src/logging.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
#[cfg(target_os = "android")]
12
use std::ffi::CString;
3+
24
#[cfg(target_os = "android")]
35
use std::os::raw::{c_char, c_int};
46

src/media.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
use crate::constants::TAG;
2-
use crate::error::AppResult;
3-
use crate::logging::android_log;
2+
use crate::error::{AppError, AppResult};
43
use crate::models::{GroupRepoMediaPath, GroupRepoPath};
54
use crate::server::server::get_backend;
65
use crate::utils::create_veilid_cryptokey_from_base64;
7-
use crate::{log_debug, log_info};
8-
use actix_web::web::Bytes;
9-
use actix_web::{delete, get, post, web, HttpResponse, Responder, Scope};
6+
use crate::log_info;
7+
use actix_web::{delete, get, post, web, HttpResponse, Responder, Scope, http::header, error::BlockingError};
8+
use bytes::{BytesMut, Bytes};
9+
use futures::Stream;
1010
use futures::StreamExt;
11-
use save_dweb_backend::common::DHTEntity;
1211
use serde_json::json;
12+
use std::io;
1313

1414
pub fn scope() -> Scope {
1515
web::scope("/media")
@@ -19,6 +19,27 @@ pub fn scope() -> Scope {
1919
.service(download_file)
2020
}
2121

22+
pub fn from_blocking<T>(result: Result<T, BlockingError>) -> AppResult<T> {
23+
result.map_err(AppError::from)
24+
}
25+
26+
async fn handle_file_stream(mut file_data: impl Stream<Item = Result<Bytes, io::Error>> + Unpin) -> AppResult<(usize, Bytes)> {
27+
let mut buffer = BytesMut::new();
28+
let mut length = 0;
29+
30+
while let Some(chunk_result) = file_data.next().await {
31+
let chunk = chunk_result.map_err(|e| AppError(anyhow::Error::new(e)))?;
32+
buffer.extend_from_slice(&chunk);
33+
length += chunk.len();
34+
}
35+
36+
let final_buffer = web::block(move || {
37+
buffer.freeze()
38+
}).await?;
39+
40+
Ok((length, final_buffer))
41+
}
42+
2243
#[get("")]
2344
async fn list_files(path: web::Path<GroupRepoPath>) -> AppResult<impl Responder> {
2445
let path_params = path.into_inner();
@@ -55,8 +76,7 @@ async fn list_files(path: web::Path<GroupRepoPath>) -> AppResult<impl Responder>
5576
"is_downloaded": is_downloaded
5677
}));
5778
}
58-
59-
Ok(HttpResponse::Ok().json(files_with_status))
79+
Ok(HttpResponse::Ok().json(json!({ "files": files_with_status })))
6080
}
6181

6282
#[get("/{file_name}")]
@@ -93,13 +113,14 @@ async fn download_file(path: web::Path<GroupRepoMediaPath>) -> AppResult<impl Re
93113
// Trigger file download from peers using the hash
94114
let file_data = repo
95115
.get_file_stream(file_name)
96-
.await
97-
.map_err(|e| anyhow::anyhow!("Failed to download file from peers: {}", e))?;
116+
.await?;
117+
118+
let (content_length, buffered_data) = handle_file_stream(file_data).await?;
98119

99-
// Return the file data as a binary response
100120
Ok(HttpResponse::Ok()
101121
.content_type("application/octet-stream")
102-
.streaming(file_data))
122+
.insert_header((header::CONTENT_LENGTH, content_length))
123+
.body(buffered_data))
103124
}
104125

105126
#[delete("/{file_name}")]
@@ -175,6 +196,7 @@ async fn upload_file(
175196
.map_err(|e| anyhow::anyhow!("Failed to upload file: {}", e))?;
176197

177198
Ok(HttpResponse::Ok().json(json!({
199+
"name": file_name,
178200
"updated_collection_hash": updated_collection_hash,
179201
})))
180202
}

src/repos.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,28 @@ async fn create_repo(
103103
}
104104

105105
async fn get_snowbird_repos(group: &Group) -> Result<Vec<SnowbirdRepo>, AppError> {
106-
log_debug!(TAG, "start");
106+
log_debug!(TAG, "Getting Snowbird Repos");
107+
108+
let locked_repos = group.repos.lock().await;
109+
let repos: Vec<_> = locked_repos.values().cloned().collect();
107110

108-
let repo_ids = group.list_repos().await;
109111
let mut snowbird_repos = Vec::new();
110112

111-
for id in repo_ids {
112-
log_debug!(TAG, "Repo ID {}", id);
113-
let repo = group.get_repo(&id).await?;
113+
for repo in repos {
114+
log_debug!(TAG, "Repo ID {}", repo.id());
114115
let snowbird_repo = SnowbirdRepo::async_from(repo).await;
115116
snowbird_repos.push(snowbird_repo);
116117
}
117118

119+
// let repo_ids = group.list_repos().await;
120+
// let mut snowbird_repos = Vec::new();
121+
122+
// for id in repo_ids {
123+
// log_debug!(TAG, "Repo ID {}", id);
124+
// let repo = group.get_repo(&id).await?;
125+
// let snowbird_repo = SnowbirdRepo::async_from(repo).await;
126+
// snowbird_repos.push(snowbird_repo);
127+
// }
128+
118129
Ok(snowbird_repos)
119130
}

src/server.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,14 @@ pub mod server {
9292

9393
fn get_optimal_worker_count() -> usize {
9494
let cpu_count = num_cpus::get();
95-
let worker_count = cmp::max(1, cmp::min(cpu_count / 2, 4));
95+
//let worker_count = cmp::max(1, cmp::min(cpu_count / 2, 4));
9696

97-
log_debug!(TAG, "Detected {} CPUs, using {} workers", cpu_count, worker_count);
97+
log_debug!(TAG, "Detected {} CPUs", cpu_count);
9898

99-
worker_count
99+
// This whole thing was an attempt at optimization, but since
100+
// we're only ever handling one request at a time let's keep
101+
// things lightweight for now.
102+
1
100103
}
101104

102105
pub async fn start(backend_base_directory: &str, server_socket_path: &str) -> anyhow::Result<()> {

0 commit comments

Comments
 (0)