7 useful ways to create Flow in Kotlin

Learn different ways to create Kotlin Flow with examples
May 10 2022 · 4 min read

Introduction 

In this blog post we are going to learn different ways to create Kotlin Flow with examples.

7 Ways to Create a Flow

1. flowOf()

2. asFlow()

3. flow{}

4. MutableStateFlow & MutableSharedFlow

5. channelFlow

6. callbackFlow

7. emptyFlow

Let’s get started!

1. flowOf()

flowOf() creates a flow from a fixed set of items. It's essentially a static flow builder with a predefined list of items to emit. In the RxJava world, the counterpart is Observable.just().

Example


val userNameFlow = flowOf("Jhone", "Ely", "Jimmy")
// collect is a suspend function, so call it from a coroutine
userNameFlow.collect {
    println(it)
}

2. asFlow()

asFlow() is an extension function that converts an existing data source, such as a collection, array, range or sequence, into a Flow.

Example


val userNameFlow = listOf("Jhone", "Ely", "Jimmy").asFlow()
userNameFlow.collect {
    println(it)
}
// Convert Int range to flow
(1..10).asFlow().collect { println(it) }
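
The same extension is available on several other source types. The sketch below is only illustrative: the function names are made up for this post, and each one simply collects the converted flow back into a list so the result is easy to inspect.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList

// Hypothetical helpers, one per source type.
// Each converts the source to a Flow and collects it into a List.

// Array<T>.asFlow()
suspend fun arrayAsFlow(): List<String> = arrayOf("a", "b").asFlow().toList()

// Sequence<T>.asFlow()
suspend fun sequenceAsFlow(): List<Int> = sequenceOf(1, 2, 3).asFlow().toList()

// LongRange.asFlow()
suspend fun longRangeAsFlow(): List<Long> = (1L..3L).asFlow().toList()
```

These are suspend functions, so call them from a coroutine, for example inside runBlocking { } in a quick experiment.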

3. flow{}

flow{} creates a cold flow from the given suspending block, in which we can emit values manually. Because the flow is cold, the block does not run until a collector starts collecting, and it runs again from the start for every new collector. In the RxJava world, Observable.create{} is the equivalent of flow{}.

Example :

Let's assume we're showing a live show and need to make an API call every 10 seconds to fetch its latest updates.


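val showFlow = flow<Show> {
    // `fetch` is assumed to be a flag that stays true while we want updates
    while (fetch) {
        val show = fetchLiveShow() // API call, assumed to be a suspend function
        emit(show)
        delay(10_000) // wait 10 seconds before the next request
    }
}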

showFlow.collect { 
    // Update Live show
}
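
To see what "cold" means in practice, here is a minimal sketch. The helper name and the counter are hypothetical, added only to observe how often the builder block runs when the same flow is collected twice.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList

// Hypothetical helper: returns how many times the flow { } block was executed.
suspend fun countBlockRuns(): Int {
    var blockRuns = 0
    val numbers = flow {
        blockRuns++ // executed once per collector
        emit(1)
        emit(2)
    }
    // Creating a cold flow does not start it, so nothing has run yet.
    check(blockRuns == 0)

    numbers.toList() // first collector
    numbers.toList() // second collector
    return blockRuns
}
```

Under these assumptions the function returns 2, because each toList() call is a separate collector and restarts the block.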

4. MutableStateFlow & MutableSharedFlow

MutableStateFlow() and MutableSharedFlow() are constructor-like functions that create hot flows whose values can be updated directly.

MutableStateFlow :

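MutableStateFlow is a mutable state holder: it must be created with an initial value, always holds exactly one current value, and emits that value to each new collector, followed by any later updates. MutableStateFlow is a hot flow. With StateFlow, data is a state.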

Example:

We'll create a MutableStateFlow of UiState so that the UI shows progress while the API call is running and shows the upcoming shows once the call returns them.


// In ViewModel
private val _uiState = MutableStateFlow<UiState>(UiState.Success(emptyList()))
// Read-only StateFlow exposed to the UI, which updates whenever the state changes
val uiState: StateFlow<UiState> = _uiState.asStateFlow()

init {
    viewModelScope.launch {
        _uiState.value = UiState.Loading
        showRepository.fetchUpcomingShow()
            .collect { shows ->
                _uiState.value = UiState.Success(shows)
            }
    }
}

sealed class UiState {
    object Loading : UiState()
    data class Success(val shows: List<Show>) : UiState()
}
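
Outside of Android, the state-holder behaviour can be sketched in a few lines. The function name below is hypothetical; the sketch only shows that a MutableStateFlow needs an initial value, can be updated through value or update { }, and hands its current value to a collector that arrives later.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.update

// Hypothetical helper: returns the stored value and the value seen by a late collector.
suspend fun stateFlowDemo(): List<Int> {
    val counter = MutableStateFlow(0) // an initial value is required

    counter.value = 1          // direct assignment
    counter.update { it + 1 }  // atomic read-modify-write

    // A collector that starts now receives the current value right away.
    val seenByLateCollector = counter.first()

    return listOf(counter.value, seenByLateCollector)
}
```

Both entries are 2 here: the earlier values 0 and 1 are never replayed, only the current state is.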

MutableSharedFlow :

A MutableSharedFlow shares every emitted value among all of its active collectors. With SharedFlow, data is basically an event.

Example:


runBlocking {
    val userFlow = MutableSharedFlow<User>(replay = 1)

    userFlow.tryEmit(User(name = "User 0"))
    userFlow.tryEmit(User(name = "User 1"))
    userFlow.tryEmit(User(name = "User 2"))

    val job1 = userFlow.onEach {
        Log.e("Job1", "Collected $it")
    }.launchIn(this)

    val job2 = userFlow.onEach {
        Log.e("Job2", "Collected $it")
    }.launchIn(this)

    delay(1000)
    job1.cancel()
    delay(1000)
    job2.cancel()
}
//Output
Job1: Collected User(name=User 2)
Job2: Collected User(name=User 2)

Here, we've created a MutableSharedFlow with replay = 1, which means it caches the most recent item and replays it to future collectors. In our example, all three values were emitted before any collector subscribed, so both job1 and job2 received only the last emitted item, User(name=User 2).
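
The replay parameter is not limited to 1. As a small sketch under the same idea (the function name and the event values are made up for illustration), a shared flow with replay = 2 keeps the last two items for late collectors:

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList

// Hypothetical helper: returns what a late collector receives from the replay cache.
suspend fun replayDemo(): List<String> {
    val events = MutableSharedFlow<String>(replay = 2)

    // No collectors yet, so these values only land in the replay cache.
    events.tryEmit("A")
    events.tryEmit("B")
    events.tryEmit("C")

    // A SharedFlow never completes on its own, so take(2) ends the collection
    // after the two replayed items have been received.
    return events.take(2).toList()
}
```

The late collector gets "B" and "C"; "A" has already been dropped from the cache.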

5. channelFlow

channelFlow creates a cold flow whose elements are the values sent to the SendChannel that the builder provides to its block.

channelFlow produces values in a separate coroutine and passes them through a buffered channel, so the producer does not wait for the receiver to finish processing an item; send() suspends only when the buffer is full. A normal flow, by contrast, works sequentially: emit() suspends until the consumer has finished working on the latest item.

Let’s compare the output of normal flow and channel flow

Example 1: normal Flow


val scope = CoroutineScope(Dispatchers.IO)

val flow = flow<String> {
    println("emit : 1")
    emit("1")
    println("emit : 2")
    emit("2")
}

scope.launch {
    flow.collect {
        println("collect $it")
        delay(5000)
    }
}

With a normal flow, the output will be something like this:

emit : 1
collect 1
// 5 sec delay
emit : 2
collect 2

Here, the producer emits the next item only after the previous item is processed by the consumer.

Example 2: ChannelFlow


val scope = CoroutineScope(Dispatchers.IO)

val flow = channelFlow {
    println("send : 1")
    send("1")
    println("send : 2")
    send("2")
}

scope.launch {
    flow.collect {
        println("collect $it")
        delay(5000)
    }
}

And the output will be something like this:

send : 1
send : 2
collect 1
// 5 sec delay
collect 2

Here, the producer does not wait for its receiver; it works independently of its consumer.
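
Because the block of channelFlow is a coroutine scope, it can also launch several producers that send concurrently, which a plain flow{} does not allow. A minimal sketch, with a hypothetical function name and made-up values:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch

// Hypothetical helper: two producers send into the same flow concurrently.
suspend fun mergedSources(): List<String> =
    channelFlow {
        launch {
            delay(50)      // simulates a slower source
            send("slow")
        }
        launch {
            send("fast")   // sends immediately
        }
        // The flow completes once both launched coroutines have finished.
    }.toList()
```

Here "fast" arrives before "slow", since the second producer does not wait for the first one's delay.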

6. callbackFlow

callbackFlow creates a cold flow that converts listener- or callback-based APIs, such as button clicks, into a Flow. We can't use a simple flow{} here because emit() is a suspend function that has to be called from the flow's own coroutine, whereas callbacks are ordinary functions invoked from elsewhere. callbackFlow allows values to be emitted from a different CoroutineContext with the send function, or from outside a coroutine with the trySend function.

Example :


fun networkAvailabilityFlow(context: Context): Flow<Boolean>
        = callbackFlow {
    val callback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            trySend(true)
        }
        override fun onLost(network: Network) {
            trySend(false)
        }
    }
    val manager = context.getSystemService(Context.CONNECTIVITY_SERVICE)
            as ConnectivityManager
    manager.registerNetworkCallback(NetworkRequest.Builder().run {
        addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
        build()
    }, callback)

    awaitClose {
        manager.unregisterNetworkCallback(callback)
    }
}

Here, we've converted NetworkCallback to a Flow, which emits the state of network availability to its collector. Let's see how we can use this flow.


// collect is a suspend function, so call it from a coroutine
networkAvailabilityFlow(context)
    .collect { isConnected ->
        // notify user about network state
    }

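awaitClose is important in callbackFlow. It suspends the block, keeping the flow alive, until the collector cancels or the channel is closed, and then invokes its argument so the callback can be unregistered. Without it, the block would return immediately and the callback would never be removed; callbackFlow throws an IllegalStateException in that case.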
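
The same pattern can be tried without any Android classes. In the sketch below, Ticker is a hypothetical callback-based API invented for this example, and collectTwoTicks() is a small driver that assumes it runs on a single thread (for instance inside runBlocking); it is a sketch of the idea, not a production-ready listener.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.yield

// Hypothetical callback-based API, standing in for something like a click listener.
class Ticker {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (Int) -> Unit) { listeners += listener }
    fun removeListener(listener: (Int) -> Unit) { listeners -= listener }
    fun tick(value: Int) = listeners.toList().forEach { it(value) }
}

// Wraps the listener API in a Flow.
fun Ticker.ticks(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    addListener(listener)
    // Keeps the flow open and removes the listener when collection ends.
    awaitClose { removeListener(listener) }
}

// Driver for the sketch: collects two ticks, then reports how many listeners remain.
suspend fun collectTwoTicks(): Pair<List<Int>, Int> = coroutineScope {
    val ticker = Ticker()
    val collected = async { ticker.ticks().take(2).toList() }

    // Let the collector run until the listener has been registered.
    while (ticker.listenerCount == 0) yield()

    ticker.tick(1)
    ticker.tick(2)
    collected.await() to ticker.listenerCount
}
```

Once take(2) has received both values, the collection is cancelled and the awaitClose block removes the listener again.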

7. emptyFlow

emptyFlow() creates an empty flow, which completes immediately without emitting any values.

Example :

Suppose our fetchUpcomingShows API call requires a token. What if the token is null? In that case, we'll return an empty flow.


fun fetchUpcomingShows(token: String?): Flow<List<Show>> {
    return if (token.isNullOrEmpty()) {
        // No token, so there is nothing to fetch
        emptyFlow()
    } else {
        flow {
            // Assumed to be a separate suspend API call that returns List<Show>
            val shows = fetchUpcomingShows()
            emit(shows)
        }
    }
}
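
To confirm what a collector of an empty flow observes, here is a tiny sketch (the function name is hypothetical): nothing is emitted, yet the flow still completes normally.

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList

// Hypothetical helper: returns the collected items and whether completion was signalled.
suspend fun emptyFlowDemo(): Pair<List<Int>, Boolean> {
    var completed = false
    val items = emptyFlow<Int>()
        .onCompletion { completed = true } // still called, even with no items
        .toList()
    return items to completed
}
```

The result is an empty list together with true, so downstream code can rely on completion callbacks even when there is no data.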

Conclusion 

That’s it!! So today you’ve learned different ways to create flow. Each way has its own unique usage, so it’s good to know them before using them in your project.

As always, suggestions and feedback are more than welcome.

Hope you found this blog post helpful.

Thank you!!


Radhika saliya
Android developer | Sharing knowledge of Jetpack Compose & android development

2024 Canopas Software LLP. All rights reserved.