Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
2.4k views
in Technique[技术] by (71.8m points)

spring - Partitions and JdbcPagingItemReader doesn't gives correct values

I am working on Spring Batch and Partition using the JdbcPagingItemReader, but I am only getting half records.

If I am expecting 100 thousand records instead getting only 50 thousand. What's wrong is happening How to use Nested or Inner Query in OraclePagingQueryProvider ?

My Original Query

SELECT q.*
  FROM (SELECT DEPT.ID id,
               DEPT.CREATOR createdby,
               DEPT.CREATE_DATE createddate,
               DEPT.UPDATED_BY updatedby,
               DEPT.LAST_UPDATE_DATE updateddate,
               DEPT.NAME name,
               DEPT.STATUS status,
               statusT.DESCR statusdesc,
               REL.ROWID_DEPT1 rowidDEPT1,
               REL.ROWID_DEPT2 rowidDEPT2,
               DEPT2.DEPT_FROM_VAL parentcid,
               DEPT2.NAME parentname,
               ROW_NUMBER() OVER (PARTITION BY DEPT.CREATE_DATE ORDER BY DEPT.ID) AS rn
          FROM TEST.DEPT_TABLE DEPT
          LEFT JOIN TEST.STATUS_TABLE statusT
            ON DEPT.STATUS = statusT.STATUS
          LEFT JOIN TEST.C_REL_DEPT rel
            ON DEPT.ID = REL.ROWID_DEPT2
          LEFT JOIN TEST.DEPT_TABLE DEPT2
            ON REL.ROWID_DEPT1 = DEPT2.ID) q
 WHERE rn BETWEEN ? AND ?;  // ? will be fromValue to toValue

Code:

@Configuration
public class CustomerJob2 {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public CustomerPartitioner customerPartitioner() {
        return new CustomerPartitioner();
    }

    @Bean("readCustomerJob")
    @Primary
    public Job readCustomerJob() throws Exception {
        return jobBuilderFactory.get("readCustomerJob")
                .incrementer(new RunIdIncrementer())
                .start(customerStepOne())
                .build();
    }

    @Bean
    public Step customerStepOne() throws Exception {    
        return stepBuilderFactory.get("customerStepOne")
                .partitioner(slaveStep().getName(), customerPartitioner())
                .step(slaveStep())
                .gridSize(5)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    // slave step
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(3000)
                .reader(pagingItemReader(null, null))
                .writer(customerWriter())
                .listener(customerStepOneExecutionListener())
                .build();
    }


    // Reader
    @Bean(destroyMethod = "")
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['fromValue']}") Long fromValue,
            @Value("#{stepExecutionContext['toValue']}") Long toValue) throws Exception {
        
        System.out.println(" FROM = "+ fromValue + " TO VALUE ="+ toValue);
        
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setSaveState(false);
        reader.setPageSize(3000);
        
        // Sort Keys
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        OraclePagingQueryProvider queryProvider = new OraclePagingQueryProvider();
        queryProvider.setSelectClause("q.* FROM ( SELECT Row_Number() OVER (ORDER BY party.ROWID_OBJECT) MyRow, "
                + " OTHER coumns in the Query");

        queryProvider.setFromClause("**** "
                + "LEFT JOIN ********* "
                + "LEFT JOIN ********* "
                + "LEFT JOIN ********* ) q ");
                
        queryProvider.setWhereClause("MyRow BETWEEN "+ fromValue + " AND "+ toValue);
        
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        reader.afterPropertiesSet();
        return reader;
    }
    
    
    @Bean
    public CustomerWriter customerWriter() {
        return new CustomerWriter();
    }
}

Partition Logic

public class CustomerPartitioner implements Partitioner{
    private static final String CUSTOMER_CNT = "SELECT COUNT(party.IS) ***** COMPLEX JOIN";
    
    @Autowired
    @Qualifier("edrJdbcTemplate")
    private JdbcTemplate jdbcTemplate;
    
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Long custCnt = jdbcTemplate.queryForObject(CUSTOMER_CNT, Long.class);
    
        int toValue = 0;
        int fromValue = 0;
        int increment = 3000;
        int counter = 0;
        
        int temp = 0;
        Map<String, ExecutionContext> partitionMap = new HashMap<>();
        
        for (int i = 0; i < custCnt; i += increment) { // custCnt fives 100 thousand
            counter++;
            temp = i;           
            if(i == 0) {
                fromValue = temp;               
                toValue = increment;
            }else {
                fromValue = toValue + 1;
                toValue = fromValue + increment - 1;
            }
            ExecutionContext context = new ExecutionContext();
            context.put("fromValue", fromValue);
            context.put("toValue", toValue);
            partitionMap.put("Thread--" + counter, context);
        }
        return partitionMap;
    }
}

Here are the logs -

2020-06-22 22:44:14.750 INFO 15752 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=readCustomerJob]] launched with th

e following parameters: [{JobID=1592846054670, date=1592846054670}]
2020-06-22 22:44:14.790  INFO 15752 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [customerStepOne]
Cust Count = 1035483
 FROM = 6001 TO VALUE =9000
 FROM = 0 TO VALUE =3000
 FROM = 3001 TO VALUE =6000
 FROM = 9001 TO VALUE =12000
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-4] o.s.b.i.database.JdbcPagingItemReader    : Reading page 0
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-1] o.s.b.i.database.JdbcPagingItemReader    : Reading page 0
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-2] o.s.b.i.database.JdbcPagingItemReader    : Reading page 0
2020-06-22 22:44:15.874 DEBUG 15752 --- [cTaskExecutor-3] o.s.b.i.database.JdbcPagingItemReader    : Reading page 0
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...