@Component
@Slf4j
public class ScheduledConsumer {
private final BusinessService businessService;
private final KafkaListenerEndpointRegistry registry;
@Autowired
public ScheduledConsumer(final BusinessService businessService, KafkaListenerEndpointRegistry registry) {
this.businessService = businessService;
this.registry = registry;
}
@Scheduled(cron = "${cron-expression.start}")
public void resumeConsuming() {
this.registry.getListenerContainers().forEach(MessageListenerContainer::resume);
log.info("Resume consuming business objects...");
}
@Scheduled(cron = "${cron-expression.stop}")
public void pauseConsuming() {
this.registry.getListenerContainers().forEach(MessageListenerContainer::pause);
log.info("Pause consuming business objects...");
}
@KafkaListener(id = "mycontainer", topics = "${topic}", autoStartup = "${consumer.autostart}")
public void consume(final ConsumerRecord<String, BusinessObject> businessRecord,
final Acknowledgment acknowledgment) {
log.info("Processing business object...");
this.businessService.process(businessRecord.value());
acknowledgment.acknowledge();
}
}