Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
245 views
in Technique[技术] by (71.8m points)

rx java - RxJava: InterruptedIOException

I wrote some code to download a file from server meanwhile updating progress bar. Downloading code was running in Schedulers.io thread and updating ui code was running in AndroidSchedulers.mainThread. My program terminated after download began. Here is my code:

    Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                Response response = getResponse(url);
                if (response != null && response.isSuccessful()) {
                    InputStream is = response.body().byteStream();
                    subscriber.onNext(response.body().contentLength()); // init progress
                    File storedFile = Utils.getStoredFile(context, filePath);
                    OutputStream os = new FileOutputStream(storedFile);

                    byte[] buffer = new byte[1024];
                    int len;
                    while ((len = is.read(buffer)) != -1) {
                        // write data
                        os.write(buffer, 0, len);

                        count += len;
                        subscriber.onNext(count); // update progress
                    }

                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }

                    os.close();
                    is.close();
                    response.body().close();

            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        }
    })
    .subscribeOn(Schedulers.io()) // io and network operation  
    .observeOn(AndroidSchedulers.mainThread()) // UI view update operation  
    .subscribe(new Observer<Long>() {
        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted -> " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError -> " + e.getMessage());
        }

        @Override
        public void onNext(Long progress) {
            Log.d(TAG, "onNext -> " + Thread.currentThread().getName());
            Log.d(TAG, "onNext progress -> " + progress);
            // here update view in ui thread
        }
    }
    }

And here is error text:

java.io.InterruptedIOException: thread interrupted
    at okio.Timeout.throwIfReached(Timeout.java:145)
    at okio.Okio$2.read(Okio.java:136)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
    at okio.RealBufferedSource.read(RealBufferedSource.java:50)
    at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:418)
    at okio.RealBufferedSource$1.read(RealBufferedSource.java:371)
    at java.io.InputStream.read(InputStream.java:163)
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:74)
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:52)
    at rx.Observable.unsafeSubscribe(Observable.java:8098)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executor    at java.util.concurrent.FutureTask.run(FutureTask.java:23    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
    at java.lang.Thread.run(Thread.java:841)
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

The observerOn is apply to the Observable.create but internaly you're creating a new observable in another thread. So your pipeline never give the monitor to the main thread. I think your code it's too much complex for what you want to achieve.

Just in case that help you out to understand the concepts of Scheduler

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...