Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c# - Notify an observable to complete when request has finished

I'm relatively new to reactive programming and still trying to get my head around it, even after reading Rx.NET in Action.

I have a service class that has two public methods, Get and GetStream.

GetStream creates an observable that concatenates the store observable with the update observable, so consumers receive future updates without problems.

The update observable refreshes the symbols from the providers every minute by polling their APIs.

Get creates an observable that emits the items from the store and concatenates the update observable if an update is in progress. The problem is that once the update has finished, OnCompleted is never called and the consumer hangs indefinitely. I consume this observable as a blocking call in some places and as a non-blocking call in others.

I suppose I could get away with a ResetEvent, but that doesn't seem like a very reactive way of dealing with this. Is there a better reactive alternative? Or am I trying to fit an observable pattern into a place where it should just return an IEnumerable?

The goal is to have an observable that updates the store at a specified interval, one method that gets the current state of the store including any update in progress, and another method that gets the state of the store plus any future updates.

Here is a snippet of the code I have so far.

public class SymbolService : ISymbolService, IDisposable
{
    private readonly ILogger _logger = Logger.Create(nameof(SymbolService));
    
    private readonly IObservable<(string Provider, Symbol Symbol)> _update;

    private readonly ISymbolStore _store;

    private readonly IDisposable _token;

    private bool _updating = true;

    public SymbolService(SymbolOptions options, ISymbolStore store,
        IEnumerable<ISymbolProvider> providers)
    {
        _store = store;

        // Request and aggregate symbols from providers and put
        // into store.
        var source = providers
            .Select(provider => provider
                .GetSymbols()
                .Select(symbol => (provider.Name, symbol))
                .Take(10))
            .Merge()
            .Do(entry =>
                {
                    _updating = true;
                    _store.Put(entry);
                },
                () =>
                    _updating = false)
            .Log(_logger, "Symbol");

        // Perform update of symbols every interval
        var update = Observable
            .Timer(TimeSpan.Zero, options.RefreshInterval)
            .SelectMany(_ => source)
            .Publish();

        _update = update;

        _token = update
            .Connect();
    }

    /// <summary>
    /// Gets all symbols for a given provider.
    /// </summary>
    /// <param name="provider">The specified provider.</param>
    /// <returns>The observable stream.</returns>
    public IObservable<Symbol> Get(string provider)
    {
        // Get symbols from store by provider and concat stream
        // if update is being performed
        return _store
            .Get(provider)
            .Concat(Observable
                .If(() => _updating, _update
                    .TakeWhile(_ => _updating)
                    .Where(x => x.Provider.Equals(provider))
                    .Select(x => x.Symbol)));
    }

    /// <summary>
    /// Gets all symbols and future symbols for a given provider.
    /// </summary>
    /// <param name="provider">The specified provider.</param>
    /// <returns>The observable stream.</returns>
    public IObservable<Symbol> GetStream(string provider)
    {
        // Get symbols from store by provider and concat stream
        // of all future symbols
        return _store
            .Get(provider)
            .Concat(_update
                .Where(x => x.Provider.Equals(provider))
                .Select(x => x.Symbol))
            .Distinct();
    }

    public void Dispose()
    {
        _token?.Dispose();
    }
}

Any help on this is greatly appreciated.

Question from: https://stackoverflow.com/questions/65925510/notify-an-observable-to-complete-when-request-has-finished


1 Reply


After skimming through the book again, I figured a BehaviorSubject would fit the solution quite well. I've only included the code relevant to my question.

Get subscribes to the BehaviorSubject, which is notified when the update has finished processing. That notification completes the observable and disposes the subscriptions.

    private readonly BehaviorSubject<bool> _updating;

    public SymbolService(SymbolOptions options, ISymbolStore store,
        IEnumerable<ISymbolProvider> providers)
    {
        _store = store;

        _updating = new BehaviorSubject<bool>(true);

        // Request and aggregate symbols from providers and put
        // into store.
        var source = providers
            .Select(provider => provider
                .GetSymbols()
                .Select(symbol => (provider.Name, symbol)))
            .Merge();

        // Perform update of symbols every interval
        var update = Observable
            .Timer(TimeSpan.Zero, options.RefreshInterval)
            .Log(_logger, "Updating", LogEventLevel.Information)
            .Do(_ => _updating.OnNext(true))
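            .SelectMany(_ =>
            {
                // source is already the merged stream of provider symbols,
                // so store each entry and signal when the run has ended
                return source
                    .Do(_store.Put)
                    .Finally(() => _updating.OnNext(false));
            })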
            .Publish();

        _update = update;

        _token = update
            .Connect();
    }

    public IObservable<Symbol> Get(string provider)
    {
        // Get symbols from store by provider and concat stream
        // if update is being performed
        return _store
            .Get(provider)
            .Concat(_update
                .Where(x => x.Provider.Equals(provider))
                .Select(x => x.Symbol))
            .TakeUntil(_updating.Where(x => x == false));
    }
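
To see the completion behaviour in isolation, here is a minimal sketch that assumes the System.Reactive package. The names SnapshotDemo, Snapshot and Run are hypothetical and not part of the service above: stored stands in for _store.Get(provider), updates for the published _update stream, and updating for the BehaviorSubject flag. It differs from the snippet above in one respect: TakeUntil is applied only to the update part. A BehaviorSubject replays its latest value to every new subscriber, so the flag may already be false at subscription time, and with this placement the stored items do not depend on how TakeUntil orders its subscriptions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SnapshotDemo
{
    // Hypothetical helper: stored items first, then live updates
    // until the flag reports that no update is running.
    public static IObservable<string> Snapshot(
        IObservable<string> stored,
        IObservable<string> updates,
        IObservable<bool> updating)
    {
        return stored.Concat(
            updates.TakeUntil(updating.Where(isUpdating => !isUpdating)));
    }

    public static (List<string> Items, bool Completed) Run(bool initiallyUpdating)
    {
        var updates = new Subject<string>();                       // hot update stream
        var updating = new BehaviorSubject<bool>(initiallyUpdating);
        var stored = new[] { "AAA", "BBB" }.ToObservable();        // fake store contents

        var items = new List<string>();
        var completed = false;

        using (Snapshot(stored, updates, updating)
            .Subscribe(items.Add, () => completed = true))
        {
            updates.OnNext("CCC");   // delivered only if the snapshot is still listening
            updating.OnNext(false);  // update finished: the snapshot completes
            updates.OnNext("DDD");   // pushed after completion, never delivered
        }

        return (items, completed);
    }

    public static void Main()
    {
        foreach (var initiallyUpdating in new[] { true, false })
        {
            var (items, completed) = Run(initiallyUpdating);
            Console.WriteLine(
                $"updating={initiallyUpdating}: [{string.Join(", ", items)}] completed={completed}");
        }
    }
}
```

In both runs the subscriber receives OnCompleted, so a blocking consumer would return instead of hanging. When no update is running, the snapshot should contain only the stored items.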
