@@ -103,6 +103,40 @@ func NewStandardDependencyPlanner(
103103 }, nil
104104}
105105
106+ func isAnnotationIndirection (annotationCtx taxonomy.AnnotationCtx ) bool {
107+ _ , isView := annotationCtx .GetView ()
108+ _ , isSubquery := annotationCtx .GetSubquery ()
109+ return isView || isSubquery
110+ }
111+
112+ // orchestrateIndirection handles indirection vertices (views, subqueries, CTEs) in the dependency planner.
113+ // Unlike regular tables that need HTTP acquisition, indirections are already materialized by the
114+ // "create tail" builder and appear in the FROM clause as inline subqueries.
115+ // This method creates a NopBuilder as a placeholder in the execution DAG.
116+ func (dp * standardDependencyPlanner ) orchestrateIndirection (
117+ annotationCtx taxonomy.AnnotationCtx ,
118+ ) (int , error ) {
119+ rc , err := tableinsertioncontainer .NewTableInsertionContainer (
120+ annotationCtx .GetTableMeta (),
121+ dp .handlerCtx .GetSQLEngine (),
122+ dp .handlerCtx .GetTxnCounterMgr (),
123+ )
124+ if err != nil {
125+ return - 1 , err
126+ }
127+ builder := primitivebuilder .NewNopBuilder (
128+ dp .primitiveComposer .GetGraphHolder (),
129+ dp .primitiveComposer .GetTxnCtrlCtrs (),
130+ dp .handlerCtx ,
131+ dp .handlerCtx .GetSQLEngine (),
132+ []string {},
133+ )
134+ dp .execSlice = append (dp .execSlice , builder )
135+ idx := len (dp .execSlice ) - 1
136+ dp .tableSlice = append (dp .tableSlice , rc )
137+ return idx , nil
138+ }
139+
106140func (dp * standardDependencyPlanner ) dataflowEdgeExists (from , to int ) bool {
107141 edges , ok := dp .dataflowToEdges [to ]
108142 if ! ok {
@@ -204,6 +238,7 @@ func (dp *standardDependencyPlanner) Plan() error {
204238 edgeCount , dependencyMax )
205239 }
206240 idsVisited := make (map [int64 ]struct {})
241+ indirectionNodeIDs := make (map [int64 ]struct {})
207242 // first pass: set up AOT stuff
208243 // - stream per edge.
209244 edgeStreams := make (map [dataflow.Edge ]streaming.MapStream )
@@ -218,6 +253,17 @@ func (dp *standardDependencyPlanner) Plan() error {
218253 tableExpr := n .GetTableExpr ()
219254 annotation := n .GetAnnotation ()
220255 dp .annMap [tableExpr ] = annotation
256+ // Indirection nodes (views, subqueries, CTEs) are already materialized
257+ // by the create tail builder; register them and skip acquisition.
258+ if isAnnotationIndirection (annotation ) {
259+ indirectionNodeIDs [n .ID ()] = struct {}{}
260+ idx , indErr := dp .orchestrateIndirection (annotation )
261+ if indErr != nil {
262+ return indErr
263+ }
264+ dp .nodeIDIdxMap [n .ID ()] = idx
265+ continue
266+ }
221267 for _ , e := range edges {
222268 if e .From ().ID () == n .ID () {
223269 insPsc , tcc , insErr := dp .processOrphan (tableExpr , annotation , n )
@@ -228,6 +274,20 @@ func (dp *standardDependencyPlanner) Plan() error {
228274 toNode := e .GetDest ()
229275 toAnnotation := toNode .GetAnnotation ().Clone () // this bodge protects split source vertices
230276 toTableExpr := toNode .GetTableExpr ()
277+ // Handle indirection destination nodes.
278+ if isAnnotationIndirection (toAnnotation ) {
279+ if _ , alreadyHandled := indirectionNodeIDs [toNode .ID ()]; ! alreadyHandled {
280+ indirectionNodeIDs [toNode .ID ()] = struct {}{}
281+ dp .annMap [toTableExpr ] = toAnnotation
282+ toIdx , toIndErr := dp .orchestrateIndirection (toAnnotation )
283+ if toIndErr != nil {
284+ return toIndErr
285+ }
286+ dp .nodeIDIdxMap [toNode .ID ()] = toIdx
287+ }
288+ orderedEdges = append (orderedEdges , e )
289+ continue
290+ }
231291 stream , streamErr := dp .getStreamFromEdge (e , toAnnotation , tcc )
232292 if streamErr != nil {
233293 return streamErr
@@ -251,6 +311,69 @@ func (dp *standardDependencyPlanner) Plan() error {
251311 fromAnnotation := fromNode .GetAnnotation ()
252312 toAnnotation := toNode .GetAnnotation ().Clone () // this bodge protects split source vertices
253313 toTableExpr := toNode .GetTableExpr ()
314+ // For indirection nodes, builders are already created; just wire up edge dependencies.
315+ _ , fromIsIndirection := indirectionNodeIDs [fromNode .ID ()]
316+ _ , toIsIndirection := indirectionNodeIDs [toNode .ID ()]
317+ if fromIsIndirection && toIsIndirection {
318+ // Both sides are indirections; no acquisition or streaming needed.
319+ // Just ensure edge dependencies are registered.
320+ fromIdx := dp .nodeIDIdxMap [fromNode .ID ()]
321+ toIdx := dp .nodeIDIdxMap [toNode .ID ()]
322+ if ! dp .dataflowEdgeExists (fromIdx , toIdx ) {
323+ dp .dataflowToEdges [toIdx ] = append (dp .dataflowToEdges [toIdx ], fromIdx )
324+ }
325+ continue
326+ }
327+ if fromIsIndirection {
328+ // Source is indirection, destination is a regular table.
329+ fromIdx := dp .nodeIDIdxMap [fromNode .ID ()]
330+ toIdx , toBuilderExists := dp .nodeIDIdxMap [toNode .ID ()]
331+ if ! toBuilderExists {
332+ toInsPsc , pscExists := insertPrepearedStatements [toNode .ID ()]
333+ if ! pscExists {
334+ return fmt .Errorf ("unknown insert prepared statement" )
335+ }
336+ dp .annMap [toTableExpr ] = toAnnotation
337+ toAnnotation .SetDynamic ()
338+ arrivingDestinationNodeStream := nodeStreamCollections .GetArriving (toNode .ID ())
339+ departingDestinationNodeStream := nodeStreamCollections .GetDeparting (toNode .ID ())
340+ var toErr error
341+ toIdx , toErr = dp .orchestrate (
342+ - 1 , toAnnotation , toInsPsc , arrivingDestinationNodeStream , departingDestinationNodeStream )
343+ if toErr != nil {
344+ return toErr
345+ }
346+ dp .nodeIDIdxMap [toNode .ID ()] = toIdx
347+ }
348+ if ! dp .dataflowEdgeExists (fromIdx , toIdx ) {
349+ dp .dataflowToEdges [toIdx ] = append (dp .dataflowToEdges [toIdx ], fromIdx )
350+ }
351+ continue
352+ }
353+ if toIsIndirection {
354+ // Destination is indirection, source is a regular table.
355+ toIdx := dp .nodeIDIdxMap [toNode .ID ()]
356+ fromIdx , fromBuilderExists := dp .nodeIDIdxMap [fromNode .ID ()]
357+ if ! fromBuilderExists {
358+ insPsc , pscExists := insertPrepearedStatements [fromNode .ID ()]
359+ if ! pscExists {
360+ return fmt .Errorf ("unknown insert prepared statement" )
361+ }
362+ arrivingSourceNodeStream := nodeStreamCollections .GetArriving (fromNode .ID ())
363+ departingSourceNodeStream := nodeStreamCollections .GetDeparting (fromNode .ID ())
364+ var fromErr error
365+ fromIdx , fromErr = dp .orchestrate (- 1 , fromAnnotation , insPsc , arrivingSourceNodeStream , departingSourceNodeStream )
366+ if fromErr != nil {
367+ return fromErr
368+ }
369+ dp .nodeIDIdxMap [fromNode .ID ()] = fromIdx
370+ }
371+ if ! dp .dataflowEdgeExists (fromIdx , toIdx ) {
372+ dp .dataflowToEdges [toIdx ] = append (dp .dataflowToEdges [toIdx ], fromIdx )
373+ }
374+ continue
375+ }
376+ // Neither side is an indirection; original logic.
254377 departingSourceNodeStream := nodeStreamCollections .GetDeparting (fromNode .ID ())
255378 arrivingDestinationNodeStream := nodeStreamCollections .GetArriving (toNode .ID ())
256379 arrivingSourceNodeStream := nodeStreamCollections .GetArriving (fromNode .ID ())
0 commit comments