package org.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.Arrays; import java.util.Properties; public class KTablesExample { public static void main(String[] args) { System.out.println("Start"); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-application2"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder (); //Now StreamsBuilder replace KStreamBuilder KStream inputKStream = builder.stream("input-kafka-topic"); KTable wordCountsKTableStream = inputKStream // transform the message to lowercase // you can use .mapValues(String::toLowerCase) as well .mapValues(textLine -> textLine.toLowerCase()) // split the message into a list of words .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+"))) // the key of a message became the word .selectKey((key, word) -> word) // group by key before aggregation .groupByKey() // count occurrences of the words .count(); // write the results to a Kafka topic wordCountsKTableStream.toStream().to("output-kafka-topic", Produced.with(Serdes.String(), Serdes.Long())); KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } }