Kafka Streams

$ cd ~/tmp
$ git clone https://github.com/webmak1/kafka-streams
$ cd kafka-streams/
$ git checkout end-state


$ vi gradle/wrapper/gradle-wrapper.properties


distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.1-bin.zip


$ vi ./build.gradle


dependencies {
    // Kafka Streams library, pinned to 2.2.1.
    implementation 'org.apache.kafka:kafka-streams:2.2.1'
    // Kafka client library, pinned to the same version as kafka-streams.
    implementation 'org.apache.kafka:kafka-clients:2.2.1'
    // JUnit 4.12, available to test code only.
    testImplementation 'junit:junit:4.12'
}


$ vi src/main/java/com/linuxacademy/ccdak/streams/StreamsMain.java


package com.linuxacademy.ccdak.streams;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

/**
 * Minimal Kafka Streams application that forwards every record read from
 * {@code streams-input-topic} to {@code streams-output-topic} without modifying it.
 *
 * <p>Keys and values are handled as Strings through the default Serdes configured in
 * {@link #main(String[])}. The process keeps running until the JVM is asked to shut down
 * (for example with control-c), at which point the streams instance is closed.
 */
public class StreamsMain {

    /**
     * Configures, builds and runs the pass-through topology.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Set the cache buffering limit to 0 bytes.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream and write it straight to the output topic.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("streams-input-topic");
        source.to("streams-output-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());

        // The main thread blocks on this latch; only the shutdown hook releases it.
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "streams-shutdown-hook"));

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            // Print the exception itself (type and message): getMessage() alone can be null
            // and hides which kind of failure occurred.
            System.err.println("Streams application failed: " + e);
            System.exit(1);
        }
        System.exit(0);
    }

}


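// Create the input topic before starting the application (sending a single message creates the topic if topic auto-creation is enabled on the broker)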
$ kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic streams-input-topic

[Enter]
^C


$ kafka-topics.sh --list --bootstrap-server localhost:9092


$ ./gradlew runStreams


Wait 2 minutes


// Terminal 1
// Receive Messages
$ kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic streams-output-topic \
    --property print.key=true


// Terminal 2
// Send Messages
$ kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic streams-input-topic \
    --property parse.key=true \
    --property key.separator=:

> hello:world
> test:one
> test:two

^C


Output in Terminal 1:

hello	world
test	one
test	two


Kafka Streams Stateless Transformations


$ vi src/main/java/com/linuxacademy/ccdak/streams/StatelessTransformationsMain.java


package com.linuxacademy.ccdak.streams;

import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

/**
 * Kafka Streams application demonstrating stateless transformations
 * ({@code branch}, {@code filter}, {@code flatMap}, {@code map}, {@code merge}, {@code peek}).
 *
 * <p>Records are read from {@code stateless-transformations-input-topic}, transformed, printed
 * to the console and written to {@code stateless-transformations-output-topic}:
 * <ul>
 *   <li>records whose key starts with "a" are kept only if the value also starts with "a";
 *       each survivor is emitted twice (uppercased and lowercased value) with an uppercased
 *       key;</li>
 *   <li>all other records, including records with a {@code null} key, pass through
 *       unchanged.</li>
 * </ul>
 */
public class StatelessTransformationsMain {

    /**
     * Configures, builds and runs the stateless-transformations topology.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Set up the configuration.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-transformations-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Set the cache buffering limit to 0 bytes.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // Since the input topic uses Strings for both key and value, set the default Serdes to String.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Get the source stream.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("stateless-transformations-input-topic");

        // Split the stream into two streams, one containing all records where the key begins with "a",
        // and the other containing all other records. A record produced without a key has a null key;
        // the null check sends it to the "others" branch instead of throwing a NullPointerException.
        @SuppressWarnings("unchecked") // The varargs call creates a generic array of predicates.
        KStream<String, String>[] branches = source
            .branch((key, value) -> key != null && key.startsWith("a"), (key, value) -> true);
        KStream<String, String> aKeysStream = branches[0];
        KStream<String, String> othersStream = branches[1];

        // Remove any records from the "a" stream where the value does not also start with "a".
        // A null value does not start with "a", so it is dropped here as well; this also guarantees
        // a non-null value for the case conversions below.
        aKeysStream = aKeysStream.filter((key, value) -> value != null && value.startsWith("a"));

        // For the "a" stream, convert each record into two records, one with an uppercased value and one with a lowercased value.
        aKeysStream = aKeysStream.flatMap((key, value) -> {
            List<KeyValue<String, String>> result = new LinkedList<>();
            result.add(KeyValue.pair(key, value.toUpperCase()));
            result.add(KeyValue.pair(key, value.toLowerCase()));
            return result;
        });

        // For the "a" stream, modify all records by uppercasing the key.
        aKeysStream = aKeysStream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value));

        // Merge the two streams back together.
        KStream<String, String> mergedStream = aKeysStream.merge(othersStream);

        // Print each record to the console.
        mergedStream = mergedStream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));

        // Output the transformed data to a topic.
        mergedStream.to("stateless-transformations-output-topic");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Print the topology to the console.
        System.out.println(topology.describe());

        // The main thread blocks on this latch; only the shutdown hook releases it.
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch control-c and terminate the application gracefully.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }, "streams-shutdown-hook"));

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            // Print the exception itself (type and message): getMessage() alone can be null
            // and hides which kind of failure occurred.
            System.err.println("Streams application failed: " + e);
            System.exit(1);
        }
        System.exit(0);
    }

}


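// Create the input topic before starting the application (sending a single message creates the topic if topic auto-creation is enabled on the broker)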
$ kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic stateless-transformations-input-topic

[Enter]
^C


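// Create the output topic before starting the application (sending a single message creates the topic if topic auto-creation is enabled on the broker)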
$ kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic stateless-transformations-output-topic

[Enter]
^C


$ kafka-topics.sh --list --bootstrap-server localhost:9092


// Terminal 2
// Send Messages
$ kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic stateless-transformations-input-topic \
    --property parse.key=true \
    --property key.separator=:

>akey:avalue
>akey:avalue
>akey:bvalue
>bkey:bvalue

^C


$ ./gradlew runStatelessTransformations
***
key=AKEY, value=AVALUE
key=AKEY, value=avalue
key=AKEY, value=AVALUE
key=AKEY, value=avalue
key=AKEY, value=AVALUE
key=AKEY, value=avalue
key=bkey, value=bvalue


// Terminal 1
// Receive Messages
$ kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic stateless-transformations-output-topic \
    --property print.key=true


// Terminal 2
>akey:avalue
>akey:avalue
>akey:bvalue
>bkey:bvalue


// Terminal 1
AKEY	AVALUE
AKEY	avalue
AKEY	AVALUE
AKEY	avalue
bkey	bvalue


Kafka Streams Aggregations