diff --git a/src/Client/WorkflowOptions.php b/src/Client/WorkflowOptions.php index 027f2c522..882a5690f 100644 --- a/src/Client/WorkflowOptions.php +++ b/src/Client/WorkflowOptions.php @@ -26,6 +26,7 @@ use Temporal\Common\Versioning\VersioningOverride; use Temporal\Common\WorkflowIdConflictPolicy; use Temporal\DataConverter\DataConverterInterface; +use Temporal\Internal\Client\OnConflictOptions; use Temporal\Internal\Marshaller\Meta\Marshal; use Temporal\Internal\Marshaller\Type\ArrayType; use Temporal\Internal\Marshaller\Type\CronType; @@ -193,6 +194,9 @@ final class WorkflowOptions extends Options #[Marshal(name: 'VersioningOverride')] public ?VersioningOverride $versioningOverride = null; + /** @internal */ + public ?OnConflictOptions $onConflictOptions = null; + /** * @throws \Exception */ @@ -612,4 +616,16 @@ public function withPriority(Priority $priority): self $self->priority = $priority; return $self; } + + /** + * @internal + * @return $this + */ + #[Pure] + public function withOnConflictOptionsInternal(?OnConflictOptions $options): self + { + $self = clone $this; + $self->onConflictOptions = $options; + return $self; + } } diff --git a/src/Internal/Client/OnConflictOptions.php b/src/Internal/Client/OnConflictOptions.php new file mode 100644 index 000000000..25ec82d5a --- /dev/null +++ b/src/Internal/Client/OnConflictOptions.php @@ -0,0 +1,28 @@ +setAttachRequestId($options->attachRequestId); + $proto->setAttachCompletionCallbacks($options->attachCompletionCallbacks); + $proto->setAttachLinks($options->attachLinks); + return $proto; + } + /** * @param StartWorkflowExecutionRequest|SignalWithStartWorkflowExecutionRequest $request * use {@see configureExecutionRequest()} to prepare request @@ -296,6 +308,10 @@ private function executeRequest( \assert($f instanceof WorkflowExecutionAlreadyStartedFailure); $execution = new WorkflowExecution($request->getWorkflowId(), $f->getRunId()); + if ($request->getWorkflowIdConflictPolicy() === WorkflowIdConflictPolicy::UseExisting->value) { + return $execution; + } + throw new WorkflowExecutionAlreadyStartedException( $execution, $request->getWorkflowType()->getName(), @@ -395,6 +411,10 @@ private function configureExecutionRequest( if ($req instanceof StartWorkflowExecutionRequest) { $req->setRequestEagerExecution($options->eagerStart); + + if ($options->onConflictOptions !== null) { + $req->setOnConflictOptions(self::onConflictOptionsToProto($options->onConflictOptions)); + } } if (!$input->arguments->isEmpty()) { diff --git a/tests/Acceptance/Extra/Update/UpdateWithStartTest.php b/tests/Acceptance/Extra/Update/UpdateWithStartTest.php index 62afd89ab..ff90cc3d5 100644 --- a/tests/Acceptance/Extra/Update/UpdateWithStartTest.php +++ b/tests/Acceptance/Extra/Update/UpdateWithStartTest.php @@ -10,6 +10,7 @@ use Temporal\Client\Update\UpdateOptions; use Temporal\Client\WorkflowClientInterface; use Temporal\Client\WorkflowOptions; +use Temporal\Common\WorkflowIdConflictPolicy; use Temporal\Exception\Client\WorkflowExecutionAlreadyStartedException; use Temporal\Exception\Client\WorkflowFailedException; use Temporal\Exception\Client\WorkflowServiceException; @@ -138,6 +139,96 @@ public function failOnReuseExistingWorkflowId( $stub1->signal('exit'); } } + + #[Test] + public function useExistingReturnsRunningExecution( + WorkflowClientInterface $client, + Feature $feature, + ): void { + $id = Uuid::uuid7()->__toString(); + + $first = $client->newUntypedWorkflowStub( + 'Extra_Update_UseExisting', + WorkflowOptions::new()->withTaskQueue($feature->taskQueue)->withWorkflowId($id), + ); + $client->start($first); + $firstRunId = $first->getExecution()->getRunID(); + + $second = $client->newUntypedWorkflowStub( + 'Extra_Update_UseExisting', + WorkflowOptions::new() + ->withTaskQueue($feature->taskQueue) + ->withWorkflowId($id) + ->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::UseExisting), + ); + + try { + $client->start($second); + + $this->assertSame($id, $second->getExecution()->getID()); + $this->assertSame( + $firstRunId, + $second->getExecution()->getRunID(), + 'UseExisting must resolve to the already-running execution instead of throwing', + ); + } finally { + $first->signal('exit'); + } + } + + #[Test] + public function failPolicyThrowsOnRunningWorkflowId( + WorkflowClientInterface $client, + Feature $feature, + ): void { + $id = Uuid::uuid7()->__toString(); + + $first = $client->newUntypedWorkflowStub( + 'Extra_Update_UseExisting', + WorkflowOptions::new()->withTaskQueue($feature->taskQueue)->withWorkflowId($id), + ); + $client->start($first); + + $second = $client->newUntypedWorkflowStub( + 'Extra_Update_UseExisting', + WorkflowOptions::new() + ->withTaskQueue($feature->taskQueue) + ->withWorkflowId($id) + ->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::Fail), + ); + + try { + $this->expectException(WorkflowExecutionAlreadyStartedException::class); + $client->start($second); + } finally { + $first->signal('exit'); + } + } + + #[Test] + public function useExistingStartsFreshWhenNoneRunning( + WorkflowClientInterface $client, + Feature $feature, + ): void { + $id = Uuid::uuid7()->__toString(); + + $stub = $client->newUntypedWorkflowStub( + 'Extra_Update_UseExisting', + WorkflowOptions::new() + ->withTaskQueue($feature->taskQueue) + ->withWorkflowId($id) + ->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::UseExisting), + ); + + try { + $client->start($stub); + + $this->assertSame($id, $stub->getExecution()->getID()); + $this->assertNotSame('', $stub->getExecution()->getRunID()); + } finally { + $stub->signal('exit'); + } + } } #[WorkflowInterface] @@ -195,3 +286,22 @@ public function __construct( public int $length = 0, ) {} } + +#[WorkflowInterface] +class UseExistingWorkflow +{ + private bool $exit = false; + + #[WorkflowMethod(name: 'Extra_Update_UseExisting')] + public function handle(): \Generator + { + yield Workflow::await(fn(): bool => $this->exit); + return 'done'; + } + + #[Workflow\SignalMethod] + public function exit(): void + { + $this->exit = true; + } +} diff --git a/tests/Unit/DTO/WorkflowOptionsTestCase.php b/tests/Unit/DTO/WorkflowOptionsTestCase.php index 2d6bde9b2..67b765860 100644 --- a/tests/Unit/DTO/WorkflowOptionsTestCase.php +++ b/tests/Unit/DTO/WorkflowOptionsTestCase.php @@ -54,6 +54,7 @@ public function testMarshalling(): void 'fairness_weight' => 0.0, ], 'VersioningOverride' => null, + 'onConflictOptions' => null, ]; $result = $this->marshal($dto); diff --git a/tests/Unit/Internal/Client/OnConflictOptionsTestCase.php b/tests/Unit/Internal/Client/OnConflictOptionsTestCase.php new file mode 100644 index 000000000..efb781dac --- /dev/null +++ b/tests/Unit/Internal/Client/OnConflictOptionsTestCase.php @@ -0,0 +1,42 @@ +attachRequestId); + self::assertTrue($options->attachCompletionCallbacks); + self::assertTrue($options->attachLinks); + } + + public function testAcceptsExplicitFlags(): void + { + $options = new OnConflictOptions( + attachRequestId: false, + attachCompletionCallbacks: false, + attachLinks: false, + ); + + self::assertFalse($options->attachRequestId); + self::assertFalse($options->attachCompletionCallbacks); + self::assertFalse($options->attachLinks); + } +} diff --git a/tests/Unit/Internal/Client/WorkflowStarterTestCase.php b/tests/Unit/Internal/Client/WorkflowStarterTestCase.php index a5f689cf1..3e6804fde 100644 --- a/tests/Unit/Internal/Client/WorkflowStarterTestCase.php +++ b/tests/Unit/Internal/Client/WorkflowStarterTestCase.php @@ -4,13 +4,20 @@ namespace Temporal\Tests\Unit\Internal\Client; +use Google\Protobuf\Any; +use Google\Rpc\Status as RpcStatus; use PHPUnit\Framework\Attributes\CoversClass; use PHPUnit\Framework\TestCase; +use Temporal\Api\Errordetails\V1\WorkflowExecutionAlreadyStartedFailure; use Temporal\Api\Workflowservice\V1\StartWorkflowExecutionRequest; use Temporal\Api\Workflowservice\V1\StartWorkflowExecutionResponse; use Temporal\Client\GRPC\ServiceClientInterface; +use Temporal\Client\GRPC\StatusCode; use Temporal\Client\WorkflowOptions; +use Temporal\Common\WorkflowIdConflictPolicy; use Temporal\DataConverter\DataConverter; +use Temporal\Exception\Client\ServiceClientException; +use Temporal\Internal\Client\OnConflictOptions; use Temporal\Internal\Client\WorkflowStarter; use Temporal\Internal\Interceptor\Pipeline; use Temporal\Internal\Support\DateInterval; @@ -70,6 +77,102 @@ public function testDelayIfSpecifiedNanos(): void self::assertSame(42000, $request->getWorkflowStartDelay()->getNanos()); } + public function testOnConflictOptionsAbsentByDefault(): void + { + $request = $this->startRequest('test-workflow', new WorkflowOptions()); + + self::assertNull($request->getOnConflictOptions()); + } + + public function testOnConflictOptionsSerializedToProtoWithAllFlags(): void + { + $options = (new WorkflowOptions()) + ->withOnConflictOptionsInternal(new OnConflictOptions()); + + $request = $this->startRequest('test-workflow', $options); + + $proto = $request->getOnConflictOptions(); + self::assertNotNull($proto); + self::assertTrue($proto->getAttachRequestId()); + self::assertTrue($proto->getAttachCompletionCallbacks()); + self::assertTrue($proto->getAttachLinks()); + } + + public function testOnConflictOptionsSerializedToProtoWithMixedFlags(): void + { + $options = (new WorkflowOptions()) + ->withOnConflictOptionsInternal(new OnConflictOptions( + attachRequestId: false, + attachCompletionCallbacks: true, + attachLinks: false, + )); + + $request = $this->startRequest('test-workflow', $options); + + $proto = $request->getOnConflictOptions(); + self::assertNotNull($proto); + self::assertFalse($proto->getAttachRequestId()); + self::assertTrue($proto->getAttachCompletionCallbacks()); + self::assertFalse($proto->getAttachLinks()); + } + + public function testWorkflowIdConflictPolicyUseExistingFlowsToProto(): void + { + $options = (new WorkflowOptions()) + ->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::UseExisting); + + $request = $this->startRequest('test-workflow', $options); + + self::assertSame(WorkflowIdConflictPolicy::UseExisting->value, $request->getWorkflowIdConflictPolicy()); + } + + public function testAlreadyStartedWithUseExistingReturnsExistingExecution(): void + { + $exception = $this->alreadyStartedException('existing-run-id'); + + $clientOptions = (new \Temporal\Client\ClientOptions()) + ->withNamespace(self::NAMESPACE) + ->withIdentity(self::IDENTITY); + + $clientMock = $this->createMock(ServiceClientInterface::class); + $clientMock + ->expects($this->once()) + ->method('StartWorkflowExecution') + ->willThrowException($exception); + + $starter = new WorkflowStarter( + serviceClient: $clientMock, + converter: DataConverter::createDefault(), + clientOptions: $clientOptions, + interceptors: Pipeline::prepare([]), + ); + + $options = (new WorkflowOptions()) + ->withWorkflowId('my-wf-id') + ->withWorkflowIdConflictPolicy(WorkflowIdConflictPolicy::UseExisting); + + $execution = $starter->start('test-workflow', $options, []); + + self::assertSame('my-wf-id', $execution->getID()); + self::assertSame('existing-run-id', $execution->getRunID()); + } + + private function alreadyStartedException(string $runId): ServiceClientException + { + $any = new Any(); + $any->pack((new WorkflowExecutionAlreadyStartedFailure())->setRunId($runId)); + + $rpcStatus = (new RpcStatus())->setCode(StatusCode::ALREADY_EXISTS); + $rpcStatus->setDetails([$any]); + + $status = new \stdClass(); + $status->code = StatusCode::ALREADY_EXISTS; + $status->details = 'workflow execution already started'; + $status->metadata = ['grpc-status-details-bin' => [$rpcStatus->serializeToString()]]; + + return new ServiceClientException($status); + } + private function startRequest( string $workflowType, WorkflowOptions $options,