Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
186 views
in Technique[技术] by (71.8m points)

c# - Rx.Net + Reactive-Ui + MahApps.Metro - Repeating & retrying asynchronous request with gated dialogs

Given an observable of form:

var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(1));
var stateFetcher =
    Observable.FromAsync(async () => await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));

var delayedFetch = fetchTimer.SelectMany(stateFetcher);

This provides the means for fetching the state after a delay.

A modification can do this at regular intervals:

var regularFetch = Observable.Interval(TimeSpan.FromSeconds(5)).Select(_ => stateFetcher).Switch();

This requests a value every 5 seconds.

The request can fail however (remote service unreachable etc); with that in mind, trying to produce a mechanism to retry the operation, as well as hooks for alerting the user can be tricky.

Suspending a timer-based operation on failure - this question by me covers the initial approach to this, as well as some of the attempts / a partial solution.

Here I want to share the solution I arrived at.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

We can summarise the problem as follows:

  1. We have a source observable that can produce a value or error
  2. We need to do something if an error occurs
  3. From the first error we want to make multiple attempts with notification
  4. If those attempts all fail, we want to do something
  5. At the end of the process we want to restart it all.

So:

Error -> Initial dialog -> Retry with notifications on each attempt -> Do it all again

At any point throughout this process a successful value emission should bypass everything and flow back out.

With that highly opinionated approach in mind, here is the utility I created:

public static IObservable<T> WithGatedRetry<T>(
    this IObservable<T> source,
    int retriesPerCycle,
    Func<Exception, Task> onInitialFailure,
    Func<Action<Func<Task>>, Task<Func<Exception, int, Task>>> retryNotificationBlock,
    Func<Exception, Task> onFailedCycle)
{
    IObservable<T> GetInitialHandler(Exception e) =>
        Observable.FromAsync(() => onInitialFailure(e))
        .Select(_ => (T)default);

    IObservable<T> GetCycleFailureHandler(Exception e) =>
        Observable.FromAsync(() => onFailedCycle(e))
        .Select(_ => (T)default);

    IObservable<T> GetRetryFlow() =>
        Observable.Create<T>(async sub =>
        {
            var attempt = 1;
            Func<Task> disposeCallback = () => Task.CompletedTask;
            var notifier = await retryNotificationBlock(dc =>
            {
                disposeCallback = dc;
            });

            await notifier(null, 1);

            return
                source
                .Do(
                     _ =>
                    {
                    },
                    async (Exception e) =>
                    {
                        if (attempt + 1 <= retriesPerCycle)
                        {
                            await notifier(e, ++attempt);
                        }
                    }
                )
                .Retry(retriesPerCycle)
                .Finally(async () =>
                {
                    if (disposeCallback != null)
                    {
                        await disposeCallback();
                    }
                })
                .Subscribe(
                    val => { sub.OnNext(val); sub.OnCompleted(); },
                    (Exception e) => { sub.OnError(e); }
                );
        });

    IObservable<T> GetCycleFlow() =>
        GetRetryFlow()
        .Catch((Exception e) =>
            GetCycleFailureHandler(e)
            .Select(_ => GetCycleFlow())
            .Switch()
        )
        .Retry();

    IObservable<T> GetPrimaryFlow() =>
        source
        .Catch((Exception e) => GetInitialHandler(e))
        .Select(val =>
            EqualityComparer<T>.Default.Equals(val, default)
            ? GetCycleFlow().Select(_ => GetPrimaryFlow()).Switch()
            : GetPrimaryFlow().StartWith(val)
        )
        .Switch();

    return GetPrimaryFlow();
}

I'll fully admit this may not be the best way to do it, and there's a bit of a callback-inside-a-callback kludge in the notification block (for each retry attempt), in order to support cleaning up once an retry "cycle" has been completed (successfully or otherwise).

Usage is as follows:

var latestState =
    Observable.SelectMany(fetchTimer, stateFetcher)
    .WithGatedRetry(
        3,
        async ex =>
        {
            // show initial error dialog
        },
        async (disposerHook) =>
        {
            // Show the "attempting retries" dialog

            disposerHook(async () =>
            {
                // Close the "attempting retries" dialog
                // and do any cleanup
            });

            return async (Exception ex, int attempt) =>
            {
                // Update the dialog
                // ex is the exception produced by the just-completed attempt
            };
        },
        async ex =>
        {
           // Show the "We still can't quite get it" dialog
           // after this task completes, the entire process starts again
        }
    )
    .Publish();

This approach allows for the tailored hook-points, and flows successful values as expected.

In fact, downstream subscribers should only ever see a value when one is successfully provided - they also shouldn't see an error as it sits in an infinite retry.

In comparison with the solution in the original question, this uses Select + Switch as opposed to SelectMany in order to ensure inner observables are correctly unsubscribed and disposed.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...