@Component
@Slf4j
public class ScheduledConsumer {

    private final BusinessService businessService;
    private final KafkaListenerEndpointRegistry registry;

    @Autowired
    public ScheduledConsumer(final BusinessService businessService, KafkaListenerEndpointRegistry registry) {
        this.businessService = businessService;
        this.registry = registry;
    }

    @Scheduled(cron = "${cron-expression.start}")
    public void resumeConsuming() {
            this.registry.getListenerContainers().forEach(MessageListenerContainer::resume);
            log.info("Resume consuming business objects...");
    }

    @Scheduled(cron = "${cron-expression.stop}")
    public void pauseConsuming() {
        this.registry.getListenerContainers().forEach(MessageListenerContainer::pause);
        log.info("Pause consuming business objects...");
    }

    @KafkaListener(id = "mycontainer", topics = "${topic}", autoStartup = "${consumer.autostart}")
    public void consume(final ConsumerRecord<String, BusinessObject> businessRecord,
            final Acknowledgment acknowledgment) {
        log.info("Processing business object...");
        this.businessService.process(businessRecord.value());
        acknowledgment.acknowledge();
    }

}