From 32990cd015106fa3529365a0e224db2fbdedef6b Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Sun, 14 Jun 2026 16:07:51 +0300 Subject: [PATCH] refactor: improve file storage adding csv functionality --- CHANGELOG.md | 279 +++---- Cargo.lock | 231 +++--- apalis-core/src/backend/impls/memory.rs | 32 +- apalis-core/src/task/metadata.rs | 8 + apalis-core/src/task/task_id.rs | 2 +- apalis-core/src/worker/builder.rs | 6 +- examples/dag-workflow/src/main.rs | 5 +- supply-chain/config.toml | 94 +-- utils/apalis-file-storage/Cargo.toml | 2 + utils/apalis-file-storage/README.md | 2 +- utils/apalis-file-storage/src/adapter.rs | 36 + utils/apalis-file-storage/src/backend.rs | 95 --- utils/apalis-file-storage/src/error.rs | 23 + utils/apalis-file-storage/src/lib.rs | 918 +++++++++++++++++++---- utils/apalis-file-storage/src/meta.rs | 2 +- utils/apalis-file-storage/src/shared.rs | 165 ++-- utils/apalis-file-storage/src/sink.rs | 135 ++-- utils/apalis-file-storage/src/util.rs | 420 ++++++----- 18 files changed, 1506 insertions(+), 949 deletions(-) create mode 100644 utils/apalis-file-storage/src/adapter.rs delete mode 100644 utils/apalis-file-storage/src/backend.rs create mode 100644 utils/apalis-file-storage/src/error.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 1321b102..af3284b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,13 @@ All notable changes to this project are documented in this file. ## [Unreleased] -- **chore (api)!**: metadata as a key-value store ([#747](https://github.com/geofmureithi/apalis/pull/747)) -- **chore**: improve `FromRequest` api adding `Option` ([#746](https://github.com/geofmureithi/apalis/pull/746)) -- **chore**: bump to v1.0.0 rc.9 ([#744](https://github.com/geofmureithi/apalis/pull/744)) -- **feat (api)!**: remove queue input in expose endpoints ([#741](https://github.com/geofmureithi/apalis/pull/741)) -- **feat**: idempotency for sql tasks ([#736](https://github.com/geofmureithi/apalis/pull/736)) -- **chore**: bump to v1.0.0 rc.8 ([#734](https://github.com/geofmureithi/apalis/pull/734)) +- **refactor**: improve file storage adding csv functionality ([#748](https://github.com/apalis-dev/apalis/pull/748)) +- **chore (api)!**: metadata as a key-value store ([#747](https://github.com/apalis-dev/apalis/pull/747)) +- **chore**: improve `FromRequest` api adding `Option` ([#746](https://github.com/apalis-dev/apalis/pull/746)) +- **chore**: bump to v1.0.0 rc.9 ([#744](https://github.com/apalis-dev/apalis/pull/744)) +- **feat (api)!**: remove queue input in expose endpoints ([#741](https://github.com/apalis-dev/apalis/pull/741)) +- **feat**: idempotency for sql tasks ([#736](https://github.com/apalis-dev/apalis/pull/736)) +- **chore**: bump to v1.0.0 rc.8 ([#734](https://github.com/apalis-dev/apalis/pull/734)) - **feat**: idempotency for tasks ([#726](https://github.com/apalis-dev/apalis/pull/726)) - **fix(tracing)**: improve OpenTelemetry context propagation across worker tracing layers ([#716](https://github.com/apalis-dev/apalis/pull/716)) - **deps(deps)**: bump sentry-\* from 0.46.2 to 0.47.0 ([#715](https://github.com/apalis-dev/apalis/pull/715)) @@ -26,28 +27,28 @@ All notable changes to this project are documented in this file. - **feat**: introduce contextual tracing allowing passing previous span details ([#670](https://github.com/apalis-dev/apalis/pull/670)) - **feat**: handle long running tasks with a result collector ([#669](https://github.com/apalis-dev/apalis/pull/669)) - **chore**: add `DagCodec` trait which handles entry and output in dag nodes ([#668](https://github.com/apalis-dev/apalis/pull/668)) -- **chore**: bump to v1.0.0 rc.2 ([#660](https://github.com/geofmureithi/apalis/pull/660)) +- **chore**: bump to v1.0.0 rc.2 ([#660](https://github.com/apalis-dev/apalis/pull/660)) - **refactor(apalis-sql)**: refactor `SqlDateTime` and `SqlDateTimeExt` to remove the `Sql` prefix ([#659](https://github.com/apalis-dev/apalis/pull/659)) - **chore**: chore: add file storage backend docs ([#658](https://github.com/apalis-dev/apalis/pull/658)) - **feat(apalis-sql)**: add `SqlDateTimeExt` trait for unified datetime handling with `time` crate support ([#655](https://github.com/apalis-dev/apalis/pull/655)) -- **chore**: chore: prep codec and filestorage crates ([#653](https://github.com/geofmureithi/apalis/pull/653)) -- **chore**: bump: introducing rc.1 ([#646](https://github.com/geofmureithi/apalis/pull/646)) -- **chore**: feat: refactor and granulize traits ([#586](https://github.com/geofmureithi/apalis/pull/586)) -- **refactor**: refactor: crates, workflow and BackendExt ([#623](https://github.com/geofmureithi/apalis/pull/623)) -- **chore**: bump to v1.0.0 beta.1 ([#624](https://github.com/geofmureithi/apalis/pull/624)) -- **chore(deps)**: update actions/checkout digest to 93cb6ef ([#628](https://github.com/geofmureithi/apalis/pull/628)) -- **fix(deps)**: update rust crate sentry-core to 0.45.0 ([#633](https://github.com/geofmureithi/apalis/pull/633)) -- **chore(deps)**: update actions/checkout action to v5 ([#634](https://github.com/geofmureithi/apalis/pull/634)) -- **chore**: introduce `cargo audit` and `cargo vet` ([#640](https://github.com/geofmureithi/apalis/pull/640)) -- **chore**: add cargo udeps ([#641](https://github.com/geofmureithi/apalis/pull/641)) -- **fix**: TaskId must be explicit to prevent defaulting to RandomId ([#643](https://github.com/geofmureithi/apalis/pull/643)) -- **chore**: bump to v1.0.0 beta.2 ([#624](https://github.com/geofmureithi/apalis/pull/644)) +- **chore**: chore: prep codec and filestorage crates ([#653](https://github.com/apalis-dev/apalis/pull/653)) +- **chore**: bump: introducing rc.1 ([#646](https://github.com/apalis-dev/apalis/pull/646)) +- **chore**: feat: refactor and granulize traits ([#586](https://github.com/apalis-dev/apalis/pull/586)) +- **refactor**: refactor: crates, workflow and BackendExt ([#623](https://github.com/apalis-dev/apalis/pull/623)) +- **chore**: bump to v1.0.0 beta.1 ([#624](https://github.com/apalis-dev/apalis/pull/624)) +- **chore(deps)**: update actions/checkout digest to 93cb6ef ([#628](https://github.com/apalis-dev/apalis/pull/628)) +- **fix(deps)**: update rust crate sentry-core to 0.45.0 ([#633](https://github.com/apalis-dev/apalis/pull/633)) +- **chore(deps)**: update actions/checkout action to v5 ([#634](https://github.com/apalis-dev/apalis/pull/634)) +- **chore**: introduce `cargo audit` and `cargo vet` ([#640](https://github.com/apalis-dev/apalis/pull/640)) +- **chore**: add cargo udeps ([#641](https://github.com/apalis-dev/apalis/pull/641)) +- **fix**: TaskId must be explicit to prevent defaulting to RandomId ([#643](https://github.com/apalis-dev/apalis/pull/643)) +- **chore**: bump to v1.0.0 beta.2 ([#624](https://github.com/apalis-dev/apalis/pull/644)) - **fix(worker)**: fix pause/resume functionality ([#651](https://github.com/apalis-dev/apalis/pull/651)) ### Breaking Changes -- **crates**: Moved backend crates to respective repos ([#586](https://github.com/geofmureithi/apalis/pull/586)) -- **api**: `Backend` must be the second input in `WorkerBuilder` ([#586](https://github.com/geofmureithi/apalis/pull/586)) +- **crates**: Moved backend crates to respective repos ([#586](https://github.com/apalis-dev/apalis/pull/586)) +- **api**: `Backend` must be the second input in `WorkerBuilder` ([#586](https://github.com/apalis-dev/apalis/pull/586)) ```rust let worker = WorkerBuilder::new("tasty-banana") @@ -58,7 +59,7 @@ let worker = WorkerBuilder::new("tasty-banana") .build(task_fn); ``` -- **api**: `Monitor` supports restarts and factory() becomes factory(usize) ([#586](https://github.com/geofmureithi/apalis/pull/586)) +- **api**: `Monitor` supports restarts and factory() becomes factory(usize) ([#586](https://github.com/apalis-dev/apalis/pull/586)) ```rust Monitor::new() @@ -76,14 +77,14 @@ Monitor::new() ... ``` -- **api**: `WorkerContext::id()` becomes `WorkerContext::name()` ([#586](https://github.com/geofmureithi/apalis/pull/586)) -- **api**: `service_fn` becomes `taskfn` ([#586](https://github.com/geofmureithi/apalis/pull/586)) -- **api**: `Pipe::pipe_to_storage` becomes `PipeExt::pipe_to` ([#586](https://github.com/geofmureithi/apalis/pull/586)) +- **api**: `WorkerContext::id()` becomes `WorkerContext::name()` ([#586](https://github.com/apalis-dev/apalis/pull/586)) +- **api**: `service_fn` becomes `taskfn` ([#586](https://github.com/apalis-dev/apalis/pull/586)) +- **api**: `Pipe::pipe_to_storage` becomes `PipeExt::pipe_to` ([#586](https://github.com/apalis-dev/apalis/pull/586)) ### Added - **opentelemetry layer**: monitor tasks with [OpenTelemetry](https://docs.rs/opentelemetry/latest/opentelemetry/) metrics ([#663](https://github.com/apalis-dev/apalis/pull/663)) -- **api**: `Monitor::should_restart` for controlling worker restarts ([#586](https://github.com/geofmureithi/apalis/pull/586)) +- **api**: `Monitor::should_restart` for controlling worker restarts ([#586](https://github.com/apalis-dev/apalis/pull/586)) ```rs Monitor::new() @@ -94,215 +95,215 @@ Monitor::new() }) ``` -## [0.7.4](https://github.com/geofmureithi/apalis/releases/tag/v0.7.4) +## [0.7.4](https://github.com/apalis-dev/apalis/releases/tag/v0.7.4) ### Fixed -- **MySQLStorage**: update MySQL query to correctly handle `last_seen` field ([#618](https://github.com/geofmureithi/apalis/pull/618)) -- **fix**: buggy delay with stepped tasks ([#630](https://github.com/geofmureithi/apalis/pull/630)) +- **MySQLStorage**: update MySQL query to correctly handle `last_seen` field ([#618](https://github.com/apalis-dev/apalis/pull/618)) +- **fix**: buggy delay with stepped tasks ([#630](https://github.com/apalis-dev/apalis/pull/630)) -## [0.7.3](https://github.com/geofmureithi/apalis/releases/tag/v0.7.3) +## [0.7.3](https://github.com/apalis-dev/apalis/releases/tag/v0.7.3) ### Fixed -- **deps**: update rust crate redis to `0.32` ([#584](https://github.com/geofmureithi/apalis/pull/584)) -- **deps**: update rust crate sentry-core to 0.42.0 ([#585](https://github.com/geofmureithi/apalis/pull/585)) -- **deps**: update rust crate criterion to 0.7.0 ([#591](https://github.com/geofmureithi/apalis/pull/591)) -- **deps**: update actions/checkout digest to 08eba0b ([#592](https://github.com/geofmureithi/apalis/pull/592)) -- **deps**: update actions/checkout action to v5 ([#593](https://github.com/geofmureithi/apalis/pull/593)) +- **deps**: update rust crate redis to `0.32` ([#584](https://github.com/apalis-dev/apalis/pull/584)) +- **deps**: update rust crate sentry-core to 0.42.0 ([#585](https://github.com/apalis-dev/apalis/pull/585)) +- **deps**: update rust crate criterion to 0.7.0 ([#591](https://github.com/apalis-dev/apalis/pull/591)) +- **deps**: update actions/checkout digest to 08eba0b ([#592](https://github.com/apalis-dev/apalis/pull/592)) +- **deps**: update actions/checkout action to v5 ([#593](https://github.com/apalis-dev/apalis/pull/593)) -## [0.7.2](https://github.com/geofmureithi/apalis/releases/tag/v0.7.2) +## [0.7.2](https://github.com/apalis-dev/apalis/releases/tag/v0.7.2) ### Fixed - **RedisStorage** the `stats` script is now compatible with dragonfly -- **examples** Prometheus example ([#562](https://github.com/geofmureithi/apalis/pull/562)) -- **SqliteStorage** Event::Idle never trigger ([#571](https://github.com/geofmureithi/apalis/pull/571)) -- **deps** : update rust crate redis to 0.31 ([#555](https://github.com/geofmureithi/apalis/pull/555)) -- **deps** : update rust crate sentry-core to 0.38.0 ([#569](https://github.com/geofmureithi/apalis/pull/569)) -- **deps** : update rust crate criterion to 0.6.0 ([#574](https://github.com/geofmureithi/apalis/pull/574)) -- **error-handling** : ease the error type that is returned by a worker function ([#577](https://github.com/geofmureithi/apalis/pull/577)) -- **PostgresStorage**: fix type error when updating jobs ([#539](https://github.com/geofmureithi/apalis/issues/539)) -- **workflows** : improve permissions for github workflows ([#578](https://github.com/geofmureithi/apalis/issues/578)) -- **deps** : update rust crate pprof to 0.15 ([#579](https://github.com/geofmureithi/apalis/issues/579)) - -## [0.7.1](https://github.com/geofmureithi/apalis/releases/tag/v0.7.1) +- **examples** Prometheus example ([#562](https://github.com/apalis-dev/apalis/pull/562)) +- **SqliteStorage** Event::Idle never trigger ([#571](https://github.com/apalis-dev/apalis/pull/571)) +- **deps** : update rust crate redis to 0.31 ([#555](https://github.com/apalis-dev/apalis/pull/555)) +- **deps** : update rust crate sentry-core to 0.38.0 ([#569](https://github.com/apalis-dev/apalis/pull/569)) +- **deps** : update rust crate criterion to 0.6.0 ([#574](https://github.com/apalis-dev/apalis/pull/574)) +- **error-handling** : ease the error type that is returned by a worker function ([#577](https://github.com/apalis-dev/apalis/pull/577)) +- **PostgresStorage**: fix type error when updating jobs ([#539](https://github.com/apalis-dev/apalis/issues/539)) +- **workflows** : improve permissions for github workflows ([#578](https://github.com/apalis-dev/apalis/issues/578)) +- **deps** : update rust crate pprof to 0.15 ([#579](https://github.com/apalis-dev/apalis/issues/579)) + +## [0.7.1](https://github.com/apalis-dev/apalis/releases/tag/v0.7.1) ### Changed -- **SqliteStorage**: Consistent state fetch via returning update query and remove transactions ([#549](https://github.com/geofmureithi/apalis/pull/549)) +- **SqliteStorage**: Consistent state fetch via returning update query and remove transactions ([#549](https://github.com/apalis-dev/apalis/pull/549)) ### Fixed -- **PostgresStorage**: remove old, conflicting apalis.push_job database function ([#543](https://github.com/geofmureithi/apalis/pull/543)) -- **deps**: update rust crate sentry-core to `0.37.0` ([#542](https://github.com/geofmureithi/apalis/pull/542)) -- **apalis-sql**: Fix schedule job on sqlite and mysql + tests ([#556](https://github.com/geofmureithi/apalis/issues/556)) -- **deps**: update rust crate metrics-exporter-prometheus to 0.17 ([#550](https://github.com/geofmureithi/apalis/issues/550)) +- **PostgresStorage**: remove old, conflicting apalis.push_job database function ([#543](https://github.com/apalis-dev/apalis/pull/543)) +- **deps**: update rust crate sentry-core to `0.37.0` ([#542](https://github.com/apalis-dev/apalis/pull/542)) +- **apalis-sql**: Fix schedule job on sqlite and mysql + tests ([#556](https://github.com/apalis-dev/apalis/issues/556)) +- **deps**: update rust crate metrics-exporter-prometheus to 0.17 ([#550](https://github.com/apalis-dev/apalis/issues/550)) ### Tests -- **execute_next**: fix(tests): avoid execute_next timeout ([#548](https://github.com/geofmureithi/apalis/pull/548)) +- **execute_next**: fix(tests): avoid execute_next timeout ([#548](https://github.com/apalis-dev/apalis/pull/548)) -## [0.7.0](https://github.com/geofmureithi/apalis/releases/tag/v0.7.0) +## [0.7.0](https://github.com/apalis-dev/apalis/releases/tag/v0.7.0) ### Added -- **api** add associated types to the Backend trait ([#516](https://github.com/geofmureithi/apalis/pull/516)) -- **retry layer**: Integrate retry logic with task handling ([#512](https://github.com/geofmureithi/apalis/pull/512)) -- **generic retry**: Persist check for tasks ([#498](https://github.com/geofmureithi/apalis/pull/498)) -- **native TLS**: Add `async-std-comp-native-tls` and `tokio-comp-native-tls` features ([#525](https://github.com/geofmureithi/apalis/pull/525)) -- **cron** : Introduce CronContext ([#488](https://github.com/geofmureithi/apalis/pull/488)) -- **stepped tasks** : adds ability to execute stepped tasks ([#478](https://github.com/geofmureithi/apalis/pull/478)) -- **SQL** : add support for job priority to SQL storages ([#533](https://github.com/geofmureithi/apalis/pull/533/)) +- **api** add associated types to the Backend trait ([#516](https://github.com/apalis-dev/apalis/pull/516)) +- **retry layer**: Integrate retry logic with task handling ([#512](https://github.com/apalis-dev/apalis/pull/512)) +- **generic retry**: Persist check for tasks ([#498](https://github.com/apalis-dev/apalis/pull/498)) +- **native TLS**: Add `async-std-comp-native-tls` and `tokio-comp-native-tls` features ([#525](https://github.com/apalis-dev/apalis/pull/525)) +- **cron** : Introduce CronContext ([#488](https://github.com/apalis-dev/apalis/pull/488)) +- **stepped tasks** : adds ability to execute stepped tasks ([#478](https://github.com/apalis-dev/apalis/pull/478)) +- **SQL** : add support for job priority to SQL storages ([#533](https://github.com/apalis-dev/apalis/pull/533/)) ### Fixed -- **PostgresStorage**: PostgresStorage get_jobs status conditional ([#524](https://github.com/geofmureithi/apalis/pull/524)) -- **cron heartbeat**: Refactor and fix cron heartbeat ([#513](https://github.com/geofmureithi/apalis/pull/513)) -- **RedisStorage**: Correct `running_count` statistic ([#506](https://github.com/geofmureithi/apalis/pull/506)) -- **orphaned tasks**: Re-enqueue orphaned jobs before starting streaming ([#507](https://github.com/geofmureithi/apalis/pull/507)) -- **deps**: Update Rust crate `redis` to `0.28` ([#495](https://github.com/geofmureithi/apalis/pull/495)) -- **deps**: Update Rust crate `redis` to `0.29` and `deadpool-redis` to `0.20` ([#527](https://github.com/geofmureithi/apalis/pull/527)) -- **features**: fix: ease apalis-core default features ([538](https://github.com/geofmureithi/apalis/pull/538)) +- **PostgresStorage**: PostgresStorage get_jobs status conditional ([#524](https://github.com/apalis-dev/apalis/pull/524)) +- **cron heartbeat**: Refactor and fix cron heartbeat ([#513](https://github.com/apalis-dev/apalis/pull/513)) +- **RedisStorage**: Correct `running_count` statistic ([#506](https://github.com/apalis-dev/apalis/pull/506)) +- **orphaned tasks**: Re-enqueue orphaned jobs before starting streaming ([#507](https://github.com/apalis-dev/apalis/pull/507)) +- **deps**: Update Rust crate `redis` to `0.28` ([#495](https://github.com/apalis-dev/apalis/pull/495)) +- **deps**: Update Rust crate `redis` to `0.29` and `deadpool-redis` to `0.20` ([#527](https://github.com/apalis-dev/apalis/pull/527)) +- **features**: fix: ease apalis-core default features ([538](https://github.com/apalis-dev/apalis/pull/538)) ### Tests -- **Integration tests**: Aborting jobs and panicking workers ([#508](https://github.com/geofmureithi/apalis/pull/508)) -- **Worker tests**: Run a real worker in `testrunner` ([#509](https://github.com/geofmureithi/apalis/pull/509)) +- **Integration tests**: Aborting jobs and panicking workers ([#508](https://github.com/apalis-dev/apalis/pull/508)) +- **Worker tests**: Run a real worker in `testrunner` ([#509](https://github.com/apalis-dev/apalis/pull/509)) --- -## [0.6.4](https://github.com/geofmureithi/apalis/releases/tag/v0.6.4) +## [0.6.4](https://github.com/apalis-dev/apalis/releases/tag/v0.6.4) ### Changed -- **Version bump**: Increment to `v0.6.4` ([#500](https://github.com/geofmureithi/apalis/pull/500)) +- **Version bump**: Increment to `v0.6.4` ([#500](https://github.com/apalis-dev/apalis/pull/500)) ### Fixed -- **deps**: Update Rust crate `cron` to `0.15.0` ([#499](https://github.com/geofmureithi/apalis/pull/499)) -- **deps**: Update Rust crate `sentry-core` to `0.36.0` ([#493](https://github.com/geofmureithi/apalis/pull/493)) -- **vacuuming**: Handle vacuuming correctly for backends ([#491](https://github.com/geofmureithi/apalis/pull/491)) +- **deps**: Update Rust crate `cron` to `0.15.0` ([#499](https://github.com/apalis-dev/apalis/pull/499)) +- **deps**: Update Rust crate `sentry-core` to `0.36.0` ([#493](https://github.com/apalis-dev/apalis/pull/493)) +- **vacuuming**: Handle vacuuming correctly for backends ([#491](https://github.com/apalis-dev/apalis/pull/491)) ### Added -- **Redis keep_alive**: Prevent permanent failure after Redis restarts ([#492](https://github.com/geofmureithi/apalis/pull/492)) +- **Redis keep_alive**: Prevent permanent failure after Redis restarts ([#492](https://github.com/apalis-dev/apalis/pull/492)) --- -## [0.6.3](https://github.com/geofmureithi/apalis/releases/tag/v0.6.3) +## [0.6.3](https://github.com/apalis-dev/apalis/releases/tag/v0.6.3) ### Changed -- **Version bump**: Increment to `v0.6.3` ([#490](https://github.com/geofmureithi/apalis/pull/490)) +- **Version bump**: Increment to `v0.6.3` ([#490](https://github.com/apalis-dev/apalis/pull/490)) ### Fixed -- **deps**: Update Rust crate `cron` to `0.14.0` ([#486](https://github.com/geofmureithi/apalis/pull/486)) +- **deps**: Update Rust crate `cron` to `0.14.0` ([#486](https://github.com/apalis-dev/apalis/pull/486)) - **deps**: Update Rust crate `sentry-core` to `0.35.0` -- **examples**: Several improvements and fixes ([#484](https://github.com/geofmureithi/apalis/pull/484), [#489](https://github.com/geofmureithi/apalis/pull/489)) +- **examples**: Several improvements and fixes ([#484](https://github.com/apalis-dev/apalis/pull/484), [#489](https://github.com/apalis-dev/apalis/pull/489)) --- -## [0.6.2](https://github.com/geofmureithi/apalis/releases/tag/v0.6.2) +## [0.6.2](https://github.com/apalis-dev/apalis/releases/tag/v0.6.2) ### Changed -- **Version bump**: Increment to `v0.6.2` ([#482](https://github.com/geofmureithi/apalis/pull/482)) +- **Version bump**: Increment to `v0.6.2` ([#482](https://github.com/apalis-dev/apalis/pull/482)) ### Fixed -- **Waker**: Handle only the latest waker ([#481](https://github.com/geofmureithi/apalis/pull/481)) +- **Waker**: Handle only the latest waker ([#481](https://github.com/apalis-dev/apalis/pull/481)) --- -## [0.6.1](https://github.com/geofmureithi/apalis/releases/tag/v0.6.1) +## [0.6.1](https://github.com/apalis-dev/apalis/releases/tag/v0.6.1) ### Changed -- **Version bump**: Increment to `v0.6.1` ([#479](https://github.com/geofmureithi/apalis/pull/479)) +- **Version bump**: Increment to `v0.6.1` ([#479](https://github.com/apalis-dev/apalis/pull/479)) ### Fixed -- **Redis MQ**: Restore missing `message_id` ([#475](https://github.com/geofmureithi/apalis/pull/475)) -- **Worker readiness**: Allow polling only when the worker is ready ([#472](https://github.com/geofmureithi/apalis/pull/472)) +- **Redis MQ**: Restore missing `message_id` ([#475](https://github.com/apalis-dev/apalis/pull/475)) +- **Worker readiness**: Allow polling only when the worker is ready ([#472](https://github.com/apalis-dev/apalis/pull/472)) --- -## [0.6.0](https://github.com/geofmureithi/apalis/releases/tag/v0.6.0) +## [0.6.0](https://github.com/apalis-dev/apalis/releases/tag/v0.6.0) ### Changed -- **Version bump**: Introduce `v0.6.0` ([#459](https://github.com/geofmureithi/apalis/pull/459)) -- **Branch name**: Rename `master` to `main` ([#456](https://github.com/geofmureithi/apalis/pull/456)) +- **Version bump**: Introduce `v0.6.0` ([#459](https://github.com/apalis-dev/apalis/pull/459)) +- **Branch name**: Rename `master` to `main` ([#456](https://github.com/apalis-dev/apalis/pull/456)) - **deps**: Various dependency updates (e.g., `tower` to `0.5`, `thiserror` to `v2`, etc.) ### Fixed -- **Redis**: Minor ACK bug ([#463](https://github.com/geofmureithi/apalis/pull/463)) +- **Redis**: Minor ACK bug ([#463](https://github.com/apalis-dev/apalis/pull/463)) --- -## [0.5.5](https://github.com/geofmureithi/apalis/releases/tag/v0.5.5) +## [0.5.5](https://github.com/apalis-dev/apalis/releases/tag/v0.5.5) ### Changed -- **Version bump**: `v0.5.5` ([#408](https://github.com/geofmureithi/apalis/pull/408)) +- **Version bump**: `v0.5.5` ([#408](https://github.com/apalis-dev/apalis/pull/408)) ### Fixed -- **tower**: Adjust for breaking changes ([#407](https://github.com/geofmureithi/apalis/pull/407)) +- **tower**: Adjust for breaking changes ([#407](https://github.com/apalis-dev/apalis/pull/407)) --- -## [0.5.4](https://github.com/geofmureithi/apalis/releases/tag/v0.5.4) +## [0.5.4](https://github.com/apalis-dev/apalis/releases/tag/v0.5.4) ### Changed -- **Version bump**: `v0.5.4` ([#405](https://github.com/geofmureithi/apalis/pull/405)) +- **Version bump**: `v0.5.4` ([#405](https://github.com/apalis-dev/apalis/pull/405)) ### Fixed -- **deps**: Update Rust crate `tower` to `0.5` ([#396](https://github.com/geofmureithi/apalis/pull/396)) -- **sqlx**: Update to `0.8.1` ([#403](https://github.com/geofmureithi/apalis/pull/403)) +- **deps**: Update Rust crate `tower` to `0.5` ([#396](https://github.com/apalis-dev/apalis/pull/396)) +- **sqlx**: Update to `0.8.1` ([#403](https://github.com/apalis-dev/apalis/pull/403)) --- -## [0.5.3](https://github.com/geofmureithi/apalis/releases/tag/v0.5.3) +## [0.5.3](https://github.com/apalis-dev/apalis/releases/tag/v0.5.3) ### Changed -- **Version bump**: `v0.5.3` ([#328](https://github.com/geofmureithi/apalis/pull/328)) -- **Repository metadata**: Add repository information ([#345](https://github.com/geofmureithi/apalis/pull/345)) +- **Version bump**: `v0.5.3` ([#328](https://github.com/apalis-dev/apalis/pull/328)) +- **Repository metadata**: Add repository information ([#345](https://github.com/apalis-dev/apalis/pull/345)) ### Fixed -- **Job context**: Ensure job context is tracked properly ([#289](https://github.com/geofmureithi/apalis/pull/289)) +- **Job context**: Ensure job context is tracked properly ([#289](https://github.com/apalis-dev/apalis/pull/289)) --- -## [0.5.2](https://github.com/geofmureithi/apalis/releases/tag/v0.5.2) +## [0.5.2](https://github.com/apalis-dev/apalis/releases/tag/v0.5.2) ### Changed -- **Version bump**: `v0.5.2` ([#322](https://github.com/geofmureithi/apalis/pull/322)) +- **Version bump**: `v0.5.2` ([#322](https://github.com/apalis-dev/apalis/pull/322)) ### Fixed -- **Timestamp**: Correct timestamp type for Postgres ([#321](https://github.com/geofmureithi/apalis/pull/321)) -- **SQL config**: Allow SQL config values to be modified ([#320](https://github.com/geofmureithi/apalis/pull/320)) +- **Timestamp**: Correct timestamp type for Postgres ([#321](https://github.com/apalis-dev/apalis/pull/321)) +- **SQL config**: Allow SQL config values to be modified ([#320](https://github.com/apalis-dev/apalis/pull/320)) --- -## [0.5.1](https://github.com/geofmureithi/apalis/releases/tag/v0.5.1) +## [0.5.1](https://github.com/apalis-dev/apalis/releases/tag/v0.5.1) ### Changed -- **Version bump**: `v0.5.1` ([#265](https://github.com/geofmureithi/apalis/pull/265)) +- **Version bump**: `v0.5.1` ([#265](https://github.com/apalis-dev/apalis/pull/265)) ### Added -- **Shutdown terminator**: Graceful shutdown improvements ([#263](https://github.com/geofmureithi/apalis/pull/263)) +- **Shutdown terminator**: Graceful shutdown improvements ([#263](https://github.com/apalis-dev/apalis/pull/263)) ### Fixed @@ -311,76 +312,76 @@ Monitor::new() --- -## [0.5.0](https://github.com/geofmureithi/apalis/releases/tag/v0.5.0) +## [0.5.0](https://github.com/apalis-dev/apalis/releases/tag/v0.5.0) ### Changed -- **Version bump**: `v0.5.0` ([#247](https://github.com/geofmureithi/apalis/pull/247)) +- **Version bump**: `v0.5.0` ([#247](https://github.com/apalis-dev/apalis/pull/247)) - **Tokio**: Update to `1.36.0` ### Fixed -- **Examples**: Fix missing data in example ([#245](https://github.com/geofmureithi/apalis/pull/245)) +- **Examples**: Fix missing data in example ([#245](https://github.com/apalis-dev/apalis/pull/245)) --- -## [0.4.9](https://github.com/geofmureithi/apalis/releases/tag/v0.4.9) +## [0.4.9](https://github.com/apalis-dev/apalis/releases/tag/v0.4.9) ### Changed -- **Version bump**: `v0.4.9` ([#232](https://github.com/geofmureithi/apalis/pull/232)) +- **Version bump**: `v0.4.9` ([#232](https://github.com/apalis-dev/apalis/pull/232)) ### Fixed -- **deps**: Update Rust crate `futures-lite` to `2.2.0` ([#230](https://github.com/geofmureithi/apalis/pull/230)) -- **MQ builder**: Various bug fixes and improvements ([#231](https://github.com/geofmureithi/apalis/pull/231)) +- **deps**: Update Rust crate `futures-lite` to `2.2.0` ([#230](https://github.com/apalis-dev/apalis/pull/230)) +- **MQ builder**: Various bug fixes and improvements ([#231](https://github.com/apalis-dev/apalis/pull/231)) --- -## [0.4.8](https://github.com/geofmureithi/apalis/releases/tag/v0.4.8) +## [0.4.8](https://github.com/apalis-dev/apalis/releases/tag/v0.4.8) ### Changed -- **Version bump**: `v0.4.8` ([#229](https://github.com/geofmureithi/apalis/pull/229)) +- **Version bump**: `v0.4.8` ([#229](https://github.com/apalis-dev/apalis/pull/229)) ### Added -- **Configurable concurrency** per worker ([#222](https://github.com/geofmureithi/apalis/pull/222)) +- **Configurable concurrency** per worker ([#222](https://github.com/apalis-dev/apalis/pull/222)) ### Fixed -- **deps**: Update Rust crate `smol` to `v2` ([#228](https://github.com/geofmureithi/apalis/pull/228)) -- **deps**: Update Rust crate `async-trait` to `0.1.77` ([#225](https://github.com/geofmureithi/apalis/pull/225)) +- **deps**: Update Rust crate `smol` to `v2` ([#228](https://github.com/apalis-dev/apalis/pull/228)) +- **deps**: Update Rust crate `async-trait` to `0.1.77` ([#225](https://github.com/apalis-dev/apalis/pull/225)) --- -## [0.4.6](https://github.com/geofmureithi/apalis/releases/tag/v0.4.6) +## [0.4.6](https://github.com/apalis-dev/apalis/releases/tag/v0.4.6) ### Changed -- **Version bump**: `v0.4.6` ([#197](https://github.com/geofmureithi/apalis/pull/197)) +- **Version bump**: `v0.4.6` ([#197](https://github.com/apalis-dev/apalis/pull/197)) ### Fixed -- **features**: Prevent forcing unintended dependencies by default ([#196](https://github.com/geofmureithi/apalis/pull/196)) -- **worker spawn**: Use `executor.spawn` from within worker to start heartbeat tasks ([#195](https://github.com/geofmureithi/apalis/pull/195)) +- **features**: Prevent forcing unintended dependencies by default ([#196](https://github.com/apalis-dev/apalis/pull/196)) +- **worker spawn**: Use `executor.spawn` from within worker to start heartbeat tasks ([#195](https://github.com/apalis-dev/apalis/pull/195)) --- -## [0.4.5](https://github.com/geofmureithi/apalis/releases/tag/v0.4.5) +## [0.4.5](https://github.com/apalis-dev/apalis/releases/tag/v0.4.5) ### Changed -- **Version bump**: `v0.4.5` ([#181](https://github.com/geofmureithi/apalis/pull/181)) +- **Version bump**: `v0.4.5` ([#181](https://github.com/apalis-dev/apalis/pull/181)) ### Fixed -- **deps**: Update Rust crate `tokio` to `1.33.0` ([#180](https://github.com/geofmureithi/apalis/pull/180)) -- **expose migrations**: Expose migrations for Postgres and SQLite ([#179](https://github.com/geofmureithi/apalis/pull/179)) +- **deps**: Update Rust crate `tokio` to `1.33.0` ([#180](https://github.com/apalis-dev/apalis/pull/180)) +- **expose migrations**: Expose migrations for Postgres and SQLite ([#179](https://github.com/apalis-dev/apalis/pull/179)) --- -## [0.4.4](https://github.com/geofmureithi/apalis/releases/tag/v0.4.4) +## [0.4.4](https://github.com/apalis-dev/apalis/releases/tag/v0.4.4) ### Changed @@ -390,7 +391,7 @@ Monitor::new() --- -## [0.4.3](https://github.com/geofmureithi/apalis/releases/tag/v0.4.3) +## [0.4.3](https://github.com/apalis-dev/apalis/releases/tag/v0.4.3) ### Changed @@ -399,7 +400,7 @@ Monitor::new() --- -## [0.4.2](https://github.com/geofmureithi/apalis/releases/tag/v0.4.2) +## [0.4.2](https://github.com/apalis-dev/apalis/releases/tag/v0.4.2) ### Changed @@ -412,7 +413,7 @@ Monitor::new() --- -## [0.4.1](https://github.com/geofmureithi/apalis/releases/tag/v0.4.1) +## [0.4.1](https://github.com/apalis-dev/apalis/releases/tag/v0.4.1) ### Changed @@ -424,7 +425,7 @@ Monitor::new() --- -## [0.3.6](https://github.com/geofmureithi/apalis/releases/tag/v0.3.6) +## [0.3.6](https://github.com/apalis-dev/apalis/releases/tag/v0.3.6) ### Changed @@ -433,7 +434,7 @@ Monitor::new() --- -## [0.3.5](https://github.com/geofmureithi/apalis/releases/tag/v0.3.5) +## [0.3.5](https://github.com/apalis-dev/apalis/releases/tag/v0.3.5) ### Changed @@ -446,7 +447,7 @@ Monitor::new() --- -## [0.3.4](https://github.com/geofmureithi/apalis/releases/tag/v0.3.4) +## [0.3.4](https://github.com/apalis-dev/apalis/releases/tag/v0.3.4) ### Changed @@ -455,7 +456,7 @@ Monitor::new() --- -## [0.3.3](https://github.com/geofmureithi/apalis/releases/tag/v0.3.3) +## [0.3.3](https://github.com/apalis-dev/apalis/releases/tag/v0.3.3) ### Changed @@ -464,7 +465,7 @@ Monitor::new() --- -## [0.3.1](https://github.com/geofmureithi/apalis/releases/tag/v0.3.1) +## [0.3.1](https://github.com/apalis-dev/apalis/releases/tag/v0.3.1) ### Changed @@ -473,7 +474,7 @@ Monitor::new() --- -## [0.3.0](https://github.com/geofmureithi/apalis/releases/tag/v0.3.0) +## [0.3.0](https://github.com/apalis-dev/apalis/releases/tag/v0.3.0) ### Changed diff --git a/Cargo.lock b/Cargo.lock index b6b4b6ce..6d14af37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,7 +144,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2 0.6.3", + "socket2 0.6.4", "time", "tracing", "url", @@ -305,12 +305,14 @@ dependencies = [ "apalis-codec", "apalis-core", "apalis-workflow", + "fd-lock", "futures-channel", "futures-core", "futures-sink", "futures-util", "serde", "serde_json", + "thiserror 2.0.18", "tokio", ] @@ -364,9 +366,9 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" [[package]] name = "aws-lc-rs" @@ -401,7 +403,7 @@ dependencies = [ "bytes", "form_urlencoded", "futures-util", - "http 1.4.0", + "http 1.4.2", "http-body", "http-body-util", "hyper", @@ -432,7 +434,7 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http 1.4.0", + "http 1.4.2", "http-body", "http-body-util", "mime", @@ -520,9 +522,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "2.11.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8" [[package]] name = "block2" @@ -535,9 +537,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.20.2" +version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" [[package]] name = "byteorder" @@ -575,9 +577,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.62" +version = "1.2.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +checksum = "dad887fd958be91b5098c0248def011f4523ab786cd411be668777e55063501f" dependencies = [ "find-msvc-tools", "jobserver", @@ -599,9 +601,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.44" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327" dependencies = [ "iana-time-zone", "js-sys", @@ -707,7 +709,6 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ - "powerfmt", "serde_core", ] @@ -746,9 +747,9 @@ dependencies = [ [[package]] name = "displaydoc" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" dependencies = [ "proc-macro2", "quote", @@ -892,6 +893,17 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "fd-lock" +version = "4.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" +dependencies = [ + "cfg-if", + "rustix", + "windows-sys 0.59.0", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1073,9 +1085,9 @@ dependencies = [ [[package]] name = "generator" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" +checksum = "b3b854b0e584ead1a33f18b2fcad7cf7be18b3875c78816b753639aa501513ae" dependencies = [ "cc", "cfg-if", @@ -1154,7 +1166,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.4.0", + "http 1.4.2", "indexmap", "slab", "tokio", @@ -1238,9 +1250,9 @@ dependencies = [ [[package]] name = "http" -version = "1.4.0" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425" dependencies = [ "bytes", "itoa", @@ -1253,7 +1265,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.4.0", + "http 1.4.2", ] [[package]] @@ -1264,7 +1276,7 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http 1.4.0", + "http 1.4.2", "http-body", "pin-project-lite", ] @@ -1283,16 +1295,16 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.9.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +checksum = "55281c53a1894c864990125767da440a4e630446785086f52523b20033b74498" dependencies = [ "atomic-waker", "bytes", "futures-channel", "futures-core", "h2", - "http 1.4.0", + "http 1.4.2", "http-body", "httparse", "httpdate", @@ -1309,7 +1321,7 @@ version = "0.27.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ - "http 1.4.0", + "http 1.4.2", "hyper", "hyper-util", "rustls", @@ -1345,14 +1357,14 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http 1.4.0", + "http 1.4.2", "http-body", "hyper", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.3", + "socket2 0.6.4", "tokio", "tower-service", "tracing", @@ -1538,9 +1550,9 @@ checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "jiff" -version = "0.2.24" +version = "0.2.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00b5dbd620d61dfdcb6007c9c1f6054ebd75319f163d886a9055cec1155073d" +checksum = "4603d3033e49e2b0e31229fcab20a5d40089c607d975cd9c80551dc69eed9102" dependencies = [ "jiff-static", "log", @@ -1551,9 +1563,9 @@ dependencies = [ [[package]] name = "jiff-static" -version = "0.2.24" +version = "0.2.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e000de030ff8022ea1da3f466fbb0f3a809f5e51ed31f6dd931c35181ad8e6d7" +checksum = "782d32378dddf207193ac91cefb848ad41abb58195c95168e1291227a0832b47" dependencies = [ "proc-macro2", "quote", @@ -1572,13 +1584,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.98" +version = "0.3.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" +checksum = "03d04c30968dffe80775bd4d7fb676131cd04a1fb46d2686dbffbaec2d9dfd31" dependencies = [ "cfg-if", "futures-util", - "once_cell", "wasm-bindgen", ] @@ -1652,9 +1663,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.29" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +checksum = "953f07c43838f8e6f9758cab68bf5bed85465e7587ebe0b823f1bcd81978ad3a" [[package]] name = "long-running" @@ -1699,9 +1710,9 @@ checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" [[package]] name = "memchr" -version = "2.8.0" +version = "2.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +checksum = "88904434abc2901f197fe8cc55f0445e7ded921dba5911dad2e2b39b48e663c4" [[package]] name = "metrics" @@ -1770,9 +1781,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" dependencies = [ "libc", "log", @@ -2032,9 +2043,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl" -version = "0.10.80" +version = "0.10.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a45fa2aa886c42762255da344f0a0d313e254066c46aad76f300c3d3da62d967" +checksum = "77823a27f0babb03091cb9ed9ef80af3b39dbc82f97e8fa530374b7dafd87a45" dependencies = [ "bitflags", "cfg-if", @@ -2063,9 +2074,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "openssl-sys" -version = "0.9.116" +version = "0.9.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28a22dc7140cda5f096e5e7724a6962ca81a7f8bfd2979f9b18c11af56318c4" +checksum = "b47e7e6bb2c38cd930d25a23b40fa52e068c10e85f3e03a7f5ba5aaca5713695" dependencies = [ "cc", "libc", @@ -2116,7 +2127,7 @@ checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", - "http 1.4.0", + "http 1.4.2", "opentelemetry", ] @@ -2126,7 +2137,7 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f69cd6acbb9af919df949cd1ec9e5e7fdc2ef15d234b6b795aaa525cc02f71f" dependencies = [ - "http 1.4.0", + "http 1.4.2", "opentelemetry", "opentelemetry-http", "opentelemetry-proto", @@ -2367,9 +2378,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +checksum = "528ac67416ff8646872a3c02cad9cc4ee5dc9f9540c9b10771855c95cb2e5ae1" dependencies = [ "bytes", "prost-derive", @@ -2377,9 +2388,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", "itertools", @@ -2511,9 +2522,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.3" +version = "1.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +checksum = "f1292b7759ae1cb9ec195452d1390a074f0cd8541ab7a5a8c31cd6db45d4a6ba" dependencies = [ "aho-corasick", "memchr", @@ -2540,15 +2551,15 @@ checksum = "cab834c73d247e67f4fae452806d17d3c7501756d98c8808d7c9c7aa7d18f973" [[package]] name = "regex-syntax" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4" [[package]] name = "reqwest" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e0021ea2c22aed41653bc7e1419abb2c97e038ff2c33d0e1309e49a97deec0" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" dependencies = [ "base64", "bytes", @@ -2556,7 +2567,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http 1.4.0", + "http 1.4.2", "http-body", "http-body-util", "hyper", @@ -2674,9 +2685,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +checksum = "dab5152771c58876a2146916e53e35057e1a4dfa2b9df0f0305b07f611fdea4d" dependencies = [ "openssl-probe", "rustls-pki-types", @@ -2950,9 +2961,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa", "memchr", @@ -2995,9 +3006,9 @@ dependencies = [ [[package]] name = "shlex" -version = "1.3.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" [[package]] name = "signal-hook-registry" @@ -3023,9 +3034,9 @@ checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "smallvec" -version = "1.15.1" +version = "1.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +checksum = "8ed6a63f02c8539c91a8685a86f4099661ba3da017932f6ebbea6de3f0fa7c90" [[package]] name = "socket2" @@ -3039,9 +3050,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", "windows-sys 0.61.2", @@ -3168,12 +3179,11 @@ dependencies = [ [[package]] name = "time" -version = "0.3.47" +version = "0.3.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +checksum = "711a53c2d47bbd818258c498c8dbfe186a2526c631495cfe7e078567f86b8469" dependencies = [ "deranged", - "itoa", "num-conv", "powerfmt", "serde_core", @@ -3183,15 +3193,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" +checksum = "9e1c906769ad99c88eaa54e728060edef082f8e358ff32030cb7c7d315e81109" [[package]] name = "time-macros" -version = "0.2.27" +version = "0.2.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" +checksum = "71c652a3727a9cbb9a02f707f530b618ce00d0ccd762009c8c23bd191df3c17d" dependencies = [ "num-conv", "time-core", @@ -3219,7 +3229,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.3", + "socket2 0.6.4", "tokio-macros", "windows-sys 0.61.2", ] @@ -3288,7 +3298,7 @@ dependencies = [ "async-trait", "base64", "bytes", - "http 1.4.0", + "http 1.4.2", "http-body", "http-body-util", "percent-encoding", @@ -3340,7 +3350,7 @@ dependencies = [ "bitflags", "bytes", "futures-util", - "http 1.4.0", + "http 1.4.2", "http-body", "pin-project-lite", "tower", @@ -3503,9 +3513,9 @@ checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-segmentation" -version = "1.13.2" +version = "1.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" +checksum = "c6f5d3c3b1bf09027a88a6bc961fc00497d651009560b5463668dc81b0fa87a8" [[package]] name = "unicode-xid" @@ -3576,7 +3586,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e994ba84b0bd1b1b0cf92878b7ef898a5c1760108fe7b6010327e274917a808c" dependencies = [ "base64", - "http 1.4.0", + "http 1.4.2", "httparse", "log", ] @@ -3614,9 +3624,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.23.1" +version = "1.23.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +checksum = "144d6b123cef80b301b8f72a9e2ca4370ddec21950d0a103dd22c437006d2db7" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -3659,9 +3669,9 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasip2" -version = "1.0.3+wasi-0.2.9" +version = "1.0.4+wasi-0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" +checksum = "b67efb37e106e55ce722a510d6b5f9c17f083e5fc79afc2badeb12cc313d9487" dependencies = [ "wit-bindgen 0.57.1", ] @@ -3677,9 +3687,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.121" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" +checksum = "8ddb3f79143bced6de84270411622a2699cee572fc0875aeaf1e7867cf9fca1a" dependencies = [ "cfg-if", "once_cell", @@ -3690,9 +3700,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.71" +version = "0.4.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96492d0d3ffba25305a7dc88720d250b1401d7edca02cc3bcd50633b424673b8" +checksum = "503b14d284f2c8dac03b819967e155ea753f573586193b2b2c95990cb5d69280" dependencies = [ "js-sys", "wasm-bindgen", @@ -3700,9 +3710,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.121" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" +checksum = "4e21a184b13fb19e157296e2c46056aec9092264fab83e4ba59e68c61b323c3d" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3710,9 +3720,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.121" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" +checksum = "fecefd9c35bd935a20fc3fc344b5f29138961e4f47fb03297d88f2587afb5ebd" dependencies = [ "bumpalo", "proc-macro2", @@ -3723,9 +3733,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.121" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" +checksum = "23939e44bb9a5d7576fa2b563dc2e136628f1224e88a8deed09e04858b77871f" dependencies = [ "unicode-ident", ] @@ -3766,9 +3776,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.98" +version = "0.3.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b572dff8bcf38bad0fa19729c89bb5748b2b9b1d8be70cf90df697e3a8f32aa" +checksum = "a6430a72df5eb332242960fe84b3002a241163998241eb596d4f739b9757061d" dependencies = [ "js-sys", "wasm-bindgen", @@ -3883,6 +3893,15 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -4058,9 +4077,9 @@ checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "yoke" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +checksum = "709fe23a0424b6a435d82152b1bd3fdfb0833487d5fa90d05d42762a9891fef5" dependencies = [ "stable_deref_trait", "yoke-derive", @@ -4081,18 +4100,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.48" +version = "0.8.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +checksum = "ce1022995ff5ff5d841ad7d994facc23098cd40152f2c1d11cd607c6f530653f" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.48" +version = "0.8.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +checksum = "1ae7f38b72ec2a254e2b87ef277cf2cd4fb97cbebf944faa6f33354da0867930" dependencies = [ "proc-macro2", "quote", @@ -4122,9 +4141,9 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.8.2" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +checksum = "e13c156562582aa81c60cb29407084cdb54c4164760106ab78e6c5b0858cf64e" [[package]] name = "zerotrie" diff --git a/apalis-core/src/backend/impls/memory.rs b/apalis-core/src/backend/impls/memory.rs index b4147bb7..e0e4efda 100644 --- a/apalis-core/src/backend/impls/memory.rs +++ b/apalis-core/src/backend/impls/memory.rs @@ -39,6 +39,7 @@ //! - [`WorkerContext`] use crate::backend::BackendExt; use crate::backend::codec::IdentityCodec; +use crate::error::BoxDynError; use crate::features_table; use crate::task::metadata::{MetadataExt, MetadataStore}; use crate::{ @@ -136,14 +137,25 @@ impl MetadataExt for MemoryContext { } } +/// Error type for MemoryStorage operations +#[derive(Debug, thiserror::Error)] +pub enum MemoryStorageError { + /// Error occurred while sending a task to the in-memory channel + #[error("Failed to send task: {0}")] + SendError(#[from] SendError), + /// Error occurred while flushing the in-memory channel + #[error("Failed to add task to storage: {0}")] + Other(BoxDynError), +} + impl MemoryStorage { /// Create a new in-memory storage #[must_use] pub fn new() -> Self { let (sender, receiver) = unbounded(); - let sender = Box::new(sender) + let sender = Box::new(sender.sink_map_err(|e| e.into())) as Box< - dyn Sink, Error = SendError> + dyn Sink, Error = MemoryStorageError> + Send + Sync + Unpin, @@ -167,7 +179,7 @@ impl MemoryStorage { } impl Sink> for MemoryStorage { - type Error = SendError; + type Error = MemoryStorageError; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().sender.poll_ready_unpin(cx) @@ -191,7 +203,13 @@ impl Sink> for MemoryStorage { type ArcMemorySink = Arc< Mutex< - Box, Error = SendError> + Send + Sync + Unpin + 'static>, + Box< + dyn Sink, Error = MemoryStorageError> + + Send + + Sync + + Unpin + + 'static, + >, >, >; @@ -232,7 +250,7 @@ impl Clone for MemorySink { } impl Sink> for MemorySink { - type Error = SendError; + type Error = MemoryStorageError; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut lock = ready!(self.inner.lock().poll_unpin(cx)); @@ -301,8 +319,8 @@ impl Backend for MemorySto type Context = Ctx; - type Error = SendError; - type Stream = TaskStream, SendError>; + type Error = MemoryStorageError; + type Stream = TaskStream, MemoryStorageError>; type Layer = Identity; type Beat = BoxStream<'static, Result<(), Self::Error>>; diff --git a/apalis-core/src/task/metadata.rs b/apalis-core/src/task/metadata.rs index 52dd6fa4..16ec42f7 100644 --- a/apalis-core/src/task/metadata.rs +++ b/apalis-core/src/task/metadata.rs @@ -14,6 +14,7 @@ use crate::task::Task; use crate::task_fn::FromRequest; use std::collections::HashMap; use std::convert::Infallible; +#[cfg(feature = "tracing")] use std::fmt; use std::ops::Deref; #[cfg(feature = "tracing")] @@ -452,9 +453,16 @@ impl MetadataStore { } /// Get a typed metadata entry. + #[must_use = "Extracted metadata should be used or handled"] pub fn extract_as(&self) -> Result { M::extract(self) } + + /// Create a `MetadataStore` from a `HashMap`. + #[must_use] + pub fn from_map(map: HashMap) -> Self { + Self(map) + } } /// Implemented by types that can be stored as metadata. diff --git a/apalis-core/src/task/task_id.rs b/apalis-core/src/task/task_id.rs index a367f034..7eaa8e28 100644 --- a/apalis-core/src/task/task_id.rs +++ b/apalis-core/src/task/task_id.rs @@ -140,7 +140,7 @@ mod random_id { let count = COUNTER.fetch_add(1, Ordering::Relaxed); let rand_part = encode_base64(xorshift64(timestamp ^ count), RANDOM_LEN); - format!("{time_str}{rand_part}") + format!("{time_str}{rand_part}{count}") } /// Returns current time in milliseconds since UNIX epoch. diff --git a/apalis-core/src/worker/builder.rs b/apalis-core/src/worker/builder.rs index 153e4987..6b2fe565 100644 --- a/apalis-core/src/worker/builder.rs +++ b/apalis-core/src/worker/builder.rs @@ -196,12 +196,10 @@ where B: Backend, { /// Consumes the builder and a service to construct the final worker - pub fn build, Svc>( - self, - service: W, - ) -> Worker + pub fn build(self, service: W) -> Worker where Svc: Service>, + W: IntoWorkerServiceExt, { service.build_with(self) } diff --git a/examples/dag-workflow/src/main.rs b/examples/dag-workflow/src/main.rs index f23d2f72..fb08a911 100644 --- a/examples/dag-workflow/src/main.rs +++ b/examples/dag-workflow/src/main.rs @@ -1,5 +1,5 @@ use apalis::prelude::*; -use apalis_file_storage::JsonStorage; +use apalis_file_storage::CsvStorage; use apalis_workflow::{DagFlow, WorkflowSink}; use tracing::info; @@ -30,7 +30,8 @@ async fn main() -> Result<(), BoxDynError> { std::env::set_var("RUST_LOG", "debug"); }; tracing_subscriber::fmt::init(); - let mut backend = JsonStorage::new_temp().unwrap(); + let mut backend = CsvStorage::new_temp().unwrap(); + backend.push_start(vec![42, 43, 44]).await.unwrap(); let dag_flow = DagFlow::new("user-info-workflow"); diff --git a/supply-chain/config.toml b/supply-chain/config.toml index aa974ff3..2e329baf 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -113,7 +113,7 @@ version = "1.1.2" criteria = "safe-to-deploy" [[exemptions.autocfg]] -version = "1.5.0" +version = "1.5.1" criteria = "safe-to-deploy" [[exemptions.aws-lc-rs]] @@ -157,7 +157,7 @@ version = "2.0.1" criteria = "safe-to-deploy" [[exemptions.bitflags]] -version = "2.11.1" +version = "2.13.0" criteria = "safe-to-deploy" [[exemptions.block2]] @@ -165,7 +165,7 @@ version = "0.6.2" criteria = "safe-to-deploy" [[exemptions.bumpalo]] -version = "3.20.2" +version = "3.20.3" criteria = "safe-to-deploy" [[exemptions.byteorder]] @@ -181,7 +181,7 @@ version = "1.5.1" criteria = "safe-to-deploy" [[exemptions.cc]] -version = "1.2.62" +version = "1.2.64" criteria = "safe-to-deploy" [[exemptions.cfg-if]] @@ -193,7 +193,7 @@ version = "0.2.1" criteria = "safe-to-deploy" [[exemptions.chrono]] -version = "0.4.44" +version = "0.4.45" criteria = "safe-to-deploy" [[exemptions.cmake]] @@ -249,7 +249,7 @@ version = "0.3.1" criteria = "safe-to-deploy" [[exemptions.displaydoc]] -version = "0.2.5" +version = "0.2.6" criteria = "safe-to-deploy" [[exemptions.document-features]] @@ -300,6 +300,10 @@ criteria = "safe-to-deploy" version = "2.4.1" criteria = "safe-to-deploy" +[[exemptions.fd-lock]] +version = "4.0.4" +criteria = "safe-to-deploy" + [[exemptions.find-msvc-tools]] version = "0.1.9" criteria = "safe-to-deploy" @@ -381,7 +385,7 @@ version = "0.3.32" criteria = "safe-to-deploy" [[exemptions.generator]] -version = "0.8.8" +version = "0.8.9" criteria = "safe-to-deploy" [[exemptions.getrandom]] @@ -441,7 +445,7 @@ version = "0.2.12" criteria = "safe-to-deploy" [[exemptions.http]] -version = "1.4.0" +version = "1.4.2" criteria = "safe-to-deploy" [[exemptions.http-body]] @@ -461,7 +465,7 @@ version = "1.0.3" criteria = "safe-to-deploy" [[exemptions.hyper]] -version = "1.9.0" +version = "1.10.1" criteria = "safe-to-deploy" [[exemptions.hyper-rustls]] @@ -549,11 +553,11 @@ version = "1.0.18" criteria = "safe-to-deploy" [[exemptions.jiff]] -version = "0.2.24" +version = "0.2.28" criteria = "safe-to-deploy" [[exemptions.jiff-static]] -version = "0.2.24" +version = "0.2.28" criteria = "safe-to-deploy" [[exemptions.jobserver]] @@ -561,7 +565,7 @@ version = "0.1.34" criteria = "safe-to-deploy" [[exemptions.js-sys]] -version = "0.3.98" +version = "0.3.102" criteria = "safe-to-deploy" [[exemptions.language-tags]] @@ -605,7 +609,7 @@ version = "0.4.14" criteria = "safe-to-deploy" [[exemptions.log]] -version = "0.4.29" +version = "0.4.32" criteria = "safe-to-deploy" [[exemptions.loom]] @@ -621,7 +625,7 @@ version = "0.8.4" criteria = "safe-to-deploy" [[exemptions.memchr]] -version = "2.8.0" +version = "2.8.2" criteria = "safe-to-deploy" [[exemptions.metrics]] @@ -645,7 +649,7 @@ version = "0.8.9" criteria = "safe-to-deploy" [[exemptions.mio]] -version = "1.2.0" +version = "1.2.1" criteria = "safe-to-deploy" [[exemptions.native-tls]] @@ -737,7 +741,7 @@ version = "1.70.2" criteria = "safe-to-deploy" [[exemptions.openssl]] -version = "0.10.80" +version = "0.10.81" criteria = "safe-to-deploy" [[exemptions.openssl-macros]] @@ -749,7 +753,7 @@ version = "0.2.1" criteria = "safe-to-deploy" [[exemptions.openssl-sys]] -version = "0.9.116" +version = "0.9.117" criteria = "safe-to-deploy" [[exemptions.opentelemetry]] @@ -849,11 +853,11 @@ version = "0.14.0" criteria = "safe-to-deploy" [[exemptions.prost]] -version = "0.14.3" +version = "0.14.4" criteria = "safe-to-deploy" [[exemptions.prost-derive]] -version = "0.14.3" +version = "0.14.4" criteria = "safe-to-deploy" [[exemptions.protobuf]] @@ -909,7 +913,7 @@ version = "0.5.18" criteria = "safe-to-deploy" [[exemptions.regex]] -version = "1.12.3" +version = "1.12.4" criteria = "safe-to-deploy" [[exemptions.regex-automata]] @@ -921,11 +925,11 @@ version = "0.1.9" criteria = "safe-to-deploy" [[exemptions.regex-syntax]] -version = "0.8.10" +version = "0.8.11" criteria = "safe-to-deploy" [[exemptions.reqwest]] -version = "0.13.3" +version = "0.13.4" criteria = "safe-to-deploy" [[exemptions.ring]] @@ -957,7 +961,7 @@ version = "0.23.40" criteria = "safe-to-deploy" [[exemptions.rustls-native-certs]] -version = "0.8.3" +version = "0.8.4" criteria = "safe-to-deploy" [[exemptions.rustls-pki-types]] @@ -1053,7 +1057,7 @@ version = "1.0.228" criteria = "safe-to-deploy" [[exemptions.serde_json]] -version = "1.0.149" +version = "1.0.150" criteria = "safe-to-deploy" [[exemptions.serde_path_to_error]] @@ -1069,7 +1073,7 @@ version = "0.1.7" criteria = "safe-to-deploy" [[exemptions.shlex]] -version = "1.3.0" +version = "2.0.1" criteria = "safe-to-deploy" [[exemptions.signal-hook-registry]] @@ -1085,7 +1089,7 @@ version = "0.4.12" criteria = "safe-to-deploy" [[exemptions.smallvec]] -version = "1.15.1" +version = "1.15.2" criteria = "safe-to-deploy" [[exemptions.socket2]] @@ -1093,7 +1097,7 @@ version = "0.5.10" criteria = "safe-to-deploy" [[exemptions.socket2]] -version = "0.6.3" +version = "0.6.4" criteria = "safe-to-deploy" [[exemptions.stable_deref_trait]] @@ -1141,15 +1145,15 @@ version = "1.1.9" criteria = "safe-to-deploy" [[exemptions.time]] -version = "0.3.47" +version = "0.3.49" criteria = "safe-to-deploy" [[exemptions.time-core]] -version = "0.1.8" +version = "0.1.9" criteria = "safe-to-deploy" [[exemptions.time-macros]] -version = "0.2.27" +version = "0.2.29" criteria = "safe-to-deploy" [[exemptions.tinystr]] @@ -1249,7 +1253,7 @@ version = "1.0.24" criteria = "safe-to-deploy" [[exemptions.unicode-segmentation]] -version = "1.13.2" +version = "1.13.3" criteria = "safe-to-deploy" [[exemptions.unicode-xid]] @@ -1289,7 +1293,7 @@ version = "0.2.2" criteria = "safe-to-deploy" [[exemptions.uuid]] -version = "1.23.1" +version = "1.23.3" criteria = "safe-to-deploy" [[exemptions.valuable]] @@ -1313,7 +1317,7 @@ version = "0.11.1+wasi-snapshot-preview1" criteria = "safe-to-deploy" [[exemptions.wasip2]] -version = "1.0.3+wasi-0.2.9" +version = "1.0.4+wasi-0.2.12" criteria = "safe-to-deploy" [[exemptions.wasip3]] @@ -1321,23 +1325,23 @@ version = "0.4.0+wasi-0.3.0-rc-2026-01-06" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen]] -version = "0.2.121" +version = "0.2.125" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-futures]] -version = "0.4.71" +version = "0.4.75" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-macro]] -version = "0.2.121" +version = "0.2.125" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-macro-support]] -version = "0.2.121" +version = "0.2.125" criteria = "safe-to-deploy" [[exemptions.wasm-bindgen-shared]] -version = "0.2.121" +version = "0.2.125" criteria = "safe-to-deploy" [[exemptions.wasm-encoder]] @@ -1353,7 +1357,7 @@ version = "0.244.0" criteria = "safe-to-deploy" [[exemptions.web-sys]] -version = "0.3.98" +version = "0.3.102" criteria = "safe-to-deploy" [[exemptions.web-time]] @@ -1404,6 +1408,10 @@ criteria = "safe-to-deploy" version = "0.52.0" criteria = "safe-to-deploy" +[[exemptions.windows-sys]] +version = "0.59.0" +criteria = "safe-to-deploy" + [[exemptions.windows-sys]] version = "0.61.2" criteria = "safe-to-deploy" @@ -1477,7 +1485,7 @@ version = "0.6.3" criteria = "safe-to-deploy" [[exemptions.yoke]] -version = "0.8.2" +version = "0.8.3" criteria = "safe-to-deploy" [[exemptions.yoke-derive]] @@ -1485,11 +1493,11 @@ version = "0.8.2" criteria = "safe-to-deploy" [[exemptions.zerocopy]] -version = "0.8.48" +version = "0.8.52" criteria = "safe-to-deploy" [[exemptions.zerocopy-derive]] -version = "0.8.48" +version = "0.8.52" criteria = "safe-to-deploy" [[exemptions.zerofrom]] @@ -1501,7 +1509,7 @@ version = "0.1.7" criteria = "safe-to-deploy" [[exemptions.zeroize]] -version = "1.8.2" +version = "1.9.0" criteria = "safe-to-deploy" [[exemptions.zerotrie]] diff --git a/utils/apalis-file-storage/Cargo.toml b/utils/apalis-file-storage/Cargo.toml index 1e176a33..5180ea9a 100644 --- a/utils/apalis-file-storage/Cargo.toml +++ b/utils/apalis-file-storage/Cargo.toml @@ -17,6 +17,7 @@ categories = ["database", "filesystem", "asynchronous", "data-structures"] [lib] [dependencies] +fd-lock = "4" serde = { version = "1.0.228", features = ["derive"] } apalis-core = { path = "../../apalis-core", version = "1.0.0-rc.9", default-features = false, features = [ "sleep", @@ -38,6 +39,7 @@ futures-core = { version = "0.3.30", default-features = false } apalis-codec = { path = "../apalis-codec", version = "0.1.0-rc.9", default-features = false, features = [ "json", ] } +thiserror = "2.0.18" [dev-dependencies] tokio = { version = "1.37.0", features = ["full"] } diff --git a/utils/apalis-file-storage/README.md b/utils/apalis-file-storage/README.md index 3dfe4db3..ccf62e2c 100644 --- a/utils/apalis-file-storage/README.md +++ b/utils/apalis-file-storage/README.md @@ -13,7 +13,7 @@ Currently only supports JSON. ## Usage Example -```rust +```rust,no_run use apalis_file_storage::JsonStorage; use apalis_core::worker::builder::WorkerBuilder; use std::time::Duration; diff --git a/utils/apalis-file-storage/src/adapter.rs b/utils/apalis-file-storage/src/adapter.rs new file mode 100644 index 00000000..56f80992 --- /dev/null +++ b/utils/apalis-file-storage/src/adapter.rs @@ -0,0 +1,36 @@ +use crate::util::RawTask; + +/// Pluggable serialization strategy for [`FileStorage`]. +pub trait Adapter: Send + 'static { + /// The format-native representation of one record. + type Line: Send + Clone; + + /// The error that will be returned by the adapter. + type Error; + + /// Serialize `line` into bytes including the record terminator (`\n`). + fn serialize(&self, line: &Self::Line) -> Result, Self::Error>; + + /// Deserialize one raw record (terminator already stripped) into a `Line`. + fn deserialize(&self, raw: &[u8]) -> Result; + + /// Convert a format-native `Line` into the common [`TaskWithMeta`]. + fn to_entry(&self, line: Self::Line) -> Result; + + /// Convert a [`TaskWithMeta`] into a format-native `Line` ready for + /// serialization. + fn from_entry(entry: &RawTask) -> Result; + + /// Return `true` if `raw` is a structural line (e.g. a CSV header) + /// that should be skipped rather than deserialized as a task entry. + /// The default implementation always returns `false`, so `JsonAdapter` + /// and any other self-describing adapter needs no change. + fn is_header(&self, _raw: &[u8]) -> bool { + false + } + + /// Return a header line to be written at the start of the file. + fn header(&self, _entries: &Vec) -> Option> { + None + } +} diff --git a/utils/apalis-file-storage/src/backend.rs b/utils/apalis-file-storage/src/backend.rs deleted file mode 100644 index ba9d3394..00000000 --- a/utils/apalis-file-storage/src/backend.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -use apalis_codec::json::JsonCodec; -use futures_channel::mpsc::SendError; -use futures_core::{Stream, stream::BoxStream}; -use futures_util::{StreamExt, TryStreamExt, stream}; -use serde::{Serialize, de::Deserialize}; -use serde_json::Value; - -use apalis_core::{ - backend::{Backend, BackendExt, TaskStream, queue::Queue}, - task::{Task, status::Status, task_id::RandomId}, - worker::{context::WorkerContext, ext::ack::AcknowledgeLayer}, -}; - -use crate::{ - JsonMapMetadata, JsonStorage, - util::{FindFirstWith, JsonAck}, -}; - -impl Backend for JsonStorage -where - Args: 'static + Send + Serialize + for<'de> Deserialize<'de> + Unpin, -{ - type Args = Args; - type IdType = RandomId; - type Error = SendError; - type Context = JsonMapMetadata; - type Stream = TaskStream, SendError>; - type Layer = AcknowledgeLayer>; - type Beat = BoxStream<'static, Result<(), Self::Error>>; - - fn heartbeat(&self, _: &WorkerContext) -> Self::Beat { - stream::once(async { Ok(()) }).boxed() - } - fn middleware(&self) -> Self::Layer { - AcknowledgeLayer::new(JsonAck { - inner: self.clone(), - }) - } - fn poll(self, _worker: &WorkerContext) -> Self::Stream { - (self.map(|r| Ok(Some(r))).boxed()) as _ - } -} - -impl Deserialize<'de> + Unpin> BackendExt - for JsonStorage -{ - type Codec = JsonCodec; - type Compact = Value; - - type CompactStream = TaskStream, SendError>; - - fn get_queue(&self) -> Queue { - std::any::type_name::().into() - } - - fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { - self.poll(worker) - .map_ok(|c| { - c.map(|t| t.map(|args| serde_json::to_value(args).expect("to be encodable"))) - }) - .boxed() - } -} - -impl Deserialize<'de> + Unpin> Stream for JsonStorage { - type Item = Task; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let map = self.tasks.try_write().unwrap(); - if let Some((key, task)) = map.find_first_with(|s, _| { - s.queue == std::any::type_name::() && s.status == Status::Pending - }) { - use apalis_core::task::builder::TaskBuilder; - let key = key.clone(); - let args = Args::deserialize(&task.args).unwrap(); - let task = TaskBuilder::new(args) - .with_task_id(key.task_id.clone()) - .with_ctx(task.ctx.clone()) - .build(); - drop(map); - let this = self.get_mut(); - this.update_status(&key, Status::Running) - .expect("Failed to update status"); - this.persist_to_disk().expect("Failed to persist to disk"); - Poll::Ready(Some(task)) - } else { - Poll::Pending - } - } -} diff --git a/utils/apalis-file-storage/src/error.rs b/utils/apalis-file-storage/src/error.rs new file mode 100644 index 00000000..c5a2c405 --- /dev/null +++ b/utils/apalis-file-storage/src/error.rs @@ -0,0 +1,23 @@ +use crate::Adapter; + +/// Error type for [`FileStorage`]. +#[derive(Debug, thiserror::Error)] +pub enum FileStorageError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + #[error("job at line_id: {line_id} not found")] + JobNotFound { line_id: usize }, + + #[error("parse error: {0}")] + Parse(String), + + #[error("Adapter error: {0}")] + AdapterError(A::Error), + + #[error("Lock would block")] + WouldBlockLock, +} diff --git a/utils/apalis-file-storage/src/lib.rs b/utils/apalis-file-storage/src/lib.rs index ecfd2405..a46f94ab 100644 --- a/utils/apalis-file-storage/src/lib.rs +++ b/utils/apalis-file-storage/src/lib.rs @@ -1,33 +1,86 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![doc = include_str!("../README.md")] +use apalis_codec::json::JsonCodec; +use futures_core::{Stream, stream::BoxStream}; +use futures_util::{ + StreamExt, TryStreamExt, + future::{Ready, ready}, + stream, +}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; use serde_json::Value; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, + fmt::Debug, fs::{File, OpenOptions}, - io::{BufRead, Write}, - path::PathBuf, - sync::{Arc, RwLock}, + io::{self, BufRead, Seek, SeekFrom, Write}, + marker::PhantomData, + num::ParseIntError, + path::{Path, PathBuf}, + pin::Pin, + str::FromStr, + sync::{Arc, Mutex, RwLock}, + task::{Context, Poll}, + time::Duration, }; -use self::util::{TaskKey, TaskWithMeta}; +use crate::error::FileStorageError; + +use self::util::RawTask; use apalis_core::{ + backend::{Backend, BackendExt, TaskResult, TaskStream, WaitForCompletion, queue::Queue}, + error::BoxDynError, features_table, task::{ - Task, + Parts, Task, + builder::TaskBuilder, + metadata::MetadataStore, status::Status, task_id::{RandomId, TaskId}, }, + worker::{ + context::WorkerContext, + ext::ack::{Acknowledge, AcknowledgeLayer}, + }, }; -use std::io::{BufReader, BufWriter}; +use std::io::BufReader; -mod backend; +use fd_lock::RwLock as FileLock; + +mod adapter; +mod error; mod meta; mod shared; mod sink; mod util; pub use self::shared::SharedJsonStore; +pub use adapter::Adapter; pub use meta::JsonMapMetadata; + +/// Handles the sync policy for when to flush pending changes to disk. +#[derive(Debug, Clone)] +pub enum SyncPolicy { + /// Flush to disk after every change. + Instant, + /// Accumulate changes; caller drives `tick()` on a timer. + Periodic(Duration), + /// Never flush automatically; caller drives `flush()`. + Manual, +} + +#[derive(Debug)] +enum PendingChange { + /// Append a new entry (already serialized, includes `\n`). + Append(Vec), + /// Rewrite the line_id` with new content. + RewriteLine { + line_id: usize, + /// The fully serialized replacement line, including `\n`. + new_bytes: Vec, + }, +} + /// A backend that persists to a file using json encoding /// /// *Warning*: This backend is not optimized for high-throughput scenarios and is best suited for development, testing, or low-volume workloads. @@ -57,7 +110,7 @@ pub use meta::JsonMapMetadata; FetchById => not_implemented("Allow fetching a task by its ID"), RegisterWorker => not_supported("Allow registering a worker with the backend"), "[`PipeExt`]" => supported("Allow other backends to pipe to this backend", false), - MakeShared => supported("Share the same JSON storage across multiple workers via [`SharedJsonStore`]", false), + MakeShared => supported("Share the same storage across multiple workers via [`SharedJsonStore`]", false), Workflow => supported("Flexible enough to support workflows", true), WaitForCompletion => supported("Wait for tasks to complete without blocking", true), ResumeById => not_implemented("Resume a task by its ID"), @@ -65,205 +118,711 @@ pub use meta::JsonMapMetadata; ListWorkers => not_supported("List all workers registered with the backend"), ListTasks => not_implemented("List all tasks in the backend"), }] -/// -/// [`PipeExt`]: crate::backend::pipe::PipeExt #[derive(Debug)] -pub struct JsonStorage { - tasks: Arc>>, - buffer: Vec>, +pub struct FileStorage { + adapter: A, + lock: Arc>>, path: PathBuf, - _marker: std::marker::PhantomData, + /// In-memory view of all entries; index == job_id. + entries: Arc>>, + /// Staged mutations not yet flushed to disk. + pending: Arc>>, + /// Next job_id to yield from the `Stream` impl. + read_cursor: usize, + sync_policy: SyncPolicy, + last_flush: std::time::Instant, + _args: PhantomData, } -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] -struct StorageEntry { - task_id: TaskId, - status: Status, - task: TaskWithMeta, +impl Clone for FileStorage +where + A: Adapter + Clone, +{ + fn clone(&self) -> Self { + Self { + adapter: self.adapter.clone(), + lock: Arc::clone(&self.lock), + path: self.path.clone(), + entries: Arc::clone(&self.entries), + pending: Arc::clone(&self.pending), + read_cursor: self.read_cursor, + sync_policy: self.sync_policy.clone(), + last_flush: self.last_flush, + _args: PhantomData, + } + } } -impl JsonStorage { - /// Creates a new `JsonStorage` instance using the specified file path. - pub fn new(path: impl Into) -> std::io::Result { - let path = path.into(); - let mut data = BTreeMap::new(); +/// A JSON adapter for serializing and deserializing tasks. +#[derive(Debug, Clone, Copy, Default)] +pub struct JsonAdapter; - if path.exists() { - let file = File::open(&path)?; - let reader = BufReader::new(file); +/// A `FileStorage` using `JsonAdapter` for line encoding. +pub type JsonStorage = FileStorage; - for line in reader.lines() { - let line = line?; - if line.trim().is_empty() { - continue; - } +impl Adapter for JsonAdapter { + type Line = serde_json::Value; - if let Ok(entry) = serde_json::from_str::(&line) { - let key = TaskKey { - status: entry.status, - task_id: entry.task_id, - queue: std::any::type_name::().to_owned(), - }; - data.insert(key, entry.task); - } - } - } + type Error = serde_json::Error; - Ok(Self { - path, - tasks: Arc::new(RwLock::new(data)), - buffer: Vec::new(), - _marker: std::marker::PhantomData, - }) + fn serialize(&self, line: &Self::Line) -> Result, Self::Error> { + let mut bytes = serde_json::to_vec(line)?; + bytes.push(b'\n'); + Ok(bytes) } - /// Creates a new temporary `JsonStorage` instance. - pub fn new_temp() -> Result { - let p = std::env::temp_dir().join(format!("apalis-json-store-{}", RandomId::default())); - Self::new(p) + fn deserialize(&self, raw: &[u8]) -> Result { + serde_json::from_slice(raw.trim_ascii()) } - fn insert(&self, k: &TaskKey, v: TaskWithMeta) -> Result<(), std::io::Error> { - self.tasks.try_write().unwrap().insert(k.clone(), v); - Ok(()) + fn to_entry(&self, line: Self::Line) -> Result { + serde_json::from_value(line) } - /// Removes a task from the storage. - pub fn remove(&mut self, key: &TaskKey) -> std::io::Result> { - let removed = self.tasks.try_write().unwrap().remove(key); + fn from_entry(entry: &RawTask) -> Result { + serde_json::to_value(entry) + } +} - if removed.is_some() { - self.persist_to_disk()?; - } +impl FileStorage { + /// Creates a new `FileStorage` instance using the specified file path. + pub fn new(path: impl AsRef) -> Result> + where + A: Default, + { + let adapter = A::default(); + let path = path.as_ref().to_path_buf(); - Ok(removed) + let file = OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(true) + .open(&path) + .map_err(FileStorageError::Io)?; + + let lock = Arc::new(RwLock::new(FileLock::new(file))); + + let entries = { + let guard = lock.read().map_err(|e| { + FileStorageError::Io(std::io::Error::new( + std::io::ErrorKind::WouldBlock, + e.to_string(), + )) + })?; + let guard = guard + .try_read() + .map_err(|_| FileStorageError::WouldBlockLock)?; + Self::load_entries(&adapter, &guard)? + }; + + Ok(Self { + adapter, + lock, + path, + entries: Arc::new(RwLock::new(entries)), + pending: Default::default(), + read_cursor: 0, + sync_policy: SyncPolicy::Instant, + last_flush: std::time::Instant::now(), + _args: PhantomData, + }) } - /// Persist all current data to disk by rewriting the file - fn persist_to_disk(&self) -> std::io::Result<()> { - let tmp_path = &self.path; - { - let tmp_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(tmp_path)?; - let mut writer = BufWriter::new(tmp_file); - - for (key, value) in self.tasks.try_read().unwrap().iter() { - let entry = StorageEntry { - status: key.status.clone(), - task_id: key.task_id.clone(), - task: value.clone(), - }; - let line = serde_json::to_string(&entry)?; - writeln!(writer, "{line}")?; - } + /// Creates a new temporary `FileStorage` instance. + pub fn new_temp() -> Result> + where + A: Default, + { + let p = std::env::temp_dir().join(format!("apalis-temp-store-{}", RandomId::default())); + Self::new(p) + } - writer.flush()?; - } // BufWriter is dropped here, ensuring all data is written + /// Attach a result to the entry identified by `line_id`. + pub fn set_result( + &mut self, + line_id: usize, + result: serde_json::Value, + ) -> Result<(), FileStorageError> { + { + let mut entries = self + .entries + .try_write() + .map_err(|_| FileStorageError::WouldBlockLock)?; + let entry = entries + .get_mut(line_id) + .ok_or(FileStorageError::JobNotFound { line_id })?; + + entry.result = Some(result); + + // Serialize the updated entry through the adapter's native Line type. + let line = A::from_entry(entry).map_err(FileStorageError::AdapterError)?; + let new_bytes = self + .adapter + .serialize(&line) + .map_err(FileStorageError::AdapterError)?; + + self.pending + .try_write() + .map_err(|_| FileStorageError::WouldBlockLock)? + .push(PendingChange::RewriteLine { line_id, new_bytes }); + } - // Atomically replace the old file with the new one - std::fs::rename(tmp_path, &self.path)?; + if matches!(self.sync_policy, SyncPolicy::Instant) { + self.flush()?; + } Ok(()) } - /// Reload data from disk, useful if the file was modified externally - pub fn reload(&mut self) -> std::io::Result<()> { - let mut new_data = BTreeMap::new(); - if self.path.exists() { - let file = File::open(&self.path)?; - let reader = BufReader::new(file); + /// Persist all pending changes under an exclusive file lock. + pub fn flush(&mut self) -> Result<(), FileStorageError> { + if self + .pending + .try_read() + .map_err(|_| FileStorageError::WouldBlockLock)? + .is_empty() + { + return Ok(()); + } - for line in reader.lines() { - let line = line?; - if line.trim().is_empty() { - continue; + let mut outer_guard = self.lock.write().map_err(|e| { + FileStorageError::Io(std::io::Error::new( + std::io::ErrorKind::WouldBlock, + e.to_string(), + )) + })?; + let mut guard = outer_guard + .try_write() + .map_err(|_| FileStorageError::WouldBlockLock)?; + + // Partition: rewrites require a full-file atomic swap; appends do not. + let has_rewrites = self + .pending + .try_read() + .map_err(|_| FileStorageError::WouldBlockLock)? + .iter() + .any(|c| matches!(c, PendingChange::RewriteLine { .. })); + + if has_rewrites { + // Build the complete new file from the in-memory entries, + // applying any pending appends along the way so that the + // serialized file is always consistent with `self.entries`. + // + // Collect rewrite targets so we can substitute updated bytes. + let mut rewrites: std::collections::HashMap> = + std::collections::HashMap::new(); + let mut extra_appends: Vec> = Vec::new(); + + for change in self + .pending + .try_write() + .map_err(|_| FileStorageError::WouldBlockLock)? + .drain(..) + { + match change { + PendingChange::RewriteLine { line_id, new_bytes } => { + rewrites.insert(line_id, new_bytes); + } + PendingChange::Append(bytes) => { + extra_appends.push(bytes); + } } + } - if let Ok(entry) = serde_json::from_str::(&line) { - let key = TaskKey { - status: entry.status, - task_id: entry.task_id, - queue: std::any::type_name::().to_owned(), + // Write tmp file. + let tmp_path = self.path.with_extension("tmp"); + { + let mut tmp = File::create(&tmp_path).map_err(FileStorageError::Io)?; + + let raw_tasks = self + .entries + .try_read() + .map_err(|_| FileStorageError::WouldBlockLock)?; + if let Some(headers) = self.adapter.header(&raw_tasks) { + tmp.write_all(&headers).map_err(FileStorageError::Io)?; + } + for (job_id, entry) in raw_tasks.iter().enumerate() { + let bytes = if let Some(b) = rewrites.remove(&job_id) { + b + } else { + let line = A::from_entry(entry).map_err(FileStorageError::AdapterError)?; + self.adapter + .serialize(&line) + .map_err(FileStorageError::AdapterError)? }; - new_data.insert(key, entry.task); + tmp.write_all(&bytes).map_err(FileStorageError::Io)?; } + + tmp.flush().map_err(FileStorageError::Io)?; } + + // Atomic rename over the live file. + std::fs::rename(&tmp_path, &self.path).map_err(FileStorageError::Io)?; + + // Re-open the renamed file so `guard` / `lock` stays valid. + // We re-open for read+write and replace the lock's inner file. + // (fd_lock works on the file descriptor, so we update it.) + drop(guard); + drop(outer_guard); + let new_file = OpenOptions::new() + .read(true) + .write(true) + .open(&self.path) + .map_err(FileStorageError::Io)?; + self.lock = Arc::new(RwLock::new(FileLock::new(new_file))); + } else { + // Appends only — fast path: seek to end and write. + for change in self + .pending + .try_write() + .map_err(|_| FileStorageError::WouldBlockLock)? + .drain(..) + { + if let PendingChange::Append(bytes) = change { + guard.seek(SeekFrom::End(0)).map_err(FileStorageError::Io)?; + guard.write_all(&bytes).map_err(FileStorageError::Io)?; + } + } + guard.flush().map_err(FileStorageError::Io)?; } - *self.tasks.try_write().unwrap() = new_data; + self.last_flush = std::time::Instant::now(); Ok(()) } - /// Clear all data from memory and file - pub fn clear(&mut self) -> std::io::Result<()> { - self.tasks.try_write().unwrap().clear(); - - // Create an empty file - let file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&self.path)?; - drop(file); + /// Drive periodic syncing. Call this from your timer loop. + pub fn tick(&mut self) -> Result<(), FileStorageError> { + if let SyncPolicy::Periodic(interval) = self.sync_policy.clone() { + if self.last_flush.elapsed() >= interval { + self.flush()?; + } + } Ok(()) } - /// Update the status of an existing key - pub fn update_status( - &mut self, - old_key: &TaskKey, - new_status: Status, - ) -> std::io::Result { - let mut tasks = self.tasks.try_write().unwrap(); - if let Some(value) = tasks.remove(old_key) { - let new_key = TaskKey { - status: new_status, - task_id: old_key.task_id.clone(), - queue: old_key.queue.clone(), - }; - tasks.insert(new_key, value); - Ok(true) + /// Read every non-blank line from `file` and decode via the adapter. + fn load_entries(adapter: &A, file: &File) -> Result, FileStorageError> { + let mut reader = BufReader::new(file); + reader + .seek(SeekFrom::Start(0)) + .map_err(FileStorageError::Io)?; + + let mut entries = Vec::new(); + let mut raw: Vec = Vec::new(); + + loop { + raw.clear(); + let n = reader + .read_until(b'\n', &mut raw) + .map_err(FileStorageError::Io)?; + if n == 0 { + break; + } + let trimmed = raw.trim_ascii(); + if trimmed.is_empty() { + continue; + } + + if adapter.is_header(trimmed) { + continue; + } + let line = adapter + .deserialize(trimmed) + .map_err(FileStorageError::AdapterError)?; + entries.push( + adapter + .to_entry(line) + .map_err(FileStorageError::AdapterError)?, + ); + } + Ok(entries) + } +} + +impl Stream for FileStorage { + type Item = Result<(usize, RawTask), FileStorageError>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let raw_tasks = this + .entries + .try_read() + .map_err(|_| FileStorageError::WouldBlockLock)?; + if this.read_cursor < raw_tasks.len() { + let line_id = this.read_cursor; + let job = raw_tasks[line_id].clone(); + this.read_cursor += 1; + Poll::Ready(Some(Ok((line_id, job)))) } else { - Ok(false) + Poll::Ready(None) } } +} - /// Retrieves a task from the storage. - #[must_use] - pub fn get(&self, key: &TaskKey) -> Option { - let tasks = self.tasks.try_read().unwrap(); - let res = tasks.get(key); - res.cloned() +impl Backend for FileStorage +where + Args: 'static + Send + Serialize + for<'de> Deserialize<'de> + Unpin, + A: Adapter + Unpin + Clone, +{ + type Args = Args; + type IdType = RandomId; + type Error = FileStorageError; + type Context = JsonMapMetadata; + type Stream = TaskStream, Self::Error>; + type Layer = AcknowledgeLayer; + type Beat = BoxStream<'static, Result<(), Self::Error>>; + + fn heartbeat(&self, _: &WorkerContext) -> Self::Beat { + stream::once(async { Ok(()) }).boxed() + } + fn middleware(&self) -> Self::Layer { + AcknowledgeLayer::new(self.clone()) + } + fn poll(self, _worker: &WorkerContext) -> Self::Stream { + (self + .map_ok(|(line_id, mut job)| { + let args = Args::deserialize(&job.args).unwrap(); + job.ctx.0.insert("line_id", line_id.to_string()).unwrap(); + let mut task = TaskBuilder::new(args).with_ctx(job.ctx).build(); + task.parts.task_id = job.task_id; + Some(task) + }) + .boxed()) as _ } +} - fn update_result(&self, key: &TaskKey, status: Status, val: Value) -> std::io::Result { - let mut tasks = self.tasks.try_write().unwrap(); - if let Some(mut task) = tasks.remove(key) { - let new_key = TaskKey { - status, - task_id: key.task_id.clone(), - queue: key.queue.clone(), - }; - task.result = Some(val); - - tasks.insert(new_key, task); - Ok(true) - } else { - Ok(false) +impl BackendExt for FileStorage +where + Args: 'static + Send + Serialize + for<'de> Deserialize<'de> + Unpin, + A: Adapter + Unpin + Clone, +{ + type Codec = JsonCodec; + type Compact = Value; + + type CompactStream = + TaskStream, FileStorageError>; + + fn get_queue(&self) -> Queue { + std::any::type_name::().into() + } + + fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream { + self.poll(worker) + .map_ok(|c| { + c.map(|t| t.map(|args| serde_json::to_value(args).expect("to be encodable"))) + }) + .boxed() + } +} + +/// A CSV adapter for serializing and deserializing tasks. +#[derive(Debug, Clone, Default)] +pub struct CsvAdapter { + /// Column names, in BTreeMap (alphabetical) order. + /// Populated on first serialize (write path) or first deserialize (read path). + header: Arc>>>, +} + +/// A `FileStorage` using `CsvAdapter` for line encoding. +pub type CsvStorage = FileStorage; + +impl Adapter for CsvAdapter { + type Line = BTreeMap; + + type Error = io::Error; + + fn serialize(&self, line: &Self::Line) -> Result, Self::Error> { + let mut out = Vec::new(); + + let mut header = self + .header + .lock() + .map_err(|_| io::Error::other("header mutex poisoned"))?; + + if header.is_none() { + let cols: Vec = line.keys().cloned().collect(); + out.extend_from_slice(format!("{}\n", cols.join(",")).as_bytes()); + *header = Some(cols); + } + + let row = line + .values() + .map(|v| v.as_str()) + .collect::>() + .join(","); + + out.extend_from_slice(format!("{row}\n").as_bytes()); + + Ok(out) + } + + fn deserialize(&self, raw: &[u8]) -> Result { + let row = std::str::from_utf8(raw) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? + .trim_end_matches('\n'); + + let header = self + .header + .lock() + .map_err(|_| io::Error::other("header mutex poisoned"))?; + + let cols = header.as_ref().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "CsvAdapter: deserialize called before header was parsed", + ) + })?; + + let map = row + .split(',') + .zip(cols.iter()) + .map(|(value, key)| (key.clone(), value.to_owned())) + .collect::>(); + + Ok(map) + } + + fn to_entry(&self, line: Self::Line) -> Result { + let args = util::to_value(Some("args"), &line); + + let mut result = None; + + if line.contains_key("result") { + result = Some(util::to_value(Some("result"), &line)); + } + + let idempotency_key = line.get("idempotency_key").cloned(); + + let task_id = line.get("task_id").and_then(|s| FromStr::from_str(s).ok()); + + let ctx = line + .iter() + .filter(|(k, _)| k.starts_with("ctx.")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect::>(); + + let ctx = JsonMapMetadata(MetadataStore::from_map(ctx)); + + Ok(RawTask { + task_id, + args, + ctx, + result, + idempotency_key, + }) + } + + fn from_entry(entry: &RawTask) -> Result { + let mut line = BTreeMap::new(); + + line.insert( + "task_id".to_owned(), + entry + .task_id + .as_ref() + .map(|t| t.to_string()) + .unwrap_or_default(), + ); + + let args = util::from_value(Some("args"), &entry.args); + + line.extend(args); + + let result = entry + .result + .as_ref() + .map(|r| util::from_value(Some("result"), r)) + .unwrap_or_else(|| util::from_value(Some("result"), &serde_json::Value::Null)); + + line.extend(result); + + if let Some(idempotency_key) = &entry.idempotency_key { + line.insert("idempotency_key".to_owned(), idempotency_key.clone()); + } + + for (k, value) in entry.ctx.0.iter() { + line.insert(format!("ctx.{k}"), value.clone()); } + + Ok(line) + } + + fn is_header(&self, raw: &[u8]) -> bool { + let Ok(mut header) = self.header.lock() else { + return false; + }; + + if header.is_none() { + if let Ok(row) = std::str::from_utf8(raw) { + let cols: Vec = row + .trim_end_matches('\n') + .split(',') + .map(|s| s.to_owned()) + .collect(); + + *header = Some(cols); + } + + return true; // this line is the header, skip it + } + + false + } + + fn header(&self, entries: &Vec) -> Option> { + let first = entries.first()?; + + let line = Self::from_entry(first).ok()?; + + let cols: Vec = line.keys().cloned().collect(); + Some(format!("{}\n", cols.join(",")).as_bytes().to_vec()) } } -impl Clone for JsonStorage { - fn clone(&self) -> Self { - Self { - tasks: self.tasks.clone(), - buffer: Vec::new(), - path: self.path.clone(), - _marker: std::marker::PhantomData, +impl Acknowledge for FileStorage +where + Args: Send + 'static + Debug, + Res: Serialize, + A: Adapter + Clone, +{ + type Error = FileStorageError; + + type Future = Ready>; + + fn ack( + &mut self, + res: &Result, + ctx: &Parts, + ) -> Self::Future { + let res = |this: &mut Self| { + let val = serde_json::to_value(res.as_ref().map_err(|e| e.to_string()))?; + let line_id = ctx + .ctx + .0 + .get("line_id") + .unwrap() + .parse() + .map_err(|e: ParseIntError| FileStorageError::Parse(e.to_string()))?; + this.set_result(line_id, val)?; + Ok(()) + }; + + ready(res(self)) + } +} + +impl WaitForCompletion + for FileStorage +where + Args: Send + DeserializeOwned + 'static + Unpin + Serialize, + A: Adapter + Unpin + Sync + Clone, +{ + type ResultStream = BoxStream<'static, Result, FileStorageError>>; + fn wait_for( + &self, + task_ids: impl IntoIterator>, + ) -> Self::ResultStream { + use futures_util::StreamExt; + use std::{collections::HashSet, time::Duration}; + + let task_ids: HashSet<_> = task_ids.into_iter().collect(); + struct PollState { + vault: FileStorage, + pending_tasks: HashSet>, + poll_interval: Duration, + _phantom: std::marker::PhantomData, } + let state = PollState { + vault: self.clone(), + pending_tasks: task_ids, + poll_interval: Duration::from_millis(100), + _phantom: std::marker::PhantomData, + }; + futures_util::stream::unfold(state, |mut state: PollState| { + async move { + if state.pending_tasks.is_empty() { + return None; + } + + loop { + // Check for completed tasks + let vault = &state.vault; + + let completed_task = { + let vault = vault.entries.try_read().ok()?; + vault.iter().find_map(|value| { + let task_id = value.task_id.clone()?; + if state.pending_tasks.contains(&task_id) { + Some((task_id, value.result.clone()?)) + } else { + None + } + }) + }; + + if let Some((task_id, result)) = completed_task { + state.pending_tasks.remove(&task_id); + let result: Result = serde_json::from_value(result).unwrap(); + return Some(( + Ok(TaskResult { + task_id, + status: Status::Done, + result, + }), + state, + )); + } + + // No completed tasks, wait and try again + apalis_core::timer::sleep(state.poll_interval).await; + } + } + }) + .boxed() + } + + async fn check_status( + &self, + task_ids: impl IntoIterator> + Send, + ) -> Result>, Self::Error> { + use apalis_core::task::status::Status; + use std::collections::HashSet; + let task_ids: HashSet<_> = task_ids.into_iter().collect(); + let mut results = Vec::new(); + for task_id in task_ids { + if let Some(value) = self + .entries + .try_read() + .unwrap() + .iter() + .find(|s| s.task_id.as_ref().unwrap() == &task_id) + { + if value.result.is_none() { + results.push(TaskResult { + task_id: task_id.clone(), + status: Status::Pending, + result: Err("Task not completed yet".to_owned()), + }); + continue; + } + let result = match serde_json::from_value::>( + value.result.clone().unwrap(), + ) { + Ok(result) => TaskResult { + task_id: task_id.clone(), + status: Status::Done, + result, + }, + Err(e) => TaskResult { + task_id: task_id.clone(), + status: Status::Failed, + result: Err(format!("Deserialization error: {e}")), + }, + }; + results.push(result); + } + } + Ok(results) } } @@ -283,7 +842,7 @@ mod tests { const ITEMS: u32 = 100; #[tokio::test] - async fn basic_worker() { + async fn json_worker() { let mut json_store = JsonStorage::new_temp().unwrap(); for i in 0..ITEMS { json_store.push(i).await.unwrap(); @@ -298,7 +857,7 @@ mod tests { Ok(()) } - let worker = WorkerBuilder::new("rango-tango") + let worker = WorkerBuilder::new("rango-tango-json") .backend(json_store) .on_event(|ctx, ev| { println!("On Event = {ev:?} from = {}", ctx.name()); @@ -306,4 +865,43 @@ mod tests { .build(task); worker.run().await.unwrap(); } + + #[tokio::test] + async fn csv_worker() { + #[derive(Debug, Serialize, Deserialize, Default)] + struct Email { + to: String, + subject: String, + index: u32, + } + + let mut csv_store = CsvStorage::new_temp().unwrap(); + for i in 0..ITEMS { + csv_store + .push(Email { + subject: "Test".to_string(), + to: "test".to_string(), + index: i, + }) + .await + .unwrap(); + } + + async fn task(task: Email, ctx: WorkerContext) -> Result<(), BoxDynError> { + tokio::time::sleep(Duration::from_secs(1)).await; + if task.index == ITEMS - 1 { + ctx.stop().unwrap(); + return Err("Worker stopped!")?; + } + Ok(()) + } + + let worker = WorkerBuilder::new("rango-tango-csv") + .backend(csv_store) + .on_event(|ctx, ev| { + println!("On Event = {ev:?} from = {}", ctx.name()); + }) + .build(task); + worker.run().await.unwrap(); + } } diff --git a/utils/apalis-file-storage/src/meta.rs b/utils/apalis-file-storage/src/meta.rs index 9488e584..66ac4878 100644 --- a/utils/apalis-file-storage/src/meta.rs +++ b/utils/apalis-file-storage/src/meta.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; /// A simple wrapper around a JSON map to represent task metadata #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] -pub struct JsonMapMetadata(MetadataStore); +pub struct JsonMapMetadata(pub MetadataStore); impl MetadataExt for JsonMapMetadata { fn metadata(&self) -> &MetadataStore { diff --git a/utils/apalis-file-storage/src/shared.rs b/utils/apalis-file-storage/src/shared.rs index 5e9df58b..5d119904 100644 --- a/utils/apalis-file-storage/src/shared.rs +++ b/utils/apalis-file-storage/src/shared.rs @@ -1,3 +1,8 @@ +use futures_core::stream::BoxStream; +use futures_sink::Sink; +use futures_util::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; /// Sharable JSON based backend. /// /// The [`SharedJsonStore`] allows multiple task types to be stored @@ -42,67 +47,17 @@ /// ``` /// /// See the tests for more advanced usage with multiple types and event listeners. -use std::{ - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use futures_channel::mpsc::SendError; -use futures_core::{Stream, stream::BoxStream}; -use futures_sink::Sink; -use futures_util::{SinkExt, StreamExt}; -use serde::{Deserialize, Serialize, de::DeserializeOwned}; -use serde_json::Value; +use std::{fmt::Debug, sync::Arc}; use apalis_core::{ - backend::memory::{MemorySink, MemoryStorage}, - task::{ - Task, - status::Status, - task_id::{RandomId, TaskId}, + backend::{ + memory::{MemorySink, MemoryStorage, MemoryStorageError}, + shared::MakeShared, }, + task::{Task, builder::TaskBuilder, task_id::RandomId}, }; -use crate::{ - JsonMapMetadata, JsonStorage, - util::{FindFirstWith, TaskKey, TaskWithMeta}, -}; - -#[derive(Debug)] -struct SharedJsonStream { - inner: JsonStorage, - req_type: std::marker::PhantomData<(T, Ctx)>, -} - -impl Stream for SharedJsonStream { - type Item = Task; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - use apalis_core::task::builder::TaskBuilder; - let map = self.inner.tasks.try_read().expect("Failed to read tasks"); - if let Some((key, _)) = map.find_first_with(|k, _| { - k.queue == std::any::type_name::() && k.status == Status::Pending - }) { - let task = map.get(key).unwrap(); - let Ok(args) = Args::deserialize(&task.args) else { - return Poll::Pending; - }; - let task = TaskBuilder::new(args) - .with_task_id(key.task_id.clone()) - .with_ctx(task.ctx.clone()) - .build(); - let key = key.clone(); - drop(map); - let this = &mut self.get_mut().inner; - this.update_status(&key, Status::Running) - .expect("Failed to update status"); - this.persist_to_disk().expect("Failed to persist to disk"); - Poll::Ready(Some(task)) - } else { - Poll::Pending - } - } -} +use crate::{JsonMapMetadata, JsonStorage}; /// Sharable JSON based backend. /// /// # Features @@ -112,7 +67,7 @@ impl Stream for SharedJsonStream, + inner: JsonStorage, } impl Default for SharedJsonStore { @@ -131,87 +86,97 @@ impl SharedJsonStore { } } -impl Deserialize<'de> + Unpin + 'static> - apalis_core::backend::shared::MakeShared for SharedJsonStore +impl Deserialize<'de> + Unpin + 'static> MakeShared + for SharedJsonStore { type Backend = MemoryStorage; - type Config = (); + type Config = String; - type MakeError = String; + type MakeError = MemoryStorageError; + + fn make_shared(&mut self) -> Result + where + Self::Config: Default, + { + self.make_shared_with_config(std::any::type_name::().to_owned()) + } fn make_shared_with_config( &mut self, - _: Self::Config, + queue: Self::Config, ) -> Result { - let (sender, receiver) = self.inner.create_channel::(); + let (sender, receiver) = self.create_channel::(&queue); let sender = MemorySink::new(Arc::new(futures_util::lock::Mutex::new(sender))); Ok(MemoryStorage::new_with(sender, receiver)) } } type BoxSink = Box< - dyn Sink, Error = SendError> + dyn Sink, Error = MemoryStorageError> + Send + Sync + Unpin + 'static, >; -impl JsonStorage { +impl SharedJsonStore { fn create_channel Deserialize<'de> + Serialize + Send + Unpin>( &self, + queue: &str, ) -> ( BoxSink, BoxStream<'static, Task>, ) { // Create a channel for communication - let sender = self.clone(); + let sender = self.inner.clone(); + + let queue_config = queue.to_owned(); // Create a wrapped sender that will insert into the in-memory store let wrapped_sender = { - let store = self.clone(); - - sender.with_flat_map(move |task: Task| { - use apalis_core::task::task_id::RandomId; - let task_id = task - .parts - .task_id - .clone() - .unwrap_or(TaskId::new(RandomId::default())); - let task = task.map(|args| serde_json::to_value(args).unwrap()); - store - .insert( - &TaskKey { - task_id, - queue: std::any::type_name::().to_owned(), - status: Status::Pending, - }, - TaskWithMeta { - args: task.args.clone(), - ctx: task.parts.ctx.clone(), - result: None, - idempotency_key: task.parts.idempotency_key.clone(), - }, - ) - .unwrap(); - futures_util::stream::iter(vec![Ok(task)]) - }) + let sender = sender.clone(); + + sender + .sink_map_err(|e| MemoryStorageError::Other(e.into())) + .with_flat_map(move |mut task: Task| { + task.parts + .ctx + .0 + .insert("queue", queue_config.clone()) + .unwrap(); + + let res = task.try_map(|s| { + serde_json::to_value(s).map_err(|e| MemoryStorageError::Other(e.into())) + }); + + futures_util::stream::iter(vec![res]) + }) }; // Create a stream that filters by type T let filtered_stream = { - let inner = self.clone(); - SharedJsonStream { - inner, - req_type: std::marker::PhantomData, - } + let queue_config = queue.to_owned(); + sender.map(|s| s.unwrap()).filter_map(move |(_, job)| { + let queue_config = queue_config.clone(); + async move { + let queue = job.ctx.0.get("queue").cloned().unwrap_or_default(); + if queue == queue_config.clone() { + let args = Args::deserialize(&job.args).ok()?; + let mut task = TaskBuilder::new(args).with_ctx(job.ctx).build(); + task.parts.task_id = job.task_id; + Some(task) + } else { + None + } + } + }) }; // Combine the sender and receiver let sender = Box::new(wrapped_sender) as Box< - dyn Sink, Error = SendError> + dyn Sink, Error = MemoryStorageError> + Send + Sync + Unpin, @@ -241,7 +206,7 @@ mod tests { async fn basic_shared() { let mut store = SharedJsonStore::new(); let mut string_store = store.make_shared().unwrap(); - let mut int_store = store.make_shared().unwrap(); + let mut int_store = store.make_shared_with_config("int".into()).unwrap(); for i in 0..ITEMS { string_store.push(format!("ITEM: {i}")).await.unwrap(); int_store.push(i).await.unwrap(); diff --git a/utils/apalis-file-storage/src/sink.rs b/utils/apalis-file-storage/src/sink.rs index 69164942..a1c2c8ab 100644 --- a/utils/apalis-file-storage/src/sink.rs +++ b/utils/apalis-file-storage/src/sink.rs @@ -3,26 +3,19 @@ use std::{ task::{Context, Poll}, }; -use futures_channel::mpsc::SendError; +use apalis_core::task::{Task, task_id::RandomId}; use futures_sink::Sink; -use serde::{Serialize, de::Deserialize}; use serde_json::Value; -use apalis_core::task::{ - Task, - task_id::{RandomId, TaskId}, -}; - use crate::{ - JsonMapMetadata, JsonStorage, - util::{TaskKey, TaskWithMeta}, + Adapter, FileStorage, JsonMapMetadata, PendingChange, SyncPolicy, error::FileStorageError, + util::RawTask, }; -impl Sink> for JsonStorage -where - Args: Unpin + Serialize + for<'de> Deserialize<'de>, +impl Sink> + for FileStorage { - type Error = SendError; + type Error = FileStorageError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) @@ -32,86 +25,58 @@ where self: Pin<&mut Self>, item: Task, ) -> Result<(), Self::Error> { - let this = Pin::get_mut(self); - - let task_id = item - .parts - .task_id - .clone() - .unwrap_or(TaskId::new(RandomId::default())); - - let queue = std::any::type_name::().to_owned(); - let idempotency_key = item.parts.idempotency_key.clone(); - - // Prevent duplicates already stored in tasks - if let Some(ref key) = idempotency_key { - let tasks = this.tasks.read().unwrap(); - - let exists = tasks.values().any(|task| { - task.idempotency_key - .as_ref() - .map(|existing| existing == key) - .unwrap_or(false) - }); - - if exists { - return Ok(()); - } - } - - let task_key = TaskKey { - task_id, - queue, - status: apalis_core::task::status::Status::Pending, + let idempotency_key = item.parts.idempotency_key; + let entry = RawTask { + task_id: item.parts.task_id, + args: item.args, + ctx: item.parts.ctx, + result: None, + idempotency_key, }; - - this.tasks.write().unwrap().insert( - task_key, - TaskWithMeta { - args: item.args, - ctx: item.parts.ctx, - result: None, - idempotency_key, - }, - ); - + let this = self.get_mut(); + let tasks = this + .entries + .try_read() + .map_err(|_| FileStorageError::WouldBlockLock)?; + // Enforce idempotency + if entry.idempotency_key.is_some() + && tasks + .iter() + .any(|t| t.idempotency_key == entry.idempotency_key) + { + return Ok(()); + } + drop(tasks); + let line = A::from_entry(&entry).map_err(FileStorageError::AdapterError)?; + let bytes = this + .adapter + .serialize(&line) + .map_err(FileStorageError::AdapterError)?; + let mut entries = this + .entries + .try_write() + .map_err(|_| FileStorageError::WouldBlockLock)?; + entries.push(entry); // optimistic in-memory update + drop(entries); + this.pending + .try_write() + .map_err(|_| FileStorageError::WouldBlockLock)? + .push(PendingChange::Append(bytes)); Ok(()) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = Pin::get_mut(self); - - let tasks: Vec<_> = this.buffer.drain(..).collect(); - - for task in tasks { - let task_id = task - .parts - .task_id - .clone() - .unwrap_or(TaskId::new(RandomId::default())); - - let key = TaskKey { - task_id, - queue: std::any::type_name::().to_owned(), - status: apalis_core::task::status::Status::Pending, - }; - - this.insert( - &key, - TaskWithMeta { - args: task.args, - ctx: task.parts.ctx, - result: None, - idempotency_key: task.parts.idempotency_key.clone(), - }, - ) - .unwrap(); + fn poll_flush( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + match self.sync_policy.clone() { + SyncPolicy::Instant | SyncPolicy::Periodic(_) => self.flush()?, + SyncPolicy::Manual => {} } - Poll::Ready(Ok(())) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Sink::>::poll_flush(self, cx) + self.poll_flush(cx) } } diff --git a/utils/apalis-file-storage/src/util.rs b/utils/apalis-file-storage/src/util.rs index b8588054..65d9b6a6 100644 --- a/utils/apalis-file-storage/src/util.rs +++ b/utils/apalis-file-storage/src/util.rs @@ -1,240 +1,250 @@ -use std::{cmp::Ordering, collections::BTreeMap, fmt::Debug}; - -use futures_util::FutureExt; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::{collections::BTreeMap, fmt::Debug}; -use apalis_core::{ - error::BoxDynError, - task::{ - status::Status, - task_id::{RandomId, TaskId}, - }, - worker::ext::ack::Acknowledge, -}; - -use crate::{JsonMapMetadata, JsonStorage}; - -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] -pub struct TaskKey { - pub(super) task_id: TaskId, - pub(super) queue: String, - pub(super) status: Status, -} +use apalis_core::task::task_id::{RandomId, TaskId}; -impl PartialEq for TaskKey { - fn eq(&self, other: &Self) -> bool { - self.task_id == other.task_id && self.queue == other.queue - } +use crate::JsonMapMetadata; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RawTask { + pub(super) task_id: Option>, + pub(super) args: serde_json::Value, + pub(super) ctx: JsonMapMetadata, + pub(super) result: Option, + pub(super) idempotency_key: Option, } -impl Eq for TaskKey {} +/// Flattens a `serde_json::Value` into a `BTreeMap`. +/// +/// Arrays and objects are flattened using dot notation. +/// +/// `prefix` is prepended to every generated key. +/// +/// Example: +/// +/// prefix = Some("root") +/// +/// { +/// "user": { +/// "name": "John" +/// } +/// } +/// +/// becomes: +/// +/// root.user.name => "John" +/// +pub(crate) fn from_value(prefix: Option<&str>, value: &Value) -> BTreeMap { + fn recurse(prefix: Option, value: &Value, output: &mut BTreeMap) { + match value { + Value::Object(map) => { + for (key, value) in map { + let path = match &prefix { + Some(prefix) => format!("{prefix}.{key}"), + None => key.clone(), + }; -impl PartialOrd for TaskKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} + recurse(Some(path), value, output); + } + } + + Value::Array(values) => { + for (index, value) in values.iter().enumerate() { + let path = match &prefix { + Some(prefix) => format!("{prefix}.{index}"), + None => index.to_string(), + }; + + recurse(Some(path), value, output); + } + } -impl Ord for TaskKey { - fn cmp(&self, other: &Self) -> Ordering { - match self.task_id.cmp(&other.task_id) { - Ordering::Equal => self.queue.cmp(&other.queue), - ord => ord, + Value::Null => { + if let Some(prefix) = prefix { + output.insert(prefix, "null".to_owned()); + } + } + + Value::Bool(v) => { + if let Some(prefix) = prefix { + output.insert(prefix, v.to_string()); + } + } + + Value::Number(v) => { + if let Some(prefix) = prefix { + output.insert(prefix, v.to_string()); + } + } + + Value::String(v) => { + if let Some(prefix) = prefix { + output.insert(prefix, v.clone()); + } + } } } -} -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TaskWithMeta { - pub(super) args: Value, - pub(super) ctx: JsonMapMetadata, - pub(super) result: Option, - pub(super) idempotency_key: Option, -} + let mut output = BTreeMap::new(); + + recurse(prefix.map(|s| s.to_owned()), value, &mut output); -#[derive(Debug)] -pub struct JsonAck { - pub(crate) inner: JsonStorage, + output } -impl Clone for JsonAck { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), +/// Reconstructs a nested `serde_json::Value` from a flattened +/// `BTreeMap`. +/// +/// If `prefix` is provided, only matching keys are used and the prefix +/// is stripped before reconstruction. +/// +/// Example: +/// +/// root.user.name => "John" +/// +/// with prefix Some("root") +/// +/// becomes: +/// +/// { +/// "user": { +/// "name": "John" +/// } +/// } +/// +pub(crate) fn to_value(prefix: Option<&str>, map: &BTreeMap) -> Value { + fn parse_scalar(value: &str) -> Value { + if value == "null" { + Value::Null + } else if value == "true" { + Value::Bool(true) + } else if value == "false" { + Value::Bool(false) + } else if let Ok(n) = value.parse::() { + Value::Number(n.into()) + } else if let Ok(n) = value.parse::() { + serde_json::Number::from_f64(n) + .map(Value::Number) + .unwrap_or_else(|| Value::String(value.to_owned())) + } else { + Value::String(value.to_owned()) } } -} -impl Acknowledge - for JsonAck -{ - type Error = serde_json::Error; - - type Future = futures_core::future::BoxFuture<'static, Result<(), Self::Error>>; - - fn ack( - &mut self, - res: &Result, - ctx: &apalis_core::task::Parts, - ) -> Self::Future { - let store = self.inner.clone(); - let val = serde_json::to_value(res.as_ref().map_err(|e| e.to_string())).unwrap(); - let task_id = ctx.task_id.clone().unwrap(); - async move { - let key = TaskKey { - task_id: task_id.clone(), - queue: std::any::type_name::().to_owned(), - status: Status::Running, - }; - - let _ = store.update_result(&key, Status::Done, val).unwrap(); - - store.persist_to_disk().unwrap(); - - Ok(()) + fn insert_path(root: &mut Value, parts: &[&str], value: Value) { + if parts.is_empty() { + *root = value; + return; } - .boxed() - } -} -impl - apalis_core::backend::WaitForCompletion for JsonStorage -where - Args: Send + serde::de::DeserializeOwned + 'static + Unpin + Serialize, -{ - type ResultStream = futures_core::stream::BoxStream< - 'static, - Result, futures_channel::mpsc::SendError>, - >; - fn wait_for( - &self, - task_ids: impl IntoIterator>, - ) -> Self::ResultStream { - use futures_util::StreamExt; - use std::{collections::HashSet, time::Duration}; - - let task_ids: HashSet<_> = task_ids.into_iter().collect(); - struct PollState { - vault: JsonStorage, - pending_tasks: HashSet>, - queue: String, - poll_interval: Duration, - _phantom: std::marker::PhantomData, + let current = parts[0]; + let is_index = current.parse::().is_ok(); + + if parts.len() == 1 { + match (is_index, root) { + (true, Value::Array(arr)) => { + let idx = current.parse::().unwrap(); + + if arr.len() <= idx { + arr.resize(idx + 1, Value::Null); + } + + arr[idx] = value; + } + + (false, Value::Object(map)) => { + map.insert(current.to_owned(), value); + } + + (true, slot) => { + let idx = current.parse::().unwrap(); + + let mut arr = Vec::new(); + arr.resize(idx + 1, Value::Null); + arr[idx] = value; + + *slot = Value::Array(arr); + } + + (false, slot) => { + let mut map = serde_json::Map::new(); + map.insert(current.to_owned(), value); + *slot = Value::Object(map); + } + } + + return; } - let state = PollState { - vault: self.clone(), - pending_tasks: task_ids, - queue: std::any::type_name::().to_owned(), - poll_interval: Duration::from_millis(100), - _phantom: std::marker::PhantomData, - }; - futures_util::stream::unfold(state, |mut state: PollState| { - async move { - // panic!( "{}", state.pending_tasks.len()); - // If no pending tasks, we're done - if state.pending_tasks.is_empty() { - return None; + + match (is_index, root) { + (true, Value::Array(arr)) => { + let idx = current.parse::().unwrap(); + + if arr.len() <= idx { + arr.resize(idx + 1, Value::Null); } - loop { - // Check for completed tasks - let vault = &state.vault; - let completed_task = state.pending_tasks.iter().find_map(|task_id| { - let key = TaskKey { - task_id: task_id.clone(), - queue: state.queue.clone(), - status: Status::Pending, - }; - - vault - .get(&key) - .and_then(|value| Some((task_id.clone(), value.result?))) - }); - - if let Some((task_id, result)) = completed_task { - state.pending_tasks.remove(&task_id); - let result: Result = serde_json::from_value(result).unwrap(); - return Some(( - Ok(apalis_core::backend::TaskResult { - task_id, - status: Status::Done, - result, - }), - state, - )); - } + insert_path(&mut arr[idx], &parts[1..], value); + } + + (false, Value::Object(map)) => { + let next = map.entry(current.to_owned()).or_insert(Value::Null); + + insert_path(next, &parts[1..], value); + } - // No completed tasks, wait and try again - apalis_core::timer::sleep(state.poll_interval).await; + (true, slot) => { + let idx = current.parse::().unwrap(); + + let mut arr = Vec::new(); + arr.resize(idx + 1, Value::Null); + + *slot = Value::Array(arr); + + if let Value::Array(arr) = slot { + insert_path(&mut arr[idx], &parts[1..], value); } } - }) - .boxed() - } - async fn check_status( - &self, - task_ids: impl IntoIterator> + Send, - ) -> Result>, Self::Error> { - use apalis_core::task::status::Status; - use std::collections::HashSet; - let task_ids: HashSet<_> = task_ids.into_iter().collect(); - let mut results = Vec::new(); - for task_id in task_ids { - let key = TaskKey { - task_id: task_id.clone(), - queue: std::any::type_name::().to_owned(), - status: Status::Pending, - }; - if let Some(value) = self.get(&key) { - if value.result.is_none() { - results.push(apalis_core::backend::TaskResult { - task_id: task_id.clone(), - status: Status::Pending, - result: Err("Task not completed yet".to_owned()), - }); - continue; + (false, slot) => { + *slot = Value::Object(serde_json::Map::new()); + + if let Value::Object(map) = slot { + let next = map.entry(current.to_owned()).or_insert(Value::Null); + + insert_path(next, &parts[1..], value); } - let result = - match serde_json::from_value::>(value.result.unwrap()) { - Ok(result) => apalis_core::backend::TaskResult { - task_id: task_id.clone(), - status: Status::Done, - result, - }, - Err(e) => apalis_core::backend::TaskResult { - task_id: task_id.clone(), - status: Status::Failed, - result: Err(format!("Deserialization error: {e}")), - }, - }; - results.push(result); } } - Ok(results) } -} -/// Find the first item that meets the requirements -pub(super) trait FindFirstWith { - fn find_first_with(&self, predicate: F) -> Option<(&K, &V)> - where - F: FnMut(&K, &V) -> bool; -} + let mut root = Value::Object(serde_json::Map::new()); -impl FindFirstWith for BTreeMap -where - K: Ord + Clone, -{ - fn find_first_with(&self, mut predicate: F) -> Option<(&K, &V)> - where - F: FnMut(&K, &V) -> bool, - { - if let Some(key) = self.iter().find(|(k, v)| predicate(k, v)).map(|(k, _)| k) { - self.get_key_value(key) - } else { - None + for (key, value) in map { + let stripped = match prefix { + Some(prefix) => { + if key == prefix { + "" + } else if let Some(rest) = key.strip_prefix(&format!("{prefix}.")) { + rest + } else { + continue; + } + } + + None => key.as_str(), + }; + + if stripped.is_empty() { + root = parse_scalar(value); + continue; } + + let parts: Vec<&str> = stripped.split('.').collect(); + + insert_path(&mut root, &parts, parse_scalar(value)); } + + root }