Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
105 views
in Technique[技术] by (71.8m points)

Multilevel aggregation logstash

I am trying to load data from the oracle database to elastic search using logstash. Doing multiple aggregations using plugins logstash-filter-aggregate (2.8.0). I extract data from a relational database using plugin-jdbc. Where a teacher has and multiple contact_details.

I would like this result:

"teachers" : [
                {
                  "tch_name" : "aaa",
                  "social_cat" : "art",
                  "tch_id" : 201,
                  "contact_details" : [
                    {
                        "phone_no": ["1111111111","2222222222"],
                        "email_id: [[email protected],[email protected]]
                    }
                   ]
                }
        ]

Database (oracle) queries for creating tables and select statement.

create table mst_school (sch_id integer primary key,udise_sch_code varchar2(50),school_name varchar2(50), update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into mst_school (sch_id,udise_sch_code,school_name) values(1,'100','AESA');
insert into mst_school (sch_id,udise_sch_code,school_name) values(2,'200','PVP');
create table teacher_profile (teacher_id integer primary key,name varchar2(50),social_category varchar2(50),sch_id integer references mst_school(sch_id),startdate TIMESTAMP, update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(201,'aaa','art',1, TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi')); --yyyy-MM-dd-HH:mm
insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(202,'bbb','math',1, TO_DATE('2020-07-01-00:00','yyyy-MM-dd-hh24:mi'));
insert into teacher_profile (teacher_id,name,social_category,sch_id,startdate) values(203,'ccc','phy',2,TO_DATE('2020-10-22-00:00','yyyy-MM-dd-hh24:mi'));
create table village (village_id integer primary key,village_name varchar2(50),sch_id integer references mst_school(sch_id), latitude number(12,6),longitude number(12,6), update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into village (village_id,village_name,sch_id,latitude,longitude) values(500,'Pune',1, 18.5135, 73.7699);
insert into village (village_id,village_name,sch_id,latitude,longitude) values(600,'Mumbai',2,19.0760, 72.8777);
create table contact_info (teacher_ids integer REFERENCES teacher_profile(teacher_id), phone_no varchar2(20), email_id varchar2(20), update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
insert into contact_info (teacher_ids,phone_no,email_id) values (201, '1111111111', '[email protected]');
insert into contact_info (teacher_ids,phone_no,email_id) values (201, '2222222222', '[email protected]');
insert into contact_info (teacher_ids,phone_no,email_id) values (202, '3333333333', '[email protected]');
insert into contact_info (teacher_ids,phone_no,email_id) values (203, '4444444444', '[email protected]');
select concat(s.udise_sch_code ,tch.teacher_id) comp_id,
                s.sch_id as sch_id,
                s.udise_sch_code as sch_code,
                s.school_name as school_name,
                v.latitude as latitude,
                v.longitude as longitude,
                tch.teacher_id as tch_id,
                tch.name as tch_name,
                tch.social_category as social_cat,
                tch.startdate as startdate,
                c.phone_no as phone_no,
                v.village_id as village_id,
                v.village_name as village_name,
                v.sch_id as vsch_id
            from mst_school s
            LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
            LEFT JOIN village v on s.sch_id = v.sch_id
            LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids;

.conf file Wants to modify configuration file so that the output will be having an array of contact details.

input {
jdbc {
    jdbc_connection_string => "jdbc:oracle:thin:@****:1521/DB19C"
    jdbc_driver_library => "/home/user/Downloads/ojdbc8-19.3.0.0.jar"
        jdbc_user => "****"
        jdbc_password => "****"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    schedule => "* * * * *"     
    statement => "select concat(s.udise_sch_code ,tch.teacher_id) comp_id,
                s.sch_id as sch_id,
                s.udise_sch_code as sch_code,
                s.school_name as school_name,
                v.latitude as latitude,
                v.longitude as longitude,
                tch.teacher_id as tch_id,
                tch.name as tch_name,
                tch.social_category as social_cat,
                to_char(tch.startdate,'yyyy-MM-dd-HH:mm') as startdate,
                c.phone_no as phone_no,
                c.email_id as email_id,
                v.village_id as village_id,
                v.village_name as village_name,
                v.sch_id as vsch_id
            from mst_school s
            LEFT JOIN teacher_profile tch on s.sch_id = tch.sch_id
            LEFT JOIN village v on s.sch_id = v.sch_id
            LEFT JOIN contact_info c on tch.teacher_id = c.teacher_ids"         
            
            tracking_column_type => "numeric"
            
            jdbc_paging_enabled => true
            jdbc_fetch_size => "500"
            charset => "UTF-8"
            codec => json
            tracking_column => sch_id
            last_run_metadata_path => "/home/user/Desktop/.sch_id_tracker_file"
    }
}
filter {
        aggregate {
            task_id => "%{sch_id}"
            code => "
            map['comp_id'] = event.get('comp_id')
            map['sch_id'] = event.get('sch_id')
            map['sch_code'] = event.get('sch_code')
            map['teachers'] ||= []
            map['teachers'] << {
                        'tch_id' => event.get('tch_id'),
                        'tch_name' => event.get('tch_name'),
                        'social_cat' => event.get('social_cat')
                        # Contact details
                    }
                    
            event.cancel()
            "
        
        timeout_tags => ["aggregate"]
        push_previous_map_as_event => true
        timeout => 3
        
        }
}
output {
    elasticsearch {
     document_id => "%{sch_id}"
     index => "school_index"
    }
}

How can I improve my conf file to get desired output.

question from:https://stackoverflow.com/questions/65931388/multilevel-aggregation-logstash

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...