Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
239 views
in Technique[技术] by (71.8m points)

how to handle this in spark

I am using spark-sql 2.4.x version , datastax-spark-cassandra-connector for Cassandra-3.x version. Along with kafka.

I have a scenario for some finance data coming from kafka topic. data (base dataset) contains companyId, year , prev_year fields information.

If columns year === prev_year then I need to join with different table i.e. exchange_rates.

If columns year =!= prev_year then I need to return the base dataset itself

How to do this in spark-sql ?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can refer below approach for your case.

scala> Input_df.show
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
|        1|2016|     2017|  12|
|        1|2017|     2017|21.4|
|        2|2018|     2017|11.7|
|        2|2018|     2018|44.6|
|        3|2016|     2017|34.5|
|        4|2017|     2017|  56|
+---------+----+---------+----+


scala> exch_rates.show
+---------+----+
|companyId|rate|
+---------+----+
|        1|12.3|
|        2|12.5|
|        3|22.3|
|        4|34.6|
|        5|45.2|
+---------+----+


scala> val equaldf = Input_df.filter(col("year") === col("prev_year"))

scala> val notequaldf = Input_df.filter(col("year") =!= col("prev_year"))

scala> val joindf  = notequaldf.alias("n").drop("rate").join(exch_rates.alias("e"), List("companyId"), "left")

scala> val finalDF = equaldf.union(joindf)

scala> finalDF.show()
+---------+----+---------+----+
|companyId|year|prev_year|rate|
+---------+----+---------+----+
|        1|2017|     2017|21.4|
|        2|2018|     2018|44.6|
|        4|2017|     2017|  56|
|        1|2016|     2017|12.3|
|        2|2018|     2017|12.5|
|        3|2016|     2017|22.3|
+---------+----+---------+----+

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...