Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


apache spark - How to read from a csv file to create a scala Map object?

I have a path to a CSV file I'd like to read from. The file has three columns: Topic, Key, and Value. I am using Spark to read it as a CSV file. The file (lookupFile.csv) looks like this:

Topic,Key,Value
fruit,aaa,apple
fruit,bbb,orange
animal,ccc,cat
animal,ddd,dog

// I'm reading the file as follows (spark is the active SparkSession instance)
val lookup = spark.read.option("delimiter", ",").option("header", "true").csv(lookupFile)

I'd like to take what I just read and return a map that has the following properties:

  • The outer map uses the "Topic" column as its key
  • Each value of the outer map is itself a map from the "Key" column to the "Value" column

My hope is that I would get a map that looks like the following:

val result = Map("fruit" -> Map("aaa" -> "apple", "bbb" -> "orange"),
                 "animal" -> Map("ccc" -> "cat", "ddd" -> "dog"))

Any ideas on how I can do this?


1 Reply

scala> val in = spark.read.option("header", true).option("inferSchema", true).csv("""Topic,Key,Value
     | fruit,aaa,apple
     | fruit,bbb,orange
     | animal,ccc,cat
     | animal,ddd,dog""".split("\n").toSeq.toDS)
in: org.apache.spark.sql.DataFrame = [Topic: string, Key: string ... 1 more field]

scala> val res = in.groupBy('Topic).agg(map_from_entries(collect_list(struct('Key, 'Value))).as("subMap"))
res: org.apache.spark.sql.DataFrame = [Topic: string, subMap: map<string,string>]

scala> val scalaMap = res.collect.map{
     | case org.apache.spark.sql.Row(k : String, v : Map[String, String]) => (k, v) 
     | }.toMap
<console>:26: warning: non-variable type argument String in type pattern scala.collection.immutable.Map[String,String] (the underlying of Map[String,String]) is unchecked since it is eliminated by erasure
       case org.apache.spark.sql.Row(k : String, v : Map[String, String]) => (k, v)
                                                     ^
scalaMap: scala.collection.immutable.Map[String,Map[String,String]] = Map(animal -> Map(ccc -> cat, ddd -> dog), fruit -> Map(aaa -> apple, bbb -> orange))
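
For a small lookup file, the same nested map can also be built on the driver with plain Scala collections, which avoids the unchecked pattern match on Map[String, String] above. The sketch below is not part of the original answer: the object name LookupMapSketch and the helper toNestedMap are hypothetical, and it assumes the rows have already been collected as (topic, key, value) triples, for example with something like lookup.as[(String, String, String)].collect() when spark.implicits._ is in scope.

```scala
object LookupMapSketch {

  // Hypothetical helper (not from the original answer): groups
  // (topic, key, value) triples into a Map of topic -> (key -> value).
  def toNestedMap(rows: Seq[(String, String, String)]): Map[String, Map[String, String]] =
    rows
      .groupBy { case (topic, _, _) => topic }          // topic -> all triples for that topic
      .map { case (topic, group) =>
        // Drop the topic from each triple and build the inner key -> value map.
        topic -> group.map { case (_, key, value) => key -> value }.toMap
      }

  def main(args: Array[String]): Unit = {
    // Sample rows mirroring lookupFile.csv from the question.
    val rows = Seq(
      ("fruit", "aaa", "apple"),
      ("fruit", "bbb", "orange"),
      ("animal", "ccc", "cat"),
      ("animal", "ddd", "dog")
    )
    println(toNestedMap(rows))
  }
}
```

If a key appears more than once within a topic, toMap keeps the last occurrence. This approach pulls every row to the driver, so it is only reasonable when the lookup file is small.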
