From 944982b3439ee34de958b94e10b7a5f1e4afae02 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 28 Jun 2026 19:13:47 +0530 Subject: [PATCH 1/3] feat(java): scaler HTTP endpoint for autoscaling Serve GET /api/scaler (queue depth = pending + running, plus the target) and GET /health on a JDK HttpServer, for an external autoscaler (KEDA metrics-api). Reject a non-positive target depth. --- .../org/byteveda/taskito/scaler/Scaler.java | 110 ++++++++++++++++++ .../taskito/scaler/ScalerOptions.java | 30 +++++ .../byteveda/taskito/scaler/package-info.java | 8 ++ 3 files changed, 148 insertions(+) create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/scaler/Scaler.java create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/scaler/ScalerOptions.java create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/scaler/package-info.java diff --git a/sdks/java/src/main/java/org/byteveda/taskito/scaler/Scaler.java b/sdks/java/src/main/java/org/byteveda/taskito/scaler/Scaler.java new file mode 100644 index 00000000..679ed098 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/scaler/Scaler.java @@ -0,0 +1,110 @@ +package org.byteveda.taskito.scaler; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; +import org.byteveda.taskito.Taskito; +import org.byteveda.taskito.model.QueueStats; + +/** + * Serves queue depth over HTTP for an external autoscaler. Depth is + * {@code pending + running} (outstanding work); the autoscaler divides it by + * {@code targetQueueDepth} to pick a replica count. Start with + * {@link #start(Taskito, ScalerOptions)} and {@link #close()} to stop. + */ +public final class Scaler implements AutoCloseable { + private static final ObjectMapper JSON = new ObjectMapper(); + + private final HttpServer server; + + private Scaler(HttpServer server) { + this.server = server; + } + + /** Start the endpoint; binds immediately and serves on a background selector. */ + public static Scaler start(Taskito queue, ScalerOptions options) { + HttpServer server; + try { + server = HttpServer.create(new InetSocketAddress(options.host(), options.port()), 0); + } catch (IOException e) { + throw new UncheckedIOException("failed to start the scaler endpoint", e); + } + server.createContext("/api/scaler", exchange -> handleScaler(exchange, queue, options)); + server.createContext("/health", Scaler::handleHealth); + server.start(); + return new Scaler(server); + } + + /** The bound port (useful when {@code port} was 0). */ + public int port() { + return server.getAddress().getPort(); + } + + @Override + public void close() { + server.stop(0); + } + + private static void handleScaler(HttpExchange exchange, Taskito queue, ScalerOptions options) throws IOException { + if (!"GET".equals(exchange.getRequestMethod())) { + send(exchange, 405, Map.of("error", "method not allowed")); + return; + } + String queueName = queryParam(exchange, "queue"); + if (queueName == null) { + queueName = options.queue(); + } + try { + QueueStats stats = queueName == null ? queue.stats() : queue.statsByQueue(queueName); + long depth = stats.pending + stats.running; + Map body = new LinkedHashMap<>(); + body.put("metricValue", depth); + body.put("targetValue", options.targetQueueDepth()); + body.put("queueName", queueName == null ? "all" : queueName); + send(exchange, 200, body); + } catch (RuntimeException e) { + // Never leak backend internals to the scaler caller. + send(exchange, 500, Map.of("error", "failed to read queue stats")); + } + } + + private static void handleHealth(HttpExchange exchange) throws IOException { + send(exchange, 200, Map.of("status", "ok")); + } + + private static String queryParam(HttpExchange exchange, String key) { + String query = exchange.getRequestURI().getQuery(); + if (query == null) { + return null; + } + for (String pair : query.split("&")) { + int eq = pair.indexOf('='); + if (eq > 0 && pair.substring(0, eq).equals(key)) { + return pair.substring(eq + 1); + } + } + return null; + } + + private static void send(HttpExchange exchange, int status, Map body) throws IOException { + byte[] bytes; + try { + bytes = JSON.writeValueAsBytes(body); + } catch (Exception e) { + bytes = "{}".getBytes(StandardCharsets.UTF_8); + status = 500; + } + exchange.getResponseHeaders().set("Content-Type", "application/json"); + exchange.sendResponseHeaders(status, bytes.length); + try (OutputStream out = exchange.getResponseBody()) { + out.write(bytes); + } + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/scaler/ScalerOptions.java b/sdks/java/src/main/java/org/byteveda/taskito/scaler/ScalerOptions.java new file mode 100644 index 00000000..81e01833 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/scaler/ScalerOptions.java @@ -0,0 +1,30 @@ +package org.byteveda.taskito.scaler; + +/** + * Configures the {@link Scaler} HTTP endpoint. + * + * @param port the port to bind ({@code 0} picks an ephemeral port) + * @param host the bind address (defaults to {@code 0.0.0.0}) + * @param targetQueueDepth the depth the autoscaler targets per replica (must be > 0) + * @param queue the queue to report on, or {@code null} for all queues + */ +public record ScalerOptions(int port, String host, int targetQueueDepth, String queue) { + public ScalerOptions { + if (targetQueueDepth <= 0) { + throw new IllegalArgumentException("targetQueueDepth must be > 0"); + } + if (host == null || host.isBlank()) { + host = "0.0.0.0"; + } + } + + /** Defaults: port 9090, all queues, target depth 10. */ + public static ScalerOptions defaults() { + return new ScalerOptions(9090, "0.0.0.0", 10, null); + } + + /** Bind to {@code port} with the other defaults. */ + public static ScalerOptions onPort(int port) { + return new ScalerOptions(port, "0.0.0.0", 10, null); + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/scaler/package-info.java b/sdks/java/src/main/java/org/byteveda/taskito/scaler/package-info.java new file mode 100644 index 00000000..f2a1efb9 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/scaler/package-info.java @@ -0,0 +1,8 @@ +/** + * A tiny HTTP endpoint that exposes queue depth for an external autoscaler + * (e.g. KEDA's metrics-api scaler). {@link org.byteveda.taskito.scaler.Scaler} + * serves {@code GET /api/scaler} ({@code metricValue}/{@code targetValue}) and + * {@code GET /health}. Observability (metrics export) is left to the contrib + * middleware; this only reports depth for scaling decisions. + */ +package org.byteveda.taskito.scaler; From 53673c98bd6ec439527ee4b6896e0b181ca3f8e0 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 28 Jun 2026 19:13:48 +0530 Subject: [PATCH 2/3] test(java): cover the scaler endpoint Depth + health over HTTP; non-positive target rejected. --- .../java/org/byteveda/taskito/ScalerTest.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java diff --git a/sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java b/sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java new file mode 100644 index 00000000..d25f9a6d --- /dev/null +++ b/sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java @@ -0,0 +1,58 @@ +package org.byteveda.taskito; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Path; +import org.byteveda.taskito.scaler.Scaler; +import org.byteveda.taskito.scaler.ScalerOptions; +import org.byteveda.taskito.task.Task; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +class ScalerTest { + + private static final ObjectMapper JSON = new ObjectMapper(); + private static final Task TASK = Task.of("s.task", Integer.class); + + @Test + @Timeout(30) + void reportsQueueDepthAndHealth(@TempDir Path dir) throws Exception { + try (Taskito queue = + Taskito.builder().url(dir.resolve("s.db").toString()).open()) { + queue.enqueue(TASK, 1); + queue.enqueue(TASK, 2); + try (Scaler scaler = Scaler.start(queue, new ScalerOptions(0, "127.0.0.1", 5, null))) { + HttpClient client = HttpClient.newHttpClient(); + String base = "http://127.0.0.1:" + scaler.port(); + + HttpResponse scale = get(client, base + "/api/scaler"); + assertEquals(200, scale.statusCode()); + JsonNode body = JSON.readTree(scale.body()); + assertEquals(2, body.get("metricValue").asLong()); + assertEquals(5, body.get("targetValue").asInt()); + assertEquals("all", body.get("queueName").asText()); + + HttpResponse health = get(client, base + "/health"); + assertEquals(200, health.statusCode()); + assertEquals("ok", JSON.readTree(health.body()).get("status").asText()); + } + } + } + + @Test + void rejectsNonPositiveTarget() { + assertThrows(IllegalArgumentException.class, () -> new ScalerOptions(0, "127.0.0.1", 0, null)); + } + + private static HttpResponse get(HttpClient client, String url) throws Exception { + return client.send(HttpRequest.newBuilder(URI.create(url)).GET().build(), HttpResponse.BodyHandlers.ofString()); + } +} From 06a9e0f0c5e54b69dd00e299adf94a38fa33884f Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Wed, 1 Jul 2026 08:21:20 +0530 Subject: [PATCH 3/3] test(java): cover scaler queue filter and non-GET reject --- .../java/org/byteveda/taskito/ScalerTest.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java b/sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java index d25f9a6d..b0046112 100644 --- a/sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java +++ b/sdks/java/src/test/java/org/byteveda/taskito/ScalerTest.java @@ -12,6 +12,7 @@ import java.nio.file.Path; import org.byteveda.taskito.scaler.Scaler; import org.byteveda.taskito.scaler.ScalerOptions; +import org.byteveda.taskito.task.EnqueueOptions; import org.byteveda.taskito.task.Task; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -47,6 +48,44 @@ void reportsQueueDepthAndHealth(@TempDir Path dir) throws Exception { } } + @Test + @Timeout(30) + void filtersDepthByQueue(@TempDir Path dir) throws Exception { + try (Taskito queue = + Taskito.builder().url(dir.resolve("sq.db").toString()).open()) { + queue.enqueue(TASK, 1); // default queue + queue.enqueue(TASK, 2, EnqueueOptions.builder().queue("high").build()); + queue.enqueue(TASK, 3, EnqueueOptions.builder().queue("high").build()); + try (Scaler scaler = Scaler.start(queue, new ScalerOptions(0, "127.0.0.1", 5, null))) { + HttpClient client = HttpClient.newHttpClient(); + String base = "http://127.0.0.1:" + scaler.port(); + + HttpResponse scoped = get(client, base + "/api/scaler?queue=high"); + assertEquals(200, scoped.statusCode()); + JsonNode body = JSON.readTree(scoped.body()); + assertEquals(2, body.get("metricValue").asLong(), "only the 'high' queue counts"); + assertEquals("high", body.get("queueName").asText()); + } + } + } + + @Test + @Timeout(30) + void rejectsNonGetRequests(@TempDir Path dir) throws Exception { + try (Taskito queue = + Taskito.builder().url(dir.resolve("sm.db").toString()).open()) { + try (Scaler scaler = Scaler.start(queue, new ScalerOptions(0, "127.0.0.1", 5, null))) { + HttpClient client = HttpClient.newHttpClient(); + HttpRequest post = HttpRequest.newBuilder( + URI.create("http://127.0.0.1:" + scaler.port() + "/api/scaler")) + .POST(HttpRequest.BodyPublishers.noBody()) + .build(); + HttpResponse response = client.send(post, HttpResponse.BodyHandlers.ofString()); + assertEquals(405, response.statusCode()); + } + } + } + @Test void rejectsNonPositiveTarget() { assertThrows(IllegalArgumentException.class, () -> new ScalerOptions(0, "127.0.0.1", 0, null));