Alexey Kim · 4 years ago · commit 190a85448b

+ 20 - 0
Makefile

@@ -0,0 +1,20 @@
+# See http://clarkgrubb.com/makefile-style-guide
+MAKEFLAGS += --warn-undefined-variables
+SHELL := bash
+.SHELLFLAGS := -eu -o pipefail -c
+.DEFAULT_GOAL := all
+.DELETE_ON_ERROR:
+.SUFFIXES:
+
+.PHONY: all
+all: test publish
+
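+# A leading "-" lets make continue past a failing command; "@" suppresses echoing.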
+.PHONY: test
+test:
+	-@echo "-> $@"
+	-./gradlew test
+
+.PHONY: publish
+publish:
+	-@echo "-> $@"
+	-./gradlew publish

+ 106 - 0
README.md

@@ -0,0 +1,106 @@
+Base module built to avoid writing boilerplate code for every Kafka Streams application.
+Currently, it wraps:
+  - basic Kafka connection and input/output stream configuration; some variables are exposed so they can be overridden via `ENV` variables;
+  - a KafkaStreams builder that holds the topology definition, etc.
+
+### TOC
+  0. [Prerequisites](#prerequisites)
+  1. [Dependencies](#dependencies)
+  2. [Exposed `ENV` variables](#exposed-env-variables)
+  3. [How to](#how-to)
+     - [Configure](#configure)
+     - [Implement](#implement)
+     - [Build](#build)
+     
+
+#### Prerequisites
+  - [IntelliJ IDEA](https://www.jetbrains.com/idea/)
+  - Java 1.8 or later (OpenJDK 11 is recommended)
+
+#### Dependencies
+  - [com.github.johnrengelman.shadow](https://github.com/johnrengelman/shadow); version: "6.1.0"
+  - [org.jetbrains:annotations](https://github.com/JetBrains/java-annotations); version: "20.1.0"
+  - [com.typesafe:config](https://github.com/lightbend/config); version: "1.4.1"
+  - [org.apache.kafka:kafka-streams](https://kafka.apache.org/documentation/streams/); version: "2.5.0"
+
+#### Exposed `ENV` variables
+| Var | Type | Default | Description |
+| --- | ---- | ------- | ----------- |
+| `KSTREAMS_DEBUG` | `boolean` | `false` | Enables debug producer tuning (see `BaseKStreams#buildProps`) |
+| `KSTREAMS_CONSUMER_GROUP_NAME` | `string` | `consumer` | Kafka consumer group name |
+| `KSTREAMS_SERVERS` | `string` | `localhost:29092` | Kafka broker addresses |
+| `KSTREAMS_INPUT_NAME` | `string` | `in` | Source topic for incoming stream data |
+| `KSTREAMS_INPUT_PARTITIONS` | `int` | `1` | Desired number of partitions for the input topic |
+| `KSTREAMS_INPUT_REPLICATION_FACTOR` | `int` | `1` | Desired replication factor for the input topic |
+| `KSTREAMS_OUTPUT_NAME` | `string` | `out` | Destination topic for outgoing stream data |
+
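+For example, assuming the application is packaged as a runnable fat jar named `app.jar` (a hypothetical name), these variables can be set at launch:
+```bash
+KSTREAMS_SERVERS="broker-1:9092" \
+KSTREAMS_INPUT_NAME="raw-events" \
+KSTREAMS_OUTPUT_NAME="trimmed-events" \
+java -jar app.jar
+```
+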
+#### How to
+##### Configure
+Default configuration properties can be overridden in two ways: via `application.conf` or via `ENV` variables.
+In `application.conf` you only need to define the variables you want to change when building on this module.
+Alternatively, you can set `ENV` variables when running your application.
+See the exposed `ENV` variables above or `reference.conf` in `resources`.
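+
+For instance, a minimal `application.conf` sketch (the broker and topic names here are made up) overriding just a few defaults:
+```hocon
+kstreams {
+    servers: "broker-1:9092,broker-2:9092"
+    input.name: "raw-events"
+    output.name: "trimmed-events"
+}
+```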
+
+##### Implement
+Implementation is pretty simple. All you need is ~~love~~ to extend the abstract class `BaseKStreams` and describe your `Topology`.
+
+```java
+public class Stream extends BaseKStreams {
+
+  @Override
+  public @NotNull Topology getTopology() {
+    final StreamsBuilder builder = new StreamsBuilder();
+
+    builder
+        .stream(
+            getConfig().getInput().getName(),
+            Consumed.with(Serdes.String(), Serdes.ByteArray()))
+        .map((key, value) -> KeyValue.pair(key, Arrays.copyOfRange(value, 8, value.length)))
+        .to(
+            getConfig().getOutput().getName(),
+            Produced.with(Serdes.String(), Serdes.ByteArray()));
+
+    return builder.build();
+  }
+}
+```
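+This example topology simply drops the first 8 bytes of every record value (via `Arrays.copyOfRange`) and forwards the result from the input topic to the output topic.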
+
+After that step, we can start it:
+```java
+public class Application {
+
+  public static void main(String[] args) {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final Stream stream = new Stream();
+
+    // intercept SIGTERM
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread("streams-shutdown-hook") {
+              @Override
+              public void run() {
+                stream.close();
+                latch.countDown();
+              }
+            });
+
+    try {
+      stream.getStreams().start();
+      latch.await();
+    } catch (Throwable e) {
+      e.printStackTrace();
+      System.exit(1);
+    }
+
+    System.exit(0);
+  }
+}
+```
+
+##### Build
+Ensure that you already have a `gradle.properties` defining `gpr.user` and `gpr.key` (used by the `publishing` block in `build.gradle`; a sketch follows the build command below).
+`Gradle` and `make` will take care of everything else, just run
+```bash
+make
+```
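+
+A minimal `gradle.properties` sketch matching the `publishing` credentials in `build.gradle` (the values are placeholders):
+```properties
+gpr.user=your-github-username
+gpr.key=your-github-token
+```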

+ 75 - 0
build.gradle

@@ -0,0 +1,75 @@
+buildscript {
+    repositories {
+        jcenter()
+    }
+}
+
+plugins {
+    id 'java-library'
+    id 'idea'
+    id 'com.github.johnrengelman.shadow' version '6.1.0'
+    id 'maven-publish'
+}
+
+group 'com.github.kimbeejay'
+version '1.0-SNAPSHOT'
+
+sourceCompatibility = '1.8'
+targetCompatibility = '1.8'
+
+repositories {
+    mavenCentral()
+    jcenter()
+}
+
+apply plugin: "com.github.johnrengelman.shadow"
+
+dependencies {
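+    // These are compileOnly on purpose: consuming applications are expected to
+    // provide Kafka Streams, Typesafe Config, Avro, and the annotations themselves.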
+    compileOnly group: 'org.jetbrains', name: 'annotations', version: '20.1.0'
+    compileOnly group: 'com.typesafe', name: 'config', version: '1.4.1'
+    compileOnly 'org.apache.kafka:kafka-streams:2.5.0'
+    compileOnly 'org.apache.avro:avro:1.10.1'
+
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
+    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+}
+
+jar {
+    manifest {
+        attributes(
+                "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
+                "Git-CommitId": "git rev-parse HEAD".execute().text.trim(),
+                "Build-Time": new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
+        )
+    }
+}
+
+shadowJar {
+    archiveBaseName.set(rootProject.name)
+    archiveClassifier.set("")
+    manifest {
+        inheritFrom project.tasks.jar.manifest
+    }
+}
+
+publishing {
+    repositories {
+        maven {
+            name = "GitHubPackages"
+            url = uri("https://maven.pkg.github.com/kimbeejay/kstreams")
+            credentials {
+                username = project.findProperty("gpr.user") ?: System.getenv("USERNAME")
+                password = project.findProperty("gpr.key") ?: System.getenv("TOKEN")
+            }
+        }
+    }
+    publications {
+        gpr(MavenPublication) {
+            from(components.java)
+        }
+    }
+}
+
+test {
+    useJUnitPlatform()
+}

+ 12 - 0
settings.gradle

@@ -0,0 +1,12 @@
+pluginManagement {
+    repositories {
+        gradlePluginPortal()
+        jcenter()
+        maven {
+            name "JCenter Gradle Plugins"
+            url  "https://dl.bintray.com/gradle/gradle-plugins"
+        }
+    }
+}
+
+rootProject.name = 'kstreams'

+ 70 - 0
src/main/java/com/github/kimbeejay/kstreams/BaseKStreams.java

@@ -0,0 +1,70 @@
+package com.github.kimbeejay.kstreams;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Properties;
+
+public abstract class BaseKStreams {
+
+  private final Configuration config;
+  private final KafkaStreams streams;
+
+  public BaseKStreams() {
+    this(new Configuration());
+  }
+
+  public BaseKStreams(@NotNull Configuration config) {
+    Objects.requireNonNull(config);
+
+    this.config = config;
+    this.streams = new KafkaStreams(getTopology(), buildProps());
+  }
+
+  @NotNull
+  public abstract Topology getTopology();
+
+  protected Properties buildProps() {
+    Properties props = new Properties();
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, getConfig().getConsumerGroupName());
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getConfig().getServers());
+    props.put(
+        StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
+        DefaultProductionExceptionHandler.class.getName());
+
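+    // Debug mode tunes producer batching: linger up to 100 ms and batch up to 160 KiB.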
+    if (getConfig().isDebug()) {
+      props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+      props.put(ProducerConfig.BATCH_SIZE_CONFIG, "163840");
+    }
+
+    return props;
+  }
+
+  public void close() {
+    close(null);
+  }
+
+  public void close(Duration duration) {
+    if (Objects.isNull(duration)) {
+      duration = getConfig().getConsumerKillTimeout();
+    }
+
+    getStreams().close(duration);
+  }
+
+  @NotNull
+  public KafkaStreams getStreams() {
+    return this.streams;
+  }
+
+  @NotNull
+  public Configuration getConfig() {
+    return this.config;
+  }
+}

+ 117 - 0
src/main/java/com/github/kimbeejay/kstreams/Configuration.java

@@ -0,0 +1,117 @@
+package com.github.kimbeejay.kstreams;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public class Configuration {
+
+  private static final String ns = "kstreams";
+  private static final String nsDebug = ns + ".debug";
+  private static final String nsServers = ns + ".servers";
+
+  private static final String nsConsumer = ns + ".consumer";
+  private static final String nsConsumerGroup = nsConsumer + ".group";
+  private static final String nsConsumerGroupName = nsConsumerGroup + ".name";
+  private static final String nsConsumerKill = nsConsumer + ".kill";
+  private static final String nsConsumerKillTimeout = nsConsumerKill + ".timeout";
+
+  private static final String nsInput = ns + ".input";
+  private static final String nsInputName = nsInput + ".name";
+  private static final String nsInputPartitions = nsInput + ".partitions";
+  private static final String nsInputReplication = nsInput + ".replication";
+  private static final String nsInputReplicationFactor = nsInputReplication + ".factor";
+
+  private static final String nsOutput = ns + ".output";
+  private static final String nsOutputName = nsOutput + ".name";
+
+  private final boolean isDebug;
+  private final String servers;
+  private final String consumerGroupName;
+  private final Duration consumerKillTimeout;
+  private final Topic input;
+  private final Topic output;
+
+  public Configuration() {
+    this(ConfigFactory.load());
+  }
+
+  public Configuration(Config config) {
+    Objects.requireNonNull(config);
+
+    this.isDebug = config.getBoolean(nsDebug);
+    this.servers = config.getString(nsServers);
+    this.consumerGroupName = config.getString(nsConsumerGroupName);
+    this.consumerKillTimeout = Duration.ofSeconds(config.getInt(nsConsumerKillTimeout));
+
+    this.input = new Topic(
+        config.getString(nsInputName),
+        config.getInt(nsInputPartitions),
+        config.getInt(nsInputReplicationFactor));
+
+    this.output = new Topic(
+        config.getString(nsOutputName));
+
+    Objects.requireNonNull(this.servers);
+    Objects.requireNonNull(this.consumerGroupName);
+    Objects.requireNonNull(this.consumerKillTimeout);
+
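+    // Fail fast if the loaded config does not match the types declared in reference.conf.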
+    config.checkValid(ConfigFactory.defaultReference(), ns);
+  }
+
+  public boolean isDebug() {
+    return isDebug;
+  }
+
+  public String getServers() {
+    return servers;
+  }
+
+  public String getConsumerGroupName() {
+    return consumerGroupName;
+  }
+
+  public Duration getConsumerKillTimeout() {
+    return consumerKillTimeout;
+  }
+
+  public Topic getInput() {
+    return input;
+  }
+
+  public Topic getOutput() {
+    return output;
+  }
+
+  public static class Topic {
+    private final String name;
+    private final int partitions;
+    private final int replicationFactor;
+
+    public Topic(String name) {
+      this(name, 1, 1);
+    }
+
+    public Topic(String name, int partitions, int replicationFactor) {
+      Objects.requireNonNull(name);
+
+      this.name = name;
+      this.partitions = partitions;
+      this.replicationFactor = replicationFactor;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public int getPartitions() {
+      return partitions;
+    }
+
+    public int getReplicationFactor() {
+      return replicationFactor;
+    }
+  }
+}

+ 57 - 0
src/main/java/com/github/kimbeejay/kstreams/Result.java

@@ -0,0 +1,57 @@
+package com.github.kimbeejay.kstreams;
+
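+// Minimal Result type: either a successfully produced value, or a value paired
+// with the exception that occurred while producing it.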
+public interface Result<T> {
+  T get();
+  Exception exception();
+  boolean hasError();
+
+  class Success<T> implements Result<T> {
+
+    private final T value;
+
+    public Success(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public T get() {
+      return this.value;
+    }
+
+    @Override
+    public Exception exception() {
+      return null;
+    }
+
+    @Override
+    public boolean hasError() {
+      return false;
+    }
+  }
+
+  class Error<T> implements Result<T> {
+
+    private final T value;
+    private final Exception error;
+
+    public Error(Exception error, T value) {
+      this.error = error;
+      this.value = value;
+    }
+
+    @Override
+    public T get() {
+      return this.value;
+    }
+
+    @Override
+    public Exception exception() {
+      return this.error;
+    }
+
+    @Override
+    public boolean hasError() {
+      return true;
+    }
+  }
+}

+ 111 - 0
src/main/java/com/github/kimbeejay/utils/CsvToAvro.java

@@ -0,0 +1,111 @@
+// Copyright 2018 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.github.kimbeejay.utils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class CsvToAvro {
+
+  private final String delimiter;
+  private final Schema schema;
+
+  public CsvToAvro(Schema schema, String delimiter) {
+    Objects.requireNonNull(delimiter);
+    Objects.requireNonNull(schema);
+
+    this.delimiter = delimiter;
+    this.schema = schema;
+  }
+
+  public GenericRecord convert(String row) throws IllegalArgumentException {
+    // Split the CSV row into fields using the delimiter
+    List<String> values = Arrays.stream(row.split(this.delimiter))
+        .map(String::trim)
+        .collect(Collectors.toList());
+
+    GenericRecord record = new GenericData.Record(this.schema);
+    List<Schema.Field> fields = this.schema.getFields();
+
+    for (int i = 0; i < fields.size(); i++) {
+      String type = fields.get(i).schema().getType().getName().toLowerCase();
+
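+      // Coerce the raw string into the field's declared Avro type; numeric
+      // fields that fail to parse fall back to zero.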
+      switch (type) {
+        case "string": {
+          record.put(i, values.get(i));
+          break;
+        }
+
+        case "boolean": {
+          record.put(i, Boolean.valueOf(values.get(i)));
+          break;
+        }
+
+        case "int": {
+          try {
+            record.put(i, Integer.valueOf(values.get(i)));
+          } catch (NumberFormatException e) {
+            e.printStackTrace();
+            record.put(i, 0);
+          }
+          break;
+        }
+
+        case "long": {
+          try {
+            record.put(i, Long.valueOf(values.get(i)));
+          } catch (NumberFormatException e) {
+            e.printStackTrace();
+            record.put(i, 0L);
+          }
+
+          break;
+        }
+
+        case "float": {
+          try {
+            record.put(i, Float.valueOf(values.get(i)));
+          } catch (NumberFormatException e) {
+            e.printStackTrace();
+            record.put(i, 0f);
+          }
+          break;
+        }
+
+        case "double": {
+          try {
+            record.put(i, Double.valueOf(values.get(i)));
+          } catch (NumberFormatException e) {
+            e.printStackTrace();
+            record.put(i, 0d);
+          }
+          break;
+        }
+
+        default: {
+          throw new IllegalArgumentException("Field type " + type + " is not supported.");
+        }
+      }
+    }
+
+    return record;
+  }
+}

+ 30 - 0
src/main/resources/reference.conf

@@ -0,0 +1,30 @@
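+# Every setting is declared twice: the ${?ENV_VAR} form is HOCON's optional
+# substitution, so an environment variable overrides the default only when set.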
+kstreams {
+    debug: false
+    debug: ${?KSTREAMS_DEBUG}
+
+    consumer {
+        group.name: "consumer"
+        group.name: ${?KSTREAMS_CONSUMER_GROUP_NAME}
+
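+        # Close timeout for the underlying KafkaStreams instance, in seconds.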
+        kill.timeout: 10
+    }
+
+    servers: "localhost:29092"
+    servers: ${?KSTREAMS_SERVERS}
+
+    input {
+        name: "in"
+        name: ${?KSTREAMS_INPUT_NAME}
+
+        partitions: 1
+        partitions: ${?KSTREAMS_INPUT_PARTITIONS}
+
+        replication.factor: 1
+        replication.factor: ${?KSTREAMS_INPUT_REPLICATION_FACTOR}
+    }
+
+    output {
+        name: "out"
+        name: ${?KSTREAMS_OUTPUT_NAME}
+    }
+}