You can do all of this with dataframe transformations and udfs. The only slightly annoying thing is that, because you technically have two different types of dictionaries (one where key=integer and value=dictionary, the other where key=integer value=float), you will have to define two udfs with different datatypes. Here is one possible way to do this:
from pyspark.sql.functions import udf,collect_list,create_map
from pyspark.sql.types import MapType,IntegerType,FloatType
data = [[160,163,27.0],[160,183,27.0],[161,162,22.0],
[161,170,31.0],[162,161,22.0],[162,167,24.0],
[163,160,27.0],[163,164,27.0],[164,163,27.0],
[164,165,35.0],[165,164,35.0],[165,166,33.0],
[166,165,33.0],[166,167,31.0],[167,162,24.0],
[167,166,31.0],[167,168,27.0],[168,167,27.0],
[168,169,23.0],[169,168,23.0]]
cols = ['FromComponentID','ToComponentID','Cost']
df = spark.createDataFrame(data,cols)
combineMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
MapType(IntegerType(),FloatType()))
combineDeepMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
MapType(IntegerType(),MapType(IntegerType(),FloatType())))
mapdf = df.groupBy('FromComponentID')
.agg(collect_list(create_map('ToComponentID','Cost')).alias('maps'))
.agg(combineDeepMap(collect_list(create_map('FromComponentID',combineMap('maps')))))
result_dict = mapdf.collect()[0][0]
For a large dataset, this should offer some performance boosts over a solution that requires the data to be collected onto a single node. But since spark still has to serialize the udf, there won't be huge gains over an rdd based solution.
Update:
An rdd solution is a lot more compact but, in my opinion, it is not as clean. This is because pyspark doesn't store large dictionaries as rdds very easily. The solution is to store it as a distributed list of tuples and then convert it to a dictionary when you collect it to a single node. Here is one possible solution:
maprdd = df.rdd.groupBy(lambda x:x[0]).map(lambda x:(x[0],{y[1]:y[2] for y in x[1]}))
result_dict = dict(maprdd.collect())
Again, this should offer performance boosts over a pure python implementation on single node, and it might not be that different than the dataframe implementation, but my expectation is that the dataframe version will be more performant.