Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

c# - How do I split and merge this dataflow pipeline?

I am trying to build a pipeline with TPL Dataflow that has the following shape:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    ...                             
                    -> LoadDataBlockN -> ProcessDataBlockN ->

The idea is that GetInputPathsBlock finds the paths to the input data that is to be loaded, and then sends one path to each LoadDataBlock. The LoadDataBlocks are all identical (except that each has received a unique inputPath string from GetInputPathsBlock). The loaded data is then sent to the ProcessDataBlock, which does some simple processing. The data from each ProcessDataBlock is then sent to MergeDataBlock, which merges it and sends it to SaveDataBlock, which saves it to a file.

Think of it as a dataflow that needs to run once per month. First the path to each day's data is found. Each day's data is loaded and processed, and the results are then merged for the entire month and saved. Months can run in parallel, and within a month the days can be loaded in parallel and processed in parallel (each day as soon as its data has been loaded). Once everything for the month has been loaded and processed, it can be merged and saved.

What I tried

As far as I can tell TransformManyBlock<TInput,string> can be used to do the splitting (GetInputPathsBlock), and can be linked to a normal TransformBlock<string,InputData> (LoadDataBlock), and from there to another TransformBlock<InputData,ProcessedData> (ProcessDataBlock), but I don't know how to then merge it back to a single block.

What I looked at

I found this answer, which uses TransformManyBlock to go from an IEnumerable<item> to item, but I don't fully understand it, and I can't link a TransformBlock<InputData,ProcessedData> (ProcessDataBlock) to a TransformBlock<IEnumerable<ProcessedData>,ProcessedData>, so I don't know how to use it.

I have also seen answers like this, which suggest using JoinBlock, but the number of input files N varies, and the files are all loaded in the same way anyway.

There is also this answer, which seems to do what I want, but I don't fully understand it, and I don't know how the setup with the dictionary would be transferred to my case.

How do I split and merge my dataflow?

  • Is there a block type I am missing?
  • Can I somehow use TransformManyBlock twice?
  • Does TPL Dataflow make sense for the split/merge, or is there a simpler async/await way?

1 Reply


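I would use a nested block, so that the monthly data never has to be split across separate blocks and then merged again. Here is an example of two nested TransformBlocks that process all days of the year 2020. The outer block handles one month at a time, and for each month it creates an inner block that handles up to four days concurrently: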

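using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Outer block: receives a month number and returns that month's merged results.
var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
    // Inner block: created once per month, processes up to 4 days concurrently.
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    // Feed every day of the month, then signal that no more input is coming.
    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    // Collect all daily outputs; this list is the merged result for the month.
    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

// Feed the twelve months of the year, then mark the outer block as complete.
foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();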
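The example above stops once the months have been posted; nothing reads the output of monthlyBlock yet. As a rough sketch of how the SaveDataBlock from the question could be attached, the outer block can be linked to an ActionBlock with PropagateCompletion enabled, and the completion of that last block awaited. The names MonthlyPipelineSketch, RunAsync and saveBlock are hypothetical, the "save" step only records how many daily results arrived for each month, and the inner receive loop is written inline so that the sketch is self-contained:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MonthlyPipelineSketch
{
    // Processes months 1..12 of the given year and returns, per month,
    // the number of daily results that reached the (hypothetical) save step.
    public static async Task<Dictionary<int, int>> RunAsync(int year)
    {
        var saved = new Dictionary<int, int>();

        var monthlyBlock = new TransformBlock<int, (int Month, List<string> Days)>(async month =>
        {
            // Inner block: one per month, up to 4 days processed concurrently.
            var dailyBlock = new TransformBlock<int, string>(async day =>
            {
                await Task.Delay(1); // Stand-in for loading and processing one day
                return $"{year}-{month:00}-{day:00}";
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(year, month)))
                await dailyBlock.SendAsync(day);
            dailyBlock.Complete();

            // Drain the inner block until it has completed.
            var days = new List<string>();
            while (await dailyBlock.OutputAvailableAsync())
                while (dailyBlock.TryReceive(out var item))
                    days.Add(item);
            await dailyBlock.Completion; // Propagate a possible exception
            return (month, days);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Hypothetical save step. An ActionBlock handles one message at a time
        // by default, so writing to the dictionary here needs no locking.
        var saveBlock = new ActionBlock<(int Month, List<string> Days)>(result =>
        {
            saved[result.Month] = result.Days.Count;
        });

        // PropagateCompletion forwards Complete() (and faults) to saveBlock.
        monthlyBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var month in Enumerable.Range(1, 12))
            await monthlyBlock.SendAsync(month);
        monthlyBlock.Complete();

        // Awaiting the last block waits for the whole pipeline to finish.
        await saveBlock.Completion;
        return saved;
    }
}
```

Calling `await MonthlyPipelineSketch.RunAsync(2020)` should yield twelve entries, one per month. Awaiting only the last block is enough here because completion is propagated along the link.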

For collecting the daily results of the inner block I used the extension method ToListAsync that is shown below:

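using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Extension methods must be declared in a static class; the class name is arbitrary.
public static class DataflowExtensions
{
    // Receives everything the block produces until it completes.
    public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
        CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        // Returns false once the block has completed and its output is drained.
        while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (block.TryReceive(out var item))
            {
                list.Add(item);
            }
        }
        await block.Completion.ConfigureAwait(false); // Propagate possible exception
        return list;
    }
}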
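
On the question of whether there is a simpler async/await way: for a single month, the split and merge can also be expressed without any dataflow blocks, by starting one task per day and awaiting them all with Task.WhenAll. The following is only a sketch under assumptions. LoadAsync and Process are hypothetical stand-ins for the real loading and processing, the "merge" is a plain string join, and the SemaphoreSlim plays the role that MaxDegreeOfParallelism plays in the nested blocks:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for loading one day's data from a path.
    private static async Task<string> LoadAsync(string path)
    {
        await Task.Delay(1);
        return "data:" + path;
    }

    // Hypothetical stand-in for the "simple processing" step.
    private static string Process(string data) => data.ToUpperInvariant();

    // Loads and processes every day of the month with at most maxParallel
    // days in flight, then merges the results into one string.
    public static async Task<string> ProcessMonthAsync(int year, int month, int maxParallel = 4)
    {
        using (var throttle = new SemaphoreSlim(maxParallel))
        {
            // ToList() forces the lazy Select to run, which starts every task.
            List<Task<string>> tasks = Enumerable
                .Range(1, DateTime.DaysInMonth(year, month))
                .Select(async day =>
                {
                    await throttle.WaitAsync();
                    try
                    {
                        var data = await LoadAsync($"{year}-{month:00}-{day:00}");
                        return Process(data);
                    }
                    finally
                    {
                        throttle.Release();
                    }
                })
                .ToList();

            // WhenAll returns the results in the same order as the tasks.
            string[] results = await Task.WhenAll(tasks);
            return string.Join("\n", results);
        }
    }
}
```

Several months could be run the same way, one ProcessMonthAsync call per month. The nested blocks may still be the better fit when bounded buffering or linking to further blocks is wanted.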
