On the first part of these series, we explored how one can create a Publisher
from scratch. This Publisher
wasn't particularly interesting, but it gave a good idea on how a Publisher
and a Subscription
work together
On this second part, you will create a new operation from scratch and see how you can leverage your custom Subscriber
.
The operation picked for this exercise is a buffer
. A buffer
takes a single argument, a size
, and instead of sending one element at a time, it will send a collection of those elements when we reach the size
threshold. This collection size will be equal to the size
argument. From a signature point of view, it will look like this:
func buffer(count: Int) -> Publishers.MyBuffer<[T], Error>
We will call this MyBuffer
, since Combine already has a Buffer
operator with more advance options.
As described in the previous post, let's start with the barebones entity encapsulating the operation:
extension Publishers {
struct MyBuffer<Upstream: Publisher>: Publisher {
typealias Failure = Upstream.Failure
typealias Output = [Upstream.Output]
private let upstream: Upstream
private let size: Int
init(upstream: Upstream, size: Int) {
self.upstream = upstream
self.size = size
}
func receive<Downstream>(subscriber: Downstream)
where
Downstream : Subscriber,
Failure == Downstream.Failure,
Output == Downstream.Input
{
let bufferSubscriber = MyBufferSubscriber(upstream: upstream,
downstream: subscriber,
size: size)
upstream.subscribe(bufferSubscriber)
}
}
}
As seen in part one, there's a some work to match types between the Upstream
and the Downstream
:
typealias Output = [Upstream.Output]
Here you state that whoever consumes this Producer
will receive an array of the Upstream
's Output. This requirement is later enforced here:
Output == Downstream.Input
Personally, I find this part the most interesting, since you are "negotiating" what the Upstream
sends, and what the Downstream
demands.
The connection between the upstream and our BufferSubscriber
is slightly different thant what we did in the previous post, but still straighforward:
let bufferSubscriber = MyBufferSubscriber(upstream: upstream,
downstream: subscriber,
size: size)
upstream.subscribe(bufferSubscriber)
Next, let's create the Subscriber
:
extension Publishers.MyBuffer {
private class MyBufferSubscriber<Upstream: Publisher, DownStream: Subscriber>: Subscriber
where
Upstream.Failure == DownStream.Failure,
[Upstream.Output] == DownStream.Input {
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
private let upstream: Upstream
private let downstream: DownStream
private let size: Int
private var buffer: [Upstream.Output] = []
init(upstream: Upstream, downstream: DownStream, size: Int) {
self.upstream = upstream
self.downstream = downstream
self.size = size
}
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
func receive(_ input: Upstream.Output) -> Subscribers.Demand {
buffer.append(input)
if buffer.count >= size {
defer { buffer = [] }
return downstream.receive(buffer)
}
return .unlimited
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
// send all the remaining values
_ = downstream.receive(buffer)
downstream.receive(completion: completion)
}
}
}
Likewise, at the Subscriber
scope, some type matching is required:
[Upstream.Output] == DownStream.Input
As an example, if your Upstream
is sending Int
, the Downstream
will consume [Int]
and that's prety much what we stated, but in a generic way via .Output
.
The actual logic for the buffer is located here:
buffer.append(input)
if buffer.count >= size {
defer { buffer = [] }
return downstream.receive(buffer)
}
return .unlimited
We keep appending values to the`buffer` until it reaches a certain size. When that happens, we empty the buffer
and send it downstream
.
We also customized receive(completion:)
, which can be desirable or not, depending on your goals. When the Upstream
completes, besides sending the event downstream
, we also send the remaining items that are currently in the buffer:
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
// send all the remaining values
_ = downstream.receive(buffer)
downstream.receive(completion: completion)
}
I added this as an example of what's possible, versus necessarily something I would do in my project.
We are still missing the actual operator that can be used across all Publisher
s. Luckily this is the easiest step:
extension Publisher {
func buffer(size: Int) -> Publishers.MyBuffer<Self> {
return Publishers.MyBuffer(upstream: self, size: size)
}
}
Finally you can test your new operator like this:
let current = CurrentValueSubject<Int, Never>(1)
cancellable = current.buffer(size: 3).sink { values in
print(values)
}
current.send(2)
current.send(3)
current.send(4)
current.send(5)
current.send(6)
current.send(7)
current.send(completion: .finished)
And the output:
[1, 2, 3]
[4, 5, 6]
[7]
In this article, we saw how the Subscriber
and the Publisher
can work together to create a custom operator.
You might have noticed that we didn't use any custom Subscription
, like in the previous article. The reason why, is because there was no special behaviour needed to be added for both the cancel
and the request(demand:)
methods.
Any question hit me on twitter.