diff --git a/INTEGRATION-TESTS.md b/INTEGRATION-TESTS.md new file mode 100644 index 000000000..1d7d39406 --- /dev/null +++ b/INTEGRATION-TESTS.md @@ -0,0 +1,96 @@ + + +# Integration Tests – Presto + +End-to-end tests that drive the Presto platform operators (TableSource, Filter, +Projection, Join) through the Wayang API against a live PrestoDB cluster. + +| Test | Module | Needs | +|------|--------|-------| +| `AllOperatorsIT` | `wayang-presto` | a local Presto (Docker) | + +The test **skips** (it does not fail) when Presto is unreachable. + +--- + +## Prerequisites + +- **JDK 17** — required. The Scala in `wayang-spark` does not compile on JDK 21+, + and the build targets release 17, so JDK 11 is too old. Point Maven at a JDK 17: + ```bash + export JAVA_HOME=/path/to/jdk-17 # e.g. .../corretto-17.jdk/Contents/Home + ``` +- **Maven 3.8+** +- **Docker** + +### Common Maven flags + +The repo's root build runs RAT + license + prerequisite checks that are noisy for +local runs; skip them: + +``` +-Drat.skip=true -Dlicense.skip=true -Dmaven.javadoc.skip=true -Pskip-prerequisite-check +``` + +> First build only: drop `-o` (offline) so Maven can download dependencies. + +--- + +## Presto + +The test is self-contained: it creates and seeds its own `memory.wayang_it` +tables in Presto's built-in **in-memory connector** (scaled to 120k rows so the +optimizer elects SQL pushdown) and drops them afterwards — no Hive metastore or +object storage required. + +```bash +# 1. start a single PrestoDB node with the in-memory connector +cd presto-setup && docker compose up -d --wait && cd .. + +# 2. run the operator tests (JDK 17) +JAVA_HOME=/path/to/jdk-17 \ +mvn -o test -pl wayang-platforms/wayang-presto \ + -Dtest=AllOperatorsIT -Dsurefire.failIfNoSpecifiedTests=false \ + -Drat.skip=true -Dlicense.skip=true -Dmaven.javadoc.skip=true -Pskip-prerequisite-check + +# 3. tear down when done +cd presto-setup && docker compose down -v && cd .. +``` + +Expected: `Tests run: 4, Failures: 0, Errors: 0, Skipped: 0`. + +`docker compose up -d --wait` blocks on the container healthcheck, so Presto is +query-ready when it returns. Presto listens on host port **8081** (container 8080). + +--- + +## Notes + +- **Pushdown is cost-gated.** On tiny tables Wayang's optimizer prefers a full + scan + Java-side filter/projection, so pushdown only appears once a table is + large enough (hence the test scales to 120k rows). Each test asserts both correct + results and that the expected SQL reached Presto (`system.runtime.queries`). +- **Join.** A JDBC join is verified through the operator's SQL-clause contract + executed on Presto, not the high-level `WayangContext` API — the logical + `JoinOperator` emits `Tuple2`, which cannot connect to a `Record` + sink before the SQL pushdown flattens it. +- **Trailing semicolons.** Presto's SQL parser rejects a trailing `;` in + `executeQuery`, so this branch also carries the jdbc-template change that stops + emitting one (shared with the other JDBC platforms; Postgres/SQLite tolerate its + absence). diff --git a/improvement.md b/improvement.md new file mode 100644 index 000000000..5485fd929 --- /dev/null +++ b/improvement.md @@ -0,0 +1,120 @@ + + +# Presto engine-only integration test + +## 1. What this branch demonstrates + +The question this branch answers is **not** "does Presto execute some single +operator?" but: + +> From `WayangContext.execute(...)` to the end of the whole Wayang plan, do all +> data processing **and** the final sink run inside Presto, **without** registering +> `Java.basicPlugin()`? + +On this branch the answer is **yes**. `PrestoOperatorsIT`: + +- registers **only** `Presto.plugin()` — no `Java.basicPlugin()`; +- ends **every** Wayang plan in a Presto `TableSink`, which compiles to a single + `CREATE TABLE ... AS SELECT` executed inside Presto; +- after `WayangContext.execute(...)` returns, JUnit reads the result table with a + plain JDBC query (assertion only — not part of the Wayang plan); +- handles the join `Tuple2` vs flat `Record` mismatch with a + test-only flatten mapping (see §4). This is a test-only scheme, not a final + decision on Tuple-to-Record semantics for JDBC platforms. + +This mirrors the Trino-only work on `wayang-trino-only-test`; the contrast is the +older mixed branch `wayang-presto`, which registered both `Java.basicPlugin()` and +`Presto.plugin()` and ended most operator tests in a Java `LocalCallbackSink`. + +## 2. Execution shape + +```text +Presto TableSource -> Presto operator(s) -> Presto TableSink + | + v + CREATE TABLE memory.wayang_it.operator_result AS SELECT ... + +WayangContext.execute(...) returns + | + v + JUnit queries the result table over JDBC (assertions only) +``` + +The final JDBC query is part of the test only: it is not in the Wayang logical +plan, it is not a Wayang Java execution operator, and it does not process plan +data on Presto's behalf — it just inspects what Presto already wrote. + +## 3. The shared executor change + +All JDBC platforms share `wayang-jdbc-template`'s `JdbcExecutor`. When a stage's +terminal task is a `JdbcTableSinkOperator`, `JdbcExecutor.executeSinkStage(...)` +composes and runs the `CREATE TABLE ... AS SELECT` directly on the connection. + +The previous Presto branch's `executeSinkStage` had two gaps that only surface +once **every** test ends in a `TableSink`: + +1. It asserted a stage has a single source, so a join (orders + customers, two + sources) could not be composed. It also lacked the `selectStartTask(...)` helper + that picks the correct left/`FROM` table. +2. It only collected filter, projection and join; it threw `WayangException` for + global reduce, reduce-by and sort, and passed `null` for them to + `createSqlString(...)`. + +This branch ports the engine-only `executeSinkStage` (identical to the file on +`wayang-trino-only-test`): it uses `selectStartTask(...)` for multi-source joins +and collects global reduce / reduce-by / sort, passing them into the existing +`createSqlString(...)`. The file is platform-agnostic. (Assertions are enabled +under Maven — `pom.xml` `enableAssertions=true` — so without this change a +join/reduce/sort sink would fail loudly, not silently.) + +## 4. The join flatten mapping + +A logical `JoinOperator` emits `Tuple2`, while a pushed-down JDBC +join already emits a flat `Record`. The test wires an explicit flatten `MapOperator` +(named `JOIN_FLATTEN_NAME`) and registers a test-only `JoinFlattenMapping` on the +configuration whitelist; the mapping rewrites that named map into a +`PrestoProjectionOperator`, so the flatten is also pushed into Presto SQL and the +plan stays entirely in Presto. (Same approach as the Trino-only test, using +`PrestoProjectionOperator` + `PrestoPlatform`.) + +## 5. Coverage and results + +`PrestoOperatorsIT` runs 13 tests (8 operator-level + 5 high-level +`JavaPlanBuilder`) covering `TableSource`, `Filter`, `Projection`, `Join`, +`GlobalReduce`, `ReduceBy`, `Sort`, `TableSink`. Each composes a +`CREATE TABLE ... AS SELECT` and additionally asserts, via +`system.runtime.queries`, that the expected SQL actually reached Presto. + +The high-level tests also rely on the `withSqlUdf` / `withSqlUdfs` additions to +`DataQuantaBuilder.scala` (ported from `wayang-trino-only-test`) so reduce / join / +sort builders can carry SQL implementations. + +```bash +docker compose -f presto-setup/docker-compose.yml up -d + +JAVA_HOME= mvn test -pl wayang-platforms/wayang-presto -am \ + -Dtest=PrestoOperatorsIT -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false \ + -Drat.skip=true -Dlicense.skip=true -Pskip-prerequisite-check + +docker compose -f presto-setup/docker-compose.yml down +``` + +Expected: `Tests run: 13, Failures: 0, Errors: 0, Skipped: 0`. The suite scales +its fixtures to 120k rows and creates/drops its own `memory.wayang_it` schema. +If Presto is unreachable the whole class is skipped (not failed). diff --git a/presto-setup/docker-compose.yml b/presto-setup/docker-compose.yml new file mode 100644 index 000000000..99d8e4fe1 --- /dev/null +++ b/presto-setup/docker-compose.yml @@ -0,0 +1,47 @@ +--- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Stack: a single PrestoDB coordinator/worker with the in-memory connector. +# +# The `memory` connector supports CREATE SCHEMA / CREATE TABLE / INSERT / SELECT +# entirely in-memory, so the integration test is fully self-contained — no Hive +# metastore, object storage, or external catalog required. +# +# Ports: +# Presto: http://localhost:8081 (UI + JDBC; container listens on 8080) +# +# The host port is 8081 to avoid clashing with the Trino stack (which uses 8080). + +services: + + presto: + image: prestodb/presto:0.289 + container_name: presto + ports: + - "8081:8080" + volumes: + # Enable the in-memory connector by adding a catalog properties file. + - ./etc/catalog/memory.properties:/opt/presto-server/etc/catalog/memory.properties + # Presto needs ~20-60s before it accepts queries. Gate on /v1/info so callers + # (and `docker compose up -d --wait`) can wait for readiness; the container + # reports "Up" long before the coordinator is ready. + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/v1/info"] + interval: 10s + timeout: 5s + retries: 15 + start_period: 30s diff --git a/presto-setup/etc/catalog/memory.properties b/presto-setup/etc/catalog/memory.properties new file mode 100644 index 000000000..2accfc274 --- /dev/null +++ b/presto-setup/etc/catalog/memory.properties @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# PrestoDB in-memory connector. +# Backs the `memory` catalog used by the Wayang Presto integration tests: +# tables created here (CREATE TABLE memory..) live in worker +# memory and are dropped at teardown. +connector.name=memory +# Cap on-heap connector data per node. The image's default heap is -Xmx1G with +# -XX:+ExitOnOutOfMemoryError, so keep this well under the heap; 256MB is ample +# for the small, transient test tables. Raise the heap (custom jvm.config) before +# increasing this if you reuse the stack for larger inserts. +memory.max-data-per-node=256MB diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala index d18ed3f85..9d37aa930 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala @@ -28,7 +28,7 @@ import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaB import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap} import org.apache.wayang.basic.data.{Record, Tuple2 => RT2} import org.apache.wayang.basic.model.{DLModel, Model, LogisticRegressionModel,DecisionTreeRegressionModel} -import org.apache.wayang.basic.operators.{DLTrainingOperator, GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator, LogisticRegressionOperator,DecisionTreeRegressionOperator, LinearSVCOperator} +import org.apache.wayang.basic.operators.{DLTrainingOperator, GlobalReduceOperator, JoinOperator, LocalCallbackSink, MapOperator, ReduceByOperator, SampleOperator, SortOperator, LogisticRegressionOperator,DecisionTreeRegressionOperator, LinearSVCOperator} import org.apache.wayang.commons.util.profiledb.model.Experiment import org.apache.wayang.core.api.spatial.{SpatialGeometry, SpatialPredicate} import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate} @@ -1020,6 +1020,10 @@ class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T], /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */ private var keyUdfRamEstimator: LoadEstimator = _ + /** SQL column and direction implementing the sort key. */ + private var sqlColumnName: String = _ + private var sqlDirection: String = _ + // Try to infer the type classes from the UDFs. locally { @@ -1060,8 +1064,27 @@ class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T], this } - override protected def build = - applyTargetPlatforms(inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag), this.getTargetPlatforms()) + /** + * Add a SQL implementation of the sort key. + * + * @param columnName SQL column to sort by + * @param direction SQL sort direction, e.g. `ASC` or `DESC` + * @return this instance + */ + def withSqlUdf(columnName: String, direction: String) = { + this.sqlColumnName = columnName + this.sqlDirection = direction + this + } + + override protected def build = { + val result = inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag) + if (this.sqlColumnName != null) { + result.operator.asInstanceOf[SortOperator[T, Key]] + .getKeyDescriptor.withSqlImplementation(this.sqlColumnName, this.sqlDirection) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } @@ -1283,6 +1306,10 @@ class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T] /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */ private var udfLoadProfileEstimator: LoadProfileEstimator = _ + /** SQL implementations of the grouping key and reduction. */ + private var keySqlUdf: String = _ + private var reduceSqlUdf: String = _ + // TODO: Add these estimators. // /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */ // private var keyUdfCpuEstimator: LoadEstimator = _ @@ -1322,7 +1349,29 @@ class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T] this } - override protected def build = applyTargetPlatforms(inputDataQuanta.dataQuanta().reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator), this.getTargetPlatforms()) + /** + * Add SQL implementations of the grouping key and reduction. + * + * @param keySqlUdf SQL grouping column + * @param reduceSqlUdf SQL aggregate expression + * @return this instance + */ + def withSqlUdfs(keySqlUdf: String, reduceSqlUdf: String) = { + this.keySqlUdf = keySqlUdf + this.reduceSqlUdf = reduceSqlUdf + this + } + + override protected def build = { + val result = inputDataQuanta.dataQuanta() + .reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator) + if (this.keySqlUdf != null) { + val operator = result.operator.asInstanceOf[ReduceByOperator[T, Key]] + operator.getKeyDescriptor.withSqlImplementation(this.keySqlUdf, this.keySqlUdf) + operator.getReduceDescriptor.withSqlImplementation(this.reduceSqlUdf) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } /** @@ -1402,6 +1451,9 @@ class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */ private var udfLoadProfileEstimator: LoadProfileEstimator = _ + /** SQL implementation of the reduction. */ + private var sqlUdf: String = _ + // Try to infer the type classes from the udf. locally { val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]]) @@ -1422,7 +1474,25 @@ class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], this } - override protected def build = applyTargetPlatforms(inputDataQuanta.dataQuanta().reduceJava(udf, this.udfLoadProfileEstimator), this.getTargetPlatforms()) + /** + * Add a SQL implementation of the reduction. + * + * @param sqlUdf SQL aggregate expression + * @return this instance + */ + def withSqlUdf(sqlUdf: String) = { + this.sqlUdf = sqlUdf + this + } + + override protected def build = { + val result = inputDataQuanta.dataQuanta().reduceJava(udf, this.udfLoadProfileEstimator) + if (this.sqlUdf != null) { + result.operator.asInstanceOf[GlobalReduceOperator[T]] + .getReduceDescriptor.withSqlImplementation(this.sqlUdf) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } @@ -1490,6 +1560,12 @@ class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_ /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */ private var keyUdf1RamEstimator: LoadEstimator = _ + /** SQL implementations of both join keys. */ + private var keyUdf0TableName: String = _ + private var keyUdf0SqlUdf: String = _ + private var keyUdf1TableName: String = _ + private var keyUdf1SqlUdf: String = _ + // Try to infer the type classes from the UDFs. locally { val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]]) @@ -1568,6 +1644,22 @@ class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_ this } + /** + * Add SQL implementations of both join keys. + * + * @return this instance + */ + def withSqlUdfs(thisTableName: String, + thisKeySqlUdf: String, + thatTableName: String, + thatKeySqlUdf: String) = { + this.keyUdf0TableName = thisTableName + this.keyUdf0SqlUdf = thisKeySqlUdf + this.keyUdf1TableName = thatTableName + this.keyUdf1SqlUdf = thatKeySqlUdf + this + } + /** * Assemble the joined elements to new elements. * @@ -1579,8 +1671,16 @@ class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_ override def apply(joinTuple: RT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1) }) - override protected def build = - applyTargetPlatforms(inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag), this.getTargetPlatforms()) + override protected def build = { + val result = inputDataQuanta0.dataQuanta() + .joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag) + if (this.keyUdf0SqlUdf != null) { + val operator = result.operator.asInstanceOf[JoinOperator[In0, In1, Key]] + operator.getKeyDescriptor0.withSqlImplementation(this.keyUdf0TableName, this.keyUdf0SqlUdf) + operator.getKeyDescriptor1.withSqlImplementation(this.keyUdf1TableName, this.keyUdf1SqlUdf) + } + applyTargetPlatforms(result, this.getTargetPlatforms()) + } } diff --git a/wayang-platforms/pom.xml b/wayang-platforms/pom.xml index 6a852c165..9c5e29545 100644 --- a/wayang-platforms/pom.xml +++ b/wayang-platforms/pom.xml @@ -43,6 +43,7 @@ wayang-giraphwayang-flinkwayang-generic-jdbc + wayang-prestowayang-tensorflow diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index 401e331cd..6dd59a3a6 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -151,7 +151,9 @@ public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, fin )); } - sb.append(';'); + // Intentionally no trailing ';'. A trailing semicolon is unnecessary for a + // single-statement JDBC executeQuery and is rejected by strict SQL parsers + // such as Trino and BigQuery. Postgres/SQLite/HSQLDB accept its absence. return sb; } @@ -167,7 +169,7 @@ protected static Tuple2 createSqlQuery(final E final Collection startTasks = stage.getStartTasks(); // Verify that we can handle this instance. - final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; + final ExecutionTask startTask = JdbcExecutor.selectStartTask(startTasks, stage); assert startTask.getOperator() instanceof TableSource : "Invalid JDBC stage: Start task has to be a TableSource"; @@ -192,13 +194,16 @@ protected static Tuple2 createSqlQuery(final E } else if (operator instanceof JdbcProjectionOperator) { assert projectionTask == null; // Allow one projection operator per stage for now. projectionTask = (JdbcProjectionOperator) operator; - } else if (operator instanceof final JdbcGlobalReduceOperator globalReduce) { + } else if (operator instanceof JdbcGlobalReduceOperator) { + final JdbcGlobalReduceOperator globalReduce = (JdbcGlobalReduceOperator) operator; assert globalReduceTask == null; // Allow one projection operator per stage for now. globalReduceTask = globalReduce; - } else if (operator instanceof final JdbcReduceByOperator reduceBy) { + } else if (operator instanceof JdbcReduceByOperator) { + final JdbcReduceByOperator reduceBy = (JdbcReduceByOperator) operator; assert reduceByTask == null; // Allow one projection operator per stage for now. reduceByTask = reduceBy; - } else if (operator instanceof final JdbcSortOperator sort) { + } else if (operator instanceof JdbcSortOperator) { + final JdbcSortOperator sort = (JdbcSortOperator) operator; assert sortTask == null; // Allow one projection operator per stage for now. sortTask = sort; } else if (operator instanceof JoinOperator || (operator instanceof SpatialJoinOperator)) { @@ -221,6 +226,33 @@ protected static Tuple2 createSqlQuery(final E return new Tuple2<>(query.toString(), tipChannelInstance); } + /** + * Selects the table source that belongs on the left-hand side of a JDBC join. + * Stage start tasks are not ordered, but {@link JdbcJoinOperator#createSqlClause} + * assumes its first key descriptor's table is used in the {@code FROM} clause. + */ + private static ExecutionTask selectStartTask(final Collection startTasks, final ExecutionStage stage) { + if (startTasks.size() == 1) { + return (ExecutionTask) startTasks.iterator().next(); + } + + for (ExecutionTask task : stage.getAllTasks()) { + if (task.getOperator() instanceof JdbcJoinOperator) { + final JdbcJoinOperator joinOperator = (JdbcJoinOperator) task.getOperator(); + final String leftTableName = joinOperator.getKeyDescriptor0().getSqlImplementation().field0; + for (Object startTaskObject : startTasks) { + final ExecutionTask startTask = (ExecutionTask) startTaskObject; + if (startTask.getOperator() instanceof JdbcTableSource + && ((JdbcTableSource) startTask.getOperator()).getTableName().equals(leftTableName)) { + return startTask; + } + } + } + } + + throw new WayangException("Could not determine the left table source for JDBC stage."); + } + /** * Handles execution stages that end with a {@link JdbcTableSinkOperator}. * Composes a SQL query from the stage's operators and executes it directly on @@ -235,8 +267,7 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat final Collection startTasks = stage.getStartTasks(); final Collection termTasks = stage.getTerminalTasks(); - assert startTasks.size() == 1 : "Invalid JDBC stage: multiple sources are not currently supported"; - final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; + final ExecutionTask startTask = JdbcExecutor.selectStartTask(startTasks, stage); assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; assert startTask.getOperator() instanceof TableSource @@ -249,17 +280,35 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat final JdbcTableSinkOperator sinkOp = (JdbcTableSinkOperator) termTask.getOperator(); final Collection filterTasks = new ArrayList<>(4); JdbcProjectionOperator projectionTask = null; + JdbcGlobalReduceOperator globalReduceTask = null; + JdbcReduceByOperator reduceByTask = null; + JdbcSortOperator sortTask = null; final Collection joinTasks = new ArrayList<>(); // Walk through intermediate operators, stopping at the sink ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); while (nextTask != null && !(nextTask.getOperator() instanceof JdbcTableSinkOperator)) { - if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) { + if (nextTask.getOperator() instanceof JdbcFilterOperator) { + final JdbcFilterOperator filterOperator = (JdbcFilterOperator) nextTask.getOperator(); filterTasks.add(filterOperator); - } else if (nextTask.getOperator() instanceof final JdbcProjectionOperator projectionOperator) { + } else if (nextTask.getOperator() instanceof JdbcProjectionOperator) { + final JdbcProjectionOperator projectionOperator = (JdbcProjectionOperator) nextTask.getOperator(); assert projectionTask == null; projectionTask = projectionOperator; - } else if (nextTask.getOperator() instanceof final JdbcJoinOperator joinOperator) { + } else if (nextTask.getOperator() instanceof JdbcGlobalReduceOperator) { + final JdbcGlobalReduceOperator globalReduceOperator = (JdbcGlobalReduceOperator) nextTask.getOperator(); + assert globalReduceTask == null; + globalReduceTask = globalReduceOperator; + } else if (nextTask.getOperator() instanceof JdbcReduceByOperator) { + final JdbcReduceByOperator reduceByOperator = (JdbcReduceByOperator) nextTask.getOperator(); + assert reduceByTask == null; + reduceByTask = reduceByOperator; + } else if (nextTask.getOperator() instanceof JdbcSortOperator) { + final JdbcSortOperator sortOperator = (JdbcSortOperator) nextTask.getOperator(); + assert sortTask == null; + sortTask = sortOperator; + } else if (nextTask.getOperator() instanceof JdbcJoinOperator) { + final JdbcJoinOperator joinOperator = (JdbcJoinOperator) nextTask.getOperator(); joinTasks.add(joinOperator); } else { throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); @@ -268,8 +317,8 @@ private static void executeSinkStage(final ExecutionStage stage, final Optimizat } // Compose the SELECT query - final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, null, null, null, - joinTasks); + final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, + globalReduceTask, reduceByTask, sortTask, joinTasks); // Remove trailing semicolon from SELECT String selectSql = selectQuery.toString(); diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java index 2d546deec..4d7096649 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcTableSource.java @@ -81,7 +81,8 @@ public CardinalityEstimate estimate(OptimizationContext optimizationContext, Car .createJdbcConnection()) { // Query the table cardinality. - final String sql = String.format("SELECT count(*) FROM %s;", JdbcTableSource.this.getTableName()); + // No trailing ';' — strict parsers (Trino, BigQuery) reject it in executeQuery. + final String sql = String.format("SELECT count(*) FROM %s", JdbcTableSource.this.getTableName()); final ResultSet resultSet = connection.createStatement().executeQuery(sql); if (!resultSet.next()) { throw new SQLException("No query result for \"" + sql + "\"."); diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java index 0dfd8b698..8f7b3d8a2 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/execution/JdbcExecutorTest.java @@ -81,7 +81,7 @@ void testExecuteWithPlainTableSource() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT * FROM customer;", + "SELECT * FROM customer", sqlQueryChannelInstance.getSqlQuery() ); } @@ -130,7 +130,7 @@ void testExecuteWithFilter() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT * FROM customer WHERE age >= 18;", + "SELECT * FROM customer WHERE age >= 18", sqlQueryChannelInstance.getSqlQuery() ); } @@ -172,7 +172,7 @@ void testExecuteWithProjection() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT name, age FROM customer;", + "SELECT name, age FROM customer", sqlQueryChannelInstance.getSqlQuery() ); } @@ -240,7 +240,7 @@ void testExecuteWithProjectionAndFilters() throws SQLException { SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor().getChannelInstance(sqlToStreamTask.getInputChannel(0)); assertEquals( - "SELECT name, age FROM customer WHERE age >= 18 AND name IS NOT NULL;", + "SELECT name, age FROM customer WHERE age >= 18 AND name IS NOT NULL", sqlQueryChannelInstance.getSqlQuery() ); } diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java index b5ccb0848..4465751be 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java @@ -112,6 +112,6 @@ void testWithHsqldb() throws SQLException { assertTrue(count > 0); } - assertEquals("SELECT COUNT(*) FROM testA;", sqlQueryChannelInstance.getSqlQuery()); + assertEquals("SELECT COUNT(*) FROM testA", sqlQueryChannelInstance.getSqlQuery()); } } diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java index 875a7a47b..0d7a4d65b 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcJoinOperatorTest.java @@ -135,7 +135,7 @@ void testWithHsqldb() throws SQLException { System.out.println(); assertEquals( - "SELECT * FROM testA JOIN testB ON testB.a=testA.a;", + "SELECT * FROM testA JOIN testB ON testB.a=testA.a", sqlQueryChannelInstance.getSqlQuery() ); } @@ -213,7 +213,7 @@ void testMultiConditionJoinWithHsqldb() throws SQLException { String generatedSql = sqlQueryChannelInstance.getSqlQuery(); assertEquals( - "SELECT * FROM orders JOIN shipments ON orders.order_id=shipments.order_id AND orders.customer_id=shipments.customer_id;", + "SELECT * FROM orders JOIN shipments ON orders.order_id=shipments.order_id AND orders.customer_id=shipments.customer_id", generatedSql ); diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java index f00f4020e..556224027 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java @@ -91,6 +91,6 @@ void testWithHsqldb() throws SQLException { final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() .getChannelInstance(sqlToStreamTask.getInputChannel(0)); - assertEquals("SELECT col0,COUNT(*) FROM testA GROUP BY col0;", sqlQueryChannelInstance.getSqlQuery()); + assertEquals("SELECT col0,COUNT(*) FROM testA GROUP BY col0", sqlQueryChannelInstance.getSqlQuery()); } } diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java index 118fb7efa..1dc2fe12f 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java @@ -86,6 +86,6 @@ void testWithHsqldb() throws SQLException { final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() .getChannelInstance(sqlToStreamTask.getInputChannel(0)); - assertEquals("SELECT * FROM testA ORDER BY col0 DESC;", sqlQueryChannelInstance.getSqlQuery()); + assertEquals("SELECT * FROM testA ORDER BY col0 DESC", sqlQueryChannelInstance.getSqlQuery()); } } diff --git a/wayang-platforms/wayang-presto/pom.xml b/wayang-platforms/wayang-presto/pom.xml new file mode 100644 index 000000000..8fbd93d6e --- /dev/null +++ b/wayang-platforms/wayang-presto/pom.xml @@ -0,0 +1,76 @@ + + + + 4.0.0 + + + wayang-platforms + org.apache.wayang + 1.1.2-SNAPSHOT + + + wayang-presto + + Wayang Platform Presto + + Wayang implementation of the operators to be working with the platform "Presto" + + + + org.apache.wayang.platform.presto + 0.289 + + + + + com.facebook.presto + presto-jdbc + ${presto.version} + + + org.apache.wayang + wayang-basic + 1.1.2-SNAPSHOT + + + org.apache.wayang + wayang-jdbc-template + 1.1.2-SNAPSHOT + + + org.apache.wayang + wayang-spark + 1.1.2-SNAPSHOT + + + org.apache.wayang + wayang-api-scala-java + 1.1.2-SNAPSHOT + test + + + org.junit.jupiter + junit-jupiter + 5.10.2 + test + + + + diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/Presto.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/Presto.java new file mode 100644 index 000000000..3378e89fa --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/Presto.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto; + +import org.apache.wayang.presto.platform.PrestoPlatform; +import org.apache.wayang.presto.plugin.PrestoConversionsPlugin; +import org.apache.wayang.presto.plugin.PrestoPlugin; + +/** + * Entry point that exposes the relevant components of the Presto platform. + * + *

