|
|
4 rokov pred | |
|---|---|---|
| src | 4 rokov pred | |
| .gitignore | 4 rokov pred | |
| Makefile | 4 rokov pred | |
| README.md | 4 rokov pred | |
| build.gradle | 4 rokov pred | |
| settings.gradle | 4 rokov pred |
Base module that built to avoid writing boilerplate source code for each Kafka Stream application. Currently, it wraps:
vars are exposed to be overwritten via ENV variables;ENV variables| Var | Type | Default | Description |
|---|---|---|---|
KSTREAMS_DEBUG |
boolean |
false |
|
KSTREAMS_CONSUMER_GROUP_NAME |
string |
consumer |
Kafka consumer group name |
KSTREAMS_SERVERS |
string |
localhost:29092 |
Kafka broker addresses |
KSTREAMS_INPUT_NAME |
string |
in |
Where from (topic name) we gonna get stream data |
KSTREAMS_INPUT_PARTITIONS |
int |
1 |
Number of desired partitions |
KSTREAMS_INPUT_REPLICATION_FACTOR |
int |
1 |
Number of desired replication factor |
KSTREAMS_OUTPUT_NAME |
string |
out |
Where to (topic name) we gonna put stream data |
Default configuration properties could be overwritten in 2 ways: application.conf or ENV variables.
In application.conf you could define only those variables you want to change while implementation this module.
Or maybe you might change ENV variables while running your application.
Look at exposed ENV variables above or reference.conf in resources.
Implementation is pretty simple. All you need is love to extend abstract class StreamWrapper and describe your Topology.
public class Stream extends BaseKStreams {
@Override
public @NotNull
Topology getTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream(
getConfig().getInput().getName(),
Consumed.with(Serdes.String(), Serdes.ByteArray()))
.map((key, value) -> KeyValue.pair(key, Arrays.copyOf(value, 8, value.length)))
.to(
getConfig().getOutput().getName(),
Produced.with(Serdes.String(), Serdes.ByteArray()));
return builder.build();
}
}
After that step, we could start it.
public class Application {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(1);
final Stream stream = new Stream();
// intercept SIGTERM
Runtime.getRuntime()
.addShutdownHook(
new Thread("streams-shutdown-hook") {
@Override
public void run() {
stream.close();
latch.countDown();
}
});
try {
stream.getStreams().start();
latch.await();
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
System.exit(0);
}
}
Ensure that you already have gradle.properties and defined gitlab_token and gitlab_project.
All other stuff Gradle and make will do for us, just
make