Задача: Читать сообщения пачками из jms очереди, делать обработку, применять какую-либо логику уже на пачку.

Хм… Задача довольно необычна, но иногда такая необходимость крайне нужна. В моем случае — это единственная возможность для организации промежуточного буфера, который разгрузит сервис, умеющий обрабатывать пакетные команды.

Для решения этой задачи есть несколько вариантов:

  1. Apache Camel. Все можно сделать почти из коробки. Есть компонент SJMS он умеет читать пачками jms сообщения, можно задать таймаут и максимальный размер пачки.
  2. Spring JmsTemplate + Spring Batch. Собственно более тяжелый вариант, относительно 1, но довольно интересный.

Общая модель работы фреймворка

  1. Создаем Reader и Writer. Объекты которые будут что-то вычитывать и записывать соответственно. В моем примере Reader — читает jms. Writer — просто выводит в консоль всю пачку. Глобально это могут быть самые разнообразные операции. Spring-Batch укомплектован самыми различными реализациями, но при необходимости добавить что-то свое, не составит труда.
  2. Создаем Processor — он может выполнять любую бизнес логику над данными, полученными из Reader. Тут нужно обратить внимание, что processor применяется до формирования пачки.
  3. После создаются шаги Step из которых собирается Job. Здесь и начинается работа. Spring запускает job на выполнение, он работает пока не reader не вычитает все данные. Признак конца — возврат null.

Реализация

Для начала нужно создать проект. Удобней всего это сделать из архетипа spring-boot.

После создания нужно добавить зависимости:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-batch</artifactId>
</dependency>

Отмечу, что jmsTemplate и scheduling (интервальный запуск) уже входят в spring-boot.

Сконфигурируем reader, writer, processor и создадим job.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;


    @Bean
    public JmsItemReader reader() {
        JmsItemReader<Message> reader = new JmsItemReader<>();
        reader.setJmsTemplate(jmsTemplate);
        reader.setItemType(Message.class);
        return reader;
    }

    @Bean
    public ItemProcessor processor() {
        return new JmsProcessor();
    }

    @Bean
    public StringItemWriter writer() {
        return new StringItemWriter();
    }


    @Bean
    public Job testJob() {
        return jobBuilderFactory.get("testJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Message, String> chunk(batchSize)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
}

Добавим запуск каждые n-секунд, используя аннотацию @Scheduled

public class RunScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    private Job job;

    @Scheduled(initialDelay = 10000L, fixedRateString = "${restartJobDelay}")
    public void run() {
        try {

            String dateParam = new Date().toString();
            JobParameters param =
                    new JobParametersBuilder().addString("date", dateParam).toJobParameters();

            JobExecution execution = jobLauncher.run(job, param);
            System.out.println("Exit Status : " + execution.getStatus());

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Job getJob() {
        return job;
    }

    public void setJob(Job job) {
        this.job = job;
    }
}

Единственно чего не хватает — транзакционности. Если упадем на обработке или записи, потеряем всю вычитанную пачку.


Ссылки:


Ссылка на проект в GitHub: sboychenko/spring-batch-jms-test