Repeating suspended effects in Kotlin

Repeating suspended effects in Kotlin

All programs contain side effects and for the last few versions, Arrow has provided a library to encode functional effects based on the Kotlin stdlib suspend support. That library is Arrow Fx Coroutines.

Suspend is a language mechanism to flag effects that the compiler understands.

Let’s dive into this idea for some context:

Flagging a computation as suspend enforces a calling context, meaning the compiler can ensure that we can’t call the effect from anywhere other than an environment prepared to run suspended effects. That will be another suspended function or a Coroutine.

This means we’re able to make side effects compile time tracked, so our feedback loop in terms of how we use them gets remarkably faster. After all, it’s preferable to get strict compile time checks rather than tests to maintain, given both mechanisms ultimately represent feedback on the integrity of our code.

So let’s go ahead and flag all side effects in our program as suspend:

interface UserService {
-  fun loadUser(): User
+  suspend fun loadUser(): User
}
class UserPersistence {
-  fun loadUser(): User = { /* ... */ }
+  suspend fun loadUser(): User = { /* ... */ }
}
class AnalyticsTracker {
-  fun trackEvent(event: Event): Unit = { /* ... */ }
+  suspend fun trackEvent(event: Event): Unit = { /* ... */ }
}

class Logger {
-  fun log(message: String): Unit = { /* ... */ }
+  suspend fun log(message: String): Unit = { /* ... */ }
}

Now the Kotlin compiler is aware of those effects and we are enforcing a calling context.

This might look like a simple thing, but it unlocks the ability to go declarative. We’re converting our unsafe effects into pure descriptions of effects that require a prepared environment to be executed.

This effectively means we’re decoupling the pure declaration of our program logic (frequently called algebras in the functional world) from the runtime. And therefore, the runtime has the chance to see the big picture of our program and decide how to run and optimize it.

We can now swap runtimes for our suspended programs based on our needs, use those to pick the execution strategy, the context on where to run it, and more. (Think of tests).

If you want to dig more into how to do that using Arrow, you can check out the Arrow Fx documentation or the Environment tests, which showcase the different execution strategies you can pick for your suspended programs. You can also peek into the actual Environment itself, which is an interface you can also implement in case you want to provide your own for a very specific purpose. Otherwise, there’s a default one built-in.

Here’s a simple example as a starting point:

fun program() {
  val disposable = Environment().unsafeRunAsyncCancellable(
    fa = { sendTweet(Tweet) }, // our suspended program
    e = { e -> callback("Error: $e") }, // fallback error handler for uncaught errors
    a = { a -> callback("Success: $a") } // fallback successful result
  )

  // ...
  disposable() // invoke for cancellation
}

For more details, please check the links above.

Repeating effects

Arrow Fx Coroutines provides a series of functional operators that are meant to run suspended effects. These operators get suspended lambdas to execute, and they are also suspended by themselves. By exposing those as suspended functions, we enforce a prepared environment on the call site, following the reasoning explained above. In other words, the library makes sure nobody runs effects from unsafe places.

One of the operators provided by the library is repeat, which we can use to repeat any suspended effect under a given policy, as long as it doesn’t fail. Here’s how to use it:

interface OfflineService {
  suspend fun syncPendingTasks(): List<Task> // our suspended effect
}

suspend fun program(service: OfflineService) {
  repeat(Schedule.recurs(100)) {
    service.syncPendingTasks()
  }
}

repeat will keep repeating the effect until the policy is over or until the effect throws.

You can put any suspended effect in there, so you can use this pattern to repeat suspended computations across the board just by wrapping those.

Also, note that repeat requires a parameter. Arrow Fx Coroutines provides a highly composable and reusable data type to create repeating policies: The Schedule.

Here’s a simple one that will make your suspended effect recurs up to 10 times.

Schedule.recurs<Int>(10)

Or this composed one to space requests every 60 seconds up to a maximum number of recursions:

Schedule.spaced(60.seconds) and Schedule.recurs(100)

The following one can be used to make the delay between repeats grow linearly. Delay will be the result of n * base where n stands for the current execution number and base is the provided Duration.

Schedule.linear(10.milliseconds)

To grow exponentially, we have this one for an exponential backoff starting at 250 milliseconds. The growth factor is 2 by default, but you can provide your own.

Schedule.exponential(250.milliseconds)

Here’s one that continues forever and returns the number of iterations, which might make sense for tests, for example:

Schedule.forever()

And finally, here’s an example of a complex one right away, to showcase how composable Schedules are:

fun <A> complexPolicy(): Schedule<A, List<A>> =
  Schedule.exponential<A>(10.milliseconds)
    .whileOutput { it.nanoseconds < 60.seconds.nanoseconds }
    .andThen(Schedule.spaced<A>(60.seconds) and Schedule.recurs(100)).jittered()
    .zipRight(Schedule.identity<A>().collect())

Let me describe what’s happening here, step by step:

  • This policy starts with an exponential backoff delay while under 60 seconds.
  • Then, it continues with up to 100 recursions spaced every 60 seconds.
  • The delay is also randomized (jittered) slightly to avoid coordinated backoff from multiple services.
  • zipRight discards the results from the left Schedule and keeps the ones on the right. Schedule.identity<A>().collect() ends up collecting all the outputs, so the result will be a List<A> here.

