Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.1k views
in Technique[技术] by (71.8m points)

apache spark - getExecutorMemoryStatus().size() not outputting correct num of executors

In short, I need the number of executors/workers in the Spark cluster, but using sc._jsc.sc().getExecutorMemoryStatus().size() gives me 1 when in fact there are 12 executors.

With more details, I'm trying to determine the number of executors and use that number as the number of partitions I ask Spark to distribute my RDD across. I do this to leverage the parallelism, as my initial data is just a range of numbers but then every one of them gets processed in a rdd#foreach method. The process is both memory-wise and computationally heavy, so I want the range of numbers originally to reside in as many partitions as the executors, to allow all executors to process chunks of it simultanuously.

Reading the comment in this question and seeing the documentation for the scala's getExecutorMemoryStatus, the suggested command: sc._jsc.sc().getExecutorMemoryStatus().size() seemed reasonable. But for some reason I get an answer 1 no matter how many executors actually exist (in my last run - it was 12).

Am I doing something wrong there? Am I calling the wrong method? In the wrong way?

I am running on a standalone Spark cluster that is being initiated for the run of the application each time.

Here is a minimal example of the problem:

from pyspark import SparkConf, SparkContext
import datetime


def print_debug(msg):
    dbg_identifier = 'dbg_et '
    print(dbg_identifier + str(datetime.datetime.now()) + ':  ' + msg)


print_debug('*****************before configuring sparkContext')
conf = SparkConf().setAppName("reproducing_bug_not_all_executors_working")
sc = SparkContext(conf=conf)
print_debug('*****************after configuring sparkContext')


def main():
    executors_num = sc._jsc.sc().getExecutorMemoryStatus().size()
    list_rdd = sc.parallelize([1, 2, 3, 4, 5], executors_num)
    print_debug('line before loop_a_lot. Number of partitions created={0}, 
        while number of executors is {1}'
          .format(list_rdd.getNumPartitions(), executors_num))
    list_rdd.foreach(loop_a_lot)
    print_debug('line after loop_a_lot')


def loop_a_lot(x):
    y = x
    print_debug('started working on item %d at ' % x + str(datetime.datetime.now()))
    for i in range(100000000):
        y = y*y/6+5
    print_debug('--------------------finished working on item %d at ' % x + str(datetime.datetime.now())
      + 'with a result: %.3f' % y)

if __name__ == "__main__":
    main()

And to show the problem - at the last time I ran it I got, in the driver's output (pasting only relevant parts, placeholders instead of the real ips and ports):

$> grep -E 'dbg_et|Worker:54 - Starting Spark worker' slurm-<job-num>.out
2018-07-14 20:48:26 INFO  Worker:54 - Starting Spark worker <ip1>:<port1> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:26 INFO  Worker:54 - Starting Spark worker <ip1>:<port2> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip2>:<port3> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip2>:<port4> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip3>:<port5> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip3>:<port6> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip4>:<port7> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip4>:<port8> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip5>:<port9> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip5>:<port10> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip6>:<port11> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO  Worker:54 - Starting Spark worker <ip6>:<port12> with 10 cores, 124.9 GB RAM
dbg_et 2018-07-14 20:48:37.044785:  *****************before configuring sparkContext
dbg_et 2018-07-14 20:48:38.708370:  *****************after configuring sparkContext
dbg_et 2018-07-14 20:48:39.046295:  line before loop_a_lot. Number of partitions created=1, while number of executors is 1
dbg_et 2018-07-14 20:50:11.181091:  line after loop_a_lot

