AsyncThrowingStream and AsyncStream explained with code examples

AsyncThrowingStream and AsyncStream are part of the concurrency features introduced in Swift 5.5 through proposal SE-0314. Async streams allow you to replace existing code that is based on closures or Combine publishers.

Before diving into the details of throwing streams, I recommend reading my article covering async-await if you haven’t done so yet. Most of the code in this article uses the APIs explained there.

What is an AsyncThrowingStream?

You can see an AsyncThrowingStream as a stream of elements that could potentially result in a thrown error. Values are delivered over time, and the stream is closed by a finish event. A finish event is either a success or, once an error occurs, a failure.

What is an AsyncStream?

An AsyncStream is similar to the throwing variant but will never throw an error. A non-throwing async stream finishes after an explicit finish() call or when the stream is canceled.

In this article, we’ll explain how to use an AsyncThrowingStream. The code examples are similar for AsyncStream except for pieces where error handling occurs.
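For comparison, here is a minimal sketch of the non-throwing variant. The countdown(from:) and collectCountdown(from:) helpers are hypothetical names made up for this illustration; they are not part of the FileDownloader example.

```swift
// Hypothetical helper: emits count, count - 1, ..., 1 and then finishes.
func countdown(from count: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in stride(from: count, through: 1, by: -1) {
            continuation.yield(value)
        }
        // Without this call, the consuming loop below would never end.
        continuation.finish()
    }
}

// Collects every value. Note the plain `for await` without `try`,
// since a non-throwing stream cannot fail.
func collectCountdown(from count: Int) async -> [Int] {
    var values: [Int] = []
    for await value in countdown(from: count) {
        values.append(value)
    }
    return values
}
```

Because the stream cannot throw, the consumer needs neither try nor a do-catch block.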

How to use AsyncThrowingStream

An AsyncThrowingStream can be an excellent replacement for existing code based upon closures like progress and completion handlers. To better understand what I mean, I’ll introduce you to a scenario we encountered in the WeTransfer app.

In our app, we had an existing class based on closures called the FileDownloader:

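import Foundation

struct FileDownloader {
    /// The state of a running download, reported over time.
    enum Status {
        case downloading(Float)
        case finished(Data)
    }

    /// Assumes an asynchronous implementation that calls both handlers after this method returns, hence @escaping.
    func download(_ url: URL, progressHandler: @escaping (Float) -> Void, completion: @escaping (Result<Data, Error>) -> Void) throws {
        // .. Download implementation
    }
}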

The file downloader takes a URL, reports progress, and completes with a result containing the downloaded data or an error on failure.

During a download, the file downloader effectively reports a stream of status values describing the current state of the running download. That makes the FileDownloader a perfect example of code you can rewrite to use AsyncThrowingStream. However, a full rewrite would require changing your code at the implementation level as well, so let’s define an overload instead:

extension FileDownloader {
    func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
        return AsyncThrowingStream { continuation in
            do {
                try self.download(url, progressHandler: { progress in
                    continuation.yield(.downloading(progress))
                }, completion: { result in
                    switch result {
                    case .success(let data):
                        continuation.yield(.finished(data))
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                })
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

As you can see, we wrapped the download method inside an AsyncThrowingStream. We specify Status as the stream’s generic element type, which allows us to yield status updates to the stream.

We finish the stream by throwing an error whenever one occurs. In the completion handler, we either finish by throwing the error or yield the final data followed by a non-throwing finish() call:

switch result {
case .success(let data):
    continuation.yield(.finished(data))
    continuation.finish()
case .failure(let error):
    continuation.finish(throwing: error)
}

It’s essential not to forget the finish() call after you’ve yielded the final status update. Otherwise, the stream stays alive, and code at the implementation level will never continue.

We could rewrite the above code by making use of another yield method, accepting a Result enum as an argument:

continuation.yield(with: result.map { .finished($0) })
continuation.finish()

The rewrite simplifies our code and removes the switch statement. We do have to map our Result enum to match the expected Status value. If we yield a failing result, the stream finishes by throwing the contained error.
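To see both outcomes of yield(with:) side by side, here is a self-contained sketch. It makes a few assumptions: the Status enum carries a String instead of Data to avoid extra imports, and ExampleError, statusStream(completingWith:), and collect(_:) are hypothetical names used only for this illustration.

```swift
enum Status: Equatable {
    case downloading(Float)
    case finished(String) // Assumption: String stands in for the downloaded Data.
}

struct ExampleError: Error {}

// Hypothetical stand-in for the downloader: reports one progress update,
// then completes with the given result.
func statusStream(completingWith result: Result<String, Error>) -> AsyncThrowingStream<Status, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(.downloading(0.5))
        // A success yields the mapped value; a failure finishes the stream by throwing.
        continuation.yield(with: result.map { Status.finished($0) })
        // Required for the success path; has no effect if the stream already finished.
        continuation.finish()
    }
}

// Collects all statuses and reports whether the stream ended with an error.
func collect(_ stream: AsyncThrowingStream<Status, Error>) async -> (statuses: [Status], failed: Bool) {
    var statuses: [Status] = []
    do {
        for try await status in stream {
            statuses.append(status)
        }
        return (statuses, false)
    } catch {
        return (statuses, true)
    }
}
```

With a successful result, the consumer receives the progress update followed by the finished status. With a failing result, it receives the progress update and then lands in the catch block.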

Iterating over an AsyncThrowingStream

You can start iterating over the stream of values once you’ve configured your async throwing stream. In the case of our FileDownloader example, it will look as follows:

do {
    for try await status in download(url) {
        switch status {
        case .downloading(let progress):
            print("Downloading progress: \(progress)")
        case .finished(let data):
            print("Downloading completed with data: \(data)")
        }
    }
    print("Download finished and stream closed")
} catch {
    print("Download failed with \(error)")
}

We handle every status update and use the catch block to handle any error that occurs. You can iterate using a for try await ... in loop based on the AsyncSequence interface. This works the same for an AsyncStream, except that the try keyword is not needed.

In case you’re running into a compile error like:

‘async’ in a function that does not support concurrency

You might want to read my article covering async-await in depth.

The print statements in the above code example help you understand the lifecycle of an AsyncThrowingStream. You can replace the print statements to handle the progress updates and process the data to visualize it for your users.

Debugging an AsyncStream

If a stream fails to report values, we can debug the stream’s yield calls by placing breakpoints. It could also be that the “Download finished and stream closed” print statement above is never reached, which means your code at the implementation level never continues. The latter could be the result of an unfinished stream.

To validate, we could make use of the onTermination callback:

func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
    return AsyncThrowingStream { continuation in

        /// Configure a termination callback to understand the lifetime of your stream.
        continuation.onTermination = { @Sendable status in
            print("Stream terminated with status \(status)")
        }

        // ..
    }
}

The callback is called when the stream terminates and tells you why it ended: because it finished or because it was canceled. If it’s never called, the stream is still alive. I recommend reading Sendable and @Sendable closures explained with code examples to understand the @Sendable attribute.

In case of a thrown error, the output could look as follows:

Stream terminated with status finished(Optional(FileDownloader.FileDownloadingError.example))

The above output will only be possible when using an AsyncThrowingStream. In the case of a regular AsyncStream, the finished output looks as follows:

Stream terminated with status finished

While the result of cancellation looks like this for both types of streams:

Stream terminated with status cancelled

You can also use this termination callback for any cleanup after the stream finishes. Examples could be removing any observers or cleaning disk space after the file download.
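A cleanup handler could be sketched as follows. EventLog, numbers(log:), and collectNumbers(log:) are hypothetical names for this illustration; the log only stands in for real cleanup work such as removing observers, and it uses a lock because the termination handler must be @Sendable.

```swift
import Foundation

// Hypothetical thread-safe log, so the @Sendable termination handler can record events.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock()
        defer { lock.unlock() }
        storage.append(event)
    }

    var events: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

func numbers(log: EventLog) -> AsyncStream<Int> {
    AsyncStream { continuation in
        // Set the handler before yielding, so no termination goes unnoticed.
        continuation.onTermination = { @Sendable termination in
            if case .cancelled = termination {
                log.append("cleanup after cancellation")
            } else {
                log.append("cleanup after finish")
            }
        }
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish()
    }
}

// Consumes the whole stream and returns the received values.
func collectNumbers(log: EventLog) async -> [Int] {
    var received: [Int] = []
    for await number in numbers(log: log) {
        received.append(number)
    }
    return received
}
```

In this sketch, the handler branches on the termination reason, which lets you treat a regular finish differently from a cancellation, for example by only deleting partial files in the latter case.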

Canceling an AsyncStream

An AsyncStream or AsyncThrowingStream is canceled when its enclosing task is canceled. An example could look as follows:

let task = Task.detached {
    do {
        for try await status in download(url) {
            switch status {
            case .downloading(let progress):
                print("Downloading progress: \(progress)")
            case .finished(let data):
                print("Downloading completed with data: \(data)")
            }
        }
    } catch {
        print("Download failed with \(error)")
    }
}
task.cancel()

A stream is canceled when it goes out of scope or when the enclosing task is canceled. As mentioned before, the cancellation triggers the onTermination callback accordingly.
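To observe cancellation in isolation, here is a minimal sketch using a non-throwing stream. The neverEndingStream() and consumeUntilCancelled() helpers are hypothetical names for this illustration. The stream deliberately never yields or finishes, so only cancellation can end the loop.

```swift
// Hypothetical stream that never yields and never finishes by itself.
func neverEndingStream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.onTermination = { @Sendable termination in
            print("Stream terminated with status \(termination)")
        }
        // Intentionally no yield() or finish(): only cancellation ends this stream.
    }
}

// Starts a consuming task, cancels it, and returns how many values it received.
func consumeUntilCancelled() async -> Int {
    let task: Task<Int, Never> = Task {
        var received = 0
        for await _ in neverEndingStream() {
            received += 1
        }
        // Reached only because cancellation terminated the stream.
        return received
    }
    task.cancel()
    return await task.value
}
```

Awaiting task.value returns here because canceling the task terminates the stream, which ends the for await loop without any received values.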

Conclusion

An AsyncThrowingStream or AsyncStream is a great way to rewrite existing closure-based code into alternatives that support async-await. You can deliver a continuous stream of values and finish a stream on success or failure. You can iterate over the values at the implementation level using a for loop based on the AsyncSequence APIs.

If you’d like to improve your Swift knowledge even more, check out the Swift category page. Feel free to contact me or tweet me on Twitter if you have any additional tips or feedback.

Thanks!

 
