diff --git a/demo-trino.sh b/demo-trino.sh new file mode 100644 index 000000000..7fb1d25dc --- /dev/null +++ b/demo-trino.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +WAYANG_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +exec "$WAYANG_ROOT/trino-setup/demo.sh" "$@" diff --git a/trino-setup/README.md b/trino-setup/README.md new file mode 100644 index 000000000..e92c4014b --- /dev/null +++ b/trino-setup/README.md @@ -0,0 +1,267 @@ +# Trino Local Setup + +Local Trino environment backed by an **Iceberg** data lake, completely containerised. + +The current validation has three parts: + +1. Build the Wayang Trino platform and run the shared JDBC SQL-generation tests. +2. Run the Wayang Trino operator tests against the live local stack. +3. Run standalone JDBC integration tests against the local Trino, Iceberg, and MinIO stack. + +Run the commands below from the repository root. Java 17 and Docker with +Docker Compose are required; Maven is provided by the repository wrapper. + +The pure Trino platform branch is named `wayang-trino`: + +```bash +git checkout wayang-trino +``` + +## Command Conventions + +Use the `bash` blocks on macOS/Linux terminals. Use the `powershell` blocks on +Windows PowerShell from the repository root. Docker Compose commands are the +same on both platforms. + +## Stack + +| Component | Image | Port | Role | +|-----------|-------|------|------| +| **Trino** | `trinodb/trino:435` | 8080 | SQL query engine | +| **Hive Metastore** | `naushadh/hive-metastore:latest` | 9083 | Iceberg table catalog (Thrift) | +| **PostgreSQL** | `postgres:15-alpine` | 5432 | HMS metadata backing store | +| **MinIO** | `minio/minio:latest` | 9000 / 9001 | S3-compatible object storage | + +HMS is the battle-tested Iceberg catalog for Trino. Parquet data files are written by Trino directly to MinIO; HMS only stores schema/table metadata. + +## Directory Layout + +``` +trino-setup/ +|-- docker-compose.yml # Full stack definition +|-- trino/ +| |-- config.properties # Trino node config +| `-- catalog/ +| |-- iceberg.properties # Iceberg via HMS + MinIO +| `-- tpch.properties # Built-in TPC-H (no storage needed) +|-- scripts/ +| |-- init.sql # Creates iceberg.sales.orders + sample rows +| `-- run-init.sh # Helper: waits for Trino then runs init.sql +|-- pom.xml # Standalone Maven project (Java 17) +`-- src/test/java/.../ + `-- TrinoIntegrationTest.java # JUnit 5 integration tests +``` + +## 1. Test the Wayang Trino Platform + +Build the Trino platform and its required modules: + +```bash +./mvnw -Pskip-prerequisite-check -pl wayang-platforms/wayang-trino -am -DskipTests -Drat.skip=true test +``` + +On PowerShell: + +```powershell +.\mvnw.cmd --% -Pskip-prerequisite-check -pl wayang-platforms/wayang-trino -am -DskipTests -Drat.skip=true test +``` + +Then run the shared JDBC SQL-generation tests: + +```bash +./mvnw -Pskip-prerequisite-check -pl wayang-platforms/wayang-jdbc-template -am -Dtest=JdbcExecutorTest -Dsurefire.failIfNoSpecifiedTests=false -DfailIfNoTests=false -Drat.skip=true test +``` + +On PowerShell: + +```powershell +.\mvnw.cmd --% -Pskip-prerequisite-check -pl wayang-platforms/wayang-jdbc-template -am -Dtest=JdbcExecutorTest -Dsurefire.failIfNoSpecifiedTests=false -DfailIfNoTests=false -Drat.skip=true test +``` + +Expected result: + +```text +Wayang Platform Trino ... SUCCESS +Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 +``` + +## 2. Test Against the Local Trino Stack + +### 1. Start the stack + +```bash +docker compose -f trino-setup/docker-compose.yml up -d +``` + +Wait ~30 seconds for all services to become healthy. Check with: + +```bash +docker compose -f trino-setup/docker-compose.yml ps +# or watch the Trino UI at http://localhost:8080 +``` + +### 2. Run the Wayang Trino operator tests + +`TrinoOperatorsIT` exercises the Wayang Trino implementation against the live +Trino stack. It checks `TableSource`, `Filter`, `Projection`, `Join`, +`GlobalReduce`, `ReduceBy`, `Sort`, and `TableSink`, and confirms that the +expected SQL reached Trino. The standalone join test now runs a full Wayang +plan and normalizes both possible join result shapes before collecting records: +logical joins can produce `Tuple2`, while pushed-down JDBC joins +can return a flat `Record`. + +The suite is self-contained: it creates `iceberg.wayang_it`, scales its test +data to 120,000 rows so the optimizer selects SQL pushdown, and drops its test +tables afterward. It does not require `scripts/init.sql`. The suite also +contains five JavaPlanBuilder `readTable` combination tests that cover filter, +projection, global reduce, reduce-by plus sort, table sink, and join through +the public API. + +```bash +./mvnw -Pskip-prerequisite-check -pl wayang-platforms/wayang-trino -am \ + -Dtest=TrinoOperatorsIT -Dsurefire.failIfNoSpecifiedTests=false \ + -DfailIfNoTests=false -Drat.skip=true -Dlicense.skip=true test +``` + +On PowerShell: + +```powershell +.\mvnw.cmd --% -Pskip-prerequisite-check -pl wayang-platforms/wayang-trino -am -Dtest=TrinoOperatorsIT -Dsurefire.failIfNoSpecifiedTests=false -DfailIfNoTests=false -Drat.skip=true -Dlicense.skip=true test +``` + +Expected result: + +```text +Tests run: 13, Failures: 0, Errors: 0, Skipped: 0 +``` + +Verified on June 18, 2026 against the local Docker stack with the full-plan +join test and all five JavaPlanBuilder combination tests enabled. + +If Trino is unreachable, these tests are skipped instead of failed. A result +with skipped tests does not confirm that the operators work. + +### 3. Load sample Iceberg data + +```bash +bash trino-setup/scripts/run-init.sh +``` + +On PowerShell: + +```powershell +Get-Content -Raw trino-setup/scripts/init.sql | docker exec -i trino trino --server http://localhost:8080 --user admin +``` + +This creates the schema `iceberg.sales` and inserts 20 sample orders into +`iceberg.sales.orders` (Parquet files on MinIO). + +### 4. Run the standalone stack integration tests + +```bash +./mvnw -f trino-setup/pom.xml -Pintegration -Dtest=TrinoIntegrationTest test +``` + +On PowerShell: + +```powershell +.\mvnw.cmd --% -f trino-setup/pom.xml -Pintegration -Dtest=TrinoIntegrationTest test +``` + +Tests are skipped by default (no `-Pintegration`) to avoid requiring Docker in CI. +These tests validate the stack and direct JDBC queries independently of the +Wayang operator implementation. + +Expected result: + +```text +Tests run: 10, Failures: 0, Errors: 0, Skipped: 0 +BUILD SUCCESS +``` + +### 5. Manual exploration + +Open the **Trino UI**: http://localhost:8080 + +Or connect via the Trino CLI inside the container: + +```bash +docker exec -it trino trino --catalog iceberg --schema sales +``` + +```sql +-- TPC-H built-in data (no init.sql needed) +SELECT * FROM tpch.tiny.orders LIMIT 5; + +-- Iceberg table +SELECT region, SUM(amount) FROM iceberg.sales.orders GROUP BY region; + +-- Iceberg file metadata +SELECT * FROM iceberg.sales."orders$files"; + +-- Iceberg history +SELECT * FROM iceberg.sales."orders$history"; +``` + +**MinIO console**: http://localhost:9001 (login: `minioadmin` / `minioadmin`) +Look for Parquet files under `warehouse/sales/orders/`. + +### 6. Tear down + +```bash +docker compose -f trino-setup/docker-compose.yml down -v +``` + +The `-v` option removes volumes and clears the local MinIO and PostgreSQL data. + +## Test Coverage + +### Wayang operator integration tests + +| Test | What it checks | +|------|----------------| +| `tableSource` | Full table scan through `TrinoTableSource` | +| `filter` | Wayang `FilterOperator` and SQL `WHERE` pushdown | +| `projection` | Column projection pushed into the Trino query | +| `join` | Full Wayang join plan with normalization before the collecting sink | +| `globalReduce` | Global aggregation such as `SUM` | +| `reduceBy` | Grouped aggregation and SQL `GROUP BY` | +| `sort` | Wayang sort and SQL `ORDER BY` | +| `tableSink` | Filtered result written with `CREATE TABLE AS` | +| `javaPlanBuilderReadTableFilterProjection` | `readTable -> filter -> projection -> collect` | +| `javaPlanBuilderReadTableFilterGlobalReduce` | `readTable -> filter -> globalReduce -> collect` | +| `javaPlanBuilderReadTableReduceBySort` | `readTable -> reduceByKey -> sort -> collect` | +| `javaPlanBuilderReadTableFilterProjectionTableSink` | `readTable -> filter -> projection -> writeTable` | +| `javaPlanBuilderReadTableJoin` | `readTable + readTable -> join -> collect` | + +### Standalone stack integration tests + +| Test | What it checks | +|------|----------------| +| `testConnectivity` | `SELECT 1`, JDBC connection works | +| `testTpchConnector` | TPC-H built-in connector, no storage needed | +| `testTpchTopOrders` | ORDER BY + LIMIT on TPC-H | +| `testIcebergSchemaVisible` | Schema created by `init.sql` is visible | +| `testIcebergSelectAll` | Full table scan, 20 rows | +| `testIcebergFilterByRegion` | WHERE pushdown on string column | +| `testIcebergAggregate` | GROUP BY + SUM aggregation | +| `testIcebergFilterByAmount` | WHERE pushdown on double column | +| `testIcebergProjection` | SELECT subset of columns | +| `testIcebergFilesMetadata` | `$files` system table, confirms Parquet on MinIO | + +## Environment Variables + +Override defaults if running Trino on a different host/port: + +```bash +TRINO_HOST=my-trino-host TRINO_PORT=8080 ./mvnw -f trino-setup/pom.xml -Pintegration -Dtest=TrinoIntegrationTest test +``` + +On PowerShell: + +```powershell +$env:TRINO_HOST="my-trino-host" +$env:TRINO_PORT="8080" +.\mvnw.cmd --% -f trino-setup/pom.xml -Pintegration -Dtest=TrinoIntegrationTest test +Remove-Item Env:TRINO_HOST, Env:TRINO_PORT +``` diff --git a/trino-setup/demo.sh b/trino-setup/demo.sh new file mode 100644 index 000000000..0bacb4d04 --- /dev/null +++ b/trino-setup/demo.sh @@ -0,0 +1,127 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +WAYANG_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +TRINO_SETUP="$SCRIPT_DIR" +TRINO_CONTAINER="trino" +MAVEN_FLAGS="-Pskip-prerequisite-check -Drat.skip=true -Dmaven.javadoc.skip=true" + +banner() { + echo + echo "============================================================" + printf " %s\n" "$*" + echo "============================================================" + echo +} + +step() { + echo + echo "-- $*" + echo +} + +pause() { + if [[ "${WAYANG_DEMO_AUTO:-false}" != "true" ]]; then + echo + read -rp "Press ENTER to continue..." _ || true + echo + fi +} + +run_wayang_demo() { + "$WAYANG_ROOT/mvnw" exec:java -pl wayang-platforms/wayang-trino \ + -Dexec.mainClass="org.apache.wayang.trino.TrinoDemo" \ + ${MAVEN_FLAGS} +} + +banner "ACT 1: Start Trino + Iceberg via Docker" + +step "1a. Starting the stack" +cd "$TRINO_SETUP" +docker compose up -d + +step "1b. Containers running" +docker ps --format "table {{.Names}}\t{{.Image}}\t{{.Status}}\t{{.Ports}}" \ + | grep -E "NAMES|trino|minio|metastore|postgres" + +step "1c. Waiting for Trino to be ready" +MAX_WAIT=90 +ELAPSED=0 +until docker exec "$TRINO_CONTAINER" \ + trino --execute "SELECT 1" --output-format ALIGNED >/dev/null 2>&1; do + if [[ "$ELAPSED" -ge "$MAX_WAIT" ]]; then + echo "Timed out waiting for Trino after ${MAX_WAIT}s" + exit 1 + fi + printf ". waiting (%ds elapsed)\r" "$ELAPSED" + sleep 3 + ELAPSED=$((ELAPSED + 3)) +done +echo "Trino is ready at http://localhost:8080" + +step "1d. Initialising Iceberg tables" +docker exec -i "$TRINO_CONTAINER" trino < "$TRINO_SETUP/scripts/init.sql" 2>&1 \ + | grep -v "^WARNING\|jline\|org.jline" || true +echo "iceberg.sales.orders seeded" + +step "1e. Table schema" +docker exec "$TRINO_CONTAINER" \ + trino --execute "DESCRIBE iceberg.sales.orders" \ + --output-format ALIGNED 2>/dev/null + +pause + +banner "ACT 2: Query Iceberg directly via Trino CLI" + +step "2a. Full table scan" +echo "SQL: SELECT * FROM iceberg.sales.orders" +docker exec "$TRINO_CONTAINER" \ + trino --execute "SELECT * FROM iceberg.sales.orders ORDER BY order_id" \ + --output-format ALIGNED 2>/dev/null + +step "2b. Filter: region = 'AMER'" +echo "SQL: SELECT * FROM iceberg.sales.orders WHERE region = 'AMER'" +docker exec "$TRINO_CONTAINER" \ + trino --execute "SELECT * FROM iceberg.sales.orders WHERE region = 'AMER' ORDER BY order_id" \ + --output-format ALIGNED 2>/dev/null + +step "2c. Projection with filter" +echo "SQL: SELECT region, product, amount FROM iceberg.sales.orders WHERE region = 'AMER'" +docker exec "$TRINO_CONTAINER" \ + trino --execute \ + "SELECT region, product, amount + FROM iceberg.sales.orders + WHERE region = 'AMER' + ORDER BY order_id" \ + --output-format ALIGNED 2>/dev/null + +pause + +banner "ACT 3: Wayang API filter + projection pushdown" +cd "$WAYANG_ROOT" +run_wayang_demo + +banner "Demo complete" +echo "Trino UI: http://localhost:8080" +echo "MinIO UI: http://localhost:9001 (minioadmin / minioadmin)" +echo +echo "To stop the stack:" +echo " cd trino-setup && docker compose down" diff --git a/trino-setup/docker-compose.yml b/trino-setup/docker-compose.yml new file mode 100644 index 000000000..cf582184e --- /dev/null +++ b/trino-setup/docker-compose.yml @@ -0,0 +1,142 @@ +--- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Stack: Trino + Hive Metastore + MinIO (S3 storage) +# +# This is the battle-tested Trino + Iceberg local setup. +# Hive Metastore (HMS) stores Iceberg table metadata over Thrift on port 9083. +# MinIO provides S3-compatible object storage for Parquet data files. +# Trino's Iceberg connector uses HMS as catalog and writes Parquet to MinIO. +# +# Ports: +# Trino: http://localhost:8080 (UI + JDBC) +# MinIO S3: http://localhost:9000 +# MinIO UI: http://localhost:9001 (minioadmin / minioadmin) +# HMS: localhost:9083 (Thrift, internal) +# Postgres: localhost:5432 (HMS backing store) + +services: + + # PostgreSQL (Hive Metastore backing database) + postgres: + image: postgres:15-alpine + container_name: trino-postgres + environment: + POSTGRES_DB: metastore + POSTGRES_USER: hive + POSTGRES_PASSWORD: hive + ports: + - "5432:5432" + volumes: + - postgres-data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U hive -d metastore"] + interval: 10s + timeout: 5s + retries: 5 + + # MinIO (S3-compatible object storage) + minio: + image: minio/minio:latest + container_name: trino-minio + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9000:9000" + - "9001:9001" + command: server /data --console-address ":9001" + volumes: + - minio-data:/data + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 10s + timeout: 5s + retries: 5 + + # Create the warehouse bucket before HMS starts + minio-init: + image: minio/mc:latest + container_name: trino-minio-init + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + mc alias set local http://minio:9000 minioadmin minioadmin; + mc mb local/warehouse --ignore-existing; + echo 'bucket warehouse ready'; + exit 0; + " + + # Hive Metastore + # naushadh/hive-metastore is a minimal, pre-configured HMS image + # that supports S3-compatible storage via env vars. + metastore: + image: naushadh/hive-metastore:latest + container_name: trino-metastore + depends_on: + postgres: + condition: service_healthy + minio: + condition: service_healthy + minio-init: + condition: service_completed_successfully + ports: + - "9083:9083" + environment: + DATABASE_HOST: postgres + DATABASE_DB: metastore + DATABASE_USER: hive + DATABASE_PASSWORD: hive + # S3 / MinIO + S3_ENDPOINT_URL: http://minio:9000 + S3_BUCKET: warehouse + S3_PREFIX: / + AWS_ACCESS_KEY_ID: minioadmin + AWS_SECRET_ACCESS_KEY: minioadmin + REGION: us-east-1 + # No nc/curl in this image; use bash's /dev/tcp built-in + healthcheck: + test: ["CMD", "/bin/bash", "-c", "exec 3<>/dev/tcp/localhost/9083 2>/dev/null && exit 0 || exit 1"] + interval: 15s + timeout: 10s + retries: 15 + + # Trino + trino: + image: trinodb/trino:435 + container_name: trino + depends_on: + metastore: + condition: service_healthy + minio: + condition: service_healthy + ports: + - "8080:8080" + volumes: + - ./trino/catalog:/etc/trino/catalog + - ./trino/config.properties:/etc/trino/config.properties + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/v1/info"] + interval: 15s + timeout: 10s + retries: 10 + +volumes: + postgres-data: + minio-data: diff --git a/trino-setup/pom.xml b/trino-setup/pom.xml new file mode 100644 index 000000000..35c5256ae --- /dev/null +++ b/trino-setup/pom.xml @@ -0,0 +1,94 @@ + + + + 4.0.0 + + org.apache.wayang + trino-setup + 1.0-SNAPSHOT + jar + + Trino Local Setup - Integration Tests + + Standalone integration tests for a local Trino stack + (Trino + Nessie Iceberg catalog + MinIO S3 storage). + Independent of the Wayang codebase. + + + + 17 + 17 + UTF-8 + 435 + 5.10.2 + + + + + + io.trino + trino-jdbc + ${trino.version} + test + + + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + + + + + org.slf4j + slf4j-simple + 2.0.12 + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.2.5 + + + ${skipIntegrationTests} + + + + + + + + + integration + + false + + + + diff --git a/trino-setup/scripts/init.sql b/trino-setup/scripts/init.sql new file mode 100644 index 000000000..3ce74bcf8 --- /dev/null +++ b/trino-setup/scripts/init.sql @@ -0,0 +1,66 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- Run this after the stack is up to create sample Iceberg tables. +-- Usage: ./scripts/run-init.sh +-- Or manually: docker exec -it trino trino < /scripts/init.sql + +-- ── Schema ──────────────────────────────────────────────────────────────── +CREATE SCHEMA IF NOT EXISTS iceberg.sales; + +-- ── Orders table (Iceberg / Parquet on MinIO) ───────────────────────────── +CREATE TABLE IF NOT EXISTS iceberg.sales.orders ( + order_id BIGINT, + region VARCHAR, + product VARCHAR, + amount DOUBLE, + order_date DATE +) +WITH (format = 'PARQUET'); + +-- ── Idempotent seed: clear before inserting so re-runs don't duplicate rows ─ +DELETE FROM iceberg.sales.orders; + +-- ── Sample data: 20 rows, 4 regions (AMER/APAC/EMEA/LATAM), 5 products ──── +-- AMER rows: 3, 6, 9, 12, 16 → 5 rows for filter demo +-- Projection demo selects only: region, product, amount +INSERT INTO iceberg.sales.orders VALUES + (1, 'APAC', 'Widget A', 1500.00, DATE '2024-01-15'), + (2, 'EMEA', 'Widget B', 800.50, DATE '2024-01-16'), + (3, 'AMER', 'Widget A', 2200.00, DATE '2024-01-17'), + (4, 'APAC', 'Widget C', 350.75, DATE '2024-01-18'), + (5, 'EMEA', 'Widget A', 1100.00, DATE '2024-01-19'), + (6, 'AMER', 'Widget B', 950.25, DATE '2024-01-20'), + (7, 'APAC', 'Widget B', 1750.00, DATE '2024-01-21'), + (8, 'EMEA', 'Widget C', 420.00, DATE '2024-01-22'), + (9, 'AMER', 'Widget C', 680.50, DATE '2024-01-23'), + (10, 'APAC', 'Widget A', 3000.00, DATE '2024-01-24'), + (11, 'LATAM', 'Widget D', 560.00, DATE '2024-01-25'), + (12, 'AMER', 'Widget D', 1320.75, DATE '2024-01-26'), + (13, 'EMEA', 'Widget D', 990.00, DATE '2024-01-27'), + (14, 'LATAM', 'Widget E', 2100.50, DATE '2024-01-28'), + (15, 'APAC', 'Widget E', 4500.00, DATE '2024-01-29'), + (16, 'AMER', 'Widget E', 3750.00, DATE '2024-01-30'), + (17, 'EMEA', 'Widget E', 1250.00, DATE '2024-01-31'), + (18, 'LATAM', 'Widget A', 870.25, DATE '2024-02-01'), + (19, 'APAC', 'Widget D', 1680.00, DATE '2024-02-02'), + (20, 'LATAM', 'Widget B', 440.50, DATE '2024-02-03'); + +-- ── Verify ──────────────────────────────────────────────────────────────── +SELECT region, COUNT(*) AS order_count, SUM(amount) AS total_amount +FROM iceberg.sales.orders +GROUP BY region +ORDER BY total_amount DESC; diff --git a/trino-setup/scripts/run-init.sh b/trino-setup/scripts/run-init.sh new file mode 100644 index 000000000..ebaeb337c --- /dev/null +++ b/trino-setup/scripts/run-init.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Runs init.sql against the local Trino instance. +# The stack must be fully up before running this. + +set -e + +TRINO_HOST=${TRINO_HOST:-localhost} +TRINO_PORT=${TRINO_PORT:-8080} + +echo "Waiting for Trino to be ready..." +until curl -sf "http://${TRINO_HOST}:${TRINO_PORT}/v1/info" | grep -q '"starting":false'; do + echo " Trino not ready yet, retrying in 5s..." + sleep 5 +done +echo "Trino is ready." + +echo "Running init.sql..." +docker exec -i trino trino \ + --server "http://${TRINO_HOST}:${TRINO_PORT}" \ + --user admin \ + < "$(dirname "$0")/init.sql" + +echo "Done. Sample Iceberg data loaded into iceberg.sales.orders" diff --git a/trino-setup/src/test/java/org/apache/wayang/trino/TrinoIntegrationTest.java b/trino-setup/src/test/java/org/apache/wayang/trino/TrinoIntegrationTest.java new file mode 100644 index 000000000..3c2735b82 --- /dev/null +++ b/trino-setup/src/test/java/org/apache/wayang/trino/TrinoIntegrationTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino; + +import org.junit.jupiter.api.*; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for the local Trino stack. + * + * Prerequisites: run `docker-compose up -d` and `./scripts/run-init.sh` first. + * + * Run tests: + * mvn test -Pintegration + * + * Or skip infrastructure setup and run with a custom host: + * TRINO_HOST=localhost TRINO_PORT=8080 mvn test -Pintegration + */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class TrinoIntegrationTest { + + private static final String TRINO_HOST = System.getenv().getOrDefault("TRINO_HOST", "localhost"); + private static final int TRINO_PORT = Integer.parseInt(System.getenv().getOrDefault("TRINO_PORT", "8080")); + private static final String JDBC_URL = String.format("jdbc:trino://%s:%d", TRINO_HOST, TRINO_PORT); + + private static Connection connection; + + // ── Lifecycle ───────────────────────────────────────────────────────── + + @BeforeAll + static void openConnection() throws Exception { + Properties props = new Properties(); + props.setProperty("user", "admin"); // Trino requires a non-empty user + connection = DriverManager.getConnection(JDBC_URL, props); + System.out.printf("Connected to Trino at %s%n", JDBC_URL); + } + + @AfterAll + static void closeConnection() throws Exception { + if (connection != null && !connection.isClosed()) { + connection.close(); + } + } + + // ── Helper ──────────────────────────────────────────────────────────── + + private List> query(String sql) throws SQLException { + List> rows = new ArrayList<>(); + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + int cols = rs.getMetaData().getColumnCount(); + while (rs.next()) { + List row = new ArrayList<>(); + for (int i = 1; i <= cols; i++) row.add(rs.getObject(i)); + rows.add(row); + } + } + return rows; + } + + // ── Test 1: Basic connectivity ──────────────────────────────────────── + + @Test + @Order(1) + @DisplayName("Trino responds to a simple SELECT 1") + void testConnectivity() throws SQLException { + List> rows = query("SELECT 1"); + assertEquals(1, rows.size()); + assertEquals(1L, ((Number) rows.get(0).get(0)).longValue()); + System.out.println("[PASS] Basic connectivity OK"); + } + + // ── Test 2: TPC-H built-in connector ───────────────────────────────── + + @Test + @Order(2) + @DisplayName("TPC-H tiny catalog: count orders") + void testTpchConnector() throws SQLException { + List> rows = query("SELECT COUNT(*) FROM tpch.tiny.orders"); + long count = ((Number) rows.get(0).get(0)).longValue(); + assertTrue(count > 0, "tpch.tiny.orders should have rows"); + System.out.printf("[PASS] TPC-H tiny.orders has %,d rows%n", count); + } + + @Test + @Order(3) + @DisplayName("TPC-H tiny catalog: top 5 orders by total price") + void testTpchTopOrders() throws SQLException { + List> rows = query(""" + SELECT orderkey, totalprice + FROM tpch.tiny.orders + ORDER BY totalprice DESC + LIMIT 5 + """); + assertEquals(5, rows.size(), "Expected exactly 5 rows"); + System.out.println("[PASS] TPC-H top 5 orders:"); + rows.forEach(r -> System.out.printf(" orderkey=%s totalprice=%s%n", r.get(0), r.get(1))); + } + + // ── Test 4: Iceberg — schema exists ────────────────────────────────── + + @Test + @Order(4) + @DisplayName("Iceberg catalog: schema 'sales' is visible") + void testIcebergSchemaVisible() throws SQLException { + List> rows = query("SHOW SCHEMAS IN iceberg LIKE 'sales'"); + assertFalse(rows.isEmpty(), "Schema 'sales' should exist in iceberg catalog. " + + "Did you run scripts/run-init.sh?"); + System.out.println("[PASS] Iceberg schema 'sales' is visible"); + } + + // ── Test 5: Iceberg — full table scan ──────────────────────────────── + + @Test + @Order(5) + @DisplayName("Iceberg table: select all orders") + void testIcebergSelectAll() throws SQLException { + List> rows = query("SELECT * FROM iceberg.sales.orders ORDER BY order_id"); + assertEquals(20, rows.size(), "Expected 20 rows inserted by init.sql"); + System.out.println("[PASS] Iceberg full scan: 20 rows"); + rows.forEach(r -> System.out.printf(" %s%n", r)); + } + + // ── Test 6: Iceberg — pushdown filter ──────────────────────────────── + + @Test + @Order(6) + @DisplayName("Iceberg table: filter by region = APAC") + void testIcebergFilterByRegion() throws SQLException { + List> rows = query(""" + SELECT order_id, region, amount + FROM iceberg.sales.orders + WHERE region = 'APAC' + ORDER BY order_id + """); + assertFalse(rows.isEmpty(), "Should have APAC orders"); + rows.forEach(r -> assertEquals("APAC", r.get(1), "All rows must be APAC")); + System.out.printf("[PASS] Filter pushdown: %d APAC rows%n", rows.size()); + } + + // ── Test 7: Iceberg — aggregation ──────────────────────────────────── + + @Test + @Order(7) + @DisplayName("Iceberg table: aggregate total_amount by region") + void testIcebergAggregate() throws SQLException { + List> rows = query(""" + SELECT region, COUNT(*) AS order_count, SUM(amount) AS total_amount + FROM iceberg.sales.orders + GROUP BY region + ORDER BY total_amount DESC + """); + assertFalse(rows.isEmpty(), "Aggregation should return rows"); + System.out.println("[PASS] Aggregation by region:"); + rows.forEach(r -> System.out.printf(" region=%-5s count=%s total=%.2f%n", + r.get(0), r.get(1), ((Number) r.get(2)).doubleValue())); + } + + // ── Test 8: Iceberg — amount threshold filter ───────────────────────── + + @Test + @Order(8) + @DisplayName("Iceberg table: filter orders with amount > 1000") + void testIcebergFilterByAmount() throws SQLException { + List> rows = query(""" + SELECT order_id, amount + FROM iceberg.sales.orders + WHERE amount > 1000.0 + ORDER BY amount DESC + """); + rows.forEach(r -> assertTrue( + ((Number) r.get(1)).doubleValue() > 1000.0, + "All rows must have amount > 1000" + )); + System.out.printf("[PASS] Amount filter: %d rows with amount > 1000%n", rows.size()); + } + + // ── Test 9: Iceberg — projection (select subset of columns) ─────────── + + @Test + @Order(9) + @DisplayName("Iceberg table: project only region and product columns") + void testIcebergProjection() throws SQLException { + List> rows = query(""" + SELECT region, product + FROM iceberg.sales.orders + LIMIT 5 + """); + assertEquals(5, rows.size()); + rows.forEach(r -> { + assertNotNull(r.get(0), "region should not be null"); + assertNotNull(r.get(1), "product should not be null"); + }); + System.out.println("[PASS] Projection (region, product): 5 rows returned"); + } + + // ── Test 10: Iceberg metadata — table files ────────────────────────── + + @Test + @Order(10) + @DisplayName("Iceberg metadata: $files table lists at least one Parquet file") + void testIcebergFilesMetadata() throws SQLException { + List> rows = query(""" + SELECT file_path, record_count + FROM iceberg.sales."orders$files" + """); + assertFalse(rows.isEmpty(), "There should be at least one data file after inserts"); + System.out.println("[PASS] Iceberg $files metadata:"); + rows.forEach(r -> System.out.printf(" %s (records=%s)%n", r.get(0), r.get(1))); + } +} diff --git a/trino-setup/trino/catalog/iceberg.properties b/trino-setup/trino/catalog/iceberg.properties new file mode 100644 index 000000000..ed05c180a --- /dev/null +++ b/trino-setup/trino/catalog/iceberg.properties @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Iceberg catalog backed by Hive Metastore (HMS) + MinIO (S3 storage). +# HMS stores Iceberg table metadata; Trino writes Parquet files to MinIO via S3. +connector.name=iceberg +iceberg.catalog.type=hive_metastore +hive.metastore.uri=thrift://metastore:9083 + +# Native S3 filesystem — handles both s3:// and s3a:// (which HMS uses internally). +# This avoids the "No FileSystem for scheme s3" error when HMS assigns locations. +fs.native-s3.enabled=true +s3.endpoint=http://minio:9000 +s3.path-style-access=true +s3.aws-access-key=minioadmin +s3.aws-secret-key=minioadmin +s3.region=us-east-1 + +iceberg.file-format=PARQUET diff --git a/trino-setup/trino/catalog/tpch.properties b/trino-setup/trino/catalog/tpch.properties new file mode 100644 index 000000000..064863fc7 --- /dev/null +++ b/trino-setup/trino/catalog/tpch.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Built-in TPC-H connector – no external dependencies. +# Useful for testing basic Trino connectivity without any storage setup. +# Usage: SELECT * FROM tpch.tiny.orders LIMIT 10; +connector.name=tpch diff --git a/trino-setup/trino/config.properties b/trino-setup/trino/config.properties new file mode 100644 index 000000000..8584e4aea --- /dev/null +++ b/trino-setup/trino/config.properties @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +coordinator=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +discovery.uri=http://localhost:8080 diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala index d18ed3f85..9d37aa930 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala @@ -28,7 +28,7 @@ import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaB import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap} import org.apache.wayang.basic.data.{Record, Tuple2 => RT2} import org.apache.wayang.basic.model.{DLModel, Model, LogisticRegressionModel,DecisionTreeRegressionModel} -import org.apache.wayang.basic.operators.{DLTrainingOperator, GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator, LogisticRegressionOperator,DecisionTreeRegressionOperator, LinearSVCOperator} +import org.apache.wayang.basic.operators.{DLTrainingOperator, GlobalReduceOperator, JoinOperator, LocalCallbackSink, MapOperator, ReduceByOperator, SampleOperator, SortOperator, LogisticRegressionOperator,DecisionTreeRegressionOperator, LinearSVCOperator} import org.apache.wayang.commons.util.profiledb.model.Experiment import org.apache.wayang.core.api.spatial.{SpatialGeometry, SpatialPredicate} import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate} @@ -1020,6 +1020,10 @@ class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T], /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */ private var keyUdfRamEstimator: LoadEstimator = _ + /** SQL column and direction implementing the sort key. */ + private var sqlColumnName: String = _ + private var sqlDirection: String = _ + // Try to infer the type classes from the UDFs. locally { @@ -1060,8 +1064,27 @@ class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T], this } - override protected def build = - applyTargetPlatforms(inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag), this.getTargetPlatforms()) + /** + * Add a SQL implementation of the sort key. + * + * @param columnName SQL column to sort by + * @param direction SQL sort direction, e.g. `ASC` or `DESC` + * @return this instance + */ + def withSqlUdf(columnName: String, direction: String) = { + this.sqlColumnName = columnName + this.sqlDirection = direction + this + } + + override protected def build = { + val result = inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag) + if (this.sqlColumnName != null) { + result.operator.asInstanceOf[SortOperator[T, Key]] + .getKeyDescriptor.withSqlImplementation(this.sqlColumnName, this.sqlDirection) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } @@ -1283,6 +1306,10 @@ class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T] /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */ private var udfLoadProfileEstimator: LoadProfileEstimator = _ + /** SQL implementations of the grouping key and reduction. */ + private var keySqlUdf: String = _ + private var reduceSqlUdf: String = _ + // TODO: Add these estimators. // /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */ // private var keyUdfCpuEstimator: LoadEstimator = _ @@ -1322,7 +1349,29 @@ class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T] this } - override protected def build = applyTargetPlatforms(inputDataQuanta.dataQuanta().reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator), this.getTargetPlatforms()) + /** + * Add SQL implementations of the grouping key and reduction. + * + * @param keySqlUdf SQL grouping column + * @param reduceSqlUdf SQL aggregate expression + * @return this instance + */ + def withSqlUdfs(keySqlUdf: String, reduceSqlUdf: String) = { + this.keySqlUdf = keySqlUdf + this.reduceSqlUdf = reduceSqlUdf + this + } + + override protected def build = { + val result = inputDataQuanta.dataQuanta() + .reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator) + if (this.keySqlUdf != null) { + val operator = result.operator.asInstanceOf[ReduceByOperator[T, Key]] + operator.getKeyDescriptor.withSqlImplementation(this.keySqlUdf, this.keySqlUdf) + operator.getReduceDescriptor.withSqlImplementation(this.reduceSqlUdf) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } /** @@ -1402,6 +1451,9 @@ class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */ private var udfLoadProfileEstimator: LoadProfileEstimator = _ + /** SQL implementation of the reduction. */ + private var sqlUdf: String = _ + // Try to infer the type classes from the udf. locally { val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]]) @@ -1422,7 +1474,25 @@ class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], this } - override protected def build = applyTargetPlatforms(inputDataQuanta.dataQuanta().reduceJava(udf, this.udfLoadProfileEstimator), this.getTargetPlatforms()) + /** + * Add a SQL implementation of the reduction. + * + * @param sqlUdf SQL aggregate expression + * @return this instance + */ + def withSqlUdf(sqlUdf: String) = { + this.sqlUdf = sqlUdf + this + } + + override protected def build = { + val result = inputDataQuanta.dataQuanta().reduceJava(udf, this.udfLoadProfileEstimator) + if (this.sqlUdf != null) { + result.operator.asInstanceOf[GlobalReduceOperator[T]] + .getReduceDescriptor.withSqlImplementation(this.sqlUdf) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } @@ -1490,6 +1560,12 @@ class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_ /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */ private var keyUdf1RamEstimator: LoadEstimator = _ + /** SQL implementations of both join keys. */ + private var keyUdf0TableName: String = _ + private var keyUdf0SqlUdf: String = _ + private var keyUdf1TableName: String = _ + private var keyUdf1SqlUdf: String = _ + // Try to infer the type classes from the UDFs. locally { val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]]) @@ -1568,6 +1644,22 @@ class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_ this } + /** + * Add SQL implementations of both join keys. + * + * @return this instance + */ + def withSqlUdfs(thisTableName: String, + thisKeySqlUdf: String, + thatTableName: String, + thatKeySqlUdf: String) = { + this.keyUdf0TableName = thisTableName + this.keyUdf0SqlUdf = thisKeySqlUdf + this.keyUdf1TableName = thatTableName + this.keyUdf1SqlUdf = thatKeySqlUdf + this + } + /** * Assemble the joined elements to new elements. * @@ -1579,8 +1671,16 @@ class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_ override def apply(joinTuple: RT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1) }) - override protected def build = - applyTargetPlatforms(inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag), this.getTargetPlatforms()) + override protected def build = { + val result = inputDataQuanta0.dataQuanta() + .joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag) + if (this.keyUdf0SqlUdf != null) { + val operator = result.operator.asInstanceOf[JoinOperator[In0, In1, Key]] + operator.getKeyDescriptor0.withSqlImplementation(this.keyUdf0TableName, this.keyUdf0SqlUdf) + operator.getKeyDescriptor1.withSqlImplementation(this.keyUdf1TableName, this.keyUdf1SqlUdf) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } diff --git a/wayang-platforms/pom.xml b/wayang-platforms/pom.xml index 6a852c165..826f42404 100644 --- a/wayang-platforms/pom.xml +++ b/wayang-platforms/pom.xml @@ -43,6 +43,7 @@ wayang-giraph wayang-flink wayang-generic-jdbc + wayang-trino wayang-tensorflow diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index 401e331cd..6dd59a3a6 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -151,7 +151,9 @@ public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, fin )); } - sb.append(';'); + // Intentionally no trailing ';'. A trailing semicolon is unnecessary for a + // single-statement JDBC executeQuery and is rejected by strict SQL parsers + // such as Trino and BigQuery. Postgres/SQLite/HSQLDB accept its absence. return sb; } @@ -167,7 +169,7 @@ protected static Tuple2 createSqlQuery(final E final Collection startTasks = stage.getStartTasks(); // Verify that we can handle this instance. - final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; + final ExecutionTask startTask = JdbcExecutor.selectStartTask(startTasks, stage); assert startTask.getOperator() instanceof TableSource : "Invalid JDBC stage: Start task has to be a TableSource"; @@ -192,13 +194,16 @@ protected static Tuple2 createSqlQuery(final E } else if (operator instanceof JdbcProjectionOperator) { assert projectionTask == null; // Allow one projection operator per stage for now. projectionTask = (JdbcProjectionOperator) operator; - } else if (operator instanceof final JdbcGlobalReduceOperator globalReduce) { + } else if (operator instanceof JdbcGlobalReduceOperator) { + final JdbcGlobalReduceOperator globalReduce = (JdbcGlobalReduceOperator) operator; assert globalReduceTask == null; // Allow one projection operator per stage for now. globalReduceTask = globalReduce; - } else if (operator instanceof final JdbcReduceByOperator reduceBy) { + } else if (operator instanceof JdbcReduceByOperator) { + final JdbcReduceByOperator reduceBy = (JdbcReduceByOperator) operator; assert reduceByTask == null; // Allow one projection operator per stage for now. reduceByTask = reduceBy; - } else if (operator instanceof final JdbcSortOperator sort) { + } else if (operator instanceof JdbcSortOperator) { + final JdbcSortOperator sort = (JdbcSortOperator) operator; assert sortTask == null; // Allow one projection operator per stage for now. sortTask = sort; } else if (operator instanceof JoinOperator || (operator instanceof SpatialJoinOperator)) { @@ -221,6 +226,33 @@ protected static Tuple2 createSqlQuery(final E return new Tuple2<>(query.toString(), tipChannelInstance); } + /** + * Selects the table source that belongs on the left-hand side of a JDBC join. + * Stage start tasks are not ordered, but {@link JdbcJoinOperator#createSqlClause} + * assumes its first key descriptor's table is used in the {@code FROM} clause. + */ + private static ExecutionTask selectStartTask(final Collection startTasks, final ExecutionStage stage) { + if (startTasks.size() == 1) { + return (ExecutionTask) startTasks.iterator().next(); + } + + for (ExecutionTask task : stage.getAllTasks()) { + if (task.getOperator() instanceof JdbcJoinOperator) { + final JdbcJoinOperator joinOperator = (JdbcJoinOperator) task.getOperator(); + final String leftTableName = joinOperator.getKeyDescriptor0().getSqlImplementation().field0; + for (Object startTaskObject : startTasks) { + final ExecutionTask startTask = (ExecutionTask) startTaskObject; + if (startTask.getOperator() instanceof JdbcTableSource + && ((JdbcTableSource) startTask.getOperator()).getTableName().equals(leftTableName)) { + return startTask; + } + } + } + } + + throw new WayangException("Could not determine the left table source for JDBC stage."); + } + /** * Handles execution stages that end with a {@link JdbcTableSinkOperator}. * Composes a SQL query from the stage's operators and executes it directly on @@ -235,8 +267,7 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat final Collection startTasks = stage.getStartTasks(); final Collection termTasks = stage.getTerminalTasks(); - assert startTasks.size() == 1 : "Invalid JDBC stage: multiple sources are not currently supported"; - final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; + final ExecutionTask startTask = JdbcExecutor.selectStartTask(startTasks, stage); assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; assert startTask.getOperator() instanceof TableSource @@ -249,17 +280,35 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat final JdbcTableSinkOperator sinkOp = (JdbcTableSinkOperator) termTask.getOperator(); final Collection filterTasks = new ArrayList<>(4); JdbcProjectionOperator projectionTask = null; + JdbcGlobalReduceOperator globalReduceTask = null; + JdbcReduceByOperator reduceByTask = null; + JdbcSortOperator sortTask = null; final Collection joinTasks = new ArrayList<>(); // Walk through intermediate operators, stopping at the sink ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); while (nextTask != null && !(nextTask.getOperator() instanceof JdbcTableSinkOperator)) { - if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) { + if (nextTask.getOperator() instanceof JdbcFilterOperator) { + final JdbcFilterOperator filterOperator = (JdbcFilterOperator) nextTask.getOperator(); filterTasks.add(filterOperator); - } else if (nextTask.getOperator() instanceof final JdbcProjectionOperator projectionOperator) { + } else if (nextTask.getOperator() instanceof JdbcProjectionOperator) { + final JdbcProjectionOperator projectionOperator = (JdbcProjectionOperator) nextTask.getOperator(); assert projectionTask == null; projectionTask = projectionOperator; - } else if (nextTask.getOperator() instanceof final JdbcJoinOperator joinOperator) { + } else if (nextTask.getOperator() instanceof JdbcGlobalReduceOperator) { + final JdbcGlobalReduceOperator globalReduceOperator = (JdbcGlobalReduceOperator) nextTask.getOperator(); + assert globalReduceTask == null; + globalReduceTask = globalReduceOperator; + } else if (nextTask.getOperator() instanceof JdbcReduceByOperator) { + final JdbcReduceByOperator reduceByOperator = (JdbcReduceByOperator) nextTask.getOperator(); + assert reduceByTask == null; + reduceByTask = reduceByOperator; + } else if (nextTask.getOperator() instanceof JdbcSortOperator) { + final JdbcSortOperator sortOperator = (JdbcSortOperator) nextTask.getOperator(); + assert sortTask == null; + sortTask = sortOperator; + } else if (nextTask.getOperator() instanceof JdbcJoinOperator) { + final JdbcJoinOperator joinOperator = (JdbcJoinOperator) nextTask.getOperator(); joinTasks.add(joinOperator); } else { throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); @@ -268,8 +317,8 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat } // Compose the SELECT query - final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, null, null, null, - joinTasks); + final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, + globalReduceTask, reduceByTask, sortTask, joinTasks); // Remove trailing semicolon from SELECT String selectSql = selectQuery.toString(); diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java index 2d546deec..4d7096649 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java @@ -81,7 +81,8 @@ public CardinalityEstimate estimate(OptimizationContext optimizationContext, Car .createJdbcConnection()) { // Query the table cardinality. - final String sql = String.format("SELECT count(*) FROM %s;", JdbcTableSource.this.getTableName()); + // No trailing ';' — strict parsers (Trino, BigQuery) reject it in executeQuery. + final String sql = String.format("SELECT count(*) FROM %s", JdbcTableSource.this.getTableName()); final ResultSet resultSet = connection.createStatement().executeQuery(sql); if (!resultSet.next()) { throw new SQLException("No query result for \"" + sql + "\"."); diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java index 0dfd8b698..8f7b3d8a2 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java @@ -81,7 +81,7 @@ void testExecuteWithPlainTableSource() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT * FROM customer;", + "SELECT * FROM customer", sqlQueryChannelInstance.getSqlQuery() ); } @@ -130,7 +130,7 @@ void testExecuteWithFilter() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT * FROM customer WHERE age >= 18;", + "SELECT * FROM customer WHERE age >= 18", sqlQueryChannelInstance.getSqlQuery() ); } @@ -172,7 +172,7 @@ void testExecuteWithProjection() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT name, age FROM customer;", + "SELECT name, age FROM customer", sqlQueryChannelInstance.getSqlQuery() ); } @@ -240,7 +240,7 @@ void testExecuteWithProjectionAndFilters() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT name, age FROM customer WHERE age >= 18 AND name IS NOT NULL;", + "SELECT name, age FROM customer WHERE age >= 18 AND name IS NOT NULL", sqlQueryChannelInstance.getSqlQuery() ); } diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java index b5ccb0848..5850fd2cf 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java @@ -19,7 +19,6 @@ package org.apache.wayang.jdbc.operators; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,7 +61,7 @@ void testWithHsqldb() throws SQLException { final ExecutionStage sqlStage = mock(ExecutionStage.class); final JdbcTableSource tableSourceA = new HsqldbTableSource("testA"); - + final ExecutionTask tableSourceATask = new ExecutionTask(tableSourceA); tableSourceATask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, tableSourceA.getOutput(0))); tableSourceATask.setStage(sqlStage); @@ -86,32 +85,27 @@ void testWithHsqldb() throws SQLException { globalReduceTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0); sqlToStreamTask.setStage(nextStage); - - final HsqldbPlatform hsqldbPlatform = new HsqldbPlatform(); - - try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) { - final Statement statement = jdbcConnection.createStatement(); - statement.execute("CREATE TABLE IF NOT EXISTS testA (a INT, b VARCHAR(6));"); - statement.execute("INSERT INTO testA VALUES (0, 'zero');"); - statement.execute("CREATE TABLE IF NOT EXISTS testB (a INT, b INT);"); - statement.execute("INSERT INTO testB VALUES (0, 100);"); - } - final JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job); executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor()); final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() .getChannelInstance(sqlToStreamTask.getInputChannel(0)); + final HsqldbPlatform hsqldbPlatform = new HsqldbPlatform(); + try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) { final Statement statement = jdbcConnection.createStatement(); + statement.execute("DROP TABLE IF EXISTS testA"); + statement.execute("CREATE TABLE testA (col0 INT)"); + statement.execute("INSERT INTO testA VALUES (1)"); + statement.execute("INSERT INTO testA VALUES (2)"); final java.sql.ResultSet resultSet = statement.executeQuery(sqlQueryChannelInstance.getSqlQuery()); resultSet.next(); final int count = resultSet.getInt(1); - assertTrue(count > 0); + assertEquals(2, count); } - assertEquals("SELECT COUNT(*) FROM testA;", sqlQueryChannelInstance.getSqlQuery()); + assertEquals("SELECT COUNT(*) FROM testA", sqlQueryChannelInstance.getSqlQuery()); } } diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java index 875a7a47b..d56405b19 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java @@ -39,7 +39,9 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; @@ -116,7 +118,12 @@ void testWithHsqldb() throws SQLException { joinTask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, joinOperator.getOutput(0))); joinTask.setStage(sqlStage); - when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceATask)); + // Deliberately list the right source first: JdbcExecutor must still choose + // the join's left source for the FROM clause. + when(sqlStage.getStartTasks()).thenReturn(new LinkedHashSet<>(Arrays.asList( + tableSourceBTask, tableSourceATask))); + when(sqlStage.getAllTasks()).thenReturn(new LinkedHashSet<>(Arrays.asList( + tableSourceBTask, tableSourceATask, joinTask))); when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(joinTask)); ExecutionStage nextStage = mock(ExecutionStage.class); @@ -135,7 +142,7 @@ void testWithHsqldb() throws SQLException { System.out.println(); assertEquals( - "SELECT * FROM testA JOIN testB ON testB.a=testA.a;", + "SELECT * FROM testA JOIN testB ON testB.a=testA.a", sqlQueryChannelInstance.getSqlQuery() ); } @@ -213,7 +220,7 @@ void testMultiConditionJoinWithHsqldb() throws SQLException { String generatedSql = sqlQueryChannelInstance.getSqlQuery(); assertEquals( - "SELECT * FROM orders JOIN shipments ON orders.order_id=shipments.order_id AND orders.customer_id=shipments.customer_id;", + "SELECT * FROM orders JOIN shipments ON orders.order_id=shipments.order_id AND orders.customer_id=shipments.customer_id", generatedSql ); diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java index f00f4020e..556224027 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java @@ -91,6 +91,6 @@ void testWithHsqldb() throws SQLException { final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() .getChannelInstance(sqlToStreamTask.getInputChannel(0)); - assertEquals("SELECT col0,COUNT(*) FROM testA GROUP BY col0;", sqlQueryChannelInstance.getSqlQuery()); + assertEquals("SELECT col0,COUNT(*) FROM testA GROUP BY col0", sqlQueryChannelInstance.getSqlQuery()); } } diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java index 118fb7efa..1dc2fe12f 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java @@ -86,6 +86,6 @@ void testWithHsqldb() throws SQLException { final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() .getChannelInstance(sqlToStreamTask.getInputChannel(0)); - assertEquals("SELECT * FROM testA ORDER BY col0 DESC;", sqlQueryChannelInstance.getSqlQuery()); + assertEquals("SELECT * FROM testA ORDER BY col0 DESC", sqlQueryChannelInstance.getSqlQuery()); } } diff --git a/wayang-platforms/wayang-trino/pom.xml b/wayang-platforms/wayang-trino/pom.xml new file mode 100644 index 000000000..5e8592352 --- /dev/null +++ b/wayang-platforms/wayang-trino/pom.xml @@ -0,0 +1,86 @@ + + + + 4.0.0 + + + wayang-platforms + org.apache.wayang + 1.1.2-SNAPSHOT + + + wayang-trino + + Wayang Platform Trino + + Wayang implementation of the operators to be working with the platform "Trino" + + + + org.apache.wayang.platform.trino + 435 + + + + + io.trino + trino-jdbc + ${trino.version} + + + org.apache.wayang + wayang-basic + 1.1.2-SNAPSHOT + + + org.apache.wayang + wayang-jdbc-template + 1.1.2-SNAPSHOT + + + org.apache.wayang + wayang-spark + 1.1.2-SNAPSHOT + + + org.apache.wayang + wayang-api-scala-java + 1.1.2-SNAPSHOT + test + + + org.junit.jupiter + junit-jupiter + 5.10.2 + test + + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + + + + diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/Trino.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/Trino.java new file mode 100644 index 000000000..46ab325b9 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/Trino.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino; + + +import org.apache.wayang.trino.platform.TrinoPlatform; +import org.apache.wayang.trino.plugin.TrinoConversionsPlugin; +import org.apache.wayang.trino.plugin.TrinoPlugin; + +/** + * Register for relevant components of this module. + */ +public class Trino { + + private final static TrinoPlugin PLUGIN = new TrinoPlugin(); + + private final static TrinoConversionsPlugin CONVERSIONS_PLUGIN = new TrinoConversionsPlugin(); + + /** + * Retrieve the {@link TrinoPlugin}. + * + * @return the {@link TrinoPlugin} + */ + public static TrinoPlugin plugin() { + return PLUGIN; + } + + /** + * Retrieve the {@link TrinoConversionsPlugin}. + * + * @return the {@link TrinoConversionsPlugin} + */ + public static TrinoConversionsPlugin conversionPlugin() { + return CONVERSIONS_PLUGIN; + } + + + /** + * Retrieve the {@link TrinoPlatform}. + * + * @return the {@link TrinoPlatform} + */ + public static TrinoPlatform platform() { + return TrinoPlatform.getInstance(); + } + +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/TrinoDemo.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/TrinoDemo.java new file mode 100644 index 000000000..9becd061d --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/TrinoDemo.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.function.ProjectionDescriptor; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.basic.operators.LocalCallbackSink; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.function.PredicateDescriptor; +import org.apache.wayang.basic.types.RecordType; +import org.apache.wayang.core.plan.wayangplan.WayangPlan; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.java.Java; +import org.apache.wayang.trino.operators.TrinoTableSource; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Standalone demo for the Wayang Trino connector. + * + *