And in the worker_dir Spark made a new directory for the run, which has 12 subdirectories - only one of which (this time it was directory 5) has a copy of the script and a non-empty output which makes sense as the misread number of executors, 1, made Spark creating the rdd in one partition only. Here is the full output of that worker (this output is actually the stderr - I have no idea why it's not in the stdout as it should be):

dbg_et 2018-07-14 20:48:41.806805:  started working on item 1 at 2018-07-14 20:48:41.806733
dbg_et 2018-07-14 20:48:59.710258:  --------------------finished working on item 1 at 2018-07-14 20:48:59.710198
with a result: inf
dbg_et 2018-07-14 20:48:59.710330:  started working on item 2 at 2018-07-14 20:48:59.710315
dbg_et 2018-07-14 20:49:17.367545:  --------------------finished working on item 2 at 2018-07-14 20:49:17.367483
with a result: inf
dbg_et 2018-07-14 20:49:17.367613:  started working on item 3 at 2018-07-14 20:49:17.367592
dbg_et 2018-07-14 20:49:35.382441:  --------------------finished working on item 3 at 2018-07-14 20:49:35.381597
with a result: inf
dbg_et 2018-07-14 20:49:35.382517:  started working on item 4 at 2018-07-14 20:49:35.382501
dbg_et 2018-07-14 20:49:53.227696:  --------------------finished working on item 4 at 2018-07-14 20:49:53.227615
with a result: inf
dbg_et 2018-07-14 20:49:53.227771:  started working on item 5 at 2018-07-14 20:49:53.227755
dbg_et 2018-07-14 20:50:11.128510:  --------------------finished working on item 5 at 2018-07-14 20:50:11.128452
with a result: inf

Can someone help me understand what causes the problem? Any idea? Might it be because of Slurm? (as you can see by the way I greped the driver's output file - I am running Spark on top of Slurm as the cluster to which I have access is managed by it)

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Short fix: Allow time (e.g. add a sleep command) before you use defaultParallelism or _jsc.sc().getExecutorMemoryStatus() if you use either at the beginning of the application's execution.

Explanation: There seems to be a short period of time at startup when there is only one executor (I believe that the single executor is the driver, which in some contexts is considered as an executor). That's why using sc._jsc.sc().getExecutorMemoryStatus() at the top of the main function yielded the wrong number for me. The same happened with defaultParallelism(1).

My suspicion is that the driver starts working using itself as a worker before having all the workers connecting to it. It agrees with the fact that submitting the below code to spark-submit using --total-executor-cores 12

import time

conf = SparkConf().setAppName("app_name")
sc = SparkContext(conf=conf)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")

log.warn('defaultParallelism={0}, and size of executorMemoryStatus={1}'.format(sc.defaultParallelism,
           sc._jsc.sc().getExecutorMemoryStatus().size()))
time.sleep(15)
log.warn('After 15 seconds: defaultParallelism={0}, and size of executorMemoryStatus={1}'
          .format(sc.defaultParallelism, 
                  sc._jsc.sc().getExecutorMemoryStatus().size()))
rdd_collected = (sc.parallelize([1, 2, 3, 4, 5] * 200, 
spark_context_holder.getParallelismAlternative()*3)
             .map(lambda x: (x, x*x) * 2)
             .map(lambda x: x[2] + x[1])
             )
log.warn('Made rdd with {0} partitioned. About to collect.'
          .format(rdd_collected.getNumPartitions()))
rdd_collected.collect()
log.warn('And after rdd operations: defaultParallelism={0}, and size of executorMemoryStatus={1}'
          .format(sc.defaultParallelism,
                  sc._jsc.sc().getExecutorMemoryStatus().size()))

gave me the following output

> tail -n 4 slurm-<job number>.out
18/09/26 13:23:52 WARN dbg_et: defaultParallelism=2, and size of executorMemoryStatus=1
18/09/26 13:24:07 WARN dbg_et: After 15 seconds: defaultParallelism=12, and size of executorMemoryStatus=13
18/09/26 13:24:07 WARN dbg_et: Made rdd with 36 partitioned. About to collect.
18/09/26 13:24:11 WARN dbg_et: And after rdd operations: defaultParallelism=12, and size of executorMemoryStatus=13

and that checking time at which the worker directories were created, I saw it was just after the correct values to both defaultParallelism and getExecutorMemoryStatus().size() were recorded(2). The important thing is that this time was quite a long time (~10 seconds) after the recording of the wrong values for these two parameters (see the time of the line with "defaultParallelism=2" above vs the time of these directories' creation below)

 > ls -l --time-style=full-iso spark/worker_dir/app-20180926132351-0000/
 <permission user blah> 2018-09-26 13:24:08.909960000 +0300 0/
 <permission user blah> 2018-09-26 13:24:08.665098000 +0300 1/
 <permission user blah> 2018-09-26 13:24:08.912871000 +0300 10/
 <permission user blah> 2018-09-26 13:24:08.769355000 +0300 11/
 <permission user blah> 2018-09-26 13:24:08.931957000 +0300 2/
 <permission user blah> 2018-09-26 13:24:09.019684000 +0300 3/
 <permission user blah> 2018-09-26 13:24:09.138645000 +0300 4/
 <permission user blah> 2018-09-26 13:24:08.757164000 +0300 5/
 <permission user blah> 2018-09-26 13:24:08.996918000 +0300 6/
 <permission user blah> 2018-09-26 13:24:08.640369000 +0300 7/
 <permission user blah> 2018-09-26 13:24:08.846769000 +0300 8/
 <permission user blah> 2018-09-26 13:24:09.152162000 +0300 9/

(1) Before starting to use getExecutorMemoryStatus() I tried using defaultParallelism, as you should, but it kept giving me the number 2. Now I understand this is from the same reason. Running on a standalone cluster, if the the driver sees only 1 executor then defaultParallelism = 2 as can be seen in the documentation for spark.default.parallelism.

(2) I'm not sure how come the values are correct BEFORE the directories are created - but I'm assuming the executors' starting order has them connecting to the driver before creating the directories.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...