Skip to content

Commit 1ca3d37

Browse files
Merge pull request #15 from OpenArchive/feat/refresh-endpoint
Refresh endpoint
2 parents 1cb316a + 4866989 commit 1ca3d37

2 files changed

Lines changed: 285 additions & 0 deletions

File tree

src/groups.rs

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ pub fn scope() -> actix_web::Scope {
1919
web::scope("/{group_id}")
2020
.service(delete_group)
2121
.service(get_group)
22+
.service(refresh_group)
2223
.service(repos::scope())
2324
)
2425
}
@@ -109,3 +110,57 @@ async fn join_group_from_url(request_url: web::Json<RequestUrl>) -> AppResult<im
109110

110111
Ok(HttpResponse::Ok().json(snowbird_group))
111112
}
113+
114+
#[post("/refresh")]
115+
async fn refresh_group(group_id: web::Path<String>) -> AppResult<impl Responder> {
116+
let backend = get_backend().await?;
117+
log_debug!(TAG, "Starting group refresh");
118+
119+
let group_id = group_id.into_inner();
120+
let key = create_veilid_cryptokey_from_base64(group_id.as_str())?;
121+
log_debug!(TAG, "Got key {}", key);
122+
123+
let group = backend.get_group(&key).await?;
124+
log_debug!(TAG, "Got group");
125+
126+
// Get all repos in the group
127+
let locked_repos = group.repos.lock().await;
128+
let repos: Vec<_> = locked_repos.values().cloned().collect();
129+
drop(locked_repos); // Release the lock before async operations
130+
131+
let mut refreshed_files = Vec::new();
132+
133+
// For each repo, refresh its collection and files
134+
for repo in repos {
135+
log_debug!(TAG, "Refreshing repo {}", repo.id());
136+
137+
// Refresh collection hash
138+
let collection_hash = repo.get_hash_from_dht().await?;
139+
if !group.has_hash(&collection_hash).await? {
140+
group.download_hash_from_peers(&collection_hash).await?;
141+
log_debug!(TAG, "Downloaded collection hash {}", collection_hash);
142+
}
143+
144+
// Get and refresh all files
145+
let files = repo.list_files().await?;
146+
for file_name in files {
147+
match repo.get_file_hash(&file_name).await {
148+
Ok(file_hash) => {
149+
if !group.has_hash(&file_hash).await? {
150+
group.download_hash_from_peers(&file_hash).await?;
151+
log_debug!(TAG, "Downloaded file hash {} for {}", file_hash, file_name);
152+
refreshed_files.push(file_name);
153+
}
154+
}
155+
Err(e) => {
156+
log_debug!(TAG, "Error getting hash for file {}: {}", file_name, e);
157+
}
158+
}
159+
}
160+
}
161+
162+
Ok(HttpResponse::Ok().json(json!({
163+
"status": "success",
164+
"refreshed_files": refreshed_files
165+
})))
166+
}

src/lib.rs

Lines changed: 230 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -395,6 +395,236 @@ mod tests {
395395
"Downloaded back file content"
396396
);
397397

