In short: I need the number of executors/workers in the Spark cluster, but sc._jsc.sc().getExecutorMemoryStatus().size() gives me 1 when in fact there are 12 executors.
In more detail: I'm trying to determine the number of executors and use that number as the number of partitions I ask Spark to distribute my RDD across. I do this to leverage the parallelism: my initial data is just a range of numbers, but every one of them gets processed in an rdd.foreach call. The processing is both memory- and compute-heavy, so I want the original range of numbers to reside in as many partitions as there are executors, so that all executors can process chunks of it simultaneously.
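Conceptually, what I'm after is roughly this (a simplified sketch of my intent; the full reproducible script is further down, and the app name, heavy_processing and the range(1000) input are just placeholders for my real setup):

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("partitions_match_executors"))

# Ask the JVM-side SparkContext how many entries have registered;
# this is the call that keeps returning 1 for me.
executors_num = sc._jsc.sc().getExecutorMemoryStatus().size()


def heavy_processing(x):
    pass  # stand-in for the real memory- and CPU-heavy work per item


# Spread the input over as many partitions as there are executors,
# so every executor gets a chunk to process in parallel.
numbers_rdd = sc.parallelize(range(1000), executors_num)
numbers_rdd.foreach(heavy_processing)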
Reading the comment on this question and the documentation for Scala's getExecutorMemoryStatus, the suggested command sc._jsc.sc().getExecutorMemoryStatus().size() seemed reasonable. But for some reason I get 1 no matter how many executors actually exist (in my last run it was 12).
Am I doing something wrong there? Am I calling the wrong method? Or calling it the wrong way?
I am running on a standalone Spark cluster that is started fresh for each run of the application.
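One guess I have (and it's only a guess, not something I've verified) is that the executors simply haven't registered with the driver yet at the moment I make the call, since the cluster is brought up fresh for every run. A small diagnostic along these lines should show whether the reported size grows over time; the one-second interval and 30-second cap are arbitrary values I picked:

import time


def wait_for_executors(sc, expected, timeout_sec=30):
    # Poll getExecutorMemoryStatus until `expected` entries appear
    # (I believe the driver itself counts as one entry) or we give up.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        count = sc._jsc.sc().getExecutorMemoryStatus().size()
        print('registered entries so far: {0}'.format(count))
        if count >= expected:
            return count
        time.sleep(1)
    return sc._jsc.sc().getExecutorMemoryStatus().size()

If the count eventually climbs past 1 (ideally to 13, if I'm right that the driver counts as one entry alongside the 12 executors), that would at least tell me the call itself is fine and I'm just querying it too early.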
Here is a minimal example of the problem:
from pyspark import SparkConf, SparkContext
import datetime


def print_debug(msg):
    dbg_identifier = 'dbg_et '
    print(dbg_identifier + str(datetime.datetime.now()) + ': ' + msg)


print_debug('*****************before configuring sparkContext')
conf = SparkConf().setAppName("reproducing_bug_not_all_executors_working")
sc = SparkContext(conf=conf)
print_debug('*****************after configuring sparkContext')


def main():
    # The call that is supposed to report how many executors exist
    executors_num = sc._jsc.sc().getExecutorMemoryStatus().size()
    list_rdd = sc.parallelize([1, 2, 3, 4, 5], executors_num)
    print_debug('line before loop_a_lot. Number of partitions created={0}, '
                'while number of executors is {1}'
                .format(list_rdd.getNumPartitions(), executors_num))
    list_rdd.foreach(loop_a_lot)
    print_debug('line after loop_a_lot')


def loop_a_lot(x):
    # Memory- and CPU-heavy work on a single item
    y = x
    print_debug('started working on item %d at ' % x + str(datetime.datetime.now()))
    for i in range(100000000):
        y = y*y/6+5
    print_debug('--------------------finished working on item %d at ' % x + str(datetime.datetime.now())
                + 'with a result: %.3f' % y)


if __name__ == "__main__":
    main()
And to show the problem: the last time I ran it, the driver's output contained the following (pasting only the relevant parts, with placeholders instead of the real IPs and ports):
$> grep -E 'dbg_et|Worker:54 - Starting Spark worker' slurm-<job-num>.out
2018-07-14 20:48:26 INFO Worker:54 - Starting Spark worker <ip1>:<port1> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:26 INFO Worker:54 - Starting Spark worker <ip1>:<port2> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip2>:<port3> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip2>:<port4> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip3>:<port5> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip3>:<port6> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip4>:<port7> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip4>:<port8> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip5>:<port9> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip5>:<port10> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip6>:<port11> with 10 cores, 124.9 GB RAM
2018-07-14 20:48:29 INFO Worker:54 - Starting Spark worker <ip6>:<port12> with 10 cores, 124.9 GB RAM
dbg_et 2018-07-14 20:48:37.044785: *****************before configuring sparkContext
dbg_et 2018-07-14 20:48:38.708370: *****************after configuring sparkContext
dbg_et 2018-07-14 20:48:39.046295: line before loop_a_lot. Number of partitions created=1, while number of executors is 1
dbg_et 2018-07-14 20:50:11.181091: line after loop_a_lot
And in the worker_dir, Spark made a new directory for the run, containing 12 subdirectories. Only one of them (this time it was directory 5) has a copy of the script and non-empty output, which makes sense: the misread number of executors, 1, caused Spark to create the RDD in a single partition. Here is the full output of that worker (this output is actually the stderr - I have no idea why it's not in the stdout as it should be):
dbg_et 2018-07-14 20:48:41.806805: started working on item 1 at 2018-07-14 20:48:41.806733
dbg_et 2018-07-14 20:48:59.710258: --------------------finished working on item 1 at 2018-07-14 20:48:59.710198
with a result: inf
dbg_et 2018-07-14 20:48:59.710330: started working on item 2 at 2018-07-14 20:48:59.710315
dbg_et 2018-07-14 20:49:17.367545: --------------------finished working on item 2 at 2018-07-14 20:49:17.367483
with a result: inf
dbg_et 2018-07-14 20:49:17.367613: started working on item 3 at 2018-07-14 20:49:17.367592
dbg_et 2018-07-14 20:49:35.382441: --------------------finished working on item 3 at 2018-07-14 20:49:35.381597
with a result: inf
dbg_et 2018-07-14 20:49:35.382517: started working on item 4 at 2018-07-14 20:49:35.382501
dbg_et 2018-07-14 20:49:53.227696: --------------------finished working on item 4 at 2018-07-14 20:49:53.227615
with a result: inf
dbg_et 2018-07-14 20:49:53.227771: started working on item 5 at 2018-07-14 20:49:53.227755
dbg_et 2018-07-14 20:50:11.128510: --------------------finished working on item 5 at 2018-07-14 20:50:11.128452
with a result: inf
Can someone help me understand what causes the problem? Any ideas? Might it be because of Slurm? (As you can see from the way I grepped the driver's output file, I am running Spark on top of Slurm, since the cluster I have access to is managed by it.)