diff --git a/core/Controller/TaskProcessingApiController.php b/core/Controller/TaskProcessingApiController.php index fd22d485af380..7b52c543d91a0 100644 --- a/core/Controller/TaskProcessingApiController.php +++ b/core/Controller/TaskProcessingApiController.php @@ -645,6 +645,37 @@ public function setResult(int $taskId, ?array $output = null, ?string $errorMess } } + /** + * Sets the task intermediate result while it is running + * + * @param int $taskId The id of the task + * @param array|null $output The intermediate task output, files are represented by their IDs + * @return DataResponse|DataResponse + * + * 200: Result updated successfully + * 404: Task not found + */ + #[ExAppRequired] + #[ApiRoute(verb: 'POST', url: '/tasks_provider/{taskId}/stream-result', root: '/taskprocessing')] + public function setIntermediateResult(int $taskId, array $output): DataResponse { + try { + // set result + $this->taskProcessingManager->setTaskIntermediateOutput($taskId, $output); + $task = $this->taskProcessingManager->getTask($taskId); + + /** @var CoreTaskProcessingTask $json */ + $json = $task->jsonSerialize(); + + return new DataResponse([ + 'task' => $json, + ]); + } catch (NotFoundException) { + return new DataResponse(['message' => $this->l->t('Not found')], Http::STATUS_NOT_FOUND); + } catch (Exception) { + return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR); + } + } + /** * @return DataResponse|DataResponse */ diff --git a/core/Migrations/Version35000Date20260527162338.php b/core/Migrations/Version35000Date20260527162338.php new file mode 100644 index 0000000000000..4c9bdbbae4f1e --- /dev/null +++ b/core/Migrations/Version35000Date20260527162338.php @@ -0,0 +1,50 @@ +hasTable('taskprocessing_tasks')) { + $table = $schema->getTable('taskprocessing_tasks'); + if (!$table->hasColumn('prefer_streaming')) { + $table->addColumn('prefer_streaming', Types::SMALLINT, [ + 'notnull' => true, + 'default' => 1, + 'unsigned' => true, + ]); + return $schema; + } + } + + return null; + } +} diff --git a/core/ResponseDefinitions.php b/core/ResponseDefinitions.php index 6f3380bab259a..b2be2ecfba1cb 100644 --- a/core/ResponseDefinitions.php +++ b/core/ResponseDefinitions.php @@ -213,6 +213,7 @@ * allowCleanup: bool, * includeWatermark: bool, * userFacingErrorMessage: ?string, + * preferStreaming: bool, * } * * @psalm-type CoreProfileAction = array{ diff --git a/core/openapi-ex_app.json b/core/openapi-ex_app.json index 5fbc10f465bee..94c8bbf06dde9 100644 --- a/core/openapi-ex_app.json +++ b/core/openapi-ex_app.json @@ -204,7 +204,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -285,6 +286,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, @@ -2207,6 +2211,238 @@ } } }, + "/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": { + "post": { + "operationId": "task_processing_api-set-intermediate-result", + "summary": "Sets the task intermediate result while it is running", + "description": "This endpoint requires admin access", + "tags": [ + "task_processing_api" + ], + "security": [ + { + "bearer_auth": [] + }, + { + "basic_auth": [] + } + ], + "requestBody": { + "required": false, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "output": { + "type": "object", + "nullable": true, + "description": "The intermediate task output, files are represented by their IDs", + "additionalProperties": { + "type": "object" + } + } + } + } + } + } + }, + "parameters": [ + { + "name": "taskId", + "in": "path", + "description": "The id of the task", + "required": true, + "schema": { + "type": "integer", + "format": "int64" + } + }, + { + "name": "OCS-APIRequest", + "in": "header", + "description": "Required to be true for the API request to pass", + "required": true, + "schema": { + "type": "boolean", + "default": true + } + } + ], + "responses": { + "200": { + "description": "Result updated successfully", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "task" + ], + "properties": { + "task": { + "$ref": "#/components/schemas/TaskProcessingTask" + } + } + } + } + } + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "404": { + "description": "Task not found", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "401": { + "description": "Current user is not logged in", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + }, + "403": { + "description": "Logged in account must be an admin", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + } + } + } + }, "/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": { "post": { "operationId": "task_processing_api-cancel-task-ex-app-endpoint", diff --git a/core/openapi-full.json b/core/openapi-full.json index 0164fbaef7a56..3d4cf93f91ccc 100644 --- a/core/openapi-full.json +++ b/core/openapi-full.json @@ -664,7 +664,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -745,6 +746,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, @@ -12017,6 +12021,238 @@ } } }, + "/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": { + "post": { + "operationId": "task_processing_api-set-intermediate-result", + "summary": "Sets the task intermediate result while it is running", + "description": "This endpoint requires admin access", + "tags": [ + "task_processing_api" + ], + "security": [ + { + "bearer_auth": [] + }, + { + "basic_auth": [] + } + ], + "requestBody": { + "required": false, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "output": { + "type": "object", + "nullable": true, + "description": "The intermediate task output, files are represented by their IDs", + "additionalProperties": { + "type": "object" + } + } + } + } + } + } + }, + "parameters": [ + { + "name": "taskId", + "in": "path", + "description": "The id of the task", + "required": true, + "schema": { + "type": "integer", + "format": "int64" + } + }, + { + "name": "OCS-APIRequest", + "in": "header", + "description": "Required to be true for the API request to pass", + "required": true, + "schema": { + "type": "boolean", + "default": true + } + } + ], + "responses": { + "200": { + "description": "Result updated successfully", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "task" + ], + "properties": { + "task": { + "$ref": "#/components/schemas/TaskProcessingTask" + } + } + } + } + } + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "404": { + "description": "Task not found", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "401": { + "description": "Current user is not logged in", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + }, + "403": { + "description": "Logged in account must be an admin", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + } + } + } + }, "/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": { "post": { "operationId": "task_processing_api-cancel-task-ex-app-endpoint", diff --git a/core/openapi.json b/core/openapi.json index 145894f33a0b5..7981b3763bf46 100644 --- a/core/openapi.json +++ b/core/openapi.json @@ -664,7 +664,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -745,6 +746,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, diff --git a/lib/composer/composer/autoload_classmap.php b/lib/composer/composer/autoload_classmap.php index 54ec6a944dff3..6d60106f3cc87 100644 --- a/lib/composer/composer/autoload_classmap.php +++ b/lib/composer/composer/autoload_classmap.php @@ -930,12 +930,14 @@ 'OCP\\TaskProcessing\\IInternalTaskType' => $baseDir . '/lib/public/TaskProcessing/IInternalTaskType.php', 'OCP\\TaskProcessing\\IManager' => $baseDir . '/lib/public/TaskProcessing/IManager.php', 'OCP\\TaskProcessing\\IProvider' => $baseDir . '/lib/public/TaskProcessing/IProvider.php', + 'OCP\\TaskProcessing\\ISynchronousOptionsProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousOptionsProvider.php', 'OCP\\TaskProcessing\\ISynchronousProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousProvider.php', 'OCP\\TaskProcessing\\ISynchronousWatermarkingProvider' => $baseDir . '/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php', 'OCP\\TaskProcessing\\ITaskType' => $baseDir . '/lib/public/TaskProcessing/ITaskType.php', 'OCP\\TaskProcessing\\ITriggerableProvider' => $baseDir . '/lib/public/TaskProcessing/ITriggerableProvider.php', 'OCP\\TaskProcessing\\ShapeDescriptor' => $baseDir . '/lib/public/TaskProcessing/ShapeDescriptor.php', 'OCP\\TaskProcessing\\ShapeEnumValue' => $baseDir . '/lib/public/TaskProcessing/ShapeEnumValue.php', + 'OCP\\TaskProcessing\\SynchronousProviderOptions' => $baseDir . '/lib/public/TaskProcessing/SynchronousProviderOptions.php', 'OCP\\TaskProcessing\\Task' => $baseDir . '/lib/public/TaskProcessing/Task.php', 'OCP\\TaskProcessing\\TaskTypes\\AnalyzeImages' => $baseDir . '/lib/public/TaskProcessing/TaskTypes/AnalyzeImages.php', 'OCP\\TaskProcessing\\TaskTypes\\AudioToAudioChat' => $baseDir . '/lib/public/TaskProcessing/TaskTypes/AudioToAudioChat.php', @@ -1611,6 +1613,7 @@ 'OC\\Core\\Migrations\\Version33000Date20260126120000' => $baseDir . '/core/Migrations/Version33000Date20260126120000.php', 'OC\\Core\\Migrations\\Version34000Date20260318095645' => $baseDir . '/core/Migrations/Version34000Date20260318095645.php', 'OC\\Core\\Migrations\\Version34000Date20260415161745' => $baseDir . '/core/Migrations/Version34000Date20260415161745.php', + 'OC\\Core\\Migrations\\Version35000Date20260527162338' => $baseDir . '/core/Migrations/Version35000Date20260527162338.php', 'OC\\Core\\Notification\\CoreNotifier' => $baseDir . '/core/Notification/CoreNotifier.php', 'OC\\Core\\ResponseDefinitions' => $baseDir . '/core/ResponseDefinitions.php', 'OC\\Core\\Service\\CronService' => $baseDir . '/core/Service/CronService.php', diff --git a/lib/composer/composer/autoload_static.php b/lib/composer/composer/autoload_static.php index 2b90f11fa7d3b..2f7ae384c89b5 100644 --- a/lib/composer/composer/autoload_static.php +++ b/lib/composer/composer/autoload_static.php @@ -971,12 +971,14 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2 'OCP\\TaskProcessing\\IInternalTaskType' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IInternalTaskType.php', 'OCP\\TaskProcessing\\IManager' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IManager.php', 'OCP\\TaskProcessing\\IProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/IProvider.php', + 'OCP\\TaskProcessing\\ISynchronousOptionsProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousOptionsProvider.php', 'OCP\\TaskProcessing\\ISynchronousProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousProvider.php', 'OCP\\TaskProcessing\\ISynchronousWatermarkingProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php', 'OCP\\TaskProcessing\\ITaskType' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ITaskType.php', 'OCP\\TaskProcessing\\ITriggerableProvider' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ITriggerableProvider.php', 'OCP\\TaskProcessing\\ShapeDescriptor' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ShapeDescriptor.php', 'OCP\\TaskProcessing\\ShapeEnumValue' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/ShapeEnumValue.php', + 'OCP\\TaskProcessing\\SynchronousProviderOptions' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/SynchronousProviderOptions.php', 'OCP\\TaskProcessing\\Task' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/Task.php', 'OCP\\TaskProcessing\\TaskTypes\\AnalyzeImages' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/TaskTypes/AnalyzeImages.php', 'OCP\\TaskProcessing\\TaskTypes\\AudioToAudioChat' => __DIR__ . '/../../..' . '/lib/public/TaskProcessing/TaskTypes/AudioToAudioChat.php', @@ -1652,6 +1654,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2 'OC\\Core\\Migrations\\Version33000Date20260126120000' => __DIR__ . '/../../..' . '/core/Migrations/Version33000Date20260126120000.php', 'OC\\Core\\Migrations\\Version34000Date20260318095645' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260318095645.php', 'OC\\Core\\Migrations\\Version34000Date20260415161745' => __DIR__ . '/../../..' . '/core/Migrations/Version34000Date20260415161745.php', + 'OC\\Core\\Migrations\\Version35000Date20260527162338' => __DIR__ . '/../../..' . '/core/Migrations/Version35000Date20260527162338.php', 'OC\\Core\\Notification\\CoreNotifier' => __DIR__ . '/../../..' . '/core/Notification/CoreNotifier.php', 'OC\\Core\\ResponseDefinitions' => __DIR__ . '/../../..' . '/core/ResponseDefinitions.php', 'OC\\Core\\Service\\CronService' => __DIR__ . '/../../..' . '/core/Service/CronService.php', diff --git a/lib/private/TaskProcessing/Db/Task.php b/lib/private/TaskProcessing/Db/Task.php index 3ac4facf97eab..6bfbcdd1dc87f 100644 --- a/lib/private/TaskProcessing/Db/Task.php +++ b/lib/private/TaskProcessing/Db/Task.php @@ -51,6 +51,8 @@ * @method null|string getUserFacingErrorMessage() * @method setIncludeWatermark(int $includeWatermark) * @method int getIncludeWatermark() + * @method setPreferStreaming(int $preferStreaming) + * @method int getPreferStreaming() */ class Task extends Entity { protected $lastUpdated; @@ -72,16 +74,17 @@ class Task extends Entity { protected $allowCleanup; protected $userFacingErrorMessage; protected $includeWatermark; + protected $preferStreaming; /** * @var string[] */ - public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark']; + public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method', 'scheduled_at', 'started_at', 'ended_at', 'allow_cleanup', 'user_facing_error_message', 'include_watermark', 'prefer_streaming']; /** * @var string[] */ - public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark']; + public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod', 'scheduledAt', 'startedAt', 'endedAt', 'allowCleanup', 'userFacingErrorMessage', 'includeWatermark', 'preferStreaming']; public function __construct() { @@ -106,6 +109,7 @@ public function __construct() { $this->addType('allowCleanup', 'integer'); $this->addType('userFacingErrorMessage', 'string'); $this->addType('includeWatermark', 'integer'); + $this->addType('preferStreaming', 'integer'); } public function toRow(): array { @@ -137,6 +141,7 @@ public static function fromPublicTask(OCPTask $task): self { 'allowCleanup' => $task->getAllowCleanup() ? 1 : 0, 'userFacingErrorMessage' => $task->getUserFacingErrorMessage(), 'includeWatermark' => $task->getIncludeWatermark() ? 1 : 0, + 'preferStreaming' => $task->getPreferStreaming() ? 1 : 0, ]); return $taskEntity; } @@ -162,6 +167,7 @@ public function toPublicTask(): OCPTask { $task->setAllowCleanup($this->getAllowCleanup() !== 0); $task->setUserFacingErrorMessage($this->getUserFacingErrorMessage()); $task->setIncludeWatermark($this->getIncludeWatermark() !== 0); + $task->setPreferStreaming($this->getPreferStreaming() !== 0); return $task; } } diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index 6821aee65a002..434493cfcc9a0 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -59,12 +59,14 @@ use OCP\TaskProcessing\IInternalTaskType; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\IProvider; +use OCP\TaskProcessing\ISynchronousOptionsProvider; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\ISynchronousWatermarkingProvider; use OCP\TaskProcessing\ITaskType; use OCP\TaskProcessing\ITriggerableProvider; use OCP\TaskProcessing\ShapeDescriptor; use OCP\TaskProcessing\ShapeEnumValue; +use OCP\TaskProcessing\SynchronousProviderOptions; use OCP\TaskProcessing\Task; use OCP\TaskProcessing\TaskTypes\AnalyzeImages; use OCP\TaskProcessing\TaskTypes\AudioToAudioChat; @@ -1135,6 +1137,18 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool { $this->setTaskStatus($task, Task::STATUS_RUNNING); if ($provider instanceof ISynchronousWatermarkingProvider) { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), $task->getIncludeWatermark()); + } elseif ($provider instanceof ISynchronousOptionsProvider) { + $options = new SynchronousProviderOptions( + $task->getIncludeWatermark(), + $task->getPreferStreaming(), + fn (array $output) => $this->setTaskIntermediateOutput($task->getId(), $output), + ); + $output = $provider->process( + $task->getUserId(), + $input, + fn (float $progress) => $this->setTaskProgress($task->getId(), $progress), + $options, + ); } else { $output = $provider->process($task->getUserId(), $input, fn (float $progress) => $this->setTaskProgress($task->getId(), $progress)); } @@ -1216,6 +1230,40 @@ public function setTaskProgress(int $id, float $progress): bool { return true; } + public function setTaskIntermediateOutput(int $id, array $output): bool { + // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently + $task = $this->getTask($id); + if ($task->getStatus() !== Task::STATUS_RUNNING) { + return false; + } + $userId = $task->getUserId(); + if ($userId !== null && $userId !== '' && $this->appManager->isEnabledForAnyone('notify_push')) { + try { + $queue = Server::get(\OCA\NotifyPush\Queue\IQueue::class); + $queue->push('notify_custom', [ + 'user' => $userId, + 'message' => 'task_' . $task->getId(), + 'body' => $output, + ]); + // we don't update the DB if something was sent via notify_push + // so if the push messages are not received for some reason, the polling will still not see any intermediate output + // but will receive the final output + return true; + } catch (ContainerExceptionInterface|NotFoundExceptionInterface $e) { + $this->logger->debug('OCA\NotifyPush\IQueue not found, not sending to queue'); + } + } + // no output shape validation for now + $task->setOutput($output); + $taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task); + try { + $this->taskMapper->update($taskEntity); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e); + } + return true; + } + #[\Override] public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void { // TODO: Not sure if we should rather catch the exceptions of getTask here and fail silently diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php index 2cd0244b52e8d..b09ea35fdcadf 100644 --- a/lib/public/TaskProcessing/IManager.php +++ b/lib/public/TaskProcessing/IManager.php @@ -143,6 +143,19 @@ public function cancelTask(int $id): void; */ public function setTaskResult(int $id, ?string $error, ?array $result, bool $isUsingFileIds = false, ?string $userFacingError = null): void; + /** + * Set the task intermediate output. + * If notify_push is available, the output will be pushed to the user and the task won't be updated in the DB. + * + * @param int $id The id of the task + * @param array $output The intermediate output + * @return bool `true` if the task should still be running; `false` if the task has been cancelled in the meantime + * @throws Exception If the query failed + * @throws NotFoundException If the task could not be found + * @since 35.0.0 + */ + public function setTaskIntermediateOutput(int $id, array $output): bool; + /** * @param int $id * @param float $progress diff --git a/lib/public/TaskProcessing/ISynchronousOptionsProvider.php b/lib/public/TaskProcessing/ISynchronousOptionsProvider.php new file mode 100644 index 0000000000000..4564222506f11 --- /dev/null +++ b/lib/public/TaskProcessing/ISynchronousOptionsProvider.php @@ -0,0 +1,40 @@ +|numeric|string|File> $input The task input + * @param callable(float):bool $reportProgress Report the task progress. If this returns false, that means the task was cancelled and processing should be stopped. + * @param SynchronousProviderOptions $options The task options + * @psalm-return array|numeric|string> + * @throws ProcessingException + * @since 35.0.0 + */ + public function process( + ?string $userId, + array $input, + callable $reportProgress, + SynchronousProviderOptions $options = new SynchronousProviderOptions(), + ): array; +} diff --git a/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php b/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php index 05b7c07ec6cfc..3418ffa803d24 100644 --- a/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php +++ b/lib/public/TaskProcessing/ISynchronousWatermarkingProvider.php @@ -17,6 +17,7 @@ * This is the interface that is implemented by apps that * implement a task processing provider that supports watermarking * @since 33.0.0 + * @deprecated 35.0.0 Use ISynchronousOptionsProvider instead */ interface ISynchronousWatermarkingProvider extends ISynchronousProvider { diff --git a/lib/public/TaskProcessing/SynchronousProviderOptions.php b/lib/public/TaskProcessing/SynchronousProviderOptions.php new file mode 100644 index 0000000000000..9bbd81011f46f --- /dev/null +++ b/lib/public/TaskProcessing/SynchronousProviderOptions.php @@ -0,0 +1,38 @@ +reportOutput = $reportOutput !== null + ? \Closure::fromCallable($reportOutput) + : static function (array $output): bool { + return true; + }; + } + + public function getIncludeWatermarks(): bool { + return $this->includeWatermarks; + } + + public function getPreferStreaming(): bool { + return $this->preferStreaming; + } + + public function getReportOutput(): callable { + return $this->reportOutput; + } +} diff --git a/lib/public/TaskProcessing/Task.php b/lib/public/TaskProcessing/Task.php index 72edc9a17ce6f..b3b96290db3d4 100644 --- a/lib/public/TaskProcessing/Task.php +++ b/lib/public/TaskProcessing/Task.php @@ -51,6 +51,8 @@ final class Task implements \JsonSerializable { protected bool $includeWatermark = true; + protected bool $preferStreaming = true; + /** * @since 30.0.0 */ @@ -294,6 +296,22 @@ final public function setIncludeWatermark(bool $includeWatermark): void { $this->includeWatermark = $includeWatermark; } + /** + * @return bool + * @since 35.0.0 + */ + final public function getPreferStreaming(): bool { + return $this->preferStreaming; + } + + /** + * @param bool $preferStreaming + * @since 35.0.0 + */ + final public function setPreferStreaming(bool $preferStreaming): void { + $this->preferStreaming = $preferStreaming; + } + /** * @psalm-return array{id: int, lastUpdated: int, type: string, status: 'STATUS_CANCELLED'|'STATUS_FAILED'|'STATUS_SUCCESSFUL'|'STATUS_RUNNING'|'STATUS_SCHEDULED'|'STATUS_UNKNOWN', userId: ?string, appId: string, input: array|numeric|string>, output: ?array|numeric|string>, customId: ?string, completionExpectedAt: ?int, progress: ?float, scheduledAt: ?int, startedAt: ?int, endedAt: ?int, allowCleanup: bool, includeWatermark: bool, userFacingErrorMessage: ?string} * @since 30.0.0 @@ -318,6 +336,7 @@ final public function jsonSerialize(): array { 'allowCleanup' => $this->getAllowCleanup(), 'includeWatermark' => $this->getIncludeWatermark(), 'userFacingErrorMessage' => $this->getUserFacingErrorMessage(), + 'preferStreaming' => $this->getPreferStreaming(), ]; } diff --git a/openapi.json b/openapi.json index ba4ee767a4b16..e231ad6c64274 100644 --- a/openapi.json +++ b/openapi.json @@ -710,7 +710,8 @@ "endedAt", "allowCleanup", "includeWatermark", - "userFacingErrorMessage" + "userFacingErrorMessage", + "preferStreaming" ], "properties": { "id": { @@ -791,6 +792,9 @@ "userFacingErrorMessage": { "type": "string", "nullable": true + }, + "preferStreaming": { + "type": "boolean" } } }, @@ -15725,6 +15729,238 @@ } } }, + "/ocs/v2.php/taskprocessing/tasks_provider/{taskId}/stream-result": { + "post": { + "operationId": "core-task_processing_api-set-intermediate-result", + "summary": "Sets the task intermediate result while it is running", + "description": "This endpoint requires admin access", + "tags": [ + "core/task_processing_api" + ], + "security": [ + { + "bearer_auth": [] + }, + { + "basic_auth": [] + } + ], + "requestBody": { + "required": false, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "output": { + "type": "object", + "nullable": true, + "description": "The intermediate task output, files are represented by their IDs", + "additionalProperties": { + "type": "object" + } + } + } + } + } + } + }, + "parameters": [ + { + "name": "taskId", + "in": "path", + "description": "The id of the task", + "required": true, + "schema": { + "type": "integer", + "format": "int64" + } + }, + { + "name": "OCS-APIRequest", + "in": "header", + "description": "Required to be true for the API request to pass", + "required": true, + "schema": { + "type": "boolean", + "default": true + } + } + ], + "responses": { + "200": { + "description": "Result updated successfully", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "task" + ], + "properties": { + "task": { + "$ref": "#/components/schemas/CoreTaskProcessingTask" + } + } + } + } + } + } + } + } + } + }, + "500": { + "description": "", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "404": { + "description": "Task not found", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": { + "type": "object", + "required": [ + "message" + ], + "properties": { + "message": { + "type": "string" + } + } + } + } + } + } + } + } + } + }, + "401": { + "description": "Current user is not logged in", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + }, + "403": { + "description": "Logged in account must be an admin", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "ocs" + ], + "properties": { + "ocs": { + "type": "object", + "required": [ + "meta", + "data" + ], + "properties": { + "meta": { + "$ref": "#/components/schemas/OCSMeta" + }, + "data": {} + } + } + } + } + } + } + } + } + } + }, "/ocs/v2.php/taskprocessing/tasks_consumer/tasks/{taskId}/cancel": { "post": { "operationId": "core-task_processing_api-cancel-task-ex-app-endpoint", diff --git a/version.php b/version.php index 69cc38c9d7eec..5bff34b308b06 100644 --- a/version.php +++ b/version.php @@ -11,7 +11,7 @@ // between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level // when updating major/minor version number. -$OC_Version = [35, 0, 0, 0]; +$OC_Version = [35, 0, 0, 1]; // The human-readable string $OC_VersionString = '35.0.0 dev';