We can summarise the problem as follows:
- We have a source observable that can produce a value or error
- We need to do something if an error occurs
- After the first error, we want to make multiple retry attempts, with a notification on each attempt
- If those attempts all fail, we want to do something
- At the end of the process we want to restart it all.
So:
Error
-> Initial dialog
-> Retry with notifications on each attempt
-> Do it all again
At any point throughout this process a successful value emission should bypass everything and flow back out.
With that highly opinionated approach in mind, here is the utility I created:

    public static IObservable<T> WithGatedRetry<T>(
        this IObservable<T> source,
        int retriesPerCycle,
        Func<Exception, Task> onInitialFailure,
        Func<Action<Func<Task>>, Task<Func<Exception, int, Task>>> retryNotificationBlock,
        Func<Exception, Task> onFailedCycle)
    {
        // Runs the "initial failure" hook, then emits default(T) as an
        // "an error occurred" sentinel for GetPrimaryFlow to react to.
        IObservable<T> GetInitialHandler(Exception e) =>
            Observable.FromAsync(() => onInitialFailure(e))
                .Select(_ => (T)default);

        // Runs the "cycle failed" hook once every attempt in a cycle has failed.
        IObservable<T> GetCycleFailureHandler(Exception e) =>
            Observable.FromAsync(() => onFailedCycle(e))
                .Select(_ => (T)default);

        // One retry cycle: up to retriesPerCycle subscriptions to the source,
        // notifying before each attempt and cleaning up when the cycle ends.
        IObservable<T> GetRetryFlow() =>
            Observable.Create<T>(async sub =>
            {
                var attempt = 1;
                Func<Task> disposeCallback = () => Task.CompletedTask;

                // The block hands back the per-attempt notifier and registers
                // its cleanup callback through the supplied hook.
                var notifier = await retryNotificationBlock(dc =>
                {
                    disposeCallback = dc;
                });

                await notifier(null, 1);

                return
                    source
                        .Do(
                            _ =>
                            {
                            },
                            // Note: an async lambda here compiles to async void,
                            // so the pipeline does not wait for the notifier.
                            async (Exception e) =>
                            {
                                if (attempt + 1 <= retriesPerCycle)
                                {
                                    await notifier(e, ++attempt);
                                }
                            }
                        )
                        .Retry(retriesPerCycle)
                        .Finally(async () =>
                        {
                            if (disposeCallback != null)
                            {
                                await disposeCallback();
                            }
                        })
                        .Subscribe(
                            val => { sub.OnNext(val); sub.OnCompleted(); },
                            (Exception e) => { sub.OnError(e); }
                        );
            });

        // Repeats retry cycles until one of them produces a value.
        IObservable<T> GetCycleFlow() =>
            GetRetryFlow()
                .Catch((Exception e) =>
                    GetCycleFailureHandler(e)
                        .Select(_ => GetCycleFlow())
                        .Switch()
                )
                .Retry();

        // Caveat: default(T) doubles as the error sentinel, so a genuine
        // default value from the source is treated as a failure here.
        IObservable<T> GetPrimaryFlow() =>
            source
                .Catch((Exception e) => GetInitialHandler(e))
                .Select(val =>
                    EqualityComparer<T>.Default.Equals(val, default)
                        ? GetCycleFlow().Select(_ => GetPrimaryFlow()).Switch()
                        : GetPrimaryFlow().StartWith(val)
                )
                .Switch();

        return GetPrimaryFlow();
    }

I'll fully admit this may not be the best way to do it, and there's a bit of a callback-inside-a-callback kludge in the notification block (for each retry attempt) in order to support cleaning up once a retry "cycle" has completed (successfully or otherwise).
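To make the shape of that notification block a little more concrete, here is a minimal sketch that only uses `Task` and delegates (no Rx). The `log` list and the hand-written driver at the bottom are hypothetical stand-ins for the dialogs and for what `WithGatedRetry` does with the block; they are not part of the utility itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Same delegate shape as the retryNotificationBlock parameter.
Func<Action<Func<Task>>, Task<Func<Exception, int, Task>>> block = registerCleanup =>
{
    log.Add("open");                  // e.g. show the "attempting retries" dialog

    // Inner callback: handed back to the caller, run when the cycle ends.
    registerCleanup(() =>
    {
        log.Add("close");
        return Task.CompletedTask;
    });

    // Per-attempt notifier returned to the caller.
    Func<Exception, int, Task> notifier = (ex, attempt) =>
    {
        var reason = ex == null ? "starting" : ex.Message;
        log.Add("attempt " + attempt + ": " + reason);
        return Task.CompletedTask;
    };
    return Task.FromResult(notifier);
};

// Hypothetical driver: capture the cleanup, notify twice, then clean up.
Func<Task> cleanup = () => Task.CompletedTask;
var notify = await block(dc => cleanup = dc);
await notify(null, 1);
await notify(new InvalidOperationException("timeout"), 2);
await cleanup();

Console.WriteLine(string.Join(" | ", log));
// open | attempt 1: starting | attempt 2: timeout | close
```

The outer callback exists only so the block can hand its cleanup routine back to the caller; returning a small object or a tuple holding both the notifier and the cleanup would probably be a tidier alternative.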
Usage is as follows:

    var latestState =
        Observable.SelectMany(fetchTimer, stateFetcher)
            .WithGatedRetry(
                3,
                async ex =>
                {
                    // show initial error dialog
                },
                async (disposerHook) =>
                {
                    // Show the "attempting retries" dialog
                    disposerHook(async () =>
                    {
                        // Close the "attempting retries" dialog
                        // and do any cleanup
                    });

                    return async (Exception ex, int attempt) =>
                    {
                        // Update the dialog
                        // ex is the exception produced by the just-completed attempt
                    };
                },
                async ex =>
                {
                    // Show the "We still can't quite get it" dialog
                    // after this task completes, the entire process starts again
                }
            )
            .Publish();

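One detail of the `3` in the call above: inside the utility it is passed straight to `Retry(retriesPerCycle)`, and in Rx.NET the count given to `Retry` is the total number of subscriptions, not the number of re-subscriptions after the first. The sketch below shows this in isolation, assuming the System.Reactive package; `alwaysFails` is a hypothetical source invented for the illustration.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

var subscriptions = 0;

// Hypothetical source that fails on every subscription.
var alwaysFails = Observable.Defer(() =>
{
    subscriptions++;
    return Observable.Throw<int>(new InvalidOperationException("fetch failed"));
});

// Materialize turns the terminal error into a value, so Wait() returns
// the final notification instead of throwing.
var last = alwaysFails.Retry(3).Materialize().Wait();

Console.WriteLine(subscriptions + " subscriptions, ended with " + last.Kind);
// 3 subscriptions, ended with OnError
```

So with `retriesPerCycle` set to 3, each cycle makes three attempts in total before the "failed cycle" hook runs.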
This approach allows for the tailored hook-points, and flows successful values as expected.
In fact, downstream subscribers should only ever see a value when one is successfully provided - they also should never see an error, because the whole flow sits inside an infinite retry.
In comparison with the solution in the original question, this uses `Select` + `Switch` as opposed to `SelectMany` in order to ensure inner observables are correctly unsubscribed and disposed.
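The difference can be seen in a small standalone sketch, assuming the System.Reactive package. The subjects and the `Inner` helper are hypothetical names made up for the illustration: `SelectMany` stays subscribed to every inner observable it has seen, whereas `Select` + `Switch` drops the previous inner as soon as a new one arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var outer = new Subject<int>();
var first = new Subject<string>();
var second = new Subject<string>();

// Hypothetical selector mapping an outer value to an inner observable.
IObservable<string> Inner(int i) => i == 1 ? first : second;

var merged = new List<string>();
var switched = new List<string>();

using var mergedSub = outer.SelectMany(i => Inner(i)).Subscribe(s => merged.Add(s));
using var switchedSub = outer.Select(i => Inner(i)).Switch().Subscribe(s => switched.Add(s));

outer.OnNext(1);
first.OnNext("1a");
outer.OnNext(2);      // Switch unsubscribes from `first` at this point
first.OnNext("1b");   // stale inner: still reaches SelectMany, not Switch
second.OnNext("2a");

Console.WriteLine("SelectMany:      " + string.Join(", ", merged));
Console.WriteLine("Select + Switch: " + string.Join(", ", switched));
// SelectMany:      1a, 1b, 2a
// Select + Switch: 1a, 2a
```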