Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;

import java.sql.SQLException;
Expand All @@ -24,18 +25,23 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;


@ExtendWith(MockitoExtension.class)
class K8sJobDeployerTest {

// K8sJobDeployer.specify() calls ConfigService.config(context.connection(), ...), which runs the
// real ServiceLoader and invokes K8sConfigProvider.loadConfig() -> K8sContext.create(connection).
@Mock
private MockedStatic<K8sContext> contextStatic;

@Mock
private HoptimatorConnection connection;

Expand All @@ -60,6 +66,7 @@ K8sYamlApi createYamlApi(K8sContext context) {
}
};
when(mockContext.connection()).thenReturn(connection);
contextStatic.when(() -> K8sContext.create(any())).thenReturn(mockContext);
}

private Job createTestJob(Sink sink) {
Expand Down Expand Up @@ -94,8 +101,6 @@ K8sSnapshot createSnapshot(K8sContext context) {

@Test
void specifyWithNoTemplatesReturnsEmpty() throws SQLException {
when(connection.connectionProperties()).thenReturn(new Properties());

Sink sink = new Sink("sinkdb", Arrays.asList("schema", "sink_table"),
Collections.emptyMap());
Job job = createTestJob(sink);
Expand All @@ -110,8 +115,6 @@ void specifyWithNoTemplatesReturnsEmpty() throws SQLException {

@Test
void specifyRendersMatchingTemplate() throws SQLException {
when(connection.connectionProperties()).thenReturn(new Properties());

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -132,8 +135,6 @@ void specifyRendersMatchingTemplate() throws SQLException {

@Test
void specifyFiltersOutNonMatchingDatabases() throws SQLException {
when(connection.connectionProperties()).thenReturn(new Properties());

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -153,8 +154,6 @@ void specifyFiltersOutNonMatchingDatabases() throws SQLException {

@Test
void specifyWithNullDatabasesMatchesAll() throws SQLException {
when(connection.connectionProperties()).thenReturn(new Properties());

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -174,8 +173,6 @@ void specifyWithNullDatabasesMatchesAll() throws SQLException {

@Test
void specifyRendersTemplateVariables() throws SQLException {
when(connection.connectionProperties()).thenReturn(new Properties());

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -199,8 +196,6 @@ void specifyRendersTemplateVariables() throws SQLException {
@Test
void specifyLambdasReturnNonEmptyValues() throws SQLException {
// Verify each key field is non-empty.
when(connection.connectionProperties()).thenReturn(new Properties());

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand Down Expand Up @@ -233,10 +228,6 @@ void specifyLambdasReturnNonEmptyValues() throws SQLException {
@Test
void specifyWithFlinkConfigPropertiesIncludesThem() throws SQLException {
// Verify that sink options ARE merged into the environment
Properties connProps = new Properties();
connProps.setProperty("flinkConfig1", "value1");
when(connection.connectionProperties()).thenReturn(connProps);

Map<String, String> sinkOptions = new HashMap<>();
sinkOptions.put("sinkOption", "sinkVal");
Sink sink = new Sink("sinkdb", Arrays.asList("schema", "sink_table"), sinkOptions);
Expand All @@ -260,8 +251,6 @@ void specifyWithFlinkConfigPropertiesIncludesThem() throws SQLException {
@Test
void specifyConditionalRenderedTemplateNotNull() throws SQLException {
// Verify null templates are skipped
when(connection.connectionProperties()).thenReturn(new Properties());

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.sql.SQLTransientConnectionException;
import java.util.Properties;

import com.linkedin.hoptimator.k8s.K8sContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;


@ExtendWith(MockitoExtension.class)
public class LogicalTableDriverTest {

// K8sContext.create() reads the real ~/.kube/config (if it exists) when no k8s connection
// properties are supplied. This can cause failures in local testing. Mock it to prevent that.
@Mock
private MockedStatic<K8sContext> k8sContextStatic;

@Test
public void connectReturnsNullForNonLogicalUrl() throws Exception {
LogicalTableDriver driver = new LogicalTableDriver();
Expand Down Expand Up @@ -92,38 +102,37 @@ public void connectThrowsWhenDatabasePropertyInUrl() throws Exception {

@Test
public void connectThrowsNonTransientWhenK8sContextCreationFails() throws Exception {
// All validation passes (2 tiers + database property set) but K8sContext.create() fails
// because kubeconfig points to a non-existent file → catch(Exception) → SQLNonTransientException
// All validation passes (2 tiers + database property set) but K8sContext.create() fails.
k8sContextStatic.when(() -> K8sContext.create(any()))
.thenThrow(new RuntimeException("simulated K8sContext failure"));

String url = "jdbc:logical://nearline=kafka-database;online=venice";
Properties props = new Properties();
props.setProperty("database", "mylogicaldb");
props.setProperty("k8s.kubeconfig", "/nonexistent/path/kubeconfig");

try (Connection ignored = DriverManager.getConnection(url, props)) {
throw new AssertionError("Expected exception");
throw new AssertionError("Expected SQLNonTransientException");
} catch (SQLNonTransientException e) {
assertTrue(e.getMessage().contains("Problem loading"));
}
}

@Test
void connectWithValidUrlThrowsSQLNonTransientWhenK8sContextFails() {
// A URL that passes all validation (2 tiers + database property) but then fails
// when creating K8sContext (no K8s config in test env) → covered by catch(Exception e)
// at the end of the try block, covering lines 93-103.
void connectPreservesCauseWhenK8sContextCreationFails() throws Exception {
// The catch(Exception e) branch must wrap the underlying failure as the cause rather than
// swallowing it. Stub K8sContext.create() to throw a known exception and assert it is preserved.
RuntimeException boom = new RuntimeException("boom");
k8sContextStatic.when(() -> K8sContext.create(any())).thenThrow(boom);

Properties props = new Properties();
props.setProperty("database", "logical");
try (Connection conn = DriverManager.getConnection(

try (Connection ignored = DriverManager.getConnection(
"jdbc:logical://nearline=kafka-database;online=venice", props)) {
// If connect() unexpectedly succeeds (real K8s available), just verify non-null
assertNotNull(conn);
throw new AssertionError("Expected SQLNonTransientException");
} catch (SQLNonTransientException e) {
// Expected: K8sContext.create() failed → catch(Exception e) path covered
assertTrue(e.getMessage().contains("Problem loading"));
} catch (SQLTransientConnectionException e) {
// Also acceptable: IOException from K8s client
assertTrue(e.getMessage().contains("Problem loading"));
} catch (SQLException e) {
// Any other SQL exception is also fine — something in connect() path was exercised
assertNotNull(e.getMessage());
assertSame(boom, e.getCause(), "original failure should be preserved as the cause");
}
}

Expand Down
Loading