Repeating suspended effects in Kotlin
by Jorge Castillo
November 27, 2020
Tags: kotlin, functional programming, arrow
13 minutes to read.

All programs contain side effects, and for the last few versions Arrow has provided a library to encode functional effects based on the Kotlin stdlib suspend support. That library is Arrow Fx Coroutines.
suspend is a language mechanism for flagging effects in a way the compiler understands.
Let’s dive into this idea for some context:
Flagging a computation as suspend enforces a calling context: the compiler ensures that we can’t call the effect from anywhere other than an environment prepared to run suspended effects, that is, another suspended function or a coroutine.
This means side effects are tracked at compile time, so the feedback loop on how we use them gets remarkably faster. After all, strict compile-time checks are preferable to tests we have to maintain, given that both mechanisms ultimately give us feedback on the integrity of our code.
So let’s go ahead and flag all side effects in our program as suspend:
interface UserService {
-   fun loadUser(): User
+   suspend fun loadUser(): User
}
class UserPersistence {
-   fun loadUser(): User { /* ... */ }
+   suspend fun loadUser(): User { /* ... */ }
}
class AnalyticsTracker {
-   fun trackEvent(event: Event): Unit { /* ... */ }
+   suspend fun trackEvent(event: Event): Unit { /* ... */ }
}
class Logger {
-   fun log(message: String): Unit { /* ... */ }
+   suspend fun log(message: String): Unit { /* ... */ }
}
Now the Kotlin compiler is aware of those effects and we are enforcing a calling context.
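To make the enforced calling context concrete, here’s a minimal, stdlib-only sketch. The `User` data class, the `greeting` function, and the `runGreeting` helper are hypothetical names introduced for illustration; `startCoroutine` and `Continuation` come from `kotlin.coroutines`. It shows that a suspended effect can be called from another suspend function, while non-suspend code at the edge of the program has to start a coroutine explicitly.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical model type, for illustration only.
data class User(val name: String)

// Flagged as an effect: callable only from another suspend function or a coroutine.
suspend fun loadUser(): User = User("Jane")

// Allowed: a suspend function is already a valid calling context.
suspend fun greeting(): String = "Hello, ${loadUser().name}"

// Not allowed: uncommenting the next line fails to compile,
// because loadUser() is called outside a suspend context.
// fun unsafeGreeting(): String = "Hello, ${loadUser().name}"

// Edge of the program: start a coroutine explicitly to run the effect.
fun runGreeting(onResult: (Result<String>) -> Unit) {
    val program: suspend () -> String = { greeting() }
    program.startCoroutine(Continuation(EmptyCoroutineContext, onResult))
}

fun main() {
    runGreeting { result -> println(result.getOrNull()) } // prints "Hello, Jane"
}
```

In a real program you would rarely start coroutines by hand like this; a runtime such as the one Arrow provides plays that role.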
This might look like a simple thing, but it unlocks the ability to go declarative. We’re converting our unsafe effects into pure descriptions of effects that require a prepared environment to be executed.
This effectively means we’re decoupling the pure declaration of our program logic (frequently called algebras in the functional world) from the runtime. And therefore, the runtime has the chance to see the big picture of our program and decide how to run and optimize it.
We can now swap runtimes for our suspended programs based on our needs and use them to pick the execution strategy, the context in which to run the program, and more (think of tests).
If you want to dig more into how to do that using Arrow, you can check out the Arrow Fx documentation or the Environment tests, which showcase the different execution strategies you can pick for your suspended programs. You can also peek into the actual Environment itself, which is an interface you can implement in case you want to provide your own for a very specific purpose. Otherwise, there’s a default one built in.
Here’s a simple example as a starting point:
fun program() {
    val disposable = Environment().unsafeRunAsyncCancellable(
        fa = { sendTweet(Tweet) }, // our suspended program
        e = { e -> callback("Error: $e") }, // fallback error handler for uncaught errors
        a = { a -> callback("Success: $a") } // fallback successful result
    )
    // ...
    disposable() // invoke for cancellation
}
For more details, please check the links above.
Repeating effects
Arrow Fx Coroutines provides a series of functional operators that are meant to run suspended effects. These operators take suspended lambdas to execute, and they are suspended functions themselves. By exposing them as suspended functions, we enforce a prepared environment on the call site, following the reasoning explained above. In other words, the library makes sure nobody runs effects from unsafe places.
One of the operators provided by the library is repeat, which we can use to repeat any suspended effect under a given policy, as long as it doesn’t fail. Here’s how to use it:
interface OfflineService {
    suspend fun syncPendingTasks(): List<Task> // our suspended effect
}
suspend fun program(service: OfflineService) {
    repeat(Schedule.recurs(100)) {
        service.syncPendingTasks()
    }
}
repeat will keep repeating the effect until the policy is over or until the effect throws.
You can put any suspended effect in there, so you can use this pattern to repeat suspended computations across the board just by wrapping them.
Also, note that repeat requires a parameter. Arrow Fx Coroutines provides a highly composable and reusable data type to create repeating policies: the Schedule.
Here’s a simple one, recurs, that makes your suspended effect recur up to 10 times.
Schedule.recurs<Int>(10)
Or this composed one, which spaces requests every 60 seconds up to a maximum number of repetitions:
Schedule.spaced(60.seconds) and Schedule.recurs(100)
The following one can be used to make the delay between repeats grow linearly. The delay will be the result of n * base, where n stands for the current execution number and base is the provided Duration.
Schedule.linear(10.milliseconds)
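As a quick worked example of the n * base formula, here’s a plain Kotlin sketch that doesn’t use Arrow at all. `linearDelayMillis` is a hypothetical helper, and counting executions from 1 is an assumption made for the illustration.

```kotlin
// Hypothetical helper, not part of Arrow: delay in milliseconds before execution n.
// Assumes executions are counted starting at 1.
fun linearDelayMillis(n: Int, baseMillis: Long): Long = n * baseMillis

fun main() {
    val delays = (1..5).map { n -> linearDelayMillis(n, baseMillis = 10) }
    println(delays) // [10, 20, 30, 40, 50]
}
```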
To grow exponentially, we have this one for an exponential backoff starting at 250 milliseconds. The growth factor is 2 by default, but you can provide your own.
Schedule.exponential(250.milliseconds)
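To get a feel for how quickly that grows, here’s a plain Kotlin sketch of the arithmetic, again without Arrow. `exponentialDelayMillis` is a hypothetical helper; it assumes the delay is base * factor^n with n counted from 0, so that the first delay equals the base.

```kotlin
import kotlin.math.pow

// Hypothetical helper, not part of Arrow: delay in milliseconds for step n.
// Assumes steps are counted from 0, so step 0 waits exactly baseMillis.
fun exponentialDelayMillis(n: Int, baseMillis: Long, factor: Double = 2.0): Long =
    (baseMillis * factor.pow(n)).toLong()

fun main() {
    val delays = (0..4).map { n -> exponentialDelayMillis(n, baseMillis = 250) }
    println(delays) // [250, 500, 1000, 2000, 4000]
}
```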
Here’s one that continues forever and returns the number of iterations, which might make sense for tests, for example:
Schedule.forever()
And finally, here’s an example of a complex one right away, to showcase how composable Schedules are:
fun <A> complexPolicy(): Schedule<A, List<A>> =
    Schedule.exponential<A>(10.milliseconds)
        .whileOutput { it.nanoseconds < 60.seconds.nanoseconds }
        .andThen(Schedule.spaced<A>(60.seconds) and Schedule.recurs(100)).jittered()
        .zipRight(Schedule.identity<A>().collect())
Let me describe what’s happening here, step by step:
- This policy starts with an exponential backoff delay while under 60 seconds.
- Then, it continues with up to 100 repetitions spaced every 60 seconds.
- The delay is also randomized (jittered) slightly to avoid coordinated backoff from multiple services.
- zipRight discards the results from the left Schedule and keeps the ones on the right.
- Schedule.identity<A>().collect() ends up collecting all the outputs, so the result will be a List<A> here.
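For a rough idea of how long the first phase of this policy lasts, here’s a back-of-the-envelope sketch in plain Kotlin. It is not Arrow code: `backoffPhaseMillis` is a hypothetical helper that assumes the default growth factor of 2, ignores the jitter, and assumes the phase ends right before the first delay that reaches 60 seconds.

```kotlin
// Hypothetical helper, not part of Arrow: the delays (in ms) of an exponential
// backoff that doubles from baseMillis and stops before reaching capMillis.
fun backoffPhaseMillis(baseMillis: Long = 10, capMillis: Long = 60_000): List<Long> =
    generateSequence(baseMillis) { it * 2 }
        .takeWhile { it < capMillis }
        .toList()

fun main() {
    val phase = backoffPhaseMillis()
    println(phase.size)   // 13 delays: 10 ms, 20 ms, ..., 40960 ms
    println(phase.last()) // 40960
    println(phase.sum())  // 81910, so roughly 82 seconds of backoff in total
}
```

Under those assumptions, the policy would spend a bit over a minute backing off before switching to the steady 60-second spacing.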
There are other interesting builders and operators for Schedule in the library, so I’d suggest you have a look at the available ones if you are interested.
Schedule Input / Output
It’s important to differentiate between the two. Schedule<Input, Output> gets two different type parameters. The Input of the Schedule will be the result type of the effect, which can be used to infer whether to keep repeating or not within the policy:
suspend fun program(service: OfflineService) {
    repeat(Schedule.doWhile { pendingTasks -> pendingTasks.isNotEmpty() }) {
        service.syncPendingTasks()
    }
    // or
    repeat(Schedule.doUntil { pendingTasks -> pendingTasks.isEmpty() }) {
        service.syncPendingTasks()
    }
}
As you can see, in the case of doWhile or doUntil, the Input determines the type of the lambda argument, so we can decide to stop repeating sync operations when there are no more pending tasks on the list (assuming that every time we perform a successful sync, those tasks are removed from the list).
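To illustrate how the result of the effect drives the decision to keep going, here’s a hand-rolled, stdlib-only sketch. It is not Arrow’s implementation: `repeatWhile`, `InMemoryOfflineService`, and `runSketch` are hypothetical names, and returning the last result of the effect is an assumption made for the example.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

data class Task(val id: Int)

// Hypothetical in-memory service: each sync uploads one batch and
// returns the tasks that are still pending afterwards.
class InMemoryOfflineService(private var pending: List<Task>, private val batchSize: Int) {
    var syncCalls = 0
        private set

    suspend fun syncPendingTasks(): List<Task> {
        syncCalls++
        pending = pending.drop(batchSize)
        return pending
    }
}

// Hand-rolled stand-in for a doWhile-style policy: runs the effect once, then
// again for as long as the predicate holds for the latest result.
suspend fun <A> repeatWhile(predicate: (A) -> Boolean, effect: suspend () -> A): A {
    var result = effect()
    while (predicate(result)) result = effect()
    return result
}

// Runs a suspend block from blocking code. Only valid in this sketch,
// because nothing in it ever truly suspends.
fun <T> runSketch(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}

fun main() {
    val service = InMemoryOfflineService((1..5).map(::Task), batchSize = 2)
    val remaining = runSketch {
        repeatWhile<List<Task>>({ it.isNotEmpty() }) { service.syncPendingTasks() }
    }
    println("${service.syncCalls} syncs, ${remaining.size} tasks left") // 3 syncs, 0 tasks left
}
```

With five pending tasks and batches of two, the effect runs three times: the first two results are non-empty, and the third one stops the loop.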
On the other hand, the Output is the result of the actual Schedule, and, for repeat, it’s also the result of the repeat function itself. A Schedule generates an output for each execution of the effect.
fun complexPolicy(): Schedule<User, List<User>> =
    Schedule.exponential<User>(10.milliseconds)
        .whileOutput { it.nanoseconds < 60.seconds.nanoseconds }
        .zipRight(Schedule.identity<User>().collect())

suspend fun program() {
    val syncedUsers: List<User> = repeat(complexPolicy()) {
        syncUser()
    }
    syncedUsers.forEach { /*...*/ }
}

suspend fun syncUser(): User = TODO()
Thanks to repeat, this keeps re-running the effect even after it succeeds, up to a total number of times determined by the policy. It collects the outputs of all the repetitions and ultimately returns a List<User>, so we end up with a list of all the synced Users.
Error handling
As we clarified above, repeat keeps repeating as long as the effect succeeds and the policy is not over. If the effect throws an exception during any of the repetitions, that error will be rethrown, and that’s something you need to control:
suspend fun syncPendingTasks(): List<Task> = throw RuntimeException("Boom!") // always fails!

Either.catch {
    repeat(policy()) {
        syncPendingTasks()
    }
}
That will lift the ultimate result of the effect into Either.Right if it succeeded, or the thrown exception into Either.Left if it didn’t, so it doesn’t blow up the program. Then you could use mapLeft over Either to map that exception to a strongly typed domain error, if you want to.
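Here’s a minimal sketch of that catch-then-mapLeft flow. To keep it runnable without any dependency, it uses a tiny hand-rolled `Either` as a stand-in for Arrow’s, plus a hypothetical `catchEither` helper and a hypothetical `SyncFailed` domain error. The failing effect is also kept non-suspend here so it can run from a plain main.

```kotlin
// Minimal stand-in for Arrow's Either, so the sketch runs without dependencies.
sealed class Either<out L, out R> {
    data class Left<out L>(val value: L) : Either<L, Nothing>()
    data class Right<out R>(val value: R) : Either<Nothing, R>()

    // Transforms the error side and leaves a successful value untouched.
    fun <L2> mapLeft(f: (L) -> L2): Either<L2, R> = when (this) {
        is Left -> Left(f(value))
        is Right -> this
    }
}

// Hypothetical stand-in for Either.catch: captures a thrown exception as a Left.
inline fun <R> catchEither(block: () -> R): Either<Throwable, R> =
    try {
        Either.Right(block())
    } catch (t: Throwable) {
        Either.Left(t)
    }

// Hypothetical strongly typed domain error.
data class SyncFailed(val reason: String)

fun syncPendingTasks(): List<String> = throw RuntimeException("Boom!") // always fails

fun main() {
    val result: Either<SyncFailed, List<String>> =
        catchEither { syncPendingTasks() }
            .mapLeft { SyncFailed(it.message ?: "unknown") }
    println(result) // Left(value=SyncFailed(reason=Boom!))
}
```

From here on, callers only deal with `SyncFailed` instead of a raw Throwable.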
For specific callbacks to handle errors, you also have a couple of overloads: repeatOrElse and repeatOrElseEither. Both offer a function to handle errors if they are encountered during repetition, each one in a different form. You can also use those to recover from errors.
Here’s repeatOrElse:
val users: List<User> = repeatOrElse(
    complexPolicy(),
    fa = { syncUser() },
    orElse = { throwable, users -> if (users.isNullOrEmpty()) emptyList() else users }
)
And here you have repeatOrElseEither, this time not recovering, but strongly typing our errors instead.
val maybeUsers: Either<SomeSyncFailed, List<User>> = repeatOrElseEither(
    complexPolicy(),
    fa = { syncUser() },
    orElse = { _, _ -> SomeSyncFailed }
)
maybeUsers.fold(
    ifLeft = { processError(it) },
    ifRight = { processUsers(it) }
)
Dependency Injection and Testing
One of the appealing points of these highly composable Schedules is the fact that you get an instance back, so you can reuse it as much as you want. This gives us the chance to instantiate a policy, then inject it across the board using our favorite Dependency Injection libraries. So we can easily reuse the same policy for all our network requests, for example.
This also improves the testability of these behaviors, as you can imagine.
Using the library
Arrow Fx Coroutines is a module by itself, so you can fetch it like so:
dependencies {
    def arrowVersion = "0.11.0"
    implementation "io.arrow-kt:arrow-fx-coroutines:$arrowVersion"
}
Final thoughts
In this post, we’ve learned how to use the power of suspend to make our side effects pure and tracked at compile time, effectively requiring a safe context to call them. We’ve also learned about the repeat operator from Arrow Fx Coroutines for repeating our suspended effects across the board, and how to reuse repeating policies through any Dependency Injection mechanism.
If you are interested in learning how to write pure functional programs using Kotlin, I would really recommend the Functional Programming Fundamentals in Kotlin with Arrow course taking place this December. This is an in-depth 4-day course where we will go over all the relevant functional patterns to learn how to leverage the paradigm for writing professional programs.
We’ll cover topics like determinism, controlling side effects, functional data types, error handling, data validation, concurrency, working efficiently and safely with shared resources across async boundaries, Streams, and much more. These courses are limited in size to keep them very interactive, and they’re composed of both theory and exercises. I’ll be your trainer! 🙋
In the next post in this series, we’ll learn about another operator provided by the library: retry. We’ll learn how it’s similar to repeat, and what makes it different. It’s going to be interesting, so stay tuned! 🙌