fix: include port id in materialization reader actor id to fix self-join deadlock #4985
Ma77Ball wants to merge 6 commits into apache:main
@Yicong-Huang please review
I don't think putting the port id into the actor's name is the right way to go. The port id shouldn't be part of the actor name, and an actor name should be unique. If an operator has 5 input ports, this fix would give the actor 5 different names. The bug is likely in a lower layer: an actor can have multiple channels, and channels connect different ports. I think we do not differentiate channels well, and the fix goes deeper than a rename; the gateway logic might also need a fix. Fixing it requires a deeper understanding of Amber.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #4985 +/- ##
=========================================
Coverage 42.72% 42.72%
+ Complexity 2185 2184 -1
=========================================
Files 1031 1031
Lines 38152 38156 +4
Branches 4004 4005 +1
=========================================
+ Hits 16302 16304 +2
Misses 20831 20831
- Partials 1019 1021 +2
What changes were proposed in this PR?
Fixes the Difference operator hanging when one upstream operator (e.g., a single CSV) is wired to both of its input ports.
The bug
- Each input port runs an InputPortMaterializationReaderThread per upstream URI.
- The reader's actor ID was derived from (uri, workerActorId) only.
- Both ports therefore shared one ChannelIdentity → FIFO sequence numbers and end-of-channel markers cross-routed → one port never drains → Difference hangs.
The fix
- Include the PortIdentity in the actor name: MATERIALIZATION_READER_<uri>_port<n>[i]_<workerActorId>.
- Pass toPortId through the three callers of getFromActorIdForInputPortStorage:
  - ResourceAllocator → globalPortId.portId.
  - AssignPortHandler → msg.portId.
  - InputManager → InputPortMaterializationReaderThread (new ctor field)
Any related issues, documentation, or discussions?
Closes: #2588
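For readers unfamiliar with the collision described under "The bug", here is a minimal, self-contained sketch of the idea. It is not the Amber code: `ActorId`, `ChannelId`, `readerIdWithoutPort`, and `readerIdWithPort` are hypothetical stand-ins, and the name format is simplified (it omits the `[i]` part of the real format).

```scala
// Hypothetical sketch only; the types and helpers below are simplified
// stand-ins, not the actual Amber identity classes.
object ReaderIdSketch {
  final case class ActorId(name: String)
  final case class ChannelId(from: ActorId, to: ActorId)

  // Old scheme (assumed shape): the name depends on (uri, workerActorId) only.
  def readerIdWithoutPort(uri: String, worker: ActorId): ActorId =
    ActorId(s"MATERIALIZATION_READER_${uri}_${worker.name}")

  // New scheme (simplified): the port id is part of the name.
  def readerIdWithPort(uri: String, portId: Int, worker: ActorId): ActorId =
    ActorId(s"MATERIALIZATION_READER_${uri}_port${portId}_${worker.name}")

  def main(args: Array[String]): Unit = {
    val worker = ActorId("difference-worker-0")
    val uri = "csv-output" // the same upstream result feeds both input ports

    // One reader per input port (0 and 1), both sending to the same worker.
    val oldChannels =
      Seq(0, 1).map(_ => ChannelId(readerIdWithoutPort(uri, worker), worker)).toSet
    val newChannels =
      Seq(0, 1).map(p => ChannelId(readerIdWithPort(uri, p, worker), worker)).toSet

    println(s"distinct channels without port id: ${oldChannels.size}")
    println(s"distinct channels with port id: ${newChannels.size}")
  }
}
```

In this sketch the two readers collapse onto a single channel key under the old scheme and get two distinct keys under the new one, which is the property the ID change relies on.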
How was this PR tested?
- VirtualIdentityUtilsSpec: checks the new ID format and asserts distinct IDs for the same (uri, worker) but different port IDs.
- ExpansionGreedyScheduleGeneratorSpec: builds csv → difference with both inputs from the same csv; asserts levelSets.size > 1, proving materialization happens and the schedule isn't a deadlocked single-level region.
Was this PR authored or co-authored using generative AI tooling?
Co-authored with: Claude Opus 4.7 in compliance with ASF