Custom Publishers - Part 2

In the first part of this series, we explored how to create a Publisher from scratch. That Publisher wasn't particularly interesting, but it gave a good idea of how a Publisher and a Subscription work together.

In this second part, you will create a new operator from scratch and see how you can leverage a custom Subscriber.


The operator picked for this exercise is a buffer. A buffer takes a single argument, a size, and instead of sending elements one at a time, it sends them as a collection once the size threshold is reached. Each collection holds size elements. From a signature point of view, it will look like this:

func buffer(size: Int) -> Publishers.MyBuffer<Self>
// MyBuffer's Output is [Output]; its Failure is the upstream's Failure

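We will call it MyBuffer, since Combine already has a Buffer publisher, buffer(size:prefetch:whenFull:), which serves a different purpose: it holds elements back until downstream demand arrives, rather than grouping them.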
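Before touching Combine, it can help to pin down the intended grouping on a plain array. This is a minimal sketch; batches(_:size:) is a hypothetical helper, not part of Combine or the standard library. For comparison, Combine's built-in collect(_:) operator groups elements by count in a similar way.

```swift
// Hypothetical helper: splits `elements` into consecutive arrays of `size`
// elements; the last array holds whatever is left over.
func batches<T>(_ elements: [T], size: Int) -> [[T]] {
    precondition(size > 0, "size must be positive")
    return stride(from: 0, to: elements.count, by: size).map { start in
        Array(elements[start..<min(start + size, elements.count)])
    }
}

print(batches(Array(1...7), size: 3)) // [[1, 2, 3], [4, 5, 6], [7]]
```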

As described in the previous post, let's start with the barebones entity encapsulating the operation:

import Combine

extension Publishers {
  
  struct MyBuffer<Upstream: Publisher>: Publisher {

    // Errors pass through unchanged; values are grouped into arrays
    typealias Failure = Upstream.Failure
    typealias Output = [Upstream.Output]
    
    private let upstream: Upstream
    private let size: Int
    
    init(upstream: Upstream, size: Int) {
      self.upstream = upstream
      self.size = size
    }
    
    func receive<Downstream>(subscriber: Downstream)
      where
      Downstream : Subscriber,
      Failure == Downstream.Failure,
      Output == Downstream.Input
    {
      // Wrap the downstream subscriber and attach the wrapper to the upstream
      let bufferSubscriber = MyBufferSubscriber(upstream: upstream,
                                                downstream: subscriber,
                                                size: size)
      upstream.subscribe(bufferSubscriber)
    }
  }
}

As seen in part one, some work is needed to match types between the Upstream and the Downstream:

typealias Output = [Upstream.Output]

Here you state that whoever consumes this Publisher will receive an array of the Upstream's Output. This requirement is later enforced here:

Output == Downstream.Input

Personally, I find this part the most interesting, since you are "negotiating" what the Upstream sends, and what the Downstream demands.
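The same kind of negotiation can be reproduced with plain Swift protocols. In this sketch, Producer, Consumer, Numbers, ArrayCollector and connectInBatches are hypothetical stand-ins for Publisher, Subscriber and the operator; the where clause plays the role of Output == Downstream.Input.

```swift
// Hypothetical stand-ins for Publisher and Subscriber.
protocol Producer {
    associatedtype Output
    func produce() -> [Output]
}

protocol Consumer {
    associatedtype Input
    mutating func consume(_ input: Input)
}

struct Numbers: Producer {
    func produce() -> [Int] { [1, 2, 3, 4, 5] }
}

struct ArrayCollector: Consumer {
    var received: [[Int]] = []
    mutating func consume(_ input: [Int]) { received.append(input) }
}

// Compiles only when the consumer accepts arrays of the producer's output,
// mirroring `[Upstream.Output] == Downstream.Input`.
func connectInBatches<P: Producer, C: Consumer>(_ producer: P, to consumer: inout C, size: Int)
    where C.Input == [P.Output]
{
    var batch: [P.Output] = []
    for value in producer.produce() {
        batch.append(value)
        if batch.count >= size {
            consumer.consume(batch)
            batch = []
        }
    }
    if !batch.isEmpty { consumer.consume(batch) }
}

var collector = ArrayCollector()
connectInBatches(Numbers(), to: &collector, size: 2)
print(collector.received) // [[1, 2], [3, 4], [5]]
```

Swapping ArrayCollector for a consumer whose Input is Int makes the call fail to compile, which is the guarantee Combine gives through its own where clause.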

The connection between the upstream and our MyBufferSubscriber is slightly different from what we did in the previous post, but still straightforward:

let bufferSubscriber = MyBufferSubscriber(upstream: upstream,
                                          downstream: subscriber,
                                          size: size)
upstream.subscribe(bufferSubscriber)

Next, let's create the Subscriber:

extension Publishers.MyBuffer {
  private class MyBufferSubscriber<Upstream: Publisher, DownStream: Subscriber>: Subscriber
  where
  Upstream.Failure == DownStream.Failure,
  [Upstream.Output] == DownStream.Input {
    
    typealias Input = Upstream.Output
    typealias Failure = Upstream.Failure
    
    private let upstream: Upstream
    private let downstream: DownStream
    private let size: Int
    // Values received since the last batch was sent
    private var buffer: [Upstream.Output] = []
    
    init(upstream: Upstream, downstream: DownStream, size: Int) {
      self.upstream = upstream
      self.downstream = downstream
      self.size = size
    }
    
    func receive(subscription: Subscription) {
      // Hand the upstream subscription to the downstream, so its
      // request(_:) and cancel() calls go straight to the upstream
      downstream.receive(subscription: subscription)
    }
    
    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
      buffer.append(input)
      
      if buffer.count >= size {
        // The deferred reset runs after downstream.receive(buffer) returns,
        // so the downstream gets the full batch
        defer { buffer = [] }
        return downstream.receive(buffer)
      }
      
      // Still filling the buffer: keep asking the upstream for values
      return .unlimited
    }
    
    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
      // Send the remaining values, if any, before completing
      if !buffer.isEmpty {
        _ = downstream.receive(buffer)
        buffer = []
      }
      downstream.receive(completion: completion)
    }
  }
}

Likewise, at the Subscriber scope, some type matching is required:

  [Upstream.Output] == DownStream.Input

As an example, if your Upstream sends Int, the Downstream must consume [Int]. That's exactly what the constraint states, expressed generically via Upstream.Output.

The actual logic for the buffer is located here:

buffer.append(input)
      
if buffer.count >= size {
    defer { buffer = [] } 
    return downstream.receive(buffer)
}
      
return .unlimited

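We keep appending values to the buffer until it reaches size elements. When that happens, we send the buffer downstream and then empty it: the deferred reset only runs after downstream.receive(buffer) has been evaluated. The demand returned by the downstream is handed back to the upstream; while the buffer is still filling, we return .unlimited.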
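The ordering of defer is what makes this work, and it can be checked without Combine. In this minimal sketch, Batcher is a hypothetical type that returns each full batch instead of handing it to a Subscriber; the append, threshold check and deferred reset follow the same shape as MyBufferSubscriber's receive(_:).

```swift
// Hypothetical model of MyBufferSubscriber's receive(_:) logic without Combine.
struct Batcher<Element> {
    let size: Int
    private(set) var buffer: [Element] = []

    init(size: Int) { self.size = size }

    // Returns a full batch when the threshold is reached, otherwise nil.
    mutating func receive(_ input: Element) -> [Element]? {
        buffer.append(input)
        if buffer.count >= size {
            // Runs after the return value has been computed,
            // so the caller gets the full batch and the buffer starts empty.
            defer { buffer = [] }
            return buffer
        }
        return nil
    }
}

var batcher = Batcher<Int>(size: 3)
let results = (1...7).map { batcher.receive($0) }
print(results.compactMap { $0 }) // [[1, 2, 3], [4, 5, 6]]
print(batcher.buffer)            // [7]
```

The leftover [7] stays in the buffer; it is only delivered if something flushes it on completion.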

We also customized receive(completion:), which may or may not be desirable, depending on your goals. When the Upstream completes, besides forwarding the completion event downstream, we first send the items still left in the buffer. This happens for both .finished and .failure completions:

func receive(completion: Subscribers.Completion<Upstream.Failure>) {
  // Send the remaining values, if any, before completing
  if !buffer.isEmpty {
    _ = downstream.receive(buffer)
    buffer = []
  }
  downstream.receive(completion: completion)
}

I added this as an example of what's possible, rather than something I would necessarily do in my own projects.
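To see what this customization emits, here is a minimal event-level sketch. Event, Emitted and simulateBuffer are hypothetical names modelling the subscriber's callbacks, not Combine types. It flushes leftovers before both a normal finish and a failure, and skips the flush when nothing is left over, so a stream whose length is a multiple of size does not end with an empty array.

```swift
// Hypothetical model of the events a Subscriber receives.
enum Event<Value> {
    case value(Value)
    case finished
    case failure(String)
}

// Hypothetical model of what the buffer sends downstream.
enum Emitted<Value: Equatable>: Equatable {
    case batch([Value])
    case finished
    case failure(String)
}

func simulateBuffer<Value: Equatable>(_ events: [Event<Value>], size: Int) -> [Emitted<Value>] {
    var buffer: [Value] = []
    var emitted: [Emitted<Value>] = []
    for event in events {
        switch event {
        case .value(let value):
            buffer.append(value)
            if buffer.count >= size {
                emitted.append(.batch(buffer))
                buffer = []
            }
        case .finished, .failure:
            // Flush leftovers first, but never send an empty array.
            if !buffer.isEmpty {
                emitted.append(.batch(buffer))
            }
            if case .failure(let message) = event {
                emitted.append(.failure(message))
            } else {
                emitted.append(.finished)
            }
            return emitted // nothing is delivered after a completion
        }
    }
    return emitted
}

let exactMultiple: [Event<Int>] = [.value(1), .value(2), .value(3), .finished]
let failing: [Event<Int>] = [.value(1), .value(2), .failure("boom")]
let exactOutput = simulateBuffer(exactMultiple, size: 3) // [.batch([1, 2, 3]), .finished]
let failingOutput = simulateBuffer(failing, size: 3)     // [.batch([1, 2]), .failure("boom")]
```

Combine's built-in collect(_:) makes the opposite choice on failure: it forwards the error and drops the partially filled array.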

We are still missing the actual operator, available on every Publisher. Luckily, this is the easiest step:

extension Publisher {
  func buffer(size: Int) -> Publishers.MyBuffer<Self> {
    return Publishers.MyBuffer(upstream: self, size: size)
  }
}
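The shape used here, a generic wrapper type parameterised over its upstream plus a protocol extension method that returns it, also exists outside Combine: the standard library's lazy.map returns a LazyMapSequence, for instance. As a minimal sketch, BatchedSequence and batched(size:) are hypothetical names; the iterator plays the role of the Subscriber, pulling from the upstream iterator and grouping elements.

```swift
// Hypothetical wrapper, analogous to Publishers.MyBuffer<Upstream>.
struct BatchedSequence<Base: Sequence>: Sequence {
    let base: Base
    let size: Int

    struct Iterator: IteratorProtocol {
        var upstream: Base.Iterator
        let size: Int

        // Pulls up to `size` elements; returns nil once the upstream is exhausted.
        mutating func next() -> [Base.Element]? {
            var batch: [Base.Element] = []
            while batch.count < size, let element = upstream.next() {
                batch.append(element)
            }
            return batch.isEmpty ? nil : batch
        }
    }

    func makeIterator() -> Iterator {
        Iterator(upstream: base.makeIterator(), size: size)
    }
}

// Hypothetical operator, analogous to buffer(size:) on Publisher.
extension Sequence {
    func batched(size: Int) -> BatchedSequence<Self> {
        precondition(size > 0, "size must be positive")
        return BatchedSequence(base: self, size: size)
    }
}

print(Array((1...7).batched(size: 3))) // [[1, 2, 3], [4, 5, 6], [7]]
```

The difference is the direction of flow: a Sequence is pulled by its consumer, while a Publisher pushes values to its Subscriber as demand allows.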

Finally, you can test your new operator like this:

import Combine

let current = CurrentValueSubject<Int, Never>(1)

// Keep the AnyCancellable alive; the subscription is cancelled when it is released
var cancellable: AnyCancellable?
cancellable = current.buffer(size: 3).sink { values in
  print(values)
}

current.send(2)
current.send(3)
current.send(4)
current.send(5)
current.send(6)
current.send(7)

current.send(completion: .finished)

And the output:

[1, 2, 3]
[4, 5, 6]
[7]

In this article, we saw how the Subscriber and the Publisher can work together to create a custom operator.

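You might have noticed that, unlike in the previous article, we didn't use a custom Subscription. That's because no special behaviour was needed in the cancel() and request(_:) methods: the upstream's Subscription is handed straight to the downstream. This works well with sink, which requests .unlimited. With a finite demand, though, the downstream counts arrays while the upstream counts elements, and a custom Subscription would be the place to translate between the two.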
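If you do need to support a finite demand, a downstream asking for N arrays corresponds to N × size upstream elements. A minimal sketch of that arithmetic follows; Demand is a hypothetical enum mirroring Combine's Subscribers.Demand so the example runs without Combine, and upstreamDemand(for:size:) is a hypothetical helper that a custom Subscription's request(_:) could call.

```swift
// Hypothetical stand-in for Combine's Subscribers.Demand.
enum Demand: Equatable {
    case unlimited
    case max(Int)
}

// Converts a demand counted in arrays into a demand counted in upstream elements.
func upstreamDemand(for downstream: Demand, size: Int) -> Demand {
    switch downstream {
    case .unlimited:
        return .unlimited
    case .max(let arrays):
        let (elements, overflow) = arrays.multipliedReportingOverflow(by: size)
        // Saturate to unlimited instead of wrapping around on overflow.
        return overflow ? .unlimited : .max(elements)
    }
}

let demand = upstreamDemand(for: .max(2), size: 3) // .max(6)
```

Saturating to .unlimited on overflow is a design choice for this sketch; the key point is that the conversion has to happen somewhere between the two sides.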

If you have any questions, hit me up on Twitter.
