Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 67 additions & 32 deletions crates/fluss-test-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use fluss::client::FlussConnection;
use fluss::config::Config;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::mem::ManuallyDrop;
use std::sync::Arc;
Expand Down Expand Up @@ -45,6 +45,28 @@ async fn docker_client() -> Option<Docker> {
}
}

fn zookeeper_container_name(cluster_name: &str) -> String {
format!("zookeeper-{cluster_name}")
}

fn coordinator_server_container_name(cluster_name: &str) -> String {
format!("coordinator-server-{cluster_name}")
}

fn tablet_server_container_name(cluster_name: &str, server_id: u16) -> String {
format!("tablet-server-{cluster_name}-{server_id}")
}

fn is_cluster_container_name(container_name: &str, cluster_name: &str) -> bool {
let container_name = container_name.trim_start_matches('/');
let tablet_server_prefix = format!("tablet-server-{cluster_name}-");
container_name == zookeeper_container_name(cluster_name)
|| container_name == coordinator_server_container_name(cluster_name)
|| container_name
.strip_prefix(&tablet_server_prefix)
.is_some_and(|suffix| !suffix.is_empty() && suffix.chars().all(|c| c.is_ascii_digit()))
}

/// Force-removes a container by name. Best-effort: a 404 (already gone) is success;
/// other errors warn rather than panic.
async fn force_remove_container(docker: &Docker, name: &str) {
Expand Down Expand Up @@ -160,15 +182,15 @@ impl FlussTestingClusterBuilder {
}

fn tablet_server_container_name(&self, server_id: u16) -> String {
format!("tablet-server-{}-{}", self.testing_name, server_id)
tablet_server_container_name(&self.testing_name, server_id)
}

fn coordinator_server_container_name(&self) -> String {
format!("coordinator-server-{}", self.testing_name)
coordinator_server_container_name(&self.testing_name)
}

fn zookeeper_container_name(&self) -> String {
format!("zookeeper-{}", self.testing_name)
zookeeper_container_name(&self.testing_name)
}

fn container_names(&self) -> Vec<String> {
Expand Down Expand Up @@ -222,21 +244,30 @@ impl FlussTestingClusterBuilder {
let Some(docker) = docker_client().await else {
return false;
};
for name in self.container_names() {
// Anchored exact-name match; `all(false)` = running only, so a stopped
// leftover counts as absent and gets recreated.
let mut filters = HashMap::new();
filters.insert("name".to_string(), vec![format!("^{name}$")]);
let options = ListContainersOptionsBuilder::default()
.all(false)
.filters(&filters)
.build();
match docker.list_containers(Some(options)).await {
Ok(list) if !list.is_empty() => continue,
_ => return false,
}
}
true
let expected: HashSet<_> = self.container_names().into_iter().collect();

// Multiple exact-name filters are OR'd by the daemon, so we can query once
// while still ignoring unrelated containers. `all(false)` = running only, so a
// stopped leftover counts as absent and gets recreated.
let filters = HashMap::from([(
"name".to_string(),
expected.iter().map(|name| format!("^{name}$")).collect(),
)]);
let options = ListContainersOptionsBuilder::default()
.all(false)
.filters(&filters)
.build();
let running = match docker.list_containers(Some(options)).await {
Ok(containers) => containers,
Err(_) => return false,
};
let running_names: HashSet<_> = running
.into_iter()
.flat_map(|container| container.names.unwrap_or_default())
.map(|name| name.trim_start_matches('/').to_string())
.collect();

expected.iter().all(|name| running_names.contains(name))
}

async fn start_all_containers(&mut self) -> Vec<ContainerAsync<GenericImage>> {
Expand Down Expand Up @@ -564,24 +595,24 @@ pub fn stop_cluster(name: &str) {
run_blocking(async move { stop_cluster_async(&name).await });
}

/// Force-removes every container of cluster `name` (matched by name prefix) on the
/// testcontainers daemon — the same daemon `build_detached` started them on.
/// Force-removes every container of cluster `name` on the testcontainers daemon —
/// the same daemon `build_detached` started them on.
async fn stop_cluster_async(name: &str) {
let Some(docker) = docker_client().await else {
return;
};

// Multiple values for the `name` filter are OR'd by the daemon; these prefixes
// cover zookeeper, coordinator, and any number of tablet servers.
let mut filters = HashMap::new();
filters.insert(
// Multiple values for the `name` filter are OR'd by the daemon, so we use
// cluster-specific prefixes to narrow the list, then exact local checks before
// removing to avoid touching similarly-prefixed clusters such as `test` and `test2`.
let filters = HashMap::from([(
"name".to_string(),
vec![
format!("zookeeper-{name}"),
format!("coordinator-server-{name}"),
zookeeper_container_name(name),
coordinator_server_container_name(name),
format!("tablet-server-{name}-"),
],
);
)]);
let options = ListContainersOptionsBuilder::default()
.all(true)
.filters(&filters)
Expand All @@ -596,11 +627,15 @@ async fn stop_cluster_async(name: &str) {
};

for container in containers {
// Prefer the container name (daemon prefixes it with '/'); fall back to id.
// The daemon prefixes names with '/'. These testcontainers-managed containers
// are created with a single explicit container name here, so checking the
// first returned name is sufficient. Only remove containers whose exact name
// belongs to this cluster; if names are missing, skip rather than guessing by id.
if let Some(cname) = container.names.and_then(|n| n.into_iter().next()) {
force_remove_container(&docker, cname.trim_start_matches('/')).await;
} else if let Some(id) = container.id {
force_remove_container(&docker, &id).await;
let cname = cname.trim_start_matches('/');
if is_cluster_container_name(cname, name) {
force_remove_container(&docker, cname).await;
}
}
}
}
Loading