package com.example.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerWithCallback { public static void main(String[] args) { System.out.println("My first Kafka Producer"); String bootstrapServers = "kafka1:9092"; // create Producer properties Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // create the producer KafkaProducer producer = new KafkaProducer(properties); // create a producer record ProducerRecord record = new ProducerRecord("my-topic10", "Java message"); // send data - asynchronous producer.send(record, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { // the record was successfully sent System.out.println("Received info about metadata. \n" + "Topic:" + recordMetadata.topic() + "\n" + "Partition: " + recordMetadata.partition() + "\n" + "Offset: " + recordMetadata.offset() + "\n" + "Timestamp: " + recordMetadata.timestamp()); } else { System.out.println("Error while producing: "+ e); } } }); // Flush the data - is sent producer.flush(); // Flush and Close the producer producer.close(); System.out.println("Producer has been closed."); } }