Going with the Flow: from RxJava to Kotlin coroutines - Part 1
Refactoring an API request
I’ve been playing around with Kotlin’s coroutines library. I had some trouble wrapping my head around the whole coroutine concept, mainly because I was constantly looking out for RxJava resemblances. Well, the truth is RxJava is one thing, and coroutines are another thing. Sure, they can be used for the same use cases, but they’re two different concepts. I’ll try not to go too deep into the rabbit hole here, but RxJava is an API for asynchronous and/or concurrent programming that follows the functional and reactive paradigms. On the other hand, the coroutines library aims to facilitate asynchronous and/or concurrent programming, while deferring the decision of going functional or reactive to the user. Once I became aware of this, coroutines became a lot easier to understand. And it took me a lot less time than RxJava. I dare say that this might mean they’re easier to grasp for beginners, or at least for someone who’s not familiar with RxJava.
In this article series, I’ll go through a sample app built with RxJava and refactor it using the coroutines library. The plan for the series is to:
- Refactor API requests with coroutines;
- Refactor a Database event subscription with Flow;
- Refactor a UI interaction with Flow and Channels.
I will show you both implementations and explain the reasoning behind them, albeit assuming that you’re at least familiar with RxJava. I will measure performance (I’m an Engineer™) and show you how you can write tests for both versions. In this article, I’ll start with the refactoring that, in my opinion, lays the foundation to understand the upcoming ones - refactoring API requests with coroutines. So, let’s get started.
The app
Well, more like “The view”. I didn’t want to show you just small “before” and “after” code samples, but I also didn’t want to make an extremely complex and hard to follow app.
The UI is composed of a Fragment with a search bar and a RecyclerView (don’t mind the BottomNavigationView, it’s there just so that I can jump between different code samples - this is my skeleton/playground project). Each RecyclerView item shows a card with user information. When the app starts, it checks the database for existing data, and displays it accordingly. It also queries the GitHub API for more data in order to update the database. The search bar filters the user list by name, and the DELETE button on each card sends a delete command to the database for the corresponding user.
I’m using Room for the database and Retrofit for the GitHub API requests. Dependencies are provided by Dagger. The app as a whole is built using a common pattern (Clean Architecture). State is managed through view state and view event classes. Data flow between the view and the ViewModel is unidirectional. If you want to know more about the implementation details, you can check the repository. That said, let’s dive into the API request details.
Handling an API request with RxJava
To fetch the users we need to contact the GitHub API. However, some of the information we want to show, such as location or blog URL, is not available in the list that the API returns. As such, we need to do another request - one for each user - to retrieve those details.
Given this, the app has the following Retrofit API:
interface Api {
    @GET("users")
    fun getAllUsers(): Maybe<List<GithubUser>>

    @GET("users/{username}")
    fun getUserDetails(@Path("username") username: String): Maybe<GithubDetailedUser>
}
Yes, I could use Observable instead of Maybe here, but Maybe makes more semantic sense to me: maybe I’ll get the response I expect, or maybe I won’t. Still, getAllUsers returns a List<GithubUser> stream, and we need to operate on each individual user. So the repository converts this stream into an Observable stream of GithubUser, and then maps each element to the User domain entity. The other stream remains a Maybe:
override fun getUsersFromApi(): Observable<User> {
    return api.getAllUsers() // returns Maybe for semantic purposes - one possible response on each request.
        .flattenAsObservable { it } // However, we need to transform each element of the list
        .map { userMapper.mapToEntity(it) }
}

override fun getUserDetailsFromApi(username: Username): Maybe<DetailedUser> {
    return api.getUserDetails(username.value) // Username is an inline class. Handy for domain modeling!
        .map { detailedUserMapper.mapToEntity(it) }
}
Following Clean Architecture, I have UseCase classes connecting the ViewModel to the repository. I’m skipping them here, since I’m only using them to forward the calls from the ViewModel to the repository. This is actually something that bothers me, because according to the Clean Architecture definition, a use case is called a “use case” because it encapsulates use case logic. On Android though, we tend to keep this logic both in the repository and the ViewModel (at least in most Clean Architecture implementations I’ve seen so far). In other words, the UseCase classes are used only to improve code readability - as their intent is, by nature, straightforward - and define boundaries. Since most of the work done by an Android app is fetching data from wherever and showing it on the screen, one might wonder if the extra classes, extra maintenance effort, extra performance cost and extra APK size are really worth it. Anyway, this is a subject for another article, maybe. Back to the refactoring.
The API is ready, and the repository is ready. Now we just need to make the call in the ViewModel, and subscribe to it:
// Gets users from the api and stores them in the database
private fun updateCache() {
    getUsersFromApiAsSingle()
        .doOnSuccess { Logger.d("Updating database") }
        .subscribeOn(Schedulers.io())
        .subscribe(
            { updateCachedUsers(it) }, // onSuccess
            { handleErrors(it) } // onError
        )
        .addTo(compositeDisposable) // Extension function
}

private fun getUsersFromApiAsSingle(): Single<List<DetailedUser>> {
    return getUsersFromApi(NoParameters()) // NoParameters is a UseCase implementation detail
        .take(10) // Github API has an hourly call limit :D and 10 are more than enough for what we're doing
        .flatMapMaybe { getUserDetailsFromApi(it.username) } // 2nd api call with information from the 1st one
        .toList() // gather all stream events back into one Single list
}
I’m going to pretend I don’t have all these layers and boundaries for a second, so that the whole process is easier to visualize:
api.getAllUsers() // returns Maybe for semantic purposes - one possible response on each request.
    .flattenAsObservable { it } // However, we need to transform each element of the list
    .map { userMapper.mapToEntity(it) }
    .take(10) // Github API has an hourly call limit :D and 10 are more than enough for what we're doing
    .flatMapMaybe { // 2nd api call with information from the 1st one
        api.getUserDetails(it.username.value) // `it` is the User emitted by the stream
            .map { detailedUserMapper.mapToEntity(it) }
    }
    .toList() // gather all stream events back into one Single list -> Single<List<DetailedUser>>
    .doOnSuccess { Logger.d("Updating database") }
    .subscribeOn(Schedulers.io())
    .subscribe(
        { updateCachedUsers(it) }, // onSuccess
        { handleErrors(it) } // onError
    )
    .addTo(compositeDisposable) // Extension function
Ok, so what’s happening here?
- We send a request to the API and get back a Maybe<List<GithubUser>> stream;
- We flatten the list into an Observable<GithubUser> stream;
- We map each element to a domain entity called User (even though I’m pretending there are no boundaries, I left this mapping on purpose since it’s part of the stream’s operations);
- We take the first ten elements just because we’ll have to do another API call for each user, and GitHub has a very low limit for unauthenticated requests;
- We use flatMapMaybe to get the user details for each of the ten users, and map each one of the returned objects (GithubDetailedUser) to a domain entity called DetailedUser. Why flatMapMaybe instead of a regular flatMap? Because the getUserDetails API call returns a Maybe<GithubDetailedUser>, and a simple flatMap requires that you provide it with the same kind of stream you apply it on, since it has to return the same type (in this case, an Observable stream). As such, flatMapMaybe is expecting a Maybe stream as its parameter, and returns an Observable stream at the end;
- After flatMapMaybe does its magic and flattens the incoming streams into one Observable<DetailedUser> stream, we call the toList operator, which in turn will output a Single<List<DetailedUser>> stream;
- Finally, we do some logging, bind all operations to a thread from the IO pool and subscribe to the whole thing. Since the last operation outputs a Single stream, the observer only has two functions: an onSuccess lambda that calls the updateCachedUsers method, and an onError lambda that calls the handleErrors method. If everything goes through the happy path, updateCachedUsers then proceeds to update the database with the information it gets as parameter, i.e. a List<DetailedUser>. The lack of an observeOn operator is not a bug: updateCachedUsers is supposed to run in the background, so we can keep the stream running in the same thread.
Whew. That’s a whole lot of stream operations. But it does exactly what we want: it fetches data from a remote API and updates the local database, all in an asynchronous - and partially concurrent, thanks to the flatMapMaybe - fashion. Let’s see how we can do the same with coroutines.
Coroutines
First, I want to talk a little about how coroutines work in Kotlin. I’ll only scratch the surface of the topic, but it should be enough to get you started. To make it easier to process, I’ll talk about the basic coroutine theory now, and then talk about the practical concepts as they show up while refactoring. If you already understand how coroutines work under the hood, you can skip to the next section.
So, a coroutine is a construct meant to turn async programming into a walk in the park. Coroutines are usually referred to as lightweight threads, since they are so much lighter to use than a regular thread. On Android at least, a typical thread occupies 1 to 2 MB of memory (that’s why we love thread pools!). Each Java thread gets mapped to a kernel thread, which means that the OS manages them. The OS then schedules which thread runs at a given time, jumps between them (context switching), has to invalidate caches in the process… All of these and other related operations have their performance cost.
Coroutines are executed in threads. These threads typically come from thread pools managed by the coroutines library’s dispatchers. Coroutines are not bound to any particular thread, which means that they can start in one thread, suspend, and resume in another thread. Since this process is fully managed by the coroutines machinery through Continuations, suspending and resuming a coroutine doesn’t carry the cost of an OS-level context switch (more on this in a minute). Coroutines also aren’t managed by the OS, which frees us from the thread scheduler overhead mentioned above. The coroutine object in itself has a small memory footprint (bytes), which means that you can have a bunch of them being executed at the same time without having to worry about running out of memory. There are a few examples online where people launch 100,000 coroutines at the same time without having any problems at all, while getting an OutOfMemoryError when they try to do the same with regular threads.
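To get a feel for how cheap coroutines are, here is a minimal sketch of that kind of experiment. It assumes kotlinx-coroutines-core is on the classpath, and launchMany is a made-up helper name, not something from the sample app:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches `count` coroutines that all wait at the
// same time, then reports how many of them ran to completion.
fun launchMany(count: Int): Int {
    val finished = AtomicInteger(0)
    runBlocking {
        repeat(count) {
            launch {
                delay(100) // suspends the coroutine without blocking the thread
                finished.incrementAndGet()
            }
        }
    } // runBlocking only returns after every child coroutine has completed
    return finished.get()
}
```

Calling launchMany(100_000) keeps all the coroutines on the single thread that runBlocking occupies. Trying the same thing with one java.lang.Thread per task is where the memory trouble usually starts.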
I mentioned Continuation before. This happens to be one of the most important aspects of coroutines, if not the most important. Kotlin coroutines implement what is called continuation passing style. Whenever you write a suspendable function (you’ve probably seen suspend fun written somewhere by now), you’re letting Kotlin know that this function is to be executed in a coroutine. Why? Because under the hood, the compiler translates the suspend fun to a function that receives a Continuation as a parameter! So, when you write something like this
suspend fun login(user: User): Token { ... }
It gets compiled to something that, once decompiled to Java, looks similar to
Object login(User user, Continuation<Token> continuation) { ... }
Every time a coroutine suspends, it stores its state in the continuation. When it wants to resume its execution, it only has to check the continuation for the information it needs, and that’s it. This seamless suspend-resume process is what allows us to write seemingly sequential code instead of callbacks for async work. Whenever there’s async work to be done, the coroutine suspends, and later resumes when the work is done. Meanwhile, the world keeps spinning like nothing’s going on, as this suspension does not block the thread the coroutine is running on. In fact, while this coroutine is suspended, another coroutine can immediately start some work on the same thread.
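The suspend-resume dance can be observed with nothing but the Kotlin standard library. The sketch below drives a single coroutine by hand; suspendResumeTrace and the "token-123" value are made up for illustration, and real code would normally go through kotlinx.coroutines builders rather than startCoroutine:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Runs one coroutine by hand and records the order in which things happen.
fun suspendResumeTrace(): List<String> {
    val trace = mutableListOf<String>()
    var parked: Continuation<String>? = null

    val body: suspend () -> Unit = {
        trace.add("coroutine: started")
        // Suspension point: stash the continuation instead of resuming it.
        val token = suspendCoroutine<String> { continuation -> parked = continuation }
        trace.add("coroutine: resumed with $token")
    }

    // Starts the coroutine on the current thread; returns at the first suspension.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { result ->
        result.getOrThrow()
        trace.add("coroutine: completed")
    })

    trace.add("caller: still running while the coroutine is suspended")
    // Hand the value back: the coroutine picks up right after the suspension point.
    parked?.resume("token-123")
    return trace
}
```

The trace shows the caller doing work between "started" and "resumed", which is the non-blocking behaviour described above: the thread was never parked, only the coroutine was.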
Ok, you’re still reading. I know this is a lot, but hopefully it’ll help you understand things when we get our hands on the code (it certainly did help me).
Handling an API request with coroutines
Retrofit has native support for coroutines, so the first step is to add the suspend keyword to the methods and change their return types:
@GET("users")
suspend fun getAllUsers(): List<GithubUser>
@GET("users/{username}")
suspend fun getUserDetails(@Path("username") username: String): GithubDetailedUser
Easy enough. Now, propagate the same changes to the repository:
override suspend fun getUsersFromApi(): List<User> {
    return api.getAllUsers()
        .map { userMapper.mapToEntity(it) }
}

override suspend fun getUserDetailsFromApi(username: Username): DetailedUser {
    val detailedUser = api.getUserDetails(username.value)
    return detailedUserMapper.mapToEntity(detailedUser)
}
It’s not that different from what we had before. We’re still just mapping the data entities to domain entities. Nothing more.
Next up is the ViewModel. Here is where the differences are noticeable:
private fun updateCacheWithCoroutines() {
    // I don't like try-catch. So we're using an exception handler instead
    val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
        handleErrors(throwable)
    }

    // we want the coroutine to be bound to the `ViewModel`'s lifecycle (it's on the main thread)
    viewModelScope.launch(exceptionHandler) {
        // But the request should go to the background
        withContext(Dispatchers.IO) {
            getUsersFromApiThroughCoroutine(this)
        } // Don't forget: at this point, we're in the main thread context again!
    }
}

private suspend fun getUsersFromApiThroughCoroutine(coroutineScope: CoroutineScope) {
    val userList = getUsersFromApi(NoParameters()) // List<User>
        .take(10) // Github API has an hourly call limit :D and 10 are more than enough for what we're doing
        .map { coroutineScope.async { getUserDetailsFromApi(it.username) } } // Yay parallelism!
        .map { it.await() } // Wait for them to finish... These two last maps are pretty much a flatMap

    if (userList.isNotEmpty()) {
        Logger.d("Updating database")
        updateCachedUsers(userList)
    }
}
Before I start explaining what’s happening here, there are a few key points that you need to be aware of (if you know how coroutines are launched and what contexts and jobs are, you can skip to the next code snippet):
- Coroutines are launched through a coroutine builder. The typical ones are launch and async (there’s also runBlocking, which we’ll use for testing). As you can see above, you can launch coroutines inside coroutines: we’re launching coroutines with async, while in another coroutine that was launched by launch. The async builder is used for concurrent tasks. You execute concurrent coroutines with it and wait for them with the await suspending function, which suspends the calling coroutine until the awaited async child finishes;
- Dispatchers are used to confine coroutine execution to specific threads or thread pools. They are used either with a coroutine builder or with the withContext suspendable function;
- Every coroutine is bound to a CoroutineScope. These scopes let you bind the coroutine to specific lifecycles. Coroutine builders are actually extension functions defined on CoroutineScope (except for runBlocking). On Android, we probably want to avoid GlobalScope, which is meant for coroutines that run throughout the app’s lifetime. In the code, I use viewModelScope to bind the coroutines to the ViewModel’s lifecycle;
- Coroutines can be cancelled. We can cancel them either by throwing a CancellationException or through a Job. Every coroutine is associated with a Job. Cancelling a Job will cancel its coroutine. Jobs can form parent-child hierarchies, where cancellation of the parent also cancels all children, and failure of a child also cancels the parent (unless the child ends with a CancellationException, which counts as a regular cancellation and not as a failure). There’s also the SupervisorJob, where a child can fail without affecting other children or the parent. The typical way to obtain a coroutine’s Job is either by storing the return value of the launch builder, or by accessing it directly inside the coroutine;
- Just like Android, coroutines have a context. Its main elements are the coroutine’s Job and CoroutineDispatcher. When the coroutine is launched, we have the choice of passing a CoroutineContext to the builder. If we don’t, it’ll inherit the context of its scope, falling back to defaults for whatever is missing. We can also change the coroutine’s context later through the withContext function. A bunch of different classes implement the CoroutineContext interface, which is really handy (for instance, dispatchers implement it, so we can pass them to a coroutine builder or withContext).
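Two of the points above, dispatchers and job hierarchies, are easy to poke at outside of Android. What follows is a rough sketch, assuming kotlinx-coroutines-core is available; threadHops and cancelScope are names I made up, and the hand-built scope only stands in for a lifecycle-bound scope such as viewModelScope:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the thread seen before, inside and after a withContext block.
fun threadHops(): List<Thread> = runBlocking {
    val before = Thread.currentThread()
    val inside = withContext(Dispatchers.IO) { Thread.currentThread() }
    val after = Thread.currentThread() // back on the thread runBlocking occupies
    listOf(before, inside, after)
}

// Cancels a parent Job and reports whether each child ended up cancelled too.
fun cancelScope(): List<Boolean> {
    val parentJob = Job()
    val scope = CoroutineScope(parentJob + Dispatchers.Default)
    // Every coroutine launched in the scope becomes a child of parentJob.
    val children = List(3) { scope.launch { delay(60_000) } }
    parentJob.cancel() // cancelling the parent cancels all of its children
    runBlocking { children.joinAll() } // wait until the children have finished
    return children.map { it.isCancelled }
}
```

In threadHops, the first and last entries should be the same thread while the middle one belongs to the IO pool. In cancelScope, none of the children get anywhere near the end of their one minute delay, because the cancellation reaches them through the parent.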
Back to the code. Like I did with the RxJava version, I’m going to pretend that there are no layers and join the whole thing:
// I don't like try-catch. So we're using an exception handler instead
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
    handleErrors(throwable)
}

// we want the coroutine to be bound to the `ViewModel`'s lifecycle (it's on the main thread)
viewModelScope.launch(exceptionHandler) {
    // But the request should go to the background
    withContext(Dispatchers.IO) {
        val userList = api.getAllUsers()
            .map { userMapper.mapToEntity(it) }
            .take(10) // Github API has an hourly call limit :D and 10 are more than enough for what we're doing
            .map {
                async { // Yay concurrency!
                    val detailedUser = api.getUserDetails(it.username.value)
                    detailedUserMapper.mapToEntity(detailedUser)
                }
            }
            .map { it.await() } // Wait for all calls to finish... These two last maps are pretty much a flatMap

        if (userList.isNotEmpty()) {
            Logger.d("Updating database")
            updateCachedUsers(userList)
        }
    } // Don't forget: at this point, we're in the main thread context again!
}
Let’s begin:
- We start by creating a CoroutineExceptionHandler. Coroutines bubble up exceptions all the way to the top-most coroutine, so instead of having try-catch blocks all over the place, we pass an exception handler to the top-most coroutine. This handler works for simple cases, as it will handle all uncaught exceptions (note that child coroutines can still have their own exception handling mechanisms);
- We launch a coroutine on the viewModelScope and pass it the exception handler (CoroutineExceptionHandler also implements CoroutineContext). By launching the coroutine on this scope, we’re also binding it to the main thread. On a side note, viewModelScope is cancelled automatically when the ViewModel is cleared, so there’s no manual cleanup to do like the one we’re used to with Rx’s CompositeDisposable;
- As soon as we start the coroutine, we change its context so that it’ll run on the IO thread pool. Now, this is very important: the code inside withContext’s lambda will run on an IO thread. As soon as the lambda ends (the closing brace with the “Don’t forget” comment), we’re back to the main thread! Unlike RxJava, where we’re used to binding the upstream to a thread and the downstream to another thread, coroutines rely on the actual blocks of code. In other words, everything inside the launch {} block will run on the main thread, except for the inner block that we explicitly send to an IO thread with withContext;
- We get the users from the API, map them to domain entities and take the first ten;
- Now for the other interesting bit. For each User, we’re launching a new coroutine with async in order to fetch the details from the API. We then map each response item to a domain entity. Due to the async calls, this map will return List<Deferred<DetailedUser>>, as each async returns a Deferred<T> type. All async calls effectively run in a concurrent manner;
- Right below, we have another map being called. We’re calling it so that we can call await on each of the Deferred values, in order for the coroutine to suspend until they all finish. In the end, by using this map together with the one with the async calls, we’re mimicking what an Rx flatMap would do;
- That’s it. The rest is just sequential, normal code. However, a side note on this. updateCachedUsers is a call that gets propagated through the repository until it triggers Room to actually update itself with the new data. Now, the caveat: if the actual Room function was a suspend function, this updateCachedUsers call above should be moved outside the withContext block, and be executed in the main thread. As it turns out, Room suspend functions are main-safe, as Room uses its own custom dispatcher - calling them from any thread other than main will only slow things down.
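To isolate the async/await part of the walkthrough, here is a small standalone sketch that swaps the Retrofit call for a fake suspending function. It assumes kotlinx-coroutines-core; User, DetailedUser and fakeGetUserDetails are simplified stand-ins rather than the app’s real classes, and I’m using awaitAll, which the library provides as a shorthand for the second map { it.await() }:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for the app's domain entities.
data class User(val username: String)
data class DetailedUser(val username: String, val location: String)

// Hypothetical replacement for the second API call: pretends to hit the network.
suspend fun fakeGetUserDetails(user: User): DetailedUser {
    delay(200)
    return DetailedUser(user.username, "somewhere near ${user.username}")
}

// One async per user, then a single suspension until all of them are done.
suspend fun fetchDetails(users: List<User>): List<DetailedUser> = coroutineScope {
    users.take(10)
        .map { user -> async { fakeGetUserDetails(user) } } // List<Deferred<DetailedUser>>
        .awaitAll() // results come back in the same order as the input list
}

// Blocking entry point, only here so the sketch can be called from regular code.
fun fetchDetailsBlocking(): List<DetailedUser> = runBlocking {
    fetchDetails(List(12) { index -> User("user$index") })
}
```

Since the ten fake calls wait concurrently, the whole thing should take roughly one delay instead of ten. Wrapping the work in coroutineScope also means that a failure in any of the async children cancels the remaining ones.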
Final words
This concludes part 1 of this series. I wanted to include both testing and performance measurements for this refactoring in this article as well, but damn, this is already huge as it is. That said, in part 2 I’ll be writing about how you can unit test both implementations (although I don’t agree that you should in this specific case, but I’ll explain why in the article) and compare them both in terms of performance. I’m expecting them to be really close to each other in execution time, but I’m rather curious about the memory footprint of each implementation. If I had to bet, I’d say that RxJava is heavier on memory usage, but we’ll see.
As for RxJava vs. coroutines… Well, as I said in the beginning, you really can’t compare two different things. The only thing I can say right now is that both are really fun to use, and if your RxJava use case is one where you only use it for async work and you’re looking for an alternative, you should totally consider coroutines.
Thank you so much for taking your time to read this. Do you have any thoughts or opinions? If so, please leave a comment or reach out on Twitter. See you in part 2!