Scheduled Kafka Consumer With Spring Boot
@Component @Slf4j public class ScheduledConsumer { private final BusinessService businessService; private final KafkaListenerEndpointRegistry registry; @Autowired public ScheduledConsumer(final BusinessService businessService, KafkaListenerEndpointRegistry registry) { this.businessService = businessService; this.registry = registry; } @Scheduled(cron = "${cron-expression.start}") public void resumeConsuming() { this.registry.getListenerContainers().forEach(MessageListenerContainer::resume); log.info("Resume consuming business objects..."); } @Scheduled(cron = "${cron-expression.stop}") public void pauseConsuming() { this.registry.getListenerContainers().forEach(MessageListenerContainer::pause); log.info("Pause consuming business objects..."); } @KafkaListener(id = "mycontainer", topics = "${topic}", autoStartup = "${consumer.autostart}") public void consume(final ConsumerRecord<String, BusinessObject> businessRecord, final Acknowledgment acknowledgment) { log.info("Processing business object..."); this.businessService.process(businessRecord.value()); acknowledgment.acknowledge(); } }