Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 140 additions & 139 deletions CHANGELOG.md

Large diffs are not rendered by default.

231 changes: 125 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

32 changes: 25 additions & 7 deletions apalis-core/src/backend/impls/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
//! - [`WorkerContext`]
use crate::backend::BackendExt;
use crate::backend::codec::IdentityCodec;
use crate::error::BoxDynError;
use crate::features_table;
use crate::task::metadata::{MetadataExt, MetadataStore};
use crate::{
Expand Down Expand Up @@ -136,14 +137,25 @@ impl MetadataExt for MemoryContext {
}
}

/// Error type for MemoryStorage operations
#[derive(Debug, thiserror::Error)]
pub enum MemoryStorageError {
/// Error occurred while sending a task to the in-memory channel
#[error("Failed to send task: {0}")]
SendError(#[from] SendError),
/// Error occurred while flushing the in-memory channel
#[error("Failed to add task to storage: {0}")]
Other(BoxDynError),
}

impl<Args: Send + 'static> MemoryStorage<Args, MemoryContext> {
/// Create a new in-memory storage
#[must_use]
pub fn new() -> Self {
let (sender, receiver) = unbounded();
let sender = Box::new(sender)
let sender = Box::new(sender.sink_map_err(|e| e.into()))
as Box<
dyn Sink<Task<Args, MemoryContext, RandomId>, Error = SendError>
dyn Sink<Task<Args, MemoryContext, RandomId>, Error = MemoryStorageError>
+ Send
+ Sync
+ Unpin,
Expand All @@ -167,7 +179,7 @@ impl<Args: Send + 'static, Ctx> MemoryStorage<Args, Ctx> {
}

impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {
type Error = SendError;
type Error = MemoryStorageError;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.as_mut().sender.poll_ready_unpin(cx)
Expand All @@ -191,7 +203,13 @@ impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {

type ArcMemorySink<Args, Ctx = MemoryContext> = Arc<
Mutex<
Box<dyn Sink<Task<Args, Ctx, RandomId>, Error = SendError> + Send + Sync + Unpin + 'static>,
Box<
dyn Sink<Task<Args, Ctx, RandomId>, Error = MemoryStorageError>
+ Send
+ Sync
+ Unpin
+ 'static,
>,
>,
>;

Expand Down Expand Up @@ -232,7 +250,7 @@ impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
}

impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemorySink<Args, Ctx> {
type Error = SendError;
type Error = MemoryStorageError;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut lock = ready!(self.inner.lock().poll_unpin(cx));
Expand Down Expand Up @@ -301,8 +319,8 @@ impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemorySto

type Context = Ctx;

type Error = SendError;
type Stream = TaskStream<Task<Args, Ctx, RandomId>, SendError>;
type Error = MemoryStorageError;
type Stream = TaskStream<Task<Args, Ctx, RandomId>, MemoryStorageError>;
type Layer = Identity;
type Beat = BoxStream<'static, Result<(), Self::Error>>;

Expand Down
8 changes: 8 additions & 0 deletions apalis-core/src/task/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::task::Task;
use crate::task_fn::FromRequest;
use std::collections::HashMap;
use std::convert::Infallible;
#[cfg(feature = "tracing")]
use std::fmt;
use std::ops::Deref;
#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -452,9 +453,16 @@ impl MetadataStore {
}

/// Get a typed metadata entry.
#[must_use = "Extracted metadata should be used or handled"]
pub fn extract_as<M: Metadata>(&self) -> Result<M, M::Error> {
M::extract(self)
}

/// Create a `MetadataStore` from a `HashMap<String, String>`.
#[must_use]
pub fn from_map(map: HashMap<String, String>) -> Self {
Self(map)
}
}

/// Implemented by types that can be stored as metadata.
Expand Down
2 changes: 1 addition & 1 deletion apalis-core/src/task/task_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod random_id {
let count = COUNTER.fetch_add(1, Ordering::Relaxed);
let rand_part = encode_base64(xorshift64(timestamp ^ count), RANDOM_LEN);

format!("{time_str}{rand_part}")
format!("{time_str}{rand_part}{count}")
}

/// Returns current time in milliseconds since UNIX epoch.
Expand Down
6 changes: 2 additions & 4 deletions apalis-core/src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,10 @@ where
B: Backend<Args = Args, Context = Ctx>,
{
/// Consumes the builder and a service to construct the final worker
pub fn build<W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>, Svc>(
self,
service: W,
) -> Worker<Args, Ctx, W::Backend, Svc, M>
pub fn build<W, Svc>(self, service: W) -> Worker<Args, Ctx, W::Backend, Svc, M>
where
Svc: Service<Task<Args, Ctx, B::IdType>>,
W: IntoWorkerServiceExt<Args, Ctx, Svc, B, M>,
{
service.build_with(self)
}
Expand Down
5 changes: 3 additions & 2 deletions examples/dag-workflow/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use apalis::prelude::*;
use apalis_file_storage::JsonStorage;
use apalis_file_storage::CsvStorage;
use apalis_workflow::{DagFlow, WorkflowSink};
use tracing::info;

Expand Down Expand Up @@ -30,7 +30,8 @@ async fn main() -> Result<(), BoxDynError> {
std::env::set_var("RUST_LOG", "debug");
};
tracing_subscriber::fmt::init();
let mut backend = JsonStorage::new_temp().unwrap();
let mut backend = CsvStorage::new_temp().unwrap();

backend.push_start(vec![42, 43, 44]).await.unwrap();

let dag_flow = DagFlow::new("user-info-workflow");
Expand Down
Loading
Loading