Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions deploy/samples/kafka-trigger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Sample: a TableTrigger that fires automatically when new records arrive on a Kafka topic.
#
# The KafkaInputWatermark provider (hoptimator-kafka-controller) reports the topic's latest record
# timestamp as the input's data-time frontier. The generic TableTriggerReconciler advances this
# trigger's cursor to that frontier and launches the Job below over the newly-available window
# [watermark, timestamp]. No external scheduler/poker is needed — contrast
# deploy/samples/crontrigger.yaml, which pokes a trigger's status on a cron.
#
# Requires the operator to know the Kafka cluster: set the bootstrap servers via the
# `hoptimator.kafka.bootstrap.servers` system property or the `KAFKA_BOOTSTRAP_SERVERS` env var.
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTrigger
metadata:
name: kafka-arrival-trigger
spec:
schema: KAFKA
table: existing-topic-1
yaml: |
apiVersion: batch/v1
kind: Job
metadata:
name: kafka-arrival-trigger-job
spec:
template:
spec:
containers:
- name: hello
image: alpine/k8s:1.33.0
command:
- /bin/bash
- -c
- >-
echo "New records on {{table}}: processing window [{{watermark}}, {{timestamp}}]"
restartPolicy: Never
backoffLimit: 4
ttlSecondsAfterFinished: 90
32 changes: 32 additions & 0 deletions docs/kubernetes/crd-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,44 @@ spec:
| `schedule` | string | | Cron schedule. If set, the trigger fires on a schedule. If null, it fires on status patches. |
| `paused` | boolean | | When true, the trigger does not fire (status updates are ignored). |

### Input watermarks

A trigger fires when its input is **complete** through some data-time frontier. For sources with a
registered `InputWatermarkProvider` (see [extending](../extending/index.md)), the operator advances
the trigger's cursor to the provider's reported completeness watermark and launches the job over the
newly-available window. Sources without a provider fire on their cron `schedule` or on manual
`FIRE`. Either way, the job is handed its output window and reads it directly:

| Template variable | Meaning |
| ----------------- | -------------------------------------------------------------- |
| `{{watermark}}` | Start of the window (the previous cursor position). |
| `{{timestamp}}` | End of the window (the confirmed frontier / cron tick). |

A job that needs a wider *trailing* read (e.g. to absorb late data) expresses that in its own SQL —
that late-data tolerance is a per-job concern. There is no platform-level `lookback`/`lookahead`
adjustment; earlier `{{inputStart}}`/`{{inputEnd}}` variables and the `lookback`/`lookahead` fields
were removed.

Each window endpoint — `watermark` and `timestamp` — is also exported in a few convenience formats so
a job can consume a value without parsing the ISO string. For base name `<v>`:

| Variable | Example | Meaning |
| -------- | ------- | ------- |
| `{{<v>}}` | `2026-05-08T07:55Z` | ISO-8601 instant (e.g. `{{timestamp}}`). |
| `{{<v>EpochMs}}` | `1778226900000` | Unix epoch milliseconds. |
| `{{<v>Date}}` | `2026-05-08` | UTC calendar date (handy for `dt=`-style partitions). |
| `{{<v>Hour}}` | `07` | UTC hour-of-day, zero-padded. |

So `{{timestampDate}}`, `{{timestampEpochMs}}`, `{{watermarkHour}}`, etc. are all available.

### Status fields

| Field | Type | Description |
| ----------- | --------- | ---------------------------------------------------------------------------- |
| `timestamp` | date-time | When the trigger was last fired. **Patching this fires the trigger.** |
| `watermark` | date-time | Timestamp of the last *successfully* processed event. |
| `backfillFrom` | date-time | Start of a requested one-off backfill window. When set with `backfillTo`, the operator runs a separate `<job>-bf-<windowId>` Job over `[backfillFrom, backfillTo]` **without** advancing `watermark`/`timestamp`, then clears these fields. |
| `backfillTo` | date-time | End of the requested backfill window. See `backfillFrom`. |
| `jobs` | object | Per-job state — useful for tracking the status of jobs the trigger spawned. |

