I'm relatively new to reactive programming and really trying to get my head around it still even after reading Rx.Net in Action.
I have a service class which have two public methods Get
and GetStream
.
GetStream
will create an observable which will concat the store
and the update
observable so consumers receive future updates without problem.
The update
observable performs an update of symbols from the providers
every minute - polling api request
Get
will create an observable that subscribes to items from the store
and concat with the update
observable if it's updating. The problem being once the update has finished, no OnComplete
gets called and the consumer hangs indefinitely. I'm consuming this observable as a blocking call in places and non-blocking in others.
I suppose I can get away with a ResetEvent
but that doesn't seem a very reactive way of dealing with this. Is there a better reactive alternative? Maybe I'm trying to fit an observable pattern in a place where it should just return an IEnumerable
?
The goal is to have a observable that updates the store every specified interval. Have one method to get the current state of the store including current update. Have another method to get the state of the store and any future updates.
Here is a snippet of code of where i've got to at the moment.
public class SymbolService : ISymbolService, IDisposable
{
private readonly ILogger _logger = Logger.Create(nameof(SymbolService));
private readonly IObservable<(string Provider, Symbol Symbol)> _update;
private readonly ISymbolStore _store;
private readonly IDisposable _token;
private bool _updating = true;
public SymbolService(SymbolOptions options, ISymbolStore store,
IEnumerable<ISymbolProvider> providers)
{
_store = store;
// Request and aggregate symbols from providers and put
// into store.
var source = providers
.Select(provider => provider
.GetSymbols()
.Select(symbol => (provider.Name, symbol))
.Take(10))
.Merge()
.Do(entry =>
{
_updating = true;
_store.Put(entry);
},
() =>
_updating = false)
.Log(_logger, "Symbol");
// Perform update of symbols every interval
var update = Observable
.Timer(TimeSpan.Zero, options.RefreshInterval)
.SelectMany(_ => source)
.Publish();
_update = update;
_token = update
.Connect();
}
/// <summary>
/// Gets all symbols for a given provider.
/// </summary>
/// <param name="provider">The specified provider.</param>
/// <returns>The observable stream.</returns>
public IObservable<Symbol> Get(string provider)
{
// Get symbols from store by provider and concat stream
// if update is being performed
return _store
.Get(provider)
.Concat(Observable
.If(() => _updating, _update
.TakeWhile(_ => _updating)
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol)));
}
/// <summary>
/// Gets all symbols and future symbols for a given provider.
/// </summary>
/// <param name="provider">The specified provider.</param>
/// <returns>The observable stream.</returns>
public IObservable<Symbol> GetStream(string provider)
{
// Get symbols from store by provider and concat stream
// of all future symbols
return _store
.Get(provider)
.Concat(_update
.Where(x => x.Provider.Equals(provider))
.Select(x => x.Symbol))
.Distinct();
}
public void Dispose()
{
_token?.Dispose();
}
}
Any help on this is greatly appreciated.
question from:
https://stackoverflow.com/questions/65925510/notify-an-observable-to-complete-when-request-has-finished 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…