package org.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import java.util.Properties; public class MyFirstKafkaStream { public static void main(String[] args) { System.out.println("Start"); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder (); //Now StreamsBuilder replace KStreamBuilder KStream inputKStream = builder.stream("input-kafka-topic"); KStream outputKStream = inputKStream.filterNot((key, value) -> value.substring(0,3).equals("ABC")) .mapValues(value -> value.toLowerCase()); // - to in order to write the results back to kafka outputKStream.to("output-kafka-topic", Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }