Learning Combine Again
Combine is a reactive programming framework written by Apple for Swift, similar to ReactiveSwift or RxSwift. Combine was designed to provide an official framework for managing state in SwiftUI, and it can also be used with UIKit. This article records some general concepts and easy-to-forget details.
There are three key moving pieces in Combine: publishers, operators and subscribers. All the other objects in Combine are for them or based on them.
- Publisher
  - Publishers can produce three types of events: multiple Output values, a successful completion, or a failure completion with a Failure value.
  - Have two associated types: Output and Failure. Output is the type of values wrapped in normal events. Failure is the type of error wrapped in the failure completion event.
- Operator
  - Operators are used to convert events from upstream into new events, possibly wrapping values of new types.
- Subscriber
  - Combine provides two built-in subscribers, sink and assign.
    - The sink subscriber lets you provide closures that receive output values and completions.
    - The assign subscriber lets you bind received output values directly to properties of your UI or data models via key paths.
- Subscription
Publishers do not emit any values if there are no subscribers potentially receiving the output.
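As a minimal sketch of how the three pieces fit together (the names numbers, received and didFinish are illustrative and not from the original article), a sequence publisher can be transformed by an operator and consumed by a sink subscriber:

```swift
import Combine

// Publisher: a sequence publisher that emits 1, 2, 3 and then finishes.
let numbers = [1, 2, 3].publisher

var received: [String] = []
var didFinish = false

// Operator: map turns each Int into a String.
// Subscriber: sink receives the values and the completion event.
let cancellable = numbers
    .map { "value \($0 * 10)" }
    .sink(
        receiveCompletion: { completion in
            if case .finished = completion { didFinish = true }
        },
        receiveValue: { received.append($0) }
    )
```

Nothing is produced until sink subscribes; the returned AnyCancellable keeps the subscription alive.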
Kinds of Subscriber
- sink
  - Receives events from a publisher and provides two closures where you can process values and completion events.

    public func sink(
        receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void),
        receiveValue: @escaping ((Self.Output) -> Void)
    ) -> AnyCancellable {...}

    /* Examples */
    var disposeBag = Set<AnyCancellable>()
    let just = Just(0)
    just.sink {
        print("receive completion: \($0)")
    } receiveValue: {
        print("receive value: \($0)")
    }.store(in: &disposeBag)
    // receive value: 0
    // receive completion: finished

- assign
  - Binds events directly to a property of an object.

    public func assign<Root>(
        to keyPath: ReferenceWritableKeyPath<Root, Self.Output>,
        on object: Root
    ) -> AnyCancellable

    var age: Int = 0 {
        didSet {
            print("age did changed: \(age)")
        }
    }
    init() {
        let just = Just(18)
        just
            .assign(to: \.age, on: self)
            .store(in: &disposeBag)
    }
    // age did changed: 18

  - Republishing events from a publisher to a property marked with @Published.
    class Person {
        // @Published is required for the projected value $age to exist.
        @Published var age: Int = 0 {
            didSet {
                print("change age to \(age)")
            }
        }
    }
    class CombineView {
        var disposeBag = Set<AnyCancellable>()
        let person = Person()
        init() {
            person.$age.sink {
                print("-- \($0)")
            }.store(in: &disposeBag)
            let just = Just(0)
            just
                .assign(to: &person.$age)
        }
    }
    // -- 0

Notice that republishing values to the @Published property age doesn't trigger the didSet property observer. In effect, the events are passed to the publisher obtained from the projected value of age, written as $age.
Kinds of Publisher
Just
- Just emits a single value and then a successful completion.

    let just = Just(0)
    just.sink {
        print("receive completion: \($0)")
    } receiveValue: {
        print("receive value: \($0)")
    }.store(in: &disposeBag)
    // receive value: 0
    // receive completion: finished
A streamline diagram representing the relationship between Publisher, Subscriber and Subscription.
Future
- A publisher that emits a single value and then completes, or just fails.
- A future is greedy: the closure used to create it is executed immediately after initialization. Normal publishers do not generate values until a subscriber subscribes to them.
- Subscribing multiple times only returns the already-generated value and does not trigger the publisher to generate values again.
1 | func requestCode() -> Future<Int, Never> { |
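The greedy behavior can be checked with a small sketch. The counter executions and the value 42 are assumptions made for illustration; this is not the article's requestCode implementation, which is truncated above.

```swift
import Combine

var executions = 0

// The closure runs as soon as the Future is created, before anyone subscribes.
let future = Future<Int, Never> { promise in
    executions += 1
    promise(.success(42))
}
let executionsBeforeSubscribing = executions

var first: [Int] = []
var second: [Int] = []
var bag = Set<AnyCancellable>()

// Both subscribers receive the cached result; the closure is not run again.
future.sink { first.append($0) }.store(in: &bag)
future.sink { second.append($0) }.store(in: &bag)
```

If lazy behavior is needed, wrapping the Future in Deferred is a common way to postpone the work until subscription.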
- PassthroughSubject
  - A publisher that lets you publish values on demand; it passes these values and a completion event on to its subscribers.
- CurrentValueSubject
  - A publisher similar to PassthroughSubject that also stores the latest value.
- eraseToAnyPublisher
  - This method hides the actual type of your publisher (which may be a subject).
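The difference between the two subjects shows up when a subscriber arrives late. The following is a sketch with illustrative names (passthrough, current, readOnly), not code from the original article:

```swift
import Combine

var bag = Set<AnyCancellable>()

let passthrough = PassthroughSubject<Int, Never>()
let current = CurrentValueSubject<Int, Never>(0)

passthrough.send(1)   // No subscriber yet, so this value is lost.
current.send(1)       // Replaces the stored value 0 with 1.

var fromPassthrough: [Int] = []
var fromCurrent: [Int] = []

// eraseToAnyPublisher() exposes the subject as a plain AnyPublisher,
// so consumers of readOnly cannot call send(_:) on it.
let readOnly: AnyPublisher<Int, Never> = passthrough.eraseToAnyPublisher()

readOnly.sink { fromPassthrough.append($0) }.store(in: &bag)
current.sink { fromCurrent.append($0) }.store(in: &bag)   // Receives the stored 1 right away.

passthrough.send(2)
current.send(2)
```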
Combine with async/await
Swift provides numerous extensions for Combine that you can quickly use to bridge Combine to Swift concurrency.
1 | override func viewDidLoad() { |
We can use for await to iterate over the values property of the subject. values is of type AsyncPublisher, which conforms to the AsyncSequence protocol.
Did you notice that the console printed only the value 0 and not the value 1?
- I think this is related to the asynchronous sequence values. When values are consumed more slowly than they are yielded during for await, newly yielded values pile up. When creating an asynchronous stream, we need to provide a buffering policy that states whether to drop these extra values or cache them.
- The values property may be an asynchronous sequence without any buffer. We send two values back to back, and because the print call has not finished when the second value arrives, the second value is dropped.
- The examples below demonstrate this behavior.
1 | override func viewDidLoad() { |
1 | override func viewDidLoad() { |
Transforming Operators
collect
It transforms a stream of individual values from a publisher into a single array.
- collect(): collects all values into one array.
- collect(2): collects every two values into an array.
- .collect(.byTime(RunLoop.main, .seconds(5))): collects values into an array based on a specific strategy.
map
It transforms every value from the upstream into another type.
- map { $0 * 2 }: the normal map operator, like the one in the Swift standard library.
- map(\.x, \.y): key-path map, which extracts several properties of the value by key path.
tryMap
Like the normal map, but it takes a throwing closure. If the closure throws, it emits a failure event with the thrown error to the downstream.
flatMap
It flattens several publishers into one. All values emitted by these publishers are merged into one sequence ordered by time.
- flatMap(maxPublishers: .max(2)): limits the number of publishers whose values are merged at the same time.
replaceNil(with:)
It replaces all nil values from upstream with a constant value.
replaceEmpty(with:)
It emits a specified value when the publisher completes without having emitted any value.
scan
It accumulates the values from upstream one by one, beginning with an initial value, and emits each intermediate result.
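A few of these operators can be tried on simple sequence publishers. This is a sketch; the variable names and sample values are made up for illustration.

```swift
import Combine

var bag = Set<AnyCancellable>()

// collect(2) groups values into arrays of at most two elements.
var pairs: [[Int]] = []
(1...5).publisher
    .collect(2)
    .sink { pairs.append($0) }
    .store(in: &bag)

// scan emits every intermediate result of the accumulation.
var runningTotals: [Int] = []
(1...4).publisher
    .scan(0) { $0 + $1 }
    .sink { runningTotals.append($0) }
    .store(in: &bag)

// replaceNil swaps nil for a constant and unwraps the other values.
let optionalNames: [String?] = ["a", nil, "c"]
var names: [String] = []
optionalNames.publisher
    .replaceNil(with: "-")
    .sink { names.append($0) }
    .store(in: &bag)
```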
Filtering Operators
filter
The basic filtering operator.
removeDuplicates
It drops a value when it is equal to the value emitted immediately before it, so only consecutive duplicates are removed.
ignoreOutput
It disregards all values, and only receives completion events.
first(where:)
It finds the first matched value and then emits a completion event immediately.
last(where:)
It emits the last matched value after the publisher has sent its completion event.
dropFirst
It discards the first value (or the first n values with dropFirst(n)), and then relays subsequent values.
drop(while:)
It discards values while the predicate returns true, and then relays the first value that fails the predicate and all subsequent values.
drop(untilOutputFrom:)
It skips any value until the second publisher starts emitting values.
prefix(2)
It only takes the first two values.
prefix(while: )
It takes values while the predicate returns true and completes as soon as a value fails it.
prefix(untilOutputFrom: )
It takes values until the other publisher begins emitting values.
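The following sketch runs several filtering operators on small sequences so that the results can be compared side by side. The variable names are illustrative.

```swift
import Combine

var bag = Set<AnyCancellable>()

var distinct: [Int] = []
[1, 1, 2, 2, 1].publisher
    .removeDuplicates()              // Only consecutive duplicates are dropped.
    .sink { distinct.append($0) }
    .store(in: &bag)

var afterDropFirst: [Int] = []
(1...6).publisher
    .dropFirst(2)                    // Skips the first two values.
    .sink { afterDropFirst.append($0) }
    .store(in: &bag)

var afterDropWhile: [Int] = []
(1...6).publisher
    .drop(while: { $0 < 4 })         // Starts relaying at the first value >= 4.
    .sink { afterDropWhile.append($0) }
    .store(in: &bag)

var prefixWhile: [Int] = []
(1...6).publisher
    .prefix(while: { $0 < 4 })       // Stops at the first value >= 4.
    .sink { prefixWhile.append($0) }
    .store(in: &bag)

var firstEven: [Int] = []
(1...6).publisher
    .first(where: { $0 % 2 == 0 })   // Emits one value, then completes.
    .sink { firstEven.append($0) }
    .store(in: &bag)
```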
Combining Operators
prepend(Output…)
It will prepend multiple values into the value sequence of the original publisher.
prepend(Sequence)
It will prepend a sequence of values into the value sequence of the original publisher.
prepend(Publisher)
It will add values emitted by the second publisher before the original publisher’s values.
append(Output…)
It will append multiple values into the value sequence of the original publisher.
append(Sequence)
It will append a sequence of values into the value sequence of the original publisher.
append(Publisher)
It will add values emitted by the second publisher after the original publisher’s values.
switchToLatest
It takes a publisher that emits publishers. It relays only the values of the most recently emitted publisher and cancels the subscription to the previous one.
merge(with:)
It will interleave emissions from different publishers of the same type.
combineLatest
It will emit values of a tuple type that contains all latest values from multiple publishers of different value types.
zip
It will emit tuples of paired values in the same indexes of different publishers.
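The difference between combineLatest and zip is easiest to see with two subjects driven by hand. This is a sketch with made-up subjects letters and numbers; prepend and append are shown on a plain sequence.

```swift
import Combine

var bag = Set<AnyCancellable>()

// prepend/append add values before and after the original sequence.
var framed: [Int] = []
[3, 4].publisher
    .prepend(1, 2)
    .append(5)
    .sink { framed.append($0) }
    .store(in: &bag)

let letters = PassthroughSubject<String, Never>()
let numbers = PassthroughSubject<Int, Never>()
var latest: [String] = []
var zipped: [String] = []

// combineLatest emits whenever either side changes, once both have a value.
letters.combineLatest(numbers)
    .sink { pair in latest.append("\(pair.0)\(pair.1)") }
    .store(in: &bag)

// zip waits for a partner at the same index on the other side.
letters.zip(numbers)
    .sink { pair in zipped.append("\(pair.0)\(pair.1)") }
    .store(in: &bag)

letters.send("a")
letters.send("b")
numbers.send(1)
numbers.send(2)
letters.send("c")
```

Here "c" stays unpaired in zip because numbers has not emitted a third value yet.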
Timing Operators
delay(for:tolerance:scheduler:options:)
It delays the delivery of values emitted from the upstream by the specified time.
collect(.byTime(DispatchQueue.main, .seconds(4)))
It emits an array of the values emitted during the latest specified time window.
debounce
.debounce(for: .seconds(1.0), scheduler: DispatchQueue.main)
It holds back values until the upstream has been quiet for the specified interval, and then relays the latest value received.
throttle
.throttle(for: .seconds(3), scheduler: DispatchQueue.main, latest: true)
It is similar to debounce. The significant difference is that throttle emits the first value and then relays either the first or the latest value received during every specified interval.
timeout
.timeout(.seconds(5), scheduler: DispatchQueue.main)
The upstream publisher needs to emit a new value within the specified interval after the subscription begins or after the previous value; otherwise the subscription times out and completes with a finished or error event.
measureInterval
let measurer = publisher.measureInterval(using: RunLoop.main)
It will return a new publisher that emits values representing intervals between each consecutive value received from the source publisher.
Sequence Operators
min
It will only relay the minimum value after the upstream publisher completes.
max
It will only relay the maximum value after the upstream publisher completes.
first
It will return the first value and instantly finish the subscription.
last
It will only relay the last value after the upstream publisher finishes.
output(at:)
It will only relay the value at the specified index of the upstream publisher’s values.
output(in:)
It will only relay the upstream publisher’s values at the indices in the specified range.
count()
It will only emit a number value representing the amount of the upstream publisher’s values.
contains
contains(1)
contains(where: { $0 > 99 })
It will emit a boolean value representing whether the values from the upstream publisher contain the specified value, or whether any value meets the condition.
allSatisfy
It will emit a boolean value indicating whether all values emitted by the upstream publisher match the predicate.
reduce
It will iteratively accumulate a new value based on the emissions of the upstream publisher.
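As a sketch, the sequence operators can be applied to one finite publisher. The sample values and variable names are illustrative.

```swift
import Combine

let values = [3, 1, 4, 1, 5].publisher
var bag = Set<AnyCancellable>()

var minimum: Int?
var total: Int?
var valueCount: Int?
var thirdValue: Int?
var hasFour: Bool?
var allPositive: Bool?

// Each operator emits a single result once the upstream has finished
// (or, for output(at:), once the requested index has been reached).
values.min().sink { minimum = $0 }.store(in: &bag)
values.reduce(0) { $0 + $1 }.sink { total = $0 }.store(in: &bag)
values.count().sink { valueCount = $0 }.store(in: &bag)
values.output(at: 2).sink { thirdValue = $0 }.store(in: &bag)
values.contains(4).sink { hasFour = $0 }.store(in: &bag)
values.allSatisfy { $0 > 0 }.sink { allPositive = $0 }.store(in: &bag)
```

Unlike scan, reduce only emits the final accumulated value.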
Networking
dataTaskPublisher
Converting a normal data task to a publisher.
1 | guard let url = URL(string: "https://github.com") else { return } |
decode(type:decoder:)
Easily decoding the requested data from the data task publisher above.
1 | URLSession.share |
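To try decode(type:decoder:) without a network connection, the sketch below feeds it a hand-written JSON payload through Just. The Repo type and the JSON content are hypothetical; with a real data task publisher, the Data would first be extracted from the response tuple, for example with map(\.data).

```swift
import Combine
import Foundation

// Hypothetical model used only for this example.
struct Repo: Decodable, Equatable {
    let name: String
    let stars: Int
}

// Stands in for the body of a network response.
let json = Data(#"{"name":"combine-notes","stars":7}"#.utf8)

var decoded: Repo?
var failure: Error?

let cancellable = Just(json)
    .decode(type: Repo.self, decoder: JSONDecoder())
    .sink(
        receiveCompletion: { completion in
            // decode makes the publisher fallible, so the failure case must be handled.
            if case .failure(let error) = completion { failure = error }
        },
        receiveValue: { decoded = $0 }
    )
```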
Debug
The print operator logs all events of the upstream publisher, including subscription and value request events.
1 | (1...3).publisher |
handleEvents
Used to execute some side effects or monitor the publisher’s actual events.
1 | publisher |
breakpoint(receiveSubscription:receiveOutput:receiveCompletion:)
If events from the upstream publisher match one of these conditions, it will trigger a breakpoint to help you find out what is wrong.
1 | .breakpoint(receiveOutput: { value in |
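As a sketch of handleEvents, the example below records each lifecycle event in an array instead of printing it. The log array is an assumption made so that the order of events is visible.

```swift
import Combine

var log: [String] = []

let cancellable = (1...2).publisher
    .handleEvents(
        receiveSubscription: { _ in log.append("subscription") },
        receiveOutput: { log.append("output \($0)") },
        receiveCompletion: { _ in log.append("completion") }
    )
    .sink { _ in }   // The values themselves are ignored here.
```

All closures of handleEvents are optional, so only the events of interest need to be provided.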
Timer
We often need to repeat an operation at regular intervals. Combine provides some convenient ways to create a timer publisher for this.
Using RunLoop
We all know that our applications are running in an endless loop on our devices. Of course, we can use this feature to implement a timer.
1 | let runLoop = RunLoop.main |
Timer
Another common class used to create a timing loop is Timer. Combine provides a more modern API to create a timer without the setup boilerplate that ordinary code requires.
1 | let publisher = Timer.publish(every: 1.0, on: .main, in: .common) |
The above publisher is of type ConnectablePublisher, which is a variant of Publisher. It won’t start firing upon subscription; it starts only when you explicitly call its connect() method.
Using DispatchQueue
DispatchQueue provides an interface to start a timing loop. But unfortunately, Combine does not provide any convenient methods for it, so we have to write some complicated code to implement a timer.
1 | let queue = DispatchQueue.main |
Key-Value Observing
KVO has always been an essential component in Objective-C. We can use it to observe value changes of properties of KVO-compliant objects.
publisher(for:options:)
For a KVO-compliant object, you can call its publisher(for:options:) method with a key path of the property you want to observe, and you will receive the property's value changes.
Unlike classes based on Objective-C, custom classes declared in Swift cannot be observed by KVO by default. They have to satisfy the following requirements.
- The class needs to inherit from NSObject.
- The properties you want to observe must be marked with @objc dynamic.
1 | class Storage: NSObject { |
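A minimal sketch of observing such a class follows. The count property is hypothetical, since the article's Storage class is truncated above; the default options are assumed, which emit the initial value followed by each new value.

```swift
import Combine
import Foundation

// Inherits from NSObject and marks the property @objc dynamic,
// the two requirements listed above.
final class Storage: NSObject {
    @objc dynamic var count: Int = 0
}

let storage = Storage()
var observed: [Int] = []

// With the default options, the current value is emitted on subscription.
let cancellable = storage.publisher(for: \.count)
    .sink { observed.append($0) }

storage.count = 1
storage.count = 2
```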
ObservableObject
A Swift class conforming to the ObservableObject protocol gets an objectWillChange property, which is a publisher. You need to call objectWillChange’s send() method when the object’s value is about to change. In most cases a property’s value influences the whole object’s value, so we can invoke send() in the property observer willSet. Fortunately, Combine provides the property wrapper @Published to spare us this boilerplate code.
Every property marked with @Published triggers objectWillChange’s send method when its value changes.
1 | class Storage: ObservableObject { |
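The sketch below uses a hypothetical Settings class to show that objectWillChange fires before the new value is stored, which is why reading the property inside the sink still yields the old value.

```swift
import Combine

// Hypothetical model for illustration.
final class Settings: ObservableObject {
    @Published var volume = 5
}

let settings = Settings()
var seenAtNotification: [Int] = []

let cancellable = settings.objectWillChange.sink { _ in
    // Called before the assignment takes effect, so this reads the old value.
    seenAtNotification.append(settings.volume)
}

settings.volume = 8
```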
Resource Management
Publishers are usually structures. A publisher of value type is copied when passed to a method or assigned to another variable. Each subscriber that subscribes to the same publisher also triggers the publisher's work again, so the values are produced repeatedly.
If the publisher executes resource-intensive tasks (like decoding data or requesting remote data), producing the values again for every subscriber results in performance issues. If we can reuse the values that were already produced, the problem is solved.
share()
The purpose of this operator is to let you obtain a publisher by reference rather than by value. Then, when subscribed by multiple subscribers, a publisher will never be copied many times.
multicast(_:)
Sometimes we want to share a subscription with new subscribers and replay emitted values. Other reactive frameworks usually provide a shareReplay method to do this. Unfortunately, Combine does not provide a similar one, but it gives us another method called multicast to deal with this situation.
multicast uses a subject to relay the values of the upstream publisher and returns a new publisher of the ConnectablePublisher type. The connectable publisher does not start emitting values until you call its connect method. This gives you plenty of time to set up all the subscribers you need before connecting it to the upstream publisher and starting work.
1 | let subject = PassthroughSubject<Data, URLError>() |
Future
Future is a kind of publisher that provides a closure with a promise parameter. In the closure, you call the promise with a result, either a success value or a failure. Future is therefore usually used for work that only needs to deliver one value or return an error, such as a network request.
The other special characteristic is that Future is a class (a reference type) rather than a struct (a value type).
A future starts immediately when created, and its result is cached and handed to new subscribers rather than being calculated repeatedly.
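The effect of share() can be observed by counting how often an upstream transformation runs. In this sketch the map closure stands in for expensive work such as decoding; the names source and transformRuns are illustrative.

```swift
import Combine

var transformRuns = 0
let source = PassthroughSubject<Int, Never>()

// With share(), both subscribers below reuse a single upstream subscription.
let shared = source
    .map { value -> Int in
        transformRuns += 1
        return value * 2
    }
    .share()

var first: [Int] = []
var second: [Int] = []
var bag = Set<AnyCancellable>()

shared.sink { first.append($0) }.store(in: &bag)
shared.sink { second.append($0) }.store(in: &bag)

source.send(21)
```

Without share(), each sink would subscribe to its own copy of the map publisher and the closure would run once per subscriber.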
Error Handling
When declaring a publisher type, we notice that it requires two associated types, Output and Failure. Output represents the type of values emitted by the publisher, and Failure represents the type of the error carried by a failure completion event.
In practice, some publishers never emit a failure event, such as one representing button taps. Many other situations are fallible, such as network requests, and we should handle their error events properly, for example to display a user-friendly failure page.
Combine provides sufficient operators and types to cover these scenarios. In this section, we start with never-failing publishers. We have used them many times above.
Never
A Just publisher is a never-failing publisher that just returns a certain value. Combine provides variations of many common methods for never-failing publishers. For example, when using sink to subscribe to a never-failing publisher we can omit the completion closure. More such operators are discussed below.
setFailureType
It can convert an infallible publisher to be fallible.
1 | enum MyError {} |
assign(to:on:)
Binds values to a property. It can only be called on never-failing publishers.
The object passed as the on argument is captured strongly by the subscription. Be careful of strong reference cycles.
assertNoFailure
Adds an assertion to a publisher that you believe never returns errors. If the publisher does emit a failure event, the application stops with a fatal error.
Dealing with Errors
- try*
  In Combine, many value-transforming operators have a try variation for closures that may throw errors. The returned publisher is fallible, and its failure type is erased to Swift's Error regardless of the concrete error type thrown in the closure.
- mapError
  Converts the error type of failure events.
- retry
  When the upstream publisher fails, retry re-subscribes to it up to the specified number of times before passing the failure downstream.
1 | Api |
replaceError(with:)
Replaces an error with a default value; the publisher then becomes infallible.
1 | Api |
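The operators above can be combined in a small sketch that runs without a network. ParseError and parse(_:) are hypothetical helpers introduced for this example; they are not part of the article's Api type.

```swift
import Combine

// Hypothetical error type and throwing function for illustration.
struct ParseError: Error, Equatable {
    let input: String
}

func parse(_ text: String) throws -> Int {
    guard let number = Int(text) else { throw ParseError(input: text) }
    return number
}

var bag = Set<AnyCancellable>()
var parsed: [Int] = []
var failure: ParseError?

["1", "2", "x", "4"].publisher
    .tryMap(parse)                    // Failure type is now the plain Error protocol.
    .mapError { $0 as? ParseError ?? ParseError(input: "unknown") }
    .sink(
        receiveCompletion: { completion in
            if case .failure(let error) = completion { failure = error }
        },
        receiveValue: { parsed.append($0) }
    )
    .store(in: &bag)

var withFallback: [Int] = []
["1", "x", "3"].publisher
    .tryMap(parse)
    .replaceError(with: -1)           // Emits -1 in place of the error, then finishes.
    .sink { withFallback.append($0) }
    .store(in: &bag)
```

In both pipelines the stream ends at the first error, so the values after "x" are never processed.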
Schedulers
A scheduler is a protocol that defines when and how to execute a closure.
Note, a scheduler is not equal to a thread.
Where to use a scheduler?
We have two fundamental scenarios where we should specify schedulers.
subscribe(on:options:)
- Create subscriptions and transforming operations on the specified scheduler.
receive(on:options:)
- Receive values on the specified scheduler.
1 | (1...1) |
As you can see, we control the subscriber so that the values are fetched on the main thread and received on a global queue.
Furthermore, some operators take a Scheduler as a parameter.
debounce(for:scheduler:options:)
delay(for:tolerance:scheduler:options:)
measureInterval(using:options:)
throttle(for:scheduler:latest:)
timeout(_:scheduler:options:customError:)
Implementations of Scheduler
Apple provides four concrete implementations for Scheduler protocol.
ImmediateScheduler
- A simple scheduler that executes code immediately on the current thread.
RunLoop
- Tied to Foundation's Thread class.
- Nowadays, RunLoop is a less useful class for managing threads. We prefer DispatchQueue to control how tasks execute. But in some specific situations, such as Timer, RunLoop is still needed.
DispatchQueue
- Can either be serial or concurrent.
OperationQueue
- A queue that regulates the execution of work items.
Please note that values received on a concurrent scheduler may arrive out of order.
Custom Publishers and Operators
We have multiple approaches to creating our own publishers and operators based on existing Combine components.
- Extending Publisher with methods.
- Implementing a type in the Publishers namespace with a Subscription that produces values.
- Same as above, but with a subscription that transforms values from upstream publishers.
1. Extends Methods for Publisher
The simplest way to create an operator is to add a method to the Publisher protocol in an extension.
Now let's practice by creating an unwrap operator that filters out all nil values from the upstream publisher, like compactMap but without a closure and more readable.
1 | extension Publisher { |
Erasing the new publisher’s type can hide redundant type details and improve extendability.
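One possible shape of such an unwrap operator is sketched below. It is an assumption about what the truncated extension contains: it simply delegates to compactMap and erases the result type.

```swift
import Combine

extension Publisher {
    /// Drops nil values and unwraps the rest (a sketch built on compactMap).
    func unwrap<T>() -> AnyPublisher<T, Failure> where Output == T? {
        compactMap { $0 }.eraseToAnyPublisher()
    }
}

let maybeNumbers: [Int?] = [1, nil, 3]
var unwrapped: [Int] = []

let cancellable = maybeNumbers.publisher
    .unwrap()
    .sink { unwrapped.append($0) }
```

Returning AnyPublisher keeps the Publishers.CompactMap type out of the operator's signature, so the implementation can change later without affecting callers.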
Creating a Publisher that produces values
We have talked about the relationship between Publisher, Subscriber, and Subscription above.
In Combine, we mostly write code that deals with Publisher or Subscriber and rarely touch Subscription directly, but Subscription is the unsung hero of Combine.
A subscription bridges a publisher and a subscriber, and it captures the subscriber to keep it from being released. This is why we store the subscription in a set with the store(in:) method.
Now we are ready to create a DispatchTimerPublisher that emits time values according to a specified configuration.
- First, we define a structure DispatchTimerConfiguration representing our timer’s configuration.
1 | struct DispatchTimerConfiguration { |
- Then, we declare our new publisher DispatchTimer in the Publishers namespace.
1 | extension Publishers { |
- As you can see, the core code is the receive(subscriber:) method. We should pass a Subscription to the received subscriber; the subscriber then uses it to request values.
- Finally, the most important part of our DispatchTimer is the DispatchTimerSubscription.
1 | private final class DispatchTimerSubscription<S: Subscriber>: |
- There are two methods we have to implement, cancel and request(_:).
  - The cancel method, as its name suggests, cancels the subscription. Normally, we should break the connection between the publisher and the subscriber and release the captured subscriber.
  - The request(_:) method is the core. This is where we control how values are generated.
For ease of use, we can add a shortcut method for our DispatchTimer publisher.
1 | extension Publishers { |
Publishers Transforming Values
In RxSwift, we can use the shareReplay operator to cache emitted values and replay them to new subscribers, which avoids recomputing the same data. Unfortunately, Combine does not provide a similar operator, but we can implement it ourselves.
Our ShareReplay design is as follows.
- Every subscriber has an individual subscription.
- A new ShareReplay publisher wraps the upstream publisher and stores all subscriptions created for subscribers.
- The ShareReplay publisher relays the upstream publisher’s values to subscribers and caches these values.
- If a new subscriber connects, the publisher replays the cached values.
1 | fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription { |
The subscribe(subscriber:) method of Publisher doesn’t return a Cancellable value. When we use this method to subscribe to a Subject, we find that the subscriber is not released before it receives a completion event, even if nothing retains the publisher.
Here is a test subscriber that detects whether its deinit method is invoked.
1 | class MySubscriber<T>: Subscriber { |
When we execute the code below, the console doesn’t print any deinit message. No object holds the publisher or the subscriber, but they are not released.
1 | func test() { |
If the publisher sends a completion event, the console prints the deinit message.
1 | func test() { |
If we use our custom publisher DispatchTimer from the sections above, the publisher, subscription and subscriber are released immediately. So why do they behave differently? I think Apple does something special in Subject; it may store the subscription in a global table and remove it when a completion event is received. Based on this, we should avoid the subscribe(subscriber:) method if we want to cancel the subscription before receiving a completion event. Besides, if you create a custom publisher, use this method carefully (everything may be released immediately) unless you know how to manage subscriptions correctly.
Backpressure
In most reactive frameworks, a subscriber simply receives whatever values the upstream publisher sends, regardless of whether it can consume them. Combine instead uses a pull design, as opposed to a push one: subscribers ask publishers to emit values and specify how many they want to receive. This design is more adaptive, and we can use it to tune our data streams and save computing resources.
In Combine, we usually use two methods to implement backpressure.
- A publisher with a custom Subscription to deal with congestion.
- A subscriber that delivers values at the end of a chain of publishers.
If we define a subscription that produces values, it is easy to control the speed of producing new values according to the subscriber’s demand. Here we mainly talk about the second approach.
We extend the Publishers namespace with a pausableSink method. It takes two closure parameters, receiveValue and receiveCompletion. Every time you receive a new value in the receiveValue closure, you return a boolean value that indicates whether you want to request another value.
1 | // Conforming entities can be paused. |
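Before looking at a pausable subscriber, the demand mechanism itself can be sketched with a much simpler custom subscriber. LimitedSubscriber is a hypothetical type written for this example, not the article's pausableSink; it requests a fixed number of values and never asks for more.

```swift
import Combine

/// A hypothetical subscriber that accepts only a fixed number of values.
final class LimitedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []
    private let limit: Int

    init(limit: Int) {
        self.limit = limit
    }

    func receive(subscription: Subscription) {
        // Initial demand: ask for at most `limit` values in total.
        subscription.request(.max(limit))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // Returning .none adds no extra demand, so the publisher
        // stops sending once the initial demand is used up.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = LimitedSubscriber(limit: 2)
(1...5).publisher.subscribe(subscriber)
```

A pausable variant would keep the Subscription around and call request(_:) again when it is resumed.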