Building a Custom Combine Operator for Exponential Backoff

Make your Combine code reusable


Mar 3, 2022 • 8 min read

In the previous post, I showed you how to improve error handling in Combine pipelines and expose errors in SwiftUI apps in a way that’s meaningful for the user.

Not surprisingly, we ended up with code that looked a bit more complicated than what we had in the beginning. Properly handling errors will take up more lines of code than not handling errors at all (or just ignoring them).

But we can do better!

In this post, you will learn about Combine operators: what they are, how they work, and how refactoring our code into a custom Combine operator will make it easier to reason about and more reusable at the same time.

What is a Combine Operator?

Combine defines three main concepts to implement the idea of reactive programming:

  1. Publishers
  2. Subscribers
  3. Operators

Publishers deliver values over time, and subscribers act on these values as they receive them. Operators sit in the middle between publishers and subscribers, and can be used to manipulate the stream of values.

There are a few reasons why we need operators:

  • Publishers don’t always produce events in the format that is required by the subscriber. For example, a publisher might emit the result of an HTTP network request, but our subscriber needs a custom data structure. In this situation, we can use an operator like map or decode to turn the output of the publisher into the data structure the subscriber expects.
  • Publishers might produce more events than the subscriber is interested in. For example, when typing a search term, we might not be interested in every single keystroke but only the final search term. In this situation, we can use operators like debounce or throttle to reduce the number of events our subscriber has to handle.

Operators help us to take the output produced by a publisher and turn it into something that the subscriber can consume. We’ve already used a number of built-in operators in previous episodes, for example:

  • map (and its friend, tryMap) to transform elements
  • debounce to publish elements only after a pause between two events
  • removeDuplicates to remove duplicate events
  • flatMap to transform elements into a new publisher
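As a rough sketch of how such operators chain together, the following snippet runs a fixed array through removeDuplicates and map before a sink subscriber collects the results. The input values and the received array are made up for illustration; an array publisher is used because it emits its elements synchronously, which keeps the example easy to run in a playground.

```swift
import Combine

// Collects everything the subscriber receives so we can inspect it afterwards.
var received: [String] = []

// A sequence publisher emits all of its elements as soon as a subscriber attaches.
let cancellable = [1, 1, 2, 3, 3].publisher
  .removeDuplicates()       // drops consecutive repeats: 1, 2, 3
  .map { "Item \($0)" }     // turns each Int into the String the subscriber wants
  .sink { received.append($0) }

print(received) // ["Item 1", "Item 2", "Item 3"]
```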

How can we implement a custom operator?

Usually, when creating Combine pipelines, we will start with a publisher, and then connect a bunch of Combine’s built-in operators to process the events emitted by the publisher. At the end of any Combine pipeline is a subscriber that receives the events. As you saw in the previous post, pipelines can become complicated quite quickly.

Technically, operators are just functions that create other publishers and subscribers which handle the events they receive from an upstream publisher.

This means we can create our own custom operators by extending Publisher with a function that returns a publisher (or subscriber) that operates on the events it receives from the publisher we use it on.

Let’s see what this means in practice by implementing a simple operator that allows us to inspect events coming down a Combine pipeline using Swift’s dump() function. This function prints the contents of a variable to the console, showing the structure of the variable as a nested tree - similar to the debug inspector in Xcode.

Now, you might be aware of Combine’s print() operator, which works very similarly. However, it doesn’t provide as much detail, and - more importantly - doesn’t show the result as a nested structure.

To add an operator, we first need to add an extension to the Publisher type. As we don’t want to manipulate the events this operator receives, we can use the upstream publisher’s types as the result types as well, and return AnyPublisher<Self.Output, Self.Failure> as the result type:

extension Publisher {
  func dump() -> AnyPublisher<Self.Output, Self.Failure> {
  }
}

Inside the function, we can then use the handleEvents operator to examine any events this pipeline processes. handleEvents has a bunch of optional closures that get called when the publisher receives new subscriptions, new output values, a cancellation event, when it is finished, or when the subscriber requests more elements. As we are only interested in new Output values, we can ignore most of the closures and just implement the receiveOutput closure.

Whenever we receive a value, we will use Swift’s dump() function to print the contents of the value to the console:

extension Publisher {
  func dump() -> AnyPublisher<Self.Output, Self.Failure> {
    handleEvents(receiveOutput:  { value in
      Swift.dump(value)
    })
    .eraseToAnyPublisher()
  }
}

We can use this operator like any of Combine’s built-in operators. In the following example, we attach our new operator to a simple publisher that emits the current date:

let cancellable = Just(Date())
  .dump()
  .sink { _ in } // a subscriber is required; without one, nothing is emitted

// prints:

▿ 2022-03-02 09:38:49 +0000
  - timeIntervalSinceReferenceDate: 667906729.659255
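
The other closures of handleEvents can be handy when debugging as well. The following sketch (not part of the original sample code) records the order in which a few of them fire for a simple array publisher. The log array and its labels are assumptions made for this example.

```swift
import Combine

// Records the lifecycle events in the order they occur.
var log: [String] = []

let cancellable = ["a", "b"].publisher
  .handleEvents(
    receiveSubscription: { _ in log.append("subscription") },
    receiveOutput: { value in log.append("output \(value)") },
    receiveCompletion: { _ in log.append("completion") }
  )
  .sink { _ in } // attaching a subscriber is what starts the pipeline

print(log) // ["subscription", "output a", "output b", "completion"]
```

The subscription is reported first, followed by each value and finally the completion.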

Implementing a retry operator with a delay

Now that we understand how to implement a simple operator, let’s see if we can refactor the code from the previous episode. Here is the relevant part:

return dataTaskPublisher
  .tryCatch { error -> AnyPublisher<(data: Data, response: URLResponse), Error> in
    if case APIError.serverError = error {
      return Just(Void())
        .delay(for: 3, scheduler: DispatchQueue.global())
        .flatMap { _ in
          return dataTaskPublisher
        }
        .print("before retry")
        .retry(10)
        .eraseToAnyPublisher()
    }
    throw error
  }
  .map(\.data)

Let’s begin by adding an overload of the retry operator in an extension on Publisher:

extension Publisher {
  func retry<T, E>(_ retries: Int, withDelay delay: Int) 
    -> Publishers.TryCatch<Self, AnyPublisher<T, E>> 
       where T == Self.Output, E == Self.Failure 
  {
  }
}

This defines two input parameters, retries and delay (with the argument label withDelay), which we can use to specify how many times the upstream publisher should be retried and how much time (in seconds) should be left between each retry.

Since we are going to use the tryCatch operator inside our new operator, we need to use its publisher type, Publishers.TryCatch, as the return type.

With this in place, we can now implement the body of the operator by pasting the existing implementation:

extension Publisher {
  func retry<T, E>(_ retries: Int, withDelay delay: Int) 
    -> Publishers.TryCatch<Self, AnyPublisher<T, E>> 
       where T == Self.Output, E == Self.Failure 
  {
    return self.tryCatch { error -> AnyPublisher<T, E> in
      return Just(Void())
        .delay(for: .init(integerLiteral: delay), scheduler: DispatchQueue.global())
        .flatMap { _ in
          return self
        }
        .retry(retries)
        .eraseToAnyPublisher()
    }
  }
}

You might have noticed that we removed the error check. This is because APIError is an error type that is specific to our application. As we are interested in making this an implementation that can be used in other apps as well, let’s see how we can make this more flexible.

Conditionally retrying

To make this code reusable in other contexts, let’s add a parameter for a trailing closure that the caller can use to control whether the operator should retry or not.

func retry<T, E>(_ retries: Int,
                 withDelay delay: Int,
                 condition: ((E) -> Bool)? = nil)
    -> Publishers.TryCatch<Self, AnyPublisher<T, E>>
    where T == Self.Output, E == Self.Failure
{
  return self.tryCatch { error -> AnyPublisher<T, E> in
    // Retry if the caller didn't provide a condition, or if the condition returns true
    if condition?(error) ?? true {
      return Just(Void())
        .delay(for: .init(integerLiteral: delay), scheduler: DispatchQueue.global())
        .flatMap { _ in
          return self
        }
        .retry(retries)
        .eraseToAnyPublisher()
    }
    else {
      throw error
    }
  }
}

If the caller doesn’t provide the closure, the operator will retry using the parameters retries and delay.

With this in place, we can simplify the original call:

// ...
return dataTaskPublisher
  .retry(10, withDelay: 3) { error in
    if case APIError.serverError = error {
      return true
    }
    return false
  }
  .map(\.data)
  // ...
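
To try the operator without a real network call, a small stand-in publisher can help. The following is a minimal sketch under a few assumptions: DemoError and the flaky publisher are hypothetical names invented for this example, the operator is repeated so the snippet is self-contained (with Output and Failure written directly instead of the generic parameters T and E), and a missing condition is treated as "always retry". The delay is set to 0 seconds so the example finishes quickly.

```swift
import Combine
import Foundation

// Hypothetical error type, used only in this example.
enum DemoError: Error { case transient, fatal }

extension Publisher {
  // Copy of the operator from this post, using Output/Failure instead of T/E.
  func retry(_ retries: Int,
             withDelay delay: Int,
             condition: ((Failure) -> Bool)? = nil)
    -> Publishers.TryCatch<Self, AnyPublisher<Output, Failure>>
  {
    return self.tryCatch { error -> AnyPublisher<Output, Failure> in
      // Retry when no condition is given, or when the condition accepts the error.
      if condition?(error) ?? true {
        return Just(Void())
          .delay(for: .init(integerLiteral: delay), scheduler: DispatchQueue.global())
          .flatMap { _ in
            return self
          }
          .retry(retries)
          .eraseToAnyPublisher()
      }
      else {
        throw error
      }
    }
  }
}

// Fails on the first two subscriptions and succeeds on the third.
var attempts = 0
let flaky = Deferred { () -> AnyPublisher<String, DemoError> in
  attempts += 1
  if attempts < 3 {
    return Fail(error: DemoError.transient).eraseToAnyPublisher()
  }
  return Just("success after \(attempts) attempts")
    .setFailureType(to: DemoError.self)
    .eraseToAnyPublisher()
}

// The retries run on a background queue, so we wait for the completion event.
let done = DispatchSemaphore(value: 0)
var result: String?

let cancellable = flaky
  .retry(5, withDelay: 0) { $0 == .transient }
  .sink(
    receiveCompletion: { _ in done.signal() },
    receiveValue: { result = $0 }
  )

done.wait()
print(result ?? "failed") // success after 3 attempts
```

Deferred is used here so that every new subscription runs the closure again, which is what makes each retry count as a fresh attempt.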

Implementing a retry operator for exponential backoff

Now, let’s take this one step further and implement a version of the retry operator with exponential backoff.

Exponential backoff is commonly utilised as part of rate limiting mechanisms in computer systems such as web services, to help enforce fair distribution of access to resources and prevent network congestion. (Wikipedia)

To increment the delay between two requests, we introduce a local variable that holds the current interval, and double it after each request. To make this possible, we need to wrap the inner pipeline that kicks off the original pipeline in a pipeline that increments the backoff variable:

func retry<T, E>(_ retries: Int, 
                 withBackoff initialBackoff: Int, 
                 condition: ((E) -> Bool)? = nil) 
    -> Publishers.TryCatch<Self, AnyPublisher<T, E>> 
    where T == Self.Output, E == Self.Failure 
{
  return self.tryCatch { error -> AnyPublisher<T, E> in
    if condition?(error) ?? true {
      var backOff = initialBackoff
      return Just(Void())
        .flatMap { _ -> AnyPublisher<T, E> in
          let result = Just(Void())
            .delay(for: .init(integerLiteral: backOff), scheduler: DispatchQueue.global())
            .flatMap { _ in
              return self
            }
          backOff = backOff * 2
          return result.eraseToAnyPublisher()
        }
        .retry(retries - 1)
        .eraseToAnyPublisher()
    }
    else {
      throw error
    }
  }
}
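
To make the doubling more tangible, here is a small sketch that computes the sequence of delays separately from Combine. The backoffDelays function is a hypothetical helper written for this illustration and is not part of the operator; it assumes one delay per attempt of the inner pipeline, starting at the initial backoff.

```swift
/// Returns the delay (in seconds) before each attempt when the delay starts at
/// `initialBackoff` and doubles after every attempt.
func backoffDelays(retries: Int, initialBackoff: Int) -> [Int] {
  var delays: [Int] = []
  var backOff = initialBackoff
  for _ in 0..<max(retries, 0) {
    delays.append(backOff)
    backOff *= 2 // same doubling step as in the operator
  }
  return delays
}

print(backoffDelays(retries: 4, initialBackoff: 3)) // [3, 6, 12, 24]
```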

To use exponential backoff only for certain kinds of errors, we can implement the closure to inspect the error, just like before. Here is a code snippet that shows how to use exponential backoff with an initial interval of 3 seconds for any APIError.serverError:

return dataTaskPublisher
  .retry(2, withBackoff: 3) { error in
    if case APIError.serverError(_, _, _) = error {
      return true
    }
    else {
      return false
    }
  }
  // ...

To use exponential backoff regardless of the error, this becomes even more compact:

return dataTaskPublisher
  .retry(2, withBackoff: 3)
  // ...

Closure

Combine is a very powerful framework that allows us to put together very efficient data and event processing pipelines for our apps.

Sometimes, this power comes at a cost: in the previous episode, we built a powerful error handling pipeline, but it made our code look more complicated than the original version, which used Combine’s built-in operators to handle errors by replacing them with default values or ignoring them altogether.

In this episode, you saw how to make use of custom operators to refactor this code.

Thanks to Combine’s flexible design, creating custom operators doesn’t require writing a lot of code, and helps to make our code more readable and reusable.

Thanks for reading 🔥

Source code
You can find the source code for this post in this GitHub repo.