Задача: Читать сообщения пачками из jms очереди, делать обработку, применять какую-либо логику уже на пачку.
Хм… Задача довольно необычна, но иногда такая необходимость крайне нужна. В моем случае — это единственная возможность для организации промежуточного буфера, который разгрузит сервис, умеющий обрабатывать пакетные команды.
Для решения этой задачи есть несколько вариантов:
- Apache Camel. Все можно сделать почти из коробки. Есть компонент SJMS он умеет читать пачками jms сообщения, можно задать таймаут и максимальный размер пачки.
- Spring JmsTemplate + Spring Batch. Собственно более тяжелый вариант, относительно 1, но довольно интересный.
Общая модель работы фреймворка
- Создаем Reader и Writer. Объекты которые будут что-то вычитывать и записывать соответственно. В моем примере Reader — читает jms. Writer — просто выводит в консоль всю пачку. Глобально это могут быть самые разнообразные операции. Spring-Batch укомплектован самыми различными реализациями, но при необходимости добавить что-то свое, не составит труда.
- Создаем Processor — он может выполнять любую бизнес логику над данными, полученными из Reader. Тут нужно обратить внимание, что processor применяется до формирования пачки.
- После создаются шаги Step из которых собирается Job. Здесь и начинается работа. Spring запускает job на выполнение, он работает пока не reader не вычитает все данные. Признак конца — возврат null.
Реализация
Для начала нужно создать проект. Удобней всего это сделать из архетипа spring-boot.
После создания нужно добавить зависимости:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
Отмечу, что jmsTemplate и scheduling (интервальный запуск) уже входят в spring-boot.
Сконфигурируем reader, writer, processor и создадим job.
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public JmsItemReader reader() {
JmsItemReader<Message> reader = new JmsItemReader<>();
reader.setJmsTemplate(jmsTemplate);
reader.setItemType(Message.class);
return reader;
}
@Bean
public ItemProcessor processor() {
return new JmsProcessor();
}
@Bean
public StringItemWriter writer() {
return new StringItemWriter();
}
@Bean
public Job testJob() {
return jobBuilderFactory.get("testJob")
.incrementer(new RunIdIncrementer())
.flow(step1())
.end()
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Message, String> chunk(batchSize)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
}
Добавим запуск каждые n-секунд, используя аннотацию @Scheduled
public class RunScheduler {
@Autowired
private JobLauncher jobLauncher;
private Job job;
@Scheduled(initialDelay = 10000L, fixedRateString = "${restartJobDelay}")
public void run() {
try {
String dateParam = new Date().toString();
JobParameters param =
new JobParametersBuilder().addString("date", dateParam).toJobParameters();
JobExecution execution = jobLauncher.run(job, param);
System.out.println("Exit Status : " + execution.getStatus());
} catch (Exception e) {
e.printStackTrace();
}
}
public Job getJob() {
return job;
}
public void setJob(Job job) {
this.job = job;
}
}
Единственно чего не хватает — транзакционности. Если упадем на обработке или записи, потеряем всю вычитанную пачку.
Ссылки:
- Документация Spring Batch
- Документация по Spring Scheduling
- Документация по Spring JMS
Ссылка на проект в GitHub: sboychenko/spring-batch-jms-test