Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
78 views
in Technique[技术] by (71.8m points)

python - Getting around for loops in PySpark?

I have a clustering algorithm in Python that I am trying to convert to PySpark (for parallel processing).

I have a dataset that contains regions, and stores within those regions. I want to perform my clustering algorithm for all stores in a single region.

I have a few for loops before getting to the ML. How can I modify the code to remove the for loops in PySpark? I have read for loops in PySpark are generally not a good practice - but I need to be able to perform the model on many sub-datasets. Any advice?

For reference, I"m currently looping (through Pandas DataFrames) like this pseudocode below:

for region in df_region: 
    for distinct stores in region: 
          [apply ML clustering algorithm]
question from:https://stackoverflow.com/questions/65894457/getting-around-for-loops-in-pyspark

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Search Built-in Algorithms
You could consider looking up RDD-based built-in clustering algorithms first since they are usually common and were released via rigorous validation process.
Clustering - RDD-based API

If you're more familiar with DataFrame-based API, then you could go here for a glance. And you might want to keep in mind as of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode (no new features, only bug fixes). The primary ML API is now the DataFrame-based API in the spark.ml package.

Implement Yourself
Pandas UDFs
If you do have a model object already, consider Pandas UDFs since they have iterator support now (Since 3.0.0). Simply saying, it means a model won't be loaded for each row.

from pyspark.sql.functions import pandas_udf

@pandas_udf(...)
def categorize(iterator):
  model = ... # load model
  for features in iterator:
    yield model.predict(features)

"""
GROUP BY in Spark SQL or window functions can be considered.
It depends on your scenarios, just remember DataFrames are still based on RDDs. 
They are immutable and are high-level abstraction.
"""
spark_df.withColumn("clustered_result", categorize("some_column")).show()

RDD Exploring
If, unfortunately, your intentional execution of the clustering algorithm is not included in the set of Spark built-in clustering algorithms and won't have a progress of training which means the generation of a model. You could consider converting the Pandas DataFrame into RDD data structures, then implementing your clustering algorithm. A rough process will be like the following:

pandas_df = ....
spark_df = spark.createDataFrame(pandas_df)
.
.
clustering_result = spark_df.rdd.map{p => cluster_algorithm(p)}

note1: It's only a rough progress, you might want to partition the whole dataset into few RDDs based on region then execute the clustering algorithm in each partitioned RDDs. Because the information of the clustering algorithm kinda not too clear, I could only give the advice based on some assumptions.
note2: RDD implementation should be your last option

  1. RDD Programming Guide
  2. 2017, Chen Jin, A Scalable Hierarchical Clustering Algorithm Using Spark

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...