Typical usage: + *

{@code
+ *   new WayangContext(config)
+ *       .withPlugin(Java.basicPlugin())
+ *       .withPlugin(Presto.plugin());
+ * }
+ */ +public class Presto { + + private static final PrestoPlugin PLUGIN = new PrestoPlugin(); + + private static final PrestoConversionsPlugin CONVERSIONS_PLUGIN = new PrestoConversionsPlugin(); + + /** + * @return the {@link PrestoPlugin} (operator mappings + channel conversions) + */ + public static PrestoPlugin plugin() { + return PLUGIN; + } + + /** + * @return the {@link PrestoConversionsPlugin} (channel conversions only) + */ + public static PrestoConversionsPlugin conversionPlugin() { + return CONVERSIONS_PLUGIN; + } + + /** + * @return the {@link PrestoPlatform} + */ + public static PrestoPlatform platform() { + return PrestoPlatform.getInstance(); + } + +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/channels/ChannelConversions.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/channels/ChannelConversions.java new file mode 100644 index 000000000..52447162b --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/channels/ChannelConversions.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.channels; + +import org.apache.wayang.core.optimizer.channels.ChannelConversion; +import org.apache.wayang.core.optimizer.channels.DefaultChannelConversion; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.jdbc.operators.SqlToRddOperator; +import org.apache.wayang.jdbc.operators.SqlToStreamOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; +import org.apache.wayang.spark.channels.RddChannel; + +import java.util.Arrays; +import java.util.Collection; + +/** + * {@link ChannelConversion}s that materialise a Presto SQL query result into a + * Java {@link StreamChannel} or a Spark {@link RddChannel}. + */ +public class ChannelConversions { + + public static final ChannelConversion SQL_TO_STREAM_CONVERSION = new DefaultChannelConversion( + PrestoPlatform.getInstance().getSqlQueryChannelDescriptor(), + StreamChannel.DESCRIPTOR, + () -> new SqlToStreamOperator(PrestoPlatform.getInstance()) + ); + + public static final ChannelConversion SQL_TO_UNCACHED_RDD_CONVERSION = new DefaultChannelConversion( + PrestoPlatform.getInstance().getSqlQueryChannelDescriptor(), + RddChannel.UNCACHED_DESCRIPTOR, + () -> new SqlToRddOperator(PrestoPlatform.getInstance()) + ); + + public static final Collection ALL = Arrays.asList( + SQL_TO_STREAM_CONVERSION, + SQL_TO_UNCACHED_RDD_CONVERSION + ); + +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/FilterMapping.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/FilterMapping.java new file mode 100644 index 000000000..c9926760a --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/FilterMapping.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.presto.operators.PrestoFilterOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Maps a {@link FilterOperator} (with a SQL-implementable predicate) to a + * {@link PrestoFilterOperator}. + */ +@SuppressWarnings("unchecked") +public class FilterMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PrestoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "filter", new FilterOperator<>(null, DataSetType.createDefault(Record.class)), false + ).withAdditionalTest(op -> op.getPredicateDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new PrestoFilterOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/GlobalReduceMapping.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/GlobalReduceMapping.java new file mode 100644 index 000000000..f49a766ef --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/GlobalReduceMapping.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.presto.operators.PrestoGlobalReduceOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Maps a {@link GlobalReduceOperator} (with a SQL-implementable reduction) to a + * {@link PrestoGlobalReduceOperator}. + */ +@SuppressWarnings("unchecked") +public class GlobalReduceMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PrestoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "reduce", new GlobalReduceOperator(null, DataSetType.createDefault(Record.class)), false) + .withAdditionalTest(op -> op.getReduceDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new PrestoGlobalReduceOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/JoinMapping.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/JoinMapping.java new file mode 100644 index 000000000..cdb6bd665 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/JoinMapping.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.presto.operators.PrestoJoinOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Maps a {@link JoinOperator} whose key descriptors are SQL-implementable to a + * {@link PrestoJoinOperator}. + */ +@SuppressWarnings("unchecked") +public class JoinMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PrestoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern> operatorPattern = new OperatorPattern<>( + "join", + new JoinOperator( + null, + null, + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class) + ), + false + ) + .withAdditionalTest(op -> op.getKeyDescriptor0() instanceof TransformationDescriptor) + .withAdditionalTest(op -> op.getKeyDescriptor1() instanceof TransformationDescriptor) + .withAdditionalTest(op -> op.getKeyDescriptor0().getSqlImplementation() != null) + .withAdditionalTest(op -> op.getKeyDescriptor1().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new PrestoJoinOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/Mappings.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/Mappings.java new file mode 100644 index 000000000..9c8e0f42b --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/Mappings.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.core.mapping.Mapping; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Register of the {@link Mapping}s supported on the Presto platform. + */ +public class Mappings { + + public static final Collection ALL = Arrays.asList( + new FilterMapping(), + new GlobalReduceMapping(), + new JoinMapping(), + new ProjectionMapping(), + new ReduceByMapping(), + new SortMapping(), + new TableSinkMapping() + ); + +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/ProjectionMapping.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/ProjectionMapping.java new file mode 100644 index 000000000..d05a663de --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/ProjectionMapping.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.function.ProjectionDescriptor; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.presto.operators.PrestoProjectionOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Maps a {@link MapOperator} that carries a {@link ProjectionDescriptor} to a + * {@link PrestoProjectionOperator}. + */ +public class ProjectionMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PrestoPlatform.getInstance())); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern> operatorPattern = new OperatorPattern<>( + "projection", + new MapOperator<>( + null, + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(op -> op.getFunctionDescriptor() instanceof ProjectionDescriptor) + .withAdditionalTest(op -> op.getNumInputs() == 1); // No broadcasts. + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new PrestoProjectionOperator(matchedOperator).at(epoch)); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/ReduceByMapping.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/ReduceByMapping.java new file mode 100644 index 000000000..fe2c3776c --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/ReduceByMapping.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.presto.operators.PrestoReduceByOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Maps a {@link ReduceByOperator} (with SQL-implementable key and reduction) to + * a {@link PrestoReduceByOperator}. + */ +@SuppressWarnings("unchecked") +public class ReduceByMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PrestoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "reduceBy", + new ReduceByOperator(null, null, DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(op -> op.getKeyDescriptor().getSqlImplementation() != null + && op.getReduceDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new PrestoReduceByOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/SortMapping.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/SortMapping.java new file mode 100644 index 000000000..f27c35657 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/SortMapping.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.presto.operators.PrestoSortOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Maps a {@link SortOperator} (with a SQL-implementable sort key) to a + * {@link PrestoSortOperator}. + */ +@SuppressWarnings("unchecked") +public class SortMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PrestoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "sort", + new SortOperator(null, DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(op -> op.getKeyDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new PrestoSortOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/TableSinkMapping.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/TableSinkMapping.java new file mode 100644 index 000000000..cac616d9f --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/mapping/TableSinkMapping.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.mapping; + +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.presto.operators.PrestoTableSinkOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Maps a {@link TableSink} to a {@link PrestoTableSinkOperator}. + */ +@SuppressWarnings("unchecked") +public class TableSinkMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PrestoPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern<>( + "sink", new TableSink<>(null, null, null), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new PrestoTableSinkOperator(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoExecutionOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoExecutionOperator.java new file mode 100644 index 000000000..fa58ab1cc --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoExecutionOperator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.jdbc.operators.JdbcExecutionOperator; +import org.apache.wayang.presto.platform.PrestoPlatform; + +/** + * Marker for {@link JdbcExecutionOperator}s that run on the {@link PrestoPlatform}. + */ +public interface PrestoExecutionOperator extends JdbcExecutionOperator { + + @Override + default PrestoPlatform getPlatform() { + return PrestoPlatform.getInstance(); + } + +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoFilterOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoFilterOperator.java new file mode 100644 index 000000000..bb417f949 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoFilterOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.core.function.PredicateDescriptor; +import org.apache.wayang.jdbc.operators.JdbcFilterOperator; + +/** + * Presto implementation of the {@link FilterOperator}. The predicate is pushed + * down as a SQL {@code WHERE} clause via its {@code sqlImplementation}. + */ +public class PrestoFilterOperator extends JdbcFilterOperator implements PrestoExecutionOperator { + + public PrestoFilterOperator(PredicateDescriptor predicateDescriptor) { + super(predicateDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public PrestoFilterOperator(FilterOperator that) { + super(that); + } + + @Override + protected PrestoFilterOperator createCopy() { + return new PrestoFilterOperator(this); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoGlobalReduceOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoGlobalReduceOperator.java new file mode 100644 index 000000000..f0db95633 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoGlobalReduceOperator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.jdbc.operators.JdbcGlobalReduceOperator; + +/** + * Presto implementation of the {@link GlobalReduceOperator}. The reduction is + * pushed down as a SQL aggregate (e.g. {@code SUM(amount)}) via its + * {@code sqlImplementation}. + */ +public class PrestoGlobalReduceOperator extends JdbcGlobalReduceOperator implements PrestoExecutionOperator { + + public PrestoGlobalReduceOperator(ReduceDescriptor reduceDescriptor) { + super(reduceDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public PrestoGlobalReduceOperator(GlobalReduceOperator that) { + super(that); + } + + @Override + protected PrestoGlobalReduceOperator createCopy() { + return new PrestoGlobalReduceOperator(this); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoJoinOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoJoinOperator.java new file mode 100644 index 000000000..2f97af71a --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoJoinOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcJoinOperator; + +/** + * Presto implementation of the {@link JoinOperator}. The two key descriptors + * carry the {@code (table, keyColumns)} SQL implementation that the base class + * renders into a {@code JOIN ... ON ...} clause. + * + * @param type of the join key + */ +public class PrestoJoinOperator extends JdbcJoinOperator implements PrestoExecutionOperator { + + public PrestoJoinOperator( + TransformationDescriptor keyDescriptor0, + TransformationDescriptor keyDescriptor1) { + super(keyDescriptor0, keyDescriptor1); + } + + public PrestoJoinOperator(JoinOperator that) { + super(that); + } + + @Override + protected PrestoJoinOperator createCopy() { + return new PrestoJoinOperator(this); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoProjectionOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoProjectionOperator.java new file mode 100644 index 000000000..566a65b6f --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoProjectionOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.function.ProjectionDescriptor; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.jdbc.operators.JdbcProjectionOperator; + +/** + * Presto implementation of a column projection. The selected fields are pushed + * down as the SQL {@code SELECT} list. + */ +public class PrestoProjectionOperator extends JdbcProjectionOperator implements PrestoExecutionOperator { + + public PrestoProjectionOperator(String... fieldNames) { + super(fieldNames); + } + + public PrestoProjectionOperator(ProjectionDescriptor functionDescriptor) { + super(functionDescriptor); + } + + public PrestoProjectionOperator(MapOperator that) { + super(that); + } + + @Override + protected PrestoProjectionOperator createCopy() { + return new PrestoProjectionOperator(this); + } + +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoReduceByOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoReduceByOperator.java new file mode 100644 index 000000000..b61587a76 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoReduceByOperator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcReduceByOperator; + +/** + * Presto implementation of the {@link ReduceByOperator}. The grouping key and + * the reduction are pushed down as a SQL {@code GROUP BY} plus aggregate (e.g. + * {@code SELECT region, SUM(amount) ... GROUP BY region}). + */ +public class PrestoReduceByOperator extends JdbcReduceByOperator implements PrestoExecutionOperator { + + public PrestoReduceByOperator(TransformationDescriptor keyDescriptor, + ReduceDescriptor reduceDescriptor) { + super(keyDescriptor, reduceDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public PrestoReduceByOperator(ReduceByOperator that) { + super(that); + } + + @Override + protected PrestoReduceByOperator createCopy() { + return new PrestoReduceByOperator(this); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoSortOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoSortOperator.java new file mode 100644 index 000000000..1be614bf2 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoSortOperator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcSortOperator; + +/** + * Presto implementation of the {@link SortOperator}. The sort key and direction + * are pushed down as a SQL {@code ORDER BY} clause via its {@code sqlImplementation}. + */ +public class PrestoSortOperator extends JdbcSortOperator implements PrestoExecutionOperator { + + public PrestoSortOperator(TransformationDescriptor keyDescriptor) { + super(keyDescriptor); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public PrestoSortOperator(SortOperator that) { + super(that); + } + + @Override + protected PrestoSortOperator createCopy() { + return new PrestoSortOperator(this); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoTableSinkOperator.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoTableSinkOperator.java new file mode 100644 index 000000000..cff0d7e10 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoTableSinkOperator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator; + +/** + * Presto implementation of the {@link JdbcTableSinkOperator}. The sink stays + * entirely within Presto: the composed query is wrapped in a + * {@code CREATE TABLE ... AS} (mode {@code overwrite}) or {@code INSERT INTO ...} + * statement. + * + *

Table names follow Presto's three-part convention + * {@code catalog.schema.table} (e.g. {@code memory.sales.orders}). + */ +public class PrestoTableSinkOperator extends JdbcTableSinkOperator implements PrestoExecutionOperator { + + public PrestoTableSinkOperator(String tableName, String[] columnNames) { + super(tableName, columnNames); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public PrestoTableSinkOperator(TableSink that) { + super(that); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoTableSource.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoTableSource.java new file mode 100644 index 000000000..e67d3cec9 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/operators/PrestoTableSource.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.operators; + +import org.apache.wayang.basic.operators.TableSource; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.jdbc.operators.JdbcTableSource; + +import java.util.List; + +/** + * Presto implementation of the {@link TableSource}. + * + *

Table names follow Presto's three-part convention + * {@code catalog.schema.table} (e.g. {@code memory.sales.orders}). + */ +public class PrestoTableSource extends JdbcTableSource implements PrestoExecutionOperator { + + /** + * @see TableSource#TableSource(String, String...) + */ + public PrestoTableSource(String tableName, String... columnNames) { + super(tableName, columnNames); + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public PrestoTableSource(JdbcTableSource that) { + super(that); + } + + @Override + public List getSupportedInputChannels(int index) { + throw new UnsupportedOperationException("This operator has no input channels."); + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/platform/PrestoPlatform.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/platform/PrestoPlatform.java new file mode 100644 index 000000000..cddfa19d6 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/platform/PrestoPlatform.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.platform; + +import org.apache.wayang.core.platform.Platform; +import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate; + +/** + * {@link Platform} implementation for Presto (PrestoDB). + * + *

The {@code configName} {@code "presto"} makes Wayang resolve every property + * with the {@code wayang.presto.*} prefix — connection ({@code .jdbc.url}, + * {@code .jdbc.user}, {@code .jdbc.password}), the cost model, and the hardware + * profile — with defaults loaded from {@code wayang-presto-defaults.properties}. + */ +public class PrestoPlatform extends JdbcPlatformTemplate { + + private static final String PLATFORM_NAME = "Presto"; + + private static final String CONFIG_NAME = "presto"; + + private static PrestoPlatform instance = null; + + public static PrestoPlatform getInstance() { + if (instance == null) { + instance = new PrestoPlatform(); + } + return instance; + } + + protected PrestoPlatform() { + super(PLATFORM_NAME, CONFIG_NAME); + } + + @Override + public String getJdbcDriverClassName() { + return "com.facebook.presto.jdbc.PrestoDriver"; + } + +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/plugin/PrestoConversionsPlugin.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/plugin/PrestoConversionsPlugin.java new file mode 100644 index 000000000..f72eea0e5 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/plugin/PrestoConversionsPlugin.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.plugin; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.optimizer.channels.ChannelConversion; +import org.apache.wayang.core.platform.Platform; +import org.apache.wayang.core.plugin.Plugin; +import org.apache.wayang.java.platform.JavaPlatform; +import org.apache.wayang.presto.channels.ChannelConversions; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * Provides only the {@link ChannelConversion}s for the {@link PrestoPlatform} + * (no operator {@link Mapping}s) — used to make Presto results consumable by + * other platforms without enabling operator pushdown. + */ +public class PrestoConversionsPlugin implements Plugin { + + @Override + public Collection getRequiredPlatforms() { + return Arrays.asList(PrestoPlatform.getInstance(), JavaPlatform.getInstance()); + } + + @Override + public Collection getMappings() { + return Collections.emptyList(); + } + + @Override + public Collection getChannelConversions() { + return ChannelConversions.ALL; + } + + @Override + public void setProperties(Configuration configuration) { + } +} diff --git a/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/plugin/PrestoPlugin.java b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/plugin/PrestoPlugin.java new file mode 100644 index 000000000..3755be2cf --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/java/org/apache/wayang/presto/plugin/PrestoPlugin.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto.plugin; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.optimizer.channels.ChannelConversion; +import org.apache.wayang.core.plan.wayangplan.Operator; +import org.apache.wayang.core.platform.Platform; +import org.apache.wayang.core.plugin.Plugin; +import org.apache.wayang.java.platform.JavaPlatform; +import org.apache.wayang.presto.channels.ChannelConversions; +import org.apache.wayang.presto.mapping.Mappings; +import org.apache.wayang.presto.platform.PrestoPlatform; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Enables Wayang {@link Operator}s to be pushed down onto the {@link PrestoPlatform}. + */ +public class PrestoPlugin implements Plugin { + + @Override + public Collection getRequiredPlatforms() { + return Arrays.asList(PrestoPlatform.getInstance(), JavaPlatform.getInstance()); + } + + @Override + public Collection getMappings() { + return Mappings.ALL; + } + + @Override + public Collection getChannelConversions() { + return ChannelConversions.ALL; + } + + @Override + public void setProperties(Configuration configuration) { + } +} diff --git a/wayang-platforms/wayang-presto/src/main/resources/wayang-presto-defaults.properties b/wayang-platforms/wayang-presto/src/main/resources/wayang-presto-defaults.properties new file mode 100644 index 000000000..f0e165964 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/main/resources/wayang-presto-defaults.properties @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Connection (override per deployment in wayang.properties) +# wayang.presto.jdbc.url = jdbc:presto://localhost:8080 +# wayang.presto.jdbc.user = test +# wayang.presto.jdbc.password = +wayang.presto.jdbc.driverName = com.facebook.presto.jdbc.PrestoDriver + +# Hardware profile used by LoadProfileToTimeConverter. +# Presto runs as a distributed MPP cluster; model a representative coordinator node. +wayang.presto.cpu.mhz = 2700 +wayang.presto.cores = 4 +wayang.presto.costs.fix = 0.0 +wayang.presto.costs.per-ms = 1.0 + +# ── Cost model ──────────────────────────────────────────────────────────────── +# +# Formula: cpu = α * rows + β +# +# Presto is a distributed MPP engine: low per-row cost (small α) because scans +# are parallelised across workers, but a noticeable fixed overhead (larger β) +# from query planning and cluster coordination. +# +# α = 10 — parallel scan; per-row cost is cheap +# β = 800k — cluster startup + query dispatch overhead +# +# These are initial estimates; tune after real benchmarks by fitting the +# template formula on measured data and updating the 'load' key. +# ────────────────────────────────────────────────────────────────────────────── + +wayang.presto.tablesource.load.template = {\ + "type":"mathex", "in":0, "out":1,\ + "cpu":"?*out0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.tablesource.load = {\ + "in":0, "out":1,\ + "cpu":"${10*out0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.filter.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.filter.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.projection.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.projection.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.join.load.template = {\ + "type":"mathex", "in":2, "out":1,\ + "cpu":"?*in0 + ?*in1 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.join.load = {\ + "in":2, "out":1,\ + "cpu":"${10*in0 + 10*in1 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.globalreduce.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.globalreduce.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.reduceby.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.reduceby.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.sort.load.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.sort.load = {\ + "in":1, "out":1,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.tablesink.load.template = {\ + "type":"mathex", "in":1, "out":0,\ + "cpu":"?*in0 + ?",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.tablesink.load = {\ + "in":1, "out":0,\ + "cpu":"${10*in0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} + +wayang.presto.sqltostream.load.query.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*out0 + ?"\ +} +wayang.presto.sqltostream.load.query = {\ + "in":1, "out":1,\ + "cpu":"${10*out0 + 800000}",\ + "ram":"0",\ + "p":0.9\ +} +wayang.presto.sqltostream.load.output.template = {\ + "type":"mathex", "in":1, "out":1,\ + "cpu":"?*out0"\ +} +wayang.presto.sqltostream.load.output = {\ + "in":1, "out":1,\ + "cpu":"${10*out0}",\ + "ram":"0",\ + "p":0.9\ +} diff --git a/wayang-platforms/wayang-presto/src/test/java/org/apache/wayang/presto/PrestoOperatorsIT.java b/wayang-platforms/wayang-presto/src/test/java/org/apache/wayang/presto/PrestoOperatorsIT.java new file mode 100644 index 000000000..d92f30cf6 --- /dev/null +++ b/wayang-platforms/wayang-presto/src/test/java/org/apache/wayang/presto/PrestoOperatorsIT.java @@ -0,0 +1,654 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.presto; + +import org.apache.wayang.api.DataQuantaBuilder; +import org.apache.wayang.api.JavaPlanBuilder; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.basic.function.ProjectionDescriptor; +import org.apache.wayang.basic.operators.FilterOperator; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.basic.operators.MapOperator; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.types.RecordType; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.WayangContext; +import org.apache.wayang.core.function.FunctionDescriptor; +import org.apache.wayang.core.function.PredicateDescriptor; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.plan.wayangplan.WayangPlan; +import org.apache.wayang.core.types.DataUnitType; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.presto.operators.PrestoProjectionOperator; +import org.apache.wayang.presto.operators.PrestoTableSource; +import org.apache.wayang.presto.platform.PrestoPlatform; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * End-to-end integration tests for every operator the Presto platform implements, + * driven through the Wayang API against a live PrestoDB cluster (in-memory + * connector). + * + *

Coverage: {@code TableSource}, {@code Filter}, {@code Projection}, + * {@code Join}, {@code GlobalReduce}, {@code ReduceBy}, {@code Sort}, + * and {@code TableSink}, plus the same operators driven through the high-level + * {@link JavaPlanBuilder} API. Every Wayang plan ends in a Presto table sink so + * the execution itself does not require the Java plugin — only {@code Presto.plugin()} + * is registered. Result assertions use plain JDBC only after the Wayang execution + * has completed; the sink table's existence and contents prove that the composed + * {@code CREATE TABLE ... AS SELECT} ran entirely inside Presto. + * + *

Prerequisites: a Presto reachable at {@code PRESTO_HOST:PRESTO_PORT} + * (defaults {@code localhost:8081}) with the {@code memory} connector enabled — + * e.g. {@code cd presto-setup && docker compose up -d}. If Presto is not reachable + * the whole class is skipped (not failed). + * + *

+ *   JAVA_HOME=<jdk17> mvn -o test -pl wayang-platforms/wayang-presto \
+ *     -Dtest=PrestoOperatorsIT -Dsurefire.failIfNoSpecifiedTests=false \
+ *     -Drat.skip=true -Dlicense.skip=true -Pskip-prerequisite-check
+ * 
+ */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class PrestoOperatorsIT { + + private static final String HOST = System.getenv().getOrDefault("PRESTO_HOST", "localhost"); + private static final int PORT = Integer.parseInt(System.getenv().getOrDefault("PRESTO_PORT", "8081")); + private static final String USER = System.getenv().getOrDefault("PRESTO_USER", "test"); + private static final String JDBC_URL = String.format("jdbc:presto://%s:%d/memory", HOST, PORT); + + private static final String SCHEMA = "memory.wayang_it"; + private static final String ORDERS = SCHEMA + ".orders"; + private static final String CUSTOMERS = SCHEMA + ".customers"; + private static final String SINK_TABLE_NAME = "operator_result"; + private static final String SINK_TABLE = SCHEMA + "." + SINK_TABLE_NAME; + private static final String[] JOIN_COLUMNS = { + "order_id", "customer_id", "region", "amount", "cust_id", "name", "tier" + }; + private static final String JOIN_FLATTEN_NAME = "Presto test-only join flatten"; + + private static boolean prestoAvailable = false; + + // Lifecycle + + @BeforeAll + static void setUp() throws Exception { + try (Connection probe = jdbc()) { + probe.createStatement().execute("SELECT 1"); + prestoAvailable = true; + } catch (Exception e) { + System.err.println("[PrestoOperatorsIT] Presto not reachable (" + e.getMessage() + ") — skipping."); + return; + } + try (Connection c = jdbc()) { + Statement st = c.createStatement(); + st.execute("CREATE SCHEMA IF NOT EXISTS " + SCHEMA); + + st.execute("DROP TABLE IF EXISTS " + ORDERS); + st.execute("CREATE TABLE " + ORDERS + + " (order_id BIGINT, customer_id BIGINT, region VARCHAR, amount DOUBLE)"); + // 120000 rows from a 6-row VALUES list crossed with sequence(1,10000) and a + // 2-row doubler. Sourcing from VALUES (not the table itself) avoids reading + + // writing the same memory table in one statement. AMER = 60000. + st.execute("INSERT INTO " + ORDERS + " (order_id, customer_id, region, amount) " + + "SELECT CAST(b.order_id AS BIGINT), CAST(b.customer_id AS BIGINT), " + + " b.region, CAST(b.amount AS DOUBLE) " + + "FROM UNNEST(sequence(1, 10000)) AS s(n) " + + "CROSS JOIN (VALUES (1), (2)) AS d(k) " + + "CROSS JOIN (VALUES " + + " (1, 100, 'AMER', 2200.0)," + + " (2, 101, 'EMEA', 800.5)," + + " (3, 100, 'AMER', 680.5)," + + " (4, 102, 'APAC', 1500.0)," + + " (5, 101, 'EMEA', 1100.0)," + + " (6, 100, 'AMER', 950.25)" + + ") AS b(order_id, customer_id, region, amount)"); + + // customers' key column is `cust_id` (not customer_id) so the flattened + // join projection has no duplicate column name in CREATE TABLE AS SELECT. + st.execute("DROP TABLE IF EXISTS " + CUSTOMERS); + st.execute("CREATE TABLE " + CUSTOMERS + + " (cust_id BIGINT, name VARCHAR, tier VARCHAR)"); + st.execute("INSERT INTO " + CUSTOMERS + " VALUES " + + "(CAST(100 AS BIGINT), 'Acme', 'GOLD')," + + "(101, 'Globex', 'SILVER')," + + "(102, 'Initech', 'BRONZE')"); + } + System.out.println("[PrestoOperatorsIT] Connected to Presto at " + JDBC_URL + " — fixtures created."); + } + + @AfterAll + static void tearDown() { + if (!prestoAvailable) return; + try (Connection c = jdbc()) { + Statement st = c.createStatement(); + st.execute("DROP TABLE IF EXISTS " + ORDERS); + st.execute("DROP TABLE IF EXISTS " + CUSTOMERS); + st.execute("DROP TABLE IF EXISTS " + SINK_TABLE); + } catch (Exception e) { + System.err.println("[PrestoOperatorsIT] cleanup failed: " + e.getMessage()); + } + } + + // Tests (one per operator) + + /** TableSource: full scan returns every row. */ + @Test + @Order(1) + void tableSource() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + PrestoTableSource src = new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + TableSink sink = tableSink("order_id", "customer_id", "region", "amount"); + src.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "TableSource should return all orders"); + assertSqlReachedPresto("SELECT * FROM " + ORDERS); + } + + /** Filter: WHERE region = 'AMER' pushed to Presto. */ + @Test + @Order(2) + void filter() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + PrestoTableSource src = new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + (Record r) -> "AMER".equals(r.getField(2)), Record.class + ).withSqlImplementation("region = 'AMER'")); + TableSink sink = tableSink("order_id", "customer_id", "region", "amount"); + src.connectTo(0, filter, 0); + filter.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(60000, queryLong("SELECT count(*) FROM " + SINK_TABLE), "60000 AMER orders expected"); + assertEquals(0, queryLong("SELECT count_if(region <> 'AMER') FROM " + SINK_TABLE), "all rows AMER"); + assertSqlReachedPresto("WHERE region = 'AMER'"); + } + + /** Projection (+filter): only the projected columns are fetched. */ + @Test + @Order(3) + void projection() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + PrestoTableSource src = new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + (Record r) -> "AMER".equals(r.getField(2)), Record.class + ).withSqlImplementation("region = 'AMER'")); + MapOperator projection = new MapOperator<>( + ProjectionDescriptor.createForRecords( + new RecordType("order_id", "customer_id", "region", "amount"), + "region", "amount"), + DataSetType.createDefault(Record.class), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("region", "amount"); + src.connectTo(0, filter, 0); + filter.connectTo(0, projection, 0); + projection.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(60000, queryLong("SELECT count(*) FROM " + SINK_TABLE), "60000 AMER rows expected"); + assertEquals(2, columnCount(SINK_TABLE), "projection keeps only 2 columns"); + assertSqlReachedPresto("SELECT region, amount FROM " + ORDERS); + } + + /** + * Join: orders and customers on customer_id, flattened into Presto SQL via a + * test-only mapping so the whole plan stays in Presto. + */ + @Test + @Order(4) + void join() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + PrestoTableSource orders = new PrestoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount"); + PrestoTableSource customers = new PrestoTableSource( + CUSTOMERS, "cust_id", "name", "tier"); + JoinOperator join = new JoinOperator<>( + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(1)), Record.class, Record.class + ).withSqlImplementation(ORDERS, "customer_id"), + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(0)), Record.class, Record.class + ).withSqlImplementation(CUSTOMERS, "cust_id")); + MapOperator, Record> flatten = joinFlattenOperator(); + TableSink sink = tableSink(JOIN_COLUMNS); + orders.connectTo(0, join, 0); + customers.connectTo(0, join, 1); + join.connectTo(0, flatten, 0); + flatten.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "join should yield one row per order"); + assertEquals(0, queryLong("SELECT count_if(customer_id <> cust_id) FROM " + SINK_TABLE), + "joined customer IDs should match"); + assertSqlReachedPresto("JOIN " + CUSTOMERS); + } + + /** GlobalReduce: SUM(amount) over the whole table collapses to a single row. */ + @Test + @Order(5) + void globalReduce() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + PrestoTableSource src = new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + GlobalReduceOperator reduce = new GlobalReduceOperator<>( + new ReduceDescriptor<>((a, b) -> a, Record.class) + .withSqlImplementation("SUM(amount) AS total_amount"), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("total_amount"); + src.connectTo(0, reduce, 0); + reduce.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertSingleDoubleResult(144_625_000.0, "global reduce must collapse to a single row"); + assertSqlReachedPresto("SELECT SUM(amount) AS total_amount FROM " + ORDERS); + } + + /** ReduceBy: SUM(amount) GROUP BY region yields one row per region. */ + @Test + @Order(6) + void reduceBy() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + PrestoTableSource src = new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + ReduceByOperator reduceBy = new ReduceByOperator<>( + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(2)), Record.class, Record.class + ).withSqlImplementation("region", "region"), + new ReduceDescriptor<>((a, b) -> a, Record.class) + .withSqlImplementation("SUM(amount) AS total_amount"), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("region", "total_amount"); + src.connectTo(0, reduceBy, 0); + reduceBy.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + Map sums = readRegionSums(); + assertEquals(3, sums.size(), "one row per region expected"); + assertEquals(76_615_000.0, sums.get("AMER"), 0.01); + assertEquals(38_010_000.0, sums.get("EMEA"), 0.01); + assertEquals(30_000_000.0, sums.get("APAC"), 0.01); + assertSqlReachedPresto("GROUP BY region"); + } + + /** Sort: ORDER BY amount ASC pushed to Presto. */ + @Test + @Order(7) + void sort() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + PrestoTableSource src = new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + SortOperator sort = new SortOperator<>( + new TransformationDescriptor<>( + (Record r) -> new Record(r.getField(3)), Record.class, Record.class + ).withSqlImplementation("amount", "ASC"), + DataSetType.createDefault(Record.class)); + TableSink sink = tableSink("order_id", "customer_id", "region", "amount"); + src.connectTo(0, sort, 0); + sort.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "sort must not change the cardinality"); + assertEquals(680.5, queryDouble("SELECT min(amount) FROM " + SINK_TABLE), 0.001); + assertEquals(2200.0, queryDouble("SELECT max(amount) FROM " + SINK_TABLE), 0.001); + assertSqlReachedPresto("ORDER BY amount ASC"); + } + + /** TableSink: filter + sink composed into a single CREATE TABLE AS SELECT in Presto. */ + @Test + @Order(8) + void tableSink() throws Exception { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + + PrestoTableSource src = new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount"); + FilterOperator filter = new FilterOperator<>( + new PredicateDescriptor<>( + (Record r) -> "AMER".equals(r.getField(2)), Record.class + ).withSqlImplementation("region = 'AMER'")); + TableSink sink = new TableSink<>( + new Properties(), "overwrite", SINK_TABLE, + "order_id", "customer_id", "region", "amount"); + src.connectTo(0, filter, 0); + filter.connectTo(0, sink, 0); + + wayangContext().execute(new WayangPlan(sink)); + + try (Connection c = jdbc()) { + ResultSet rs = c.createStatement().executeQuery( + "SELECT count(*), count_if(region <> 'AMER') FROM " + SINK_TABLE); + rs.next(); + assertEquals(60000, rs.getLong(1), "sink table must hold all AMER orders"); + assertEquals(0, rs.getLong(2), "sink table must hold only AMER orders"); + } + assertSqlReachedPresto("CREATE TABLE " + SINK_TABLE + " AS"); + } + + // JavaPlanBuilder combination tests + + /** JavaPlanBuilder: read, filter, project, write — all in Presto. */ + @Test + @Order(9) + void javaPlanBuilderReadTableFilterProjection() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + + new JavaPlanBuilder(wayangContext(), "Presto JavaPlanBuilder readTable integration test") + .readTable(new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount")) + .filter(record -> "AMER".equals(record.getField(2))) + .withSqlUdf("region = 'AMER'") + .asRecords() + .projectRecords(new String[]{"order_id", "amount"}) + .writeTable(SINK_TABLE, "overwrite", new String[]{"order_id", "amount"}, new Properties()); + + assertEquals(60000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "60000 projected AMER orders expected"); + assertEquals(0, queryLong( + "SELECT count_if(amount NOT IN (2200.0, 680.5, 950.25)) FROM " + SINK_TABLE), + "only AMER order amounts should remain"); + assertSqlReachedPresto("SELECT order_id, amount FROM " + ORDERS + " WHERE region = 'AMER'"); + } + + /** JavaPlanBuilder: filter + global reduce, written in Presto. */ + @Test + @Order(10) + void javaPlanBuilderReadTableFilterGlobalReduce() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + + new JavaPlanBuilder(wayangContext(), "Presto JavaPlanBuilder global reduce integration test") + .readTable(new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount")) + .filter(record -> "AMER".equals(record.getField(2))) + .withSqlUdf("region = 'AMER'") + .reduce((left, right) -> left) + .withSqlUdf("SUM(amount) AS total_amount") + .writeTable(SINK_TABLE, "overwrite", new String[]{"total_amount"}, new Properties()); + + assertSingleDoubleResult(76_615_000.0, "global reduction should return one row"); + assertSqlReachedPresto("SELECT SUM(amount) AS total_amount FROM " + ORDERS + " WHERE region = 'AMER'"); + } + + /** JavaPlanBuilder: reduce-by + sort, written in Presto. */ + @Test + @Order(11) + void javaPlanBuilderReadTableReduceBySort() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + + new JavaPlanBuilder(wayangContext(), "Presto JavaPlanBuilder reduce-by and sort integration test") + .readTable(new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount")) + .reduceByKey( + record -> new Record(record.getField(2)), + (left, right) -> left) + .withSqlUdfs("region", "SUM(amount) AS total_amount") + .sort(record -> new Record(record.getField(0))) + .withSqlUdf("region", "ASC") + .writeTable(SINK_TABLE, "overwrite", new String[]{"region", "total_amount"}, new Properties()); + + Map sums = readRegionSums(); + assertEquals(3, sums.size(), "one row per region expected"); + assertTrue(sums.containsKey("AMER") && sums.containsKey("APAC") && sums.containsKey("EMEA")); + assertSqlReachedPresto( + "SELECT region,SUM(amount) AS total_amount FROM " + ORDERS + + " GROUP BY region ORDER BY region ASC"); + } + + /** JavaPlanBuilder: filtered projection straight into a Presto table sink. */ + @Test + @Order(12) + void javaPlanBuilderReadTableFilterProjectionTableSink() throws Exception { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + + new JavaPlanBuilder(wayangContext(), "Presto JavaPlanBuilder table sink integration test") + .readTable(new PrestoTableSource(ORDERS, "order_id", "customer_id", "region", "amount")) + .filter(record -> "AMER".equals(record.getField(2))) + .withSqlUdf("region = 'AMER'") + .asRecords() + .projectRecords(new String[]{"order_id", "amount"}) + .writeTable(SINK_TABLE, "overwrite", new String[]{"order_id", "amount"}, new Properties()); + + assertEquals(60000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "sink table should contain projected AMER orders"); + assertSqlReachedPresto( + "CREATE TABLE " + SINK_TABLE + " AS SELECT order_id, amount FROM " + ORDERS + + " WHERE region = 'AMER'"); + } + + /** JavaPlanBuilder: join two tables, flatten, write — all in Presto. */ + @Test + @Order(13) + void javaPlanBuilderReadTableJoin() { + Assumptions.assumeTrue(prestoAvailable, "Presto not reachable"); + + JavaPlanBuilder plan = new JavaPlanBuilder( + wayangContext(), "Presto JavaPlanBuilder join integration test"); + DataQuantaBuilder orders = plan.readTable(new PrestoTableSource( + ORDERS, "order_id", "customer_id", "region", "amount")); + DataQuantaBuilder customers = plan.readTable(new PrestoTableSource( + CUSTOMERS, "cust_id", "name", "tier")); + + orders + .join( + record -> new Record(record.getField(1)), + customers, + record -> new Record(record.getField(0))) + .withSqlUdfs(ORDERS, "customer_id", CUSTOMERS, "cust_id") + .map(new JoinFlattenFunction()) + .withName(JOIN_FLATTEN_NAME) + .writeTable(SINK_TABLE, "overwrite", JOIN_COLUMNS, new Properties()); + + assertEquals(120000, queryLong("SELECT count(*) FROM " + SINK_TABLE), + "join should yield one row per order"); + assertEquals(0, queryLong("SELECT count_if(customer_id <> cust_id) FROM " + SINK_TABLE), + "joined customer IDs should match"); + assertSqlReachedPresto("JOIN " + CUSTOMERS); + } + + // Helpers + + private WayangContext wayangContext() { + Configuration config = new Configuration(); + config.setProperty("wayang.presto.jdbc.url", JDBC_URL); + config.setProperty("wayang.presto.jdbc.user", USER); + // No password: the Presto JDBC driver rejects an (even empty) password on a + // non-SSL connection. + config.getMappingProvider().addAllToWhitelist( + Collections.singleton(new JoinFlattenMapping())); + return new WayangContext(config) + .withPlugin(Presto.plugin()); + } + + private TableSink tableSink(String... columnNames) { + return new TableSink<>(new Properties(), "overwrite", SINK_TABLE, columnNames); + } + + private static MapOperator, Record> joinFlattenOperator() { + MapOperator, Record> operator = new MapOperator<>( + new TransformationDescriptor<>( + new JoinFlattenFunction(), + DataUnitType.createBasicUnchecked(Tuple2.class), + DataUnitType.createBasic(Record.class)), + DataSetType.createDefaultUnchecked(Tuple2.class), + DataSetType.createDefault(Record.class)); + operator.setName(JOIN_FLATTEN_NAME); + return operator; + } + + private static Record flattenJoinResult(Object joinResult) { + if (joinResult instanceof Record) { + return (Record) joinResult; + } + Tuple2 pair = (Tuple2) joinResult; + Record left = (Record) pair.field0; + Record right = (Record) pair.field1; + return new Record( + left.getField(0), left.getField(1), left.getField(2), left.getField(3), + right.getField(0), right.getField(1), right.getField(2)); + } + + private long queryLong(String sql) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); ResultSet rs = statement.executeQuery(sql)) { + rs.next(); + return rs.getLong(1); + } catch (Exception e) { + throw new RuntimeException("query failed: " + sql, e); + } + } + + private double queryDouble(String sql) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); ResultSet rs = statement.executeQuery(sql)) { + rs.next(); + return rs.getDouble(1); + } catch (Exception e) { + throw new RuntimeException("query failed: " + sql, e); + } + } + + private int columnCount(String table) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); + ResultSet rs = statement.executeQuery("SELECT * FROM " + table + " LIMIT 1")) { + return rs.getMetaData().getColumnCount(); + } catch (Exception e) { + throw new RuntimeException("query failed: column count of " + table, e); + } + } + + private void assertSingleDoubleResult(double expected, String message) { + try (Connection c = jdbc(); Statement statement = c.createStatement(); + ResultSet rs = statement.executeQuery("SELECT * FROM " + SINK_TABLE)) { + assertTrue(rs.next(), message); + assertEquals(expected, rs.getDouble(1), 0.01, message); + assertFalse(rs.next(), message); + } catch (Exception e) { + throw new RuntimeException("query failed: SELECT * FROM " + SINK_TABLE, e); + } + } + + private Map readRegionSums() { + Map sums = new HashMap<>(); + try (Connection c = jdbc(); Statement statement = c.createStatement(); + ResultSet rs = statement.executeQuery("SELECT * FROM " + SINK_TABLE)) { + while (rs.next()) { + sums.put(rs.getString(1), rs.getDouble(2)); + } + return sums; + } catch (Exception e) { + throw new RuntimeException("query failed: SELECT * FROM " + SINK_TABLE, e); + } + } + + private static final class JoinFlattenFunction implements + FunctionDescriptor.SerializableFunction, Record> { + @Override + public Record apply(Tuple2 tuple) { + return flattenJoinResult(tuple); + } + } + + /** Test-only mapping for the unresolved logical join Tuple-to-Record mismatch. */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static final class JoinFlattenMapping implements Mapping { + @Override + public java.util.Collection getTransformations() { + OperatorPattern pattern = new OperatorPattern( + "joinFlatten", + new MapOperator(null, DataSetType.none(), DataSetType.createDefault(Record.class)), + false) + .withAdditionalTest(operator -> JOIN_FLATTEN_NAME.equals(((MapOperator) operator).getName())); + + ReplacementSubplanFactory factory = new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> createPrestoProjection().at(epoch)); + + return Collections.singleton(new PlanTransformation( + SubplanPattern.createSingleton(pattern), + factory, + PrestoPlatform.getInstance())); + } + + private static PrestoProjectionOperator createPrestoProjection() { + ProjectionDescriptor, Record> descriptor = new ProjectionDescriptor<>( + new JoinFlattenFunction(), + Arrays.asList(JOIN_COLUMNS), + DataUnitType.createBasicUnchecked(Tuple2.class), + DataUnitType.createBasic(Record.class)); + MapOperator, Record> projection = new MapOperator<>( + descriptor, + DataSetType.createDefaultUnchecked(Tuple2.class), + DataSetType.createDefault(Record.class)); + projection.setName(JOIN_FLATTEN_NAME); + return new PrestoProjectionOperator((MapOperator) (MapOperator) projection); + } + } + + /** Assert that Presto actually ran a query containing the given fragment. */ + private void assertSqlReachedPresto(String fragment) { + try (Connection c = jdbc()) { + ResultSet rs = c.createStatement().executeQuery( + "SELECT count(*) FROM system.runtime.queries " + + "WHERE state = 'FINISHED' " + + "AND query LIKE '%" + fragment.replace("'", "''") + "%' " + + "AND query NOT LIKE '%system.runtime%'"); + rs.next(); + assertTrue(rs.getLong(1) > 0, "Expected a Presto query containing: " + fragment); + } catch (Exception e) { + throw new RuntimeException("query-history check failed", e); + } + } + + private static Connection jdbc() throws Exception { + Properties p = new Properties(); + p.setProperty("user", USER); + return DriverManager.getConnection(JDBC_URL, p); + } +}