FrangSierra/RxFirebase

Flowable not working on proper scheduler?

karlkar opened this issue · 1 comments

Hi.
I'm trying to use RxFirebaseDatabase with LiveData, so I wrote the following code:

package pl.karol.common

import android.arch.lifecycle.LiveData
import android.util.Log
import com.google.firebase.database.FirebaseDatabase
import durdinapps.rxfirebase2.RxFirebaseDatabase
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import pl.karol.common.data.Employee

class EmployeesLiveData : LiveData<List<Employee>>() {

    private val mQuery = FirebaseDatabase.getInstance().getReference("/employees")

    private var mDisposable: Disposable? = null

    override fun onActive() {
        super.onActive()
        mDisposable = RxFirebaseDatabase.observeValueEvent(mQuery)
                .map {
                    Log.d(TAG, "onActive: " + Thread.currentThread().name)
                    it.children
                            .mapNotNull {
                                val emp = it.getValue(Employee::class.java)
                                emp?.id = it.key
                                emp
                            }
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    value = it
                }
    }

    override fun onInactive() {
        mDisposable?.dispose()
        mDisposable = null
        super.onInactive()
    }

    companion object {
        private const val TAG = "EmployeesLiveData"
    }
}

As far as I understand Rx, the log message should come from an IO thread. However, what I see in logcat is:

03-01 23:15:11.931 31649-31649/pl.karol.promel D/EmployeesLiveData: onActive: main

Did I misunderstand something? To fix this, I have to do the following:

mDisposable = RxFirebaseDatabase.observeValueEvent(mQuery)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .map {
            Log.d(TAG, "onActive: " + Thread.currentThread().name)
            it.children
                    .mapNotNull {
                        val emp = it.getValue(Employee::class.java)
                        emp?.id = it.key
                        emp
                    }
        }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            value = it
        }

The library itself is just a wrapper around RxJava; it doesn't affect how the schedulers are used.

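By default, RxJava does its work on the thread the call is made from. I haven't worked with the new Architecture Components and LiveData objects, but in your case I think the callback that feeds your map is being invoked on a different thread than the one you passed to subscribeOn: subscribeOn only decides where the subscription (the listener registration) happens, and Firebase delivers its listener callbacks on the main thread. That's why your log does not come from an IO thread.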

RxJava only switches threads on its own for operators that need a scheduler for time-based work (such as debounce or an interval Flowable, which default to the computation scheduler). In this case, the thread will by default be the one the items are emitted on, unless you put an observeOn before the operator.
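To make the difference between subscribeOn and observeOn concrete, here is a minimal sketch that runs on a plain JVM with RxJava 2, without Android or Firebase. Everything in it is made up for illustration: `callbackSource`, the thread name `callback-thread`, and the two helper functions are hypothetical stand-ins, with the single-thread executor playing the role of a listener-based API that always calls back on its own thread (as Firebase does with the main thread).

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical stand-in for a listener-based API: the callback always fires
// on the API's own thread, no matter which thread registered the listener.
fun callbackSource(): Flowable<Int> = Flowable.create<Int>({ emitter ->
    val callbackThread = Executors.newSingleThreadExecutor { r -> Thread(r, "callback-thread") }
    // Stop the executor once the subscriber is done with the stream.
    emitter.setCancellable { callbackThread.shutdown() }
    callbackThread.execute {
        emitter.onNext(1)
        emitter.onComplete()
    }
}, BackpressureStrategy.BUFFER)

// subscribeOn only: the subscription happens on an io thread, but map still
// runs on the thread that emitted the item ("callback-thread").
fun mapThreadWithSubscribeOnOnly(): String =
    callbackSource()
        .map { Thread.currentThread().name }
        .subscribeOn(Schedulers.io())
        .blockingFirst()

// observeOn before map: the item is handed over to the io scheduler first,
// so map runs on one of its worker threads.
fun mapThreadWithObserveOn(): String =
    callbackSource()
        .observeOn(Schedulers.io())
        .map { Thread.currentThread().name }
        .blockingFirst()

fun main() {
    println("subscribeOn only: " + mapThreadWithSubscribeOnOnly())
    println("observeOn first:  " + mapThreadWithObserveOn())
}
```

The first call should report `callback-thread` and the second an RxJava io worker thread, which mirrors the "main" log line in the question and explains why the version with observeOn(Schedulers.io()) placed before map behaves as expected.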

You can read more about it in this article: https://blog.gojekengineering.com/multi-threading-like-a-boss-in-android-with-rxjava-2-b8b7cf6eb5e2

Happy coding!