I am using Firebase in my app, along with RxJava.
Firebase can notify your app whenever something changes in the backend data (additions, removals, changes, ...).
I am trying to combine the feature of Firebase with RxJava.
The data I am listening for is called Leisure, and the Observable emits LeisureUpdate, which contains a Leisure and the type of update (added, removed, moved, changed).
Here is my method for subscribing to these events:
private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                    }

                    @Override
                    public void onCancelled(FirebaseError firebaseError) {
                        subscriber.onError(new Error(firebaseError.getMessage()));
                    }
                });
            }
        });
    }
    leisureUpdatesSubscriptionsCount++;
    return leisureUpdatesObservable;
}
First off, I would like to use the Observable.fromCallable() method to create the Observable, but I guess that is impossible, since Firebase uses callbacks, right?
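To make the mismatch concrete: a Callable returns exactly one value from a single call, while a listener can be invoked any number of times. The following is only a plain-Java sketch of that difference, with no Firebase or RxJava involved; the class and method names (CallableVersusListener, loadOnce, pushAll) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Firebase or RxJava.
final class CallableVersusListener {

    // One-shot style: call() yields a single value and then the work is over.
    static String loadOnce(Callable<String> source) {
        try {
            return source.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Push style: the source decides how many times the listener is invoked.
    static void pushAll(List<String> events, Consumer<String> listener) {
        for (String event : events) {
            listener.accept(event);
        }
    }

    public static void main(String[] args) {
        System.out.println(loadOnce(() -> "snapshot"));

        List<String> received = new ArrayList<>();
        pushAll(Arrays.asList("added", "changed", "removed"), received::add);
        System.out.println(received);
    }
}
```

A child event listener behaves like the second method, which is presumably why a one-value factory does not fit a stream of updates.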
I keep a single instance of the Observable so that there is always one Observable that multiple Subscribers can subscribe to.
The problem comes when everyone unsubscribes and I need to stop listening for the events in Firebase.
I didn't find any way to make the Observable know whether there are still any subscriptions, so I count how many calls I get to subscribeToLeisuresUpdates() with leisureUpdatesSubscriptionsCount.
Then every time someone wants to unsubscribe, they have to call:
@Override
public void unsubscribeFromLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        return;
    }
    leisureUpdatesSubscriptionsCount--;
    if (leisureUpdatesSubscriptionsCount == 0) {
        firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
        leisureUpdatesObservable = null;
    }
}
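For reference, the counting scheme above can be isolated from Firebase and RxJava. The following is a minimal plain-Java sketch, not a drop-in replacement: FakeSource is a hypothetical stand-in for the Firebase reference, and RefCountedUpdates is a made-up helper that attaches the upstream listener on the first subscription and detaches it on the last. Each subscribe() call returns its own handle, so the count cannot drift if a caller unsubscribes twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the Firebase reference: holds at most one listener.
final class FakeSource {
    private Consumer<String> listener;
    int attachCount = 0;
    int detachCount = 0;

    void attach(Consumer<String> l) { listener = l; attachCount++; }
    void detach() { listener = null; detachCount++; }
    boolean isAttached() { return listener != null; }
    void emit(String value) { if (listener != null) listener.accept(value); }
}

// Fans one upstream listener out to many subscribers.
final class RefCountedUpdates {
    private final FakeSource source;
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    RefCountedUpdates(FakeSource source) { this.source = source; }

    // Returns a handle whose run() unsubscribes; running it again is a no-op.
    synchronized Runnable subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            source.attach(this::dispatch); // first subscriber: start listening
        }
        final boolean[] done = {false};
        return () -> {
            synchronized (RefCountedUpdates.this) {
                if (done[0]) return;
                done[0] = true;
                subscribers.remove(subscriber);
                if (subscribers.isEmpty()) {
                    source.detach(); // last subscriber gone: stop listening
                }
            }
        };
    }

    // Iterates over a copy so a subscriber may unsubscribe from inside its callback.
    private synchronized void dispatch(String value) {
        for (Consumer<String> s : new ArrayList<>(subscribers)) {
            s.accept(value);
        }
    }
}

final class RefCountDemo {
    public static void main(String[] args) {
        FakeSource source = new FakeSource();
        RefCountedUpdates updates = new RefCountedUpdates(source);
        List<String> seen = new ArrayList<>();

        Runnable first = updates.subscribe(seen::add);
        Runnable second = updates.subscribe(seen::add);
        source.emit("added");   // delivered to both subscribers
        first.run();
        second.run();           // listener is detached here
        source.emit("ignored"); // nobody is listening any more

        System.out.println(seen + " attached=" + source.isAttached());
    }
}
```

The difference from the two-method version is that the decrement is tied to a handle owned by each subscriber rather than to a separate public method that every caller must remember to invoke exactly once.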
This is the only way I found to make the Observable emit items only while there is a subscriber, but I feel like there must be an easier way, especially for detecting when there are no more subscribers listening to the Observable.
Has anyone encountered a similar problem or taken a different approach?