### Printer columns
Expand Down
93 changes: 82 additions & 11 deletions docs/user-guide/ddl-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,26 @@ job, the sink, and any intermediate hops the planner created.
```
CREATE [OR REPLACE] TRIGGER [IF NOT EXISTS] <name>
ON <schema.table>
AS '<yaml-template>'
AS '<job-template-name>'
[IN '<namespace>']
[SCHEDULED '<cron>']
[WITH ('<key>' '<value>', ...)]
```

Equivalent to a `TableTrigger` CRD: runs the embedded YAML (typically a Job
or CronJob) when the named table changes or on a cron schedule. The job spec
is arbitrary, so triggers are how you wire up backfills, rETL refreshes,
downstream notifications, and operational hooks without embedding that
logic in the pipeline itself. See
Creates a `TableTrigger` CRD. The `AS` clause names an existing
[`JobTemplate`](../kubernetes/crd-reference.md#jobtemplate) (optionally qualified
by `IN '<namespace>'`); when the named table changes or the cron schedule fires,
the operator instantiates that JobTemplate — rendering its YAML with the
trigger's variables — and runs the resulting Job. Triggers are how you wire up
backfills, rETL refreshes, downstream notifications, and operational hooks
without embedding that logic in the pipeline itself. See
[TableTriggers in concepts](../getting-started/concepts.md#tabletriggers)
for the bigger picture.

```sql
CREATE TRIGGER refresh_audience
ON KAFKA.existing-topic-1
AS 'apiVersion: batch/v1
kind: Job
...'
AS 'refresh-audience-job'
SCHEDULED '@hourly';
```

Expand All @@ -181,6 +181,77 @@ PAUSE TRIGGER refresh_audience;
RESUME TRIGGER refresh_audience;
```

### `LOOK BACK` / `LOOK AHEAD` (removed)

Earlier versions accepted `LOOK BACK`/`LOOK AHEAD` clauses to pre-compute an
input read window and expose it as `{{inputStart}}`/`{{inputEnd}}`. These were
removed: a trigger fires when its input is complete (see
[TableTrigger input watermarks](../kubernetes/crd-reference.md#input-watermarks)),
and a job that needs a wider trailing read applies that policy in its own SQL —
that late-data tolerance is a per-job concern, not a per-schedule one. The job
window is exposed as `{{watermark}}`/`{{timestamp}}`.

### Recognized `WITH` options

Most `WITH (...)` keys flow through to the rendered job template, but a few are
mapped onto structured `TableTrigger` spec fields:

| Option | TableTrigger field | Meaning |
| ------ | ------------------ | ------- |
| `'paused'` | `spec.paused` | `true`/`false`. |
| `'job.properties.<k>'` | `spec.jobProperties[<k>]` | Runtime job properties. |

## FIRE TRIGGER

```
FIRE TRIGGER <name>
[ FROM <bound> TO <bound> ]
```

Fires a trigger on demand — useful for backfills, reprocessing, and testing
without waiting for the schedule or an upstream change. `FIRE` is a pure
imperative action: it carries no options and never modifies the trigger's spec
(config changes go through `CREATE OR REPLACE TRIGGER`). A plain `FIRE TRIGGER x`
processes everything since the last watermark up to now. The optional
`FROM … TO …` clause instead fires a **specific output window**:

| `<bound>` form | Meaning |
| -------------- | ------- |
| `'2026-05-01T00:00:00Z'` | An absolute ISO-8601 instant. |
| `'2026-05-01'` | An absolute date (UTC midnight). |
| `<n> SECOND[S]/MINUTE[S]/HOUR[S]/DAY[S] AGO` | Relative to now, e.g. `7 DAYS AGO`. |
| `NOW` | The current time. |

```sql
-- backfill a fixed historical range
FIRE TRIGGER engaged_sessions_hourly
FROM '2026-05-01' TO '2026-05-08';

-- reprocess the last week of processed data (the end is capped at the watermark)
FIRE TRIGGER engaged_sessions_hourly
FROM 7 DAYS AGO TO NOW;
```

`FROM … TO …` requests a **one-off backfill** over that output window. The
backfill runs as a **separate Job** and does **not** move the trigger's
incremental cursor — `watermark`/`timestamp` are left untouched, so a historical
backfill never disturbs (or rewinds) live incremental processing. The job reads
and writes the requested window `[from, to]`; because a backfill covers
already-processed history, that input already exists.

A backfill can only cover **already-processed history**, so the end is
automatically **capped at the watermark**: `NOW` (or any `to` past the watermark)
means "up to the cursor." This makes relative windows like `FROM 7 DAYS AGO TO
NOW` do the sensible thing on a lagging trigger. A fire is *rejected* only if the
window is inverted, starts at/after the watermark (nothing to backfill), or the
trigger has no watermark yet — or if an incremental execution or another backfill
is already in flight. A failed backfill is abandoned (re-issue the `FIRE` to
retry).

Internally the request is recorded in `status.backfillFrom`/`status.backfillTo`;
the operator launches a `<job>-bf-<windowId>` Job, and on completion clears those
fields (leaving the cursor alone).

## CREATE TABLE

```
Expand Down Expand Up @@ -271,9 +342,9 @@ the parse alone.

- `REFRESH MATERIALIZED VIEW <name>` — intended to re-run a batch-style
materialization on demand.
- `FIRE TABLE | TRIGGER | VIEW | MATERIALIZED VIEW <name>` — intended to
- `FIRE TABLE | VIEW | MATERIALIZED VIEW <name>` — intended to
manually fire a side effect (e.g. for testing without waiting for a
schedule).
schedule). (`FIRE TRIGGER` is fully implemented — see above.)
- `PAUSE MATERIALIZED VIEW <name>` / `RESUME MATERIALIZED VIEW <name>` —
parser support exists; executor does not. (`PAUSE TRIGGER` /
`RESUME TRIGGER` above are fully implemented.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ public class Trigger implements Deployable {
* Recognised by deployers (see K8sTriggerDeployer) to short-circuit normal update. */
public static final String FIRE_OPTION = "fire";

public static final String FIRE_FROM_OPTION = "fire.from";
public static final String FIRE_TO_OPTION = "fire.to";

private final String name;
private final UserJob job;
private final String cronSchedule;
Expand Down
4 changes: 4 additions & 0 deletions hoptimator-jdbc/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ data: {
"REFRESH"
"PAUSE"
"RESUME"
"AGO"
"NOW"
]

# List of non-reserved keywords to add;
Expand All @@ -86,6 +88,8 @@ data: {
"REFRESH"
"PAUSE"
"RESUME"
"AGO"
"NOW"
]

# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
Expand Down
36 changes: 33 additions & 3 deletions hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -486,16 +486,46 @@ SqlFire SqlFireTable(Span s) :
SqlFire SqlFireTrigger(Span s) :
{
final SqlIdentifier id;
SqlNodeList options = null;
SqlNode from = null;
SqlNode to = null;
}
{
<TRIGGER> id = CompoundIdentifier()
[ options = Options() ]
[ <FROM> from = FireBound() <TO> to = FireBound() ]
{
return new SqlFireTrigger(s.end(this), id, options);
return new SqlFireTrigger(s.end(this), id, from, to);
}
}

/**
* A time bound for FIRE TRIGGER ... FROM/TO. Resolved to an absolute instant by the executor.
* One of: an absolute timestamp string literal; a relative "<n> <unit> AGO"; or NOW.
* Relative and NOW forms are encoded as char-string literals ("-5m", "now") for the executor.
*/
SqlNode FireBound() :
{
final SqlNode lit;
int amount;
String unit;
}
{
(
<NOW> { return SqlLiteral.createCharString("now", getPos()); }
|
amount = UnsignedIntLiteral()
( ( <SECOND> | <SECONDS> ) { unit = "s"; }
| ( <MINUTE> | <MINUTES> ) { unit = "m"; }
| ( <HOUR> | <HOURS> ) { unit = "h"; }
| ( <DAY> | <DAYS> ) { unit = "d"; }
)
<AGO>
{ return SqlLiteral.createCharString("-" + amount + unit, getPos()); }
|
lit = StringLiteral() { return lit; }
)
}


SqlFire SqlFireView(Span s) :
{
final SqlIdentifier id;
Expand Down
Loading
Loading