I've finally found a satisfying solution.
The idea is to zip join two sources, ordered by datetime - but it still should meet the limitation of time window. So I made this steps to achieve all the goals:
- Calculate session numbers for each source
- Shift every parent's session if no parents found
- Shift all downstream sessions accordingly
- Adjust children's sessions to parent's
- Group each session on parent
It still fails on event_type = 6
, but this can be fixed by time_window tweaking.
with raw_data as (
SELECT 1 source_number, 1 dt, '1.1' name, 1 event_type
union all select 2, 0, '2.1', 1
union all select 2, 1, '2.2', 1
union all select 2, 69, '2.5', 1
union all select 3, 60, '3.1', 1
union all select 3, 60, '3.2', 1
union all select 3, 69, '3.3', 1
union all select 1, 1, '1.1', 2
union all select 1, 100, '1.2', 2
union all select 2, 100, '2.1', 2
union all select 2, 200, '2.2', 2
union all select 2, 202, '2.3', 2
union all select 3, 1, '3.1', 2
union all select 3, 10, '3.2', 2
union all select 3, 100, '3.3', 2
union all select 4, 5, '4.1', 2
union all select 4, 15, '4.2', 2
union all select 4, 200, '4.3', 2
union all select 5, 1, '5.1', 2
union all select 5, 5, '5.2', 2
union all select 5, 15, '5.3', 2
union all select 5, 99, '5.4', 2
union all select 5, 100, '5.5', 2
union all select 5, 101, '5.6', 2
union all select 6, 50, '6.1', 2
union all select 6, 140, '6.2', 2
union all select 6, 200, '6.3', 2
union all select 6, 290, '6.4', 2
union all select 7, 50, '7.1', 2
union all select 7, 200, '7.2', 2
union all select 7, 210, '7.3', 2
union all select 7, 1000, '7.4', 2
union all select 1, 1, '1.1', 6
union all select 2, 55, '2.1', 6
union all select 3, 85, '3.1', 6
union all select 4, 255, '4.1', 6
union all select 1, 1, '1.1', 7
union all select 1, 1000, '1.2', 7
union all select 2, 1001, '2.1', 7
union all select 3, 1020, '3.1', 7
union all select 4, 1030, '4.1', 7
)
, windows as (
select 1 source_number, 0 time_window
union all select 2, 60
union all select 3, 60
union all select 4, 120
union all select 5, 120
union all select 6, 150
union all select 7, 500
)
, dat as (
select
*
from raw_data
left join windows using(source_number)
)
, sessions as (
select
*,
row_number() over(partition by event_type, source_number order by dt) session
from dat
)
, calc_parent_shift as (
select
a.*,
case
when count(b.session) = 0
then greatest(countif(count(b.session) = 0) over (w_session) - 1, 0)
else 0
end as parent_shift
from sessions a
left join sessions b
on a.event_type = b.event_type
and a.source_number > b.source_number
and a.session <= b.session
and ABS(a.dt - b.dt) <= a.time_window + b.time_window
group by 1, 2, 3, 4, 5, 6
window w_session as (
partition by a.event_type, a.session order by a.dt
)
)
, shift_parent_session as (
select
* except(session),
session + max(parent_shift) over (w_shift) as session,
session as old_session
from calc_parent_shift
window w_shift as (
partition by event_type, source_number
order by session
)
)
, shift_child_session as (
select
a.* except(session),
ifnull(array_agg(b.session order by b.source_number, b.session)[offset(0)] - a.old_session, 0) as child_shift,
a.session + greatest (
max(ifnull(array_agg(b.session order by b.source_number, b.session)[offset(0)] - a.old_session, 0)) over (w)
- max(a.parent_shift) over (w)
, 0
) as session
from shift_parent_session a
left join shift_parent_session b
on a.event_type = b.event_type
and a.source_number > b.source_number
and a.session <= b.session
and ABS(a.dt - b.dt) <= a.time_window + b.time_window
group by 1, 2, 3, 4, 5, 6, 7, a.session
window w as (
partition by a.event_type, a.source_number
order by a.session
)
)
, session_groups as (
select
a.* except (parent_shift, child_shift, old_session),
min(b.source_number) parent_source_number
from shift_child_session a
left join shift_child_session b
on a.event_type = b.event_type
and a.session = b.session
and ABS(a.dt - b.dt) <= a.time_window + b.time_window
and a.source_number >= b.source_number
group by 1, 2, 3, 4, 5, 6
)
, result as (
select
event_type,
session,
parent_source_number,
array_agg(source_number) source_number,
array_agg(dt) dt,
array_agg(name) name
from session_groups
group by 1, 2, 3
order by 1, 2, 3
)
select * from result
# where event_type = 6
order by event_type, session