In this article we will look at the basics of RxJava with some examples. It covers the ABCs of RxJava and is aimed at RxJava beginners.
RxJava is a Java implementation of ReactiveX (Reactive Extensions), and it is especially useful for background tasks and threading. It makes asynchronous calls such as network (API) calls easier to handle. If we execute a network call on the main thread, it blocks the main thread until the response arrives. With RxJava you do not have to manage threads by hand: you declare which scheduler the work should run on and which one should receive the results, and RxJava takes care of the switching.
RxJava basically deals with three O's: Observable, Observer and Operator. In the world of RxJava, everything can be considered a stream.
A stream emits items, and each item can be consumed. The Observable emits items and those items are consumed by Observers. Observers are sometimes also referred to as Subscribers, since they subscribe to the items emitted by an Observable. The items emitted by an Observable can be further modified with the help of Operators.
Observables like the ones in this article do not start emitting items until someone subscribes to them. The Observable calls the subscriber's onNext() for each item. If something goes wrong, onError() is called and the stream ends; if all items are emitted successfully, onComplete() is called at the end. Only one of onError() and onComplete() is ever called. Let's see this with a few examples.
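To make the callback order concrete, here is a minimal sketch that records every event a subscriber receives. The helper name recordEvents is made up for illustration, and the snippet assumes only the RxJava 2 library on the classpath, so it can run as a plain Kotlin program outside Android.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes to the source and records each callback in order.
fun recordEvents(source: Observable<String>): List<String> {
    val events = mutableListOf<String>()
    source.subscribe(
        { item -> events.add("onNext($item)") },
        { error -> events.add("onError(${error.message})") },
        { events.add("onComplete") }
    )
    return events
}

fun main() {
    var started = false
    // fromCallable runs its body only when someone subscribes.
    val lazySource = Observable.fromCallable {
        started = true
        "hello"
    }
    println(started)                  // false, nothing has subscribed yet
    println(recordEvents(lazySource)) // [onNext(hello), onComplete]
    println(started)                  // true
    println(recordEvents(Observable.error<String>(IllegalStateException("boom")))) // [onError(boom)]
}
```

Note that the failing stream reports onError() and never reaches onComplete().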
Add the following dependencies to your module-level build.gradle file.
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'io.reactivex.rxjava2:rxjava:2.2.20'
implementation 'io.reactivex.rxjava2:rxkotlin:2.4.0'
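Add the following plugin to your module-level build.gradle file if you intend to use kotlin-android-extensions, which lets you access views from your XML layout directly in activities, fragments, etc. Note that this plugin has since been deprecated in favor of View Binding; the code in this article uses it for brevity.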
id 'kotlin-android-extensions'
Create a new Android Studio project with an Empty Activity. In your activity_main.xml file, paste the following.
<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">

    <Button
        android:id="@+id/btnEx1"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="20dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="20dp"
        android:text="Ex1"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toTopOf="parent" />

    <Button
        android:id="@+id/btnEx2"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="4dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="4dp"
        android:text="Ex2"
        app:layout_constraintStart_toEndOf="@id/btnEx1"
        app:layout_constraintTop_toTopOf="parent" />

    <Button
        android:id="@+id/btnEx3"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="4dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="4dp"
        android:text="Ex3"
        app:layout_constraintStart_toEndOf="@+id/btnEx2"
        app:layout_constraintTop_toTopOf="parent" />

    <Button
        android:id="@+id/btnEx4"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="4dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="4dp"
        android:text="Ex4"
        app:layout_constraintStart_toEndOf="@+id/btnEx3"
        app:layout_constraintTop_toTopOf="parent" />

    <Button
        android:id="@+id/btnEx5"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="20dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="20dp"
        android:text="Ex5"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toBottomOf="@id/btnEx1" />

    <Button
        android:id="@+id/btnEx6"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="4dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="4dp"
        android:text="buffer"
        app:layout_constraintStart_toEndOf="@id/btnEx5"
        app:layout_constraintTop_toBottomOf="@id/btnEx1" />

    <Button
        android:id="@+id/btnEx7"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="4dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="4dp"
        android:text="map"
        app:layout_constraintStart_toEndOf="@id/btnEx6"
        app:layout_constraintTop_toBottomOf="@id/btnEx1" />

    <Button
        android:id="@+id/btnEx8"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="4dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="4dp"
        android:text="Ex8"
        app:layout_constraintStart_toEndOf="@id/btnEx7"
        app:layout_constraintTop_toBottomOf="@id/btnEx1" />

    <Button
        android:id="@+id/btnEx9"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginStart="20dp"
        android:layout_marginTop="4dp"
        android:layout_marginEnd="20dp"
        android:text="zip"
        app:layout_constraintStart_toStartOf="parent"
        app:layout_constraintTop_toBottomOf="@id/btnEx5" />

</androidx.constraintlayout.widget.ConstraintLayout>
Now, inside your MainActivity, set click listeners for the buttons. Inside the click listeners we will call various functions that use RxJava, like this: btnEx1.setOnClickListener { startStream() }
private fun startStream() {
    val list: List<String> = listOf("1", "2", "3", "4", "5", "6")
    list.toObservable()
        .subscribeBy(
            onNext = { println(it) },
            onError = { it.printStackTrace() },
            onComplete = { println("onComplete") }
        ).addTo(compositeDisposable)
}
We call this function from the click listener of btnEx1.
We simply create a list of strings, and list.toObservable() converts the list into an Observable. The Observable then calls the subscriber's onNext() for each item, so we can see the emitted items in Logcat, and finally it calls onComplete(). Adding the subscription to compositeDisposable lets us dispose of it later (in onDestroy()), which helps avoid resource leaks.
Output (available in Logcat):
com.example.rxjavacomplexoperators I/System.out: 1
com.example.rxjavacomplexoperators I/System.out: 2
com.example.rxjavacomplexoperators I/System.out: 3
com.example.rxjavacomplexoperators I/System.out: 4
com.example.rxjavacomplexoperators I/System.out: 5
com.example.rxjavacomplexoperators I/System.out: 6
com.example.rxjavacomplexoperators I/System.out: onComplete
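Since compositeDisposable shows up in every example, here is a small sketch of what it does with the subscriptions it holds. It assumes only the RxJava 2 library and uses CompositeDisposable.add() directly instead of the addTo() extension; the function name clearVersusDispose is made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable

// Returns three flags: was the subscription disposed by clear(), is the bag
// still usable after clear(), and was a late subscription disposed after dispose().
fun clearVersusDispose(): List<Boolean> {
    val bag = CompositeDisposable()

    // never() emits nothing and never completes, so the subscription stays alive until disposed.
    val first = Observable.never<String>().subscribe { println(it) }
    bag.add(first)
    bag.clear()                        // disposes everything inside, the bag stays usable
    val disposedByClear = first.isDisposed
    val usableAfterClear = !bag.isDisposed

    bag.dispose()                      // disposes the bag itself
    val late = Observable.never<String>().subscribe { println(it) }
    bag.add(late)                      // a disposed bag disposes anything added to it
    return listOf(disposedByClear, usableAfterClear, late.isDisposed)
}

fun main() {
    println(clearVersusDispose()) // [true, true, true]
}
```

This is why clear() is a reasonable choice when the same CompositeDisposable may be reused, while dispose() is final.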
private fun startStream2() {
    Observable.just("Hello", "world", "How", "are", "you?")
        .subscribe(
            { value -> println("Received: $value") },
            { error -> println("Error: $error") },
            { println("Completed") }
        ).addTo(compositeDisposable)
}
Here, the just operator creates an Observable that emits the given items, and instead of subscribeBy() we have used subscribe(). The Observable emits the strings, the subscriber consumes them, and finally the Observable calls onComplete().
Output:
com.example.rxjavacomplexoperators I/System.out: Received: Hello
com.example.rxjavacomplexoperators I/System.out: Received: world
com.example.rxjavacomplexoperators I/System.out: Received: How
com.example.rxjavacomplexoperators I/System.out: Received: are
com.example.rxjavacomplexoperators I/System.out: Received: you?
com.example.rxjavacomplexoperators I/System.out: Completed
private fun startStream3() {
    Observable.fromArray("Apple", "Orange", "banana")
        .subscribe { println(it) }
        .addTo(compositeDisposable)
}
Here, Observable.fromArray() creates an Observable and the subscriber simply prints each item to Logcat. As this example is super simple, we are not handling errors.
Output:
com.example.rxjavacomplexoperators I/System.out: Apple
com.example.rxjavacomplexoperators I/System.out: Orange
com.example.rxjavacomplexoperators I/System.out: banana
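Skipping the error callback is fine for a fixed list, but it is worth knowing what happens if such a stream does fail. The sketch below assumes RxJava 2 and installs a temporary global error handler through RxJavaPlugins only to observe the result; the function name errorWithoutHandler is made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.plugins.RxJavaPlugins

// Returns whatever reached RxJava's global error handler, or null if nothing did.
fun errorWithoutHandler(): Throwable? {
    var captured: Throwable? = null
    RxJavaPlugins.setErrorHandler { error -> captured = error }
    try {
        // Only onNext is supplied, as in startStream3().
        Observable.error<String>(IllegalStateException("boom"))
            .subscribe { println(it) }
    } finally {
        RxJavaPlugins.reset() // restore the default global behavior
    }
    return captured
}

fun main() {
    val error = errorWithoutHandler()
    println(error?.javaClass?.simpleName) // OnErrorNotImplementedException
    println(error?.cause?.message)        // boom
}
```

Without a custom handler, that exception goes to the thread's uncaught exception handler, which on Android usually means a crash, so real streams should normally pass an error callback.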
private fun startStream4() {
    Observable.fromIterable(listOf("Titan", "Fastrack", "Sonata"))
        .subscribe(
            { value -> println("Received:$value") },
            { error -> println("Error: $error") },
            { println("Done") }
        ).addTo(compositeDisposable)
}
Here, Observable.fromIterable() creates an Observable, and the subscriber prints each item, the error (if there is one) and a Done message (on completion) to Logcat.
Output:
com.example.rxjavacomplexoperators I/System.out: Received:Titan
com.example.rxjavacomplexoperators I/System.out: Received:Fastrack
com.example.rxjavacomplexoperators I/System.out: Received:Sonata
com.example.rxjavacomplexoperators I/System.out: Done
private fun startStream5() {
    getObservableFromList(listOf("Summer", "Winter", "Monsoon"))
        .subscribe(
            { value -> println("Received: $value") },
            { error -> println("Error: $error") },
            { println("Done") }
        ).addTo(compositeDisposable)
}
Here, the getObservableFromList function creates the Observable and the rest is the same as in the earlier example. kind is simply one string from the list: if it is empty, the emitter signals an error and the stream ends; otherwise onNext() is called with the given string, and finally onComplete() is called.
private fun getObservableFromList(myList: List<String>) =
    Observable.create<String> { emitter ->
        for (kind in myList) {
            if (kind == "") {
                emitter.onError(Exception("Nothing to show"))
                // onError is terminal, so stop emitting here
                return@create
            }
            emitter.onNext(kind)
        }
        emitter.onComplete()
    }
Output:
com.example.rxjavacomplexoperators I/System.out: Received: Summer
com.example.rxjavacomplexoperators I/System.out: Received: Winter
com.example.rxjavacomplexoperators I/System.out: Received: Monsoon
com.example.rxjavacomplexoperators I/System.out: Done
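The output above only shows the happy path. As a rough sketch of the error path, the snippet below feeds a list that contains an empty string into a standalone copy of the same idea. The names observableFromList and collectEvents are made up for this illustration, and only the RxJava 2 library is assumed.

```kotlin
import io.reactivex.Observable

// Same idea as getObservableFromList, written as a standalone function for this sketch.
fun observableFromList(items: List<String>): Observable<String> =
    Observable.create<String> { emitter ->
        for (item in items) {
            if (item.isEmpty()) {
                emitter.onError(IllegalArgumentException("Nothing to show"))
                return@create // onError is terminal, so stop emitting
            }
            emitter.onNext(item)
        }
        emitter.onComplete()
    }

// Hypothetical helper: collects what the subscriber would have printed.
fun collectEvents(items: List<String>): List<String> {
    val events = mutableListOf<String>()
    observableFromList(items).subscribe(
        { value -> events.add("Received: $value") },
        { error -> events.add("Error: ${error.message}") },
        { events.add("Done") }
    )
    return events
}

fun main() {
    println(collectEvents(listOf("Summer", "", "Monsoon"))) // [Received: Summer, Error: Nothing to show]
}
```

"Monsoon" is never delivered and "Done" is never printed, because the stream ends at the first error.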
private fun startStream6() {
    Observable.just("A", "B", "C", "D", "E", "F", "G", "H")
        .buffer(2)
        .subscribe(
            { value -> println("Received: $value") },
            { error -> println("Error: $error") },
            { println("Done") }
        ).addTo(compositeDisposable)
}
Here, we have used the buffer operator of RxJava. Instead of emitting items one at a time, it collects the items emitted by the Observable into lists of the given size (here 2) and then emits these lists.
Output:
com.example.rxjavacomplexoperators I/System.out: Received: [A, B]
com.example.rxjavacomplexoperators I/System.out: Received: [C, D]
com.example.rxjavacomplexoperators I/System.out: Received: [E, F]
com.example.rxjavacomplexoperators I/System.out: Received: [G, H]
com.example.rxjavacomplexoperators I/System.out: Done
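The example above has an even number of items, so every list is full. As a small sketch of the uneven case, the function below (bufferInPairs is a made-up name, and only the RxJava 2 library is assumed) collects the buffered lists so they can be inspected outside Android.

```kotlin
import io.reactivex.Observable

// Buffers the items in pairs and blocks until the whole result is available.
fun bufferInPairs(items: List<String>): List<List<String>> =
    Observable.fromIterable(items)
        .buffer(2)
        .toList()
        .blockingGet()

fun main() {
    println(bufferInPairs(listOf("A", "B", "C", "D", "E"))) // [[A, B], [C, D], [E]]
}
```

The leftover item is not dropped: when the source completes, buffer emits whatever it has collected so far as a final, shorter list.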
private fun startStream7() {
    val observable = Observable.fromArray(1, 2, 3, 4)
    val transformation = observable.map { e -> e * 2 }
    transformation
        .filter { e -> e > 2 }
        .subscribe(
            { value -> println("Received: $value") },
            { error -> println("Error: $error") },
            { println("Done") }
        ).addTo(compositeDisposable)
}
Here, we have used the map and filter operators of RxJava. map transforms each item emitted by the Observable as per our requirement; here it multiplies each item by 2. filter then lets through only the items that are larger than 2, so the first doubled value (2) is dropped.
Output:
com.example.rxjavacomplexoperators I/System.out: Received: 4
com.example.rxjavacomplexoperators I/System.out: Received: 6
com.example.rxjavacomplexoperators I/System.out: Received: 8
com.example.rxjavacomplexoperators I/System.out: Done
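Operators run in the order they appear in the chain, so swapping map and filter changes the result. A minimal sketch of that, with made-up function names and only the RxJava 2 library assumed:

```kotlin
import io.reactivex.Observable

// Doubles first, then keeps values larger than 2 (same order as startStream7).
fun mapThenFilter(): List<Int> =
    Observable.just(1, 2, 3, 4)
        .map { it * 2 }
        .filter { it > 2 }
        .toList()
        .blockingGet()

// Keeps values larger than 2 first, then doubles what is left.
fun filterThenMap(): List<Int> =
    Observable.just(1, 2, 3, 4)
        .filter { it > 2 }
        .map { it * 2 }
        .toList()
        .blockingGet()

fun main() {
    println(mapThenFilter()) // [4, 6, 8]
    println(filterThenMap()) // [6, 8]
}
```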
private fun startStream8() {
    Observable.range(1, 20)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .filter { e -> e % 2 == 0 }
        .subscribe(
            { value -> println("Received: $value") },
            { error -> println("Error: $error") },
            { println("Done") }
        ).addTo(compositeDisposable)
}
Here, the range operator creates an Observable that emits the items in the given range (i.e. 1, 2, 3, … 20). Then we apply the filter operator, which keeps only the items that are divisible by 2 (the even numbers from 1 to 20).
.subscribeOn(Schedulers.io()): runs the upstream work (here, emitting the range) on a thread from the I/O thread pool, which is meant for network calls, database interactions, etc. Its main purpose is to keep such work off the main thread.
.observeOn(AndroidSchedulers.mainThread()): delivers the items to everything below it in the chain on the main thread. Use it when the result of an asynchronous call has to come back to the main thread, for example to update the UI. In our examples these statements are not mandatory, as we are not performing any real asynchronous call (network call, database interaction).
Output:
com.example.rxjavacomplexoperators I/System.out: Received: 2
com.example.rxjavacomplexoperators I/System.out: Received: 4
com.example.rxjavacomplexoperators I/System.out: Received: 6
com.example.rxjavacomplexoperators I/System.out: Received: 8
com.example.rxjavacomplexoperators I/System.out: Received: 10
com.example.rxjavacomplexoperators I/System.out: Received: 12
com.example.rxjavacomplexoperators I/System.out: Received: 14
com.example.rxjavacomplexoperators I/System.out: Received: 16
com.example.rxjavacomplexoperators I/System.out: Received: 18
com.example.rxjavacomplexoperators I/System.out: Received: 20
com.example.rxjavacomplexoperators I/System.out: Done
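To see the thread switch itself, here is a rough sketch that can run outside Android. Since AndroidSchedulers.mainThread() is only available on a device, it uses Schedulers.single() as a stand-in for the "result" thread; that substitution and the function name threadHop are assumptions made for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Returns (thread that produced the item, thread that received it).
fun threadHop(): Pair<String, String> =
    Observable.fromCallable { Thread.currentThread().name }          // runs on the subscribeOn scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single())                              // stand-in for mainThread()
        .map { producer -> producer to Thread.currentThread().name } // runs on the observeOn scheduler
        .blockingFirst()

fun main() {
    val (producer, consumer) = threadHop()
    println("produced on: $producer")
    println("received on: $consumer")
    println("caller:      ${Thread.currentThread().name}")
}
```

The three names differ: the item is produced on an I/O thread, handled on the single-thread scheduler, and neither of them is the thread that called threadHop().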
private fun startStream9() {
    val list: List<String> = listOf("1", "2", "3", "4", "5", "6", "7")
    val numObservable = list.toObservable()
    val charObservable = Observable.just("A", "B", "C", "D", "E", "F", "G", "H")
    val zipper = BiFunction<String, String, String> { t1, t2 -> "$t1-$t2" }
    Observable.zip(numObservable, charObservable, zipper)
        .subscribeOn(Schedulers.io())
        .subscribe(
            { value -> println("Received: $value") },
            { error -> println("Error: $error") },
            { println("Done") }
        )
        .addTo(compositeDisposable)
}
Here, we have taken two Observables (numObservable and charObservable). zipper is a BiFunction that accepts two inputs (t1, t2) and produces one result ("t1-t2"). The zip operator of RxJava pairs the items emitted by the two Observables, in order. We use a dash (-) to separate the two items. Since numObservable has only seven items, zip completes after seven pairs and the extra "H" from charObservable is never paired.
Output:
com.example.rxjavacomplexoperators I/System.out: Received: 1-A
com.example.rxjavacomplexoperators I/System.out: Received: 2-B
com.example.rxjavacomplexoperators I/System.out: Received: 3-C
com.example.rxjavacomplexoperators I/System.out: Received: 4-D
com.example.rxjavacomplexoperators I/System.out: Received: 5-E
com.example.rxjavacomplexoperators I/System.out: Received: 6-F
com.example.rxjavacomplexoperators I/System.out: Received: 7-G
com.example.rxjavacomplexoperators I/System.out: Done
package com.example.rxjavacomplexoperators
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import io.reactivex.Observable
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.functions.BiFunction
import io.reactivex.rxkotlin.addTo
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
import kotlinx.android.synthetic.main.activity_main.*
import java.lang.Exception
class MainActivity : AppCompatActivity() {

    private val compositeDisposable = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        btnEx1.setOnClickListener { startStream() }
        btnEx2.setOnClickListener { startStream2() }
        btnEx3.setOnClickListener { startStream3() }
        btnEx4.setOnClickListener { startStream4() }
        btnEx5.setOnClickListener { startStream5() }
        btnEx6.setOnClickListener { startStream6() }
        btnEx7.setOnClickListener { startStream7() }
        btnEx8.setOnClickListener { startStream8() }
        btnEx9.setOnClickListener { startStream9() }
    }

    override fun onDestroy() {
        super.onDestroy()
        compositeDisposable.clear()
    }

    private fun startStream() {
        val list: List<String> = listOf("1", "2", "3", "4", "5", "6")
        list.toObservable()
            .subscribeBy(
                onNext = { println(it) },
                onError = { it.printStackTrace() },
                onComplete = { println("onComplete") }
            ).addTo(compositeDisposable)
    }

    private fun startStream2() {
        Observable.just("Hello", "world", "How", "are", "you?")
            .subscribe(
                { value -> println("Received: $value") },
                { error -> println("Error: $error") },
                { println("Completed") }
            ).addTo(compositeDisposable)
    }

    private fun startStream3() {
        Observable.fromArray("Apple", "Orange", "banana")
            .subscribe { println(it) }
            .addTo(compositeDisposable)
    }

    private fun startStream4() {
        Observable.fromIterable(listOf("Titan", "Fastrack", "Sonata"))
            .subscribe(
                { value -> println("Received:$value") },
                { error -> println("Error: $error") },
                { println("Done") }
            ).addTo(compositeDisposable)
    }

    private fun startStream5() {
        getObservableFromList(listOf("Summer", "Winter", "Monsoon"))
            .subscribe(
                { value -> println("Received: $value") },
                { error -> println("Error: $error") },
                { println("Done") }
            ).addTo(compositeDisposable)
    }

    private fun startStream6() {
        Observable.just("A", "B", "C", "D", "E", "F", "G", "H")
            .buffer(2)
            .subscribe(
                { value -> println("Received: $value") },
                { error -> println("Error: $error") },
                { println("Done") }
            ).addTo(compositeDisposable)
    }

    private fun startStream7() {
        val observable = Observable.fromArray(1, 2, 3, 4)
        val transformation = observable.map { e -> e * 2 }
        transformation
            .filter { e -> e > 2 }
            .subscribe(
                { value -> println("Received: $value") },
                { error -> println("Error: $error") },
                { println("Done") }
            ).addTo(compositeDisposable)
    }

    private fun startStream8() {
        Observable.range(1, 20)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .filter { e -> e % 2 == 0 }
            .subscribe(
                { value -> println("Received: $value") },
                { error -> println("Error: $error") },
                { println("Done") }
            ).addTo(compositeDisposable)
    }

    private fun startStream9() {
        val list: List<String> = listOf("1", "2", "3", "4", "5", "6", "7")
        val numObservable = list.toObservable()
        val charObservable = Observable.just("A", "B", "C", "D", "E", "F", "G", "H")
        val zipper = BiFunction<String, String, String> { t1, t2 -> "$t1-$t2" }
        Observable.zip(numObservable, charObservable, zipper)
            .subscribeOn(Schedulers.io())
            .subscribe(
                { value -> println("Received: $value") },
                { error -> println("Error: $error") },
                { println("Done") }
            )
            .addTo(compositeDisposable)
    }
}

private fun getObservableFromList(myList: List<String>) =
    Observable.create<String> { emitter ->
        for (kind in myList) {
            if (kind == "") {
                emitter.onError(Exception("Nothing to show"))
                // onError is terminal, so stop emitting here
                return@create
            }
            emitter.onNext(kind)
        }
        emitter.onComplete()
    }
These are just the ABCs of RxJava. There is a lot more you can do with RxJava. I hope this tutorial helps you get started with RxJava through the simplest examples.
Thanks for your support!