Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions sdks/java/src/main/java/org/byteveda/taskito/Taskito.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -33,7 +35,9 @@
import org.byteveda.taskito.resources.ResourceScope;
import org.byteveda.taskito.resources.ResourceStat;
import org.byteveda.taskito.scheduling.PeriodicTask;
import org.byteveda.taskito.serialization.CodecSerializer;
import org.byteveda.taskito.serialization.JsonSerializer;
import org.byteveda.taskito.serialization.PayloadCodec;
import org.byteveda.taskito.serialization.Serializer;
import org.byteveda.taskito.spi.QueueBackend;
import org.byteveda.taskito.task.EnqueueOptions;
Expand Down Expand Up @@ -252,6 +256,7 @@ final class Builder {

private final Map<String, Object> options = new LinkedHashMap<>();
private Serializer serializer = new JsonSerializer();
private final List<PayloadCodec> codecs = new ArrayList<>();

public Builder backend(String backend) {
// Normalize at the boundary so callers may pass "SQLite"/"REDIS"; the
Expand Down Expand Up @@ -311,9 +316,24 @@ public Builder serializer(Serializer serializer) {
return this;
}

/**
 * Registers payload codecs (compress/encrypt/sign) to wrap the serializer.
 * Codecs run in registration order on the way out and in the opposite order
 * on the way in, so producers and workers must be configured with the same
 * chain. Each call appends to the codecs registered so far.
 *
 * @param codecs the codecs to append, in encode order
 * @return {@code this}
 */
public Builder codec(PayloadCodec... codecs) {
    for (PayloadCodec next : codecs) {
        this.codecs.add(next);
    }
    return this;
}

/**
 * Resolves the serializer to hand to the client: the configured serializer as-is
 * when no codecs are registered, otherwise that serializer wrapped in a
 * {@link CodecSerializer} carrying the registered chain.
 */
private Serializer effectiveSerializer() {
    if (codecs.isEmpty()) {
        return serializer;
    }
    return new CodecSerializer(serializer, codecs);
}

/**
 * Open over an explicit backend, e.g. an in-memory fake in tests.
 *
 * @param backend the queue backend to run against
 * @return a {@link Taskito} over {@code backend} using the configured serializer
 *     wrapped in the configured codec chain (if any)
 */
public Taskito open(QueueBackend backend) {
    // Always go through effectiveSerializer(): handing over the raw serializer
    // would silently skip every codec registered via codec(...).
    return new DefaultTaskito(backend, effectiveSerializer());
}

/** Open the native backend described by the configured options. */
Expand All @@ -325,7 +345,7 @@ public Taskito open() {
} else if (!options.containsKey("dsn")) {
throw new ConfigurationException("url (dsn) is required");
}
return new DefaultTaskito(JniQueueBackend.open(encodeOptions()), serializer);
return new DefaultTaskito(JniQueueBackend.open(encodeOptions()), effectiveSerializer());
}

/** Create the SQLite file's parent directory (skip in-memory databases). */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.byteveda.taskito.serialization;

import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.byteveda.taskito.errors.CryptoException;

/**
 * A {@link PayloadCodec} that encrypts payloads with AES-GCM. Each payload is
 * prefixed with a fresh 12-byte IV; the 128-bit GCM tag authenticates it. The
 * key must be 16, 24, or 32 bytes (AES-128/192/256) and is checked when the
 * codec is constructed.
 *
 * <p>Encoded layout: {@code IV (12 bytes) || ciphertext || tag (16 bytes)}.
 */
public final class AesGcmCodec implements PayloadCodec {
    private static final String TRANSFORMATION = "AES/GCM/NoPadding";
    private static final int IV_LENGTH = 12;
    private static final int TAG_BITS = 128;
    /** Smallest well-formed payload: the IV plus the tag of an empty plaintext. */
    private static final int MIN_ENCODED_LENGTH = IV_LENGTH + TAG_BITS / 8;

    private final SecretKeySpec key;
    private final SecureRandom random = new SecureRandom();

    /**
     * Creates a codec over the given raw AES key.
     *
     * @param key raw key material; must be 16, 24, or 32 bytes long
     * @throws IllegalArgumentException if {@code key} is null or has any other length
     */
    public AesGcmCodec(byte[] key) {
        if (key == null) {
            throw new IllegalArgumentException("key must not be null");
        }
        if (key.length != 16 && key.length != 24 && key.length != 32) {
            // Reject a bad key at configuration time instead of letting every
            // later encode() fail with an opaque "encryption failed".
            throw new IllegalArgumentException(
                    "AES key must be 16, 24, or 32 bytes, got " + key.length);
        }
        this.key = new SecretKeySpec(key, "AES");
    }

    /**
     * Encrypts {@code data} under a freshly generated random IV.
     *
     * @param data the plaintext bytes
     * @return the IV followed by the ciphertext and GCM tag
     * @throws CryptoException if the cipher cannot be created or fails
     */
    @Override
    public byte[] encode(byte[] data) {
        try {
            // A GCM IV must never repeat under the same key, so draw a new one per payload.
            byte[] iv = new byte[IV_LENGTH];
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(data);
            byte[] out = new byte[IV_LENGTH + ciphertext.length];
            System.arraycopy(iv, 0, out, 0, IV_LENGTH);
            System.arraycopy(ciphertext, 0, out, IV_LENGTH, ciphertext.length);
            return out;
        } catch (Exception e) {
            throw new CryptoException("encryption failed", e);
        }
    }

    /**
     * Splits off the leading IV, then decrypts and authenticates the remainder.
     *
     * @param data bytes previously produced by {@link #encode}
     * @return the decrypted plaintext
     * @throws CryptoException if {@code data} is too short to hold an IV and a
     *     tag, or if decryption or tag verification fails
     */
    @Override
    public byte[] decode(byte[] data) {
        // Anything shorter than IV + tag cannot be a valid payload; say so
        // directly rather than surfacing a generic cipher failure.
        if (data.length < MIN_ENCODED_LENGTH) {
            throw new CryptoException("encrypted payload is too short");
        }
        try {
            byte[] iv = Arrays.copyOfRange(data, 0, IV_LENGTH);
            byte[] ciphertext = Arrays.copyOfRange(data, IV_LENGTH, data.length);
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            return cipher.doFinal(ciphertext);
        } catch (Exception e) {
            throw new CryptoException("decryption failed", e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.byteveda.taskito.serialization;

import java.lang.reflect.Type;
import java.util.List;

/**
 * A {@link Serializer} that layers a chain of {@link PayloadCodec}s over a
 * delegate serializer. On the way out the delegate serializes first and every
 * codec's {@link PayloadCodec#encode} is then applied in list order; on the way
 * in each {@link PayloadCodec#decode} is applied in the opposite order before
 * the delegate deserializes. The codec layer therefore stays independent of the
 * serializer (any chain works over JSON or MessagePack) while reusing the single
 * serializer channel the worker already receives — so producer and worker apply
 * the same transforms.
 */
public final class CodecSerializer implements Serializer {
    private final Serializer delegate;
    private final List<PayloadCodec> codecs;

    /**
     * @param delegate the serializer whose output is wrapped
     * @param codecs the codec chain in encode order; an immutable copy is kept
     */
    public CodecSerializer(Serializer delegate, List<PayloadCodec> codecs) {
        this.codecs = List.copyOf(codecs);
        this.delegate = delegate;
    }

    @Override
    public byte[] serialize(Object value) {
        byte[] encoded = delegate.serialize(value);
        for (int index = 0; index < codecs.size(); index++) {
            encoded = codecs.get(index).encode(encoded);
        }
        return encoded;
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) {
        byte[] plain = unwrap(bytes);
        return delegate.deserialize(plain, type);
    }

    @Override
    public Object deserialize(byte[] bytes, Type type) {
        byte[] plain = unwrap(bytes);
        return delegate.deserialize(plain, type);
    }

    /** Undoes the encode chain by walking the codecs from last to first. */
    private byte[] unwrap(byte[] bytes) {
        byte[] current = bytes;
        for (int remaining = codecs.size(); remaining > 0; remaining--) {
            current = codecs.get(remaining - 1).decode(current);
        }
        return current;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.byteveda.taskito.serialization;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.byteveda.taskito.errors.SerializationException;

/**
* A {@link PayloadCodec} that gzip-compresses payloads. Decompression is bounded
* by {@code maxDecompressedBytes} so a small malicious payload can't expand to
* exhaust worker memory (zip bomb). Order {@code GzipCodec} <em>after</em> a
* signing/encryption codec in the chain to verify integrity before decompressing.
*/
public final class GzipCodec implements PayloadCodec {
/** Default cap on decompressed output: 64 MiB. */
public static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 64 * 1024 * 1024;

private final int maxDecompressedBytes;

public GzipCodec() {
this(DEFAULT_MAX_DECOMPRESSED_BYTES);
}

public GzipCodec(int maxDecompressedBytes) {
if (maxDecompressedBytes <= 0) {
throw new IllegalArgumentException("maxDecompressedBytes must be > 0");
}
this.maxDecompressedBytes = maxDecompressedBytes;
}

@Override
public byte[] encode(byte[] data) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
gzip.write(data);
} catch (IOException e) {
throw new SerializationException("gzip compression failed", e);
}
return out.toByteArray();
}

@Override
public byte[] decode(byte[] data) {
try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buffer = new byte[8192];
long total = 0;
int read;
while ((read = gzip.read(buffer)) != -1) {
total += read;
if (total > maxDecompressedBytes) {
throw new SerializationException(
"gzip payload exceeds max decompressed size of " + maxDecompressedBytes + " bytes");
}
out.write(buffer, 0, read);
}
return out.toByteArray();
} catch (IOException e) {
throw new SerializationException("gzip decompression failed", e);
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.byteveda.taskito.serialization;

import java.security.MessageDigest;
import java.util.Arrays;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.byteveda.taskito.errors.CryptoException;

/**
 * A {@link PayloadCodec} that prefixes each payload with an HMAC-SHA256 tag and,
 * on decode, verifies it in constant time — rejecting tampered bytes.
 *
 * <p>Encoded layout: {@code tag (32 bytes) || payload}.
 */
public final class HmacCodec implements PayloadCodec {
    private static final String ALGORITHM = "HmacSHA256";
    private static final int MAC_LENGTH = 32;

    /** Built once at construction; {@link SecretKeySpec} keeps its own copy of the bytes. */
    private final SecretKeySpec key;

    /**
     * Creates a codec that signs and verifies with the given secret.
     *
     * @param key the HMAC secret; must be non-empty
     * @throws NullPointerException if {@code key} is null
     * @throws IllegalArgumentException if {@code key} is empty
     */
    public HmacCodec(byte[] key) {
        if (key.length == 0) {
            // An empty key can never be used to initialise the Mac; fail here
            // rather than on every later encode/decode.
            throw new IllegalArgumentException("HMAC key must not be empty");
        }
        this.key = new SecretKeySpec(key, ALGORITHM);
    }

    /**
     * Prepends the HMAC-SHA256 tag of {@code data} to {@code data}.
     *
     * @param data the bytes to sign
     * @return the 32-byte tag followed by {@code data}
     * @throws CryptoException if the tag cannot be computed
     */
    @Override
    public byte[] encode(byte[] data) {
        byte[] mac = mac(data);
        byte[] out = new byte[MAC_LENGTH + data.length];
        System.arraycopy(mac, 0, out, 0, MAC_LENGTH);
        System.arraycopy(data, 0, out, MAC_LENGTH, data.length);
        return out;
    }

    /**
     * Verifies the leading tag and strips it.
     *
     * @param data bytes previously produced by {@link #encode}
     * @return the payload without its tag
     * @throws CryptoException if {@code data} is shorter than a tag, the tag does
     *     not match the payload, or the tag cannot be computed
     */
    @Override
    public byte[] decode(byte[] data) {
        if (data.length < MAC_LENGTH) {
            throw new CryptoException("signed payload is too short");
        }
        byte[] mac = Arrays.copyOfRange(data, 0, MAC_LENGTH);
        byte[] body = Arrays.copyOfRange(data, MAC_LENGTH, data.length);
        // MessageDigest.isEqual compares in constant time, so a mismatch does not
        // reveal how many leading tag bytes were correct.
        if (!MessageDigest.isEqual(mac, mac(body))) {
            throw new CryptoException("signature mismatch");
        }
        return body;
    }

    /** Computes the HMAC-SHA256 tag of {@code body} with a fresh {@link Mac} instance. */
    private byte[] mac(byte[] body) {
        try {
            Mac instance = Mac.getInstance(ALGORITHM);
            instance.init(key);
            return instance.doFinal(body);
        } catch (Exception e) {
            throw new CryptoException("HMAC computation failed", e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.byteveda.taskito.serialization;

/**
* A two-sided, byte-to-byte transform applied <em>after</em> serialization on the
* producer and reversed <em>before</em> deserialization on the worker — for
* compression, encryption, or signing. One implementation owns both directions
* so the inverse cannot drift (cf. Temporal Payload Codec, Sidekiq middleware).
*
* <p>Codecs compose independently of the {@link Serializer}: a chain applies in
* order on {@link #encode} and in reverse on {@link #decode}, over JSON or
* MessagePack alike. Register them with {@code Taskito.builder().codec(...)}.
*/
public interface PayloadCodec {
/**
 * Transform serialized bytes on the way out (producer).
 *
 * @param data the bytes to transform
 * @return the transformed bytes
 */
byte[] encode(byte[] data);

/**
 * Reverse {@link #encode} on the way in (worker).
 *
 * @param data bytes previously produced by {@link #encode}
 * @return the bytes that were originally passed to {@link #encode}
 */
byte[] decode(byte[] data);
}
Loading