Short version of the question!
Consider the following snippet (assuming spark is already set to some SparkSession):
from pyspark.sql import Row

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
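For reference, this is (roughly) the schema Spark infers here — temperatures comes out as an array of doubles (the printSchema output below is from memory, so the exact formatting may differ slightly between Spark versions):
df.printSchema()
# root
#  |-- city: string (nullable = true)
#  |-- temperatures: array (nullable = true)
#  |    |-- element: double (containsNull = true)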
Notice that the temperatures field is a list of floats. I would like to convert these lists of floats to the MLlib type Vector, and I'd like this conversion to be expressed using the basic DataFrame API rather than going via RDDs (which is inefficient because it sends all data from the JVM to Python, the processing is done in Python, we don't get the benefits of Spark's Catalyst optimizer, yada yada). How do I do this? Specifically:
- Is there a way to get a straight cast working? See below for details (and a failed attempt at a workaround). Or, is there any other operation that has the effect I was after?
- Which is more efficient out of the two alternative solutions I suggest below (UDF vs exploding/reassembling the items in the list)? Or are there any other almost-but-not-quite-right alternatives that are better than either of them?
A straight cast doesn't work
This is what I would expect to be the "proper" solution. I want to convert the type of a column from one type to another, so I should use a cast. As a bit of context, let me remind you of the normal way to cast a column to another type:
from pyspark.sql import types

df_with_strings = df.select(
    df["city"],
    df["temperatures"].cast(types.ArrayType(types.StringType())),
)
Now e.g. df_with_strings.collect()[0]["temperatures"][1] is '-2.0'. But if I cast to an ml Vector then things do not go so well:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
This gives an error:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
Yikes! Any ideas how to fix this?
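(Side note: the STRUCT<type, size, indices, values> in that error message is just the SQL type that backs the ml Vector UDT. If you want to see it for yourself you can print it — this is only an inspection aid, not a fix, and the exact repr varies by Spark version:)
from pyspark.ml.linalg import VectorUDT
# Print the struct that Spark uses internally to represent ml Vectors;
# it is the same four-field struct mentioned in the error above.
print(VectorUDT.sqlType())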
Possible alternatives
Alternative 1: Using VectorAssembler
There is a Transformer that seems almost ideal for this job: the VectorAssembler. It takes one or more columns and concatenates them into a single vector. Unfortunately it only takes Vector and Float columns, not Array columns, so the following doesn't work:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
It gives this error:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
The best workaround I can think of is to explode the list into multiple columns and then use the VectorAssembler to collect them all back up again:
from pyspark.ml.feature import VectorAssembler

TEMPERATURE_COUNT = 3

assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"],
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
This seems like it would be ideal, except that TEMPERATURE_COUNT can be more than 100, and sometimes more than 1000. (Another problem is that the code would be more complicated if you don't know the size of the array in advance, although that is not the case for my data.) Does Spark actually generate an intermediate data set with that many columns, or does it just consider this an intermediate step that individual items pass through transiently (or indeed does it optimise away this step entirely when it sees that the only use of these columns is to be assembled into a vector)?
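One way to at least peek at how Spark treats that intermediate step (purely an inspection aid on my part, not an answer) is to print the query plans for the assembled result:
# Print the parsed, analysed, optimised and physical plans, to see whether
# the exploded temperature columns survive into the physical plan.
final_df.explain(True)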
Alternative 2: use a UDF
A rather simpler alternative is to use a UDF to do the conversion. This lets me express quite directly what I want to do in one line of code, and doesn't require making a data set with a crazy number of columns. But all that data has to be exchanged between Python and the JVM, and every individual number has to be handled by Python (which is notoriously slow for iterating over individual data items). Here is how that looks:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"],
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
Ignorable remarks
The remaining sections of this rambling question are some extra things I came up with while trying to find an answer. They can probably be skipped by most people reading this.
Not a solution: use Vector to begin with
In this trivial example it's possible to create the data using the vector type to begin with, but of course my data isn't really a Python list that I'm parallelizing, but instead is being read from a data source. But for the record, here is how that would look:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
Inefficient solution: use map()
One possibility is to use the RDD map() method to transform the list to a Vector. This is similar to the UDF idea, except that it's even worse because the cost of serialisation etc. is incurred for all the fields in each row, not just the one being operated on. For the record, here's what that solution would look like:
df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"],
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()
Failed attempt at a workaround for cast
In desperation, I noticed that Vector is represented internally by a struct with four fields, but using a traditional cast from that type of struct doesn't work either. Here is an illustration (where I built the struct using a udf, but the udf isn't the important part):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"],
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"],
    df_almost_vector["temperatures"].cast(VectorUDT())
)
This gives the error:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"