In this blog post, we are going to learn different ways to create a Kotlin Flow, with examples.
1. flowOf()
2. asFlow()
3. flow{}
4. MutableStateFlow() & MutableSharedFlow()
5. channelFlow{}
6. callbackFlow{}
7. emptyFlow()
flowOf() creates an instance of a flow from a given set of items. Basically, it’s a static flow builder that has predefined items to emit. In the RxJava world, we have Observable.just() to create an observable.
Example
val userNameFlow = flowOf("Jhone", "Ely", "Jimmy")
// collect() is a suspend function, so call it from a coroutine
userNameFlow.collect {
    println(it)
}
asFlow() is an extension function. With asFlow(), you can convert collections, arrays, ranges, sequences, etc. to a Flow.
Example
val userNameFlow = listOf("Jhone", "Ely", "Jimmy").asFlow()
userNameFlow.collect {
    println(it)
}
// Convert an Int range to a flow
(1..10).asFlow().collect { println(it) }
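The example above covers a list and a range. Here is a small sketch of the same idea for a sequence and an array, assuming kotlinx-coroutines-core is on the classpath. powersOfTwo is a made-up helper name used only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the first `count` powers of two, produced lazily by a Sequence.
fun powersOfTwo(count: Int): Flow<Int> =
    generateSequence(1) { it * 2 }.take(count).asFlow()

fun main() = runBlocking {
    println(powersOfTwo(4).toList())              // [1, 2, 4, 8]
    println(arrayOf("a", "b").asFlow().toList())  // [a, b]
}
```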
flow{} creates an instance of a cold flow from the given suspending block, in which we can manually emit values. Being cold, it only starts emitting once we start collecting it. In the RxJava world, Observable.create{} is the equivalent of flow{}.
Example:
Let’s assume we’re showing a live show and need to make an API call every 10 seconds to fetch its updates.
val showFlow = flow<Show> {
    while (fetch) {
        val show = fetchLiveShow()
        emit(show)
        delay(10_000) // wait 10 seconds before the next call
    }
}
showFlow.collect {
    // Update live show
}
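To see what "cold" means in practice, here is a minimal sketch, assuming kotlinx-coroutines-core is available. countingFlow and blockRuns are illustrative names that are not part of the live show example. The builder block does not run when the flow is created, and it runs again for every collector.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter, used only to show how often the builder block runs.
var blockRuns = 0

// A cold flow: nothing inside the block executes until someone collects.
fun countingFlow(): Flow<Int> = flow {
    blockRuns++
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val numbers = countingFlow()
    println(blockRuns)         // 0, creating the flow does not run the block
    println(numbers.toList())  // [1, 2]
    println(numbers.toList())  // [1, 2], the block ran a second time
    println(blockRuns)         // 2
}
```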
MutableStateFlow() and MutableSharedFlow() are constructor functions that create a hot flow whose value can be updated directly.
MutableStateFlow is a mutable state holder: it emits its current value (at first, the initial value) to each new collector, followed by later updates. MutableStateFlow is a hot flow. With StateFlow, the data is a state.
Example:
We’ll create a MutableStateFlow of UiState to update our UI: show progress while making the API call, then show the upcoming shows once the API call returns them.
// In the ViewModel
private val _uiState = MutableStateFlow<UiState>(UiState.Success(emptyList()))
// Read-only view of the state flow; the UI updates whenever its value changes
val uiState: StateFlow<UiState> = _uiState

init {
    viewModelScope.launch {
        _uiState.value = UiState.Loading
        showRepository.fetchUpcomingShow()
            .collect { shows ->
                _uiState.value = UiState.Success(shows)
            }
    }
}

sealed class UiState {
    object Loading : UiState()
    data class Success(val shows: List<Show>) : UiState()
}
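The "current value" behaviour can be tried outside of Android too. This is a minimal sketch assuming only kotlinx-coroutines-core. firstValueForLateCollector is a made-up helper, not part of the ViewModel above. A collector that arrives after several updates starts from the latest state, not from the initial one.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical helper: what does a collector that arrives late see first?
suspend fun firstValueForLateCollector(): Int {
    val counter = MutableStateFlow(0)  // a StateFlow always needs an initial value
    counter.value = 1
    counter.value = 2
    return counter.first()             // first() collects just one value
}

fun main() = runBlocking {
    println(firstValueForLateCollector())  // 2
}
```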
MutableSharedFlow is a mutable shared flow that shares every emitted value among all of its collectors. With SharedFlow, the data is basically an event.
Example:
runBlocking {
    val userFlow = MutableSharedFlow<User>(replay = 1)
    userFlow.tryEmit(User(name = "User 0"))
    userFlow.tryEmit(User(name = "User 1"))
    userFlow.tryEmit(User(name = "User 2"))
    val job1 = userFlow.onEach {
        Log.e("Job1", "Collected $it")
    }.launchIn(this)
    val job2 = userFlow.onEach {
        Log.e("Job2", "Collected $it")
    }.launchIn(this)
    delay(1000)
    job1.cancel()
    delay(1000)
    job2.cancel()
}
// Output
Job1: Collected User(name=User 2)
Job2: Collected User(name=User 2)
Here, we have a MutableSharedFlow with replay = 1, which means it caches the number of items specified in replay for its future collectors. In our example, both job1 and job2 received the last emitted item, User(name=User 2).
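To play with the replay parameter without Android's Log, here is a minimal sketch assuming kotlinx-coroutines-core. cachedAfterThreeEvents is a made-up helper name. It emits three events before anyone collects and then inspects replayCache, which holds what a future collector would receive first.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emit three events before anyone collects, then return the cache.
fun cachedAfterThreeEvents(replay: Int): List<String> {
    val events = MutableSharedFlow<String>(replay = replay)
    listOf("a", "b", "c").forEach { events.tryEmit(it) }
    return events.replayCache
}

fun main() = runBlocking {
    println(cachedAfterThreeEvents(replay = 0))  // []
    println(cachedAfterThreeEvents(replay = 2))  // [b, c]

    val events = MutableSharedFlow<String>(replay = 2)
    listOf("a", "b", "c").forEach { events.tryEmit(it) }
    // A late collector receives the replayed items first. A shared flow never
    // completes on its own, so take(2) is what ends the collection here.
    println(events.take(2).toList())             // [b, c]
}
```

With replay = 0, events emitted while there are no collectors are simply lost, which is usually what we want for one-off events.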
channelFlow{} creates an instance of a cold flow from the elements that are sent to a SendChannel.
channelFlow produces its values in a separate coroutine and, as long as the channel’s buffer has room, does not wait for the receiver to finish processing the emitted items. A normal flow, on the other hand, works sequentially: emit() suspends until the consumer finishes working on the latest item.
Let’s compare the output of a normal flow and a channel flow.
val scope = CoroutineScope(Dispatchers.IO)
val flow = flow<String> {
    println("emit : 1")
    emit("1")
    println("emit : 2")
    emit("2")
}
scope.launch {
    flow.collect {
        println("collect $it")
        delay(5000)
    }
}
With a normal flow, the output will be something like this:
emit : 1
collect 1
// 5 sec delay
emit : 2
collect 2
Here, the producer emits the next item only after the previous item is processed by the consumer.
val scope = CoroutineScope(Dispatchers.IO)
val flow = channelFlow<String> {
    println("send : 1")
    send("1")
    println("send : 2")
    send("2")
}
scope.launch {
    flow.collect {
        println("collect $it")
        delay(5000)
    }
}
And the output:
send : 1
send : 2
collect 1
// 5 sec delay
collect 2
Here, the producer does not wait for its receiver; it works independently of its consumer.
callbackFlow{} creates an instance of a cold flow that converts listeners/callbacks, such as a button click listener, into a Flow. We can’t use a simple flow{} here because emit() is a suspend function, while callbacks are ordinary, non-suspending functions. callbackFlow allows values to be emitted from a different CoroutineContext with the send function, or from outside a coroutine with the trySend function.
Example:
fun networkAvailabilityFlow(context: Context): Flow<Boolean> = callbackFlow {
    val callback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            trySend(true)
        }

        override fun onLost(network: Network) {
            trySend(false)
        }
    }
    val manager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
    val request = NetworkRequest.Builder()
        .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
        .build()
    manager.registerNetworkCallback(request, callback)
    // Keep the flow open until the collector cancels, then unregister
    awaitClose {
        manager.unregisterNetworkCallback(callback)
    }
}
Here, we’ve converted NetworkCallback to a Flow, which emits the state of network availability to its collector. Let’s see how we can use this flow.
// collect() returns Unit, so there is nothing useful to assign to a variable
networkAvailabilityFlow(context)
    .collect { isConnected ->
        // notify user about network state
    }
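awaitClose is important in callbackFlow. Without it, the block would finish right after registering the callback and the flow would end immediately (recent versions of kotlinx.coroutines report a missing awaitClose as an error). awaitClose invokes its argument once the channel is closed or the collector is cancelled, which makes it the right place to unregister the callback.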
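If you want to try callbackFlow and awaitClose without an Android device, here is a minimal sketch that assumes only kotlinx-coroutines-core. Ticker is a made-up callback-based class standing in for something like ConnectivityManager, and ticks() is a hypothetical extension wrapping it. The last line shows that awaitClose removed the listener once the collector was done.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API, standing in for a real listener API.
class Ticker {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(listener: (Int) -> Unit) { listeners += listener }
    fun removeListener(listener: (Int) -> Unit) { listeners -= listener }
    fun tick(value: Int) = listeners.toList().forEach { it(value) }
}

// Wraps the listener API in a cold flow.
fun Ticker.ticks(): Flow<Int> = callbackFlow {
    // The callback is a plain function, so it uses trySend instead of send.
    val listener: (Int) -> Unit = { value -> trySend(value) }
    addListener(listener)
    // Suspends until the collector is done, then cleans up.
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking {
    val ticker = Ticker()
    val collected = async { ticker.ticks().take(2).toList() }
    // Wait until the callbackFlow block has registered its listener.
    while (ticker.listenerCount == 0) yield()
    ticker.tick(1)
    ticker.tick(2)
    println(collected.await())     // [1, 2]
    println(ticker.listenerCount)  // 0, awaitClose removed the listener
}
```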
emptyFlow() creates an empty flow.
Example:
Suppose our fetchUpcomingShows API call requires a token. What if our token is null? In that case, we’ll return an empty flow.
fun fetchUpcomingShows(token: String?): Flow<List<Show>> {
    return if (token.isNullOrEmpty()) {
        emptyFlow()
    } else {
        flow<List<Show>> {
            // The actual API call; showApi is a hypothetical client, not shown in this post
            val shows = showApi.fetchUpcomingShows(token)
            emit(shows)
        }
    }
}
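Here is a runnable version of the same idea, as a minimal sketch assuming kotlinx-coroutines-core. showsFor and the show titles are made up for illustration, with plain strings standing in for the Show class. Collecting an empty flow simply completes without emitting anything.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the API-backed function above.
fun showsFor(token: String?): Flow<List<String>> =
    if (token.isNullOrEmpty()) {
        emptyFlow()                           // no token: nothing to emit
    } else {
        flowOf(listOf("Show A", "Show B"))    // pretend this came from the API
    }

fun main() = runBlocking {
    println(showsFor(null).toList())   // []
    println(showsFor("abc").toList())  // [[Show A, Show B]]
}
```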
That’s it!! So today you’ve learned different ways to create flow. Each way has its own unique usage, so it’s good to know them before using them in your project.
As always, suggestions and feedback are more than welcome.
Hope you found this blog post helpful.
Thank you!!