From 60f2d8851b81a5d7616bd96e689a72bd7e225536 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Mon, 11 May 2026 15:35:54 +0200 Subject: [PATCH 1/7] feat(task-streaming): allow the Php providers to set intermediate results with a callback in process Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 42 +++++++++++++++++++ .../ISynchronousProgressiveProvider.php | 36 ++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 lib/public/TaskProcessing/ISynchronousProgressiveProvider.php diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 6821aee65a002..390b6b622ec16 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -59,6 +59,7 @@ use OCP\TaskProcessing\IInternalTaskType; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\IProvider; +use OCP\TaskProcessing\ISynchronousProgressiveProvider; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\ISynchronousWatermarkingProvider; use OCP\TaskProcessing\ITaskType; @@ -1135,6 +1136,13 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool { $this->setTaskStatus($task, Task::STATUS_RUNNING); if ($provider instanceof ISynchronousWatermarkingProvider) { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark()); + } elseif ($provider instanceof ISynchronousProgressiveProvider) { + $output = $provider->process( + $task->getUserId(), + $input, + fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), + fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output) + ); } else { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); } @@ -1216,6 +1224,40 @@ public function setTaskProgress(int $id, float $progress): bool { return true; } + public function setTaskIntermediateOutput(int $id, array $output): bool { + // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently + $task = $this->getTask($id); + if ($task->getStatus() === Task::STATUS_CANCELLED) { + return false; + } + $userId = $task->getUserId(); + if ($userId !== null && $userId !== '') { + try { + // TODO figure out how to get the queue with DI + // $queue = Server::get(\OCA\NotifyPush\IQueue::class); + $queue = $this->serverContainer->get(\OCA\NotifyPush\IQueue::class); + $queue->push('notify_custom', [ + 'user' => $userId, + 'message' => 'taskprocessing_task_results', + 'body' => $output, + ]); + error_log('sending to queue!!!!!!'); + } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { + $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); + error_log('NOT sending to queue!!!!!! ' . $e->getMessage()); + } + } + // no output shape validation for now + $task->setOutput($output); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + return true; + } + #[\Override] public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void { // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently diff --git a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php new file mode 100644 index 0000000000000..102062d8d1ab3 --- /dev/null +++ b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php @@ -0,0 +1,36 @@ +|numeric|string|File> $input The task input + * @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped. + * @param null|callable(array):bool $reportOutput Set the task intermediate output + * @psalm-return array|numeric|string> + * @throws ProcessingException + * @since 33.0.0 + */ + #[\Override] + public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array; +} From de2cc4b14afce0a8da3bcb7c32f1d94df1c024a9 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Mon, 11 May 2026 15:57:42 +0200 Subject: [PATCH 2/7] fix(task-streaming): and test sending data via notify_push Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 390b6b622ec16..6cdf31c4f7ab4 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1231,11 +1231,11 @@ public function setTaskIntermediateOutput(int $id, array $output): bool { return false; } $userId = $task->getUserId(); - if ($userId !== null && $userId !== '') { + if ($userId !== null && $userId !== '' && $this->appManager->isEnabledForAnyone('notify_push')) { try { - // TODO figure out how to get the queue with DI - // $queue = Server::get(\OCA\NotifyPush\IQueue::class); - $queue = $this->serverContainer->get(\OCA\NotifyPush\IQueue::class); + // $this->appManager->loadApp('notify_push'); + $queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class); + // $queue = $this->serverContainer->get(\OCA\NotifyPush\Queue\IQueue::class); $queue->push('notify_custom', [ 'user' => $userId, 'message' => 'taskprocessing_task_results', From 0b6d21ffde0c4a2b42e1e798ad7a7829f8ece060 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Tue, 12 May 2026 13:10:19 +0200 Subject: [PATCH 3/7] feat(task-streaming): add an endpoint to set a task intermediate output Signed-off-by: Julien Veyssier --- .../TaskProcessingApiController.php | 31 +++++++++++++++++++ lib/private/TaskProcessing/Manager.php | 2 +- lib/public/TaskProcessing/IManager.php | 10 ++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/core/Controller/TaskProcessingApiController.php b/core/Controller/TaskProcessingApiController.php index fd22d485af380..7b52c543d91a0 100644 --- a/core/Controller/TaskProcessingApiController.php +++ b/core/Controller/TaskProcessingApiController.php @@ -645,6 +645,37 @@ public function setResult(int $taskId, ?array $output = null, ?string $errorMess } } + /** + * Sets the task intermediate result while it is running + * + * @param int $taskId The id of the task + * @param array|null $output The intermediate task output, files are represented by their IDs + * @return DataResponse|DataResponse + * + * 200: Result updated successfully + * 404: Task not found + */ + #[ExAppRequired] + #[ApiRoute(verb: 'POST', url: '/tasks_provider/{taskId}/stream-result', root: '/taskprocessing')] + public function setIntermediateResult(int $taskId, array $output): DataResponse { + try { + // set result + $this->taskProcessingManager->setTaskIntermediateOutput($taskId, $output); + $task = $this->taskProcessingManager->getTask($taskId); + + /** @var CoreTaskProcessingTask $json */ + $json = $task->jsonSerialize(); + + return new DataResponse([ + 'task' => $json, + ]); + } catch (NotFoundException) { + return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND); + } catch (Exception) { + return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR); + } + } + /** * @return DataResponse|DataResponse */ diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 6cdf31c4f7ab4..e9e6fbdd084fd 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1227,7 +1227,7 @@ public function setTaskProgress(int $id, float $progress): bool { public function setTaskIntermediateOutput(int $id, array $output): bool { // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently $task = $this->getTask($id); - if ($task->getStatus() === Task::STATUS_CANCELLED) { + if ($task->getStatus() !== Task::STATUS_RUNNING) { return false; } $userId = $task->getUserId(); diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php index 2cd0244b52e8d..f919410d7c4fe 100644 --- a/lib/public/TaskProcessing/IManager.php +++ b/lib/public/TaskProcessing/IManager.php @@ -143,6 +143,16 @@ public function cancelTask(int $id): void; */ public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void; + /** + * @param int $id The id of the task + * @param array $output + * @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 34.0.0 + */ + public function setTaskIntermediateOutput(int $id, array $output): bool; + /** * @param int $id * @param float $progress From d531fae7c9ce362ab6fb872e35100a444ed6064e Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Sat, 16 May 2026 20:23:16 +0200 Subject: [PATCH 4/7] fix(task-streaming): make the notify_push message shorter Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index e9e6fbdd084fd..59d0c5dfca880 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1238,7 +1238,7 @@ public function setTaskIntermediateOutput(int $id, array $output): bool { // $queue = $this->serverContainer->get(\OCA\NotifyPush\Queue\IQueue::class); $queue->push('notify_custom', [ 'user' => $userId, - 'message' => 'taskprocessing_task_results', + 'message' => 'task_' . $task->getId(), 'body' => $output, ]); error_log('sending to queue!!!!!!'); From c6c2c1a693ca6a6c02f7e828d21c9d359c60a721 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Thu, 21 May 2026 14:30:29 +0200 Subject: [PATCH 5/7] feat(task-streaming): do not update the task in DB when setting intermediate result if notify_push is available Signed-off-by: Julien Veyssier --- lib/private/TaskProcessing/Manager.php | 8 ++++---- lib/public/TaskProcessing/IManager.php | 7 +++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 59d0c5dfca880..d12a2b8a04ed9 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1233,18 +1233,18 @@ public function setTaskIntermediateOutput(int $id, array $output): bool { $userId = $task->getUserId(); if ($userId !== null && $userId !== '' && $this->appManager->isEnabledForAnyone('notify_push')) { try { - // $this->appManager->loadApp('notify_push'); $queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class); - // $queue = $this->serverContainer->get(\OCA\NotifyPush\Queue\IQueue::class); $queue->push('notify_custom', [ 'user' => $userId, 'message' => 'task_' . $task->getId(), 'body' => $output, ]); - error_log('sending to queue!!!!!!'); + // we don't update the DB if something was sent via notify_push + // so if the push messages are not received for some reason, the polling will still not see any intermediate output + // but will receive the final output + return true; } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); - error_log('NOT sending to queue!!!!!! ' . $e->getMessage()); } } // no output shape validation for now diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php index f919410d7c4fe..b09ea35fdcadf 100644 --- a/lib/public/TaskProcessing/IManager.php +++ b/lib/public/TaskProcessing/IManager.php @@ -144,12 +144,15 @@ public function cancelTask(int $id): void; public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void; /** + * Set the task intermediate output. + * If notify_push is available, the output will be pushed to the user and the task won't be updated in the DB. + * * @param int $id The id of the task - * @param array $output + * @param array $output The intermediate output * @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime * @throws Exception If the query failed * @throws NotFoundException If the task could not be found - * @since 34.0.0 + * @since 35.0.0 */ public function setTaskIntermediateOutput(int $id, array $output): bool; From 2e5dbdf7f1b8ccb4c89adb696f41483af2e8a091 Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Wed, 27 May 2026 16:48:45 +0200 Subject: [PATCH 6/7] feat(task-streaming): add preferStreaming boolean attribute to taskprocessing tasks. pass it to ISynchronousProgressiveProvider::process Signed-off-by: Julien Veyssier --- .../Version35000Date20260527162338.php | 50 ++++ core/ResponseDefinitions.php | 1 + core/openapi-ex_app.json | 238 +++++++++++++++++- core/openapi-full.json | 238 +++++++++++++++++- core/openapi.json | 6 +- lib/composer/composer/autoload_classmap.php | 1 + lib/composer/composer/autoload_static.php | 1 + lib/private/TaskProcessing/Db/Task.php | 10 +- lib/private/TaskProcessing/Manager.php | 3 +- .../ISynchronousProgressiveProvider.php | 5 +- lib/public/TaskProcessing/Task.php | 19 ++ openapi.json | 238 +++++++++++++++++- version.php | 2 +- 13 files changed, 802 insertions(+), 10 deletions(-) create mode 100644 core/Migrations/Version35000Date20260527162338.php diff --git a/core/Migrations/Version35000Date20260527162338.php b/core/Migrations/Version35000Date20260527162338.php new file mode 100644 index 0000000000000..4c9bdbbae4f1e --- /dev/null +++ b/core/Migrations/Version35000Date20260527162338.php @@ -0,0 +1,50 @@ +hasTable('taskprocessing_tasks')) { + $table = $schema->getTable('taskprocessing_tasks'); + if (!$table->hasColumn('prefer_streaming')) { + $table->addColumn('prefer_streaming', Types::SMALLINT, [ + 'notnull' => true, + 'default' => 1, + 'unsigned' => true, + ]); + return $schema; + } + } + + return null; + } +} diff --git a/core/ResponseDefinitions.php b/core/ResponseDefinitions.php index 6f3380bab259a..b2be2ecfba1cb 100644 --- a/core/ResponseDefinitions.php +++ b/core/ResponseDefinitions.php @@ -213,6 +213,7 @@ * allowCleanup: bool, * includeWatermark: bool, * userFacingErrorMessage: ?string, + * preferStreaming: bool, * } * * @psalm-type CoreProfileAction = array{ diff --git a/core/openapi-ex_app.json b/core/openapi-ex_app.json index 5fbc10f465bee..94c8bbf06dde9 100644 --- a/core/openapi-ex_app.json +++ b/core/openapi-ex_app.json @@ -204,7 +204,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -285,6 +286,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, @@ -2207,6 +2211,238 @@ } } }, + "/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": { + "post": { + "operationId": "task_processing_api-set-intermediate-result", + "summary": "Sets the task intermediate result while it is running", + "description": "This endpoint requires admin access", + "tags": [ + "task_processing_api" + ], + "security": [ + { + "bearer_auth": [] + }, + { + "basic_auth": [] + } + ], + "requestBody": { + "required": false, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "output": { + "type": "object", + "nullable": true, + "description": "The intermediate task output, files are represented by their IDs", + "additionalProperties": { + "type": "object" + } + } + } + } + } + } + }, + "parameters": [ + { + "name": "taskId", + "in": "path", + "description": "The id of the task", + "required": true, + "schema": { + "type": "integer", + "format": "int64" + } + }, + { + "name": "OCS-APIRequest", + "in": "header", + "description": "Required to be true for the API request to pass", + "required": true, + "schema": { + "type": "boolean", + "default": true + } + } + ], + "responses": { + "200": { + "description": "Result updated successfully", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "task" + ], + "properties": { + "task": { + "$ref": "#/components/schemas/TaskProcessingTask" + } + } + } + } + } + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "404": { + "description": "Task not found", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "401": { + "description": "Current user is not logged in", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + }, + "403": { + "description": "Logged in account must be an admin", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + } + } + } + }, "/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": { "post": { "operationId": "task_processing_api-cancel-task-ex-app-endpoint", diff --git a/core/openapi-full.json b/core/openapi-full.json index 0164fbaef7a56..3d4cf93f91ccc 100644 --- a/core/openapi-full.json +++ b/core/openapi-full.json @@ -664,7 +664,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -745,6 +746,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, @@ -12017,6 +12021,238 @@ } } }, + "/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": { + "post": { + "operationId": "task_processing_api-set-intermediate-result", + "summary": "Sets the task intermediate result while it is running", + "description": "This endpoint requires admin access", + "tags": [ + "task_processing_api" + ], + "security": [ + { + "bearer_auth": [] + }, + { + "basic_auth": [] + } + ], + "requestBody": { + "required": false, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "output": { + "type": "object", + "nullable": true, + "description": "The intermediate task output, files are represented by their IDs", + "additionalProperties": { + "type": "object" + } + } + } + } + } + } + }, + "parameters": [ + { + "name": "taskId", + "in": "path", + "description": "The id of the task", + "required": true, + "schema": { + "type": "integer", + "format": "int64" + } + }, + { + "name": "OCS-APIRequest", + "in": "header", + "description": "Required to be true for the API request to pass", + "required": true, + "schema": { + "type": "boolean", + "default": true + } + } + ], + "responses": { + "200": { + "description": "Result updated successfully", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "task" + ], + "properties": { + "task": { + "$ref": "#/components/schemas/TaskProcessingTask" + } + } + } + } + } + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "404": { + "description": "Task not found", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "401": { + "description": "Current user is not logged in", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + }, + "403": { + "description": "Logged in account must be an admin", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + } + } + } + }, "/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": { "post": { "operationId": "task_processing_api-cancel-task-ex-app-endpoint", diff --git a/core/openapi.json b/core/openapi.json index 145894f33a0b5..7981b3763bf46 100644 --- a/core/openapi.json +++ b/core/openapi.json @@ -664,7 +664,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -745,6 +746,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, diff --git a/lib/composer/composer/autoload_classmap.php b/lib/composer/composer/autoload_classmap.php index 54ec6a944dff3..d9e09add6f124 100644 --- a/lib/composer/composer/autoload_classmap.php +++ b/lib/composer/composer/autoload_classmap.php @@ -1611,6 +1611,7 @@ 'OC\\Core\\Migrations\\Version33000Date20260126120000' => $baseDir . '/core/Migrations/Version33000Date20260126120000.php', 'OC\\Core\\Migrations\\Version34000Date20260318095645' => $baseDir . '/core/Migrations/Version34000Date20260318095645.php', 'OC\\Core\\Migrations\\Version34000Date20260415161745' => $baseDir . '/core/Migrations/Version34000Date20260415161745.php', + 'OC\\Core\\Migrations\\Version35000Date20260527162338' => $baseDir . '/core/Migrations/Version35000Date20260527162338.php', 'OC\\Core\\Notification\\CoreNotifier' => $baseDir . '/core/Notification/CoreNotifier.php', 'OC\\Core\\ResponseDefinitions' => $baseDir . '/core/ResponseDefinitions.php', 'OC\\Core\\Service\\CronService' => $baseDir . '/core/Service/CronService.php', diff --git a/lib/composer/composer/autoload_static.php b/lib/composer/composer/autoload_static.php index 2b90f11fa7d3b..25d91133f4aa9 100644 --- a/lib/composer/composer/autoload_static.php +++ b/lib/composer/composer/autoload_static.php @@ -1652,6 +1652,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2 'OC\\Core\\Migrations\\Version33000Date20260126120000' => __DIR__ . '/../../..' . '/core/Migrations/Version33000Date20260126120000.php', 'OC\\Core\\Migrations\\Version34000Date20260318095645' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260318095645.php', 'OC\\Core\\Migrations\\Version34000Date20260415161745' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260415161745.php', + 'OC\\Core\\Migrations\\Version35000Date20260527162338' => __DIR__ . '/../../..' . '/core/Migrations/Version35000Date20260527162338.php', 'OC\\Core\\Notification\\CoreNotifier' => __DIR__ . '/../../..' . '/core/Notification/CoreNotifier.php', 'OC\\Core\\ResponseDefinitions' => __DIR__ . '/../../..' . '/core/ResponseDefinitions.php', 'OC\\Core\\Service\\CronService' => __DIR__ . '/../../..' . '/core/Service/CronService.php', diff --git a/lib/private/TaskProcessing/Db/Task.php b/lib/private/TaskProcessing/Db/Task.php index 3ac4facf97eab..6bfbcdd1dc87f 100644 --- a/lib/private/TaskProcessing/Db/Task.php +++ b/lib/private/TaskProcessing/Db/Task.php @@ -51,6 +51,8 @@ * @method null|string getUserFacingErrorMessage() * @method setIncludeWatermark(int $includeWatermark) * @method int getIncludeWatermark() + * @method setPreferStreaming(int $preferStreaming) + * @method int getPreferStreaming() */ class Task extends Entity { protected $lastUpdated; @@ -72,16 +74,17 @@ class Task extends Entity { protected $allowCleanup; protected $userFacingErrorMessage; protected $includeWatermark; + protected $preferStreaming; /** * @var string[] */ - public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark']; + public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark', 'prefer_streaming']; /** * @var string[] */ - public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark']; + public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark', 'preferStreaming']; public function __construct() { @@ -106,6 +109,7 @@ public function __construct() { $this->addType('allowCleanup', 'integer'); $this->addType('userFacingErrorMessage', 'string'); $this->addType('includeWatermark', 'integer'); + $this->addType('preferStreaming', 'integer'); } public function toRow(): array { @@ -137,6 +141,7 @@ public static function fromPublicTask(OCPTask $task): self { 'allowCleanup' => $task->getAllowCleanup() ? 1 : 0, 'userFacingErrorMessage' => $task->getUserFacingErrorMessage(), 'includeWatermark' => $task->getIncludeWatermark() ? 1 : 0, + 'preferStreaming' => $task->getPreferStreaming() ? 1 : 0, ]); return $taskEntity; } @@ -162,6 +167,7 @@ public function toPublicTask(): OCPTask { $task->setAllowCleanup($this->getAllowCleanup() !== 0); $task->setUserFacingErrorMessage($this->getUserFacingErrorMessage()); $task->setIncludeWatermark($this->getIncludeWatermark() !== 0); + $task->setPreferStreaming($this->getPreferStreaming() !== 0); return $task; } } diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index d12a2b8a04ed9..c829a7a11a80c 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1141,7 +1141,8 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool { $task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), - fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output) + fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output), + $task->getPreferStreaming() ); } else { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); diff --git a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php index 102062d8d1ab3..d0c20bd704c0d 100644 --- a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php +++ b/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php @@ -27,10 +27,11 @@ interface ISynchronousProgressiveProvider extends ISynchronousProvider { * @param array|numeric|string|File> $input The task input * @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped. * @param null|callable(array):bool $reportOutput Set the task intermediate output + * @param bool $preferStreaming Whether to prefer streaming output or not * @psalm-return array|numeric|string> * @throws ProcessingException - * @since 33.0.0 + * @since 35.0.0 */ #[\Override] - public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null): array; + public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null, bool $preferStreaming = true): array; } diff --git a/lib/public/TaskProcessing/Task.php b/lib/public/TaskProcessing/Task.php index 72edc9a17ce6f..b3b96290db3d4 100644 --- a/lib/public/TaskProcessing/Task.php +++ b/lib/public/TaskProcessing/Task.php @@ -51,6 +51,8 @@ final class Task implements \JsonSerializable { protected bool $includeWatermark = true; + protected bool $preferStreaming = true; + /** * @since 30.0.0 */ @@ -294,6 +296,22 @@ final public function setIncludeWatermark(bool $includeWatermark): void { $this->includeWatermark = $includeWatermark; } + /** + * @return bool + * @since 35.0.0 + */ + final public function getPreferStreaming(): bool { + return $this->preferStreaming; + } + + /** + * @param bool $preferStreaming + * @since 35.0.0 + */ + final public function setPreferStreaming(bool $preferStreaming): void { + $this->preferStreaming = $preferStreaming; + } + /** * @psalm-return array{id: int, lastUpdated: int, type: string, status: 'STATUS_CANCELLED'|'STATUS_FAILED'|'STATUS_SUCCESSFUL'|'STATUS_RUNNING'|'STATUS_SCHEDULED'|'STATUS_UNKNOWN', userId: ?string, appId: string, input: array|numeric|string>, output: ?array|numeric|string>, customId: ?string, completionExpectedAt: ?int, progress: ?float, scheduledAt: ?int, startedAt: ?int, endedAt: ?int, allowCleanup: bool, includeWatermark: bool, userFacingErrorMessage: ?string} * @since 30.0.0 @@ -318,6 +336,7 @@ final public function jsonSerialize(): array { 'allowCleanup' => $this->getAllowCleanup(), 'includeWatermark' => $this->getIncludeWatermark(), 'userFacingErrorMessage' => $this->getUserFacingErrorMessage(), + 'preferStreaming' => $this->getPreferStreaming(), ]; } diff --git a/openapi.json b/openapi.json index ba4ee767a4b16..e231ad6c64274 100644 --- a/openapi.json +++ b/openapi.json @@ -710,7 +710,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -791,6 +792,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, @@ -15725,6 +15729,238 @@ } } }, + "/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": { + "post": { + "operationId": "core-task_processing_api-set-intermediate-result", + "summary": "Sets the task intermediate result while it is running", + "description": "This endpoint requires admin access", + "tags": [ + "core/task_processing_api" + ], + "security": [ + { + "bearer_auth": [] + }, + { + "basic_auth": [] + } + ], + "requestBody": { + "required": false, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "output": { + "type": "object", + "nullable": true, + "description": "The intermediate task output, files are represented by their IDs", + "additionalProperties": { + "type": "object" + } + } + } + } + } + } + }, + "parameters": [ + { + "name": "taskId", + "in": "path", + "description": "The id of the task", + "required": true, + "schema": { + "type": "integer", + "format": "int64" + } + }, + { + "name": "OCS-APIRequest", + "in": "header", + "description": "Required to be true for the API request to pass", + "required": true, + "schema": { + "type": "boolean", + "default": true + } + } + ], + "responses": { + "200": { + "description": "Result updated successfully", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "task" + ], + "properties": { + "task": { + "$ref": "#/components/schemas/CoreTaskProcessingTask" + } + } + } + } + } + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "404": { + "description": "Task not found", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "401": { + "description": "Current user is not logged in", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + }, + "403": { + "description": "Logged in account must be an admin", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + } + } + } + }, "/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": { "post": { "operationId": "core-task_processing_api-cancel-task-ex-app-endpoint", diff --git a/version.php b/version.php index 69cc38c9d7eec..5bff34b308b06 100644 --- a/version.php +++ b/version.php @@ -11,7 +11,7 @@ // between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level // when updating major/minor version number. -$OC_Version = [35, 0, 0, 0]; +$OC_Version = [35, 0, 0, 1]; // The human-readable string $OC_VersionString = '35.0.0 dev'; From 5cd865218350d459eadfd6cb598c4bf16546dd0e Mon Sep 17 00:00:00 2001 From: Julien Veyssier Date: Thu, 28 May 2026 14:50:08 +0200 Subject: [PATCH 7/7] feat(task-streaming): add new provider interface Signed-off-by: Julien Veyssier --- lib/composer/composer/autoload_classmap.php | 2 + lib/composer/composer/autoload_static.php | 2 + lib/private/TaskProcessing/Manager.php | 13 +++++-- ...er.php => ISynchronousOptionsProvider.php} | 17 +++++---- .../ISynchronousWatermarkingProvider.php | 1 + .../SynchronousProviderOptions.php | 38 +++++++++++++++++++ 6 files changed, 62 insertions(+), 11 deletions(-) rename lib/public/TaskProcessing/{ISynchronousProgressiveProvider.php => ISynchronousOptionsProvider.php} (62%) create mode 100644 lib/public/TaskProcessing/SynchronousProviderOptions.php diff --git a/lib/composer/composer/autoload_classmap.php b/lib/composer/composer/autoload_classmap.php index d9e09add6f124..6d60106f3cc87 100644 --- a/lib/composer/composer/autoload_classmap.php +++ b/lib/composer/composer/autoload_classmap.php @@ -930,12 +930,14 @@ 'OCP\\TaskProcessing\\IInternalTaskType' => $baseDir . '/lib/public/TaskProcessing/IInternalTaskType.php', 'OCP\\TaskProcessing\\IManager' => $baseDir . '/lib/public/TaskProcessing/IManager.php', 'OCP\\TaskProcessing\\IProvider' => $baseDir . '/lib/public/TaskProcessing/IProvider.php', + 'OCP\\TaskProcessing\\ISynchronousOptionsProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousOptionsProvider.php', 'OCP\\TaskProcessing\\ISynchronousProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousProvider.php', 'OCP\\TaskProcessing\\ISynchronousWatermarkingProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php', 'OCP\\TaskProcessing\\ITaskType' => $baseDir . '/lib/public/TaskProcessing/ITaskType.php', 'OCP\\TaskProcessing\\ITriggerableProvider' => $baseDir . '/lib/public/TaskProcessing/ITriggerableProvider.php', 'OCP\\TaskProcessing\\ShapeDescriptor' => $baseDir . '/lib/public/TaskProcessing/ShapeDescriptor.php', 'OCP\\TaskProcessing\\ShapeEnumValue' => $baseDir . '/lib/public/TaskProcessing/ShapeEnumValue.php', + 'OCP\\TaskProcessing\\SynchronousProviderOptions' => $baseDir . '/lib/public/TaskProcessing/SynchronousProviderOptions.php', 'OCP\\TaskProcessing\\Task' => $baseDir . '/lib/public/TaskProcessing/Task.php', 'OCP\\TaskProcessing\\TaskTypes\\AnalyzeImages' => $baseDir . '/lib/public/TaskProcessing/TaskTypes/AnalyzeImages.php', 'OCP\\TaskProcessing\\TaskTypes\\AudioToAudioChat' => $baseDir . '/lib/public/TaskProcessing/TaskTypes/AudioToAudioChat.php', diff --git a/lib/composer/composer/autoload_static.php b/lib/composer/composer/autoload_static.php index 25d91133f4aa9..2f7ae384c89b5 100644 --- a/lib/composer/composer/autoload_static.php +++ b/lib/composer/composer/autoload_static.php @@ -971,12 +971,14 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2 'OCP\\TaskProcessing\\IInternalTaskType' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IInternalTaskType.php', 'OCP\\TaskProcessing\\IManager' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IManager.php', 'OCP\\TaskProcessing\\IProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IProvider.php', + 'OCP\\TaskProcessing\\ISynchronousOptionsProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousOptionsProvider.php', 'OCP\\TaskProcessing\\ISynchronousProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousProvider.php', 'OCP\\TaskProcessing\\ISynchronousWatermarkingProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php', 'OCP\\TaskProcessing\\ITaskType' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ITaskType.php', 'OCP\\TaskProcessing\\ITriggerableProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ITriggerableProvider.php', 'OCP\\TaskProcessing\\ShapeDescriptor' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ShapeDescriptor.php', 'OCP\\TaskProcessing\\ShapeEnumValue' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ShapeEnumValue.php', + 'OCP\\TaskProcessing\\SynchronousProviderOptions' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/SynchronousProviderOptions.php', 'OCP\\TaskProcessing\\Task' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/Task.php', 'OCP\\TaskProcessing\\TaskTypes\\AnalyzeImages' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/TaskTypes/AnalyzeImages.php', 'OCP\\TaskProcessing\\TaskTypes\\AudioToAudioChat' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/TaskTypes/AudioToAudioChat.php', diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index c829a7a11a80c..434493cfcc9a0 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -59,13 +59,14 @@ use OCP\TaskProcessing\IInternalTaskType; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\IProvider; -use OCP\TaskProcessing\ISynchronousProgressiveProvider; +use OCP\TaskProcessing\ISynchronousOptionsProvider; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\ISynchronousWatermarkingProvider; use OCP\TaskProcessing\ITaskType; use OCP\TaskProcessing\ITriggerableProvider; use OCP\TaskProcessing\ShapeDescriptor; use OCP\TaskProcessing\ShapeEnumValue; +use OCP\TaskProcessing\SynchronousProviderOptions; use OCP\TaskProcessing\Task; use OCP\TaskProcessing\TaskTypes\AnalyzeImages; use OCP\TaskProcessing\TaskTypes\AudioToAudioChat; @@ -1136,13 +1137,17 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool { $this->setTaskStatus($task, Task::STATUS_RUNNING); if ($provider instanceof ISynchronousWatermarkingProvider) { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark()); - } elseif ($provider instanceof ISynchronousProgressiveProvider) { + } elseif ($provider instanceof ISynchronousOptionsProvider) { + $options = new SynchronousProviderOptions( + $task->getIncludeWatermark(), + $task->getPreferStreaming(), + fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output), + ); $output = $provider->process( $task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), - fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output), - $task->getPreferStreaming() + $options, ); } else { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); diff --git a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php b/lib/public/TaskProcessing/ISynchronousOptionsProvider.php similarity index 62% rename from lib/public/TaskProcessing/ISynchronousProgressiveProvider.php rename to lib/public/TaskProcessing/ISynchronousOptionsProvider.php index d0c20bd704c0d..4564222506f11 100644 --- a/lib/public/TaskProcessing/ISynchronousProgressiveProvider.php +++ b/lib/public/TaskProcessing/ISynchronousOptionsProvider.php @@ -15,10 +15,10 @@ /** * This is the interface that is implemented by apps that - * implement a task processing provider that supports updating the output during processing - * @since 34.0.0 + * implement a task processing provider + * @since 35.0.0 */ -interface ISynchronousProgressiveProvider extends ISynchronousProvider { +interface ISynchronousOptionsProvider extends ISynchronousProvider { /** * Returns the shape of optional output parameters @@ -26,12 +26,15 @@ interface ISynchronousProgressiveProvider extends ISynchronousProvider { * @param null|string $userId The user that created the current task * @param array|numeric|string|File> $input The task input * @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped. - * @param null|callable(array):bool $reportOutput Set the task intermediate output - * @param bool $preferStreaming Whether to prefer streaming output or not + * @param SynchronousProviderOptions $options The task options * @psalm-return array|numeric|string> * @throws ProcessingException * @since 35.0.0 */ - #[\Override] - public function process(?string $userId, array $input, callable $reportProgress, ?callable $reportOutput = null, bool $preferStreaming = true): array; + public function process( + ?string $userId, + array $input, + callable $reportProgress, + SynchronousProviderOptions $options = new SynchronousProviderOptions(), + ): array; } diff --git a/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php b/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php index 05b7c07ec6cfc..3418ffa803d24 100644 --- a/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php +++ b/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php @@ -17,6 +17,7 @@ * This is the interface that is implemented by apps that * implement a task processing provider that supports watermarking * @since 33.0.0 + * @deprecated 35.0.0 Use ISynchronousOptionsProvider instead */ interface ISynchronousWatermarkingProvider extends ISynchronousProvider { diff --git a/lib/public/TaskProcessing/SynchronousProviderOptions.php b/lib/public/TaskProcessing/SynchronousProviderOptions.php new file mode 100644 index 0000000000000..9bbd81011f46f --- /dev/null +++ b/lib/public/TaskProcessing/SynchronousProviderOptions.php @@ -0,0 +1,38 @@ +reportOutput = $reportOutput !== null + ? \Closure::fromCallable($reportOutput) + : static function (array $output): bool { + return true; + }; + } + + public function getIncludeWatermarks(): bool { + return $this->includeWatermarks; + } + + public function getPreferStreaming(): bool { + return $this->preferStreaming; + } + + public function getReportOutput(): callable { + return $this->reportOutput; + } +}