
python - Pyspark - TypeError: 'float' object is not subscriptable when calculating mean using reduceByKey

My "asdasd.csv" file has the following structure:

 Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

OK, so I get the following (key, value) tuple to operate with:

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#           part A (key)               part B (value) 
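For clarity, this is how a single row of the file maps to that (key, value) pair (plain Python, using the same column indices as my map below):

row = "0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand"
cols = row.split(",")
key = (cols[6], cols[7], cols[9])                          # (User, Model, gt)
value = (float(cols[3]), float(cols[4]), float(cols[5]))   # (x, y, z)
print((key, value))
# (('a', 'nexus4', 'stand'), (-5.958191, 0.6880646, 8.135345))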

My code for calculating the mean is the following; I have to calculate the mean of each column (x, y, z) for each key.

rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
            .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])) \
            .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

My problem is that this code works fine on another PC with the same VM I'm using to develop it (PySpark, Python 3).

On that machine the code runs and produces the expected means (screenshot omitted).

But I don't know why I'm getting this error here; the important part is the last line of the traceback below.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      9 #sum_1 = count_.reduceByKey(lambda x, y: (x[0][0]+y[0][0],x[0][1]+y[0][1],x[0][2]+y[0][2]))
     10
---> 11 print(meanRDD.take(1))

/opt/spark/current/python/pyspark/rdd.py in take(self, num)
   1341
   1342         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343         res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345         items += res

/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    990         # SparkContext#runJob.
    991         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    993         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times, most recent failure: Lost task 0.0 in stage 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/opt/spark/current/python/pyspark/rdd.py", line 1842, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
  File "<ipython-input>", line 3, in <lambda>
TypeError: 'float' object is not subscriptable


1 Reply


Here's how reduceByKey works. I'll take your example for illustration, i.e. the following data that you pass to reduceByKey:

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
#           part A (key)               part B (value)       counter

Let me go through it step by step.

After performing the following mapValues function

rdd_ori.mapValues(lambda x: (x,1))

the RDD data will look like this:

((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))

So when reduceByKey is invoked as

.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))

all the rows with the same key are grouped, and their values are passed to the lambda function of reduceByKey.

Since in your case all the keys are the same, the values are passed to the a and b variables over the following iterations.

In the first iteration, a is ((-5.958191, 0.6880646, 8.135345), 1) and b is ((-5.95224, 0.6702118, 8.136536), 1), so the calculation part (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is correct and passes.

In the second iteration, a is the output of (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) from the first iteration, which is (-11.910430999999999, 1.3582764, 16.271881, 2).
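You can see that shape change in plain Python (a minimal sketch using the four (value, 1) pairs listed above; the merge order here simply mirrors the explanation, Spark does not guarantee any particular order):

# the four (value, counter) pairs from the mapValues output above
values = [
    ((-5.958191, 0.6880646, 8.135345), 1),
    ((-5.95224, 0.6702118, 8.136536), 1),
    ((-5.9950867, 0.6535491999999999, 8.204376), 1),
    ((-5.9427185, 0.6761626999999999, 8.128204), 1),
]

# the same lambda that the question passes to reduceByKey
merge = lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])

print(merge(values[0], values[1]))
# (-11.910430999999999, 1.3582764, 16.271881, 2)   <- a flat 4-tuple; the nesting is gone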

So if you look at the format of that data, there is no a[0][0] in a any more: a is now a flat tuple of three floats plus the counter, so you can only index a[0], a[1], and so on, and a[0] itself is a plain float. That's the issue, and that's exactly what the error message is saying:

TypeError: 'float' object is not subscriptable
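The same error can be reproduced with nothing but a float, since that is what a[0] now holds:

try:
    a0 = -11.910430999999999   # what a[0] holds after the first merge
    a0[0]                      # indexing a float fails
except TypeError as e:
    print(e)                   # 'float' object is not subscriptable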

The solution is to keep the value in a shape where a[0][0] is still valid, which you can do by writing your reduceByKey in the following form:

.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
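Continuing the plain-Python sketch from above (reusing the same values list), the corrected lambda now folds all four rows cleanly:

from functools import reduce

# corrected merge: the (x, y, z) sums stay nested in a tuple, the counter stays beside it
merge_ok = lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])

print(reduce(merge_ok, values))
# ((-23.848236199999995, 2.6879882999999998, 32.604461), 4)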

But that would break your last mapValues function

.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

because your value, i.e. a in that lambda function, is now ((-23.848236199999995, 2.6879882999999998, 32.604461), 4), so a[0] is (-23.848236199999995, 2.6879882999999998, 32.604461), a[1] is 4, and there is no a[2] or a[3], so you would get

IndexError: tuple index out of range
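A quick illustration, using the aggregated value above:

a = ((-23.848236199999995, 2.6879882999999998, 32.604461), 4)
a[0]   # (-23.848236199999995, 2.6879882999999998, 32.604461)
a[1]   # 4
a[3]   # raises: IndexError: tuple index out of range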

So your last mapValues should be

.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
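Applied to that aggregated value, this produces the per-column means:

a = ((-23.848236199999995, 2.6879882999999998, 32.604461), 4)
print((a[0][0] / a[1], a[0][1] / a[1], a[0][2] / a[1]))
# approximately (-5.96205905, 0.671997075, 8.15111525)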

So overall, the following code should work for you:

rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])) \
    .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
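If you want to sanity-check the whole pipeline without the CSV file, here is a self-contained sketch that swaps sc.textFile for sc.parallelize on the four sample rows (it assumes a running SparkContext named sc, as in your code):

lines = [
    "0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand",
    "1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand",
    "2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand",
    "3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand",
]

rdd_ori = sc.parallelize(lines) \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),
                    (float(x.split(",")[3]), float(x.split(",")[4]), float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])) \
    .mapValues(lambda a: (a[0][0] / a[1], a[0][1] / a[1], a[0][2] / a[1]))

print(meanRDD.collect())
# [(('a', 'nexus4', 'stand'), (-5.96205905, 0.671997075, 8.15111525))]   (approximately)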

I hope I have explained it well enough.

