I'm trying to use a window function on a DataStream. My goal is to reproduce a query like the following:
SELECT *,
count(id) OVER(PARTITION BY country) AS c_country,
count(id) OVER(PARTITION BY city) AS c_city,
count(id) OVER(PARTITION BY address) AS c_addrs
FROM fm
ORDER BY country
With some help I have the aggregation by the country field working, but I need to aggregate by two fields in the same time window.
I don't know whether it is possible to use two or more keys in keyBy() for this case. This is what I have so far:
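import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Parse each CSV line into (id, country, city)
val parsed = stream2.map(x => {
  val arr = x.split(",")
  (arr(0).toInt, arr(1), arr(2))
})

parsed
  .keyBy(x => x._2) // key by country
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .process(new ProcessWindowFunction[
    (Int, String, String), (Int, String, String, Int), String, TimeWindow
  ]() {
    // Emit every record of the window together with the record count for its country
    override def process(key: String, context: Context,
                         elements: Iterable[(Int, String, String)],
                         out: Collector[(Int, String, String, Int)]): Unit = {
      val lst = elements.toList
      lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
    }
  }).print().setParallelism(1)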
This is great for the first aggregation, but I'm missing the second aggregation for the city field in the same time window.
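On the keyBy() question: the key selector may return a tuple, e.g. keyBy(x => (x._2, x._3)), but that creates one partition per (country, city) pair rather than two independent groupings, so each window would only see one city's records and could not produce the per-country count at the same time. The plain-Scala sketch below (no Flink involved; the object name and the sample subset are illustrative) mimics both partitionings with groupBy:

```scala
object CompositeKeyDemo {
  // A few (id, country, city) rows taken from the sample input.
  val rows: List[(Int, String, String)] = List(
    (10, "SPAIN", "BARCELONA"),
    (20, "SPAIN", "BARCELONA"),
    (30, "SPAIN", "MADRID"),
    (40, "ITALY", "ROMA")
  )

  // Like keyBy(x => x._2): one group per country.
  val byCountry: Map[String, Int] =
    rows.groupBy(_._2).map { case (k, v) => k -> v.size }

  // Like keyBy(x => (x._2, x._3)): one group per (country, city) pair.
  val byCountryAndCity: Map[(String, String), Int] =
    rows.groupBy(r => (r._2, r._3)).map { case (k, v) => k -> v.size }
}
```

A composite key therefore gives one count per combination; putting c_country and c_city on the same record requires both counts to be computed over the same set of records.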
Input data:
10,"SPAIN","BARCELONA","C1"
20,"SPAIN","BARCELONA","C2"
30,"SPAIN","MADRID","C3"
30,"SPAIN","MADRID","C3"
80,"SPAIN","MADRID","C4"
90,"SPAIN","VALENCIA","C5"
40,"ITALY","ROMA","C6"
41,"ITALY","ROMA","C7"
42,"ITALY","VENECIA","C8"
50,"FRANCE","PARIS","C9"
60,"FRANCE","PARIS","C9"
70,"FRANCE","MARSELLA","C10"
Expected output (id, country, city, c_country, c_city, c_addrs):
(10,"SPAIN","BARCELONA",6,2,1)
(20,"SPAIN","BARCELONA",6,2,1)
(30,"SPAIN","MADRID",6,3,2)
(30,"SPAIN","MADRID",6,3,2)
(80,"SPAIN","MADRID",6,3,1)
(90,"SPAIN","VALENCIA",6,1,1)
(50,"FRANCE","PARIS",3,2,2)
(60,"FRANCE","PARIS",3,2,2)
(70,"FRANCE","MARSELLA",3,1,1)
(40,"ITALY","ROMA",3,2,1)
(41,"ITALY","ROMA",3,2,1)
(42,"ITALY","VENECIA",3,1,1)
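The map step in the first snippet keeps the double quotes from the input and drops the fourth column (the address), which the third count needs. A minimal parsing sketch into the AlarmasIn shape used in the update below, assuming no field ever contains an embedded comma or quote (ParseInput is a hypothetical helper name):

```scala
// Mirrors the AlarmasIn case class from the question.
case class AlarmasIn(id: Int, country: String, city: String, address: String)

object ParseInput {
  // Splits a line such as 10,"SPAIN","BARCELONA","C1" and strips the surrounding quotes.
  // Assumes exactly four comma-separated fields with no embedded commas or quotes.
  def parseLine(line: String): AlarmasIn = {
    val arr = line.split(",").map(_.trim.stripPrefix("\"").stripSuffix("\""))
    AlarmasIn(arr(0).toInt, arr(1), arr(2), arr(3))
  }
}
```

In the job this could be applied as stream2.map(line => ParseInput.parseLine(line)).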
---------------- UPDATE 2 ------------------
I now want to aggregate over three columns. The option I'm currently using is to chain keyBy() stages, but this becomes very long, complex, and not very readable.
In addition, I added a Time.seconds(1) window to each chained stage because, without a window there, the keyed stream after the previous stage treats each record as an individual event.
What I'd like to know is whether I can compute all of these aggregations in a single process function.
This is the code I have, which is quite long:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

parsed
  .keyBy(_.country) // key by country
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .process(new ProcessWindowFunction[
    AlarmasIn, AlarmasOut, String, TimeWindow
  ]() {
    // Stage 1: count per country; city and address counts are filled in later
    override def process(key: String, context: Context,
                         elements: Iterable[AlarmasIn],
                         out: Collector[AlarmasOut]): Unit = {
      val lst = elements.toList
      lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city, x.address, lst.size, 0, 0)))
    }
  })
  .keyBy(_.city)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(new ProcessWindowFunction[
    AlarmasOut, AlarmasOut, String, TimeWindow
  ]() {
    // Stage 2: keep c_country and c_addr, fill in the count per city
    override def process(key: String,
                         context: Context,
                         elements: Iterable[AlarmasOut],
                         out: Collector[AlarmasOut]): Unit = {
      val lst = elements.toList
      lst.foreach(x => out.collect(x.copy(c_city = lst.size)))
    }
  })
  .keyBy(_.address)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(new ProcessWindowFunction[
    AlarmasOut, AlarmasOut, String, TimeWindow
  ]() {
    // Stage 3: keep c_country and c_city, fill in the count per address
    override def process(key: String,
                         context: Context,
                         elements: Iterable[AlarmasOut],
                         out: Collector[AlarmasOut]): Unit = {
      val lst = elements.toList
      lst.foreach(x => out.collect(x.copy(c_addr = lst.size)))
    }
  })
  .print()
// Case classes
case class AlarmasIn(
  id: Int,
  country: String,
  city: String,
  address: String
)

case class AlarmasOut(
  id: Int,
  country: String,
  city: String,
  address: String,
  c_country: Int,
  c_city: Int,
  c_addr: Int
)
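As for doing it in a single process function: since a ProcessWindowFunction receives all of the window's records at once, one option is to compute the three counts inside that one stage with ordinary collection grouping. The sketch below is written against the same Flink Scala DataStream API the question uses; AllCountsFunction, countAll, and build are hypothetical names, and keying by country assumes every city and address name belongs to exactly one country, so that counting inside a country's window matches a global PARTITION BY city / address.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class AlarmasIn(id: Int, country: String, city: String, address: String)
case class AlarmasOut(id: Int, country: String, city: String, address: String,
                      c_country: Int, c_city: Int, c_addr: Int)

// One window stage that emits every record with all three counts attached.
class AllCountsFunction
    extends ProcessWindowFunction[AlarmasIn, AlarmasOut, String, TimeWindow] {
  override def process(key: String, context: Context,
                       elements: Iterable[AlarmasIn],
                       out: Collector[AlarmasOut]): Unit =
    AllCountsFunction.countAll(elements.toList).foreach(r => out.collect(r))
}

object AllCountsFunction {
  // Number of records per value of `field` within one window's contents.
  private def countBy(rows: Seq[AlarmasIn])(field: AlarmasIn => String): Map[String, Int] =
    rows.groupBy(field).map { case (k, v) => k -> v.size }

  // Plain Scala, so it can be unit-tested without a Flink runtime.
  def countAll(rows: Seq[AlarmasIn]): Seq[AlarmasOut] = {
    val byCountry = countBy(rows)(_.country)
    val byCity    = countBy(rows)(_.city)
    val byAddress = countBy(rows)(_.address)
    rows.map(r => AlarmasOut(r.id, r.country, r.city, r.address,
      byCountry(r.country), byCity(r.city), byAddress(r.address)))
  }

  // Hypothetical wiring: key by country so each window holds one country's records.
  def build(parsed: DataStream[AlarmasIn]): DataStream[AlarmasOut] =
    parsed
      .keyBy(_.country)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new AllCountsFunction)
}
```

With all three counts produced by one window, the extra Time.seconds(1) windows are no longer needed; they are also somewhat fragile, because the records emitted by one 60-second window could in principle be split across two 1-second processing-time windows downstream. If a city or address name can appear under more than one country, the same countAll helper could instead be called from a ProcessAllWindowFunction after windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))), which gives global counts but runs that stage with parallelism 1.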