File tree Expand file tree Collapse file tree
stream/src/main/scala/org/apache/pekko/stream/impl/fusing Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -511,13 +511,6 @@ import pekko.util.ccompat.JavaConverters._
511511 private var substreamWaitingToBePushed = false
512512 private var substreamCancelled = false
513513
514- def propagateSubstreamCancel (ex : Throwable ): Boolean =
515- decider(ex) match {
516- case Supervision .Stop => true
517- case Supervision .Resume => false
518- case Supervision .Restart => false
519- }
520-
521514 setHandler(
522515 out,
523516 new OutHandler {
@@ -614,11 +607,22 @@ import pekko.util.ccompat.JavaConverters._
614607
615608 override def onDownstreamFinish (cause : Throwable ): Unit = {
616609 substreamCancelled = true
617- if (isClosed(in) || propagateSubstreamCancel(cause)) {
618- cancelStage(cause)
619- } else {
620- // Start draining
621- if (! hasBeenPulled(in)) pull(in)
610+ decider(cause) match {
611+ case Supervision .Stop =>
612+ cancelStage(cause)
613+ case Supervision .Resume =>
614+ if (isClosed(in)) cancelStage(cause)
615+ else {
616+ // Start draining
617+ if (! hasBeenPulled(in)) pull(in)
618+ }
619+ case Supervision .Restart =>
620+ if (isClosed(in)) completeStage()
621+ else {
622+ substreamCancelled = false
623+ // Start draining
624+ if (! hasBeenPulled(in)) pull(in)
625+ }
622626 }
623627 }
624628
You can’t perform that action at this time.
0 commit comments