Advanced topics in Kafka
Kafka Connect | Kafka Streams
To get data into and out of Kafka we use Kafka Connect. The source can be a database, an API, a text file, or another Kafka cluster; the sink can likewise be a database, a file, or Kafka. Changes in the source are reflected in the sink as a real-time stream.
Kafka Connect uses a declarative framework, i.e. connectors are described in configuration (JSON or properties files) rather than code. So don't worry, it is simple to use.
- Brokers store data as log files.
- Topics in Kafka are logical groupings of those log files.
- We could use the producer and consumer APIs directly for this, but we would end up writing a lot of common code.
- Kafka Connect provides all that boilerplate for us, so we can focus directly on the connectors to the different databases.
source — native data — [connector — connect record — converter] — bytes — Kafka — bytes — [converter — connect record — connector] — sink
So, in total, we need a source, a source connector, a sink connector and a sink.
Run MySQL in Docker
Let’s start a MySQL source
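No command is given in these notes for the MySQL source, so here is a sketch using the Debezium tutorial's example MySQL image (the image name and credentials below are the Debezium tutorial defaults, not something from these notes):

```shell
# Start MySQL preloaded with Debezium's example "inventory" database.
# Credentials are the Debezium tutorial defaults.
docker run -it --rm --name mysql -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw \
  quay.io/debezium/example-mysql:1.9
```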
Running Elasticsearch in Docker
Let's start an Elasticsearch sink
docker network create elastic
docker run --name es01 --net elastic -p 9200:9200 -p 9300:9300 -it docker.elastic.co/elasticsearch/elasticsearch:8.2.0
Running Kafka in Docker
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:latest
You can see ZooKeeper start on port 2181
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
Kafka runs on port 9092 and connects to ZooKeeper
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9
Verify Kafka Connect is up with a curl on localhost:8083
curl -H "Accept:application/json" localhost:8083/
Kafka Source Connector
Create a connector instance in Kafka Connect using the MySQL source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d
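The request body after `-d` is not included in these notes; a typical Debezium MySQL source connector payload looks like the following (names such as `inventory-connector`, `dbserver1`, and the credentials are illustrative defaults from the Debezium tutorial):

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
```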
Kafka Sink Connector
Create a connector instance in Kafka Connect using the Elasticsearch sink connector
curl -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-elastic-01/config \
  -d '{
    "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics" : "orders",
    "connection.url" : "http://elasticsearch:9200",
    "type.name" : "_doc",
    "key.ignore" : "false",
    "schema.ignore" : "true"
  }'
After adding data to the source, you can see it flow from MySQL through Kafka to Elasticsearch.
When you want to deploy Kafka Connect, there are two types of workers to choose from:
- Standalone — the worker is a single JVM process, and that process by itself is the whole Connect cluster. It can only scale vertically by adding resources, and we still have to stop and start the server, risking the loss of offset information, which lives in a local file.
- Distributed (recommended) — configuration, offsets, and status are stored in Kafka topics. When one worker dies, another comes up, reads that information from the topics, and picks up from where the failed worker left off.
Connect Single Message Transforms (SMTs)
Example of a Message Transform
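The transform itself is not shown in these notes; as a sketch, here is a config fragment using Kafka's built-in `InsertField` transform to stamp each record with a static field (the transform alias and field name are illustrative):

```json
"transforms": "addSource",
"transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSource.static.field": "source_system",
"transforms.addSource.static.value": "mysql"
```

These lines go inside the connector's `config` object; Connect applies the transform to every record as it passes through.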
Though Kafka Connect can apply small, stateless transformations, it doesn't perform stateful processing such as aggregations. Thus we use Kafka Streams!
- We use Kafka Streams to process data flowing through Kafka topics.
- Connect is useful when you want data in and out; to modify the data in flight is where Kafka Streams comes in.
- Kafka Streams is a stream-processing engine.
- Event streams are like topics in brokers: a sequence of events.
- You can think of a Kafka Streams application as a standalone application or microservice that reads data from Kafka and processes it.
Java Code for Kafka Stream
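The code itself is not reproduced in these notes; a minimal Kafka Streams topology might look like the following sketch (topic names, the application id, and the uppercase transform are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class OrdersApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-app");       // consumer group / state store prefix
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");  // read events from the "orders" topic
        orders.filter((key, value) -> value != null)                // drop tombstone records
              .mapValues(v -> v.toUpperCase())                      // stateless per-record transform
              .to("orders-processed");                              // write to an output topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Requires the `kafka-streams` dependency and a running broker; the topology here is the DAG of processors described below.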
- Event streams consist of events, each of which is a key-value pair.
- Keys are not unique in Kafka event streams.
A Kafka Streams processor topology is a DAG: processing nodes connected by edges that represent the flow of the event stream.
- A KTable can subscribe to only one topic at a time, unlike a KStream.
- A KTable represents the latest value for each record key.
- A KTable is backed by a state store.
- The state store is a local copy of the events the KTable is built from.
- A KTable doesn't forward every change downstream by default; its cache is flushed every 30 seconds.
- GlobalKTable: holds all records from all partitions; best suited to small, relatively static data.
Java Code for KTable
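The KTable code is not included in these notes either; a minimal sketch (topic names and application id are illustrative) of a table that keeps the latest value per key:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class CustomerTableApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-table-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // table() keeps only the latest value per key, backed by a local state store.
        KTable<String, String> customers = builder.table("customers");
        // Forward updates downstream as a changelog stream (emitted when the cache flushes).
        customers.toStream().to("customers-latest");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

`builder.table` (rather than `builder.stream`) is what makes this a table: a later record for an existing key overwrites the stored value instead of being appended.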
Joins in Kafka Streams
To execute a join in Kafka Streams, the records on both sides must have the same key.
- Stream-Stream — combines two event streams into new events — results in a stream
- Stream-Table — results in a stream
- Table-Table — results in a table
Java code for Joins
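The join code is not shown in these notes; here is a sketch of the first two join types (topic names and the string-concatenating joiners are illustrative; `ofTimeDifferenceWithNoGrace` assumes a Kafka 3.x client):

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joins-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");

        // Stream-Table join: enrich each order with the latest customer value for its key.
        KTable<String, String> customers = builder.table("customers");
        orders.join(customers, (order, customer) -> order + " | " + customer)
              .to("orders-enriched");

        // Stream-Stream join: keys must match AND both events must fall within the window.
        KStream<String, String> payments = builder.stream("payments");
        orders.join(payments,
                    (order, payment) -> order + " | " + payment,
                    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
              .to("orders-paid");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Note the stream-stream join needs a window because both sides are unbounded, while the stream-table join simply looks up the current table value for each incoming key.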
Aggregations in Kafka Streams
- Aggregations remember the state of the stream, and hence can compute count, max, sum, and so on.
- They group by key; Kafka repartitions the data so that all records with the same key land on one partition.
- reduce is like aggregate but with a restriction: the result must be the same type as the input values. aggregate has no such type restriction, but in both cases records must be grouped by key.
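As a sketch of a stateful aggregation (topic names and application id are illustrative): `count()` turns a grouped stream of `String` values into a `KTable` of `Long` counts, a type change that `reduce()` could not express.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class OrderCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");

        // Stateful aggregation: count() changes the value type (String -> Long),
        // which reduce() cannot do -- reduce must return the same type it consumes.
        KTable<String, Long> ordersPerKey = orders
                .groupByKey()   // records with the same key are processed on the same partition
                .count();

        ordersPerKey.toStream()
                    .to("orders-per-key", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```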