Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Apache Spark - Calling another custom Python function from a PySpark UDF

Suppose you have a file, let's call it udfs.py, containing:

def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1

You then want to make a UDF out of the main_f function and run it on a dataframe:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

This works fine if we do it from within the same file in which the two functions are defined (udfs.py). However, doing it from a different file (say main.py) produces the error ModuleNotFoundError: No module named ...:

...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

I noticed that if I actually nest the nested_f inside the main_f like this:

def main_f(x):
    def nested_f(x):
        return x + 1

    return nested_f(x) + 1

everything runs OK. However, my goal here is to have the logic nicely separated in multiple functions, which I can also test individually.

I think this can be solved by submitting the udfs.py file (or a whole zipped folder) to the executors using spark.sparkContext.addPyFile('...udfs.py'). However:

  1. I find this a bit long-winded (especially if you need to zip folders and so on).
  2. This is not always easy or even possible (e.g. udfs.py may use lots of other modules, which then also need to be submitted, leading to a bit of a chain reaction).
  3. There are some other inconveniences with addPyFile (e.g. autoreload can stop working).

So the question is: is there a way to do all of these at the same time:

  • have the logic of the UDF nicely split to several Python functions
  • use the UDF from a different file than where the logic is defined
  • avoid submitting any dependencies using addPyFile

Bonus points for clarifying how this works/why this doesn't work!


1 Reply

For small dependencies (one or two local files) you can use --py-files and list them individually; for anything bigger, or with more dependencies, it's better to pack everything into a zip or egg file.

File udfs.py:

def my_function(*args, **kwargs):
    # code
    pass

File main.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

# Reuse the active session (e.g. in the pyspark shell) or create a new one
spark = SparkSession.builder.getOrCreate()
my_udf = udf(my_function)  # the return type defaults to string

df = spark.createDataFrame([(1, "a"), (2, "b")])
# "..." is a placeholder for the input column
df = df.withColumn("my_f", my_udf("..."))

To run it:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

If you have written your own Python module, or need third-party modules that don't require C compilation (I personally needed this for geoip2), it's better to create a zip or egg file.

# pip's -t option installs the modules and their dependencies into the directory `src`
pip install geoip2 -t ./src
# Or from a local directory
pip install ./my_module -t ./src

# Best is to install everything from a requirements file
pip install -r requirements.txt -t ./src

# If you need to add some additional files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip main.py
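
If you would rather script the packing step than run zip by hand, the standard library can do the same thing. This is only a sketch: it assumes the dependencies are already installed into a src directory as above, and uses a temporary directory with a hypothetical udfs.py and my_module package as a stand-in so that it runs on its own.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path

# Stand-in for ./src: a directory holding a hypothetical module and package.
work_dir = Path(tempfile.mkdtemp())
src_dir = work_dir / "src"
(src_dir / "my_module").mkdir(parents=True)
(src_dir / "my_module" / "__init__.py").write_text("VALUE = 1\n")
(src_dir / "udfs.py").write_text("def my_function(x):\n    return x\n")

# Rough equivalent of `cd ./src && zip -r ../libs.zip .`:
# archive the *contents* of src so the modules sit at the top level of the zip.
archive_path = shutil.make_archive(str(work_dir / "libs"), "zip", root_dir=str(src_dir))

# List what ended up in the archive.
with zipfile.ZipFile(archive_path) as archive:
    names = sorted(archive.namelist())
```

The important detail is the same as with the zip command: the archive is built from inside src, so udfs.py and my_module/ end up at the root of libs.zip rather than under a src/ prefix.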

Be careful when running the pyspark shell with --py-files and --master yarn (and possibly other non-local masters): you may have to add the archive to sys.path yourself before importing from it:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip')  # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule  # libs.zip/MyModule
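
Inserting the archive into sys.path works because Python can import modules directly from a zip file. A minimal, Spark-free sketch of that mechanism follows; demo_libs.zip and my_module_demo are hypothetical names standing in for libs.zip and MyModule.

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Build a tiny archive containing one hypothetical module.
archive_path = os.path.join(tempfile.mkdtemp(), "demo_libs.zip")
with zipfile.ZipFile(archive_path, "w") as archive:
    archive.writestr("my_module_demo.py", "def add_one(x):\n    return x + 1\n")

# Putting the archive itself on sys.path makes its top-level modules importable.
sys.path.insert(0, archive_path)
my_module_demo = importlib.import_module("my_module_demo")

value = my_module_demo.add_one(1)
loaded_from = my_module_demo.__file__  # a path inside the archive
```

This only covers pure-Python code; modules with compiled extensions generally cannot be imported from a zip this way, which matches the "no C compilation" caveat above.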

EDIT - Answer to the question of how to get the functions onto the executors without addPyFile() and --py-files:

The file containing the functions has to be present on every executor and reachable on the Python import path (for example via PYTHONPATH). I would therefore probably package the code as a Python module and install it on the executors, so that it is available in their environment.
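
On the "why" part of the question, here is a Spark-free sketch of the failure. As far as I understand it, Spark ships the UDF with cloudpickle, which, like the standard pickle module used below, stores a function that lives in an importable module as a reference (module name plus function name) rather than as code, while functions defined in the driver script itself (`__main__`) are shipped by value. That would explain why the single-file version works and the imported one needs udfs.py on the executors. The module name udfs_demo and the temporary directory are hypothetical stand-ins for udfs.py and for an executor that lacks it.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for udfs.py, written to a temporary directory.
module_dir = tempfile.mkdtemp()
Path(module_dir, "udfs_demo.py").write_text(
    "def nested_f(x):\n"
    "    return x + 1\n"
    "\n"
    "def main_f(x):\n"
    "    return nested_f(x) + 1\n"
)

# "Driver": the module is importable, so pickling the function succeeds.
sys.path.insert(0, module_dir)
udfs_demo = importlib.import_module("udfs_demo")
payload = pickle.dumps(udfs_demo.main_f)

# The payload holds only the module and function names, not the code.
stored_by_reference = b"udfs_demo" in payload and b"main_f" in payload

# "Executor": same payload, but the module cannot be found.
sys.path.remove(module_dir)
del sys.modules["udfs_demo"]
importlib.invalidate_caches()
try:
    pickle.loads(payload)
    error_name = None
except ModuleNotFoundError as exc:
    error_name = type(exc).__name__

# Once the module is reachable again, the same payload loads and runs.
sys.path.insert(0, module_dir)
importlib.invalidate_caches()
restored = pickle.loads(payload)
result = restored(1)
```

The middle step fails with the same ModuleNotFoundError as in the question, and it goes away as soon as the module is importable again, which is what installing the module on the executors (or using --py-files) achieves.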

