I'm stuck on a process where I need to perform an action for each column value in my DataFrame, and that action requires traversing the DataFrame again. The following is a data sample:
Row(user_id='KxGeqg5ccByhaZfQRI4Nnw', gender='male', business_id='S75Lf-Q3bCCckQ3w7mSN2g', friends='my4q3Sy6Ei45V58N2l8VGw, R3zl9VKw63rPxSfBxbasWw, c-c64rURhBR8V8scSbwo7Q, tn6qogrDbb9hEKfRBGUUpw, pu_AQig2fw40PshvtgONPQ, IDrgtQccPN9c4rBn7yyk4Q, OIIx11vTeLN8EBcZrYXHKQ')
friends here is just a list of other user_id values. What I'm trying to do is fetch some value for each of these friends for this specific user. Since each friend is a user_id, I'd need to query my DF for it, which isn't allowed inside a UDF. I can neither call spark.sql nor refer to a DataFrame and perform a filter, since both are sparkSession objects.
What different approach can I try here?
Trying by creating a DF and then filtering:
tempDF=sparkSession.sql("SELECT review_sentiment,user_id,business_id FROM events")

def getfriendsSentiment(friendsList, b_id):
    listOfSentiments=[]
    for friend_id in friendsList.split(','):
        try:
            listOfSentiments.append(tempDF.filter("user_id='"+friend_id+"' AND business_id='"+b_id+"'").rdd.flatMap(lambda x:x).collect()[0])
        except:
            pass

friendsSentiment = udf(getfriendsSentiment, StringType())
businessReviewUserDfWithFriends=businessReviewUserDfWithFriends.withColumn('friendsSentimentToBusiness', friendsSentiment('friends','business_id'))
Error:
py4j.Py4JException: Method __getstate__([]) does not exist
Trying by creating a Table and querying it:
sparkSession.sql("CREATE TABLE events USING DELTA LOCATION '/delta/events/'")

def getfriendsSentiment(friendsList, b_id):
    listOfSentiments=[]
    for friend_id in friendsList.split(','):
        try:
            listOfSentiments.append(spark.sql("SELECT review_sentiment FROM events WHERE user_id='"+friend_id+"' AND business_id='"+b_id+"' GROUP BY review_sentiment ORDER BY COUNT(review_sentiment) DESC LIMIT 1").rdd.flatMap(lambda x:x).collect()[0])
        except:
            pass
Error:
PicklingError: Could not serialize object: Exception: It appears that you are attempting........
What can I do to get around this?
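One possible direction, offered as a rough sketch and not as a definitive fix: both errors come from the UDF closing over a DataFrame or the session, which cannot be pickled and sent to the executors. If the events data is small enough to fit in driver memory (an assumption), it could be collected once into a plain dictionary and broadcast, so the UDF only touches ordinary Python objects. The helper names build_lookup, friends_sentiment and add_friends_sentiment are hypothetical, and the column names are taken from the snippets above. The friend ids are stripped because the sample separates them with a comma followed by a space.

```python
from collections import Counter, defaultdict


def build_lookup(events):
    """Map (user_id, business_id) to the most frequent review_sentiment.

    `events` is any iterable of (user_id, business_id, review_sentiment) tuples.
    """
    counts = defaultdict(Counter)
    for user_id, business_id, sentiment in events:
        counts[(user_id, business_id)][sentiment] += 1
    return {key: c.most_common(1)[0][0] for key, c in counts.items()}


def friends_sentiment(friends, b_id, lookup):
    """Return the friends' sentiments for business b_id as a comma-separated string."""
    if not friends:
        return ""
    found = []
    for friend_id in friends.split(","):
        # Strip whitespace: the sample data uses ", " between ids.
        sentiment = lookup.get((friend_id.strip(), b_id))
        if sentiment is not None:
            found.append(str(sentiment))
    return ",".join(found)


def add_friends_sentiment(df, events_df, spark):
    """Hypothetical Spark wiring; assumes events_df fits in driver memory."""
    # Imported here so the pure helpers above work without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    rows = events_df.select("user_id", "business_id", "review_sentiment").collect()
    lookup = build_lookup((r.user_id, r.business_id, r.review_sentiment) for r in rows)

    # Broadcast a plain dict; unlike a DataFrame, it can be shipped to executors.
    bc = spark.sparkContext.broadcast(lookup)
    sentiment_udf = udf(lambda friends, b_id: friends_sentiment(friends, b_id, bc.value),
                        StringType())
    return df.withColumn("friendsSentimentToBusiness", sentiment_udf("friends", "business_id"))
```

With the names from the question, this would be called as add_friends_sentiment(businessReviewUserDfWithFriends, tempDF, sparkSession). If the events data is too large to collect, the usual alternative is to avoid the UDF entirely: split and explode the friends column into one row per friend, then join that against the events DataFrame on user_id and business_id.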