Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


apache spark - Return complex nested array type from UDF pyspark

Update: more issues are described at the end of the post.

I need to create a new column in a DataFrame with a UDF in PySpark. The UDF has to return a nested array in this format:

    [
        [before], [after], [from_tbl], [where_tbl], [to_tbl], [lst_tbl], [db_info]
    ]
        
with:
-----------------
before, after = [
                        [query_type,out,[from],[where]],
                        [query_type,out,[from],[where]]
                ]
-----------------
to_tbl = [write_mode, [table_name], table_action]
-----------------
from_tbl, where_tbl, from, where, table_name, lst_tbl, db_info = [a,b,c]

I define the UDF's return schema as follows:

from pyspark.sql import functions as F, types as T

schema_return = T.StructType([
    T.StructField('before', T.ArrayType(T.StructType([
        T.StructField('query_type', T.StringType(), True),
        T.StructField('out', T.StringType(), True),
        T.StructField('from', T.ArrayType(T.StringType(), True), True),
        T.StructField('where', T.ArrayType(T.StringType(), True), True),
    ])), True),
    T.StructField('after', T.ArrayType(T.StructType([
        T.StructField('query_type', T.StringType(), True),
        T.StructField('out', T.StringType(), True),
        T.StructField('from', T.ArrayType(T.StringType(), True), True),
        T.StructField('where', T.ArrayType(T.StringType(), True), True),
    ])), True),
    T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('to_tbl', T.StructType([
        T.StructField('write_mode', T.StringType(), True),
        T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
        T.StructField('table_action', T.StringType(), True),
    ]), True),
    T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('db_info', T.ArrayType(T.StringType(), True), True)
])

    @F.udf(returnType=schema_return)
    def udf(parameter):
...

And I received this error:

Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 7 fields are required while 0 values are provided.

I followed this tutorial: https://prodevsblog.com/questions/123979/how-to-return-a-tuple-type-in-a-udf-in-pyspark/ which gives this example:

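from pyspark.sql.functions import udf
from pyspark.sql.types import (StructType, StructField, FloatType,
                               IntegerType, ArrayType, MapType)

schema = StructType([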
    StructField("min", FloatType(), True),
    StructField("size", IntegerType(), True),
    StructField("edges",  ArrayType(FloatType()), True),
    StructField("val_to_index",  MapType(FloatType(), IntegerType()), True)
    # StructField('insanity', StructType([StructField("min_", FloatType(), True), StructField("size_", IntegerType(), True)]))

])

def func(values):
  mn = min(values)
  size = len(values)
  lst = sorted(values)[::-1]
  val_to_index = {x: i for i, x in enumerate(values)}
  return (mn, size, lst, val_to_index)

func = udf(func, schema)
dff = df.select('*', func('y[]').alias('complex_type'))
dff.show(10, False)

# +---+----------+------------------------------------------------------+
# |x  |y[]       |complex_type                                          |
# +---+----------+------------------------------------------------------+
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]|
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]|
# +---+----------+------------------------------------------------------+

Where am I going wrong, and how should I define the schema for the nested array above?

Here is what my UDF returns:

return [before, after, from_tbl, where_tbl, to_tbl, list(set(lst_tbl)), dbinfo]
or
return []  # maybe this is the cause

Update

After @mck told me not to return [], I replaced return [] with return None. But I then received another error, similar to the first one:

Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 3 fields are required while 0 values are provided

With this schema:

schema_return = T.StructType([
    T.StructField('before', T.ArrayType(T.StructType([
        T.StructField('query_type', T.StringType(), True),
        T.StructField('out', T.StringType(), True),
        T.StructField('from', T.ArrayType(T.StringType(), True), True),
        T.StructField('where', T.ArrayType(T.StringType(), True), True),
    ])), True),
    T.StructField('after', T.ArrayType(T.StructType([
        T.StructField('query_type', T.StringType(), True),
        T.StructField('out', T.StringType(), True),
        T.StructField('from', T.ArrayType(T.StringType(), True), True),
        T.StructField('where', T.ArrayType(T.StringType(), True), True),
    ])), True),
    T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('to_tbl', T.StructType([
        T.StructField('write_mode', T.StringType(), True),
        T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
        T.StructField('table_action', T.StringType(), True),
    ]), True),
    T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('db_info', T.ArrayType(T.StringType(), True), True)
])

Since the error mentions 3 required fields, I guess the cause is this struct:

T.StructField('to_tbl', T.StructType([
    T.StructField('write_mode', T.StringType(), True),
    T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
    T.StructField('table_action', T.StringType(), True),
]), True),

Each element of my list ([before], [after], [from_tbl], [where_tbl], [to_tbl], [lst_tbl], [db_info]) is set to [] when its condition is not satisfied. If I replace [] with None, it affects the logic later in my code. How can I keep [] instead of None, and why does [] lead to this error? Thank you so much.
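
A plain-Python sketch may help show why [] fails for some slots but not others. It is not Spark's code; it only imitates the length check behind the error message, assuming that a struct slot must hold either None or exactly one value per field, while an array slot may hold any number of items (including zero). The schema encoding (`array`, `struct`, `SCHEMA`) and the function `find_mismatches` are hypothetical names invented for this illustration.

```python
# Simplified stand-in for the return schema (an assumption, not a PySpark API):
# STR is a string, ("array", elem) an array, ("struct", [(name, type), ...]) a struct.
STR = "str"


def array(elem):
    return ("array", elem)


def struct(*fields):
    return ("struct", list(fields))


QUERY = struct(("query_type", STR), ("out", STR),
               ("from", array(STR)), ("where", array(STR)))

SCHEMA = struct(
    ("before", array(QUERY)),
    ("after", array(QUERY)),
    ("from_tbl", array(STR)),
    ("where_tbl", array(STR)),
    ("to_tbl", struct(("write_mode", STR),
                      ("table_name", array(STR)),
                      ("table_action", STR))),
    ("lst_tbl", array(STR)),
    ("db_info", array(STR)),
)


def find_mismatches(value, spec, path="result"):
    """List every struct slot whose value has the wrong number of items."""
    if value is None or spec == STR:
        return []  # None is accepted everywhere; strings are not inspected
    kind, inner = spec
    problems = []
    if kind == "array":
        # An array may be empty; only its elements are checked.
        for i, item in enumerate(value):
            problems += find_mismatches(item, inner, f"{path}[{i}]")
    else:
        # A struct needs exactly one value per field.
        if len(value) != len(inner):
            return [f"{path}: {len(inner)} fields are required "
                    f"while {len(value)} values are provided"]
        for (name, field_spec), item in zip(inner, value):
            problems += find_mismatches(item, field_spec, f"{path}.{name}")
    return problems


# Top-level [] reproduces the "7 fields" message; an empty to_tbl the "3 fields" one.
print(find_mismatches([], SCHEMA))
print(find_mismatches([[], [], [], [], [], [], []], SCHEMA))
```

Under that assumption, [] is harmless in from_tbl, where_tbl, lst_tbl and db_info, and only the struct-typed positions (the whole return value, to_tbl, and each element of before and after) need None.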



1 Reply


I solved it. When the UDF returns a StructType, it needs to return None rather than [], and the same applies to nested struct elements. Thank you so much, @mck.
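
A minimal sketch of that fix, assuming the seven-field return list from the question with to_tbl at index 4 and before/after at indexes 0 and 1. The helper names `empty_to_none` and `normalize_result` are hypothetical and not part of PySpark; the idea is to call the helper on the value the UDF is about to return, so that the rest of the UDF can keep using [] internally.

```python
N_FIELDS = 7             # before, after, from_tbl, where_tbl, to_tbl, lst_tbl, db_info
STRUCT_ARRAYS = (0, 1)   # before, after: arrays whose elements are structs
TO_TBL = 4               # to_tbl: a struct with 3 fields


def empty_to_none(value):
    """Turn an empty list or tuple into None; leave anything else unchanged."""
    if isinstance(value, (list, tuple)) and len(value) == 0:
        return None
    return value


def normalize_result(result):
    """Replace [] with None only where the schema expects a struct."""
    # The return value itself is a struct, so "nothing to return" must be None.
    if result is None or len(result) == 0:
        return None
    if len(result) != N_FIELDS:
        raise ValueError(f"expected {N_FIELDS} values, got {len(result)}")
    out = list(result)
    # Elements of before/after are structs; the surrounding arrays may stay [].
    for i in STRUCT_ARRAYS:
        if out[i] is not None:
            out[i] = [empty_to_none(item) for item in out[i]]
    # to_tbl is a struct; the array-typed slots are left untouched.
    out[TO_TBL] = empty_to_none(out[TO_TBL])
    return out
```

Inside the UDF, the final line would then read something like `return normalize_result([before, after, from_tbl, where_tbl, to_tbl, list(set(lst_tbl)), dbinfo])`, and the early `return []` could become `return normalize_result([])`.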



...