
Learning Combine Again

Combine is a reactive programming framework from Apple for Swift, similar to ReactiveSwift and RxSwift. It was designed as the official way to manage state in SwiftUI, but it works just as well in UIKit. This article records some general concepts and details that are easy to forget.

There are three key moving pieces in Combine: publishers, operators and subscribers. All the other objects in Combine are for them or based on them.

  • Publisher

    • A publisher can emit three kinds of events: any number of Output values, a successful completion, or a failure completion carrying a Failure value.
    • It has two associated types, Output and Failure. Output is the type of the values it emits. Failure is the type of the error carried by a failure completion.
  • Operator

    • Operators transform events from upstream into new events, possibly carrying values of a different type.
  • Subscriber

    • Combine provides two built-in subscribers, sink and assign.
      • The sink subscriber takes closures that receive output values and completions.
      • The assign subscriber binds received output values directly to a property of your UI or data model via a key path.
  • Subscription

    • A subscription represents the connection between a publisher and a subscriber. The subscriber uses it to request values and to cancel.

Most publishers do not emit any values until a subscriber is attached to receive them.
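To make the three pieces concrete, here is a minimal sketch of one pipeline. The variable names (received, finished) are only for illustration; the array publisher delivers its values synchronously, so the results are available right after sink returns.

```swift
import Combine

var received: [String] = []
var finished = false

// Publisher: emits 1, 2, 3 and then completes (Output == Int, Failure == Never).
let cancellable = [1, 2, 3].publisher
    // Operator: turns each Int into a String.
    .map { "value \($0)" }
    // Subscriber: sink receives the values and the completion.
    .sink(
        receiveCompletion: { _ in finished = true },
        receiveValue: { received.append($0) }
    )

print(received)   // ["value 1", "value 2", "value 3"]
print(finished)   // true
```

Nothing happens until sink attaches a subscriber; the returned AnyCancellable keeps the subscription alive.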

Kinds of Subscriber

  • sink

    • Receives events from a publisher and provides two closures where you can process values and completion events.

    public func sink(
        receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void),
        receiveValue: @escaping ((Self.Output) -> Void)
    ) -> AnyCancellable {...}

    /* Examples */

    // disposeBag is a Set<AnyCancellable> owned by the enclosing type.
    let just = Just(0)

    just.sink {
        print("receive completion: \($0)")
    } receiveValue: {
        print("receive value: \($0)")
    }.store(in: &disposeBag)

    // receive value: 0
    // receive completion: finished
  • assign

    • Binds output values directly to a property of an object.

    public func assign<Root>(
        to keyPath: ReferenceWritableKeyPath<Root, Self.Output>,
        on object: Root
    ) -> AnyCancellable

    var age: Int = 0 {
        didSet {
            print("age did change: \(age)")
        }
    }

    init() {
        let just = Just(18)

        just
            .assign(to: \.age, on: self)
            .store(in: &disposeBag)
    }
    // age did change: 18
    • Republishes values from a publisher to a property marked with @Published.

    class Person {
        @Published
        var age: Int = 0 {
            didSet {
                print("change age to \(age)")
            }
        }
    }

    class CombineView {
        var disposeBag = Set<AnyCancellable>()

        let person = Person()

        init() {
            person.$age.sink {
                print("-- \($0)")
            }.store(in: &disposeBag)

            let just = Just(0)
            just
                .assign(to: &person.$age)
        }
    }
    // -- 0

    Notice that republishing values to the @Published property age doesn't trigger its didSet property observer. The values go straight to the publisher exposed through the projected value $age. Also note that this form of assign(to:) returns no AnyCancellable: the subscription lives as long as the @Published property does.

Kinds of Publisher

  • Just
    • Just emits a single value and then completes successfully.

    let just = Just(0)

    just.sink {
        print("receive completion: \($0)")
    } receiveValue: {
        print("receive value: \($0)")
    }.store(in: &disposeBag)

    // receive value: 0
    // receive completion: finished

[Figure: a diagram of the relationship between Publisher, Subscriber, and Subscription.]

  • Future

    • A publisher that eventually emits a single value and then completes, or fails.
    • A future is greedy: the closure used to create it runs immediately on initialization. Most other publishers do no work until a subscriber subscribes.
    • Later subscribers receive the result that was already produced; subscribing again does not run the closure again.

    func requestCode() -> Future<Int, Never> {
        Future { promise in
            print("begin requesting a code")
            DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
                promise(.success(10))
            }
        }
    }
  • PassthroughSubject

    • A publisher that lets you publish values on demand. It passes these values and a completion event straight on to its subscribers.
  • CurrentValueSubject

    • Like PassthroughSubject, but it also stores the latest value and sends it to each new subscriber.
  • eraseToAnyPublisher

    • This method hides the actual type of your publisher (a subject, for example) behind AnyPublisher.
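The differences between these publishers are easier to see side by side. The following is a small sketch, not a definitive reference; all variable names are illustrative, and the future fulfils its promise synchronously so that the results can be inspected immediately.

```swift
import Combine

// Future: the closure runs once, at creation time, and the result is cached.
var runs = 0
let future = Future<Int, Never> { promise in
    runs += 1
    promise(.success(10))
}

var fromFuture: [Int] = []
let f1 = future.sink { fromFuture.append($0) }
let f2 = future.sink { fromFuture.append($0) }
// runs == 1, fromFuture == [10, 10]

// PassthroughSubject: subscribers only see values sent after they subscribe.
let passthrough = PassthroughSubject<Int, Never>()
passthrough.send(1)                        // nobody is listening yet
var fromPassthrough: [Int] = []
let p = passthrough.sink { fromPassthrough.append($0) }
passthrough.send(2)
// fromPassthrough == [2]

// CurrentValueSubject: a new subscriber gets the stored value first.
let current = CurrentValueSubject<Int, Never>(0)
current.send(1)
var fromCurrent: [Int] = []
// eraseToAnyPublisher hides the subject, so callers of readOnly cannot send.
let readOnly: AnyPublisher<Int, Never> = current.eraseToAnyPublisher()
let c = readOnly.sink { fromCurrent.append($0) }
current.send(2)
// fromCurrent == [1, 2]
```

Exposing only the erased AnyPublisher is a common way to keep the ability to send values private to the owning type.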

Combine with async/await

Combine provides extensions that let you quickly bridge publishers to Swift concurrency.

override func viewDidLoad() {
    super.viewDidLoad()

    let subject = PassthroughSubject<Int, Never>()

    Task {
        print("begin")
        for await item in subject.values {
            print("\(item)")
        }
        print("completed")
    }

    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        subject.send(0)
        subject.send(1)
        subject.send(completion: .finished)
    }
}
// begin
// 0
// completed

We can use for await to iterate over the subject's values property. It is of type AsyncPublisher, which conforms to the AsyncSequence protocol.

Did you notice that the console printed only the value 0 and never the value 1?

  • I think this comes from how the asynchronous sequence values consumes its upstream. When values are consumed more slowly than they are produced inside for await, newly produced values pile up. When we create an asynchronous stream ourselves, we supply a buffering policy that says whether to drop or cache such surplus values.
  • The values property appears to be an asynchronous sequence without any buffer. We send two values back to back, and the loop is still handling the first one (the print has not finished) when the second arrives, so the second value is dropped.
  • The examples below demonstrate this behavior.

When the values are sent one second apart, both are received:

override func viewDidLoad() {
    super.viewDidLoad()

    let subject = PassthroughSubject<Int, Never>()

    Task {
        print("begin")
        for await item in subject.values {
            print("\(item)")
        }
        print("completed")
    }

    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        subject.send(0)
    }

    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
        subject.send(1)
    }
}
// begin
// 0
// 1

When the loop body is slow, the second value is dropped again:

override func viewDidLoad() {
    super.viewDidLoad()

    let subject = PassthroughSubject<Int, Never>()

    Task {
        print("begin")
        for await item in subject.values {
            // Simulates a time-consuming operation.
            try? await Task.sleep(nanoseconds: NSEC_PER_SEC)
            print("\(item)")
        }
        print("completed")
    }

    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        subject.send(0)
    }

    DispatchQueue.global().asyncAfter(deadline: .now() + 2) {
        subject.send(1) // Will be dropped.
    }
}
// begin
// 0

Transforming Operators

collect

It gathers individual values from a publisher into arrays.

  • collect(): collects all values into a single array, emitted when the upstream completes.
  • collect(2): collects every two values into an array.
  • collect(.byTime(RunLoop.main, .seconds(5))): collects values into arrays according to a time-grouping strategy.

map

It transforms every value from the upstream, possibly into another type.

  • map { $0 * 2 }: the ordinary map operator, like map in the Swift standard library.
  • map(\.x, \.y): the key-path variant, which extracts up to three properties of the value by key path.

tryMap

Like map, but it takes a throwing closure. If the closure throws, the thrown error is sent downstream as a failure.

flatMap

It flattens several publishers into one. The values emitted by these publishers are merged into a single sequence in order of arrival.

  • flatMap(maxPublishers: .max(2)): limits how many publishers are subscribed to and merged at the same time.

replaceNil(with:)

It replaces every nil value from the upstream with a constant value.

replaceEmpty(with:)

It emits the specified value if the publisher completes without having emitted any value.

scan

It accumulates the upstream values one by one, starting from an initial value, and emits each intermediate result.
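A short sketch of a few of these operators on synchronous sequence publishers. The variable names are illustrative only, and the comments show what each array holds once the pipelines have run.

```swift
import Combine

var pairs: [[Int]] = []
var doubled: [Int] = []
var runningTotals: [Int] = []
var flattened: [Int] = []
var cancellables = Set<AnyCancellable>()

// collect(2): groups the values two at a time.
(1...5).publisher
    .collect(2)
    .sink { pairs.append($0) }
    .store(in: &cancellables)
// pairs == [[1, 2], [3, 4], [5]]

// map: transforms each value.
(1...3).publisher
    .map { $0 * 2 }
    .sink { doubled.append($0) }
    .store(in: &cancellables)
// doubled == [2, 4, 6]

// scan: emits every intermediate result of the accumulation.
(1...4).publisher
    .scan(0) { total, value in total + value }
    .sink { runningTotals.append($0) }
    .store(in: &cancellables)
// runningTotals == [1, 3, 6, 10]

// flatMap: each value becomes a publisher; their outputs are merged.
[1, 2].publisher
    .flatMap { n in [n, n * 10].publisher }
    .sink { flattened.append($0) }
    .store(in: &cancellables)
// flattened == [1, 10, 2, 20]
```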

Filtering Operators

filter

The basic filtering operator: it relays only the values that satisfy a predicate.

removeDuplicates

It drops a value that equals the one emitted just before it. Only consecutive duplicates are removed.

ignoreOutput

It discards all values and relays only the completion event.

first(where:)

It emits the first value that matches the predicate and then completes immediately.

last(where:)

It emits the last value that matches the predicate, once the upstream publisher has completed.

dropFirst

It discards the first value, or the first n values with dropFirst(n), and then relays all subsequent values.

drop(while:)

It discards values for as long as the predicate holds. From the first value that fails the predicate onward, it relays everything.

drop(untilOutputFrom:)

It skips all values until the second publisher emits its first value.

prefix(2)

It takes only the first two values and then completes.

prefix(while: )

It takes values for as long as the predicate holds and completes at the first value that fails it.

prefix(untilOutputFrom: )

It takes values until the other publisher emits its first value.
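The following sketch runs several filtering operators over the same input. The helper emitted(by:) is hypothetical, written only for this example; it relies on sequence publishers delivering their values synchronously.

```swift
import Combine

// Hypothetical helper: subscribes and returns everything that a synchronous,
// never-failing publisher emits.
func emitted<P: Publisher>(by publisher: P) -> [P.Output] where P.Failure == Never {
    var result: [P.Output] = []
    let cancellable = publisher.sink { result.append($0) }
    cancellable.cancel()
    return result
}

let input = [1, 1, 2, 2, 3, 1, 4, 5]

// Only consecutive duplicates disappear; the later 1 survives.
let deduplicated = emitted(by: input.publisher.removeDuplicates())
// [1, 2, 3, 1, 4, 5]

let afterDropFirst = emitted(by: input.publisher.dropFirst(2))
// [2, 2, 3, 1, 4, 5]

// Dropping stops at the first value that fails the predicate (3).
let afterDropWhile = emitted(by: input.publisher.drop(while: { $0 < 3 }))
// [3, 1, 4, 5]

// Taking stops at the first value that fails the predicate (3).
let prefixWhile = emitted(by: input.publisher.prefix(while: { $0 < 3 }))
// [1, 1, 2, 2]

let firstMatch = emitted(by: input.publisher.first(where: { $0 > 2 }))
// [3]

let lastMatch = emitted(by: input.publisher.last(where: { $0 < 3 }))
// [1]
```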

Combining Operators

prepend(Output…)

It emits the given values before any value from the original publisher.

prepend(Sequence)

It emits the elements of a sequence before any value from the original publisher.

prepend(Publisher)

It emits the second publisher's values before the original publisher's values. The original values follow once the second publisher completes.

append(Output…)

It emits the given values after the original publisher completes.

append(Sequence)

It emits the elements of a sequence after the original publisher completes.

append(Publisher)

It emits the second publisher's values after the original publisher completes.

switchToLatest

It operates on a publisher whose values are themselves publishers. It relays values only from the most recently received publisher and cancels its subscription to the previous one.

merge(with:)

It interleaves the emissions of several publishers of the same type.

combineLatest

It emits tuples holding the latest value from each of several publishers, whose value types may differ. It starts emitting once every publisher has emitted at least one value.

zip

It emits tuples that pair values at the same index from different publishers.
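The difference between combineLatest and zip is the one that is easiest to mix up, so here is a sketch that feeds both from the same two subjects, preceded by a small prepend/append example. The subject and variable names are illustrative.

```swift
import Combine

var cancellables = Set<AnyCancellable>()

// prepend / append on a finite publisher.
var ordered: [Int] = []
[3, 4].publisher
    .prepend(1, 2)
    .append(5)
    .sink { ordered.append($0) }
    .store(in: &cancellables)
// ordered == [1, 2, 3, 4, 5]

// combineLatest vs. zip on the same two subjects.
let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()

var combined: [String] = []
var zipped: [String] = []

numbers.combineLatest(letters)
    .sink { pair in combined.append("\(pair.0)\(pair.1)") }
    .store(in: &cancellables)

numbers.zip(letters)
    .sink { pair in zipped.append("\(pair.0)\(pair.1)") }
    .store(in: &cancellables)

numbers.send(1)      // neither emits yet: no letter so far
letters.send("a")    // both emit (1, "a")
numbers.send(2)      // combineLatest emits (2, "a"); zip waits for a partner
letters.send("b")    // both emit (2, "b")

// combined == ["1a", "2a", "2b"]
// zipped   == ["1a", "2b"]
```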

Timing Operators

delay(for:tolerance:scheduler:options)

It emits each upstream value after the specified delay.

collect(.byTime(DispatchQueue.main, .seconds(4)))

It emits, at the end of each interval, an array of the values received during that interval.

debounce

  • .debounce(for: .seconds(1.0), scheduler: DispatchQueue.main)
    It waits until the upstream has been silent for the specified interval and then relays the latest value received.

throttle

  • .throttle(for: .seconds(3), scheduler: DispatchQueue.main, latest: true)
    It is similar to debounce. The significant difference is that throttle emits the first value right away and then, in every specified interval, relays either the first or the latest value received during that interval, depending on the latest parameter.

timeout

  • .timeout(.seconds(5), scheduler: DispatchQueue.main)
    The upstream publisher must emit a value within the specified interval, counted from the subscription or from the previous value. Otherwise the subscription times out and completes, either with finished or, if a customError is provided, with a failure.

measureInterval

  • let measurer = publisher.measureInterval(using: RunLoop.main)
    It returns a new publisher that emits the time interval between each pair of consecutive values received from the source publisher.

Sequence Operators

min

It emits only the minimum value, after the upstream publisher completes.

max

It emits only the maximum value, after the upstream publisher completes.

first

It emits the first value and then completes the subscription immediately.

last

It emits only the last value, after the upstream publisher completes.

output(at:)

It emits only the value at the specified index of the upstream publisher's values.

output(in:)

It emits only the upstream values whose indices fall within the specified range.

count()

It emits a single number: how many values the upstream publisher emitted.

contains

  • contains(1)
  • contains(where: { $0 > 99 })
    It emits a Boolean indicating whether the upstream values include the specified value, or whether any of them satisfies the condition.

allSatisfy

It emits a Boolean indicating whether all values emitted by the upstream publisher satisfy the predicate.

reduce

It accumulates the upstream values into a single result and emits it once the upstream publisher completes.
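As a quick illustration, the sketch below applies the sequence operators to one finite publisher. The helper emitted(by:) is hypothetical and exists only for this example; it assumes a publisher that delivers synchronously and never fails.

```swift
import Combine

// Hypothetical helper: subscribes and returns everything that a synchronous,
// never-failing publisher emits.
func emitted<P: Publisher>(by publisher: P) -> [P.Output] where P.Failure == Never {
    var result: [P.Output] = []
    let cancellable = publisher.sink { result.append($0) }
    cancellable.cancel()
    return result
}

let source = [3, 1, 4, 1, 5].publisher

let minimum = emitted(by: source.min())                       // [1]
let maximum = emitted(by: source.max())                       // [5]
let firstValue = emitted(by: source.first())                  // [3]
let lastValue = emitted(by: source.last())                    // [5]
let atIndexTwo = emitted(by: source.output(at: 2))            // [4]
let inRange = emitted(by: source.output(in: 1...3))           // [1, 4, 1]
let total = emitted(by: source.count())                       // [5]
let hasFour = emitted(by: source.contains(4))                 // [true]
let allPositive = emitted(by: source.allSatisfy { $0 > 0 })   // [true]
let sum = emitted(by: source.reduce(0) { $0 + $1 })           // [14]
```

Every operator here except first and output has to wait for the upstream to complete, which is why they suit finite publishers only.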

Networking

dataTaskPublisher

It turns an ordinary data task into a publisher.

guard let url = URL(string: "https://github.com") else { return }

URLSession.shared
    .dataTaskPublisher(for: url)

decode(type:decoder:)

It decodes the data delivered by the data task publisher above. Because decode expects Data as its input, we first map the (data, response) output to its data.

URLSession.shared
    .dataTaskPublisher(for: url)
    .map(\.data)
    .decode(type: MyType.self, decoder: JSONDecoder())
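decode works on any publisher whose output is Data, so it can be tried without a network. In this sketch a Just stands in for the data task publisher; the Repo model and the JSON payload are made up for the example.

```swift
import Combine
import Foundation

// Hypothetical model for this sketch.
struct Repo: Decodable, Equatable {
    let name: String
    let stars: Int
}

// Stand-in for the body of a network response.
let json = Data(#"{"name": "combine-notes", "stars": 42}"#.utf8)

var decoded: Repo?
var failure: Error?

let cancellable = Just(json)
    .decode(type: Repo.self, decoder: JSONDecoder())
    .sink(
        receiveCompletion: { completion in
            // decode makes the pipeline fallible, so a completion handler is required.
            if case .failure(let error) = completion { failure = error }
        },
        receiveValue: { decoded = $0 }
    )
```

If the payload did not match Repo, the decoding error would arrive in receiveCompletion instead of a value.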

Debug

print

It prints every event of the upstream publisher, including subscription and demand-request events.

let subscription = (1...3).publisher
    .print("number")
    .sink { _ in }

// number: receive subscription: (1...3)
// number: request unlimited
// number: receive value: (1)
// number: receive value: (2)
// number: receive value: (3)
// number: receive finished

handleEvents

It performs side effects on, or monitors, the publisher's lifecycle events.

publisher
    .handleEvents(receiveSubscription: { _ in
        // Called when the upstream delivers a subscription.
    }, receiveOutput: { _ in
        // Called for every value.
    }, receiveCancel: {
        // Called when the downstream cancels.
    })

breakpoint(receiveSubscription:receiveOutput:receiveCompletion:)

If an event from the upstream publisher makes one of the provided closures return true, the process stops in the debugger so that you can find out what went wrong.

.breakpoint(receiveOutput: { value in
    return value < 0 // Values are expected to be non-negative; break if one is not.
})

Timer

We often need to run an operation repeatedly. Combine provides some convenient ways to create a timer publisher.

Using RunLoop

An application runs inside a run loop for as long as it is alive. We can use the run loop, which also acts as a scheduler, to implement a timer.

let runLoop = RunLoop.main

let subscription = runLoop.schedule(
    after: runLoop.now,
    interval: .seconds(1),
    tolerance: .milliseconds(100)
) {
    print("Timer fired")
}

// Cancelling the subscription stops the timer.
subscription.cancel()

Timer

Another class commonly used for repeating work is Timer. Combine gives it a more modern API that creates a timer without the usual setup boilerplate.

let publisher = Timer.publish(every: 1.0, on: .main, in: .common)

The publisher above is a ConnectablePublisher, a variant of Publisher. It does not start firing on subscription. It starts only when you explicitly call its connect() method, or on the first subscription if you apply autoconnect() to it.

Using DispatchQueue

DispatchQueue offers an interface for repeating work, too. Unfortunately, Combine has no dedicated timer publisher for it, so we have to write a bit more code ourselves.

let queue = DispatchQueue.main

let source = PassthroughSubject<Int, Never>()

var counter = 0

// Sends an increasing counter through the subject once per second.
let cancellable = queue.schedule(
    after: queue.now,
    interval: .seconds(1)
) {
    source.send(counter)
    counter += 1
}

let subscription = source.sink {
    print("Timer emitted \($0)")
}

Key-Value Observing

KVO has always been an essential component in Objective-C. We can use it to observe value changes of properties of KVO-compliant objects.

publisher(for:options:)

For a KVO-compliant object, you can call its publisher(for:options:) method with the key path of a property to observe changes to that property's value.

Unlike Objective-C classes, the classes we declare in Swift cannot be observed through KVO out of the box. They must meet two requirements.

  • The class must inherit from NSObject.
  • The properties you want to observe must be marked with @objc dynamic.

class Storage: NSObject {
    @objc dynamic var size: Int = 0
}

let storage = Storage()

let subscription = storage.publisher(for: \.size)
    .sink {
        print("The size of the storage changes to \($0)")
    }

storage.size = 100
storage.size = 200

ObservableObject

A Swift class that conforms to the ObservableObject protocol gets an objectWillChange publisher. You call its send() method whenever the object's value is about to change. In most cases a change to a property changes the object as a whole, so we can invoke send() in the property observer willSet. Fortunately, Combine provides the property wrapper @Published to spare us this boilerplate.
Every property marked with @Published triggers objectWillChange's send() method when its value changes.

class Storage: ObservableObject {
    @Published var size: Int = 0
}

let storage = Storage()
let subscription = storage.objectWillChange.sink {
    print("storage will change")
}

storage.size = 100
storage.size = 200
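To show what @Published saves us, here is a sketch that compares manual notification with the property wrapper. The class names ManualStorage and PublishedStorage are made up for the example, and ManualStorage declares its own objectWillChange publisher explicitly rather than relying on the synthesized one.

```swift
import Combine

// Hypothetical model that notifies manually instead of using @Published.
final class ManualStorage: ObservableObject {
    // Declared explicitly so that every access returns the same publisher.
    let objectWillChange = ObservableObjectPublisher()

    var size: Int = 0 {
        willSet { objectWillChange.send() }   // announce the change before it happens
    }
}

// The same model with the boilerplate removed.
final class PublishedStorage: ObservableObject {
    @Published var size: Int = 0
}

var manualNotifications = 0
var publishedNotifications = 0

let manual = ManualStorage()
let published = PublishedStorage()

let m = manual.objectWillChange.sink { manualNotifications += 1 }
let p = published.objectWillChange.sink { publishedNotifications += 1 }

manual.size = 100
manual.size = 200
published.size = 100
published.size = 200
// Both counters are now 2.
```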

Resource Management

Publishers are usually structs. A publisher that is a value type is copied when it is passed to a method or assigned to another variable. Each subscriber that subscribes to such a publisher makes it do its work again and emit all of its values again.

If the publisher performs resource-intensive work (such as decoding data or requesting remote data), repeating that work for every subscriber causes performance problems. If the subscribers could share the same values instead, the problem would be solved.

share()

This operator gives you a publisher with reference semantics rather than value semantics. However many subscribers it has, it subscribes to the upstream publisher only once and relays the same values to all of them.
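A small sketch of the effect, counting how often the upstream is subscribed to. The subject stands in for an expensive source, and all names are illustrative.

```swift
import Combine

let source = PassthroughSubject<Int, Never>()

// Without share(): every subscriber creates its own upstream subscription.
var unsharedSubscriptions = 0
let unshared = source
    .handleEvents(receiveSubscription: { _ in unsharedSubscriptions += 1 })
let u1 = unshared.sink { _ in }
let u2 = unshared.sink { _ in }
// unsharedSubscriptions == 2

// With share(): one upstream subscription serves all subscribers.
var sharedSubscriptions = 0
let shared = source
    .handleEvents(receiveSubscription: { _ in sharedSubscriptions += 1 })
    .map { $0 * 2 }   // stands in for expensive work
    .share()

var first: [Int] = []
var second: [Int] = []
let s1 = shared.sink { first.append($0) }
let s2 = shared.sink { second.append($0) }

source.send(1)
source.send(2)
// sharedSubscriptions == 1, first == [2, 4], second == [2, 4]
```

Note that share() does not replay anything: a subscriber that arrives late only sees values sent after it subscribed.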

multicast(_:)

Sometimes we want to share one subscription with new subscribers and replay the values already emitted. Other reactive frameworks usually provide a shareReplay method for this. Unfortunately, Combine does not have one. But Combine gives us another operator, multicast, to deal with this situation.

multicast uses a subject to relay the values of the upstream publisher and returns a new publisher of type ConnectablePublisher. The connectable publisher does not start emitting values until you call its connect() method. This gives you plenty of time to set up all the subscribers you need before connecting to the upstream publisher and starting the work.

let subject = PassthroughSubject<Data, URLError>()

let multicasted = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.github.com")!)
    .map(\.data)
    .multicast(subject: subject)

let sub1 = multicasted
    .sink(
        receiveCompletion: { _ in },
        receiveValue: { print("1: \($0)") }
    )

let sub2 = multicasted
    .sink(
        receiveCompletion: { _ in },
        receiveValue: { print("2: \($0)") }
    )

// The request starts only now, after both subscribers are in place.
let cancellable = multicasted.connect()

Future

Future is a publisher whose closure receives a promise parameter. Inside the closure you call the promise with a Result: either a success carrying a value or a failure carrying an error. Future therefore suits operations that deliver a single value or an error, such as a network request.
Another special characteristic is that Future is a class, a reference type, rather than a struct.
A future starts its work immediately when created, and its result is cached and handed to later subscribers instead of being computed again.

Error Handling

When declaring a publisher type, we notice that it requires two associated types, Output and Failure. Output is the type of the values the publisher emits, and Failure is the type of the error carried by a failure completion event.

In practice, some publishers never emit a failure event, a button tap for example. Many more are fallible, such as network requests, and we should handle their errors properly so that we can show a user-friendly failure page.

Combine provides enough operators and types to cover these scenarios. In this section, we start with never-failing publishers, which we have already used many times above.

Never

A Just publisher is a never-failing publisher that simply emits a given value; its Failure type is Never. Combine provides variants of several common methods that are available only on never-failing publishers. For example, when subscribing to one with sink we can omit the completion closure. More such operators follow below.

  • setFailureType
    It converts an infallible publisher into a fallible one.

    enum MyError: Error {}

    let publisher = Just("Hello") // Output == String, Failure == Never

    let newPublisher = Just("Hello")
        .setFailureType(to: MyError.self) // Output == String, Failure == MyError
  • assign(to:on:)
    It binds values to a property. It is available only on never-failing publishers.

    The object passed as the on argument is captured strongly by the subscription, so be careful about strong reference cycles.

  • assertNoFailure
    It adds an assertion to a publisher that you believe never fails. If the publisher does emit a failure event, the application stops with a fatal error.

Dealing with Errors

  • try*
    In Combine, many value-transforming operators have a try variant that accepts a throwing closure. The returned publisher is fallible, and its Failure type is Swift's general Error, whatever concrete error the closure throws.

  • mapError
    It converts the error type of failure events.

  • retry
    When the upstream publisher fails, retry does not pass the failure on immediately. It resubscribes to the upstream, up to the specified number of times.

    Api
        .requestUserInfo()
        .retry(3)
  • replaceError(with:)
    It replaces an error with a default value, which makes the publisher infallible.

    Api
        .requestAvatar()
        .replaceError(with: UIImage(named: "avatar_placeholder")!)
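The operators above can be combined in one pipeline. This is a sketch under assumptions: ParseError and the flaky publisher are invented for the example, and the future fulfils its promise synchronously so that the results are visible right away.

```swift
import Combine

// Hypothetical error type for this sketch.
enum ParseError: Error, Equatable {
    case notANumber(String)
}

// tryMap makes the pipeline fail with a plain Error; mapError narrows it
// back to ParseError; replaceError turns the failure into a final value.
var parsed: [Int] = []
let parsing = ["1", "2", "x", "4"].publisher
    .tryMap { text -> Int in
        guard let number = Int(text) else { throw ParseError.notANumber(text) }
        return number
    }
    .mapError { error -> ParseError in
        (error as? ParseError) ?? .notANumber("unknown")
    }
    .replaceError(with: -1)
    .sink { parsed.append($0) }
// parsed == [1, 2, -1]; "4" is never reached because the failure ends the stream.

// retry: Deferred creates a fresh Future for every (re)subscription.
var attempts = 0
var retried: [String] = []
let flaky = Deferred {
    Future<String, ParseError> { promise in
        attempts += 1
        if attempts < 3 {
            promise(.failure(.notANumber("attempt \(attempts)")))
        } else {
            promise(.success("ok"))
        }
    }
}
let retrying = flaky
    .retry(3)
    .replaceError(with: "gave up")
    .sink { retried.append($0) }
// attempts == 3, retried == ["ok"]
```

Wrapping the Future in Deferred matters here: a bare Future would cache its first failure, and retry would only receive that same failure again.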

Schedulers

A scheduler is a protocol that defines when and how to execute a closure.

Note that a scheduler is not the same thing as a thread.

Where to use a scheduler?

There are two fundamental places where we specify schedulers.

  • subscribe(on:options:)
    • Performs the subscribe, request, and cancel operations on the specified scheduler, which affects the work done upstream.
  • receive(on:options:)
    • Delivers values to the downstream on the specified scheduler.

(1...1)
    .publisher
    .handleEvents(receiveOutput: { _ in
        print("handle in thread \(Thread.current)")
    })
    .subscribe(on: DispatchQueue.main)
    .receive(on: DispatchQueue.global())
    .sink { _ in
        print("sink in thread \(Thread.current)")
    }.store(in: &disposeBag)

// handle in thread <_NSMainThread: 0x60000343cb40>{number = 1, name = main}
// sink in thread <NSThread: 0x600003456800>{number = 3, name = (null)}

As you can see, the upstream work that produces the values runs on the main thread, while the subscriber receives the values on a thread of the global queue.

Furthermore, some operators take a scheduler as a parameter.

  • debounce(for:scheduler:options:)
  • delay(for:tolerance:scheduler:options:)
  • measureInterval(using:options:)
  • throttle(for:scheduler:latest:)
  • timeout(_:scheduler:options:customError:)

Implementations of Scheduler

Apple provides four concrete implementations of the Scheduler protocol.

  • ImmediateScheduler
    • A simple scheduler that executes code immediately on the current thread.
  • RunLoop
    • Tied to Foundation's Thread class.
    • Nowadays, RunLoop is a less useful way to manage threads. We prefer DispatchQueue for controlling how tasks execute. In some specific situations, such as with Timer, RunLoop is still needed.
  • DispatchQueue
    • Can be either serial or concurrent.
  • OperationQueue
    • A queue that regulates the execution of work items.

Please note that values received on a concurrent scheduler may arrive out of order.
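ImmediateScheduler is the easiest of the four to observe, because it involves no queue and no thread hop. A minimal sketch, with illustrative names:

```swift
import Combine

var received: [Int] = []

let cancellable = [1, 2, 3].publisher
    // ImmediateScheduler performs the delivery synchronously on the current thread.
    .receive(on: ImmediateScheduler.shared)
    .sink { received.append($0) }

// The values have already arrived by the time sink returns.
print(received)   // [1, 2, 3]
```

Swapping in DispatchQueue.main or RunLoop.main instead would defer the delivery to a later turn of that queue or run loop.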

Custom Publishers and Operators

There are several ways to build our own publishers and operators on top of existing Combine components.

  • Extend Publisher with a method.
  • Implement a type in the Publishers namespace with a Subscription that produces values.
  • Same as above, but with a Subscription that transforms values from an upstream publisher.

Extending Publisher with Methods

The simplest way to create an operator is to add a method to the Publisher protocol in an extension.

As practice, let's create an unwrap operator that filters out all nil values from the upstream publisher. It works like compactMap but takes no closure and reads better.

extension Publisher {
    func unwrap<T>() -> AnyPublisher<T, Failure> where Output == Optional<T> {
        compactMap { $0 }.eraseToAnyPublisher()
    }
}

let subscription = [0, nil, 2]
    .publisher
    .unwrap()
    .sink {
        print($0)
    }

// 0
// 2

Erasing the new publisher's type hides redundant type details and makes the operator easier to evolve.

Creating a Publisher that produces values

We discussed the relationship between Publisher, Subscriber, and Subscription above.
In Combine, we mostly write code against Publisher or Subscriber and hardly ever touch Subscription, but Subscription is the unsung hero of Combine.

A subscription connects a publisher and a subscriber, and it holds on to the subscriber so that it is not released. This is why we should keep the subscription alive, for example by storing it in a set with the store(in:) method.

Now let's create a DispatchTimer publisher that emits time values according to a specified configuration.

  • First, we define a structure DispatchTimerConfiguration that represents our timer's configuration.

    struct DispatchTimerConfiguration {
        let queue: DispatchQueue?
        let interval: DispatchTimeInterval
        let leeway: DispatchTimeInterval
        let times: Subscribers.Demand
    }
  • Then, we declare our new publisher DispatchTimer in the Publishers namespace.

    extension Publishers {
        struct DispatchTimer: Publisher {
            typealias Output = DispatchTime
            typealias Failure = Never

            let configuration: DispatchTimerConfiguration

            init(configuration: DispatchTimerConfiguration) {
                self.configuration = configuration
            }

            func receive<S>(subscriber: S)
            where
                S: Subscriber,
                Never == S.Failure,
                DispatchTime == S.Input
            {
                let subscription = DispatchTimerSubscription(
                    subscriber: subscriber,
                    configuration: configuration
                )
                subscriber.receive(subscription: subscription)
            }
        }
    }
  • As you can see, the core is the receive(subscriber:) method. We pass a Subscription to the received subscriber, and the subscriber then uses it to request values.

  • Finally, the most important part of our DispatchTimer is the DispatchTimerSubscription.

private final class DispatchTimerSubscription<S: Subscriber>:
    Subscription where S.Input == DispatchTime
{
    let configuration: DispatchTimerConfiguration
    var times: Subscribers.Demand
    var requested: Subscribers.Demand = .none
    var source: DispatchSourceTimer?
    var subscriber: S?

    init(
        subscriber: S,
        configuration: DispatchTimerConfiguration
    ) {
        self.configuration = configuration
        self.subscriber = subscriber
        self.times = configuration.times
    }

    func request(_ demand: Subscribers.Demand) {
        // Nothing left to emit: complete right away.
        guard times > .none else {
            subscriber?.receive(completion: .finished)
            return
        }

        requested += demand

        // Start the timer lazily, on the first request with actual demand.
        if source == nil, requested > .none {
            let source = DispatchSource.makeTimerSource(queue: configuration.queue)

            source.schedule(
                deadline: .now() + configuration.interval,
                repeating: configuration.interval,
                leeway: configuration.leeway
            )

            source.setEventHandler { [weak self] in
                guard
                    let self = self,
                    self.requested > .none
                else { return }

                self.requested -= .max(1)
                self.times -= .max(1)

                _ = self.subscriber?.receive(.now())

                if self.times == .none {
                    self.subscriber?.receive(completion: .finished)
                }
            }

            source.activate()
            self.source = source
        }
    }

    func cancel() {
        source = nil
        subscriber = nil
    }
}
  • There are two methods we have to implement, cancel() and request(_:).
    • The cancel() method, as its name says, cancels the subscription. Normally, we should break the connection between the publisher and the subscriber here and release the captured subscriber.
    • The request(_:) method is the core. This is where we control how values are generated.

For convenience, we can add a shortcut method for our DispatchTimer publisher.

extension Publishers {
    static func timer(
        queue: DispatchQueue? = nil,
        interval: DispatchTimeInterval,
        leeway: DispatchTimeInterval = .nanoseconds(0),
        times: Subscribers.Demand = .unlimited
    ) -> Publishers.DispatchTimer {
        Publishers.DispatchTimer(configuration: .init(
            queue: queue,
            interval: interval,
            leeway: leeway,
            times: times
        ))
    }
}

Publishers Transforming Values

In RxSwift, the shareReplay operator caches emitted values and replays them to newly arriving subscribers, which avoids recomputing the same data. Unfortunately, Combine does not provide a similar operator, but we can implement one ourselves.

Our ShareReplay design looks like this.

  1. Every subscriber has its own subscription.
  2. A new ShareReplay publisher wraps the upstream publisher and stores all the subscriptions handed to subscribers.
  3. The ShareReplay publisher relays the upstream publisher's values to the subscribers and caches these values.
  4. When a new subscriber connects, the publisher replays the cached values to it.
fileprivate final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
let capacity: Int
var subscriber: AnySubscriber<Output, Failure>?
var demand: Subscribers.Demand = .none
var buffer: [Output]
var completion: Subscribers.Completion<Failure>?

init<S>(
subscriber: S,
replay: [Output],
capacity: Int,
completion: Subscribers.Completion<Failure>?
) where S: Subscriber, Failure == S.Failure, Output == S.Input {
self.subscriber = AnySubscriber(subscriber)
self.buffer = replay
self.capacity = capacity
self.completion = completion
}

private func complete(with completion: Subscribers.Completion<Failure>) {
guard let subscriber = subscriber else {
return
}

self.subscriber = nil
self.completion = nil
self.buffer.removeAll()
subscriber.receive(completion: completion)
}

private func emitAsNeeded() {
guard let subscriber = subscriber else {
return
}

while self.demand > .none && !buffer.isEmpty {
self.demand -= .max(1)
let nextDemand = subscriber.receive(buffer.removeFirst())
if nextDemand != .none {
self.demand += nextDemand
}
}

if let completion = completion {
complete(with: completion)
}
}

func request(_ demand: Subscribers.Demand) {
if demand != .none {
self.demand += demand
}

emitAsNeeded()
}

func cancel() {
complete(with: .finished)
}

func receive(_ input: Output) {
guard subscriber != nil else { return }

buffer.append(input)
if buffer.count > capacity {
buffer.removeFirst()
}

emitAsNeeded()
}

func receive(completion: Subscribers.Completion<Failure>) {
guard let subscriber = subscriber else {
return
}

self.subscriber = nil
self.buffer.removeAll()
subscriber.receive(completion: completion)
}
}

extension Publishers {
final class ShareReplay<Upstream: Publisher>: Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure

private let lock = NSRecursiveLock()
private let upstream: Upstream
private let capacity: Int
private var replay = [Output]()
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
private var completion: Subscribers.Completion<Failure>?

init(upstream: Upstream, capacity: Int) {
self.upstream = upstream
self.capacity = capacity
}

private func relay(_ value: Output) {
lock.lock()
defer { lock.unlock() }

guard completion == nil else { return }


replay.append(value)
if replay.count > capacity {
replay.removeFirst()
}

subscriptions.forEach {
$0.receive(value)
}
}

private func complete(_ completion: Subscribers.Completion<Failure>) {
lock.lock()
defer { lock.unlock() }

self.completion = completion

subscriptions.forEach {
$0.receive(completion: completion)
}
}

func receive<S>(subscriber: S)
where
S : Subscriber,
Upstream.Failure == S.Failure,
Upstream.Output == S.Input
{
lock.lock()
defer { lock.unlock() }

let subscription = ShareReplaySubscription(
subscriber: subscriber,
replay: replay,
capacity: capacity,
completion: completion)

subscriptions.append(subscription)
subscriber.receive(subscription: subscription)

guard subscriptions.count == 1 else { return }
let sink = AnySubscriber {
$0.request(.unlimited)
} receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
self?.relay(value)
return .none
} receiveCompletion: { [weak self] in
self?.complete($0)
}
upstream.subscribe(sink)

}
}
}

The subscribe(_:) method of Publisher does not return a Cancellable value. When we use it to attach a subscriber to a Subject, the subscriber is not released until it receives a completion event, even if nothing retains the publisher.

Here is a test subscriber that prints a message when its deinit runs.

class MySubscriber<T>: Subscriber {
let combineIdentifier = CombineIdentifier()

typealias Input = T

typealias Failure = Never

func receive(subscription: Subscription) {
subscription.request(.unlimited)
}

func receive(_ input: T) -> Subscribers.Demand {
print("receive value \(input)")
return .none
}

func receive(completion: Subscribers.Completion<Never>) {
print("receive completion \(completion)")
}

deinit {
print("subscriber deinit")
}
}

When we run the code below, the console does not print any deinit message. Nothing holds the publisher or the subscriber after test() returns, yet they are not released.

func test() {
let subscriber = MySubscriber<Int>()
let publisher = PassthroughSubject<Int, Never>()
publisher.subscribe(subscriber)
}

If the publisher sends a completion event, the console prints the deinit message.

func test() {
let subscriber = MySubscriber<Int>()
let publisher = PassthroughSubject<Int, Never>()
publisher.subscribe(subscriber)

publisher.send(completion: .finished)
}

// receive completion finished
// subscriber deinit

If we use the custom DispatchTimer publisher from the sections above instead, the publisher, subscription, and subscriber are all released immediately. So why do they behave differently? I think Apple does something special in Subject: it may store the subscription in a global table and remove it when a completion event arrives. Based on this, avoid the subscribe(_:) method if you want to cancel the subscription before it receives a completion event. Likewise, be careful when you subscribe to a custom publisher with this method: everything may be released immediately unless you manage the subscription's lifetime yourself.
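When you do need to end a subscription early, the usual route is an operator that returns a cancellation token. The sketch below uses the built-in sink, which returns an AnyCancellable; the variable names are only for illustration.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []

// sink returns an AnyCancellable; the subscription lives as long as the token does.
var cancellable: AnyCancellable? = subject.sink { received.append($0) }

subject.send(1)     // delivered
cancellable = nil   // releasing the token cancels the subscription
subject.send(2)     // no longer delivered

print(received) // [1]
```

AnyCancellable calls cancel() when it is deallocated, so the lifetime of the subscription follows the lifetime of the token rather than waiting for a completion event.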

Backpressure

In the majority of reactive frameworks, a subscriber just receives values from the upstream publisher regardless of whether it can consume them. Combine instead uses a pull design, as opposed to a push one: subscribers ask publishers to emit values and specify how many they want to receive. This design is more adaptive, and we can use it to tune our data streams and save computing resources.
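As a small illustration of pull-based demand, the sketch below uses a hypothetical TwoValueSubscriber that asks for two values and never asks for more. The built-in sequence publisher honors that demand and holds back the rest.

```swift
import Combine

// Hypothetical subscriber for illustration: requests two values up front, then stops.
final class TwoValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        // Initial demand: at most two values.
        subscription.request(.max(2))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // Returning .none adds no further demand.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("completed")
    }
}

let subscriber = TwoValueSubscriber()
(1...5).publisher.subscribe(subscriber)

print(subscriber.received) // [1, 2]
```

The publisher still has three values left, so it does not send a completion; it simply waits for demand that never comes.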

In Combine, there are two usual ways to implement backpressure.

  • A publisher with a custom Subscription that deals with congestion.
  • A custom subscriber at the end of a chain of publishers that controls how values are delivered.

If we define a subscription that produces values, it is easy to control the rate at which new values are produced according to the subscriber's demand. Here we mainly discuss the second approach.

We add a pausableSink method in an extension of Publisher. It takes two closures, receiveValue and receiveCompletion. Each time receiveValue is called with a new value, it returns a Boolean that indicates whether the subscriber should request another value.

// Conforming entities can be paused.
protocol Pausable {
var paused: Bool { get }
func resume()
}

final class PausableSubscriber<Input, Failure: Error>:
Subscriber,
Pausable,
Cancellable
{

let combineIdentifier = CombineIdentifier()

let receiveValue: (Input) -> Bool
let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

private var subscription: Subscription? = nil
var paused = false

init(
receiveValue: @escaping (Input) -> Bool,
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void
) {
self.receiveValue = receiveValue
self.receiveCompletion = receiveCompletion
}

func cancel() {
subscription?.cancel()
subscription = nil
}

// Calling `resume` asks the subscription for one more value if we are currently paused.
func resume() {
guard paused else { return }
paused = false
subscription?.request(.max(1))
}

func receive(subscription: Subscription) {
self.subscription = subscription
subscription.request(.max(1))
}


// Receives a new value and lets the receiveValue closure decide whether to request another one.
func receive(_ input: Input) -> Subscribers.Demand {
paused = receiveValue(input) == false
return paused ? .none : .max(1)
}

func receive(completion: Subscribers.Completion<Failure>) {
receiveCompletion(completion)
subscription = nil
}
}

extension Publisher {
func pausableSink(
receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
receiveValue: @escaping (Output) -> Bool
) -> Pausable & Cancellable {
let pausable = PausableSubscriber(
receiveValue: receiveValue,
receiveCompletion: receiveCompletion
)
subscribe(pausable)
return pausable
}
}