|
@@ -73,6 +73,10 @@ internal class CoroutineOnDemandValueScope<T, V>(
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ is ProducerState.ProducerReady -> {
|
|
|
|
|
+ setStateProducing(state)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
else -> throw IllegalProducerStateException(state)
|
|
else -> throw IllegalProducerStateException(state)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -96,6 +100,14 @@ internal class CoroutineOnDemandValueScope<T, V>(
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ private fun setStateProducing(state: ProducerState.ProducerReady<T, V>) {
|
|
|
|
|
+ val deferred = CompletableDeferred<V>(coroutineScope.coroutineContext.job)
|
|
|
|
|
+ if (!compareAndSetState(state, ProducerState.Producing(state.producer, deferred))) {
|
|
|
|
|
+ deferred.cancel() // avoid leak
|
|
|
|
|
+ }
|
|
|
|
|
+ // loop again
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
private fun finishImpl(exception: Throwable?) {
|
|
private fun finishImpl(exception: Throwable?) {
|
|
|
state.loop { state ->
|
|
state.loop { state ->
|
|
|
when (state) {
|
|
when (state) {
|
|
@@ -173,11 +185,7 @@ internal class CoroutineOnDemandValueScope<T, V>(
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
is ProducerState.ProducerReady -> {
|
|
is ProducerState.ProducerReady -> {
|
|
|
- val deferred = CompletableDeferred<V>(coroutineScope.coroutineContext.job)
|
|
|
|
|
- if (!compareAndSetState(state, ProducerState.Producing(state.producer, deferred))) {
|
|
|
|
|
- deferred.cancel() // avoid leak
|
|
|
|
|
- }
|
|
|
|
|
- // loop again
|
|
|
|
|
|
|
+ setStateProducing(state)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
is ProducerState.Producing -> return true // ok
|
|
is ProducerState.Producing -> return true // ok
|
|
@@ -201,4 +209,4 @@ internal class CoroutineOnDemandValueScope<T, V>(
|
|
|
override fun finish() {
|
|
override fun finish() {
|
|
|
finishImpl(null)
|
|
finishImpl(null)
|
|
|
}
|
|
}
|
|
|
-}
|
|
|
|
|
|
|
+}
|