diff --git a/.claude/worktrees/state-examples-docs b/.claude/worktrees/state-examples-docs new file mode 160000 index 0000000000..3ff4ebadf5 --- /dev/null +++ b/.claude/worktrees/state-examples-docs @@ -0,0 +1 @@ +Subproject commit 3ff4ebadf501b0ec0b72e57fe0416f02a67853ef diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 966d4f08dd..e29c5f1347 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -172,10 +172,13 @@ public void stop() throws InterruptedException, IOException { System.out.println("Stopping dapr application ..."); try { this.stopCommand.run(); - System.out.println("Dapr application stopped."); } catch (RuntimeException e) { - System.out.println("Could not stop app " + this.appName + ": " + e.getMessage()); + if (e.getMessage() != null && e.getMessage().contains("Could not find success criteria")) { + System.out.println("App " + this.appName + " already stopped or not found (ignored)."); + } else { + System.out.println("Could not stop app " + this.appName + ": " + e.getMessage()); + } } } @@ -219,8 +222,7 @@ public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedExceptio while (System.currentTimeMillis() <= maxWait) { try { stub.healthCheck(Empty.getDefaultInstance()); - // artursouza: workaround due to race condition with runtime's probe on app's health. - Thread.sleep(5000); + Thread.sleep(2000); return; } catch (Exception e) { Thread.sleep(1000); @@ -232,10 +234,10 @@ public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedExceptio channel.shutdown(); } } else { - Duration waitDuration = Duration.ofMillis(maxWaitMilliseconds); + long maxWait = System.currentTimeMillis() + maxWaitMilliseconds; HttpClient client = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_1_1) - .connectTimeout(waitDuration) + .connectTimeout(Duration.ofSeconds(5)) .build(); String url = "http://127.0.0.1:" + this.getAppPort() + "/health"; HttpRequest request = HttpRequest.newBuilder() @@ -243,18 +245,20 @@ public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedExceptio .uri(URI.create(url)) .build(); - try { - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); - - if (response.statusCode() != 200) { - throw new RuntimeException("error: HTTP service is not healthy."); + while (System.currentTimeMillis() <= maxWait) { + try { + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() == 200) { + Thread.sleep(2000); + return; + } + } catch (IOException e) { + // not ready yet } - } catch (IOException e) { - throw new RuntimeException("exception: HTTP service is not healthy."); + Thread.sleep(1000); } - // artursouza: workaround due to race condition with runtime's probe on app's health. - Thread.sleep(5000); + throw new RuntimeException("timeout: HTTP service is not healthy."); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java index c388a906a6..4ea6a759fd 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java @@ -129,24 +129,19 @@ public void reminderRecoveryTest( ) throws Exception { setup(actorType); - logger.debug("Pausing 3 seconds to let gRPC connection get ready"); - Thread.sleep(3000); - logger.debug("Invoking actor method 'startReminder' which will register a reminder"); proxy.invokeMethod("setReminderData", reminderDataParam).block(); proxy.invokeMethod("startReminder", reminderName).block(); - logger.debug("Pausing 7 seconds to allow reminder to fire"); - Thread.sleep(7000); - + logger.debug("Waiting for reminder to fire at least 3 times"); final List logs = new ArrayList<>(); callWithRetry(() -> { logs.clear(); logs.addAll(fetchMethodCallLogs(proxy)); validateMethodCalls(logs, METHOD_NAME, 3); validateMessageContent(logs, METHOD_NAME, expectedReminderStateText); - }, 5000); + }, 30000); // Restarts runtime only. logger.info("Stopping Dapr sidecar"); diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java index 809cd21a90..21910376fa 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java @@ -54,7 +54,6 @@ public void timerRecoveryTest() throws Exception { true, 60000); - Thread.sleep(3000); String actorType="MyActorTest"; logger.debug("Creating proxy builder"); @@ -68,16 +67,14 @@ public void timerRecoveryTest() throws Exception { logger.debug("Invoking actor method 'startTimer' which will register a timer"); proxy.invokeMethod("startTimer", "myTimer").block(); - logger.debug("Pausing 7 seconds to allow timer to fire"); - Thread.sleep(7000); - + logger.debug("Waiting for timer to fire at least 3 times"); final List logs = new ArrayList<>(); callWithRetry(() -> { logs.clear(); logs.addAll(fetchMethodCallLogs(proxy)); validateMethodCalls(logs, METHOD_NAME, 3); validateMessageContent(logs, METHOD_NAME, "ping!"); - }, 5000); + }, 30000); // Restarts app only. runs.left.stop(); @@ -91,7 +88,7 @@ public void timerRecoveryTest() throws Exception { newLogs.clear(); newLogs.addAll(fetchMethodCallLogs(proxy)); validateMethodCalls(newLogs, METHOD_NAME, 3); - }, 10000); + }, 30000); // Check that the restart actually happened by confirming the old logs are not in the new logs. for (MethodEntryTracker oldLog: logs) { diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java deleted file mode 100644 index 377d51e765..0000000000 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ /dev/null @@ -1,723 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.pubsub.http; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.dapr.client.DaprClient; -import io.dapr.client.DaprClientBuilder; -import io.dapr.client.domain.BulkPublishEntry; -import io.dapr.client.domain.BulkPublishRequest; -import io.dapr.client.domain.BulkPublishResponse; -import io.dapr.client.domain.BulkSubscribeAppResponse; -import io.dapr.client.domain.BulkSubscribeAppResponseEntry; -import io.dapr.client.domain.BulkSubscribeAppResponseStatus; -import io.dapr.client.domain.CloudEvent; -import io.dapr.client.domain.HttpExtension; -import io.dapr.client.domain.Metadata; -import io.dapr.client.domain.PublishEventRequest; -import io.dapr.it.BaseIT; -import io.dapr.it.DaprRun; -import io.dapr.serializer.DaprObjectSerializer; -import io.dapr.utils.TypeRef; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Random; -import java.util.Set; - -import static io.dapr.it.Retry.callWithRetry; -import static io.dapr.it.TestUtils.assertThrowsDaprException; -import static io.dapr.it.TestUtils.assertThrowsDaprExceptionWithReason; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - - -public class PubSubIT extends BaseIT { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private static final TypeRef> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() {}; - private static final TypeRef>> CLOUD_EVENT_LONG_LIST_TYPE_REF = new TypeRef<>() {}; - private static final TypeRef>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF = new TypeRef<>() {}; - - //Number of messages to be sent: 10 - private static final int NUM_MESSAGES = 10; - - private static final String PUBSUB_NAME = "messagebus"; - //The title of the topic to be used for publishing - private static final String TOPIC_NAME = "testingtopic"; - - private static final String TOPIC_BULK = "testingbulktopic"; - private static final String TYPED_TOPIC_NAME = "typedtestingtopic"; - private static final String ANOTHER_TOPIC_NAME = "anothertopic"; - // Topic used for TTL test - private static final String TTL_TOPIC_NAME = "ttltopic"; - // Topic to test binary data - private static final String BINARY_TOPIC_NAME = "binarytopic"; - - private static final String LONG_TOPIC_NAME = "testinglongvalues"; - // Topic to test bulk subscribe. - private static final String BULK_SUB_TOPIC_NAME = "topicBulkSub"; - - private final List runs = new ArrayList<>(); - - private DaprRun closeLater(DaprRun run) { - this.runs.add(run); - return run; - } - - @AfterEach - public void tearDown() throws Exception { - for (DaprRun run : runs) { - run.stop(); - } - } - - @Test - public void publishPubSubNotFound() throws Exception { - DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - 60000)); - - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - assertThrowsDaprExceptionWithReason( - "INVALID_ARGUMENT", - "INVALID_ARGUMENT: pubsub unknown pubsub is not found", - "DAPR_PUBSUB_NOT_FOUND", - () -> client.publishEvent("unknown pubsub", "mytopic", "payload").block()); - } - } - - @Test - public void testBulkPublishPubSubNotFound() throws Exception { - DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - 60000)); - - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - assertThrowsDaprException( - "INVALID_ARGUMENT", - "INVALID_ARGUMENT: pubsub unknown pubsub is not found", - () -> client.publishEvents("unknown pubsub", "mytopic","text/plain", "message").block()); - } - } - - @Test - public void testBulkPublish() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - SubscriberService.SUCCESS_MESSAGE, - SubscriberService.class, - true, - 60000)); - DaprObjectSerializer serializer = new DaprObjectSerializer() { - @Override - public byte[] serialize(Object o) throws JsonProcessingException { - return OBJECT_MAPPER.writeValueAsBytes(o); - } - - @Override - public T deserialize(byte[] data, TypeRef type) throws IOException { - return (T) OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType())); - } - - @Override - public String getContentType() { - return "application/json"; - } - }; - try (DaprClient client = daprRun.newDaprClientBuilder().withObjectSerializer(serializer).build()) { - // Only for the gRPC test - // Send a multiple messages on one topic in messagebus pubsub via publishEvents API. - List messages = new ArrayList<>(); - for (int i = 0; i < NUM_MESSAGES; i++) { - messages.add(String.format("This is message #%d on topic %s", i, TOPIC_BULK)); - } - //Publishing 10 messages - BulkPublishResponse response = client.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", messages).block(); - System.out.println(String.format("Published %d messages to topic '%s' pubsub_name '%s'", - NUM_MESSAGES, TOPIC_BULK, PUBSUB_NAME)); - assertNotNull(response, "expected not null bulk publish response"); - assertEquals( 0, response.getFailedEntries().size(), "expected no failures in the response"); - - //Publishing an object. - MyObject object = new MyObject(); - object.setId("123"); - response = client.publishEvents(PUBSUB_NAME, TOPIC_BULK, - "application/json", Collections.singletonList(object)).block(); - System.out.println("Published one object."); - assertNotNull(response, "expected not null bulk publish response"); - assertEquals(0, response.getFailedEntries().size(), "expected no failures in the response"); - - //Publishing a single byte: Example of non-string based content published - client.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", - Collections.singletonList(new byte[]{1})).block(); - System.out.println("Published one byte."); - - assertNotNull(response, "expected not null bulk publish response"); - assertEquals(0, response.getFailedEntries().size(), "expected no failures in the response"); - - CloudEvent cloudEvent = new CloudEvent(); - cloudEvent.setId("1234"); - cloudEvent.setData("message from cloudevent"); - cloudEvent.setSource("test"); - cloudEvent.setSpecversion("1"); - cloudEvent.setType("myevent"); - cloudEvent.setDatacontenttype("text/plain"); - BulkPublishRequest req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_BULK, - Collections.singletonList( - new BulkPublishEntry<>("1", cloudEvent, "application/cloudevents+json", null) - )); - - //Publishing a cloud event. - client.publishEvents(req).block(); - assertNotNull(response, "expected not null bulk publish response"); - assertEquals(0, response.getFailedEntries().size(), "expected no failures in the response"); - - System.out.println("Published one cloud event."); - - // Introduce sleep - Thread.sleep(10000); - - // Check messagebus subscription for topic testingbulktopic since it is populated only by publishEvents API call - callWithRetry(() -> { - System.out.println("Checking results for topic " + TOPIC_BULK + " in pubsub " + PUBSUB_NAME); - // Validate text payload. - final List cloudEventMessages = client.invokeMethod( - daprRun.getAppName(), - "messages/redis/testingbulktopic", - null, - HttpExtension.GET, - CLOUD_EVENT_LIST_TYPE_REF).block(); - assertEquals(13, cloudEventMessages.size(), "expected 13 messages to be received on subscribe"); - for (int i = 0; i < NUM_MESSAGES; i++) { - final int messageId = i; - assertTrue(cloudEventMessages - .stream() - .filter(m -> m.getData() != null) - .map(m -> m.getData()) - .filter(m -> m.equals(String.format("This is message #%d on topic %s", messageId, TOPIC_BULK))) - .count() == 1, "expected data content to match"); - } - - // Validate object payload. - assertTrue(cloudEventMessages - .stream() - .filter(m -> m.getData() != null) - .filter(m -> m.getData() instanceof LinkedHashMap) - .map(m -> (LinkedHashMap) m.getData()) - .filter(m -> "123".equals(m.get("id"))) - .count() == 1, "expected data content 123 to match"); - - // Validate byte payload. - assertTrue(cloudEventMessages - .stream() - .filter(m -> m.getData() != null) - .map(m -> m.getData()) - .filter(m -> "AQ==".equals(m)) - .count() == 1, "expected bin data to match"); - - // Validate cloudevent payload. - assertTrue( cloudEventMessages - .stream() - .filter(m -> m.getData() != null) - .map(m -> m.getData()) - .filter(m -> "message from cloudevent".equals(m)) - .count() == 1, "expected data to match"); - }, 2000); - } - - } - - @Test - public void testPubSub() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - SubscriberService.SUCCESS_MESSAGE, - SubscriberService.class, - true, - 60000)); - - DaprObjectSerializer serializer = new DaprObjectSerializer() { - @Override - public byte[] serialize(Object o) throws JsonProcessingException { - return OBJECT_MAPPER.writeValueAsBytes(o); - } - - @Override - public T deserialize(byte[] data, TypeRef type) throws IOException { - return (T) OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType())); - } - - @Override - public String getContentType() { - return "application/json"; - } - }; - - // Send a batch of messages on one topic - try (DaprClient client = daprRun.newDaprClientBuilder().withObjectSerializer(serializer).build()) { - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME); - //Publishing messages - client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); - System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME)); - } - - // Send a batch of different messages on the other. - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME); - //Publishing messages - client.publishEvent(PUBSUB_NAME, ANOTHER_TOPIC_NAME, message).block(); - System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, ANOTHER_TOPIC_NAME, PUBSUB_NAME)); - } - - //Publishing an object. - MyObject object = new MyObject(); - object.setId("123"); - client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block(); - System.out.println("Published one object."); - - client.publishEvent(PUBSUB_NAME, TYPED_TOPIC_NAME, object).block(); - System.out.println("Published another object."); - - //Publishing a single byte: Example of non-string based content published - client.publishEvent( - PUBSUB_NAME, - TOPIC_NAME, - new byte[]{1}).block(); - System.out.println("Published one byte."); - - CloudEvent cloudEvent = new CloudEvent(); - cloudEvent.setId("1234"); - cloudEvent.setData("message from cloudevent"); - cloudEvent.setSource("test"); - cloudEvent.setSpecversion("1"); - cloudEvent.setType("myevent"); - cloudEvent.setDatacontenttype("text/plain"); - - //Publishing a cloud event. - client.publishEvent(new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEvent) - .setContentType("application/cloudevents+json")).block(); - System.out.println("Published one cloud event."); - - { - CloudEvent cloudEventV2 = new CloudEvent(); - cloudEventV2.setId("2222"); - cloudEventV2.setData("message from cloudevent v2"); - cloudEventV2.setSource("test"); - cloudEventV2.setSpecversion("1"); - cloudEventV2.setType("myevent.v2"); - cloudEventV2.setDatacontenttype("text/plain"); - client.publishEvent( - new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV2) - .setContentType("application/cloudevents+json")).block(); - System.out.println("Published one cloud event for v2."); - } - - { - CloudEvent cloudEventV3 = new CloudEvent(); - cloudEventV3.setId("3333"); - cloudEventV3.setData("message from cloudevent v3"); - cloudEventV3.setSource("test"); - cloudEventV3.setSpecversion("1"); - cloudEventV3.setType("myevent.v3"); - cloudEventV3.setDatacontenttype("text/plain"); - client.publishEvent( - new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV3) - .setContentType("application/cloudevents+json")).block(); - System.out.println("Published one cloud event for v3."); - } - - Thread.sleep(2000); - - callWithRetry(() -> { - System.out.println("Checking results for topic " + TOPIC_NAME); - // Validate text payload. - final List messages = client.invokeMethod( - daprRun.getAppName(), - "messages/testingtopic", - null, - HttpExtension.GET, - CLOUD_EVENT_LIST_TYPE_REF).block(); - assertEquals(13, messages.size()); - for (int i = 0; i < NUM_MESSAGES; i++) { - final int messageId = i; - assertTrue(messages - .stream() - .filter(m -> m.getData() != null) - .map(m -> m.getData()) - .filter(m -> m.equals(String.format("This is message #%d on topic %s", messageId, TOPIC_NAME))) - .count() == 1); - } - - // Validate object payload. - assertTrue(messages - .stream() - .filter(m -> m.getData() != null) - .filter(m -> m.getData() instanceof LinkedHashMap) - .map(m -> (LinkedHashMap)m.getData()) - .filter(m -> "123".equals(m.get("id"))) - .count() == 1); - - // Validate byte payload. - assertTrue(messages - .stream() - .filter(m -> m.getData() != null) - .map(m -> m.getData()) - .filter(m -> "AQ==".equals(m)) - .count() == 1); - - // Validate cloudevent payload. - assertTrue(messages - .stream() - .filter(m -> m.getData() != null) - .map(m -> m.getData()) - .filter(m -> "message from cloudevent".equals(m)) - .count() == 1); - }, 2000); - - callWithRetry(() -> { - System.out.println("Checking results for topic " + TOPIC_NAME + " V2"); - // Validate text payload. - final List messages = client.invokeMethod( - daprRun.getAppName(), - "messages/testingtopicV2", - null, - HttpExtension.GET, - CLOUD_EVENT_LIST_TYPE_REF).block(); - assertEquals(1, messages.size()); - }, 2000); - - callWithRetry(() -> { - System.out.println("Checking results for topic " + TOPIC_NAME + " V3"); - // Validate text payload. - final List messages = client.invokeMethod( - daprRun.getAppName(), - "messages/testingtopicV3", - null, - HttpExtension.GET, - CLOUD_EVENT_LIST_TYPE_REF).block(); - assertEquals(1, messages.size()); - }, 2000); - - callWithRetry(() -> { - System.out.println("Checking results for topic " + TYPED_TOPIC_NAME); - // Validate object payload. - final List> messages = client.invokeMethod( - daprRun.getAppName(), - "messages/typedtestingtopic", - null, - HttpExtension.GET, - CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF).block(); - - assertTrue(messages - .stream() - .filter(m -> m.getData() != null) - .filter(m -> m.getData() instanceof MyObject) - .map(m -> (MyObject)m.getData()) - .filter(m -> "123".equals(m.getId())) - .count() == 1); - }, 2000); - - callWithRetry(() -> { - System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME); - final List messages = client.invokeMethod( - daprRun.getAppName(), - "messages/anothertopic", - null, - HttpExtension.GET, - CLOUD_EVENT_LIST_TYPE_REF).block(); - assertEquals(10, messages.size()); - - for (int i = 0; i < NUM_MESSAGES; i++) { - final int messageId = i; - assertTrue(messages - .stream() - .filter(m -> m.getData() != null) - .map(m -> m.getData()) - .filter(m -> m.equals(String.format("This is message #%d on topic %s", messageId, ANOTHER_TOPIC_NAME))) - .count() == 1); - } - }, 2000); - } - } - - @Test - public void testPubSubBinary() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - SubscriberService.SUCCESS_MESSAGE, - SubscriberService.class, - true, - 60000)); - - DaprObjectSerializer serializer = new DaprObjectSerializer() { - @Override - public byte[] serialize(Object o) { - return (byte[])o; - } - - @Override - public T deserialize(byte[] data, TypeRef type) { - return (T) data; - } - - @Override - public String getContentType() { - return "application/octet-stream"; - } - }; - try (DaprClient client = daprRun.newDaprClientBuilder().withObjectSerializer(serializer).build()) { - client.publishEvent( - PUBSUB_NAME, - BINARY_TOPIC_NAME, - new byte[]{1}).block(); - System.out.println("Published one byte."); - } - - Thread.sleep(3000); - - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - callWithRetry(() -> { - System.out.println("Checking results for topic " + BINARY_TOPIC_NAME); - final List messages = client.invokeMethod( - daprRun.getAppName(), - "messages/binarytopic", - null, - HttpExtension.GET, CLOUD_EVENT_LIST_TYPE_REF).block(); - assertEquals(1, messages.size()); - assertNull(messages.get(0).getData()); - assertArrayEquals(new byte[]{1}, messages.get(0).getBinaryData()); - }, 2000); - } - } - - @Test - public void testPubSubTTLMetadata() throws Exception { - DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - 60000)); - - // Send a batch of messages on one topic, all to be expired in 1 second. - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("This is message #%d on topic %s", i, TTL_TOPIC_NAME); - //Publishing messages - client.publishEvent( - PUBSUB_NAME, - TTL_TOPIC_NAME, - message, - Map.of(Metadata.TTL_IN_SECONDS, "1")).block(); - System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME)); - } - } - - daprRun.stop(); - - // Sleeps for two seconds to let them expire. - Thread.sleep(2000); - - daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - SubscriberService.SUCCESS_MESSAGE, - SubscriberService.class, - true, - 60000)); - - // Sleeps for five seconds to give subscriber a chance to receive messages. - Thread.sleep(5000); - - final String appId = daprRun.getAppName(); - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - callWithRetry(() -> { - System.out.println("Checking results for topic " + TTL_TOPIC_NAME); - final List messages = client.invokeMethod(appId, "messages/" + TTL_TOPIC_NAME, null, HttpExtension.GET, List.class).block(); - assertEquals(0, messages.size()); - }, 2000); - } - - daprRun.stop(); - } - - @Test - public void testPubSubBulkSubscribe() throws Exception { - DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - SubscriberService.SUCCESS_MESSAGE, - SubscriberService.class, - true, - 60000)); - - // Send a batch of messages on one topic. - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("This is message #%d on topic %s", i, BULK_SUB_TOPIC_NAME); - // Publishing messages - client.publishEvent(PUBSUB_NAME, BULK_SUB_TOPIC_NAME, message).block(); - System.out.printf("Published message: '%s' to topic '%s' pubSub_name '%s'\n", - message, BULK_SUB_TOPIC_NAME, PUBSUB_NAME); - } - } - - // Sleeps for five seconds to give subscriber a chance to receive messages. - Thread.sleep(5000); - - final String appId = daprRun.getAppName(); - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - callWithRetry(() -> { - System.out.println("Checking results for topic " + BULK_SUB_TOPIC_NAME); - - @SuppressWarnings("unchecked") - Class> clazz = (Class) List.class; - - final List messages = client.invokeMethod( - appId, - "messages/" + BULK_SUB_TOPIC_NAME, - null, - HttpExtension.GET, - clazz).block(); - - assertNotNull(messages); - BulkSubscribeAppResponse response = OBJECT_MAPPER.convertValue(messages.get(0), BulkSubscribeAppResponse.class); - - // There should be a single bulk response. - assertEquals(1, messages.size()); - - // The bulk response should contain NUM_MESSAGES entries. - assertEquals(NUM_MESSAGES, response.getStatuses().size()); - - // All the entries should be SUCCESS. - for (BulkSubscribeAppResponseEntry entry : response.getStatuses()) { - assertEquals(entry.getStatus(), BulkSubscribeAppResponseStatus.SUCCESS); - } - }, 2000); - } - - daprRun.stop(); - } - - @Test - public void testLongValues() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - SubscriberService.SUCCESS_MESSAGE, - SubscriberService.class, - true, - 60000)); - - Random random = new Random(590518626939830271L); - Set values = new HashSet<>(); - values.add(new ConvertToLong().setVal(590518626939830271L)); - ConvertToLong val; - for (int i = 0; i < NUM_MESSAGES - 1; i++) { - do { - val = new ConvertToLong().setVal(random.nextLong()); - } while (values.contains(val)); - values.add(val); - } - Iterator valuesIt = values.iterator(); - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - for (int i = 0; i < NUM_MESSAGES; i++) { - ConvertToLong value = valuesIt.next(); - System.out.println("The long value sent " + value.getValue()); - //Publishing messages - client.publishEvent( - PUBSUB_NAME, - LONG_TOPIC_NAME, - value, - Map.of(Metadata.TTL_IN_SECONDS, "30")).block(); - - try { - Thread.sleep((long) (1000 * Math.random())); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - return; - } - } - } - - Set actual = new HashSet<>(); - try (DaprClient client = daprRun.newDaprClientBuilder().build()) { - callWithRetry(() -> { - System.out.println("Checking results for topic " + LONG_TOPIC_NAME); - final List> messages = client.invokeMethod( - daprRun.getAppName(), - "messages/testinglongvalues", - null, - HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block(); - assertNotNull(messages); - for (CloudEvent message : messages) { - actual.add(message.getData()); - } - Assertions.assertEquals(values, actual); - }, 2000); - } - } - - public static class MyObject { - private String id; - - public String getId() { - return this.id; - } - - public void setId(String id) { - this.id = id; - } - } - - public static class ConvertToLong { - private Long value; - - public ConvertToLong setVal(Long value) { - this.value = value; - return this; - } - - public Long getValue() { - return value; - } - - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ConvertToLong that = (ConvertToLong) o; - return Objects.equals(value, that.value); - } - - @Override - public int hashCode() { - return Objects.hash(value); - } - } - -} diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java deleted file mode 100644 index 9fc5df3ee2..0000000000 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.pubsub.http; - -import io.dapr.Rule; -import io.dapr.Topic; -import io.dapr.client.domain.BulkSubscribeAppResponse; -import io.dapr.client.domain.BulkSubscribeAppResponseEntry; -import io.dapr.client.domain.BulkSubscribeAppResponseStatus; -import io.dapr.client.domain.BulkSubscribeMessage; -import io.dapr.client.domain.BulkSubscribeMessageEntry; -import io.dapr.client.domain.CloudEvent; -import io.dapr.springboot.annotations.BulkSubscribe; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.Mono; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.BiFunction; - -/** - * SpringBoot Controller to handle input binding. - */ -@RestController -public class SubscriberController { - - private final Map>> messagesByTopic = Collections.synchronizedMap(new HashMap<>()); - - @GetMapping(path = "/messages/{topic}") - public List> getMessagesByTopic(@PathVariable("topic") String topic) { - return messagesByTopic.getOrDefault(topic, Collections.emptyList()); - } - - private static final List messagesReceivedBulkPublishTopic = new ArrayList(); - private static final List messagesReceivedTestingTopic = new ArrayList(); - private static final List messagesReceivedTestingTopicV2 = new ArrayList(); - private static final List messagesReceivedTestingTopicV3 = new ArrayList(); - private static final List responsesReceivedTestingTopicBulkSub = new ArrayList<>(); - - @GetMapping(path = "/messages/redis/testingbulktopic") - public List getMessagesReceivedBulkTopic() { - return messagesReceivedBulkPublishTopic; - } - - - - @GetMapping(path = "/messages/testingtopic") - public List getMessagesReceivedTestingTopic() { - return messagesReceivedTestingTopic; - } - - @GetMapping(path = "/messages/testingtopicV2") - public List getMessagesReceivedTestingTopicV2() { - return messagesReceivedTestingTopicV2; - } - - @GetMapping(path = "/messages/testingtopicV3") - public List getMessagesReceivedTestingTopicV3() { - return messagesReceivedTestingTopicV3; - } - - @GetMapping(path = "/messages/topicBulkSub") - public List getMessagesReceivedTestingTopicBulkSub() { - return responsesReceivedTestingTopicBulkSub; - } - - @Topic(name = "testingtopic", pubsubName = "messagebus") - @PostMapping("/route1") - public Mono handleMessage(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String message = envelope.getData() == null ? "" : envelope.getData().toString(); - String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); - messagesReceivedTestingTopic.add(envelope); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "testingbulktopic", pubsubName = "messagebus") - @PostMapping("/route1_redis") - public Mono handleBulkTopicMessage(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String message = envelope.getData() == null ? "" : envelope.getData().toString(); - String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing bulk publish topic Subscriber got message: " + message + "; Content-type: " + contentType); - messagesReceivedBulkPublishTopic.add(envelope); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "testingtopic", pubsubName = "messagebus", - rule = @Rule(match = "event.type == 'myevent.v2'", priority = 2)) - @PostMapping(path = "/route1_v2") - public Mono handleMessageV2(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String message = envelope.getData() == null ? "" : envelope.getData().toString(); - String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); - messagesReceivedTestingTopicV2.add(envelope); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "testingtopic", pubsubName = "messagebus", - rule = @Rule(match = "event.type == 'myevent.v3'", priority = 1)) - @PostMapping(path = "/route1_v3") - public Mono handleMessageV3(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String message = envelope.getData() == null ? "" : envelope.getData().toString(); - String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); - messagesReceivedTestingTopicV3.add(envelope); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "typedtestingtopic", pubsubName = "messagebus") - @PostMapping(path = "/route1b") - public Mono handleMessageTyped(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String id = envelope.getData() == null ? "" : envelope.getData().getId(); - String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Testing typed topic Subscriber got message with ID: " + id + "; Content-type: " + contentType); - messagesByTopic.compute("typedtestingtopic", merge(envelope)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "binarytopic", pubsubName = "messagebus") - @PostMapping(path = "/route2") - public Mono handleBinaryMessage(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String message = envelope.getData() == null ? "" : envelope.getData().toString(); - String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); - System.out.println("Binary topic Subscriber got message: " + message + "; Content-type: " + contentType); - messagesByTopic.compute("binarytopic", merge(envelope)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "#{'another'.concat('topic')}", pubsubName = "${pubsubName:messagebus}") - @PostMapping(path = "/route3") - public Mono handleMessageAnotherTopic(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String message = envelope.getData() == null ? "" : envelope.getData().toString(); - System.out.println("Another topic Subscriber got message: " + message); - messagesByTopic.compute("anothertopic", merge(envelope)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "ttltopic", pubsubName = "messagebus") - @PostMapping(path = "/route4") - public Mono handleMessageTTLTopic(@RequestBody(required = false) CloudEvent envelope) { - return Mono.fromRunnable(() -> { - try { - String message = envelope.getData() == null ? "" : envelope.getData().toString(); - System.out.println("TTL topic Subscriber got message: " + message); - messagesByTopic.compute("ttltopic", merge(envelope)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Topic(name = "testinglongvalues", pubsubName = "messagebus") - @PostMapping(path = "/testinglongvalues") - public Mono handleMessageLongValues(@RequestBody(required = false) CloudEvent cloudEvent) { - return Mono.fromRunnable(() -> { - try { - Long message = cloudEvent.getData().getValue(); - System.out.println("Subscriber got: " + message); - messagesByTopic.compute("testinglongvalues", merge(cloudEvent)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - /** - * Receive messages using the bulk subscribe API. - * The maxBulkSubCount and maxBulkSubAwaitDurationMs are adjusted to ensure - * that all the test messages arrive in a single batch. - * - * @param bulkMessage incoming bulk of messages from the message bus. - * @return status for each message received. - */ - @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 5000) - @Topic(name = "topicBulkSub", pubsubName = "messagebus") - @PostMapping(path = "/routeBulkSub") - public Mono handleMessageBulk( - @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { - return Mono.fromCallable(() -> { - if (bulkMessage.getEntries().size() == 0) { - BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(new ArrayList<>()); - responsesReceivedTestingTopicBulkSub.add(response); - return response; - } - - List entries = new ArrayList<>(); - for (BulkSubscribeMessageEntry entry: bulkMessage.getEntries()) { - try { - System.out.printf("Bulk Subscriber got entry ID: %s\n", entry.getEntryId()); - entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); - } catch (Exception e) { - entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); - } - } - BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(entries); - responsesReceivedTestingTopicBulkSub.add(response); - return response; - }); - } - - private BiFunction>, List>> merge(final CloudEvent item) { - return (key, value) -> { - final List> list = value == null ? new ArrayList<>() : value; - list.add(item); - return list; - }; - } - - @GetMapping(path = "/health") - public void health() { - } -} diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberService.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberService.java deleted file mode 100644 index 8667b2956e..0000000000 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberService.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.pubsub.http; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -/** - * Service for subscriber. - */ -@SpringBootApplication -public class SubscriberService { - - public static final String SUCCESS_MESSAGE = "Completed initialization in"; - - public static void main(String[] args) throws Exception { - int port = Integer.parseInt(args[0]); - - System.out.printf("Service starting on port %d ...\n", port); - - // Start Dapr's callback endpoint. - start(port); - } - - /** - * Starts Dapr's callback in a given port. - * - * @param port Port to listen to. - */ - private static void start(int port) { - SpringApplication app = new SpringApplication(SubscriberService.class); - app.run(String.format("--server.port=%d", port)); - } - -} \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java deleted file mode 100644 index 334bc5232a..0000000000 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.pubsub.stream; - -import io.dapr.client.DaprClient; -import io.dapr.client.DaprPreviewClient; -import io.dapr.client.SubscriptionListener; -import io.dapr.client.domain.CloudEvent; -import io.dapr.it.BaseIT; -import io.dapr.it.DaprRun; -import io.dapr.utils.TypeRef; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.UUID; - -import static io.dapr.it.Retry.callWithRetry; -import static org.junit.jupiter.api.Assertions.assertEquals; - - -public class PubSubStreamIT extends BaseIT { - - // Must be a large enough number, so we validate that we get more than the initial batch - // sent by the runtime. When this was first added, the batch size in runtime was set to 10. - private static final int NUM_MESSAGES = 100; - private static final String TOPIC_NAME = "stream-topic"; - private static final String TOPIC_NAME_FLUX = "stream-topic-flux"; - private static final String TOPIC_NAME_CLOUDEVENT = "stream-topic-cloudevent"; - private static final String TOPIC_NAME_RAWPAYLOAD = "stream-topic-rawpayload"; - private static final String PUBSUB_NAME = "messagebus"; - - private final List runs = new ArrayList<>(); - - private DaprRun closeLater(DaprRun run) { - this.runs.add(run); - return run; - } - - @AfterEach - public void tearDown() throws Exception { - for (DaprRun run : runs) { - run.stop(); - } - } - - @Test - public void testPubSub() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName(), - 60000)); - - var runId = UUID.randomUUID().toString(); - try (DaprClient client = daprRun.newDaprClient(); - DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("This is message #%d on topic %s for run %s", i, TOPIC_NAME, runId); - //Publishing messages - client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); - System.out.println( - String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME)); - } - - System.out.println("Starting subscription for " + TOPIC_NAME); - - Set messages = Collections.synchronizedSet(new HashSet<>()); - Set errors = Collections.synchronizedSet(new HashSet<>()); - - var random = new Random(37); // predictable random. - var listener = new SubscriptionListener() { - @Override - public Mono onEvent(CloudEvent event) { - return Mono.fromCallable(() -> { - // Useful to avoid false negatives running locally multiple times. - if (event.getData().contains(runId)) { - // 5% failure rate. - var decision = random.nextInt(100); - if (decision < 5) { - if (decision % 2 == 0) { - throw new RuntimeException("artificial exception on message " + event.getId()); - } - return Status.RETRY; - } - - messages.add(event.getId()); - return Status.SUCCESS; - } - - return Status.DROP; - }); - } - - @Override - public void onError(RuntimeException exception) { - errors.add(exception.getMessage()); - } - - }; - try(var subscription = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, listener, TypeRef.STRING)) { - callWithRetry(() -> { - var messageCount = messages.size(); - System.out.println( - String.format("Got %d messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME)); - assertEquals(NUM_MESSAGES, messages.size()); - assertEquals(4, errors.size()); - }, 120000); // Time for runtime to retry messages. - - subscription.close(); - subscription.awaitTermination(); - } - } - } - - @Test - public void testPubSubFlux() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName() + "-flux", - 60000)); - - var runId = UUID.randomUUID().toString(); - try (DaprClient client = daprRun.newDaprClient(); - DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { - - // Publish messages - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("Flux message #%d for run %s", i, runId); - client.publishEvent(PUBSUB_NAME, TOPIC_NAME_FLUX, message).block(); - System.out.println( - String.format("Published flux message: '%s' to topic '%s'", message, TOPIC_NAME_FLUX)); - } - - System.out.println("Starting Flux subscription for " + TOPIC_NAME_FLUX); - - Set messages = Collections.synchronizedSet(new HashSet<>()); - - // subscribeToTopic returns Flux directly (raw data) - var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_FLUX, TypeRef.STRING) - .doOnNext(rawMessage -> { - // rawMessage is String directly - if (rawMessage.contains(runId)) { - messages.add(rawMessage); - System.out.println("Received raw message: " + rawMessage); - } - }) - .subscribe(); - - callWithRetry(() -> { - var messageCount = messages.size(); - System.out.println( - String.format("Got %d flux messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME_FLUX)); - assertEquals(NUM_MESSAGES, messages.size()); - }, 60000); - - disposable.dispose(); - } - } - - @Test - public void testPubSubCloudEvent() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName() + "-cloudevent", - 60000)); - - var runId = UUID.randomUUID().toString(); - try (DaprClient client = daprRun.newDaprClient(); - DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { - - // Publish messages - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("CloudEvent message #%d for run %s", i, runId); - client.publishEvent(PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, message).block(); - System.out.println( - String.format("Published CloudEvent message: '%s' to topic '%s'", message, TOPIC_NAME_CLOUDEVENT)); - } - - System.out.println("Starting CloudEvent subscription for " + TOPIC_NAME_CLOUDEVENT); - - Set messageIds = Collections.synchronizedSet(new HashSet<>()); - - // Use TypeRef> to receive full CloudEvent with metadata - var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, new TypeRef>(){}) - .doOnNext(cloudEvent -> { - if (cloudEvent.getData() != null && cloudEvent.getData().contains(runId)) { - messageIds.add(cloudEvent.getId()); - System.out.println("Received CloudEvent with ID: " + cloudEvent.getId() - + ", topic: " + cloudEvent.getTopic() - + ", data: " + cloudEvent.getData()); - } - }) - .subscribe(); - - callWithRetry(() -> { - var messageCount = messageIds.size(); - System.out.println( - String.format("Got %d CloudEvent messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME_CLOUDEVENT)); - assertEquals(NUM_MESSAGES, messageIds.size()); - }, 60000); - - disposable.dispose(); - } - } - - @Test - public void testPubSubRawPayload() throws Exception { - final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName() + "-rawpayload", - 60000)); - - var runId = UUID.randomUUID().toString(); - try (DaprClient client = daprRun.newDaprClient(); - DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { - - // Publish messages with rawPayload metadata - for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("RawPayload message #%d for run %s", i, runId); - client.publishEvent(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, message, Map.of("rawPayload", "true")).block(); - System.out.println( - String.format("Published raw payload message: '%s' to topic '%s'", message, TOPIC_NAME_RAWPAYLOAD)); - } - - System.out.println("Starting raw payload subscription for " + TOPIC_NAME_RAWPAYLOAD); - - Set messages = Collections.synchronizedSet(new HashSet<>()); - Map metadata = Map.of("rawPayload", "true"); - - // Use subscribeToTopic with rawPayload metadata - var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, TypeRef.STRING, metadata) - .doOnNext(rawMessage -> { - if (rawMessage.contains(runId)) { - messages.add(rawMessage); - System.out.println("Received raw payload message: " + rawMessage); - } - }) - .subscribe(); - - callWithRetry(() -> { - var messageCount = messages.size(); - System.out.println( - String.format("Got %d raw payload messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME_RAWPAYLOAD)); - assertEquals(NUM_MESSAGES, messages.size()); - }, 60000); - - disposable.dispose(); - } - } -} diff --git a/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencyIT.java b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencyIT.java index 05182f8d6c..d7f6eea96b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencyIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencyIT.java @@ -14,7 +14,8 @@ package io.dapr.it.resiliency; import com.github.tomakehurst.wiremock.client.WireMock; -import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.junit5.WireMockExtension; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import eu.rekawek.toxiproxy.model.ToxicDirection; @@ -35,6 +36,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tags; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Network; import org.testcontainers.toxiproxy.ToxiproxyContainer; @@ -49,7 +51,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.any; -import static com.github.tomakehurst.wiremock.client.WireMock.configureFor; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; @@ -57,32 +58,25 @@ import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; import static com.github.tomakehurst.wiremock.client.WireMock.verify; -import static io.dapr.it.resiliency.SdkResiliencyIT.WIREMOCK_PORT; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; import static io.dapr.it.testcontainers.ContainerConstants.TOXI_PROXY_IMAGE_TAG; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @Testcontainers -@WireMockTest(httpPort = WIREMOCK_PORT) @Tags({@Tag("testcontainers"), @Tag("resiliency")}) public class SdkResiliencyIT { - public static final int WIREMOCK_PORT = 8888; private static final Network NETWORK = Network.newNetwork(); private static final String STATE_STORE_NAME = "kvstore"; private static final int INFINITE_RETRY = -1; - @Container - private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) - .withAppName("dapr-app") - .withAppPort(WIREMOCK_PORT) - .withDaprLogLevel(DaprLogLevel.DEBUG) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("dapr-logs"))) - .withAppHealthCheckPath("/actuator/health") - .withAppChannelAddress("host.testcontainers.internal") - .withNetworkAliases("dapr") - .withNetwork(NETWORK); + @RegisterExtension + static WireMockExtension wireMock = WireMockExtension.newInstance() + .options(WireMockConfiguration.wireMockConfig().dynamicPort()) + .build(); + + private static DaprContainer daprContainer; @Container private static final ToxiproxyContainer TOXIPROXY = new ToxiproxyContainer(TOXI_PROXY_IMAGE_TAG) @@ -91,41 +85,54 @@ public class SdkResiliencyIT { private static Proxy proxy; private void configStub() { - stubFor(any(urlMatching("/actuator/health")) + wireMock.stubFor(any(urlMatching("/actuator/health")) .willReturn(aResponse().withBody("[]").withStatus(200))); - stubFor(any(urlMatching("/dapr/subscribe")) + wireMock.stubFor(any(urlMatching("/dapr/subscribe")) .willReturn(aResponse().withBody("[]").withStatus(200))); - stubFor(get(urlMatching("/dapr/config")) + wireMock.stubFor(get(urlMatching("/dapr/config")) .willReturn(aResponse().withBody("[]").withStatus(200))); - // create a stub for simulating dapr sidecar with timeout of 1000 ms - stubFor(post(urlEqualTo("/dapr.proto.runtime.v1.Dapr/SaveState")) + wireMock.stubFor(post(urlEqualTo("/dapr.proto.runtime.v1.Dapr/SaveState")) .willReturn(aResponse().withStatus(204).withFixedDelay(1000))); - stubFor(any(urlMatching("/([a-z1-9]*)")) + wireMock.stubFor(any(urlMatching("/([a-z1-9]*)")) .willReturn(aResponse().withBody("[]").withStatus(200))); - configureFor("localhost", WIREMOCK_PORT); + WireMock.configureFor("localhost", wireMock.getPort()); } @BeforeAll static void configure() throws IOException { + int wmPort = wireMock.getPort(); + org.testcontainers.Testcontainers.exposeHostPorts(wmPort); + + daprContainer = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("dapr-app") + .withAppPort(wmPort) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("dapr-logs"))) + .withAppHealthCheckPath("/actuator/health") + .withAppChannelAddress("host.testcontainers.internal") + .withNetworkAliases("dapr") + .withNetwork(NETWORK); + daprContainer.start(); + ToxiproxyClient toxiproxyClient = new ToxiproxyClient(TOXIPROXY.getHost(), TOXIPROXY.getControlPort()); - proxy = - toxiproxyClient.createProxy("dapr", "0.0.0.0:8666", "dapr:3500"); + proxy = toxiproxyClient.createProxy("dapr", "0.0.0.0:8666", "dapr:3500"); } @AfterAll static void afterAll() { - WireMock.shutdownServer(); + if (daprContainer != null) { + daprContainer.stop(); + } } @BeforeEach public void beforeEach() { configStub(); - org.testcontainers.Testcontainers.exposeHostPorts(WIREMOCK_PORT); } @Test @@ -189,10 +196,11 @@ public void shouldFailDueToLatencyExceedingConfigurationWithInfiniteRetry() thro @Test @DisplayName("should fail due to latency exceeding configuration with once retry") public void shouldFailDueToLatencyExceedingConfigurationWithOnceRetry() throws Exception { + int wmPort = wireMock.getPort(); DaprClient client = - new DaprClientBuilder().withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + WIREMOCK_PORT) - .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + WIREMOCK_PORT) + new DaprClientBuilder().withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + wmPort) + .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + wmPort) .withResiliencyOptions(new ResiliencyOptions().setTimeout(Duration.ofMillis(900)) .setMaxRetries(1)) .build(); @@ -202,7 +210,7 @@ public void shouldFailDueToLatencyExceedingConfigurationWithOnceRetry() throws E } catch (Exception ignored) { } - verify(2, postRequestedFor(urlEqualTo("/dapr.proto.runtime.v1.Dapr/SaveState"))); + wireMock.verify(2, postRequestedFor(urlEqualTo("/dapr.proto.runtime.v1.Dapr/SaveState"))); client.close(); } diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java deleted file mode 100644 index bdd25ae780..0000000000 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.state; - -import io.dapr.it.BaseIT; -import io.dapr.it.DaprRun; -import io.dapr.v1.DaprGrpc; -import io.dapr.v1.DaprStateProtos; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class HelloWorldClientIT extends BaseIT { - - @Test - public void testHelloWorldState() throws Exception { - DaprRun daprRun = startDaprApp( - HelloWorldClientIT.class.getSimpleName(), - HelloWorldGrpcStateService.SUCCESS_MESSAGE, - HelloWorldGrpcStateService.class, - false, - 2000 - ); - try (var client = daprRun.newDaprClientBuilder().build()) { - var stub = client.newGrpcStub("n/a", DaprGrpc::newBlockingStub); - - String key = "mykey"; - { - DaprStateProtos.GetStateRequest req = DaprStateProtos.GetStateRequest - .newBuilder() - .setStoreName(STATE_STORE_NAME) - .setKey(key) - .build(); - DaprStateProtos.GetStateResponse response = stub.getState(req); - String value = response.getData().toStringUtf8(); - System.out.println("Got: " + value); - Assertions.assertEquals("Hello World", value); - } - - // Then, delete it. - { - DaprStateProtos.DeleteStateRequest req = DaprStateProtos.DeleteStateRequest - .newBuilder() - .setStoreName(STATE_STORE_NAME) - .setKey(key) - .build(); - stub.deleteState(req); - System.out.println("Deleted!"); - } - - { - DaprStateProtos.GetStateRequest req = DaprStateProtos.GetStateRequest - .newBuilder() - .setStoreName(STATE_STORE_NAME) - .setKey(key) - .build(); - DaprStateProtos.GetStateResponse response = stub.getState(req); - String value = response.getData().toStringUtf8(); - System.out.println("Got: " + value); - Assertions.assertEquals("", value); - } - } - } -} diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java deleted file mode 100644 index abab918be7..0000000000 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.state; - -import com.google.protobuf.ByteString; -import io.dapr.client.DaprClientBuilder; -import io.dapr.config.Properties; -import io.dapr.internal.grpc.DaprClientGrpcInterceptors; -import io.dapr.v1.CommonProtos.StateItem; -import io.dapr.v1.DaprGrpc; -import io.dapr.v1.DaprGrpc.DaprBlockingStub; -import io.dapr.v1.DaprStateProtos.SaveStateRequest; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; - - -/** - * Simple example. - * To run manually, from repo root: - * 1. mvn clean install - * 2. dapr run --resources-path ./components --dapr-grpc-port 50001 -- mvn exec:java -Dexec.mainClass=io.dapr.it.state.HelloWorldGrpcStateService -Dexec.classpathScope="test" -pl=sdk - */ -public class HelloWorldGrpcStateService { - - public static final String SUCCESS_MESSAGE = "Hello from " + HelloWorldGrpcStateService.class.getSimpleName(); - - public static void main(String[] args) { - String grpcPort = System.getenv("DAPR_GRPC_PORT"); - - // If port string is not valid, it will throw an exception. - int grpcPortInt = Integer.parseInt(grpcPort); - ManagedChannel channel = ManagedChannelBuilder.forAddress( - Properties.SIDECAR_IP.get(), grpcPortInt).usePlaintext().build(); - DaprClientGrpcInterceptors interceptors = new DaprClientGrpcInterceptors( - Properties.API_TOKEN.get(), null); - DaprBlockingStub client = interceptors.intercept(DaprGrpc.newBlockingStub(channel)); - - String key = "mykey"; - // First, write key-value pair. - - String value = "Hello World"; - StateItem req = StateItem - .newBuilder() - .setKey(key) - .setValue(ByteString.copyFromUtf8(value)) - .build(); - SaveStateRequest state = SaveStateRequest.newBuilder() - .setStoreName("statestore") - .addStates(req) - .build(); - client.saveState(state); - System.out.println("Saved!"); - channel.shutdown(); - - System.out.println(SUCCESS_MESSAGE); - } -} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java index 2694e32e5d..79a1d3485a 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/jobs/DaprJobsIT.java @@ -25,6 +25,7 @@ import io.dapr.it.testcontainers.DaprClientConfiguration; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; +import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -43,9 +44,11 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.Random; +import java.util.UUID; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; @SpringBootTest( webEnvironment = WebEnvironment.RANDOM_PORT, @@ -93,63 +96,77 @@ public void setUp(){ @Test public void testJobScheduleCreationWithDueTime() { + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneOffset.UTC); - Instant currentTime = Instant.now(); - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime).setOverwrite(true)).block(); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime)).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); - assertEquals("Job", getJobResponse.getName()); + assertEquals(jobName, getJobResponse.getName()); } @Test public void testJobScheduleCreationWithSchedule() { + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneOffset.UTC); - Instant currentTime = Instant.now(); - daprClient.scheduleJob(new ScheduleJobRequest("Job", JobSchedule.hourly()) - .setDueTime(currentTime).setOverwrite(true)).block(); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); + daprClient.scheduleJob(new ScheduleJobRequest(jobName, JobSchedule.hourly()) + .setDueTime(currentTime)).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression()); - assertEquals("Job", getJobResponse.getName()); + assertEquals(jobName, getJobResponse.getName()); } @Test public void testJobScheduleCreationWithAllParameters() { - Instant currentTime = Instant.now(); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneOffset.UTC); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) - .setOverwrite(true) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression()); - assertEquals("Job", getJobResponse.getName()); + assertEquals(jobName, getJobResponse.getName()); assertEquals(Integer.valueOf(3), getJobResponse.getRepeats()); assertEquals("Job data", new String(getJobResponse.getData())); assertEquals(iso8601Formatter.format(currentTime.plus(2, ChronoUnit.HOURS)), @@ -158,36 +175,38 @@ public void testJobScheduleCreationWithAllParameters() { @Test public void testJobScheduleCreationWithDropFailurePolicy() { - Instant currentTime = Instant.now(); - DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - .withZone(ZoneOffset.UTC); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) - .setFailurePolicy(new DropFailurePolicy()) + .setFailurePolicy(new DropFailurePolicy()) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType()); } @Test public void testJobScheduleCreationWithConstantFailurePolicy() { - Instant currentTime = Instant.now(); - DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - .withZone(ZoneOffset.UTC); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) @@ -195,11 +214,15 @@ public void testJobScheduleCreationWithConstantFailurePolicy() { .setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS))) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy(); assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType()); assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries()); @@ -209,17 +232,23 @@ public void testJobScheduleCreationWithConstantFailurePolicy() { @Test public void testDeleteJobRequest() { - Instant currentTime = Instant.now(); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) - .setOverwrite(true) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); + + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/ConvertToLong.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/ConvertToLong.java new file mode 100644 index 0000000000..56e10b7880 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/ConvertToLong.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.http; + +import java.util.Objects; + +public class ConvertToLong { + private Long value; + + public ConvertToLong setVal(Long value) { + this.value = value; + return this; + } + + public Long getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConvertToLong that = (ConvertToLong) o; + return Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java index eaa0f0c99f..d48e3ae5e8 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java @@ -26,7 +26,6 @@ import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.Metadata; import io.dapr.client.domain.PublishEventRequest; -import io.dapr.it.pubsub.http.PubSubIT; import io.dapr.it.testcontainers.DaprClientFactory; import io.dapr.serializer.CustomizableObjectSerializer; import io.dapr.serializer.DaprObjectSerializer; @@ -102,10 +101,10 @@ public class DaprPubSubIT { // typeRefs private static final TypeRef> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() { }; - private static final TypeRef>> CLOUD_EVENT_LONG_LIST_TYPE_REF = + private static final TypeRef>> CLOUD_EVENT_LONG_LIST_TYPE_REF = new TypeRef<>() { }; - private static final TypeRef>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF = + private static final TypeRef>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF = new TypeRef<>() { }; @@ -199,7 +198,7 @@ public void testPubSub() throws Exception { sendBulkMessagesAsText(client, ANOTHER_TOPIC_NAME); //Publishing an object. - PubSubIT.MyObject object = new PubSubIT.MyObject(); + MyObject object = new MyObject(); object.setId("123"); client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block(); LOG.info("Published one object."); @@ -321,7 +320,7 @@ public void testPubSub() throws Exception { callWithRetry(() -> { LOG.info("Checking results for topic " + TYPED_TOPIC_NAME); - List> messages = client.invokeMethod( + List> messages = client.invokeMethod( PUBSUB_APP_ID, "messages/typedtestingtopic", null, @@ -332,8 +331,8 @@ public void testPubSub() throws Exception { assertThat(messages) .extracting(CloudEvent::getData) .filteredOn(Objects::nonNull) - .filteredOn(PubSubIT.MyObject.class::isInstance) - .map(PubSubIT.MyObject::getId) + .filteredOn(MyObject.class::isInstance) + .map(MyObject::getId) .contains("123"); }, 2000); @@ -408,9 +407,9 @@ private static void sendBulkMessagesAsText(DaprClient client, String topicName) } private void publishMyObjectAsserting(DaprClient client) { - PubSubIT.MyObject object = new PubSubIT.MyObject(); + MyObject object = new MyObject(); object.setId("123"); - BulkPublishResponse response = client.publishEvents( + BulkPublishResponse response = client.publishEvents( PUBSUB_NAME, TOPIC_BULK, "application/json", @@ -542,19 +541,19 @@ public void testPubSubTTLMetadata() throws Exception { public void testLongValues() throws Exception { Random random = new Random(590518626939830271L); - Set values = new HashSet<>(); - values.add(new PubSubIT.ConvertToLong().setVal(590518626939830271L)); - PubSubIT.ConvertToLong val; + Set values = new HashSet<>(); + values.add(new ConvertToLong().setVal(590518626939830271L)); + ConvertToLong val; for (int i = 0; i < NUM_MESSAGES - 1; i++) { do { - val = new PubSubIT.ConvertToLong().setVal(random.nextLong()); + val = new ConvertToLong().setVal(random.nextLong()); } while (values.contains(val)); values.add(val); } - Iterator valuesIt = values.iterator(); + Iterator valuesIt = values.iterator(); try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { for (int i = 0; i < NUM_MESSAGES; i++) { - PubSubIT.ConvertToLong value = valuesIt.next(); + ConvertToLong value = valuesIt.next(); LOG.info("The long value sent " + value.getValue()); //Publishing messages client.publishEvent( @@ -573,17 +572,17 @@ public void testLongValues() throws Exception { } } - Set actual = new HashSet<>(); + Set actual = new HashSet<>(); try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { callWithRetry(() -> { LOG.info("Checking results for topic " + LONG_TOPIC_NAME); - final List> messages = client.invokeMethod( + final List> messages = client.invokeMethod( PUBSUB_APP_ID, "messages/testinglongvalues", null, HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block(); assertNotNull(messages); - for (CloudEvent message : messages) { + for (CloudEvent message : messages) { actual.add(message.getData()); } assertThat(values).containsAll(actual); diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/MyObject.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/MyObject.java new file mode 100644 index 0000000000..019c537727 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/MyObject.java @@ -0,0 +1,25 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.http; + +public class MyObject { + private String id; + + public String getId() { + return this.id; + } + + public void setId(String id) { + this.id = id; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java index 30e9204018..1428d85788 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java @@ -21,7 +21,6 @@ import io.dapr.client.domain.BulkSubscribeMessage; import io.dapr.client.domain.BulkSubscribeMessageEntry; import io.dapr.client.domain.CloudEvent; -import io.dapr.it.pubsub.http.PubSubIT; import io.dapr.springboot.annotations.BulkSubscribe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +150,7 @@ public Mono handleMessageV3(@RequestBody(required = false) CloudEvent enve @Topic(name = "typedtestingtopic", pubsubName = "pubsub") @PostMapping(path = "/route1b") - public Mono handleMessageTyped(@RequestBody(required = false) CloudEvent envelope) { + public Mono handleMessageTyped(@RequestBody(required = false) CloudEvent envelope) { return Mono.fromRunnable(() -> { try { String id = envelope.getData() == null ? "" : envelope.getData().getId(); @@ -208,7 +207,7 @@ public Mono handleMessageTTLTopic(@RequestBody(required = false) CloudEven @Topic(name = "testinglongvalues", pubsubName = "pubsub") @PostMapping(path = "/testinglongvalues") - public Mono handleMessageLongValues(@RequestBody(required = false) CloudEvent cloudEvent) { + public Mono handleMessageLongValues(@RequestBody(required = false) CloudEvent cloudEvent) { return Mono.fromRunnable(() -> { try { Long message = cloudEvent.getData().getValue(); diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/DaprPubSubOutboxIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/DaprPubSubOutboxIT.java index d4139bcf91..b053b5c070 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/DaprPubSubOutboxIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/DaprPubSubOutboxIT.java @@ -24,6 +24,7 @@ import io.dapr.testcontainers.wait.strategy.DaprWait; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -36,7 +37,6 @@ import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.Network; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.time.Duration; @@ -47,7 +47,7 @@ import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; -@Disabled("Unclear why this test is failing intermittently in CI") +@Disabled("Outbox event delivery via in-memory pubsub is unreliable — suspected Dapr runtime issue. See #1603") @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT, classes = { @@ -70,7 +70,6 @@ public class DaprPubSubOutboxIT { private static final String TOPIC_PRODUCT_CREATED = "product.created"; private static final String STATE_STORE_NAME = "kvstore"; - @Container private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) .withAppName(PUBSUB_APP_ID) .withNetwork(DAPR_NETWORK) @@ -87,21 +86,20 @@ public class DaprPubSubOutboxIT { @Autowired private ProductWebhookController productWebhookController; - /** - * Expose the Dapr ports to the host. - * - * @param registry the dynamic property registry - */ @DynamicPropertySource static void daprProperties(DynamicPropertyRegistry registry) { - registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint); - registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint); registry.add("server.port", () -> PORT); } @BeforeAll - public static void beforeAll(){ + public static void beforeAll() { org.testcontainers.Testcontainers.exposeHostPorts(PORT); + DAPR_CONTAINER.start(); + } + + @AfterAll + public static void afterAll() { + DAPR_CONTAINER.stop(); } @BeforeEach @@ -128,7 +126,8 @@ public void shouldPublishUsingOutbox() throws Exception { client.executeStateTransaction(transactionRequest).block(); - Awaitility.await().atMost(Duration.ofSeconds(10)) + Awaitility.await().atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofMillis(500)) .ignoreExceptions() .untilAsserted(() -> Assertions.assertThat(productWebhookController.getEventList()).isNotEmpty()); } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/stream/DaprPubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/stream/DaprPubSubStreamIT.java new file mode 100644 index 0000000000..1c1f310643 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/stream/DaprPubSubStreamIT.java @@ -0,0 +1,225 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.stream; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.SubscriptionListener; +import io.dapr.client.domain.CloudEvent; +import io.dapr.it.testcontainers.DaprClientFactory; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.utils.TypeRef; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import reactor.core.publisher.Mono; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static io.dapr.it.Retry.callWithRetry; +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +@Tag("testcontainers") +public class DaprPubSubStreamIT { + + private static final int NUM_MESSAGES = 100; + private static final String TOPIC_NAME = "stream-topic"; + private static final String TOPIC_NAME_FLUX = "stream-topic-flux"; + private static final String TOPIC_NAME_CLOUDEVENT = "stream-topic-cloudevent"; + private static final String TOPIC_NAME_RAWPAYLOAD = "stream-topic-rawpayload"; + private static final String PUBSUB_NAME = "pubsub"; + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("pubsub-stream-app") + .withComponent(new Component(PUBSUB_NAME, "pubsub.in-memory", "v1", Collections.emptyMap())); + + private void waitForSubscription(DaprClient client, String topic, CountDownLatch latch) throws InterruptedException { + callWithRetry(() -> { + client.publishEvent(PUBSUB_NAME, topic, "probe").block(); + try { + assertTrue(latch.await(500, TimeUnit.MILLISECONDS), "Subscription not ready for " + topic); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }, 60000); + } + + @Test + public void testPubSub() throws Exception { + var runId = UUID.randomUUID().toString(); + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build(); + DaprPreviewClient previewClient = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER) + .buildPreviewClient()) { + + Set received = Collections.synchronizedSet(new HashSet<>()); + CountDownLatch ready = new CountDownLatch(1); + + var listener = new SubscriptionListener() { + @Override + public Mono onEvent(CloudEvent event) { + return Mono.fromCallable(() -> { + ready.countDown(); + if (event.getData().contains(runId)) { + received.add(event.getId()); + return Status.SUCCESS; + } + return Status.DROP; + }); + } + + @Override + public void onError(RuntimeException exception) { + } + }; + + try (var subscription = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, listener, TypeRef.STRING)) { + waitForSubscription(client, TOPIC_NAME, ready); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d on topic %s for run %s", i, TOPIC_NAME, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); + } + + callWithRetry(() -> { + assertEquals(NUM_MESSAGES, received.size(), + String.format("Got %d/%d messages for topic %s", received.size(), NUM_MESSAGES, TOPIC_NAME)); + }, 120000); + } + } + } + + @Test + public void testPubSubFlux() throws Exception { + var runId = UUID.randomUUID().toString(); + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build(); + DaprPreviewClient previewClient = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER) + .buildPreviewClient()) { + + Set received = Collections.synchronizedSet(new HashSet<>()); + CountDownLatch ready = new CountDownLatch(1); + + var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_FLUX, TypeRef.STRING) + .doOnNext(rawMessage -> { + ready.countDown(); + if (rawMessage.contains(runId)) { + received.add(rawMessage); + } + }) + .subscribe(); + + waitForSubscription(client, TOPIC_NAME_FLUX, ready); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("Flux message #%d for run %s", i, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME_FLUX, message).block(); + } + + callWithRetry(() -> { + assertEquals(NUM_MESSAGES, received.size(), + String.format("Got %d/%d flux messages for topic %s", received.size(), NUM_MESSAGES, TOPIC_NAME_FLUX)); + }, 60000); + + disposable.dispose(); + } + } + + @Test + public void testPubSubCloudEvent() throws Exception { + var runId = UUID.randomUUID().toString(); + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build(); + DaprPreviewClient previewClient = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER) + .buildPreviewClient()) { + + Set received = Collections.synchronizedSet(new HashSet<>()); + CountDownLatch ready = new CountDownLatch(1); + + var disposable = previewClient.subscribeToTopic( + PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, new TypeRef>() {}) + .doOnNext(cloudEvent -> { + ready.countDown(); + if (cloudEvent.getData() != null && cloudEvent.getData().contains(runId)) { + received.add(cloudEvent.getId()); + } + }) + .subscribe(); + + waitForSubscription(client, TOPIC_NAME_CLOUDEVENT, ready); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("CloudEvent message #%d for run %s", i, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, message).block(); + } + + callWithRetry(() -> { + assertEquals(NUM_MESSAGES, received.size(), + String.format("Got %d/%d CloudEvent messages for topic %s", + received.size(), NUM_MESSAGES, TOPIC_NAME_CLOUDEVENT)); + }, 60000); + + disposable.dispose(); + } + } + + @Disabled("Streaming subscription with rawPayload metadata not supported by pubsub.in-memory") + @Test + public void testPubSubRawPayload() throws Exception { + var runId = UUID.randomUUID().toString(); + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build(); + DaprPreviewClient previewClient = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER) + .buildPreviewClient()) { + + Set received = Collections.synchronizedSet(new HashSet<>()); + Map metadata = Map.of("rawPayload", "true"); + CountDownLatch ready = new CountDownLatch(1); + + var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, TypeRef.STRING, metadata) + .doOnNext(rawMessage -> { + ready.countDown(); + if (rawMessage.contains(runId)) { + received.add(rawMessage); + } + }) + .subscribe(); + + waitForSubscription(client, TOPIC_NAME_RAWPAYLOAD, ready); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("RawPayload message #%d for run %s", i, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, message, Map.of("rawPayload", "true")).block(); + } + + callWithRetry(() -> { + assertEquals(NUM_MESSAGES, received.size(), + String.format("Got %d/%d raw payload messages for topic %s", + received.size(), NUM_MESSAGES, TOPIC_NAME_RAWPAYLOAD)); + }, 60000); + + disposable.dispose(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/secrets/DaprSecretsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/secrets/DaprSecretsIT.java new file mode 100644 index 0000000000..3f71938286 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/secrets/DaprSecretsIT.java @@ -0,0 +1,148 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.it.testcontainers.secrets; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.config.Properties; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.testcontainers.MetadataEntry; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the Dapr Secrets API using testcontainers. + */ +@Disabled("Needs investigation: DaprContainer file mounting with secretstores.local.file") +@Testcontainers +@Tag("testcontainers") +public class DaprSecretsIT { + + private static final String SECRETS_STORE_NAME = "localSecretStore"; + private static final String CONTAINER_SECRETS_PATH = "/tmp/secrets.json"; + private static final ObjectMapper JSON_SERIALIZER = new ObjectMapper(); + + private static final String KEY1 = "movie"; + private static final String KEY2 = "person"; + + private static final Network DAPR_NETWORK = Network.newNetwork(); + + private static DaprClient daprClient; + + private static final String SECRETS_JSON = createSecretsJson(); + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("secrets-test-app") + .withNetwork(DAPR_NETWORK) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withComponent(new Component( + SECRETS_STORE_NAME, + "secretstores.local.file", + "v1", + List.of(new MetadataEntry("secretsFile", CONTAINER_SECRETS_PATH)) + )) + .withCopyToContainer(Transferable.of(SECRETS_JSON), CONTAINER_SECRETS_PATH) + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())); + + private static String createSecretsJson() { + try { + Map secrets = new HashMap<>(); + Map movieSecret = new HashMap<>(); + movieSecret.put("title", "The Metrics IV"); + movieSecret.put("year", "2020"); + secrets.put(KEY1, movieSecret); + + Map personSecret = new HashMap<>(); + personSecret.put("name", "Jon Doe"); + secrets.put(KEY2, personSecret); + + return JSON_SERIALIZER.writeValueAsString(secrets); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @BeforeAll + static void setUp() { + daprClient = new DaprClientBuilder() + .withPropertyOverride(Properties.HTTP_ENDPOINT, DAPR_CONTAINER.getHttpEndpoint()) + .withPropertyOverride(Properties.GRPC_ENDPOINT, DAPR_CONTAINER.getGrpcEndpoint()) + .build(); + } + + @AfterAll + static void tearDown() throws Exception { + if (daprClient != null) { + daprClient.close(); + } + } + + @Test + public void testGetSecret() { + Map data = daprClient.getSecret(SECRETS_STORE_NAME, KEY1).block(); + + assertNotNull(data); + assertEquals(2, data.size()); + assertEquals("The Metrics IV", data.get("title")); + assertEquals("2020", data.get("year")); + } + + @Test + public void testGetBulkSecret() { + Map> data = daprClient.getBulkSecret(SECRETS_STORE_NAME).block(); + + assertNotNull(data); + assertTrue(data.size() >= 2); + assertEquals(2, data.get(KEY1).size()); + assertEquals("The Metrics IV", data.get(KEY1).get("title")); + assertEquals("2020", data.get(KEY1).get("year")); + assertEquals(1, data.get(KEY2).size()); + assertEquals("Jon Doe", data.get(KEY2).get("name")); + } + + @Test + public void testGetSecretKeyNotFound() { + assertThrows(RuntimeException.class, () -> + daprClient.getSecret(SECRETS_STORE_NAME, "unknownKey").block() + ); + } + + @Test + public void testGetSecretStoreNotFound() { + assertThrows(RuntimeException.class, () -> + daprClient.getSecret("unknownStore", "unknownKey").block() + ); + } +} diff --git a/sdk/src/main/java/io/dapr/client/Subscription.java b/sdk/src/main/java/io/dapr/client/Subscription.java index 2f85128474..c00c5c952d 100644 --- a/sdk/src/main/java/io/dapr/client/Subscription.java +++ b/sdk/src/main/java/io/dapr/client/Subscription.java @@ -88,6 +88,7 @@ public class Subscription implements Closeable { }); this.receiver = new Thread(() -> { + int reconnectAttempts = 0; while (running.get()) { var stream = asyncStub.subscribeTopicEventsAlpha1(new StreamObserver<>() { @Override @@ -124,6 +125,7 @@ public void onNext(DaprPubsubProtos.SubscribeTopicEventsResponseAlpha1 topicEven @Override public void onError(Throwable throwable) { listener.onError(DaprException.propagate(throwable)); + receiverStateChange.release(); } @Override @@ -142,6 +144,17 @@ public void onCompleted() { Thread.currentThread().interrupt(); running.set(false); } + + if (running.get()) { + long backoffMs = Math.min(1000L * (1L << reconnectAttempts), 30000L); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + running.set(false); + } + reconnectAttempts++; + } } }); } diff --git a/spring-boot-4-sdk-tests/src/test/java/io/dapr/it/springboot4/testcontainers/jobs/DaprJobsIT.java b/spring-boot-4-sdk-tests/src/test/java/io/dapr/it/springboot4/testcontainers/jobs/DaprJobsIT.java index 686c7eb01f..92f2d09aa5 100644 --- a/spring-boot-4-sdk-tests/src/test/java/io/dapr/it/springboot4/testcontainers/jobs/DaprJobsIT.java +++ b/spring-boot-4-sdk-tests/src/test/java/io/dapr/it/springboot4/testcontainers/jobs/DaprJobsIT.java @@ -25,6 +25,7 @@ import io.dapr.it.springboot4.testcontainers.DaprClientConfiguration; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; +import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -43,9 +44,11 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.Random; +import java.util.UUID; import static io.dapr.it.springboot4.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; @SpringBootTest( webEnvironment = WebEnvironment.RANDOM_PORT, @@ -93,63 +96,77 @@ public void setUp(){ @Test public void testJobScheduleCreationWithDueTime() { + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneOffset.UTC); - Instant currentTime = Instant.now(); - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime).setOverwrite(true)).block(); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime)).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); - assertEquals("Job", getJobResponse.getName()); + assertEquals(jobName, getJobResponse.getName()); } @Test public void testJobScheduleCreationWithSchedule() { + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneOffset.UTC); - Instant currentTime = Instant.now(); - daprClient.scheduleJob(new ScheduleJobRequest("Job", JobSchedule.hourly()) - .setDueTime(currentTime).setOverwrite(true)).block(); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); + daprClient.scheduleJob(new ScheduleJobRequest(jobName, JobSchedule.hourly()) + .setDueTime(currentTime)).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression()); - assertEquals("Job", getJobResponse.getName()); + assertEquals(jobName, getJobResponse.getName()); } @Test public void testJobScheduleCreationWithAllParameters() { - Instant currentTime = Instant.now(); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneOffset.UTC); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) - .setOverwrite(true) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression()); - assertEquals("Job", getJobResponse.getName()); + assertEquals(jobName, getJobResponse.getName()); assertEquals(Integer.valueOf(3), getJobResponse.getRepeats()); assertEquals("Job data", new String(getJobResponse.getData())); assertEquals(iso8601Formatter.format(currentTime.plus(2, ChronoUnit.HOURS)), @@ -158,36 +175,38 @@ public void testJobScheduleCreationWithAllParameters() { @Test public void testJobScheduleCreationWithDropFailurePolicy() { - Instant currentTime = Instant.now(); - DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - .withZone(ZoneOffset.UTC); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) - .setFailurePolicy(new DropFailurePolicy()) + .setFailurePolicy(new DropFailurePolicy()) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); assertEquals(FailurePolicyType.DROP, getJobResponse.getFailurePolicy().getFailurePolicyType()); } @Test public void testJobScheduleCreationWithConstantFailurePolicy() { - Instant currentTime = Instant.now(); - DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") - .withZone(ZoneOffset.UTC); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) @@ -195,11 +214,15 @@ public void testJobScheduleCreationWithConstantFailurePolicy() { .setDurationBetweenRetries(Duration.of(10, ChronoUnit.SECONDS))) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - GetJobResponse getJobResponse = - daprClient.getJob(new GetJobRequest("Job")).block(); + GetJobResponse getJobResponse = Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); + assertNotNull(getJobResponse); ConstantFailurePolicy jobFailurePolicyConstant = (ConstantFailurePolicy) getJobResponse.getFailurePolicy(); assertEquals(FailurePolicyType.CONSTANT, getJobResponse.getFailurePolicy().getFailurePolicyType()); assertEquals(3, (int)jobFailurePolicyConstant.getMaxRetries()); @@ -209,17 +232,23 @@ public void testJobScheduleCreationWithConstantFailurePolicy() { @Test public void testDeleteJobRequest() { - Instant currentTime = Instant.now(); + String jobName = "Job-" + UUID.randomUUID().toString().substring(0, 8); + Instant currentTime = Instant.now().plus(10, ChronoUnit.MINUTES); String cronExpression = "2 * 3 * * FRI"; - daprClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + daprClient.scheduleJob(new ScheduleJobRequest(jobName, currentTime) .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) .setData("Job data".getBytes()) .setRepeat(3) - .setOverwrite(true) .setSchedule(JobSchedule.fromString(cronExpression))).block(); - daprClient.deleteJob(new DeleteJobRequest("Job")).block(); + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofMillis(300)) + .ignoreExceptions() + .until(() -> daprClient.getJob(new GetJobRequest(jobName)).block(), r -> r != null); + + daprClient.deleteJob(new DeleteJobRequest(jobName)).block(); } }