java - SpringBatch local partitioning restart problems

I am having issues with restarting a local partitioning batch job. I throw a RuntimeException on the 101st processed item. The job fails, but something is going wrong, because on restart the job continues from the 150th item (and not from the 100th item, as it should).

Here is the XML configuration:

<bean id="taskExecutor" class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor" >
    <property name="workManagerName" value="springWorkManagers" />
</bean>

<bean id="transactionManager" class="org.springframework.transaction.jta.WebSphereUowTransactionManager"/>

<batch:job id="LocalPartitioningJob">
    <batch:step id="masterStep">
        <batch:partition step="slaveStep" partitioner="splitPartitioner">
            <batch:handler grid-size="5" task-executor="taskExecutor"  />
        </batch:partition>
    </batch:step>
</batch:job>

<batch:step id="slaveStep">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="partitionReader" processor="compositeItemProcessor" writer="sqlWriter" commit-interval="50" />
        <batch:transaction-attributes isolation="SERIALIZABLE" propagation="REQUIRED" timeout="600" />
        <batch:listeners>
            <batch:listener ref="Processor1" /> 
            <batch:listener ref="Processor2" /> 
            <batch:listener ref="Processor3" />
        </batch:listeners>
    </batch:tasklet>
</batch:step>

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
    <property name="tablePrefix" value="${sb.db.tableprefix}" />
    <property name="dataSource" ref="ds" />
    <property name="maxVarCharLength" value="1000"/>
</bean>

<bean id="transactionManager" class="org.springframework.transaction.jta.WebSphereUowTransactionManager"/>

<jee:jndi-lookup id="ds" jndi-name="${sb.db.jndi}" cache="true" expected-type="javax.sql.DataSource" />
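
For reference, a restart in this setup simply means launching the failed job again with identical JobParameters; Spring Batch then resumes the existing job instance from the job repository. A minimal sketch, assuming jobLauncher and job are wired in the context above and the parameter is just an illustrative identifier:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;

public class RestartExample {

    // jobLauncher and job come from the application context above.
    public void runAndRestart(JobLauncher jobLauncher, Job job) throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addString("run.id", "batch-2015-01-01")  // hypothetical identifying parameter
                .toJobParameters();

        JobExecution first = jobLauncher.run(job, params);   // fails mid-step
        System.out.println("First run status: " + first.getStatus());

        // Launching again with the SAME parameters restarts the failed
        // instance from the last committed state, not from scratch.
        JobExecution second = jobLauncher.run(job, params);
        System.out.println("Second run status: " + second.getStatus());
    }
}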

The splitPartitioner implements Partitioner: it splits the initial data and saves it into the slave steps' execution contexts as lists. The processors call remote EJBs to fetch additional data, and the sqlWriter is just a org.spring...JdbcBatchItemWriter.
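
For context, here is a minimal sketch of what such a partitioner might look like; the class name, the fetchInitialData() helper and the even split are assumptions for illustration, not the actual implementation:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class SplitPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Hypothetical placeholder for however the initial data is loaded.
        List<TransferObjectTO> allItems = fetchInitialData();

        Map<String, ExecutionContext> contexts = new HashMap<String, ExecutionContext>();
        int partitionSize = (allItems.size() + gridSize - 1) / gridSize;
        for (int i = 0; i < gridSize; i++) {
            int from = i * partitionSize;
            int to = Math.min(from + partitionSize, allItems.size());
            if (from >= to) {
                break; // fewer items than partitions
            }
            ExecutionContext context = new ExecutionContext();
            // The PartitionReader below picks this list up in open().
            // TransferObjectTO must be serializable, because the context
            // is persisted to the job repository.
            context.put("partitionItems",
                    new ArrayList<TransferObjectTO>(allItems.subList(from, to)));
            contexts.put("partition" + i, context);
        }
        return contexts;
    }

    private List<TransferObjectTO> fetchInitialData() {
        // Stand-in for the real data source.
        return new ArrayList<TransferObjectTO>();
    }
}

The PartitionReader code is below: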

import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

public class PartitionReader implements ItemStreamReader<TransferObjectTO> {

    private List<TransferObjectTO> partitionItems;

    public PartitionReader() {
    }

    // Hands out items by removing them from the head of the list.
    @Override
    public synchronized TransferObjectTO read() {
        if (partitionItems.size() > 0) {
            return partitionItems.remove(0);
        } else {
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // The list was stored under "partitionItems" by the partitioner.
        partitionItems = (List<TransferObjectTO>) executionContext.get("partitionItems");
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Re-stores the (shrinking) list at every successful chunk boundary.
        executionContext.put("partitionItems", partitionItems);
    }

    @Override
    public void close() throws ItemStreamException {
    }
}

1 Reply


It seems that I had a few misunderstandings of Spring Batch, on top of my buggy code. The first misunderstanding was that I thought the readCount would be rolled back on a RuntimeException. Now I see that this is not the case: Spring Batch increments this value, and upon step failure the value is committed.

Related to the above, I thought that the update method on ItemStreamReader would always be called, and that the executionContext update in the database would simply be committed or rolled back together with the chunk. But it seems that update is called only if no errors occur, while the executionContext update is always committed.
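
One way to see this behaviour is to wrap the reader in a logging delegate and watch when the callbacks fire; this wrapper is a minimal sketch added for illustration, not part of the original job:

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

public class LoggingReader implements ItemStreamReader<TransferObjectTO> {

    private final ItemStreamReader<TransferObjectTO> delegate;

    public LoggingReader(ItemStreamReader<TransferObjectTO> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TransferObjectTO read() throws Exception {
        return delegate.read();
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // Called once when the (slave) step starts, and again on restart.
        System.out.println("open(): " + executionContext);
        delegate.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Fires at each successful chunk boundary; it does not fire for the
        // chunk that throws, yet the failure-time context is still persisted.
        System.out.println("update(): " + executionContext);
        delegate.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        System.out.println("close()");
        delegate.close();
    }
}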

The third misunderstanding was that the partitioning "master step" would not be re-executed on restart, only the slave steps. But actually the "master step" is re-executed if one of its slave steps fails. So I guess that the master and slave steps are actually handled as a single step somehow.

And then there was my buggy code in the PartitionReader, which was supposed to save disk space on the db server. Since read() removed items from the live list and the executionContext held a reference to that same list, the context persisted when the step failed was already missing the items read by the failed chunk, which explains the restart skipping ahead. Maybe the partitionItems should not be edited on read()? (Related to the above statements.) Anyhow, here is the code for the working PartitionReader:

import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

public class PartitionReader implements ItemStreamReader<TransferObjectTO> {

    private List<TransferObjectTO> partitionItems;
    private int index;

    public PartitionReader() {
    }

    // The list is never mutated; only the index moves forward.
    @Override
    public synchronized TransferObjectTO read() {
        if (partitionItems.size() > index) {
            return partitionItems.get(index++);
        } else {
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        partitionItems = (List<TransferObjectTO>) executionContext.get("partitionItems");
        // On a restart this returns the index committed with the last successful chunk.
        index = executionContext.getInt("partitionIndex", 0);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Only the index is re-stored; the item list stays as the partitioner wrote it.
        executionContext.put("partitionIndex", index);
    }

    @Override
    public void close() throws ItemStreamException {
    }
}
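
With this version a restart behaves as expected: open() restores the index that was committed with the last successful chunk, so processing resumes at the first item of the failed chunk instead of skipping past it, and the item list itself is only ever written to the job repository once, by the partitioner.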
