Flowable not working on proper scheduler?
karlkar opened this issue · 1 comments
Hi.
I'm trying to use RxFirebaseDatabase with LiveData, so I made a following code:
package pl.karol.common
import android.arch.lifecycle.LiveData
import android.util.Log
import com.google.firebase.database.FirebaseDatabase
import durdinapps.rxfirebase2.RxFirebaseDatabase
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import pl.karol.common.data.Employee
class EmployeesLiveData : LiveData<List<Employee>>() {
private val mQuery = FirebaseDatabase.getInstance().getReference("/employees")
private var mDisposable: Disposable? = null
override fun onActive() {
super.onActive()
mDisposable = RxFirebaseDatabase.observeValueEvent(mQuery)
.map {
Log.d(TAG, "onActive: " + Thread.currentThread().name)
it.children
.mapNotNull {
val emp = it.getValue(Employee::class.java)
emp?.id = it.key
emp
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
value = it
}
}
override fun onInactive() {
mDisposable?.dispose()
mDisposable = null
super.onInactive()
}
companion object {
private const val TAG = "EmployeesLiveData"
}
}
As far as I understand Rx the log message should come from Io thread. However what I see in logcat is:
03-01 23:15:11.931 31649-31649/pl.karol.promel D/EmployeesLiveData: onActive: main
Did I misunderstand something? To fix this I have to do:
mDisposable = RxFirebaseDatabase.observeValueEvent(mQuery)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.map {
Log.d(TAG, "onActive: " + Thread.currentThread().name)
it.children
.mapNotNull {
val emp = it.getValue(Employee::class.java)
emp?.id = it.key
emp
}
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
value = it
}
The library is a wrapper itself of Reactive, it doesnt affect to the use of the schedulers.
RxJava use by default the thread from where you do the call. I haven't work with the new Architecture patterns and LiveData objets, but in your case I think that in onActive
the call is being made in a different Thread. Thats why your log is not shown from IO.
RxJava move your thread to IO in specific cases where it should do extra work with your data(such as a debounce
, an interval flowable
, a reduce
call....). But in this case, the thread will be by default the same from where you declare it.
You can read more about it in this article : https://blog.gojekengineering.com/multi-threading-like-a-boss-in-android-with-rxjava-2-b8b7cf6eb5e2
Happy coding!