398+
// Clean up
399+
backend2.stop().await?;
400+
{
401+
let backend = get_backend().await?;
402+
backend.stop().await.expect("Backend failed to stop");
403+
}
404+
// Add delay to allow tasks to complete
405+
tokio::time::sleep(Duration::from_secs(2)).await;
406+
veilid_api2.shutdown().await;
407+
408+
Ok(())
409+
}
410+
411+
#[actix_web::test]
/// End-to-end test of the /refresh endpoint:
/// 1. Spin up a second backend (backend2), create a group + repo, upload a file.
/// 2. Start the main backend, join the group via its URL, and poll until the
///    join is visible in `list_groups`.
/// 3. POST /api/groups/{id}/refresh (with retries) and assert the uploaded
///    file shows up in `refreshed_files`.
/// 4. GET the file through the media endpoint (with retries) and assert the
///    bytes round-trip intact.
/// All network-dependent steps retry with exponential-ish backoff because
/// propagation through the peer network is asynchronous.
async fn test_refresh_endpoint() -> Result<()> {
    // Initialize the app
    let path = TmpDir::new("test_api_repo_file_operations").await?;

    // Initialize backend2 first
    let store2 = iroh_blobs::store::fs::Store::load(path.to_path_buf().join("iroh2")).await?;
    let (veilid_api2, update_rx2) = save_dweb_backend::common::init_veilid(
        path.to_path_buf().join("test2").as_path(),
        "test2".to_string(),
    )
    .await?;
    let backend2 = save_dweb_backend::backend::Backend::from_dependencies(
        &path.to_path_buf(),
        veilid_api2.clone(),
        update_rx2,
        store2,
    )
    .await
    .unwrap();

    // Initialize the main backend after backend2
    BACKEND.get_or_init(|| init_backend(path.to_path_buf().as_path()));
    {
        let backend = get_backend().await?;
        backend.start().await.expect("Backend failed to start");
    }

    // Create group and repo in backend2
    let mut group = backend2.create_group().await?;
    let join_url = group.get_url();
    group.set_name(TEST_GROUP_NAME).await?;

    let repo = group.create_repo().await?;
    repo.set_name(TEST_GROUP_NAME).await?;

    // Upload a file
    let file_name = "example.txt";
    let file_content = b"Test content for file upload";
    repo.upload(&file_name, file_content.to_vec()).await?;

    // Give the upload some time to propagate through the network
    // Simple polling function with backoff
    // NOTE(review): poll_until_success is defined but never called anywhere
    // in this test — dead code; candidate for removal or actual use.
    async fn poll_until_success(
        attempts: u32,
        base_delay_ms: u64,
        max_delay_ms: u64,
        check_condition: impl Fn() -> Result<bool>
    ) -> Result<()> {
        let mut delay_ms = base_delay_ms;
        for attempt in 0..attempts {
            if check_condition()? {
                return Ok(());
            }

            if attempt < attempts - 1 {
                // Deterministic pseudo-jitter derived from the attempt index.
                let jitter = (attempt as u64 * 37) % (delay_ms / 4);
                let sleep_duration = std::cmp::min(
                    delay_ms + jitter,
                    max_delay_ms
                );

                tokio::time::sleep(Duration::from_millis(sleep_duration)).await;
                delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
            }
        }

        anyhow::bail!("Operation timed out after {} attempts", attempts)
    }

    // Initialize the app
    let app = test::init_service(
        App::new()
            .service(status)
            .service(web::scope("/api").service(groups::scope())),
    )
    .await;

    // Join the group from the first backend and wait for it to be ready
    {
        let backend = get_backend().await?;
        backend.join_from_url(join_url.as_str()).await?;

        // Using a simpler approach with manual retries
        let group_id = group.id().clone();
        let mut found = false;

        for attempt in 0..10 {
            match backend.list_groups().await {
                Ok(groups) => {
                    if groups.iter().any(|g| g.id() == group_id) {
                        found = true;
                        break;
                    }
                    eprintln!("Group not found yet (attempt {}), waiting...", attempt + 1);
                }
                Err(e) => {
                    eprintln!("Error checking groups (attempt {}): {}", attempt + 1, e);
                }
            }

            let delay = 500 * (1 << attempt.min(5)); // Exponential backoff, max 16s
            tokio::time::sleep(Duration::from_millis(delay)).await;
        }

        if !found {
            anyhow::bail!("Failed to verify group join completion after multiple attempts");
        }
    }

    // Call refresh endpoint and handle response
    let group_id_str = group.id().to_string();
    let mut refresh_data = serde_json::Value::Null;

    // Make multiple attempts for the refresh operation
    for attempt in 0..5 {
        // Create a fresh request each time since they can't be reused
        let refresh_req = test::TestRequest::post()
            .uri(&format!("/api/groups/{}/refresh", group_id_str))
            .to_request();

        let refresh_resp = test::call_service(&app, refresh_req).await;
        if !refresh_resp.status().is_success() {
            eprintln!("Refresh failed with status: {} (attempt {})", refresh_resp.status(), attempt + 1);
            tokio::time::sleep(Duration::from_millis(1000 * (attempt + 1))).await;
            continue;
        }

        // Try to parse the response body
        let body = test::read_body(refresh_resp).await;
        match serde_json::from_slice::<serde_json::Value>(&body) {
            Ok(data) => {
                refresh_data = data;
                // Success only once the uploaded file appears in refreshed_files.
                if refresh_data["status"] == "success" {
                    if let Some(refreshed_files) = refresh_data["refreshed_files"].as_array() {
                        if refreshed_files.iter().any(|f| f.as_str() == Some(file_name)) {
                            break; // Success case
                        }
                    }
                }
                eprintln!("Refresh response not as expected (attempt {}): {:?}", attempt + 1, refresh_data);
            }
            Err(e) => {
                eprintln!("Error parsing refresh response (attempt {}): {}", attempt + 1, e);
            }
        }

        // Wait before retrying
        tokio::time::sleep(Duration::from_millis(1000 * (attempt + 1))).await;
    }

    // Check if we got a successful response from any attempt
    assert_eq!(refresh_data["status"], "success", "Refresh should return success after multiple attempts");

    // Verify file is accessible with retry logic
    let repo_id_str = repo.id().to_string();
    let mut got_file_data = Vec::new();
    let file_name_clone = file_name.to_string();

    // Manual retry approach without capturing app
    for attempt in 0..10 {
        // Create a fresh request for each attempt
        let get_file_req = test::TestRequest::get()
            .uri(&format!(
                "/api/groups/{}/repos/{}/media/{}",
                group_id_str, repo_id_str, file_name_clone
            ))
            .to_request();

        let get_file_resp = test::call_service(&app, get_file_req).await;
        if !get_file_resp.status().is_success() {
            eprintln!("File not yet available, status: {} (attempt {})",
                get_file_resp.status(), attempt + 1);
            tokio::time::sleep(Duration::from_millis(1000 * (attempt + 1))).await;
            continue;
        }

        got_file_data = test::read_body(get_file_resp).await.to_vec();
        if got_file_data.as_slice() == file_content {
            break; // Success - exit the retry loop
        } else {
            eprintln!("File content mismatch (attempt {}), retrying...", attempt + 1);
            tokio::time::sleep(Duration::from_millis(1000 * (attempt + 1))).await;
        }
    }

    // Final verification
    assert_eq!(
        got_file_data.as_slice(),
        file_content,
        "Downloaded file content should match"
    );

    // Verify the refresh response had the expected format
    let refreshed_files = refresh_data["refreshed_files"]
        .as_array()
        .expect("refreshed_files should be an array");

    assert!(
        refreshed_files.iter().any(|f| f.as_str() == Some(file_name)),
        "File should be in refreshed_files list"
    );

    // Clean up in reverse order of initialization - with grace periods
    backend2.stop().await?;
    {
        let backend = get_backend().await?;
        backend.stop().await.expect("Backend failed to stop");
    }

    // Allow time for clean shutdown
    tokio::time::sleep(Duration::from_secs(1)).await;
    veilid_api2.shutdown().await;

    Ok(())
}
627+
398628
Ok(())
399629
}
400630
}

0 commit comments

Comments
 (0)