package com.example.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class KafkaConsumer { public static void main(String[] args) { Logger logger = LoggerFactory.getLogger(KafkaConsumer.class.getName()); System.out.println(My first Kafka Consumer); String bootstrapServers = kafka19092; String groupId = order_application; String topic = my-topic10; create consumer configs Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); create consumer org.apache.kafka.clients.consumer.KafkaConsumer consumer1 = new org.apache.kafka.clients.consumer.KafkaConsumer(properties); subscribe consumer to my topic consumer1.subscribe(Arrays.asList(topic)); Poll for new data while(true){ ConsumerRecordsString, String records = consumer1.poll(Duration.ofMillis(100)); new in Kafka 2.0.0 consumer1.subscribe(Arrays.asList(topic)); for (ConsumerRecordString, String record records){ logger.info(Key + record.key() + , Value + record.value()); logger.info(Partition + record.partition() + , Offset + record.offset()); System.out.println(record.value()); } } System.out.println(Close Producer); } }