Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

java - RxJava synchronization

I want to synchronize access to a heavy shared object with RxJava, but I don't know how to do it properly. Unfortunately, I was unable to include the RxJava tag, which requires 1500 reputation.

Example:

class Service {

    HeavyObject heavyObject;
    HeavyObjectRepository heavyObjectRepository; // reactive repository

    Single<ComputationResult> doHeavyComputation(Object params) {
        // here I want to apply synchronization to heavy object
        return (heavyObject == null
                ? heavyObjectRepository.fetch().doOnSuccess(ho -> heavyObject = ho)
                : Single.just(heavyObject))
                .flatMap(ho -> compute(ho, params));
    }

    Single<ComputationResult> compute(HeavyObject heavyObject, Object params) {
        // ...
    }
}

More details:

The Service instance is shared, so many threads could access and possibly mutate the shared object at the same time. I therefore need to provide some sort of synchronization.

This is what I have come up with so far, but I still don't know whether it is correct:

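// Imports assume RxJava 3 (Single.mergeArray is used in main); adjust the packages for other versions.
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

class Service {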

    private final HeavyObjectRepository heavyObjectRepository = new HeavyObjectRepository();

    HeavyObject heavyObject;

    public Single<ComputationResult> compute(Object... args) {
        return Single.fromCallable(() -> {
            synchronized (this) {
                if (heavyObject == null) {
                    return heavyObjectRepository.findOne().map(ho -> {
                        synchronized (this) {
                            if (heavyObject == null) {
                                heavyObject = ho;
                            }
                            return heavyObject;
                        }
                    });
                }
                return Single.just(heavyObject);
            }
        }).flatMap(o -> o).flatMap(ho -> compute(ho, args));
    }

    private Single<ComputationResult> compute(HeavyObject heavyObject, Object... args) {
        return Single.fromCallable(() -> {
            synchronized (this) {
                for (int i = 0; i < 10; i++) {
                    heavyObject.counter += 1;
                }
            }
            return new ComputationResult();
        });
    }

    public static class HeavyObject {

        long counter = 0;
    }

    public static class ComputationResult {
    }

    private static class HeavyObjectRepository {

        public Single<HeavyObject> findOne() {
            return Single.just(new HeavyObject());
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        Single.mergeArray(
                service.compute(new Object()).subscribeOn(Schedulers.computation()),
                service.compute(new Object()).subscribeOn(Schedulers.computation()),
                service.compute(new Object()).subscribeOn(Schedulers.computation()),
                service.compute(new Object()).subscribeOn(Schedulers.computation()),
                service.compute(new Object()).subscribeOn(Schedulers.computation())
        ).blockingSubscribe();
        if (service.heavyObject.counter != 50) {
            throw new AssertionError(String.format("%d != %d", service.heavyObject.counter, 50));
        }
    }
}
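
For comparison, the "fetch once, then reuse" part of the attempt above can be sketched without locks. The following is only an illustration using the JDK's CompletableFuture rather than RxJava, and MemoizedFetch and MemoizedFetchDemo are hypothetical names that do not appear in the question. It assumes the fetch is safe to start from any thread: the first caller to win a compare-and-set starts the fetch, and every other caller shares the same pending result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper (not from the question): memoizes one asynchronous fetch.
final class MemoizedFetch<T> {
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();
    private final Supplier<CompletableFuture<T>> fetch;

    MemoizedFetch(Supplier<CompletableFuture<T>> fetch) {
        this.fetch = fetch;
    }

    CompletableFuture<T> get() {
        CompletableFuture<T> existing = cached.get();
        if (existing != null) {
            return existing; // already fetched or fetch in flight
        }
        CompletableFuture<T> promise = new CompletableFuture<>();
        if (!cached.compareAndSet(null, promise)) {
            return cached.get(); // another caller won the race; share its promise
        }
        try {
            // Only the caller that won the compare-and-set starts the fetch.
            fetch.get().whenComplete((value, error) -> {
                if (error != null) {
                    promise.completeExceptionally(error);
                } else {
                    promise.complete(value);
                }
            });
        } catch (RuntimeException e) {
            promise.completeExceptionally(e);
        }
        return promise;
    }
}

final class MemoizedFetchDemo {
    // Returns how many times the underlying fetch ran when `callers` threads ask at once.
    static int countFetches(int callers) {
        AtomicInteger fetches = new AtomicInteger();
        MemoizedFetch<Object> heavy = new MemoizedFetch<>(() -> {
            fetches.incrementAndGet();
            return CompletableFuture.supplyAsync(Object::new); // stand-in for the repository call
        });
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        try {
            List<CompletableFuture<Object>> results = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                results.add(CompletableFuture.supplyAsync(() -> heavy.get().join(), pool));
            }
            Object first = results.get(0).join();
            for (CompletableFuture<Object> result : results) {
                if (result.join() != first) {
                    throw new IllegalStateException("callers saw different instances");
                }
            }
        } finally {
            pool.shutdown();
        }
        return fetches.get();
    }

    public static void main(String[] args) {
        System.out.println(countFetches(5)); // prints 1
    }
}
```

Note that this sketch also caches a failed fetch, and it only covers initialization; mutations of the fetched object still need their own protection. In RxJava itself, caching the fetch Single with the cache() operator may play a similar role, although whether that fits depends on details the question does not give.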

If I understand correctly, this is what @akarnokd suggested:

public Single<ComputationResult> compute(Object... args) {
    return Single
            .fromCallable(() -> {
                if (heavyObject == null) {
                    return heavyObjectRepository.findOne()
                            .observeOn(Schedulers.single())
                            .map(ho -> {
                                if (heavyObject == null) {
                                    heavyObject = ho;
                                }
                                return heavyObject;
                            });
                }
                return Single.just(heavyObject);
            })
            .flatMap(o -> o)
            .flatMap(ho -> compute(ho, args))
            .subscribeOn(Schedulers.single());
}

private Single<ComputationResult> compute(HeavyObject heavyObject, Object... args) {
    return Single.fromCallable(() -> {
        for (int i = 0; i < 100; i++) {
            heavyObject.counter += 1;
        }
        return new ComputationResult();
    });
}
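
The idea behind routing everything through Schedulers.single() is thread confinement: if the shared object is only ever touched on one thread, no locks are needed. As a rough sketch of that principle, here is a JDK-only analogue that uses a single-threaded executor in place of the scheduler. ConfinedService, readCounter, and runConcurrently are hypothetical names, and creating the HeavyObject inline stands in for the repository fetch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConfinedService {
    static final class HeavyObject {
        long counter = 0;
    }

    static final class ComputationResult {
    }

    // All reads and writes of heavyObject happen on this one thread.
    private final ExecutorService owner = Executors.newSingleThreadExecutor();
    private HeavyObject heavyObject;

    CompletableFuture<ComputationResult> compute() {
        return CompletableFuture.supplyAsync(() -> {
            if (heavyObject == null) {
                heavyObject = new HeavyObject(); // stand-in for the repository fetch
            }
            for (int i = 0; i < 10; i++) {
                heavyObject.counter += 1; // unsynchronized, but confined to the owner thread
            }
            return new ComputationResult();
        }, owner);
    }

    // Reads the counter on the owner thread so the value is safely visible to the caller.
    CompletableFuture<Long> readCounter() {
        return CompletableFuture.supplyAsync(
                () -> heavyObject == null ? 0L : heavyObject.counter, owner);
    }

    // Calls compute() from `callers` different threads and returns the final counter.
    static long runConcurrently(int callers) {
        ConfinedService service = new ConfinedService();
        ExecutorService callerPool = Executors.newFixedThreadPool(callers);
        try {
            CompletableFuture<?>[] calls = new CompletableFuture<?>[callers];
            for (int i = 0; i < callers; i++) {
                calls[i] = CompletableFuture.runAsync(() -> service.compute().join(), callerPool);
            }
            CompletableFuture.allOf(calls).join();
            return service.readCounter().join();
        } finally {
            callerPool.shutdown();
            service.owner.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(5)); // prints 50
    }
}
```

The trade-off is that every computation is serialized on one thread, so this only suits work that is short or that must not overlap anyway.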


1 Reply

0 votes
by (71.8m points)
Waiting for an expert to answer.
