Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
605 views
in Technique[技术] by (71.8m points)

apache spark - When are accumulators truly reliable?

I want to use an accumulator to gather some stats about the data I'm manipulating on a Spark job. Ideally, I would do that while the job computes the required transformations, but since Spark would re-compute tasks on different cases the accumulators would not reflect true metrics. Here is how the documentation describes this:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

This is confusing since most actions do not allow running custom code (where accumulators can be used), they mostly take the results from previous transformations (lazily). The documentation also shows this:

val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.

But if we add data.count() at the end, would this be guaranteed to be correct (have no duplicates) or not? Clearly acc is not used "inside actions only", as map is a transformation. So it should not be guaranteed.

On the other hand, discussion on related Jira tickets talk about "result tasks" rather than "actions". For instance here and here. This seems to indicate that the result would indeed be guaranteed to be correct, since we are using acc immediately before and action and thus should be computed as a single stage.

I'm guessing that this concept of a "result task" has to do with the type of operations involved, being the last one that includes an action, like in this example, which shows how several operations are divided into stages (in magenta, image taken from here):

A job dividing several operations into multiple purple stages

So hypothetically, a count() action at the end of that chain would be part of the same final stage, and I would be guaranteed that accumulators used on the last map will no include any duplicates?

Clarification around this issue would be great! Thanks.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

To answer the question "When are accumulators truly reliable ?"

Answer : When they are present in an Action operation.

As per the documentation in Action Task, even if any restarted tasks are present it will update Accumulator only once.

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

And Action do allow to run custom code.

For Ex.

val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
  if(x!=""){
    accNotEmpty += 1
  }
})

But, Why Map+Action viz. Result Task operations are not reliable for an Accumulator operation?

  1. Task failed due to some exception in code. Spark will try 4 times(default number of tries).If task fail every time it will give an exception.If by chance it succeeds then Spark will continue and just update the accumulator value for successful state and failed states accumulator values are ignored.
    Verdict : Handled Properly
  2. Stage Failure : If an executor node crashes, no fault of user but an hardware failure - And if the node goes down in shuffle stage.As shuffle output is stored locally, if a node goes down, that shuffle output is gone.So Spark goes back to the stage that generated the shuffle output, looks at which tasks need to be rerun, and executes them on one of the nodes that is still alive.After we regenerate the missing shuffle output, the stage which generated the map output has executed some of it’s tasks multiple times.Spark counts accumulator updates from all of them.
    Verdict : Not handled in Result Task.Accumulator will give wrong output.
  3. If a task is running slow then, Spark can launch a speculative copy of that task on another node.
    Verdict : Not handled.Accumulator will give wrong output.
  4. RDD which is cached is huge and can't reside in Memory.So whenever the RDD is used it will re run the Map operation to get the RDD and again accumulator will be updated by it.
    Verdict : Not handled.Accumulator will give wrong output.

So it may happen same function may run multiple time on same data.So Spark does not provide any guarantee for accumulator getting updated because of the Map operation.

So it is better to use Accumulator in Action operation in Spark.

To know more about Accumulator and its issues refer this Blog Post - By Imran Rashid.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...