Advanced topics in Kafka

Kafka Connect

To get data into and out of Kafka we use Kafka Connect. The source can be a database, data from an API, a text file, or even another Kafka cluster; the sink can be another database, a file, or Kafka. Changes in the source are reflected in the sink as a stream, in real time.

Kafka Connect is declarative, i.e. the connector configuration is written in plain config files (properties or JSON) rather than code, so it is simple to use.

  • Brokers store data as log files.
  • Topics in Kafka are logical groups of log files.
  • We could use the producer and consumer APIs for this, but we would end up writing a lot of common code.
  • Kafka Connect provides all of that boilerplate code for us, so we can focus directly on writing connectors to different databases.

source → native data → [source connector → Connect record → converter] → byte[] → Kafka → byte[] → [converter → Connect record → sink connector] → sink

So, in total, we need a source, a source connector, a sink connector and a sink.

Run MySQL in Docker

Let’s start a MySQL source

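A typical command for this step (a sketch, assuming the Debezium example MySQL image, which ships with the inventory sample database used by the source connector below; the root password here is an assumption and must match the connector config):

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=password -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9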

Running Elasticsearch in Docker

Let's start an Elasticsearch sink

Elasticsearch docker image
docker network create elastic
Elasticsearch network
docker run --name es01 --net elastic -p 9200:9200 -p 9300:9300 -it docker.elastic.co/elasticsearch/elasticsearch:8.2.0

Running Kafka in Docker

Zookeeper

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:latest

You can see ZooKeeper starting on port 2181

Zookeeper Docker image

Kafka

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
Kafka Docker image

Kafka runs on port 9092 and connects to ZooKeeper

Kafka Connect Worker

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9

Check that Connect is up with a curl on localhost:8083

curl -H "Accept:application/json" localhost:8083/

Kafka Source Connector

Create a connector instance in Kafka Connect using the Debezium MySQL source connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ \
-d '{
  "name": "my-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}'

Kafka Sink Connector

Create a connector instance in Kafka Connect using the Elasticsearch sink connector:

curl -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-elastic-01/config \
-d '{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "orders",
  "connection.url": "http://elasticsearch:9200",
  "type.name": "_doc",
  "key.ignore": "false",
  "schema.ignore": "true"
}'

Once you add data to the source, you can watch it flow from MySQL through Kafka into Elasticsearch.

Kafka Workers

When you want to deploy Kafka Connect, there are two options to choose from.

Two types of workers:

  1. Standalone
  2. Distributed (recommended)
  • A worker is a JVM process; a single process on its own is a Kafka Connect standalone worker.
  • A standalone worker can scale vertically by adding resources, but we still have to stop and restart the server, losing the offset information it holds.
  • A distributed worker stores its configs, offsets and statuses in Kafka itself. When one worker node dies, another comes up, reads that information back from Kafka, and picks up from there (a sample config is sketched below).
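
For reference, a minimal sketch of a distributed worker properties file; the property names are standard Kafka Connect settings, and the group id and topic names mirror the environment variables passed to the connect container above:

bootstrap.servers=kafka:9092
group.id=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=my_connect_configs
offset.storage.topic=my_connect_offsets
status.storage.topic=my_connect_statuses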

Connect Message Transforms

Example of a Message Transform
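
For example, a single message transform can be declared directly in the connector config. The sketch below uses Debezium's ExtractNewRecordState transform to flatten change events before they reach the sink; the alias "unwrap" is just an illustrative name:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"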

Though Kafka Connect can apply small, single-message transformations, it doesn't perform stateful processing such as aggregations. That is where Kafka Streams comes in!

Kafka Streams

  • We use Kafka Streams to process data arriving in Kafka topics.
  • Connect is useful when you want to get data in and out; when you need to modify that data, Kafka Streams comes in.
  • Kafka Streams is a stream-processing engine.
  • Event streams are like topics in brokers: a sequence of events.
  • You can think of a Kafka Streams application as a standalone application or microservice that takes data from Kafka and processes it.

Java Code for Kafka Stream

  • Event streams are made up of events, which are key-value pairs.
  • Keys are not unique in Kafka event streams.
Event Stream Java code
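
A minimal sketch of such a streams application, assuming String keys and values; the topic names orders and orders-uppercased and the application id are illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;

public class EventStreamApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // Each event is a key-value pair; keys are not required to be unique.
        KStream<String, String> orders = builder.stream(
                "orders", Consumed.with(Serdes.String(), Serdes.String()));
        // A simple stateless transformation applied to every event's value.
        orders.mapValues(value -> value.toUpperCase())
              .to("orders-uppercased", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}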

Topology

A Kafka Streams topology is a DAG of processing nodes whose edges represent the flow of the event stream.

Kafka Stream Topology
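
To see the DAG for yourself, you can print the topology that the builder produces; a short sketch with illustrative topic names:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class TopologyDemo {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders = builder.stream("orders");
        orders.filter((key, value) -> value != null).to("valid-orders");

        // build() returns the Topology: a DAG of source, processor and sink nodes.
        Topology topology = builder.build();
        // describe() shows the nodes and the edges connecting them.
        System.out.println(topology.describe());
    }
}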

KTable

  • A KTable can subscribe to only one topic at a time, unlike a KStream.
  • A KTable represents the latest value for each key.
  • A KTable is backed by a state store.
  • The state store is a copy of the events the KTable is built from.
  • A KTable doesn't forward every change by default; its cache is flushed roughly every 30 seconds (the commit interval).
  • Global KTables hold all records of all partitions and are typically used for more static data.

Java Code for KTable

KTable Java code
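
A minimal KTable sketch, assuming String keys and values; the topic product-prices, the output topic latest-product-prices, the application id, and the state store name latest-prices are all illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Properties;

public class LatestPriceApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-price-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // A KTable keeps only the latest value per key, backed by a named state store.
        KTable<String, String> prices = builder.table(
                "product-prices",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("latest-prices"));

        // Updates are forwarded downstream subject to caching (flushed on the commit interval).
        prices.toStream().to("latest-product-prices", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}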

Joins in KStream

To execute a join in Kafka Streams, the records being joined must share the same key.

  1. Stream-Stream: combines two event streams into a new event; results in a stream
  2. Stream-Table: results in a stream
  3. Table-Table: results in a table

Java code for Joins
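
A sketch of a stream-table join, assuming an orders stream and a customers table that are both keyed by customer id; the topic names and the joiner logic are illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class OrderEnricher {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Both sides must share the same key (customer id here) for the join to work.
        KStream<String, String> orders = builder.stream(
                "orders", Consumed.with(Serdes.String(), Serdes.String()));
        KTable<String, String> customers = builder.table(
                "customers", Consumed.with(Serdes.String(), Serdes.String()));

        // Stream-table join: each order event is enriched with the latest customer record,
        // and the result is a stream.
        KStream<String, String> enriched = orders.join(
                customers,
                (order, customer) -> order + " placed by " + customer,
                Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        enriched.to("orders-enriched", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}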

Aggregations in KStream

  • Aggregations remember the state of the stream and can therefore compute things like count, max and sum (see the sketch below).
  • They group by key; Kafka repartitions the data so that all records with the same key end up in the same partition.
  • Reduce is like an aggregation, but its result is restricted to the same type as the input. Aggregations don't have that restriction, but the key still has to be the same.
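
A short sketch of a grouped count, assuming an orders stream keyed by customer id; the topic names and the store name orders-per-customer are illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class OrderCounter {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> orders = builder.stream(
                "orders", Consumed.with(Serdes.String(), Serdes.String()));

        // Group by key; Kafka repartitions if needed so the same key lands in one partition.
        KTable<String, Long> counts = orders
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("orders-per-customer"));

        // The running counts form a KTable that can be written back out as a stream.
        counts.toStream().to("order-counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}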
