From 8d0bfcf1c4fcf254d5e44722c7ad97cfef03aa88 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 18 Jun 2026 17:14:16 +0800 Subject: [PATCH] chore: fix tighten test cluster cleanup matching --- crates/fluss-test-cluster/src/lib.rs | 99 +++++++++++++++++++--------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/crates/fluss-test-cluster/src/lib.rs b/crates/fluss-test-cluster/src/lib.rs index a6d68638..144d725e 100644 --- a/crates/fluss-test-cluster/src/lib.rs +++ b/crates/fluss-test-cluster/src/lib.rs @@ -17,7 +17,7 @@ use fluss::client::FlussConnection; use fluss::config::Config; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::mem::ManuallyDrop; use std::sync::Arc; @@ -45,6 +45,28 @@ async fn docker_client() -> Option { } } +fn zookeeper_container_name(cluster_name: &str) -> String { + format!("zookeeper-{cluster_name}") +} + +fn coordinator_server_container_name(cluster_name: &str) -> String { + format!("coordinator-server-{cluster_name}") +} + +fn tablet_server_container_name(cluster_name: &str, server_id: u16) -> String { + format!("tablet-server-{cluster_name}-{server_id}") +} + +fn is_cluster_container_name(container_name: &str, cluster_name: &str) -> bool { + let container_name = container_name.trim_start_matches('/'); + let tablet_server_prefix = format!("tablet-server-{cluster_name}-"); + container_name == zookeeper_container_name(cluster_name) + || container_name == coordinator_server_container_name(cluster_name) + || container_name + .strip_prefix(&tablet_server_prefix) + .is_some_and(|suffix| !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit())) +} + /// Force-removes a container by name. Best-effort: a 404 (already gone) is success; /// other errors warn rather than panic. async fn force_remove_container(docker: &Docker, name: &str) { @@ -160,15 +182,15 @@ impl FlussTestingClusterBuilder { } fn tablet_server_container_name(&self, server_id: u16) -> String { - format!("tablet-server-{}-{}", self.testing_name, server_id) + tablet_server_container_name(&self.testing_name, server_id) } fn coordinator_server_container_name(&self) -> String { - format!("coordinator-server-{}", self.testing_name) + coordinator_server_container_name(&self.testing_name) } fn zookeeper_container_name(&self) -> String { - format!("zookeeper-{}", self.testing_name) + zookeeper_container_name(&self.testing_name) } fn container_names(&self) -> Vec { @@ -222,21 +244,30 @@ impl FlussTestingClusterBuilder { let Some(docker) = docker_client().await else { return false; }; - for name in self.container_names() { - // Anchored exact-name match; `all(false)` = running only, so a stopped - // leftover counts as absent and gets recreated. - let mut filters = HashMap::new(); - filters.insert("name".to_string(), vec![format!("^{name}$")]); - let options = ListContainersOptionsBuilder::default() - .all(false) - .filters(&filters) - .build(); - match docker.list_containers(Some(options)).await { - Ok(list) if !list.is_empty() => continue, - _ => return false, - } - } - true + let expected: HashSet<_> = self.container_names().into_iter().collect(); + + // Multiple exact-name filters are OR'd by the daemon, so we can query once + // while still ignoring unrelated containers. `all(false)` = running only, so a + // stopped leftover counts as absent and gets recreated. + let filters = HashMap::from([( + "name".to_string(), + expected.iter().map(|name| format!("^{name}$")).collect(), + )]); + let options = ListContainersOptionsBuilder::default() + .all(false) + .filters(&filters) + .build(); + let running = match docker.list_containers(Some(options)).await { + Ok(containers) => containers, + Err(_) => return false, + }; + let running_names: HashSet<_> = running + .into_iter() + .flat_map(|container| container.names.unwrap_or_default()) + .map(|name| name.trim_start_matches('/').to_string()) + .collect(); + + expected.iter().all(|name| running_names.contains(name)) } async fn start_all_containers(&mut self) -> Vec> { @@ -564,24 +595,24 @@ pub fn stop_cluster(name: &str) { run_blocking(async move { stop_cluster_async(&name).await }); } -/// Force-removes every container of cluster `name` (matched by name prefix) on the -/// testcontainers daemon — the same daemon `build_detached` started them on. +/// Force-removes every container of cluster `name` on the testcontainers daemon — +/// the same daemon `build_detached` started them on. async fn stop_cluster_async(name: &str) { let Some(docker) = docker_client().await else { return; }; - // Multiple values for the `name` filter are OR'd by the daemon; these prefixes - // cover zookeeper, coordinator, and any number of tablet servers. - let mut filters = HashMap::new(); - filters.insert( + // Multiple values for the `name` filter are OR'd by the daemon, so we use + // cluster-specific prefixes to narrow the list, then exact local checks before + // removing to avoid touching similarly-prefixed clusters such as `test` and `test2`. + let filters = HashMap::from([( "name".to_string(), vec![ - format!("zookeeper-{name}"), - format!("coordinator-server-{name}"), + zookeeper_container_name(name), + coordinator_server_container_name(name), format!("tablet-server-{name}-"), ], - ); + )]); let options = ListContainersOptionsBuilder::default() .all(true) .filters(&filters) @@ -596,11 +627,15 @@ async fn stop_cluster_async(name: &str) { }; for container in containers { - // Prefer the container name (daemon prefixes it with '/'); fall back to id. + // The daemon prefixes names with '/'. These testcontainers-managed containers + // are created with a single explicit container name here, so checking the + // first returned name is sufficient. Only remove containers whose exact name + // belongs to this cluster; if names are missing, skip rather than guessing by id. if let Some(cname) = container.names.and_then(|n| n.into_iter().next()) { - force_remove_container(&docker, cname.trim_start_matches('/')).await; - } else if let Some(id) = container.id { - force_remove_container(&docker, &id).await; + let cname = cname.trim_start_matches('/'); + if is_cluster_container_name(cname, name) { + force_remove_container(&docker, cname).await; + } } } }