IO integration with kotlinx.coroutines
by Jorge Castillo
- •
- March 02, 2020
- •
- kotlin• functional programming• arrow
- |
- 16 minutes to read.

This article showcases the brand new Arrow integration module for KotlinX Coroutines included in the Arrow 0.10.5 release.
Kotlin stdlib suspend vs KotlinX Coroutines
Before stepping into content, we believe it is important to make a clear differentiation between the Kotlin standard library suspend support and the KotlinX Coroutines runtime library.
Support for suspend
computations is provided by the Kotlin standard library, and then ecosystem libraries can build their apis on top of it.
KotlinX Coroutines would be one of those ecosystem libraries that provides a “runtime” for the standard library suspend support, using an opinionated encoding.
To give you a more specific example of this differentiation: Kotlin standard library does not provide support for cancelable coroutines, only for non-cancellable ones where you can only call the continuation with a successful result or an exception. For cancellation, you have those primitives under the KotlinX Coroutines library umbrella.
ArrowFx is also included in the ecosytem category, since it relies on the standard library suspend support to provide a pure functional api on top of it, and enables users to approach to their programs using the Functional Programming paradigm and all its benefits.
We wanted to make this clear, since nowadays it’s usual that we think about KotlinX and the stdlib suspend
support as a single thing when they’re notably different. We need to pick a runtime library for it, and that depends on our needs and end goal.
Once this is clear, we can move into the article 🙏.
The suspended future
Many frameworks like Spring or Ktor already provide first class support for suspend
functions as a solution for concurrency and non blocking computations. Thanks to this, they can offload their I/O operations from the core threads and therefore handle more concurrent requests than usual.
With the upcoming Spring version 5.2, we will be able to write endpoint routes for our restful controllers as suspend
functions like:
@RestController
class CoroutinesRestController {
@GetMapping("/home")
suspend fun homeEndpoint(): String {
delay(10)
return "Welcome to this site!"
}
}
And for the current Ktor stable, we can already do this:
embeddedServer(Netty, port = 8080) {
routing {
get("/home") {
delay(10)
call.respondText("Welcome to this site!")
}
}
}.start(wait = true)
Note how we can call the delay()
suspended function within the block for both platforms.
Given how coroutines are expected to run within a CoroutineScope
, these libraries benefit by having their I/O operations automatically cancelled whenever the mentioned scope is cancelled.
Another well known library that recently added full support for suspend
functions is Retrofit starting on version 2.6.0. Now, you can write your service definition interfaces like:
interface MusicService {
@GET("/music/bands")
suspend fun getBands() : List<Band>
}
So you don’t need to provide any call adapters or wrap your responses into Call
or Deferred
anymore. Everything is plain and clear.
Even Kotlin itself provides support for suspend main
functions now so you can run your program within a coroutine:
suspend fun main() {
println("This is suspended! 🍃")
}
Given how the Kotlin ecosystem is evolving towards a suspended future, we decided to provide direct Arrow Fx integration with any codebase that supports suspended computations. We want you to be able to integrate your functional codebases with all those systems seamlessly.
After all, IO
and suspend
represent the same thing, so we should be able to provide a very thin integration layer to make it work.
To achieve that, we are showcasing the brand new Kotlinx.Coroutines Arrow integration module, available starting on Arrow 0.10.5
.
How to use it
The first requirement is to add the Arrow Maven repository to our projects in the root build.gradle
:
allprojects {
repositories {
mavenCentral()
jcenter()
maven { url "https://dl.bintray.com/arrow-kt/arrow-kt/" }
}
}
And then add the following dependency to the corresponding modules build.gradle
files.
dependencies {
def arrow_version = "0.10.5"
implementation "io.arrow-kt:arrow-fx-kotlinx-coroutines:$arrow_version"
}
Solving our integration problem
Let’s use this module to solve our integration problem over some of the mentioned libraries. Let’s start with the Ktor or Spring examples.
Since these frameworks allow using a suspend
function now, this means we have pure integration points that allow us to write purely functional apps in Kotlin using suspend
.
In the case of restful backends like the ones defined with Ktor or Spring, routes are the entry point to our program. That’s what we usually call the edge of the world. We can hook our pure functional logic starting there.
For doing this, and before having suspend support, we needed to unsafely run our program at the edge. An example with Ktor would be:
fun main() {
val bandStorage = BandStorage()
embeddedServer(Netty, port = 8080) {
routing {
get("/music/bands") {
getBands(bandStorage)
.effectMap { bands -> call.respond("bands" to bands) }
.unsafeRunSync()
}
}
}.start(wait = true)
}
private fun getBands(bandStorage: BandStorage): IO<List<Band>> = TODO()
That was because a resolved result is needed by Ktor so it could serialize it for the response. So our complete architecture beyond that point could be completely pure, but the program needed to be run at the edge. The same goes for any other similar frameworks.
After adding first class support for suspend
computations, we are allowed to return a suspended function, which is equivalent to Arrow’s IO. So we should be able to return our IO
there and let the platform run it for us. This would mimic what we can find in Haskell for example, where our program is always an IO, and the runtime platform takes care of evaluating it for you.
To do this, we’ll just need some very basic interoperability functions that Arrow provides.
Until version 0.10.5
, the IO
data type already provided a suspended()
function we could use to run our IO
in a suspended environment. This effectively suspended the current coroutine to run our IO
computation. It looked like:
fun main() {
val bandStorage = BandStorage()
embeddedServer(Netty, port = 8080) {
routing {
get("/music/bands") {
getBands(bandStorage)
.effectMap { bands -> call.respond("bands" to bands) }
.suspended()
}
}
}.start(wait = true)
}
This extension is still available, but the issue with this variant is that it uses suspendCoroutine from the stdlib under the hood, which does not support cancellation propagation, i.e., we don’t have a means to cancel our inner IO
task whenever the outer coroutine gets cancelled.
For adding cancellation support, you should use the suspendCancellable()
variant that the ArrowFx KotlinX integration module provides.
import arrow.integrations.kotlinx.suspendCancellable
fun main() {
val bandStorage = BandStorage()
embeddedServer(Netty, port = 8080) {
routing {
get("/music/bands") {
getBands(bandStorage)
.effectMap { bands -> call.respond("bands" to bands) }
.suspendCancellable()
}
}
}.start(wait = true)
}
This variant uses the stdlib suspendCancellableCoroutine under the hood instead, which passes a cancellable continuation to the block. That pipes cancellation from outer coroutines so Arrow can hook its cancellation logic for the inner IO
task by calling cont.invokeOnCancellation {}
on the continuation.
This makes this variant highly recommendable, so our IO
program gets cancelled according to the outer coroutine cancellation, and we avoid any leaks or race conditions 🙏.
But what if the scenario was the other way around and the library we are using provided a suspend
function we needed to call? That would be the case for Retrofit, for example:
interface MusicService {
@GET("/music/bands")
suspend fun getBands() : List<Band>
}
This would also be a common scenario for codebases where we wanted to gradually migrate from coroutines to a functional approach.
In this case, we would need to convert the suspend
computation to an IO
task somehow. The KotlinX Arrow integration module provides IO.effect {}
for this:
interface MusicService {
@GET("/music/bands")
suspend fun getBands() : List<Band>
}
interface Database {
fun storeBands(bands: List<Band>): IO<List<Band>>
}
fun loadAllBands(
service: MusicService,
database: Database
): IO<List<Band>> =
IO.effect { service.getBands() } // suspended computation
.flatMap { database.storeBands(it) } // IO computation
In case you want to flatMap
over another suspend
function, you can use effectMap
:
interface MusicService {
suspend fun getBands() : List<Band>
}
interface Database {
suspend fun storeBands(bands: List<Band>): List<Band>
}
fun loadAllBands(
service: MusicService,
database: Database
): IO<List<Band>> =
IO.effect { service.getBands() } // suspended computation
.effectMap { database.storeBands(it) } // suspended computation
There is also the polymorphic style using almost the same syntax, but relying on the Concurrent<F>
typeclass, which would look like:
fun <F> Concurrent<F>.loadAllBands(
service: MusicService,
database: Database
): Kind<F, List<Band>> =
effect { service.getBands() }
.flatMap { database.storeBands(it) }
And with the mentioned extensions, we should be able to integrate all our IO
based codebases into any frameworks, libraries, or platforms supporting suspend
computations and still keep all the benefits of those.
They can also be used for integration between different layers or pieces in our architecture, if we are facing a gradual migration from coroutines to a pure functional style.
Scoping our IO’s
Let’s imagine a completely different scenario now. Sometimes we don’t want to return a suspend
computation, but just scope our IO
with the current CoroutineScope
to propagate cancellation, mostly for ergonomics.
In this case, we can use the unsafeRunScoped
extension, like:
service.getBands().unsafeRunScoped(scope) { }
The braces are for a callback that is called when the program completes, so we highly suggest only using it for errors that should not be handled by our program, so we want to crash for those, report them to an error tracking service, or similar. Anything else should be handled within the program using the standard IO
error handling capabilities, e.g., handleError
, handleErrorWith
, etc.
An oversimplified example of this could be:
service.getBands()
.handleError { emptyList() }
.effectMap { bands -> render(bands) }
.unsafeRunScoped(scope) { /* Optionally track or crash errors */ }
Our program handles all errors that are expected in our domain to be resilient, uses effectMap
to render results on UI, and finally leaves unhandled errors within the unsafe run callback.
There is also an alternative syntax for this as an extension function over the CoroutineScope
. The result is the same:
scope.unsafeRunIO(service.getBands()) { }
With the scoped IO approach we ensure whenever the outer scope gets cancelled, our IO task will also be, since it’s scoped to it. This approach is very useful for ongoing refactors from coroutines to functional style.
Android integration
A very common example of the need for scoping IO
tasks can be Android. We don’t want to get too technical about a specific platform here, but it’s worth a mention. Let’s have a look at a couple of usual examples that Android devs would feel familiar with.
Let’s say we are into a Fragment
. Fragments represent a screen on the Android UI system, so they are thought for UI purposes. Every Fragment
has a view lifecycle and a CoroutineScope
tied to it. That means the fragment view is created and at some point destroyed.
Whenever its view lifecycle reaches its end, the coroutine scope is used for cancellation. So in case we wanted to scope our IO
task so it gets cancelled whenever the view gets destroyed, we could do the following:
class MyFragment: Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
longRunningIOTask().unsafeRunScoped(viewLifecycleOwner.lifecycleScope) { }
}
}
So we’d launch our task as soon as the view is created, but we’d also ensure that as soon as the view got destroyed, the scope would be cancelled and therefore also our longRunningIOTask
. Our task would effectively live within the fragment view lifecycle scope.
But many times we want our IO
tasks to survive what is called a configuration change in Android. Configuration changes occur when the system enforces the view to be recreated because of a system wide setting changed, e.g: the Device Language. In that case we would like our IO to survive, and use a mechanism to deliver the result to the new view once recreated.
In OOP approaches, that is usually achieved with a ViewModel
. ViewModels provide a viewModelScope
that lives longer than the view lifecycle, i.e, it survives the configuration change. That means we can use that one to scope your IO
tasks we want to live longer.
service.getBands().unsafeRunScoped(viewModel.viewModelScope) {}
In this case, the task would live longer than the view, so there’s a potential leak. You’d need to take care of releasing the old view reference and hooking the new instance after the config change by yourself.
In the future, we will provide a Stream
reactive data type that will work seamlessly for any F
data type, so you can build an observable layer to deliver your asynchronous results to, mimicking what you get with LiveData
, but with a completely functional approach.
Final details
We recommend visiting the official Arrow docs for the kotlinx.coroutines integration module.
The new ArrowFx KotlinX Coroutines integration module was written by 47ers Alberto Ballano and Daniel Montoya with the help of Simon Vergauwen. Big thanks to all of them for making our lives easier! 🙏 🎉
You can also find me on Twitter for more live updates on Arrow and ArrowFx and follow official @arrow_kt for official announcements.