There are other interesting builders and operators for Schedule in the library, so I’d suggest you have a look at the available ones here if you are interested.

Schedule Input / Output

It’s important to differentiate between the two. Schedule<Input, Output> gets two different type parameters. The Input of the Schedule will be the result type of the effect, which can be used to infer whether to keep repeating or not within the policy:

suspend fun program(service: OfflineService) {
  repeat(Schedule.doWhile { pendingTasks -> pendingTasks.isNotEmpty() }) {
    service.syncPendingTasks()
  }
  // or
  repeat(Schedule.doUntil { pendingTasks -> pendingTasks.isEmpty() }) {
    service.syncPendingTasks()
  }
}

As you can see, in the case of doWhile or doUntil, it determines the type of the lambda argument so we can decide to stop repeating sync operations when there are no more pending tasks on the list. (Assuming every time we perform a successful sync, those tasks are removed from the list).

On the other hand, the Output is the result of the actual Schedule, and, for repeat, it’s also the result of the repeat function itself. A Schedule generates an output for each execution of the effect.

fun complexPolicy(): Schedule<User, List<User>> =
  Schedule.exponential<User>(10.milliseconds)
    .whileOutput { it.nanoseconds < 60.seconds.nanoseconds }
    .zipRight(Schedule.identity<User>().collect())

suspend fun program() {
  val syncedUsers: List<User> = repeat(complexPolicy()) {
    syncUser()
  }

  syncedUsers.forEach { /*...*/ }
}

suspend fun syncUser(): User = TODO()

This will keep re-running the effect even after succeeding due to repeat, up to a total amount of times determined by the policy, and will collect the outputs of all the repetitions ultimately returning a List<User>, so we have a list of all the synced Users so far.

Error handling

As we clarified above, repeat keeps repeating even if the effect succeeds and if the policy is not over. If the effect throws an exception during any of the repetitions, that error will be rethrown, and that’s something you need to control:

suspend fun loadUsersActive(): Int = throw RuntimeException("Boom!") // always fails!

Either.catch {
  repeat(policy()) {
    syncPendingTasks()
  }
}

That will lift the ultimate result of the effect into Either.Right if it succeeded, or the exception thrown into Either.Left if it didn’t, so it doesn’t blow up the program. Then you could use mapLeft over Either to map that exception to a strongly typed domain error, if you want to.

For specific callbacks to handle errors, you also have a couple of overloads: repeatOrElse and repeatOrElseEither. Both offer a function to handle errors if they are encountered during repetition, each one in a different form. You can also use those to recover from errors.

Here’s repeatOrElse:

val users: List<User> = repeatOrElse(
  complexPolicy(),
  fa = { syncUser() },
  orElse = { throwable, users -> if (users.isNullOrEmpty()) emptyList() else users }
)

And here you have repeatOrElseEither, this time not recovering, but strongly typing our errors instead.

val maybeUsers: Either<SomeSyncFailed, List<User>> = repeatOrElseEither(
  complexPolicy(),
  fa = { syncUser() },
  orElse = { _, _ -> SomeSyncFailed }
)

maybeUsers.fold(
  ifLeft = { processError(it) },
  ifRight = { processUsers(it) }
)

Dependency Injection and Testing

One of the appealing points of these highly composable Schedules is the fact that you get an instance back, so you can literally reuse it as much as you want. This gives us the chance to instantiate a policy, then inject it across the board using our favorite Dependency Injection libraries. So we can easily reuse the same policy for all our network requests, for example.

This will also leverage testability for these behaviors, as you can imagine.

Using the library

Arrow Fx Coroutines is a module by itself, so you can fetch it like so:

depdendencies {
  def arrowVersion = "0.11.0"

  implementation "io.arrow-kt:arrow-fx-coroutines:$arrowVersion"
}

Final thoughts

In this post, we’ve learned how to use the power of suspend to make our side effects pure and compile time tracked, effectively requiring a safe context to call them. We’ve also learned the repeat operator by Arrow Fx Coroutines for repeating our suspended effects across the board and how to leverage this concept using any Dependency Injection mechanisms.

If you are interested in learning how to write pure functional programs using Kotlin, I would really recommend the Functional Programming Fundamentals in Kotlin with Arrow course taking place this December. This is an in-depth 4-day course where we will go over all the relevant functional patterns to learn how to leverage the paradigm for writing professional programs.

We’ll cover topics like determinism, controlling side effects, functional data types, error handling, data validation, concurrency, working efficiently and safely when shared resources across async boundaries, Streams, and much more. These courses are limited in size to keep them very interactive, and they’re composed of both theory and exercises. I’ll be your trainer! 🙋‍

In the next post in this series, we’ll learn about another operator provided by the library: retry. We’ll learn how it’s similar to repeat, and what makes it different. It’s going to be interesting, so stay tuned! 🙌

Ensure the success of your project

47 Degrees can work with you to help manage the risks of technology evolution, develop a team of top-tier engaged developers, improve productivity, lower maintenance cost, increase hardware utilization, and improve product quality; all while using the best technologies.