Demonstrates two Trino operator types: + *

    + *
  1. Seg 3 — Filter pushdown: WHERE region = 'AMER'.
  2. + *
  3. Seg 4 — Projection + Filter pushdown: + * SELECT region, product, amount ... WHERE region = 'AMER'.
  4. + *
+ * + *

Run with: + *

+ *   cd /path/to/wayang
+ *   mvn exec:java -pl wayang-platforms/wayang-trino \
+ *     -Dexec.mainClass=org.apache.wayang.trino.TrinoDemo \
+ *     -Pskip-prerequisite-check -Drat.skip=true \
+ *     [-Dtrino.url=jdbc:trino://localhost:8080] [-Dtrino.user=admin]
+ * 
+ */ +public class TrinoDemo { + + private static final String JDBC_URL = System.getProperty("trino.url", "jdbc:trino://localhost:8080"); + private static final String JDBC_USER = System.getProperty("trino.user", "admin"); + + public static void main(String[] args) throws Exception { + seg3Filter(); + seg4Projection(); + } + + // ── Seg 3 — Filter pushdown ─────────────────────────────────────────────── + + static void seg3Filter() throws Exception { + System.out.println("══════════════════════════════════════════════════════"); + System.out.println(" Seg 3 — Filter Operator Pushdown"); + System.out.println("══════════════════════════════════════════════════════"); + System.out.println(); + System.out.println(" Operator: FilterOperator -> TrinoFilterOperator"); + System.out.println(" SQL sent: SELECT * FROM iceberg.sales.orders"); + System.out.println(" WHERE region = 'AMER'"); + System.out.println(); + + Configuration config = buildConfig(); + WayangContext wayang = buildWayang(config); + + List results = new ArrayList<>(); + TrinoTableSource source = new TrinoTableSource( + "iceberg.sales.orders", "order_id", "region", "product", "amount", "order_date" + ); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + r -> "AMER".equals(r.getField(1)), Record.class + ).withSqlImplementation("region = 'AMER'") + ); + LocalCallbackSink sink = LocalCallbackSink.createCollectingSink(results, Record.class); + source.connectTo(0, filter, 0); + filter.connectTo(0, sink, 0); + + wayang.execute("Trino-Filter-Demo", new WayangPlan(sink)); + + System.out.println(" Results returned by Wayang:"); + System.out.printf(" %-10s %-6s %-10s %10s %-12s%n", + "order_id", "region", "product", "amount", "order_date"); + System.out.println(" " + repeat('-', 54)); + for (Record r : results) { + System.out.printf(" %-10s %-6s %-10s %10s %-12s%n", + r.getField(0), r.getField(1), r.getField(2), r.getField(3), r.getField(4)); + } + System.out.println(); + System.out.printf(" ✓ %d AMER rows — filter pushed to Trino as SQL WHERE clause%n", results.size()); + + verifyInQueryHistory("iceberg.sales.orders"); + + System.out.println("══════════════════════════════════════════════════════"); + System.out.println(); + } + + // ── Seg 4 — Projection + Filter pushdown ───────────────────────────────── + + static void seg4Projection() throws Exception { + System.out.println("══════════════════════════════════════════════════════"); + System.out.println(" Seg 4 — Projection Operator Pushdown"); + System.out.println("══════════════════════════════════════════════════════"); + System.out.println(); + System.out.println(" Operators: FilterOperator -> TrinoFilterOperator"); + System.out.println(" MapOperator -> TrinoProjectionOperator"); + System.out.println(" SQL sent: SELECT region, product, amount"); + System.out.println(" FROM iceberg.sales.orders"); + System.out.println(" WHERE region = 'AMER'"); + System.out.println(); + System.out.println(" Both operators get pushed into a single SQL query —"); + System.out.println(" no unnecessary columns are transferred over the network."); + System.out.println(); + + Configuration config = buildConfig(); + WayangContext wayang = buildWayang(config); + + List results = new ArrayList<>(); + TrinoTableSource source = new TrinoTableSource( + "iceberg.sales.orders", "order_id", "region", "product", "amount", "order_date" + ); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + r -> "AMER".equals(r.getField(1)), Record.class + ).withSqlImplementation("region = 'AMER'") + ); + // Use the Record-aware projection (multi-field). The plain + // ProjectionDescriptor(Class, Class, fields...) builds a POJO projection + // whose Java implementation only supports a single field; createForRecords + // yields a multi-field Record implementation that also works if Wayang + // executes the projection on the Java side instead of pushing it to Trino. + MapOperator projection = new MapOperator<>( + ProjectionDescriptor.createForRecords( + new RecordType("order_id", "region", "product", "amount", "order_date"), + "region", "product", "amount"), + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class) + ); + LocalCallbackSink sink = LocalCallbackSink.createCollectingSink(results, Record.class); + source.connectTo(0, filter, 0); + filter.connectTo(0, projection, 0); + projection.connectTo(0, sink, 0); + + wayang.execute("Trino-Projection-Demo", new WayangPlan(sink)); + + System.out.println(" Results returned by Wayang (projected columns only):"); + System.out.printf(" %-6s %-10s %10s%n", "region", "product", "amount"); + System.out.println(" " + repeat('-', 30)); + for (Record r : results) { + System.out.printf(" %-6s %-10s %10s%n", r.getField(0), r.getField(1), r.getField(2)); + } + System.out.println(); + System.out.printf(" ✓ %d AMER rows — only 3 of 5 columns fetched (projection pushed to SQL)%n", + results.size()); + + verifyInQueryHistory("iceberg.sales.orders"); + + System.out.println("══════════════════════════════════════════════════════"); + System.out.println(); + } + + // ── Shared helpers ──────────────────────────────────────────────────────── + + private static Configuration buildConfig() { + Configuration config = new Configuration(); + config.setProperty("wayang.trino.jdbc.url", JDBC_URL); + config.setProperty("wayang.trino.jdbc.user", JDBC_USER); + config.setProperty("wayang.trino.jdbc.password", ""); + return config; + } + + private static WayangContext buildWayang(Configuration config) { + return new WayangContext(config) + .withPlugin(Java.basicPlugin()) + .withPlugin(Trino.plugin()); + } + + private static void verifyInQueryHistory(String tableHint) throws Exception { + System.out.println(); + System.out.println(" Checking Trino's system.runtime.queries for proof..."); + Properties props = new Properties(); + props.setProperty("user", JDBC_USER); + try (Connection conn = DriverManager.getConnection(JDBC_URL, props)) { + ResultSet rs = conn.createStatement().executeQuery( + "SELECT query FROM system.runtime.queries " + + "WHERE state = 'FINISHED' AND query LIKE '%" + tableHint + "%' " + + "ORDER BY created DESC LIMIT 2" + ); + System.out.println(); + System.out.println(" Last SQL Trino executed:"); + while (rs.next()) { + System.out.println(" > " + rs.getString(1).replaceAll("\\s+", " ")); + } + } + System.out.println(); + System.out.println(" ✓ Wayang-assembled SQL confirmed in Trino query history."); + } + + private static String repeat(char c, int n) { + StringBuilder sb = new StringBuilder(n); + for (int i = 0; i < n; i++) sb.append(c); + return sb.toString(); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/channels/ChannelConversions.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/channels/ChannelConversions.java new file mode 100644 index 000000000..ed8299cad --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/channels/ChannelConversions.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.channels; + +import org.apache.wayang.core.optimizer.channels.ChannelConversion; +import org.apache.wayang.core.optimizer.channels.DefaultChannelConversion; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.jdbc.operators.SqlToRddOperator; +import org.apache.wayang.jdbc.operators.SqlToStreamOperator; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Register for the {@link ChannelConversion}s supported for this platform. + */ +public class ChannelConversions { + + public static final ChannelConversion SQL_TO_STREAM_CONVERSION = new DefaultChannelConversion( + TrinoPlatform.getInstance().getSqlQueryChannelDescriptor(), + StreamChannel.DESCRIPTOR, + () -> new SqlToStreamOperator(TrinoPlatform.getInstance()) + ); + + public static final ChannelConversion SQL_TO_UNCACHED_RDD_CONVERSION = new DefaultChannelConversion( + TrinoPlatform.getInstance().getSqlQueryChannelDescriptor(), + RddChannel.UNCACHED_DESCRIPTOR, + () -> new SqlToRddOperator(TrinoPlatform.getInstance()) + ); + + public static final Collection ALL = Arrays.asList( + SQL_TO_STREAM_CONVERSION, + SQL_TO_UNCACHED_RDD_CONVERSION + ); + +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/FilterMapping.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/FilterMapping.java new file mode 100644 index 000000000..0b95a7335 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/FilterMapping.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.trino.operators.TrinoFilterOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Collection; +import java.util.Collections; + + +/** + * Mapping from {@link FilterOperator} to {@link TrinoFilterOperator}. + */ +@SuppressWarnings("unchecked") +public class FilterMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + TrinoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "filter", new FilterOperator<>(null, DataSetType.createDefault(Record.class)), false + ).withAdditionalTest(op -> op.getPredicateDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new TrinoFilterOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/GlobalReduceMapping.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/GlobalReduceMapping.java new file mode 100644 index 000000000..c395cbfd8 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/GlobalReduceMapping.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.trino.operators.TrinoGlobalReduceOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link GlobalReduceOperator} to {@link TrinoGlobalReduceOperator}. + */ +@SuppressWarnings("unchecked") +public class GlobalReduceMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + TrinoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "reduce", new GlobalReduceOperator(null, DataSetType.createDefault(Record.class)), false) + .withAdditionalTest(op -> op.getReduceDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new TrinoGlobalReduceOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/JoinMapping.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/JoinMapping.java new file mode 100644 index 000000000..16e406fca --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/JoinMapping.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.trino.operators.TrinoJoinOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link JoinOperator} to {@link TrinoJoinOperator}. + */ +@SuppressWarnings("unchecked") +public class JoinMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + TrinoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern> operatorPattern = new OperatorPattern<>( + "join", + new JoinOperator( + null, + null, + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class) + ), + false + ) + .withAdditionalTest(op -> op.getKeyDescriptor0() instanceof TransformationDescriptor) + .withAdditionalTest(op -> op.getKeyDescriptor1() instanceof TransformationDescriptor) + .withAdditionalTest(op -> op.getKeyDescriptor0().getSqlImplementation() != null) + .withAdditionalTest(op -> op.getKeyDescriptor1().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> { + return new TrinoJoinOperator(matchedOperator).at(epoch); + } + ); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/Mappings.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/Mappings.java new file mode 100644 index 000000000..2912ed7ed --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/Mappings.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.core.mapping.Mapping; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Register for the {@link Mapping}s supported for this platform. + */ +public class Mappings { + + public static final Collection ALL = Arrays.asList( + new FilterMapping(), + new GlobalReduceMapping(), + new JoinMapping(), + new ProjectionMapping(), + new ReduceByMapping(), + new SortMapping(), + new TableSinkMapping() + ); + +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/ProjectionMapping.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/ProjectionMapping.java new file mode 100644 index 000000000..95e2338af --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/ProjectionMapping.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.function.ProjectionDescriptor; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.trino.operators.TrinoProjectionOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link MapOperator} to {@link TrinoProjectionOperator}. + */ +public class ProjectionMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + TrinoPlatform.getInstance())); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern> operatorPattern = new OperatorPattern<>( + "projection", + new MapOperator<>( + null, + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(op -> op.getFunctionDescriptor() instanceof ProjectionDescriptor) + .withAdditionalTest(op -> op.getNumInputs() == 1); // No broadcasts. + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new TrinoProjectionOperator(matchedOperator).at(epoch)); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/ReduceByMapping.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/ReduceByMapping.java new file mode 100644 index 000000000..8870c9d8c --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/ReduceByMapping.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.trino.operators.TrinoReduceByOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link ReduceByOperator} to {@link TrinoReduceByOperator}. + */ +@SuppressWarnings("unchecked") +public class ReduceByMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + TrinoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "reduceBy", + new ReduceByOperator(null, null, DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(op -> op.getKeyDescriptor().getSqlImplementation() != null + && op.getReduceDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new TrinoReduceByOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/SortMapping.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/SortMapping.java new file mode 100644 index 000000000..c952deb0c --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/SortMapping.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.trino.operators.TrinoSortOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link SortOperator} to {@link TrinoSortOperator}. + */ +@SuppressWarnings("unchecked") +public class SortMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + TrinoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "sort", + new SortOperator(null, DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(op -> op.getKeyDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new TrinoSortOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/TableSinkMapping.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/TableSinkMapping.java new file mode 100644 index 000000000..9539f50b0 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/mapping/TableSinkMapping.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.mapping; + +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.trino.operators.TrinoTableSinkOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link TableSink} to {@link TrinoTableSinkOperator}. + */ +@SuppressWarnings("unchecked") +public class TableSinkMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + TrinoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern<>( + "sink", new TableSink<>(null, null, null), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new TrinoTableSinkOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoExecutionOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoExecutionOperator.java new file mode 100644 index 000000000..2e2f27b21 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoExecutionOperator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.jdbc.operators.JdbcExecutionOperator; +import org.apache.wayang.trino.platform.TrinoPlatform; + +public interface TrinoExecutionOperator extends JdbcExecutionOperator { + + @Override + default TrinoPlatform getPlatform() { + return TrinoPlatform.getInstance(); + } + +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoFilterOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoFilterOperator.java new file mode 100644 index 000000000..48c0ea4a7 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoFilterOperator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.core.function.PredicateDescriptor; +import org.apache.wayang.jdbc.operators.JdbcFilterOperator; + + +/** + * Trino implementation of the {@link FilterOperator}. + */ +public class TrinoFilterOperator extends JdbcFilterOperator implements TrinoExecutionOperator { + + /** + * Creates a new instance. + */ + public TrinoFilterOperator(PredicateDescriptor predicateDescriptor) { + super(predicateDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public TrinoFilterOperator(FilterOperator that) { + super(that); + } + + @Override + protected TrinoFilterOperator createCopy() { + return new TrinoFilterOperator(this); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoGlobalReduceOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoGlobalReduceOperator.java new file mode 100644 index 000000000..2e608faaa --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoGlobalReduceOperator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.jdbc.operators.JdbcGlobalReduceOperator; + +/** + * Trino implementation of the {@link GlobalReduceOperator}. + */ +public class TrinoGlobalReduceOperator extends JdbcGlobalReduceOperator implements TrinoExecutionOperator { + + /** + * Creates a new instance. + */ + public TrinoGlobalReduceOperator(ReduceDescriptor reduceDescriptor) { + super(reduceDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public TrinoGlobalReduceOperator(GlobalReduceOperator that) { + super(that); + } + + @Override + protected TrinoGlobalReduceOperator createCopy() { + return new TrinoGlobalReduceOperator(this); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoJoinOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoJoinOperator.java new file mode 100644 index 000000000..6937cac79 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoJoinOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcJoinOperator; + + +/** + * Trino implementation of the {@link JoinOperator}. + */ +public class TrinoJoinOperator extends JdbcJoinOperator implements TrinoExecutionOperator { + + /** + * Creates a new instance. + */ + public TrinoJoinOperator( + TransformationDescriptor keyDescriptor0, + TransformationDescriptor keyDescriptor1) { + super(keyDescriptor0,keyDescriptor1); + } + + public TrinoJoinOperator(JoinOperator that) { + super(that); + } + + @Override + protected TrinoJoinOperator createCopy() { + return new TrinoJoinOperator(this); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoProjectionOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoProjectionOperator.java new file mode 100644 index 000000000..f567fe86f --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoProjectionOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.function.ProjectionDescriptor; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.jdbc.operators.JdbcProjectionOperator; + +/** + * Trino implementation of the {@link FilterOperator}. + */ +public class TrinoProjectionOperator extends JdbcProjectionOperator implements TrinoExecutionOperator { + + public TrinoProjectionOperator(String... fieldNames) { + super(fieldNames); + } + + public TrinoProjectionOperator(ProjectionDescriptor functionDescriptor) { + super(functionDescriptor); + } + + public TrinoProjectionOperator(MapOperator that) { + super(that); + } + + @Override + protected TrinoProjectionOperator createCopy() { + return new TrinoProjectionOperator(this); + } + +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoReduceByOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoReduceByOperator.java new file mode 100644 index 000000000..d825ca82b --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoReduceByOperator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcReduceByOperator; + +/** + * Trino implementation of the {@link ReduceByOperator}. + */ +public class TrinoReduceByOperator extends JdbcReduceByOperator implements TrinoExecutionOperator { + + /** + * Creates a new instance. + */ + public TrinoReduceByOperator(TransformationDescriptor keyDescriptor, + ReduceDescriptor reduceDescriptor) { + super(keyDescriptor, reduceDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public TrinoReduceByOperator(ReduceByOperator that) { + super(that); + } + + @Override + protected TrinoReduceByOperator createCopy() { + return new TrinoReduceByOperator(this); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoSortOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoSortOperator.java new file mode 100644 index 000000000..5fd4dcb84 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoSortOperator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcSortOperator; + +/** + * Trino implementation of the {@link SortOperator}. + */ +public class TrinoSortOperator extends JdbcSortOperator implements TrinoExecutionOperator { + + /** + * Creates a new instance. + */ + public TrinoSortOperator(TransformationDescriptor keyDescriptor) { + super(keyDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public TrinoSortOperator(SortOperator that) { + super(that); + } + + @Override + protected TrinoSortOperator createCopy() { + return new TrinoSortOperator(this); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoTableSinkOperator.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoTableSinkOperator.java new file mode 100644 index 000000000..cbc276e30 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoTableSinkOperator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator; + +/** + * Trino implementation of the {@link JdbcTableSinkOperator}. The sink stays + * entirely within Trino: the composed query is wrapped in a + * {@code CREATE TABLE ... AS} (mode {@code overwrite}) or + * {@code INSERT INTO ... } statement. + * + *

Table names use Trino's three-part naming convention: + * {@code catalog.schema.table} (e.g. {@code iceberg.sales.orders}). + */ +public class TrinoTableSinkOperator extends JdbcTableSinkOperator implements TrinoExecutionOperator { + + /** + * Creates a new instance. + */ + public TrinoTableSinkOperator(String tableName, String[] columnNames) { + super(tableName, columnNames); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public TrinoTableSinkOperator(TableSink that) { + super(that); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoTableSource.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoTableSource.java new file mode 100644 index 000000000..cae67530d --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/operators/TrinoTableSource.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.operators; + +import org.apache.wayang.basic.operators.TableSource; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.jdbc.operators.JdbcTableSource; + +import java.util.List; + +/** + * Trino implementation for the {@link TableSource}. + * + *

Table names use Trino's three-part naming convention: + * {@code catalog.schema.table} (e.g. {@code iceberg.sales.orders}). + */ +public class TrinoTableSource extends JdbcTableSource implements TrinoExecutionOperator { + + /** + * Creates a new instance. + * + * @see TableSource#TableSource(String, String...) + */ + public TrinoTableSource(String tableName, String... columnNames) { + super(tableName, columnNames); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public TrinoTableSource(JdbcTableSource that) { + super(that); + } + + @Override + public List getSupportedInputChannels(int index) { + throw new UnsupportedOperationException("This operator has no input channels."); + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/platform/TrinoPlatform.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/platform/TrinoPlatform.java new file mode 100644 index 000000000..ba6ec6111 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/platform/TrinoPlatform.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.platform; + +import org.apache.wayang.core.platform.Platform; +import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate; + +/** + * {@link Platform} implementation for Trino. + */ +public class TrinoPlatform extends JdbcPlatformTemplate { + + private static final String PLATFORM_NAME = "Trino"; + + private static final String CONFIG_NAME = "trino"; + + private static TrinoPlatform instance = null; + + public static TrinoPlatform getInstance() { + if (instance == null) { + instance = new TrinoPlatform(); + } + return instance; + } + + protected TrinoPlatform() { + super(PLATFORM_NAME, CONFIG_NAME); + } + + @Override + public String getJdbcDriverClassName() { + return "io.trino.jdbc.TrinoDriver"; + } + +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/plugin/TrinoConversionsPlugin.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/plugin/TrinoConversionsPlugin.java new file mode 100644 index 000000000..b2dedd1a7 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/plugin/TrinoConversionsPlugin.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.plugin; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.optimizer.channels.ChannelConversion; +import org.apache.wayang.core.plan.wayangplan.Operator; +import org.apache.wayang.core.platform.Platform; +import org.apache.wayang.core.plugin.Plugin; +import org.apache.wayang.java.platform.JavaPlatform; +import org.apache.wayang.trino.channels.ChannelConversions; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * This {@link Plugin} enables to use some basic Wayang {@link Operator}s on the {@link TrinoPlatform}. + */ +public class TrinoConversionsPlugin implements Plugin { + + @Override + public Collection getRequiredPlatforms() { + return Arrays.asList(TrinoPlatform.getInstance(), JavaPlatform.getInstance()); + } + + @Override + public Collection getMappings() { + return Collections.emptyList(); + } + + @Override + public Collection getChannelConversions() { + return ChannelConversions.ALL; + } + + @Override + public void setProperties(Configuration configuration) { + } +} diff --git a/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/plugin/TrinoPlugin.java b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/plugin/TrinoPlugin.java new file mode 100644 index 000000000..a04e07c0d --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/java/org/apache/wayang/trino/plugin/TrinoPlugin.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino.plugin; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.optimizer.channels.ChannelConversion; +import org.apache.wayang.core.plan.wayangplan.Operator; +import org.apache.wayang.core.platform.Platform; +import org.apache.wayang.core.plugin.Plugin; +import org.apache.wayang.java.platform.JavaPlatform; +import org.apache.wayang.trino.channels.ChannelConversions; +import org.apache.wayang.trino.mapping.Mappings; +import org.apache.wayang.trino.platform.TrinoPlatform; + +import java.util.Arrays; +import java.util.Collection; + +/** + * This {@link Plugin} enables to use some basic Wayang {@link Operator}s on the {@link TrinoPlatform}. + */ +public class TrinoPlugin implements Plugin { + + @Override + public Collection getRequiredPlatforms() { + return Arrays.asList(TrinoPlatform.getInstance(), JavaPlatform.getInstance()); + } + + @Override + public Collection getMappings() { + return Mappings.ALL; + } + + @Override + public Collection getChannelConversions() { + return ChannelConversions.ALL; + } + + @Override + public void setProperties(Configuration configuration) { + } +} diff --git a/wayang-platforms/wayang-trino/src/main/resources/wayang-trino-defaults.properties b/wayang-platforms/wayang-trino/src/main/resources/wayang-trino-defaults.properties new file mode 100644 index 000000000..fe0429b85 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/main/resources/wayang-trino-defaults.properties @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Connection (override per deployment in wayang.properties) +# wayang.trino.jdbc.url = jdbc:trino://localhost:8080 +# wayang.trino.jdbc.user = admin +# wayang.trino.jdbc.password = +wayang.trino.jdbc.driverName = io.trino.jdbc.TrinoDriver + +# Hardware profile used by LoadProfileToTimeConverter. +# Trino runs as a distributed cluster; model a representative coordinator node. +wayang.trino.cpu.mhz = 2700 +wayang.trino.cores = 4 +wayang.trino.costs.fix = 0.0 +wayang.trino.costs.per-ms = 1.0 + +# ── Cost model ──────────────────────────────────────────────────────────────── +# +# Formula: cpu = α * rows + β +# +# Trino is a distributed MPP engine: very low per-row cost (small α) because +# scans are parallelised across workers, but noticeable fixed overhead (larger β) +# from query planning and cluster coordination. +# +# Compared to a single-node source (Postgres α=55, β=380k): +# α = 10 — parallel scan makes per-row cost ~5× cheaper +# β = 800k — cluster startup + query dispatch overhead +# +# These are initial estimates; tune after real benchmarks by fitting the +# template formula below on measured data and updating the 'load' key. +# ────────────────────────────────────────────────────────────────────────────── + +wayang.trino.tablesource.load.template = {\ + "type":"mathex", "in":0, "out":1,\ + "cpu":"?*out0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.tablesource.load = {\ + "in":0, "out":1,\ + "cpu":"${10*out0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.filter.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.filter.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.projection.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.projection.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.join.load.template = {\ + "type":"mathex", "in":2, "out":1,\ + "cpu":"?*(in0 + in1) + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.join.load = {\ + "in":2, "out":1,\ + "cpu":"${10*(in0 + in1) + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.globalreduce.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.globalreduce.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.reduceby.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.reduceby.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.sort.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.sort.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.tablesink.load.template = {\ + "type":"mathex", "in":1, "out":0,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.tablesink.load = {\ + "in":1, "out":0,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.trino.sqltostream.load.query.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*out0 + ?"\ +} +wayang.trino.sqltostream.load.query = {\ + "in":1, "out":1,\ + "cpu":"${10*out0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} +wayang.trino.sqltostream.load.output.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*out0"\ +} +wayang.trino.sqltostream.load.output = {\ + "in":1, "out":1,\ + "cpu":"${10*out0}",\ + "ram":"0",\ + "p":0.9\ +} diff --git a/wayang-platforms/wayang-trino/src/test/java/org/apache/wayang/trino/TrinoOperatorsIT.java b/wayang-platforms/wayang-trino/src/test/java/org/apache/wayang/trino/TrinoOperatorsIT.java new file mode 100644 index 000000000..44b027b49 --- /dev/null +++ b/wayang-platforms/wayang-trino/src/test/java/org/apache/wayang/trino/TrinoOperatorsIT.java @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.trino; + +import org.apache.wayang.api.DataQuantaBuilder; +import org.apache.wayang.api.JavaPlanBuilder; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.basic.function.ProjectionDescriptor; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.types.RecordType; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.function.FunctionDescriptor; +import org.apache.wayang.core.function.PredicateDescriptor; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.plan.wayangplan.WayangPlan; +import org.apache.wayang.core.types.DataUnitType; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.trino.operators.TrinoProjectionOperator; +import org.apache.wayang.trino.operators.TrinoTableSource; +import org.apache.wayang.trino.platform.TrinoPlatform; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * End-to-end integration tests for every operator the Trino platform implements, + * driven through the Wayang API against a live Trino cluster. + * + *

Coverage: {@code TableSource}, {@code Filter}, {@code Projection}, + * {@code Join}, {@code GlobalReduce}, {@code ReduceBy}, {@code Sort}, + * and {@code TableSink}. Every Wayang plan ends in a Trino table sink so the + * execution itself does not require the Java plugin. Result assertions use + * plain JDBC only after the Wayang execution has completed. + * + *

Prerequisites: a Trino reachable at {@code TRINO_HOST:TRINO_PORT} + * (defaults {@code localhost:8080}); e.g. {@code cd trino-setup && docker compose up -d}. + * If Trino is not reachable the whole class is skipped (not failed). + * + *

Run: + *

+ *   JAVA_HOME=<jdk17> mvn -o test -pl wayang-platforms/wayang-trino \
+ *     -Dtest=TrinoOperatorsIT -Dsurefire.failIfNoSpecifiedTests=false \
+ *     -Drat.skip=true -Dlicense.skip=true -Pskip-prerequisite-check
+ * 
+ */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class TrinoOperatorsIT { + + private static final String HOST = System.getenv().getOrDefault("TRINO_HOST", "localhost"); + private static final int PORT = Integer.parseInt(System.getenv().getOrDefault("TRINO_PORT", "8080")); + private static final String USER = System.getenv().getOrDefault("TRINO_USER", "admin"); + private static final String JDBC_URL = String.format("jdbc:trino://%s:%d", HOST, PORT); + + // Dedicated schema so the test is self-contained and side-effect free. + private static final String SCHEMA = "iceberg.wayang_it"; + private static final String ORDERS = SCHEMA + ".orders"; + private static final String CUSTOMERS = SCHEMA + ".customers"; + private static final String SINK_TABLE_NAME = "operator_result"; + private static final String SINK_TABLE = SCHEMA + "." + SINK_TABLE_NAME; + private static final String[] JOIN_COLUMNS = { + "order_id", "customer_id", "region", "amount", "cust_id", "name", "tier" + }; + private static final String JOIN_FLATTEN_NAME = "Trino test-only join flatten"; + + private static boolean trinoAvailable = false; + + // Lifecycle + + @BeforeAll + static void setUp() { + try (Connection c = jdbc()) { + Statement st = c.createStatement(); + st.execute("CREATE SCHEMA IF NOT EXISTS " + SCHEMA); + + st.execute("DROP TABLE IF EXISTS " + ORDERS); + st.execute("CREATE TABLE " + ORDERS + " (" + + "order_id BIGINT, customer_id BIGINT, region VARCHAR, amount DOUBLE)" + + " WITH (format = 'PARQUET')"); + st.execute("INSERT INTO " + ORDERS + " VALUES " + + "(1, 100, 'AMER', 2200.0)," + + "(2, 101, 'EMEA', 800.5)," + + "(3, 100, 'AMER', 680.5)," + + "(4, 102, 'APAC', 1500.0)," + + "(5, 101, 'EMEA', 1100.0)," + + "(6, 100, 'AMER', 950.25)"); + // Scale up so Wayang's cost optimizer actually elects SQL pushdown + // (on a tiny table it prefers a full scan + Java-side ops, which + // makes the query-history assertions fail). Trino caps sequence() + // at 10000 entries, so scale in two steps: x10000 then x2 = 120000 + // rows total; ratios preserved (AMER = 60000). + st.execute("INSERT INTO " + ORDERS + + " SELECT order_id + (n * 10), customer_id, region, amount" + + " FROM " + ORDERS + ", UNNEST(sequence(1, 9999)) AS t(n)"); // 6 -> 60000 + st.execute("INSERT INTO " + ORDERS + + " SELECT order_id + 600000, customer_id, region, amount" + + " FROM " + ORDERS); // 60000 -> 120000 + + st.execute("DROP TABLE IF EXISTS " + CUSTOMERS); + st.execute("CREATE TABLE " + CUSTOMERS + " (" + + "cust_id BIGINT, name VARCHAR, tier VARCHAR)" + + " WITH (format = 'PARQUET')"); + st.execute("INSERT INTO " + CUSTOMERS + " VALUES " + + "(100, 'Acme', 'GOLD')," + + "(101, 'Globex', 'SILVER')," + + "(102, 'Initech','BRONZE')"); + + trinoAvailable = true; + System.out.println("[TrinoOperatorsIT] Connected to Trino at " + JDBC_URL + " — fixtures created."); + } catch (Exception e) { + System.err.println("[TrinoOperatorsIT] Trino not available (" + e.getMessage() + ") — skipping."); + } + } + + @AfterAll + static void tearDown() { + if (!trinoAvailable) return; + try (Connection c = jdbc()) { + Statement st = c.createStatement(); + st.execute("DROP TABLE IF EXISTS " + ORDERS); + st.execute("DROP TABLE IF EXISTS " + CUSTOMERS); + st.execute("DROP TABLE IF EXISTS " + SINK_TABLE); + } catch (Exception e) { + System.err.println("[TrinoOperatorsIT] cleanup failed: " + e.getMessage()); + } + } + + // Tests (one per operator) + + /** TableSource: full scan returns every row. */ + @Test + @Order(1) + void tableSource() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + TrinoTableSource src = new TrinoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + TableSink sink = tableSink("order_id", "customer_id", "region", "amount"); + src.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "TableSource should return all orders"); + assertSqlReachedTrino("SELECT * FROM " + ORDERS); + } + + /** Filter: WHERE region = 'AMER' pushed to Trino. */ + @Test + @Order(2) + void filter() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + TrinoTableSource src = new TrinoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + (Record r) -> "AMER".equals(r.getField(2)), Record.class + ).withSqlImplementation("region = 'AMER'")); + TableSink sink = tableSink("order_id", "customer_id", "region", "amount"); + src.connectTo(0, filter, 0); + filter.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(60000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "60000 AMER orders expected"); + assertEquals(0, queryLong("SELECT count_if(region <> 'AMER') FROM " + SINK_TABLE), "all rows AMER"); + assertSqlReachedTrino("WHERE region = 'AMER'"); + } + + /** Projection (+filter): only the projected columns are fetched. */ + @Test + @Order(3) + void projection() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + TrinoTableSource src = new TrinoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + (Record r) -> "AMER".equals(r.getField(2)), Record.class + ).withSqlImplementation("region = 'AMER'")); + MapOperator projection = new MapOperator<>( + ProjectionDescriptor.createForRecords( + new RecordType("order_id", "customer_id", "region", "amount"), + "region", "amount"), + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("region", "amount"); + src.connectTo(0, filter, 0); + filter.connectTo(0, projection, 0); + projection.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(60000, queryLong("SELECT count(*) FROM " + SINK_TABLE), "60000 AMER rows expected"); + assertEquals(2, queryLong( + "SELECT count(*) FROM iceberg.information_schema.columns " + + "WHERE table_schema = 'wayang_it' AND table_name = '" + SINK_TABLE_NAME + "'"), + "projection keeps only 2 columns"); + assertSqlReachedTrino("SELECT region, amount FROM " + ORDERS); + } + + /** + * Join: orders and customers on customer_id. + * + *

The logical join emits {@code Tuple2}. A test-only + * mapping turns the following flatten map into a Trino SQL projection, so + * this plan still executes entirely in Trino without deciding the general + * Tuple-to-Record semantics for JDBC platforms. + */ + @Test + @Order(4) + void join() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + TrinoTableSource orders = new TrinoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount"); + TrinoTableSource customers = new TrinoTableSource( + CUSTOMERS, "cust_id", "name", "tier"); + JoinOperator join = new JoinOperator<>( + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(1)), Record.class, Record.class + ).withSqlImplementation(ORDERS, "customer_id"), + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(0)), Record.class, Record.class + ).withSqlImplementation(CUSTOMERS, "cust_id")); + MapOperator, Record> flatten = joinFlattenOperator(); + TableSink sink = tableSink(JOIN_COLUMNS); + orders.connectTo(0, join, 0); + customers.connectTo(0, join, 1); + join.connectTo(0, flatten, 0); + flatten.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "join should yield one row per order"); + assertEquals(0, queryLong("SELECT count_if(customer_id <> cust_id) FROM " + SINK_TABLE), + "joined customer IDs should match"); + assertSqlReachedTrino("JOIN " + CUSTOMERS); + } + + /** GlobalReduce: SUM(amount) over the whole table collapses to a single row. */ + @Test + @Order(5) + void globalReduce() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + TrinoTableSource src = new TrinoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + GlobalReduceOperator reduce = new GlobalReduceOperator<>( + new ReduceDescriptor<>((a, b) -> a, Record.class) + .withSqlImplementation("SUM(amount) AS total_amount"), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("total_amount"); + src.connectTo(0, reduce, 0); + reduce.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + // 6 base rows sum to 7231.25; scaled x20000 gives 144,625,000 (exact in doubles). + assertSingleDoubleResult(144_625_000.0, "global reduce must collapse to a single row"); + assertSqlReachedTrino("SELECT SUM(amount) AS total_amount FROM " + ORDERS); + } + + /** ReduceBy: SUM(amount) GROUP BY region yields one row per region. */ + @Test + @Order(6) + void reduceBy() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + TrinoTableSource src = new TrinoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + ReduceByOperator reduceBy = new ReduceByOperator<>( + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(2)), Record.class, Record.class + ).withSqlImplementation("region", "region"), + new ReduceDescriptor<>((a, b) -> a, Record.class) + .withSqlImplementation("SUM(amount) AS total_amount"), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("region", "total_amount"); + src.connectTo(0, reduceBy, 0); + reduceBy.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + Map sums = readRegionSums(); + assertEquals(3, sums.size(), "one row per region expected"); + // Base sums (AMER 3830.75, EMEA 1900.5, APAC 1500.0) scaled x20000. + assertEquals(76_615_000.0, sums.get("AMER"), 0.01); + assertEquals(38_010_000.0, sums.get("EMEA"), 0.01); + assertEquals(30_000_000.0, sums.get("APAC"), 0.01); + assertSqlReachedTrino("GROUP BY region"); + } + + /** Sort: ORDER BY amount ASC pushed to Trino, order preserved to the sink. */ + @Test + @Order(7) + void sort() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + TrinoTableSource src = new TrinoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + SortOperator sort = new SortOperator<>( + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(3)), Record.class, Record.class + ).withSqlImplementation("amount", "ASC"), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("order_id", "customer_id", "region", "amount"); + src.connectTo(0, sort, 0); + sort.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "sort must not change the cardinality"); + assertEquals(680.5, queryDouble("SELECT min(amount) FROM " + SINK_TABLE), 0.001); + assertEquals(2200.0, queryDouble("SELECT max(amount) FROM " + SINK_TABLE), 0.001); + assertSqlReachedTrino("ORDER BY amount ASC"); + } + + /** + * TableSink: filter + sink composed into a single {@code CREATE TABLE ... AS + * SELECT} that runs entirely inside Trino; no data leaves the database. + */ + @Test + @Order(8) + void tableSink() throws Exception { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + + TrinoTableSource src = new TrinoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + (Record r) -> "AMER".equals(r.getField(2)), Record.class + ).withSqlImplementation("region = 'AMER'")); + TableSink sink = new TableSink<>( + new Properties(), "overwrite", SINK_TABLE, + "order_id", "customer_id", "region", "amount"); + src.connectTo(0, filter, 0); + filter.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + try (Connection c = jdbc()) { + ResultSet rs = c.createStatement().executeQuery( + "SELECT count(*), count_if(region <> 'AMER') FROM " + SINK_TABLE); + rs.next(); + assertEquals(60000, rs.getLong(1), "sink table must hold all AMER orders"); + assertEquals(0, rs.getLong(2), "sink table must hold only AMER orders"); + } + assertSqlReachedTrino("CREATE TABLE " + SINK_TABLE + " AS"); + } + + // JavaPlanBuilder combination tests + + /** + * JavaPlanBuilder API: read a table, filter it, project two columns, and + * write the result through a complete high-level Wayang plan. + */ + @Test + @Order(9) + void javaPlanBuilderReadTableFilterProjection() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + + new JavaPlanBuilder( + wayangContext(), "Trino JavaPlanBuilder readTable integration test") + .readTable(new TrinoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount")) + .filter(record -> "AMER".equals(record.getField(2))) + .withSqlUdf("region = 'AMER'") + .asRecords() + .projectRecords(new String[]{"order_id", "amount"}) + .writeTable(SINK_TABLE, "overwrite", new String[]{"order_id", "amount"}, new Properties()); + + assertEquals(60000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "60000 projected AMER orders expected"); + assertEquals(0, queryLong( + "SELECT count_if(amount NOT IN (2200.0, 680.5, 950.25)) FROM " + SINK_TABLE), + "only AMER order amounts should remain"); + assertSqlReachedTrino( + "SELECT order_id, amount FROM " + ORDERS + " WHERE region = 'AMER'"); + } + + /** + * JavaPlanBuilder API: combine a filter with a global reduction. + */ + @Test + @Order(10) + void javaPlanBuilderReadTableFilterGlobalReduce() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + + new JavaPlanBuilder( + wayangContext(), "Trino JavaPlanBuilder global reduce integration test") + .readTable(new TrinoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount")) + .filter(record -> "AMER".equals(record.getField(2))) + .withSqlUdf("region = 'AMER'") + .reduce((left, right) -> left) + .withSqlUdf("SUM(amount) AS total_amount") + .writeTable(SINK_TABLE, "overwrite", new String[]{"total_amount"}, new Properties()); + + assertSingleDoubleResult(76_615_000.0, "global reduction should return one row"); + assertSqlReachedTrino( + "SELECT SUM(amount) AS total_amount FROM " + ORDERS + " WHERE region = 'AMER'"); + } + + /** + * JavaPlanBuilder API: group by region, aggregate each group, and sort the + * grouped result. + */ + @Test + @Order(11) + void javaPlanBuilderReadTableReduceBySort() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + + new JavaPlanBuilder( + wayangContext(), "Trino JavaPlanBuilder reduce-by and sort integration test") + .readTable(new TrinoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount")) + .reduceByKey( + record -> new Record(record.getField(2)), + (left, right) -> left) + .withSqlUdfs("region", "SUM(amount) AS total_amount") + .sort(record -> new Record(record.getField(0))) + .withSqlUdf("region", "ASC") + .writeTable(SINK_TABLE, "overwrite", new String[]{"region", "total_amount"}, new Properties()); + + assertEquals("AMER,APAC,EMEA", queryString( + "SELECT array_join(array_agg(region ORDER BY region), ',') FROM " + SINK_TABLE), + "one row per region expected"); + assertSqlReachedTrino( + "SELECT region,SUM(amount) AS total_amount FROM " + ORDERS + + " GROUP BY region ORDER BY region ASC"); + } + + /** + * JavaPlanBuilder API: compose a filtered projection directly into a table + * sink so all processing remains in Trino. + */ + @Test + @Order(12) + void javaPlanBuilderReadTableFilterProjectionTableSink() throws Exception { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + + new JavaPlanBuilder(wayangContext(), "Trino JavaPlanBuilder table sink integration test") + .readTable(new TrinoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount")) + .filter(record -> "AMER".equals(record.getField(2))) + .withSqlUdf("region = 'AMER'") + .asRecords() + .projectRecords(new String[]{"order_id", "amount"}) + .writeTable( + SINK_TABLE, + "overwrite", + new String[]{"order_id", "amount"}, + new Properties()); + + try (Connection c = jdbc()) { + ResultSet rs = c.createStatement().executeQuery("SELECT count(*) FROM " + SINK_TABLE); + rs.next(); + assertEquals(60000, rs.getLong(1), "sink table should contain projected AMER orders"); + } + assertSqlReachedTrino( + "CREATE TABLE " + SINK_TABLE + + " AS SELECT order_id, amount FROM " + ORDERS + " WHERE region = 'AMER'"); + } + + /** + * JavaPlanBuilder API: join two tables, flatten through the test-only Trino + * mapping, and write the result in Trino. + */ + @Test + @Order(13) + void javaPlanBuilderReadTableJoin() { + Assumptions.assumeTrue(trinoAvailable, "Trino not reachable"); + + JavaPlanBuilder plan = new JavaPlanBuilder( + wayangContext(), "Trino JavaPlanBuilder join integration test"); + DataQuantaBuilder orders = plan.readTable(new TrinoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount")); + DataQuantaBuilder customers = plan.readTable(new TrinoTableSource( + CUSTOMERS, "cust_id", "name", "tier")); + + orders + .join( + record -> new Record(record.getField(1)), + customers, + record -> new Record(record.getField(0))) + .withSqlUdfs(ORDERS, "customer_id", CUSTOMERS, "cust_id") + .map(new JoinFlattenFunction()) + .withName(JOIN_FLATTEN_NAME) + .writeTable(SINK_TABLE, "overwrite", JOIN_COLUMNS, new Properties()); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "join should yield one row per order"); + assertEquals(0, queryLong("SELECT count_if(customer_id <> cust_id) FROM " + SINK_TABLE), + "joined customer IDs should match"); + assertSqlReachedTrino("JOIN " + CUSTOMERS); + } + + private WayangContext wayangContext() { + Configuration config = new Configuration(); + config.setProperty("wayang.trino.jdbc.url", JDBC_URL); + config.setProperty("wayang.trino.jdbc.user", USER); + config.setProperty("wayang.trino.jdbc.password", ""); + config.getMappingProvider().addAllToWhitelist( + Collections.singleton(new JoinFlattenMapping())); + return new WayangContext(config) + .withPlugin(Trino.plugin()); + } + + private TableSink tableSink(String... columnNames) { + return new TableSink<>(new Properties(), "overwrite", SINK_TABLE, columnNames); + } + + private static MapOperator, Record> joinFlattenOperator() { + MapOperator, Record> operator = new MapOperator<>( + new TransformationDescriptor<>( + new JoinFlattenFunction(), + DataUnitType.createBasicUnchecked(Tuple2.class), + DataUnitType.createBasic(Record.class)), + DataSetType.createDefaultUnchecked(Tuple2.class), + DataSetType.createDefault(Record.class)); + operator.setName(JOIN_FLATTEN_NAME); + return operator; + } + + private static Record flattenJoinResult(Object joinResult) { + if (joinResult instanceof Record) { + return (Record) joinResult; + } + Tuple2 pair = (Tuple2) joinResult; + Record left = (Record) pair.field0; + Record right = (Record) pair.field1; + return new Record( + left.getField(0), + left.getField(1), + left.getField(2), + left.getField(3), + right.getField(0), + right.getField(1), + right.getField(2)); + } + + private long queryLong(String sql) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); ResultSet rs = statement.executeQuery(sql)) { + rs.next(); + return rs.getLong(1); + } catch (Exception e) { + throw new RuntimeException("query failed: " + sql, e); + } + } + + private double queryDouble(String sql) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); ResultSet rs = statement.executeQuery(sql)) { + rs.next(); + return rs.getDouble(1); + } catch (Exception e) { + throw new RuntimeException("query failed: " + sql, e); + } + } + + private String queryString(String sql) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); ResultSet rs = statement.executeQuery(sql)) { + rs.next(); + return rs.getString(1); + } catch (Exception e) { + throw new RuntimeException("query failed: " + sql, e); + } + } + + private void assertSingleDoubleResult(double expected, String message) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); + ResultSet rs = statement.executeQuery("SELECT * FROM " + SINK_TABLE)) { + assertTrue(rs.next(), message); + assertEquals(expected, rs.getDouble(1), 0.01, message); + assertFalse(rs.next(), message); + } catch (Exception e) { + throw new RuntimeException("query failed: SELECT * FROM " + SINK_TABLE, e); + } + } + + private Map readRegionSums() { + Map sums = new HashMap<>(); + try (Connection c = jdbc(); Statement statement = c.createStatement(); + ResultSet rs = statement.executeQuery("SELECT * FROM " + SINK_TABLE)) { + while (rs.next()) { + sums.put(rs.getString(1), rs.getDouble(2)); + } + return sums; + } catch (Exception e) { + throw new RuntimeException("query failed: SELECT * FROM " + SINK_TABLE, e); + } + } + + private static final class JoinFlattenFunction implements + FunctionDescriptor.SerializableFunction, Record> { + + @Override + public Record apply(Tuple2 tuple) { + return flattenJoinResult(tuple); + } + } + + /** Test-only mapping for the unresolved logical join Tuple-to-Record mismatch. */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static final class JoinFlattenMapping implements Mapping { + + @Override + public java.util.Collection getTransformations() { + OperatorPattern pattern = new OperatorPattern( + "joinFlatten", + new MapOperator(null, DataSetType.none(), DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(operator -> JOIN_FLATTEN_NAME.equals(((MapOperator) operator).getName())); + + ReplacementSubplanFactory factory = new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> createTrinoProjection().at(epoch)); + + return Collections.singleton(new PlanTransformation( + SubplanPattern.createSingleton(pattern), + factory, + TrinoPlatform.getInstance())); + } + + private static TrinoProjectionOperator createTrinoProjection() { + ProjectionDescriptor, Record> descriptor = new ProjectionDescriptor<>( + new JoinFlattenFunction(), + Arrays.asList(JOIN_COLUMNS), + DataUnitType.createBasicUnchecked(Tuple2.class), + DataUnitType.createBasic(Record.class)); + MapOperator, Record> projection = new MapOperator<>( + descriptor, + DataSetType.createDefaultUnchecked(Tuple2.class), + DataSetType.createDefault(Record.class)); + projection.setName(JOIN_FLATTEN_NAME); + return new TrinoProjectionOperator((MapOperator) (MapOperator) projection); + } + } + + /** Assert that Trino actually ran a query containing the given fragment. */ + private void assertSqlReachedTrino(String fragment) { + try (Connection c = jdbc()) { + ResultSet rs = c.createStatement().executeQuery( + "SELECT count(*) FROM system.runtime.queries " + + "WHERE query LIKE '%" + fragment.replace("'", "''") + "%' " + + "AND query NOT LIKE '%system.runtime%'"); + rs.next(); + assertTrue(rs.getLong(1) > 0, + "Expected a Trino query containing: " + fragment); + } catch (Exception e) { + throw new RuntimeException("query-history check failed", e); + } + } + + private static Connection jdbc() throws Exception { + Properties p = new Properties(); + p.setProperty("user", USER); + return DriverManager.getConnection(JDBC_URL, p); + } +}