Spring Integration DSL — прослушка не работает асинхронно

Как указано в документе Spring Integration (http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel-wiretap), если мы хотим, чтобы прослушка выполнялась как асинхронная, нам нужно отправлять сообщения в асинхронный канал (опрашиваемый канал или канал исполнителя)

Я попытался реализовать поток, как показано ниже, но он работает не так, как ожидалось.

return IntegrationFlows
     .from("inputChannel")
     .wireTap(customChannel())
     .handle((p, h) -> {
          System.out.println("After calling wiretap");
          return p;
     })
     .get();
=========================

@Bean
public MessageChannel customChannel() {

    return MessageChannels.executor(new TaskExecutor() {
        @Override
        public void execute(Runnable task) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {

            }
            System.out.println("End customChannel");
        }
    }).get();
}

Я ожидаю, что он напишет «После вызова прослушки», а затем спит 5 секунд, а затем напишет «Завершить customChannel».

Но консоль такая:

End customChannel
After calling wiretap

Пожалуйста, помогите мне дать мне предложение для этого случая!


person Thanh Nguyen    schedule 30.05.2017    source источник


Ответы (1)


Наконец, я нашел способ выполнить асинхронно, как показано ниже.

return IntegrationFlows
 .from("inputChannel")
 .wireTap("CustomChannel")
 .handle((p, h) -> {
      System.out.println("After calling wiretap");
      return p;
 })
 .get();

=========================

@Bean (value = "CustomChannel")
public MessageChannel customChannel() {
    // execute 5 concurrent threads.
    // if additional tasks are submitted when all threads are active, they will wait in the queue.
    return new ExecutorChannel(Executors.newFixedThreadPool(5));
}

@Bean
@ServiceActivator(inputChannel = "CustomChannel")
public MessageHandler exportDataServiceActivator() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            // TODO Handle logic we want here

        }
    };
}
person Thanh Nguyen    schedule 13.06.2017