Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
226 views
in Technique[技术] by (71.8m points)

Spring integration: unexpected behavior on error handling in splitter-aggregator flow

I have following failing test. The question is why reply in case of error on one of splitted "sub-messages" is error only, without result for the other, successfully-handled, sub-message (as expected in test)? Is there a modification to this code to achieve result expected in test?

@RunWith(SpringRunner.class)
public class ErrorHandlingTests {

    @Autowired
    StringsService stringsService;

    interface StringsService {
        @Nonnull
        List<String> process(@Nonnull List<String> data);
    }

    @EnableIntegration
    @Configuration
    static class Config {

        @Bean
        IntegrationFlow errorHandler() {
            return IntegrationFlows.from("errorChannel")
                    .<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
                    .get();
        }

        @Bean
        IntegrationFlow errorsHandlingFlow2() {
            AtomicInteger incomingCorrelationId = new AtomicInteger();

            return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> message) {
                            List<String> strings = (List<String>) message.getPayload();
                            int id = incomingCorrelationId.get();
                            return strings
                                    .stream()
                                    .map(r -> MessageBuilder
                                            .withPayload(r)
                                            .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, id)
                                            .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, strings.size())
                                            .build())
                                    .collect(Collectors.toList());
                        }
                    })
                    .transform(new AbstractPayloadTransformer<String, String>() {
                        @Override
                        protected String transformPayload(String s) {
                            if (s.contains("oops"))
                                throw new IllegalArgumentException("Bad value");

                            return "R: " + s;
                        }
                    })
                    .aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
                        @Override
                        protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
                            return group.getMessages()
                                    .stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.toList());
                        }
                    }))
                    .get();
        }
    }

    @Test
    public void testErrorHandlingInFlow2() {
        assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
        assertEquals(Arrays.asList("R: a", "Failure for oops"), stringsService.process(Arrays.asList("a", "oops")));
    }
}

Updated version, working, with applied advice.

@RunWith(SpringRunner.class)
public class ErrorHandlingTests2 {

    interface StringsService {
        @Nonnull
        List<String> process(@Nonnull List<String> data);
    }

    @Autowired
    StringsService stringsService;

    @EnableIntegration
    @Configuration
    static class Config {

        @Bean
        IntegrationFlow errorHandler() {
            return IntegrationFlows.from("errorChannel")
                    .<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
                    .get();
        }

        @Bean
        IntegrationFlow errorsHandlingFlow2() {
            return IntegrationFlows.from(StringsService.class, gws -> gws.errorChannel("errorChannel"))
                    .split(new AbstractMessageSplitter() {
                        @Override
                        protected Object splitMessage(Message<?> message) {
                            List<String> strings = (List<String>) message.getPayload();
                            return strings
                                    .stream()
                                    .map(r -> MessageBuilder
                                            .withPayload(r)
                                            .build())
                                    .collect(Collectors.toList());
                        }
                    })
                    .transform(new AbstractPayloadTransformer<String, String>() {
                        @Override
                        protected String transformPayload(String s) {
                            if (s.contains("oops"))
                                throw new IllegalArgumentException("Bad value");

                            return "R: " + s;
                        }
                    }, c -> c.advice(advice()))
                    .aggregate(a -> a.outputProcessor(new AbstractAggregatingMessageGroupProcessor() {
                        @Override
                        protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
                            return group.getMessages()
                                    .stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.toList());
                        }
                    }))
                    .get();
        }

        @Bean
        ExpressionEvaluatingRequestHandlerAdvice advice() {
            ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
            advice.setReturnFailureExpressionResult(true);
            advice.setOnFailureExpression(
                    new FunctionExpression<Message<?>>(s ->
                            MessageBuilder
                                    .withPayload("Failure for " + s.getPayload())
                                    .copyHeaders(s.getHeaders()).build())
            );
            return advice;
        }
    }

    @Test
    public void testErrorHandlingInFlow2() {
        assertEquals(Arrays.asList("R: a", "R: b"), stringsService.process(Arrays.asList("a", "b")));
        assertEquals(Arrays.asList("R: a", "Failure for oops", "R: b"), stringsService.process(Arrays.asList("a", "oops", "b")));
    }

}
question from:https://stackoverflow.com/questions/65887787/spring-integration-unexpected-behavior-on-error-handling-in-splitter-aggregator

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
  1. The splitter does support Java Stream.
  2. The splitter populated those headers by default. Not sure why would one need a custom IntegrationMessageHeaderAccessor.CORRELATION_ID. It is like this in the splitter: final Object correlationId = message.getHeaders().getId();
  3. You have an aggregator already after a transform(). So, when transform throws an exception, it is really the fact that it doesn't reach the aggregator. In fact that both just don't know about each other. This is one of those first class citizen features in Spring Integration where endpoints are loosely-couple with message channels in between. And you can just send message to the same endpoint from different places. Anyway I guess it would behave the same way even with the plain Java: you have a loop and gather data into a list in the end of the loop. Now imaging you fail on the line before the gather logic, so the exception is thrown to the called and there is indeed no any info how it was gathered afterwards.

Now about the possible fix for your expected logic. Please, take a look into an ExpressionEvaluatingRequestHandlerAdvice to be applied on your transformer. So, when you got an exception, it is handled in the failureChannel sub-flow and a compensation messages is returned as a regular reply to be aggregated with others. See docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...