diff --git a/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java b/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java index 2c34190c..6e6f0cda 100644 --- a/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java +++ b/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java @@ -6,6 +6,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; @@ -33,7 +35,9 @@ import org.byteveda.taskito.resources.ResourceScope; import org.byteveda.taskito.resources.ResourceStat; import org.byteveda.taskito.scheduling.PeriodicTask; +import org.byteveda.taskito.serialization.CodecSerializer; import org.byteveda.taskito.serialization.JsonSerializer; +import org.byteveda.taskito.serialization.PayloadCodec; import org.byteveda.taskito.serialization.Serializer; import org.byteveda.taskito.spi.QueueBackend; import org.byteveda.taskito.task.EnqueueOptions; @@ -252,6 +256,7 @@ final class Builder { private final Map options = new LinkedHashMap<>(); private Serializer serializer = new JsonSerializer(); + private final List codecs = new ArrayList<>(); public Builder backend(String backend) { // Normalize at the boundary so callers may pass "SQLite"/"REDIS"; the @@ -311,9 +316,24 @@ public Builder serializer(Serializer serializer) { return this; } + /** + * Apply payload codecs (compress/encrypt/sign) around the serializer, in + * order on the way out and reversed on the way in. The same chain must be + * configured on producers and workers. Returns {@code this}. + */ + public Builder codec(PayloadCodec... codecs) { + this.codecs.addAll(Arrays.asList(codecs)); + return this; + } + + /** The serializer wrapped in the configured codec chain (if any). */ + private Serializer effectiveSerializer() { + return codecs.isEmpty() ? 
serializer : new CodecSerializer(serializer, codecs); + } + /** Open over an explicit backend, e.g. an in-memory fake in tests. */ public Taskito open(QueueBackend backend) { - return new DefaultTaskito(backend, serializer); + return new DefaultTaskito(backend, effectiveSerializer()); } /** Open the native backend described by the configured options. */ @@ -325,7 +345,7 @@ public Taskito open() { } else if (!options.containsKey("dsn")) { throw new ConfigurationException("url (dsn) is required"); } - return new DefaultTaskito(JniQueueBackend.open(encodeOptions()), serializer); + return new DefaultTaskito(JniQueueBackend.open(encodeOptions()), effectiveSerializer()); } /** Create the SQLite file's parent directory (skip in-memory databases). */ diff --git a/sdks/java/src/main/java/org/byteveda/taskito/serialization/AesGcmCodec.java b/sdks/java/src/main/java/org/byteveda/taskito/serialization/AesGcmCodec.java new file mode 100644 index 00000000..4a45e045 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/serialization/AesGcmCodec.java @@ -0,0 +1,59 @@ +package org.byteveda.taskito.serialization; + +import java.security.SecureRandom; +import java.util.Arrays; +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import org.byteveda.taskito.errors.CryptoException; + +/** + * A {@link PayloadCodec} that encrypts payloads with AES-GCM. Each payload is + * prefixed with a fresh 12-byte IV; the GCM tag authenticates it. The key must be + * 16, 24, or 32 bytes (AES-128/192/256). 
+ */ +public final class AesGcmCodec implements PayloadCodec { + private static final String TRANSFORMATION = "AES/GCM/NoPadding"; + private static final int IV_LENGTH = 12; + private static final int TAG_BITS = 128; + + private final SecretKeySpec key; + private final SecureRandom random = new SecureRandom(); + + public AesGcmCodec(byte[] key) { + this.key = new SecretKeySpec(key, "AES"); + } + + @Override + public byte[] encode(byte[] data) { + try { + byte[] iv = new byte[IV_LENGTH]; + random.nextBytes(iv); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv)); + byte[] ciphertext = cipher.doFinal(data); + byte[] out = new byte[IV_LENGTH + ciphertext.length]; + System.arraycopy(iv, 0, out, 0, IV_LENGTH); + System.arraycopy(ciphertext, 0, out, IV_LENGTH, ciphertext.length); + return out; + } catch (Exception e) { + throw new CryptoException("encryption failed", e); + } + } + + @Override + public byte[] decode(byte[] data) { + if (data.length < IV_LENGTH) { + throw new CryptoException("encrypted payload is too short"); + } + try { + byte[] iv = Arrays.copyOfRange(data, 0, IV_LENGTH); + byte[] ciphertext = Arrays.copyOfRange(data, IV_LENGTH, data.length); + Cipher cipher = Cipher.getInstance(TRANSFORMATION); + cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv)); + return cipher.doFinal(ciphertext); + } catch (Exception e) { + throw new CryptoException("decryption failed", e); + } + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/serialization/CodecSerializer.java b/sdks/java/src/main/java/org/byteveda/taskito/serialization/CodecSerializer.java new file mode 100644 index 00000000..c43afa41 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/serialization/CodecSerializer.java @@ -0,0 +1,49 @@ +package org.byteveda.taskito.serialization; + +import java.lang.reflect.Type; +import java.util.List; + +/** + * Applies a chain of {@link 
PayloadCodec}s around a delegate {@link Serializer}: + * each codec's {@link PayloadCodec#encode} runs in order after serialization, and + * {@link PayloadCodec#decode} runs in reverse before deserialization. This keeps + * the codec layer independent of the serializer (any chain works over JSON or + * MessagePack) while reusing the single serializer channel the worker already + * receives — so producer and worker apply the same transforms. + */ +public final class CodecSerializer implements Serializer { + private final Serializer delegate; + private final List codecs; + + public CodecSerializer(Serializer delegate, List codecs) { + this.delegate = delegate; + this.codecs = List.copyOf(codecs); + } + + @Override + public byte[] serialize(Object value) { + byte[] bytes = delegate.serialize(value); + for (PayloadCodec codec : codecs) { + bytes = codec.encode(bytes); + } + return bytes; + } + + @Override + public T deserialize(byte[] bytes, Class type) { + return delegate.deserialize(decode(bytes), type); + } + + @Override + public Object deserialize(byte[] bytes, Type type) { + return delegate.deserialize(decode(bytes), type); + } + + /** Reverse the encode chain: last codec first. 
*/ + private byte[] decode(byte[] bytes) { + for (int i = codecs.size() - 1; i >= 0; i--) { + bytes = codecs.get(i).decode(bytes); + } + return bytes; + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/serialization/GzipCodec.java b/sdks/java/src/main/java/org/byteveda/taskito/serialization/GzipCodec.java new file mode 100644 index 00000000..4ef718c2 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/serialization/GzipCodec.java @@ -0,0 +1,64 @@ +package org.byteveda.taskito.serialization; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; +import org.byteveda.taskito.errors.SerializationException; + +/** + * A {@link PayloadCodec} that gzip-compresses payloads. Decompression is bounded + * by {@code maxDecompressedBytes} so a small malicious payload can't expand to + * exhaust worker memory (zip bomb). Order {@code GzipCodec} after a + * signing/encryption codec in the chain to verify integrity before decompressing. + */ +public final class GzipCodec implements PayloadCodec { + /** Default cap on decompressed output: 64 MiB. 
*/ + public static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 64 * 1024 * 1024; + + private final int maxDecompressedBytes; + + public GzipCodec() { + this(DEFAULT_MAX_DECOMPRESSED_BYTES); + } + + public GzipCodec(int maxDecompressedBytes) { + if (maxDecompressedBytes <= 0) { + throw new IllegalArgumentException("maxDecompressedBytes must be > 0"); + } + this.maxDecompressedBytes = maxDecompressedBytes; + } + + @Override + public byte[] encode(byte[] data) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (GZIPOutputStream gzip = new GZIPOutputStream(out)) { + gzip.write(data); + } catch (IOException e) { + throw new SerializationException("gzip compression failed", e); + } + return out.toByteArray(); + } + + @Override + public byte[] decode(byte[] data) { + try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + long total = 0; + int read; + while ((read = gzip.read(buffer)) != -1) { + total += read; + if (total > maxDecompressedBytes) { + throw new SerializationException( + "gzip payload exceeds max decompressed size of " + maxDecompressedBytes + " bytes"); + } + out.write(buffer, 0, read); + } + return out.toByteArray(); + } catch (IOException e) { + throw new SerializationException("gzip decompression failed", e); + } + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/serialization/HmacCodec.java b/sdks/java/src/main/java/org/byteveda/taskito/serialization/HmacCodec.java new file mode 100644 index 00000000..9665b648 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/serialization/HmacCodec.java @@ -0,0 +1,54 @@ +package org.byteveda.taskito.serialization; + +import java.security.MessageDigest; +import java.util.Arrays; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.byteveda.taskito.errors.CryptoException; + +/** + * A {@link PayloadCodec} that prefixes each payload 
with an HMAC-SHA256 tag and, + * on decode, verifies it in constant time — rejecting tampered bytes. + */ +public final class HmacCodec implements PayloadCodec { + private static final String ALGORITHM = "HmacSHA256"; + private static final int MAC_LENGTH = 32; + + private final byte[] key; + + public HmacCodec(byte[] key) { + this.key = key.clone(); + } + + @Override + public byte[] encode(byte[] data) { + byte[] mac = mac(data); + byte[] out = new byte[MAC_LENGTH + data.length]; + System.arraycopy(mac, 0, out, 0, MAC_LENGTH); + System.arraycopy(data, 0, out, MAC_LENGTH, data.length); + return out; + } + + @Override + public byte[] decode(byte[] data) { + if (data.length < MAC_LENGTH) { + throw new CryptoException("signed payload is too short"); + } + byte[] mac = Arrays.copyOfRange(data, 0, MAC_LENGTH); + byte[] body = Arrays.copyOfRange(data, MAC_LENGTH, data.length); + if (!MessageDigest.isEqual(mac, mac(body))) { + throw new CryptoException("signature mismatch"); + } + return body; + } + + private byte[] mac(byte[] body) { + try { + Mac instance = Mac.getInstance(ALGORITHM); + instance.init(new SecretKeySpec(key, ALGORITHM)); + return instance.doFinal(body); + } catch (Exception e) { + throw new CryptoException("HMAC computation failed", e); + } + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/serialization/PayloadCodec.java b/sdks/java/src/main/java/org/byteveda/taskito/serialization/PayloadCodec.java new file mode 100644 index 00000000..61a4282d --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/serialization/PayloadCodec.java @@ -0,0 +1,19 @@ +package org.byteveda.taskito.serialization; + +/** + * A two-sided, byte-to-byte transform applied after serialization on the + * producer and reversed before deserialization on the worker — for + * compression, encryption, or signing. One implementation owns both directions + * so the inverse cannot drift (cf. Temporal Payload Codec, Sidekiq middleware). + * + *

/**
 * A symmetric byte-to-byte transform for compression, encryption, or signing. The
 * producer applies it after serialization and the worker undoes it before
 * deserialization. A single implementation owns both directions so the two can
 * never drift apart (cf. Temporal Payload Codec, Sidekiq middleware).
 *
 * <p>Codecs are independent of the {@link Serializer}: a chain is applied in
 * registration order by {@link #encode} and in reverse order by {@link #decode},
 * whether the underlying format is JSON or MessagePack. Register a chain with
 * {@code Taskito.builder().codec(...)}.
 */
public interface PayloadCodec {
    /**
     * Transforms serialized bytes on the way out (producer side).
     *
     * @param data bytes produced by the serializer or by the previous codec in the chain
     * @return the transformed bytes
     */
    byte[] encode(byte[] data);

    /**
     * Reverses {@link #encode} on the way in (worker side).
     *
     * @param data bytes previously returned by {@link #encode}
     * @return the bytes that were originally passed to {@link #encode}
     */
    byte[] decode(byte[] data);
}
"0123456789abcdef0123456789abcdef".getBytes(); + private static final byte[] HKEY = "codec-hmac-secret".getBytes(); + + @Test + void gzipRoundTrips() { + GzipCodec gzip = new GzipCodec(); + byte[] data = "hello world ".repeat(50).getBytes(); + assertArrayEquals(data, gzip.decode(gzip.encode(data))); + } + + @Test + void aesHidesPlaintextAndRoundTrips() { + AesGcmCodec aes = new AesGcmCodec(KEY32); + byte[] data = "secret payload".getBytes(); + byte[] encoded = aes.encode(data); + assertFalse(Arrays.equals(data, encoded)); + assertArrayEquals(data, aes.decode(encoded)); + } + + @Test + void hmacRejectsTamper() { + HmacCodec hmac = new HmacCodec(HKEY); + byte[] encoded = hmac.encode("hi".getBytes()); + encoded[encoded.length - 1] ^= 1; // flip a body bit + assertThrows(CryptoException.class, () -> hmac.decode(encoded)); + } + + @Test + void chainIsReversibleInReverseOrder() { + CodecSerializer serializer = new CodecSerializer( + new JsonSerializer(), List.of(new GzipCodec(), new AesGcmCodec(KEY32), new HmacCodec(HKEY))); + byte[] bytes = serializer.serialize(42); + assertEquals(42, (int) serializer.deserialize(bytes, Integer.class)); + } + + @Test + @Timeout(30) + void roundTripsThroughWorker(@TempDir Path dir) throws Exception { + try (Taskito queue = Taskito.builder() + .url(dir.resolve("cdc.db").toString()) + .codec(new GzipCodec(), new AesGcmCodec(KEY32), new HmacCodec(HKEY)) + .open()) { + Task dbl = Task.of("cdc.double", Integer.class); + AtomicInteger seen = new AtomicInteger(); + CountDownLatch ran = new CountDownLatch(1); + try (Worker worker = queue.worker() + .handle(dbl, p -> { + seen.set(p); + ran.countDown(); + return p * 2; + }) + .start()) { + String id = queue.enqueue(dbl, 21); + assertTrue(ran.await(20, TimeUnit.SECONDS)); + assertEquals(21, seen.get()); // payload decoded on the worker + queue.awaitJob(id, java.time.Duration.ofSeconds(20)); + assertEquals(42, queue.getResult(id, Integer.class).orElseThrow()); // result decoded back + } + } + } + + 
@Test + void gzipDecodeRejectsPayloadExceedingCap() { + GzipCodec codec = new GzipCodec(16); // cap at 16 bytes decompressed + byte[] large = new byte[1024]; // compresses tiny (all zeros), expands past the cap + byte[] compressed = codec.encode(large); + assertThrows(SerializationException.class, () -> codec.decode(compressed)); + } +}