[SPARK-56384][SS] Support stream-stream non-outer join in Update mode #55249
HeartSaVioR wants to merge 5 commits into apache:master
…th stream-stream inner join
| throwError(s"$joinType join between two streaming DataFrames/Datasets" + | ||
| s" is not supported in ${outputMode} output mode, only in Append output mode") | ||
| } | ||
| case _ => |
Can we list the supported join types explicitly?
Btw, the pattern match right after this one handles the failure based on join type. This pattern match is mostly about checking compatibility with the output mode.
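To make the output-mode compatibility check discussed above concrete, here is a minimal, self-contained sketch. It is not Spark's `UnsupportedOperationChecker`: the `JoinType` and `OutputMode` types, the `checkStreamStreamJoin` name, and the error text are hypothetical stand-ins, and the treatment of Complete mode is an assumption. It covers only the output-mode check, not the later per-join-type validation.

```scala
// Hypothetical, simplified model of the output-mode compatibility check.
// None of these names are Spark APIs; they exist only for illustration.
object JoinModeCheckSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftSemi extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  /** Left(message) when the combination is rejected, Right(()) when it is allowed. */
  def checkStreamStreamJoin(
      joinType: JoinType,
      outputMode: OutputMode): Either[String, Unit] =
    (joinType, outputMode) match {
      // Append mode passes this check for every join type listed here.
      case (_, Append) => Right(())
      // Non-outer joins are additionally allowed in Update mode.
      case (Inner | LeftSemi, Update) => Right(())
      // Everything else is rejected (assumption: Complete mode stays blocked).
      case (jt, mode) =>
        Left(s"$jt join between two streaming DataFrames/Datasets " +
          s"is not supported in $mode output mode")
    }
}
```

Listing the allowed `(joinType, outputMode)` pairs explicitly, with a single fallback case, keeps the supported combinations visible at a glance.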
| condition = Some(attributeWithWatermark === attribute)), | ||
| OutputMode.Update()) | ||
|
|
||
| // Left outer, right outer, full outer, left semi joins |
This comment seems a little redundant.
Let me see whether this is a consistent pattern within the file or something we added only here. If it's consistent across the file, I guess we can just leave it as it is. If not, we can remove it.
I guess the pattern is consistent in the file (that's what Claude said), so let's leave it as it is.
The comment is just duplicating the list on the next line. Please remove it.
If you disagree, please say so, but I don't care what Claude thinks.
There is a misunderstanding. I think I have already commented on my rationale:
> Let me see whether this is a consistent pattern within the file or something we added only here. If it's consistent across the file, I guess we can just leave it as it is. If not, we can remove it.
I just asked Claude to "check", not to "judge". So the disagreement is mine.
The test suite is large enough that people use code comments (not code) as an index to where each test is placed. With LLMs this will become less and less necessary, and maybe there are ways to make it better, but I'd rather not break what the existing code is trying to do. If we think we have a better way, we should treat it as a refactor and review it separately.
| // dedup1: filters 1, 2 (already seen), passes only 4 | ||
| // dedup2: filters 2, 3 (already seen), passes only 4 | ||
| // join: only (4, 4) matches |
nit: I feel like we don't need to have such verbose comments. The query is pretty short, and the test output should be verified to match the query anyway.
For stateful operators we tend to be very reader-friendly, but when the watermark doesn't come into play, the result is probably easy to work out without guidance. We can remove the comments for this test.
| .withColumnRenamed("value", "value1") | ||
| .withColumn("eventTime1", timestamp_seconds($"value1")) |
withColumn is syntactic sugar for appending a new column; it has been a best practice.
I won't insist, but this really seems unnecessarily verbose. Typical Java style, I suppose.
Maybe I misunderstood your point? Did you suggest consolidating the two lines into one select? Then it totally makes sense. If you were referring only to the single withColumn, I'd say withColumn is better than select, but a chain of withColumnRenamed -> withColumn isn't really necessary. I'll change it.
| } | ||
|
|
||
| // Stream-stream non-outer join produces the same behavior between Append mode and Update mode. | ||
| // We only run a sanity test here rather than replicating the full Append mode test suite. |
Can we run a loop over the existing append-mode test suite so all the test cases run for both append and update mode?
OK, that works for me. I'll see whether it runs too long (in which case I'd need a separate suite).
| ) | ||
| } | ||
|
|
||
| test("dedup on both sides -> stream-stream inner join, update mode") { |
Same as below: if the behavior is the same, could we run a loop over the test cases with different output modes?
Yes let's do that.
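The idea of running each test case once per output mode can be sketched as below. The helper name `testWithAppendAndUpdate` appears in this PR's diff, but its body is not shown there, so this implementation is an assumption. To keep the sketch self-contained, `test`, `runAll`, and the `OutputMode` objects are hypothetical stand-ins for ScalaTest's `test(name)(body)` and Spark's `OutputMode`.

```scala
// Hypothetical sketch of registering one test per output mode.
// `test`, `runAll`, and `OutputMode` are stand-ins, not ScalaTest or Spark APIs.
object DualModeTestSketch {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode

  // Registered test bodies, keyed by test name, in registration order.
  private val registered =
    scala.collection.mutable.LinkedHashMap.empty[String, () => Unit]

  // Stand-in for a test framework's registration method.
  def test(name: String)(body: => Unit): Unit =
    registered.update(name, () => body)

  // Registers the same body twice, once for each output mode.
  def testWithAppendAndUpdate(name: String)(body: OutputMode => Unit): Unit =
    Seq[OutputMode](Append, Update).foreach { mode =>
      test(s"$name - $mode mode")(body(mode))
    }

  def registeredNames: Seq[String] = registered.keys.toSeq

  def runAll(): Unit = registered.values.foreach(run => run())
}
```

Including the mode in the test name keeps failures attributable to a specific output mode, at the cost of roughly doubling the suite's run time.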
| // Left outer, right outer, full outer, left semi joins | ||
| Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType => | ||
| // Update mode not allowed | ||
| // Left outer, right outer, full outer joins: Update mode not allowed |
| // Left outer, right outer, full outer joins: Update mode not allowed | |
| // The behavior for unmatched rows in outer joins with update mode hasn't been defined yet. |
That's good for explaining rationale, but this is a test. If we want to have this comment I'd say we should have it in the place where we block the operation. I'll do that in UnsupportedOperationChecker and keep this comment as it is.
I added that comment in UnsupportedOperationChecker.
| import testImplicits._ | ||
|
|
||
| test("windowed left semi join") { | ||
| testWithAppendAndUpdate("windowed left semi join") { outputMode => |
To avoid confusion: we do not apply a time window aggregation to produce the time window here. It's just a projection, which is why this works in Update mode.
@funrollloops Would you mind taking a second look? Thanks!
| left: SparkPlan, | ||
| right: SparkPlan) extends BinaryExecNode with StateStoreWriter with SchemaValidationUtils { | ||
| right: SparkPlan, | ||
| outputMode: Option[OutputMode] = None) |
Why optional? The outputMode is actually required, right? It just defaults to append? We should only default in one place IMO.
That doesn't really need to be optional. Good point.
This should actually be optional, as we construct the physical node in two phases.
When we convert the logical plan to a physical plan, that "general" rule does not know about the output mode. We handle the semantics of streaming-specific concepts within the scope of IncrementalExecution, where we assign the watermark value to the physical node (via copying). In short, we take the same approach here as we do for assigning the watermark value.
That said, having a default value slightly increases the chance of forgetting to assign it, hence I removed the default value.
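The two-phase construction described above can be illustrated with a small standalone model. This is a sketch under stated assumptions, not Spark's actual classes: `JoinExec`, `plan`, and `fillStreamingInfo` are hypothetical names, and plan children are reduced to strings.

```scala
// Hypothetical model of a physical node whose streaming-specific fields
// are filled in by a second phase via copy(). Not Spark's real classes.
object TwoPhasePlanSketch {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode

  // No default for outputMode: every construction site must pass it explicitly.
  final case class JoinExec(
      left: String,
      right: String,
      eventTimeWatermarkMs: Option[Long],
      outputMode: Option[OutputMode])

  // Phase 1: generic planning, which knows nothing about streaming concepts.
  def plan(left: String, right: String): JoinExec =
    JoinExec(left, right, eventTimeWatermarkMs = None, outputMode = None)

  // Phase 2: the streaming-aware step copies the node with both values assigned.
  def fillStreamingInfo(
      node: JoinExec,
      watermarkMs: Long,
      mode: OutputMode): JoinExec =
    node.copy(eventTimeWatermarkMs = Some(watermarkMs), outputMode = Some(mode))
}
```

Keeping the field an `Option` without a default means the generic planner must write `None` deliberately, so a forgotten assignment shows up as a compile error rather than a silent fallback.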
What changes were proposed in this pull request?
This PR proposes to support stream-stream non-outer joins (Inner/LeftSemi) in Update mode. The expected behavior of an Inner/LeftSemi join in Update mode is simply the same as in Append mode (since there is no point in early firing), so this is consistent with the definition of Update mode: "if the query doesn't contain aggregations, it is equivalent to Append mode".
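The reasoning above, that an inner join never revises a row it has already emitted, can be illustrated with a toy symmetric hash join. This is a simplified model and not Spark's implementation: `ToyInnerJoin` and its types are hypothetical, and watermarks and state eviction are ignored.

```scala
import scala.collection.mutable

// Toy model of a stream-stream inner join over micro-batches.
// Hypothetical names; no watermarking or state eviction.
object ToyStreamJoinSketch {
  type Row = (Int, String)            // (join key, payload)
  type Joined = (Int, String, String) // (join key, left payload, right payload)

  final class ToyInnerJoin {
    private val leftState = mutable.Map.empty[Int, Vector[String]]
    private val rightState = mutable.Map.empty[Int, Vector[String]]

    /** Returns only the rows that are new in this batch; earlier output is never changed. */
    def processBatch(left: Seq[Row], right: Seq[Row]): Seq[Joined] = {
      // New left rows are buffered and joined against right rows from earlier batches.
      val fromLeft = left.flatMap { case (key, l) =>
        leftState(key) = leftState.getOrElse(key, Vector.empty) :+ l
        rightState.getOrElse(key, Vector.empty).map(r => (key, l, r))
      }
      // New right rows are joined against all buffered left rows, including this
      // batch's, so each matching pair is emitted exactly once.
      val fromRight = right.flatMap { case (key, r) =>
        rightState(key) = rightState.getOrElse(key, Vector.empty) :+ r
        leftState.getOrElse(key, Vector.empty).map(l => (key, l, r))
      }
      fromLeft ++ fromRight
    }
  }
}
```

Because every batch's output consists solely of newly matched pairs, "rows updated since the last trigger" and "rows appended since the last trigger" describe the same set in this model.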
Why are the changes needed?
Previously, a stream-stream inner join could not be used in Update mode, although some workloads would benefit from it, e.g. a stream-stream inner join followed by a time window aggregation. This workload behaves differently depending on the output mode: in Update mode, the time window aggregation emits early-firing results over the rows produced by the stream-stream join. This PR unblocks this case.
Does this PR introduce any user-facing change?
Yes, stream-stream non-outer join will be supported with Update mode.
How was this patch tested?
New tests.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude 4.6 Opus