package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Minimal example of a Kafka producer that sends keyed string records.
 *
 * <p>Ten messages are published asynchronously to {@code my-topic1}, all with the
 * same key. For each record a callback prints the partition and offset reported by
 * the broker, or the error if the send failed.
 */
public class KafkaProducerWithKey {

    /**
     * Configures a string-to-string producer, sends ten keyed records, waits for
     * them to be delivered and closes the producer.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        System.out.println("My first Kafka Producer");

        String bootstrapServers = "kafka1:9092";
        String myKey = "anyString";

        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer; try-with-resources guarantees close() runs even if
        // send() or flush() throws, so network resources and buffers are released
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                // create a producer record; every record carries the same key
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("my-topic1", myKey, "Java message " + i);

                // send data - asynchronous; the callback runs once the send completes
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null) {
                            System.out.println("Partition: " + recordMetadata.partition() + " "
                                    + "Offset: " + recordMetadata.offset());
                        } else {
                            // report failures on stderr so they are not mixed with normal output
                            System.err.println("Error: " + e);
                        }
                    }
                });
            }

            // Flush data - block until all buffered records have been sent
            producer.flush();
        }
        // leaving the try block has closed the producer

        System.out.println("The producer has been closed.");
    }
}