|
1 | 1 | package com.springqprobackend.springqpro.integration; |
2 | 2 |
|
3 | 3 | import com.springqprobackend.springqpro.domain.entity.TaskEntity; |
| 4 | +import com.springqprobackend.springqpro.enums.TaskStatus; |
4 | 5 | import com.springqprobackend.springqpro.enums.TaskType; |
5 | 6 | import com.springqprobackend.springqpro.redis.RedisDistributedLock; |
6 | 7 | import com.springqprobackend.springqpro.repository.TaskRepository; |
7 | 8 | import com.springqprobackend.springqpro.service.ProcessingService; |
| 9 | +import com.springqprobackend.springqpro.service.QueueService; |
8 | 10 | import com.springqprobackend.springqpro.service.TaskService; |
9 | 11 | import com.springqprobackend.springqpro.testcontainers.IntegrationTestBase; |
10 | 12 | import org.junit.jupiter.api.BeforeEach; |
|
14 | 16 | import org.slf4j.LoggerFactory; |
15 | 17 | import org.springframework.beans.factory.annotation.Autowired; |
16 | 18 | import org.junit.jupiter.api.Test; |
| 19 | +import org.springframework.boot.test.context.SpringBootTest; |
| 20 | +import org.springframework.test.context.ActiveProfiles; |
| 21 | +import org.testcontainers.shaded.org.awaitility.Awaitility; |
| 22 | + |
| 23 | +import java.time.Duration; |
17 | 24 | import java.util.concurrent.*; |
18 | 25 |
|
19 | 26 | import static org.assertj.core.api.AssertionsForClassTypes.assertThat; |
@@ -52,43 +59,58 @@ These three Integration test that I have (ProcessingConcurrencyIntegrationTest.j |
52 | 59 | */ |
53 | 60 | //@Testcontainers |
54 | 61 | //@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) |
55 | | -@Tag("disable_temp") |
| 62 | +@SpringBootTest |
| 63 | +@ActiveProfiles("test") |
56 | 64 | class ProcessingConcurrencyIntegrationTest extends IntegrationTestBase { |
57 | | - // Field(s): |
58 | | - private static final Logger logger = LoggerFactory.getLogger(ProcessingConcurrencyIntegrationTest.class); |
59 | 65 | @Autowired |
60 | 66 | private TaskService taskService; |
| 67 | + |
61 | 68 | @Autowired |
62 | 69 | private TaskRepository taskRepository; |
| 70 | + |
63 | 71 | @Autowired |
64 | | - private ProcessingService processingService; |
65 | | - @Autowired |
66 | | - private RedisDistributedLock redisLock; |
| 72 | + private QueueService queueService; |
67 | 73 |
|
68 | 74 | @BeforeEach |
69 | 75 | void cleanDb() { |
70 | 76 | taskRepository.deleteAll(); |
71 | 77 | } |
72 | 78 |
|
73 | | - @Disabled("Outdated architecture — will fix later") |
74 | 79 | @Test |
75 | | - void twoThreads_tryToClaim_sameTask_onlyOneSucceeds() throws InterruptedException, ExecutionException { |
76 | | - TaskEntity entity = taskService.createTaskForUser("concurrency-test", TaskType.EMAIL, "random_email@gmail.com"); |
77 | | - String id = entity.getId(); |
78 | | - ExecutorService esDummy = Executors.newFixedThreadPool(2); |
79 | | - Callable<Void> c = () -> { |
80 | | - processingService.claimAndProcess(id); |
81 | | - return null; |
82 | | - }; |
83 | | - Future<Void> f1 = esDummy.submit(c); |
84 | | - Future<Void> f2 = esDummy.submit(c); |
85 | | - f1.get(); |
86 | | - f2.get(); |
87 | | - // reload: |
88 | | - TaskEntity reloaded = taskRepository.findById(id).orElseThrow(); |
89 | | - // ATTEMPTS SHOULD BE 1. |
90 | | - assertThat(reloaded.getAttempts()).isGreaterThanOrEqualTo(1); |
91 | | - assertThat(reloaded.getStatus()).isNotNull(); // No double-claiming anomalies or corrupted states. |
92 | | - esDummy.shutdown(); |
| 80 | + void concurrentEnqueue_sameTask_doesNotProcessTwice() throws Exception { |
| 81 | + // create task using TaskService: |
| 82 | + TaskEntity task = taskService.createTaskForUser( |
| 83 | + "concurrency-test", |
| 84 | + TaskType.EMAIL, |
| 85 | + "concurrency@test.com" |
| 86 | + ); |
| 87 | + String taskId = task.getId(); |
| 88 | + |
| 89 | + // enqueue same task concurrently: |
| 90 | + ExecutorService executor = Executors.newFixedThreadPool(2); |
| 91 | + |
| 92 | + Runnable enqueue = () -> queueService.enqueueById(taskId); |
| 93 | + |
| 94 | + executor.submit(enqueue); |
| 95 | + executor.submit(enqueue); |
| 96 | + |
| 97 | + executor.shutdown(); |
| 98 | + executor.awaitTermination(5, TimeUnit.SECONDS); |
| 99 | + |
| 100 | + // Assert: task eventually reaches a terminal state |
| 101 | + Awaitility.await() |
| 102 | + .atMost(Duration.ofSeconds(5)) |
| 103 | + .untilAsserted(() -> { |
| 104 | + TaskEntity reloaded = taskRepository.findById(taskId).orElseThrow(); |
| 105 | + assertThat(reloaded.getStatus()) |
| 106 | + .isIn(TaskStatus.COMPLETED, TaskStatus.FAILED); |
| 107 | + }); |
| 108 | + |
| 109 | + // Assert: task is not duplicated or corrupted |
| 110 | + TaskEntity finalState = taskRepository.findById(taskId).orElseThrow(); |
| 111 | + |
| 112 | + assertThat(finalState.getId()).isEqualTo(taskId); |
| 113 | + assertThat(finalState.getStatus()).isNotEqualTo(TaskStatus.QUEUED); |
| 114 | + assertThat(finalState.getStatus()).isNotEqualTo(TaskStatus.INPROGRESS); |
93 | 115 | } |
94 | 116 | } |
0 commit comments