Latest commit: Alexey Kim, 00052a159c, "Optimize imports", 3 years ago

Base module built to avoid writing boilerplate source code for each Kafka Streams application. Currently, it wraps:

  • basic Kafka connection and input/output stream configuration; some settings are exposed so they can be overridden via ENV variables;
  • a KafkaStreams builder that holds the topology definition and related setup.

TOC

  1. Prerequisites
  2. Dependencies
  3. Exposed ENV variables
  4. How to

Prerequisites

Dependencies

Exposed ENV variables

Var                                Type     Default          Description
KSTREAMS_DEBUG                     boolean  false
KSTREAMS_CONSUMER_GROUP_NAME       string   consumer         Kafka consumer group name
KSTREAMS_SERVERS                   string   localhost:29092  Kafka broker addresses
KSTREAMS_INPUT_NAME                string   in               Input topic name (where stream data is read from)
KSTREAMS_INPUT_PARTITIONS          int      1                Desired number of partitions
KSTREAMS_INPUT_REPLICATION_FACTOR  int      1                Desired replication factor
KSTREAMS_OUTPUT_NAME               string   out              Output topic name (where stream data is written to)

How to

Configure

Default configuration properties can be overridden in two ways: through application.conf or through ENV variables. In application.conf, define only the variables you want to change when implementing this module. Alternatively, set ENV variables when running your application. See the exposed ENV variables above, or reference.conf in resources.
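
As a rough illustration of the ENV override idea, the sketch below resolves a few of the variables from the table, falling back to their documented defaults when a variable is unset. It is not the module's actual loading code (which reads application.conf and reference.conf); the class EnvDefaults and its helper methods are hypothetical names introduced only for this example.

```java
import java.util.Map;

/** Hypothetical helper, not part of the module: ENV value if set, otherwise the documented default. */
public class EnvDefaults {

  /** Returns the trimmed value of {@code name}, or {@code fallback} when it is unset or blank. */
  public static String str(Map<String, String> env, String name, String fallback) {
    String raw = env.get(name);
    return (raw == null || raw.trim().isEmpty()) ? fallback : raw.trim();
  }

  /** Same as {@link #str}, parsed as an int; throws NumberFormatException on malformed input. */
  public static int integer(Map<String, String> env, String name, int fallback) {
    String raw = env.get(name);
    return (raw == null || raw.trim().isEmpty()) ? fallback : Integer.parseInt(raw.trim());
  }

  /** Same as {@link #str}, parsed as a boolean ("true" in any case is true, anything else false). */
  public static boolean bool(Map<String, String> env, String name, boolean fallback) {
    String raw = env.get(name);
    return (raw == null || raw.trim().isEmpty()) ? fallback : Boolean.parseBoolean(raw.trim());
  }

  public static void main(String[] args) {
    Map<String, String> env = System.getenv();
    // Variable names and defaults are taken from the table above.
    System.out.println("servers    = " + str(env, "KSTREAMS_SERVERS", "localhost:29092"));
    System.out.println("input      = " + str(env, "KSTREAMS_INPUT_NAME", "in"));
    System.out.println("partitions = " + integer(env, "KSTREAMS_INPUT_PARTITIONS", 1));
    System.out.println("debug      = " + bool(env, "KSTREAMS_DEBUG", false));
  }
}
```

Running it with, for example, KSTREAMS_SERVERS=broker:9092 set in the environment would print that address instead of the default.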

Implement

Implementation is simple: extend the abstract class BaseKStreams, as in the example below, and describe your Topology.

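import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.jetbrains.annotations.NotNull; // assumption: the README does not say which @NotNull is used
// BaseKStreams is provided by this module; import it from the module's package.

public class Stream extends BaseKStreams {

  @Override
  public @NotNull Topology getTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    builder
        // Read String keys and raw byte[] values from the configured input topic.
        .stream(
            getConfig().getInput().getName(),
            Consumed.with(Serdes.String(), Serdes.ByteArray()))
        // Drop the first 8 bytes of each value (assumes every value has at least 8 bytes).
        .map((key, value) -> KeyValue.pair(key, Arrays.copyOfRange(value, 8, value.length)))
        // Write the result to the configured output topic.
        .to(
            getConfig().getOutput().getName(),
            Produced.with(Serdes.String(), Serdes.ByteArray()));

    return builder.build();
  }
}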
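
The map step in the example is apparently meant to drop the first 8 bytes of every value. Below is a minimal, Kafka-free sketch of that transformation with a guard for null or short payloads, which the example itself does not have. PrefixStripper and stripPrefix are hypothetical names, and returning an empty array for short input is an assumption about the desired behavior rather than something the module defines.

```java
import java.util.Arrays;

/** Hypothetical helper, not part of the module: removes a fixed-size prefix from a payload. */
public class PrefixStripper {

  /**
   * Returns a copy of {@code value} without its first {@code prefixLength} bytes.
   * A null value is passed through unchanged; a value no longer than the prefix
   * yields an empty array (an assumption made for this sketch).
   */
  public static byte[] stripPrefix(byte[] value, int prefixLength) {
    if (prefixLength < 0) {
      throw new IllegalArgumentException("prefixLength must be >= 0");
    }
    if (value == null) {
      return null;
    }
    if (value.length <= prefixLength) {
      return new byte[0];
    }
    // copyOfRange(array, from, to) copies the half-open range [from, to).
    return Arrays.copyOfRange(value, prefixLength, value.length);
  }

  public static void main(String[] args) {
    byte[] payload = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    System.out.println(Arrays.toString(stripPrefix(payload, 8))); // prints [8, 9]
  }
}
```

Inside a topology, such a helper could be applied with something like mapValues(v -> PrefixStripper.stripPrefix(v, 8)), which leaves the key untouched.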

Once the topology is defined, the stream can be started:

import java.util.concurrent.CountDownLatch;

public class Application {

  public static void main(String[] args) {
    // The latch keeps the main thread alive until the shutdown hook has run.
    final CountDownLatch latch = new CountDownLatch(1);
    final Stream stream = new Stream();

    // Close the stream on JVM shutdown (e.g. SIGTERM or Ctrl-C).
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread("streams-shutdown-hook") {
              @Override
              public void run() {
                stream.close();
                latch.countDown();
              }
            });

    try {
      stream.getStreams().start();
      latch.await();
    } catch (Throwable e) {
      e.printStackTrace();
      System.exit(1);
    }

    System.exit(0);
  }
}

Build

Make sure gradle.properties exists and defines gitlab_token and gitlab_project. Gradle and make take care of the rest; just run:

make