Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
528 views
in Technique[技术] by (71.8m points)

python - Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

I get the following error when I add --conf spark.driver.maxResultSize=2050 to my spark-submit command.

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)

The reason of adding this configuration was the error:

py4j.protocol.Py4JJavaError: An error occurred while calling o171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Therefore, I increased maxResultSize to 2.5 Gb, but the Spark job fails anyway (the error shown above). How to solve this issue?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

It seems like the problem is the amount of data you are trying to pull back to to your driver is too large. Most likely you are using the collect method to retrieve all values from a DataFrame/RDD. The driver is a single process and by collecting a DataFrame you are pulling all of that data you had distributed across the cluster back to one node. This defeats the purpose of distributing it! It only makes sense to do this after you have reduced the data down to a manageable amount.

You have two options:

  1. If you really need to work with all that data, then you should keep it out on the executors. Use HDFS and Parquet to save the data in a distributed manner and use Spark methods to work with the data on the cluster instead of trying to collect it all back to one place.

  2. If you really need to get the data back to the driver, you should examine whether you really need ALL of the data or not. If you only need summary statistics then compute that out on the executors before calling collect. Or if you only need the top 100 results, then only collect the top 100.

Update:

There is another reason you can run into this error that is less obvious. Spark will try to send data back the driver beyond just when you explicitly call collect. It will also send back accumulator results for each task if you are using accumulators, data for broadcast joins, and some small status data about each task. If you have LOTS of partitions (20k+ in my experience) you can sometimes see this error. This is a known issue with some improvements made, and more in the works.

The options for getting past if if this is your issue are:

  1. Increase spark.driver.maxResultSize or set it to 0 for unlimited
  2. If broadcast joins are the culprit, you can reduce spark.sql.autoBroadcastJoinThreshold to limit the size of broadcast join data
  3. Reduce the number of partitions

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...