Skip to content

Commit ab934dc

Browse files
committed
Support Supervision.restart for SubFlow's
1 parent 689e30b commit ab934dc

1 file changed

Lines changed: 29 additions & 13 deletions

File tree

stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -512,13 +512,6 @@ import pekko.util.ccompat.JavaConverters._
512512
private var substreamWaitingToBePushed = false
513513
private var substreamCancelled = false
514514

515-
def propagateSubstreamCancel(ex: Throwable): Boolean =
516-
decider(ex) match {
517-
case Supervision.Stop => true
518-
case Supervision.Resume => false
519-
case Supervision.Restart => false
520-
}
521-
522515
override def onPull(): Unit = {
523516
if (substreamSource eq null) {
524517
// can be already pulled from substream in case split after
@@ -610,14 +603,31 @@ import pekko.util.ccompat.JavaConverters._
610603

611604
override def onDownstreamFinish(cause: Throwable): Unit = {
612605
substreamCancelled = true
613-
if (isClosed(in) || propagateSubstreamCancel(cause)) {
614-
cancelStage(cause)
615-
} else {
616-
// Start draining
617-
if (!hasBeenPulled(in)) pull(in)
606+
decider(cause) match {
607+
case Supervision.Stop =>
608+
cancelStage(cause)
609+
case Supervision.Resume =>
610+
if (isClosed(in)) cancelStage(cause)
611+
else {
612+
// Start draining
613+
if (!hasBeenPulled(in)) pull(in)
614+
}
615+
case Supervision.Restart =>
616+
if (isClosed(in)) completeStage()
617+
else {
618+
restartState()
619+
// Start draining
620+
if (!hasBeenPulled(in)) pull(in)
621+
}
618622
}
619623
}
620624

625+
private def restartState(): Unit = {
626+
substreamSource = null
627+
substreamWaitingToBePushed = false
628+
substreamCancelled = false
629+
}
630+
621631
override def onPush(): Unit = {
622632
val elem = grab(in)
623633
try {
@@ -640,7 +650,13 @@ import pekko.util.ccompat.JavaConverters._
640650
decider(ex) match {
641651
case Supervision.Resume => pull(in)
642652
case Supervision.Stop => onUpstreamFailure(ex)
643-
case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
653+
case Supervision.Restart =>
654+
if (isClosed(in)) completeStage()
655+
else {
656+
restartState()
657+
// Start draining
658+
if (!hasBeenPulled(in)) pull(in)
659+
}
644660
}
645661
}
646662
}

0 commit comments

Comments
 (0)