Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
5.0k views
in Technique[技术] by (71.8m points)

rxjs - Angular forkJoin Subscribe not firing

I have a couple of methods in the form of

    getFirstDataBunch() {
        return this.repository.getData(this.parameter).pipe(
            switchMap(
                result => {
                    //Do something with result;
                    return of(true);
                }
            )
        );
    }

Which I then call on my ngOnInit as:

    function ngOnInit(){
        forkJoin(this.getFirstDataBunch(), this.getSecondDataBunch(), this.getThirdDataBunch()).subscribe(
            results => {
                //do stuff with results; 
            }
        );
    }

I can see that all the observables are being called, and they return an observable, however the forkJoin Subscribe method is never called, from what I've read forkJoin fires only when all observables return and are finished, and I do believe that to be the case, so why is not firing?

I've also seen that some people simply go for combineLatest, but because these methods only return once I don't really need to keep looking for updates on them.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I would simply use the combination operator that you want/need and pipe it with a take(1).

Why the take() operator?

Because take returns an Observable that emits only the first X values emitted by the source Observable. We set how many emissions.

Example:

import { of } from 'rxjs';
import { take } from 'rxjs/operators';

//emit 1,2,3,4,5
const source = of(1, 2, 3, 4, 5);
//take the first emitted value then complete
const example = source.pipe(take(1));
//output: 1
const subscribe = example.subscribe(console.log);

//output: 1, 2, 3
const example2 = source.pipe(take(3)).subscribe(console.log);

If you want to be 100% sure to avoid any possible memory leak, I suggest using also takeUntil() (since take(1) can be stuck forever for some reason, if the observable never emits)

.pipe(take(1), takeUntil(this.destroy$))

As a general suggestion, place always takeUntil() as the last argument of your pipe.

About what combination operator:

forkJoin --> as you wrote, waits for all observables to complete and then emits the last emitted value from each (like a Promise.all). It would work, but each observable needs to complete (not just emit!).

zip --> waits for all observables to emit, then emit values as an array (in sequence order - picture below). I would probably use this.

combineLatest --> Waits for all observables to emit, emits once. Then whenever any input Observable emits a value, it emits the new value of the observable that emitted and the last emission of the others (who didn't emit). Sure, that would work too with take(1)...

zip example


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...