Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
358 views
in Technique[技术] by (71.8m points)

android - Realm: working with Clean-Architecture and RxJava2

A bit of context, I’ve tried to apply some clean-architecture to one of my projects and I’m having trouble with the (Realm) disk implementation of my repository. I have a Repository which pulls some data from different DataStores depending on some conditions (cache). This is the theory, the problem comes when mixing all of this with UseCases and RxJava2.

First I get the list of objects from Realm and then I manually create an Observable of it. But the subscribe (as expected) is executed on a different thread so realm ends up crashing… (second block of code)

This is the code I use to create the Observables (from an abstract class DiskStoreBase):

Observable<List<T>> createListFrom(final List<T> list) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(ObservableEmitter<List<T>> emitter) throws Exception {
            if (list != null) {
                emitter.onNext(list);
                emitter.onComplete();
            } else {
                emitter.onError(new ExceptionCacheNotFound());
            }
        }
    });
}

How can I deal with this scenario?

More code of DiskStoreForZone:

@Override
public Observable<List<ResponseZone>> entityList() {
    Realm realm = Realm.getDefaultInstance();
    List<ResponseZone> result = realm.where(ResponseZone.class).findAll();
    return createListFrom(result);
}

The exact crash:

E/REALM_JNI: jni: ThrowingException 8, Realm accessed from incorrect thread.
E/REALM_JNI: Exception has been thrown: Realm accessed from incorrect thread.
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

It doesn't work because despite using Rx, your data layer is not reactive.

Realm by its nature is a reactive datasource, and its managed objects by nature are also mutable (updated in place by Realm), and thread-confined (can only be accessed on the same thread where the Realm was opened).

For your code to work, you'd need to copy out the data from the Realm.

@Override
public Single<List<ResponseZone>> entityList() {
    return Single.fromCallable(() -> {
       try(Realm realm = Realm.getDefaultInstance()) {
           return realm.copyFromRealm(realm.where(ResponseZone.class).findAll());
       }
    });
}

I took the liberty and represented your Single as a Single, considering it's not an Observable, it does not listen for changes, there is only 1 event and that is the list itself. So sending it through an ObservableEmitter doesn't really make sense as it does not emit events.

Therefore, this is why I said: your data layer is not reactive. You are not listening for changes. You are just obtaining data directly, and you are never notified of any change; despite using Rx.


I drew some pictures in paint to illustrate my point. (blue means side-effects)

clean-architecture non-reactive

in your case, you call a one-off operation to retrieve the data from multiple data-sources (cache, local, remote). Once you obtain it, you don't listen for changes; technically if you edit the data in one place and another place, the only way to update is by "forcing the cache to retrieve the new data manually"; for which you must know that you modified the data somewhere else. For which you need a way to either directly call a callback, or send a message/event - a notification for change.

So in a way, you must create a cache invalidation notification event. And if you listen to that, the solution could be reactive again. Except you're doing this manually.

----------------------------------------------------------------------

Considering Realm is already a reactive data source (similarly to SQLBrite for SQLite), it is able to provide change notifications by which you can "invalidate your cache".

In fact, if your local data source is the only source of data, and any write from network is a change that you listen to, then your "cache" can be written down as replay(1).publish().refCount() (replay latest data for new subscribers, replace data with new if new data is evaluated) which is RxReplayingShare.

clean architecture reactive

Using a Scheduler created from the looper of a handler thread, you can listen to changes in the Realm on a background thread, creating a reactive data source that returns up-to-date unmanaged copies that you can pass between threads (although mapping directly to immutable domain models is preferred to copyFromRealm() if you choose this route - the route being clean architecture).

return io.reactivex.Observable.create(new ObservableOnSubscribe<List<ResponseZone>>() {
    @Override
    public void subscribe(ObservableEmitter<List<ResponseZone>> emitter)
            throws Exception {
        final Realm observableRealm = Realm.getDefaultInstance();
        final RealmResults<ResponseZone> results = observableRealm.where(ResponseZone.class).findAllAsync();
        final RealmChangeListener<RealmResults<ResponseZone>> listener = results -> {
            if(!emitter.isDisposed()) {
                if(results.isValid() && results.isLoaded()) {
                    emitter.onNext(observableRealm.copyFromRealm(results));
                }
            }
        };

        emitter.setDisposable(Disposables.fromRunnable(() -> {
            if(results.isValid()) {
                results.removeChangeListener(listener);
            }
            observableRealm.close();
        }));
        results.addChangeListener(listener);
        // initial value will be handled by async query
    }
}).subscribeOn(looperScheduler).unsubscribeOn(looperScheduler);

Where looper scheduler is obtained as

    handlerThread = new HandlerThread("LOOPER_SCHEDULER");
    handlerThread.start();
    synchronized(handlerThread) {
        looperScheduler  = AndroidSchedulers.from(handlerThread.getLooper());
    }

And that is how you create reactive clean architecture using Realm.


ADDED:

The LooperScheduler is only needed if you intend to actually enforce Clean Architecture on Realm. This is because Realm by default encourages you to use your data objects as domain models and as a benefit provides lazy-loaded thread-local views that mutate in place when updated; but Clean Architecture says you should use immutable domain models instead (independent from your data layer). So if you want to create reactive clean architecture where you copy from Realm on a background thread any time when Realm changes, then you'll need a looper scheduler (or observe on a background thread, but do the copying from a refreshed Realm on Schedulers.io()).

With Realm, generally you'd want to use RealmObjects as your domain models, and rely on lazy-evaluation. In that case, you do not use copyFromRealm() and you don't map the RealmResults to something else; but you can expose it as a Flowable or a LiveData.

You can read related stuff about this here.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...