You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Today I started a discussion in the Swift forums regarding cooperative task cancellation and AsyncSequence types. Digging through the implementations of types like Async[Throwing]Stream I discovered that those were not cooperative at all and will straight cut off the buffer from the upstream during cancellation. With the current non-cooperative behavior there's no issue in RxSwift's values implementation. However if the Async[Throwing]Stream were cooperative then the implementation would get stuck and never terminate the stream via onDispose.
Here's a custom wrapper AsyncThrowingCooperativeStream type, which simply avoids the direct cancellation forwarding to the inner AsyncThrowingStream._Storage.cancel() method, which would cause the termination of the buffer by calling AsyncThrowingStream._Storage.finish(). It gives the custom logic such as a wrapped Task or an Observable to properly receive the dispose message and forward its decision through the onDispose back to the captured continuation.
publicstructAsyncThrowingCooperativeStream<Element, Failure>where Failure:Error{letcontinuation:AsyncThrowingStream<Element,Failure>.Continuationletstream:AsyncThrowingStream<Element,Failure>publicinit(
_ elementType:Element.Type=Element.self,
bufferingPolicy limit:AsyncThrowingStream<Element,Failure>.Continuation.BufferingPolicy=.unbounded,
_ build:(AsyncThrowingStream<Element,Failure>.Continuation)->Void)where Failure ==anyError{varstreamContinuation:AsyncThrowingStream<Element,Failure>.Continuation!=nilletstream=AsyncThrowingStream<Element,Failure>(
elementType,
bufferingPolicy: limit
){ continuation inbuild(continuation)
streamContinuation = continuation
}self.continuation = streamContinuation
self.stream = stream
}}extensionAsyncThrowingCooperativeStream:AsyncSequence{publicstructIterator:AsyncIteratorProtocol{letcontinuation:AsyncThrowingStream<Element,Failure>.Continuation
// NOTE: This is `@unchecked Sendable` because `AsyncThrowingStream._Context` just captures
// `AsyncThrowingStream._Storage` which itself is `@unchecked Sendable`, so we're safe here.
struct_Box:@uncheckedSendable{letiterator:AsyncThrowingStream<Element,Failure>.Iterator}letbox:_Boxpublicmutatingfunc next()asyncthrows->Element?{letbox=self.box
returntryawaitwithTaskCancellationHandler{tryawaitwithCheckedThrowingContinuation{ continuation in
// Detach the `next` method from the current parent task.
Task.detached{varmutableIterator= box.iterator
do{letelement=tryawait mutableIterator.next()
continuation.resume(returning: element)}catch{
continuation.resume(throwing: error)}}}} onCancel:{[continuation]in
// Forward the cancellation manually to the termination handler, then remove it so that
// during a subsequent `next` call we do not signal another cancellation.
lethandler= continuation.onTermination
continuation.onTermination =nilhandler?(.cancelled)}}}publicfunc makeAsyncIterator()->Iterator{Iterator(continuation: continuation, box:Iterator._Box(iterator: stream.makeAsyncIterator()))}}extensionAsyncThrowingCooperativeStream:Sendablewhere Element:Sendable{}
Here's the wrap similar to the implementation in the RxSwift module:
publicextensionObservableConvertibleType{
// This is a copy from RxSwift's repo which mimics `Publisher.values`
varvalues:AsyncThrowingCooperativeStream<Element,Error>{AsyncThrowingCooperativeStream<Element,Error>{ continuation inletdisposable=asObservable().subscribe(
onNext:{ value in
continuation.yield(value)},
onError:{ error in
continuation.finish(throwing: error)},
onCompleted:{
continuation.finish(throwing:nil)},
onDisposed:{
continuation.finish(throwing:CancellationError()) // THE FIX ✅
})
continuation.onTermination ={@Sendable _ in
disposable.dispose()}}}}
As you can see, onDispose finishes the cooperative stream continuation by throwing a CancellationError. An infinite running Observable which gets deposed without an error will essentially call onDispose, but it will never reach correctly the continuation, at least not a cooperative one.
Note: continuation.finish will change the internal state in such way that a subsequent call to it would result into a no-op. That said, if an Observable completes via onCompleted first, a call to onDisposed will not actually throw CancellationError as terminal would already equal finished and onTermination closure would already be nil-ed.
Expected outcome:
Right now, it's not a bug, but if AsyncThrowingStream ever gets changed to be cooperative on task cancellation, the values implementation will get stuck.
What actually happens:
Nothing right now. The AsyncThrowingStream buffer gets immediately terminated on cancellation and anything coming the Observable subscription will be completely ignored. You can use the above AsyncThrowingCooperativeStream to actually reproduce the potential future issue.
RxSwift/RxCocoa/RxBlocking/RxTest version/commit
Compared with RxSwift 6.5.0
Platform/Environment
Any.
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
easy, 100% repro
Xcode version:
Version 14.1 (14B47b)
Installation method:
Swift Package
I have multiple versions of Xcode installed:
no
Level of RxSwift knowledge:
I have a significant code base
The text was updated successfully, but these errors were encountered:
Hey @DevAndArtist -
Thanks for the suggestion and interesting linked discussion :)
I appreciate the code sample - for the time being this change in Swift would be considered breaking behavior for the behavior of AsyncStream, so I'd want to make sure it's for real changing before making arbitrary changes.
I'll keep this thread open while the discussion is ongoing and until (if) a resolution is made.
Short description of the issue:
Today I started a discussion in the Swift forums regarding cooperative task cancellation and
AsyncSequence
types. Digging through the implementations of types likeAsync[Throwing]Stream
I discovered that those were not cooperative at all and will straight cut off the buffer from the upstream during cancellation. With the current non-cooperative behavior there's no issue in RxSwift'svalues
implementation. However if theAsync[Throwing]Stream
were cooperative then the implementation would get stuck and never terminate the stream viaonDispose
.Here's a custom wrapper
AsyncThrowingCooperativeStream
type, which simply avoids the direct cancellation forwarding to the innerAsyncThrowingStream._Storage.cancel()
method, which would cause the termination of the buffer by callingAsyncThrowingStream._Storage.finish()
. It gives the custom logic such as a wrappedTask
or anObservable
to properly receive the dispose message and forward its decision through theonDispose
back to the captured continuation.Here's the wrap similar to the implementation in the RxSwift module:
As you can see,
onDispose
finishes the cooperative stream continuation by throwing aCancellationError
. An infinite runningObservable
which gets deposed without an error will essentially callonDispose
, but it will never reach correctly the continuation, at least not a cooperative one.Note:
continuation.finish
will change the internal state in such way that a subsequent call to it would result into a no-op. That said, if anObservable
completes viaonCompleted
first, a call toonDisposed
will not actually throwCancellationError
asterminal
would already equalfinished
andonTermination
closure would already benil
-ed.Expected outcome:
Right now, it's not a bug, but if
AsyncThrowingStream
ever gets changed to be cooperative on task cancellation, thevalues
implementation will get stuck.What actually happens:
Nothing right now. The
AsyncThrowingStream
buffer gets immediately terminated on cancellation and anything coming theObservable
subscription will be completely ignored. You can use the aboveAsyncThrowingCooperativeStream
to actually reproduce the potential future issue.RxSwift/RxCocoa/RxBlocking/RxTest version/commit
Compared with RxSwift 6.5.0
Platform/Environment
Any.
How easy is to reproduce? (chances of successful reproduce after running the self contained code)
Xcode version:
Installation method:
I have multiple versions of Xcode installed:
Level of RxSwift knowledge:
The text was updated successfully, but these errors were encountered: