Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

java - creating logic inside Spring Batch using Tasklet or inside CompositeItemWriter?

I have created three file writers that create three local files, but every item that is read is written to all three files in the same way. I am trying to add some logic so that each item goes only into its proper file. For example, FileA, FileB, and FileC are created. When the reader queries MongoDB, it should look at the business column and find the three different types: TypeA, TypeB, and TypeC. Each item should then be written to the file that matches its type, instead of everything being written to every file. FileA should contain only the TypeA items and nothing else. Is there a way to do this? Here is what my code looks like.

ItemReader:

  @Bean
  public MongoItemReader<PaymentAudit> mongoreader() {
    LOG.debug("Mongo-Reader");
    // Diamond operator instead of a raw type, so no @SuppressWarnings is needed
    MongoItemReader<PaymentAudit> mongoreader = new MongoItemReader<>();
    mongoreader.setTemplate(mongoTemplate);
    // Empty query: read every document in the collection
    mongoreader.setQuery("{}");
    mongoreader.setTargetType(PaymentAudit.class);
    // Read in ascending _id order
    Map<String, Sort.Direction> sort = new HashMap<>();
    sort.put("_id", Sort.Direction.ASC);
    mongoreader.setSort(sort);
    return mongoreader;
  }

FileItemWriter:

@Bean
@StepScope
public FlatFileItemWriter<PaymentAudit> writer() {
    LOG.debug("Mongo-writer");
    // Backslashes in a Java string literal must be escaped
    String exportFilePath = "C:\\filewriter\\retail.txt";
    FlatFileItemWriter<PaymentAudit> flatFile = new FlatFileItemWriterBuilder<PaymentAudit>()
            // Distinct, non-empty name: it prefixes this writer's execution-context keys
            .name("writer")
            .resource(new FileSystemResource(exportFilePath))
            .lineAggregator(createPaymentPortalLineAggregator())
            .build();
    String exportFileHeader = "TypeA";
    StringHeaderWriter headerWriter = new StringHeaderWriter(exportFileHeader);
    flatFile.setHeaderCallback(headerWriter);
    return flatFile;
}
@Bean
public FlatFileItemWriter<PaymentAudit> writer2() {
    LOG.debug("flatFileItemWriter");
    // Backslashes in a Java string literal must be escaped
    String exportFilePath = "C:\\filewriter\\hcc.txt";
    FlatFileItemWriter<PaymentAudit> flatFile = new FlatFileItemWriterBuilder<PaymentAudit>()
            // Distinct, non-empty name: it prefixes this writer's execution-context keys
            .name("writer2")
            .resource(new FileSystemResource(exportFilePath))
            .lineAggregator(createPaymentPortalLineAggregator())
            .build();
    String exportFileHeader = "TypeB";
    StringHeaderWriter headerWriter = new StringHeaderWriter(exportFileHeader);
    flatFile.setHeaderCallback(headerWriter);
    return flatFile;
}
@Bean
public FlatFileItemWriter<PaymentAudit> writer3() {
    LOG.debug("Mongo-writer");
    // Backslashes in a Java string literal must be escaped
    String exportFilePath = "C:\\filewriter\\srx.txt";
    FlatFileItemWriter<PaymentAudit> flatFile = new FlatFileItemWriterBuilder<PaymentAudit>()
            // Distinct, non-empty name: it prefixes this writer's execution-context keys
            .name("writer3")
            .resource(new FileSystemResource(exportFilePath))
            .lineAggregator(createPaymentPortalLineAggregator())
            .build();
    String exportFileHeader = "TypeC";
    StringHeaderWriter headerWriter = new StringHeaderWriter(exportFileHeader);
    flatFile.setHeaderCallback(headerWriter);
    return flatFile;
}

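// NOTE: the original post is cut off after the first line of this method.
// Everything after that line is an assumed completion: a CompositeItemWriter
// passes every item to every delegate, which matches the behavior described above.
public CompositeItemWriter<PaymentAudit> compositeItemWriter() {
    CompositeItemWriter<PaymentAudit> writer = new CompositeItemWriter<>();
    writer.setDelegates(Arrays.asList(writer(), writer2(), writer3()));
    return writer;
}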


private LineAggregator<PaymentAudit> createPaymentPortalLineAggregator() {
    DelimitedLineAggregator<PaymentAudit> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter("|");
    FieldExtractor<PaymentAudit> fieldExtractor = createPaymentPortalFieldExtractor();
    lineAggregator.setFieldExtractor(fieldExtractor);
    return lineAggregator;
}

private FieldExtractor<PaymentAudit> createPaymentPortalFieldExtractor() {
    BeanWrapperFieldExtractor<PaymentAudit> extractor = new BeanWrapperFieldExtractor<>();
    // Each property name must be its own array element, not one comma-separated string
    extractor.setNames(new String[] { "TypeA", "TypeB", "TypeC" });
    return extractor;
}

1 Reply

You need to use a ClassifierCompositeItemWriter to classify the items and write each type to its corresponding file. Here is a quick example you can try:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.classify.Classifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
@EnableBatchProcessing
public class MyJob {

    private JobBuilderFactory jobBuilderFactory;

    private StepBuilderFactory stepBuilderFactory;

    public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public ItemReader<Person> itemReader() {
        // Four in-memory sample items: two "foo" names and two "bar" names
        return new ListItemReader<>(Arrays.asList(
                new Person(1, "foo1"),
                new Person(2, "foo2"),
                new Person(3, "bar1"),
                new Person(4, "bar2")));
    }

    @Bean
    public ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter(ItemWriter<Person> fooItemWriter, ItemWriter<Person> barItemWriter) {
        ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter = new ClassifierCompositeItemWriter<>();
        classifierCompositeItemWriter.setClassifier((Classifier<Person, ItemWriter<? super Person>>) person -> {
            if (person.getName().startsWith("foo")) {
                return fooItemWriter;
            } else {
                return barItemWriter;
            }
        });
        return classifierCompositeItemWriter;
    }

    @Bean
    public FlatFileItemWriter<Person> fooItemWriter() {
        return new FlatFileItemWriterBuilder<Person>()
                .name("fooItemWriter")
                .resource(new FileSystemResource("foos.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }

    @Bean
    public FlatFileItemWriter<Person> barItemWriter() {
        return new FlatFileItemWriterBuilder<Person>()
                .name("barItemWriter")
                .resource(new FileSystemResource("bars.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }

    @Bean
    public Step dataExtractionStep() {
        return stepBuilderFactory.get("dataExtractionStep")
                .<Person, Person>chunk(2)
                .reader(itemReader())
                .writer(classifierCompositeItemWriter(fooItemWriter(), barItemWriter()))
                .stream(fooItemWriter())
                .stream(barItemWriter())
                .build();
    }

    @Bean
    public Job dataExtractionJob() {
        return jobBuilderFactory.get("dataExtractionJob")
                .start(dataExtractionStep())
                .build();
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

    public static class Person {

        private int id;

        private String name;

        public Person() {
        }

        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

}

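This sample reads a few Person items and writes those whose name starts with foo to foos.txt and all the others (here, the bar* items) to bars.txt. The two delegate writers are also registered on the step with stream(...) because ClassifierCompositeItemWriter does not implement ItemStream, so the step would not otherwise open and close them.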
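
To make the routing idea concrete for the three types in the question, here is a minimal plain-Java sketch: classify each item of a chunk, group the items by the result, and hand each group only to its own target. It uses no Spring Batch classes. The Audit class, the targetFile and route methods, and the mapping of TypeA/TypeB/TypeC to retail.txt/hcc.txt/srx.txt (read off the headers and paths of the question's writers) are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; no Spring Batch classes are used here.
class ClassifierRoutingSketch {

    // Hypothetical stand-in for PaymentAudit: just an id and the "business" value.
    static final class Audit {
        final int id;
        final String business;

        Audit(int id, String business) {
            this.id = id;
            this.business = business;
        }
    }

    // Assumed mapping from business type to target file, based on the question's writers.
    static String targetFile(Audit audit) {
        switch (audit.business) {
            case "TypeA": return "retail.txt";
            case "TypeB": return "hcc.txt";
            case "TypeC": return "srx.txt";
            default: throw new IllegalArgumentException("Unknown business type: " + audit.business);
        }
    }

    // Groups one chunk by the classifier's result, so each target receives only its own items.
    static <T, K> Map<K, List<T>> route(List<T> chunk, Function<T, K> classifier) {
        Map<K, List<T>> routed = new LinkedHashMap<>();
        for (T item : chunk) {
            routed.computeIfAbsent(classifier.apply(item), key -> new ArrayList<>()).add(item);
        }
        return routed;
    }

    public static void main(String[] args) {
        List<Audit> chunk = Arrays.asList(
                new Audit(1, "TypeA"), new Audit(2, "TypeB"),
                new Audit(3, "TypeA"), new Audit(4, "TypeC"));
        // Print how many items of the chunk each target file would receive
        route(chunk, ClassifierRoutingSketch::targetFile)
                .forEach((file, items) -> System.out.println(file + " <- " + items.size() + " item(s)"));
    }
}
```

In the actual job, the same decision would go in the lambda passed to setClassifier, returning writer(), writer2(), or writer3() instead of a file name. The getter for the business value on PaymentAudit is not shown in the question, so its name would have to be adapted.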

Hope this helps.

