
python - Change PYSPARK_PYTHON on Spark workers

We distribute our Python app, which uses Spark, together with a Python 3.7 interpreter (python.exe with all necessary libraries sits next to MyApp.exe).

To set PYSPARK_PYTHON, we have a function that determines the path to our python.exe:

os.environ['PYSPARK_PYTHON'] = get_python()  

On Windows, PYSPARK_PYTHON becomes C:/MyApp/python.exe
On Ubuntu, PYSPARK_PYTHON becomes /opt/MyApp/python.exe
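
For illustration, the get_python() helper looks roughly like the sketch below (simplified; it just assumes the bundled interpreter sits next to the application executable):

import os
import sys

def get_python():
    # Simplified sketch: the bundled interpreter is assumed to be named
    # python.exe on both platforms and to sit in the application directory
    # (C:/MyApp on Windows, /opt/MyApp on Ubuntu).
    app_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
    return os.path.join(app_dir, 'python.exe')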

We start the master/driver node and create a SparkSession on Windows. Then we start the worker node on Ubuntu, but the worker fails with:

Job aborted due to stage failure: Task 1 in stage 11.0 failed 4 times, most recent failure: Lost task 1.3 in stage 11.0 (TID 1614, 10.0.2.15, executor 1): java.io.IOException: Cannot run program "C:/MyApp/python.exe": error=2, No such file or directory

Of course, there is no C:/MyApp/python.exe on Ubuntu.

If I understand this error correctly, the PYSPARK_PYTHON value from the driver is sent to all workers.

I also tried setting PYSPARK_PYTHON in spark-env.sh and spark-defaults.conf. How can I change PYSPARK_PYTHON on the Ubuntu workers so that it becomes /opt/MyApp/python.exe?
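
Those attempts looked roughly like the following on the Ubuntu worker (an illustrative sketch, not the exact entries; spark.pyspark.python is the spark-defaults.conf counterpart of the PYSPARK_PYTHON environment variable):

# spark-env.sh on the Ubuntu worker
export PYSPARK_PYTHON=/opt/MyApp/python.exe

# spark-defaults.conf (property form of PYSPARK_PYTHON)
spark.pyspark.python    /opt/MyApp/python.exe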

1 Reply


Browsing through the source code, it looks like the Python driver puts the Python executable path from its Spark context into the work items it creates for running Python functions, in spark/rdd.py:

def _wrap_function(sc, func, deserializer, serializer, profiler=None):
    assert deserializer, "deserializer should not be empty"
    assert serializer, "serializer should not be empty"
    command = (func, profiler, deserializer, serializer)
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
                                                                             ^^^^^^^^^^^^^
                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)

The Python runner PythonRunner.scala then uses the path stored in the first work item it receives to launch new interpreter instances:

private[spark] abstract class BasePythonRunner[IN, OUT](
    funcs: Seq[ChainedPythonFunctions],
    evalType: Int,
    argOffsets: Array[Array[Int]])
  extends Logging {
  ...
  protected val pythonExec: String = funcs.head.funcs.head.pythonExec
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  ...
  def compute(
      inputIterator: Iterator[IN],
      partitionIndex: Int,
      context: TaskContext): Iterator[OUT] = {
    ...
    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
    ...
  }
  ...
}

Based on that, I'm afraid it does not currently seem possible to configure the Python executable separately for the master and the workers. See also the third comment on issue SPARK-26404. Perhaps you should file an RFE with the Apache Spark project.

I'm not a Spark guru, though, and there might still be a way to do it, perhaps by setting PYSPARK_PYTHON to just "python" and then making sure the system PATH is configured so that your Python executable comes first.
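
A minimal sketch of that idea, assuming each machine can be set up so that a bare "python" on its PATH resolves to the bundled interpreter (the symlink arrangement is illustrative, not a verified fix):

import os

# Driver side: use a bare executable name rather than an absolute path, so the
# value sent to the workers is resolved against each machine's own PATH.
os.environ['PYSPARK_PYTHON'] = 'python'

# Worker side (outside this script, e.g. in the worker's environment or
# spark-env.sh): make sure "python" resolves to the bundled interpreter, for
# example via a symlink such as /opt/MyApp/python -> /opt/MyApp/python.exe
# placed in a directory on the Spark worker's PATH.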

