Skip to content

Commit 2e67bc9

Browse files
Merge pull request #17 from OpenArchive/feat/refresh-and-health-apis
Feat/refresh and health apis
2 parents 7b08125 + d40253d commit 2e67bc9

6 files changed

Lines changed: 625 additions & 245 deletions

File tree

Cargo.lock

Lines changed: 67 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,14 @@ num_cpus = "1.16.0"
4444
tokio-stream = "0.1.16"
4545
bytes = "1.7.2"
4646
iroh-blobs = "0.24.0"
47+
log = "0.4"
4748

4849
[target.'cfg(target_os = "android")'.dependencies]
4950
jni = "0.21.1"
5051
tokio = { version = "~1.39", default-features = false, features = ["rt", "rt-multi-thread", "sync", "time", "macros"] }
5152
veilid-core = { git = "https://gitlab.com/veilid/veilid.git", version = "0.4.3" }
5253
blake3 = "1.5.4"
54+
55+
[dev-dependencies]
56+
env_logger = "0.10"
57+
serial_test = "2.0"

src/android_bridge.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::error::Error;
1515
use std::sync::{Arc, Mutex, Once};
1616
use std::thread;
1717
use veilid_core::veilid_core_setup_android;
18+
use std::time::Duration;
1819

1920

2021
trait IntoJObject {
@@ -104,8 +105,47 @@ pub extern "system" fn Java_net_opendasharchive_openarchive_services_snowbird_Sn
104105
_clazz: JClass,
105106
ctx: JObject,
106107
) -> jstring {
108+
log_debug!(TAG, "Bridge: stopping server");
109+
110+
// Create a runtime to handle async operations
111+
let runtime = tokio::runtime::Runtime::new().unwrap();
112+
113+
// Stop the backend server and clean up Veilid API
114+
let stop_result = runtime.block_on(async {
115+
// First stop the backend
116+
match crate::server::server::stop().await {
117+
Ok(_) => {
118+
log_info!(TAG, "Backend stopped successfully");
119+
120+
// Get the backend to access Veilid API
121+
if let Ok(mut backend) = crate::server::server::get_backend().await {
122+
// Shutdown Veilid API
123+
if let Some(veilid_api) = backend.get_veilid_api() {
124+
veilid_api.shutdown().await;
125+
log_info!(TAG, "Veilid API shut down successfully");
126+
}
127+
}
128+
129+
// Add a small delay to ensure tasks complete
130+
tokio::time::sleep(Duration::from_millis(500)).await;
131+
132+
Ok(())
133+
}
134+
Err(e) => {
135+
log_error!(TAG, "Error stopping server: {:?}", e);
136+
Err(e)
137+
}
138+
}
139+
});
140+
141+
// Create response string based on result
142+
let response = match stop_result {
143+
Ok(_) => "Server stopped successfully",
144+
Err(_) => "Error stopping server",
145+
};
146+
107147
let output = env
108-
.new_string("Server stopped")
148+
.new_string(response)
109149
.expect("Couldn't create java string!");
110150

111151
output.into_raw()

src/groups.rs

Lines changed: 98 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -120,47 +120,124 @@ async fn refresh_group(group_id: web::Path<String>) -> AppResult<impl Responder>
120120
let key = create_veilid_cryptokey_from_base64(group_id.as_str())?;
121121
log_debug!(TAG, "Got key {}", key);
122122

123-
let group = backend.get_group(&key).await?;
123+
// Return error if group not found
124+
let group = match backend.get_group(&key).await {
125+
Ok(group) => group,
126+
Err(e) => {
127+
return Ok(HttpResponse::NotFound().json(json!({
128+
"status": "error",
129+
"error": format!("Group not found: {}", e)
130+
})));
131+
}
132+
};
124133
log_debug!(TAG, "Got group");
125134

126135
// Get all repos in the group
127136
let locked_repos = group.repos.lock().await;
128137
let repos: Vec<_> = locked_repos.values().cloned().collect();
129138
drop(locked_repos); // Release the lock before async operations
130139

131-
let mut refreshed_files = Vec::new();
140+
// Return empty arrays if no repos
141+
if repos.is_empty() {
142+
return Ok(HttpResponse::Ok().json(json!({
143+
"status": "success",
144+
"refreshed_files": [],
145+
"repos": []
146+
})));
147+
}
148+
149+
let mut refreshed_repos = Vec::new();
132150

133151
// For each repo, refresh its collection and files
134152
for repo in repos {
135153
log_debug!(TAG, "Refreshing repo {}", repo.id());
136-
137-
// Refresh collection hash
138-
let collection_hash = repo.get_hash_from_dht().await?;
139-
if !group.has_hash(&collection_hash).await? {
140-
group.download_hash_from_peers(&collection_hash).await?;
141-
log_debug!(TAG, "Downloaded collection hash {}", collection_hash);
142-
}
143154

144-
// Get and refresh all files
145-
let files = repo.list_files().await?;
146-
for file_name in files {
147-
match repo.get_file_hash(&file_name).await {
148-
Ok(file_hash) => {
149-
if !group.has_hash(&file_hash).await? {
150-
group.download_hash_from_peers(&file_hash).await?;
151-
log_debug!(TAG, "Downloaded file hash {} for {}", file_hash, file_name);
152-
refreshed_files.push(file_name);
155+
let mut repo_info = json!({
156+
"repo_id": repo.id().to_string(),
157+
"can_write": repo.can_write(),
158+
"name": repo.get_name().await.unwrap_or_default(),
159+
"refreshed_files": json!(Vec::<String>::new()), // Initialize empty
160+
"all_files": json!(Vec::<String>::new()) // Initialize empty
161+
});
162+
let mut refreshed_files_vec = Vec::new();
163+
let mut all_files_vec = Vec::new();
164+
165+
// Get current repo hash and collection info
166+
match repo.get_hash_from_dht().await {
167+
Ok(repo_hash) => {
168+
repo_info["repo_hash"] = json!(repo_hash.to_string());
169+
170+
// Refresh collection hash if needed
171+
log_debug!(TAG, "Repo {} has DHT hash {}. Checking if group has it locally.", repo.id(), repo_hash);
172+
if !group.has_hash(&repo_hash).await? {
173+
log_debug!(TAG, "Repo {} collection {} not found locally. Downloading...", repo.id(), repo_hash);
174+
match group.download_hash_from_peers(&repo_hash).await {
175+
Ok(_) => {
176+
log_debug!(TAG, "Successfully downloaded collection hash {} for repo {}", repo_hash, repo.id());
177+
}
178+
Err(e) => {
179+
log_debug!(TAG, "Error downloading collection hash {} for repo {}: {}", repo_hash, repo.id(), e);
180+
repo_info["error"] = json!(format!("Error downloading collection: {}", e));
181+
refreshed_repos.push(repo_info);
182+
continue; // Skip to next repo if download fails
183+
}
153184
}
185+
} else {
186+
log_debug!(TAG, "Repo {} collection {} already local.", repo.id(), repo_hash);
154187
}
155-
Err(e) => {
156-
log_debug!(TAG, "Error getting hash for file {}: {}", file_name, e);
188+
189+
// Now that the collection is ensured to be local, list all files in the repo
190+
match repo.list_files().await {
191+
Ok(files) => {
192+
log_debug!(TAG, "Repo {} lists files: {:?}", repo.id(), files);
193+
all_files_vec = files;
194+
}
195+
Err(e) => {
196+
log_debug!(TAG, "Error listing files for repo {} after ensuring collection download: {}", repo.id(), e);
197+
// Even if listing fails here, we might have a repo_hash, so continue with empty files.
198+
// Or, handle as a more significant error. For now, log and continue.
199+
repo_info["error_listing_files"] = json!(format!("Error listing files post-download: {}", e));
200+
}
201+
};
202+
repo_info["all_files"] = json!(all_files_vec.clone());
203+
204+
205+
// For each file, check if it needs to be refreshed
206+
for file_name in &all_files_vec {
207+
match repo.get_file_hash(file_name).await {
208+
Ok(file_hash) => {
209+
if !group.has_hash(&file_hash).await? {
210+
log_debug!(TAG, "File {} hash {} not found locally. Downloading...", file_name, file_hash);
211+
match group.download_hash_from_peers(&file_hash).await {
212+
Ok(_) => {
213+
log_debug!(TAG, "Successfully downloaded file hash {} for {}", file_hash, file_name);
214+
refreshed_files_vec.push(file_name.clone());
215+
}
216+
Err(e) => {
217+
log_debug!(TAG, "Error downloading file {} hash {}: {}", file_name, file_hash, e);
218+
// Optionally add to a list of files that failed to download
219+
}
220+
}
221+
}
222+
}
223+
Err(e) => {
224+
log_debug!(TAG, "Error getting hash for file {}: {}", file_name, e);
225+
}
226+
}
157227
}
228+
repo_info["refreshed_files"] = json!(refreshed_files_vec);
229+
}
230+
Err(e) => {
231+
log_debug!(TAG, "Error getting repo hash for {}: {}", repo.id(), e);
232+
repo_info["error"] = json!(format!("Error getting repo hash from DHT: {}", e));
158233
}
159234
}
235+
236+
refreshed_repos.push(repo_info);
160237
}
161238

162239
Ok(HttpResponse::Ok().json(json!({
163240
"status": "success",
164-
"refreshed_files": refreshed_files
241+
"repos": refreshed_repos
165242
})))
166243
}

0 commit comments

Comments (0)