Let's say you want to use an Identity class defined like this (identity.py):

    class Identity(object):
        def __getstate__(self):
            raise NotImplementedError("Not serializable")

        def identity(self, x):
            return x

You can, for example, use a callable object (f.py) and store an Identity instance as a class member, creating it lazily on the first call:

    from identity import Identity

    class F(object):
        identity = None

        def __call__(self, x):
            if F.identity is None:
                F.identity = Identity()
            return F.identity.identity(x)

and use these as shown below:

    from pyspark.sql.functions import udf
    import f

    sc.addPyFile("identity.py")
    sc.addPyFile("f.py")

    f_ = udf(f.F())

    spark.range(3).select(f_("id")).show()

    +-----+
    |F(id)|
    +-----+
    |    0|
    |    1|
    |    2|
    +-----+

Alternatively, use a standalone function and a closure:

    from pyspark.sql.functions import udf
    import identity

    sc.addPyFile("identity.py")

    def f():
        dict_ = {}

        @udf()
        def f_(x):
            if "identity" not in dict_:
                dict_["identity"] = identity.Identity()
            return dict_["identity"].identity(x)

        return f_

    spark.range(3).select(f()("id")).show()

    +------+
    |f_(id)|
    +------+
    |     0|
    |     1|
    |     2|
    +------+
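
Both variants rely on the same idea: the object shipped to the workers holds no Identity instance at serialization time, and one is created only when the function is first called. The sketch below illustrates this without Spark, using the standard pickle module as a stand-in for Spark's serializer (an assumption made for illustration; the two do not behave identically in every case). The is_picklable helper is hypothetical and not part of any library.

```python
import pickle


class Identity(object):
    def __getstate__(self):
        # Any attempt to pickle an instance ends up here and fails.
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x


class F(object):
    # Class attribute: not part of an F instance's pickled state.
    identity = None

    def __call__(self, x):
        # Create the non-serializable object lazily, on first use.
        if F.identity is None:
            F.identity = Identity()
        return F.identity.identity(x)


def is_picklable(obj):
    """Hypothetical helper: True if pickle.dumps(obj) succeeds."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False
```

When this is run as a regular script or imported as a module, is_picklable(Identity()) should be False, while is_picklable(F()) should be True both before and after F() has been called, because the Identity instance lives on the class rather than in the pickled instance state.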