Here's how reduceByKey works. I'll use your example for illustration, i.e. the following data that you pass to reduceByKey:
#                                 x          y          z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
#       part A (key)                  part B (value)        counter
Let me go step by step.
After applying the following mapValues function
rdd_ori.mapValues(lambda x: (x,1))
the RDD data will look like this:
((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))
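To make that shape concrete, here is a minimal plain-Python sketch of what mapValues does to each record. It does not use Spark at all, and the helper name map_values is made up for this illustration.

```python
# Plain-Python stand-in for rdd.mapValues (hypothetical helper, not a Spark API)
def map_values(pairs, f):
    # Apply f to the value only; the key is left untouched
    return [(k, f(v)) for k, v in pairs]

key = ('a', 'nexus4', 'stand')
records = [
    (key, (-5.9427185, 0.6761626999999999, 8.128204)),
    (key, (-5.958191, 0.6880646, 8.135345)),
]

# Wrap every (x, y, z) value together with a counter of 1
with_counter = map_values(records, lambda x: (x, 1))
print(with_counter[0])
```

Each value is now a pair: the (x, y, z) tuple at index 0 and the counter at index 1.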
So when reduceByKey is invoked as
.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))
all the rows with the same key are grouped and their values are passed, two at a time, to the lambda function of reduceByKey.
Since in your case all the keys are the same, the values are passed to the a and b variables in the following iterations.
In the first iteration, a and b are two of the original values (Spark does not guarantee which two are paired first). For example, a is ((-5.958191, 0.6880646, 8.135345), 1) and b is ((-5.95224, 0.6702118, 8.136536), 1), so the calculation (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is valid and succeeds.
In the second iteration, a is the output of (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]), which is (-11.910430999999999, 1.3582764, 16.271881, 2).
If you look at the format of this data, there is no a[0][0] in a, because a[0] is now a plain float. You can only get a[0], a[1], and so on. That's the issue, and it's what the error message is telling you too:
TypeError: 'float' object is not subscriptable
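The failure can be reproduced without Spark. The sketch below uses functools.reduce as a stand-in for reduceByKey on a single key. It assumes a strict left-to-right fold, which Spark does not promise, but the problem shows up in any order as soon as a previous result is fed back in as a.

```python
from functools import reduce

values = [
    ((-5.9427185, 0.6761626999999999, 8.128204), 1),
    ((-5.958191, 0.6880646, 8.135345), 1),
    ((-5.95224, 0.6702118, 8.136536), 1),
]

# The original reducer: returns a flat 4-tuple, not ((x, y, z), count)
bad = lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])

first = bad(values[0], values[1])  # works: both inputs are nested
print(len(first))                  # the result is flat, so first[0] is a float

try:
    reduce(bad, values)            # the second step receives the flat tuple as a
    error = None
except TypeError as exc:
    error = exc
print(type(error).__name__)
```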
The solution is to keep the result in a shape where a can still be accessed as a[0][0], which you can do by writing your reduceByKey in the following form:
.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
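As a quick check of that form, here is the same kind of plain-Python sketch, again using functools.reduce as an assumed stand-in for reduceByKey on one key. Because the output has the same ((x, y, z), count) shape as the inputs, it can safely be fed back in as a.

```python
from functools import reduce

values = [
    ((-5.9427185, 0.6761626999999999, 8.128204), 1),
    ((-5.958191, 0.6880646, 8.135345), 1),
    ((-5.95224, 0.6702118, 8.136536), 1),
    ((-5.9950867, 0.6535491999999999, 8.204376), 1),
]

# Corrected reducer: the sums stay nested in a tuple, the counter stays at index 1
good = lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])

sums, count = reduce(good, values)
print(sums, count)
```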
But that would break your last mapValues function
.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
because your values, i.e. a in the lambda function, now have the form ((-23.848236199999995, 2.6879882999999998, 32.604461), 4), so a[0] is (-23.848236199999995, 2.6879882999999998, 32.604461) and a[1] is 4, and there are no more elements, so you will encounter
IndexError: tuple index out of range
So your last mapValues should be
.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
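The indexing in this last step can be checked on the reduced value quoted above, in plain Python and without Spark:

```python
# Reduced value for the key: ((sum_x, sum_y, sum_z), count)
a = ((-23.848236199999995, 2.6879882999999998, 32.604461), 4)

# a has only two elements, so a[3] does not exist
try:
    a[3]
    index_error = None
except IndexError as exc:
    index_error = exc
print(type(index_error).__name__)

# Divide each sum in a[0] by the count stored in a[1]
mean = (a[0][0] / a[1], a[0][1] / a[1], a[0][2] / a[1])
print(mean)
```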
So overall, the following code should work for you:
# Parse each CSV line into ((key fields), (x, y, z))
rdd_ori = (sc.textFile("asdasd.csv")
    .map(lambda x: x.split(","))
    .map(lambda f: ((f[6], f[7], f[9]), (float(f[3]), float(f[4]), float(f[5])))))
# Pair each value with a counter, sum per key, then divide the sums by the count
meanRDD = (rdd_ori.mapValues(lambda x: (x, 1))
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
    .mapValues(lambda a: (a[0][0]/a[1], a[0][1]/a[1], a[0][2]/a[1])))
I hope I have explained it well enough.