By using the Flow
API with Firebase Realtime Database and Firestore, you can easily fetch data and listen for updates in your Android app, without the need for complex callbacks or asynchronous code.
In this article, we will look at how to use the Flow
API in Kotlin with Firebase Realtime Database and Firestore to fetch data in a reactive streaming manner.
Initially, we will understand an example but later we will create an extension that can convert any database reference to a Flow.
Let’s get started!
We are what we repeatedly do. Excellence, then, is not an act, but a habit. Try out Justly and start building your habits today!
Let’s assume we want to get a user from the path users/{id}
Let’s first define the repository that wraps database interaction and hide details from ViewModel.
class UserRepository {
private val db: FirebaseDatabase = FirebaseDatabase.getInstance()
fun getUser(userId: String): Flow<User?> {
return callbackFlow {
// Reference the user node in the database
val userRef = db.getReference("users").child(userId)
// Add a listener to the user node
val listener = userRef.addValueEventListener(object : ValueEventListener {
override fun onDataChange(dataSnapshot: DataSnapshot) {
// Emit the user data to the flow
trySend(dataSnapshot.getValue(User::class.java))
}
override fun onCancelled(error: DatabaseError) {
// Cancel the flow on error
cancel()
}
})
// Return the listener to be used to cancel the flow
awaitClose { userRef.removeEventListener(listener) }
}
}
}
In this example, the FirebaseDatabase
instance is defined in the init block. The getUser
function uses this instance to retrieve the user data from the database. It returns a Flow of User
objects.
And then sets up a ValueEventListener
to listen for updates to the specific user node in the database. The trySend
function is used to emit the updated user data whenever there is a change in the database.
In last, awaitClose
function is used to clean up the listener when the Flow is canceled.
Now, Let’s use this in a ViewModel.
class UserViewModel(private val repository: UserRepository) : ViewModel() {
private val _user = MutableLiveData<User>()
val user: LiveData<User>
get() = _user
fun observeUser(userId: String) {
viewModelScope.launch {
withContext(Dispatchers.IO) {
repository.getUser(userId).collect {
_user.postValue(it)
}
}
}
}
}
This is very straightforward, we start observing the user using collect
on a Flow, and post the value to liveData
.
That’s it if you want to write a one-time wrapper around Firebase Realtime Database.
However, most of the time you will need to do this multiple times in a project. Let’s see how we can write a simple extension to do this.
Basically, we will need to move our code of DatabaseRepository
in an extension function of DatabaseReference
. Let’s see how we can do that.
fun <T> DatabaseReference.addValueEventListenerFlow(dataType: Class<T>): Flow<T?> = callbackFlow {
val listener = object : ValueEventListener {
override fun onDataChange(dataSnapshot: DataSnapshot) {
val value = dataSnapshot.getValue(dataType)
trySend(value)
}
override fun onCancelled(error: DatabaseError) {
cancel()
}
}
addValueEventListener(listener)
awaitClose { removeEventListener(listener) }
}
This extension function takes dataType
as an argument and sets up ValueEventListener
to listen for updates to the node in the database.
To use this extension function, you can simply call it on the userRef
object. For example:
fun getUser(userId: String): Flow<User?> {
val userRef = FirebaseDatabase.getInstance().getReference("users").child(userId)
return userRef.addValueEventListenerFlow(User::class.java)
}
This extension function can be used to quickly convert ValueEventListener
into a Flow
, allowing you to use the standard Flow
operators to transform and consume the data in a reactive way.
Now let’s see how we can listen to real-time updates from FireStore.
private val firestore = FirebaseFirestore.getInstance()
fun getUser(userId: String): Flow<User?> = callbackFlow {
val listener = object : EventListener<DocumentSnapshot> {
override fun onEvent(snapshot: DocumentSnapshot?, exception: FirebaseFirestoreException?) {
if (exception != null) {
// An error occurred
cancel()
return
}
if (snapshot != null && snapshot.exists()) {
// The user document has data
val user = snapshot.toObject(User::class.java)
trySend(user)
} else {
// The user document does not exist or has no data
}
}
}
val registration = firestore.collection("users").document(userId).addSnapshotListener(listener)
awaitClose { registration.remove() }
}
The getUser method uses the addSnapshotListener method to listen for updates to the user document and emit the updated data to the flow.
Let’s use it in ViewModel.
class UserViewModel(private val repository: UserRepository) : ViewModel() {
private val _user = MutableStateFlow<User?>(null)
val user: StateFlow<User?>
get() = _user
fun getUser(userId: String) {
viewModelScope.launch {
withContext(Dispatchers.IO) {
repository.getUser(userId).collect { user ->
_user.value = user
}
}
}
}
Here we have used StateFlow
to listen to the updates from Repository
.
Now let’s create an extension function for the EventListener
interface in Firestore, to avoid boilerplate code.
The extension functions of both the Firebase real-time database DatabaseReference
and the Firestore DocumentReference
are kind of similar.
Let’s see how we can write an extension.
fun <T> DocumentReference.addSnapshotListenerFlow(dataType: Class<T>): Flow<T?> = callbackFlow {
val listener = object : EventListener<DocumentSnapshot> {
override fun onEvent(snapshot: DocumentSnapshot?, exception: FirebaseFirestoreException?) {
if (exception != null) {
// An error occurred
cancel()
return
}
if (snapshot != null && snapshot.exists()) {
// The document has data
val data = snapshot.toObject(dataType)
trySend(data)
} else {
// The document does not exist or has no data
}
}
}
val registration = addSnapshotListener(listener)
awaitClose { registration.remove() }
}
The above extension function defines an EventListener
object that listens for updates to the document and emits the updated data to the flow same as the Firebase database extension function.
To use the extension function, you can simply call it on a DocumentReference
object and collect the emitted items from the flow. For example:
fun getUser(userId: String): Flow<User?> {
val userRef = firestore.collection("users").document(userId)
return userRef.addSnapshotListenerFlow(User::class.java)
}
I hope the above code examples are helpful to understand how to use the callbackFlow
function and the EventListener interface in Firestore to fetch data and listen for updates in an Android app.
Remember to handle errors and cancel the flow when necessary, and make sure to remove the listener when it is no longer needed to avoid leaking resources.
That’s it for today, I hope you learned something!
This is just the basics of the Firebase database but is very useful as mainly we will use this for real-time changes in our database. Add/Update/Delete calls are mostly one-time calls, so there is no need to use Flow for those operations.
Let's Work Together
Not sure where to start? We also offer code and architecture reviews, strategic planning, and more.