Tag Archives: Kafka Connect

Sink Kafka Connect

Well?

This blog post is part of my series of posts regarding Kafka Connect.
If you’re not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture
Kafka Connect
Source Kafka Connect

This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion.

TL;DR

This post is about code: I’ll show all the steps to develop your own Sink Kafka Connector.

Table of contents

1. Use Case
2. Code
3. Links

1. Use Case

One scenario that has become popular, and that I have been asked about a few times in the last months, is using Kafka to trigger functions as a service.

In simple words: send a message to your Kafka topic and your function(s) get invoked with the payload you sent to Kafka. You can now have a function triggered in response to messages in Kafka topics.

My example uses OCI Functions.

2. Code

The connector needs to extend SinkConnector. It is almost the same as the source connector, and you need to implement six methods here.

    @Override
    public String version() {
        // return the version of your connector
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // read and validate the connector configuration
    }

    @Override
    public Class<? extends Task> taskClass() {
        // return the SinkTask implementation class
        return null;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // build the configuration for each task, up to maxTasks
        return null;
    }

    @Override
    public void stop() {
        // clean up any resources created in start
    }

    @Override
    public ConfigDef config() {
        // return the ConfigDef that describes the connector configuration
        return null;
    }

And the Task needs to extend SinkTask. It is almost the same as the source task, but now you don’t have the poll method; instead, you have put.

    @Override
    public String version() {
        // return the version of your connector
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // read the configuration and prepare whatever is needed to call the function
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // here is where the magic happens:
        // just add a triggerFn method that triggers the function for each record.
    }

    @Override
    public void stop() {
        // clean up any resources created in start
    }
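
For illustration, here is a minimal sketch of what put could look like, assuming the function is exposed over plain HTTP, the record values are JSON strings, and a hypothetical functionUrl field was read from the connector config in start (it uses java.net.http.HttpClient from Java 11+). The real OCI Functions invocation goes through the OCI Java SDK instead; see the links at the end of the post.

    @Override
    public void put(Collection<SinkRecord> records) {
        // minimal sketch: POST every record value to the (hypothetical) functionUrl
        HttpClient client = HttpClient.newHttpClient();
        for (SinkRecord record : records) {
            String payload = String.valueOf(record.value());
            HttpRequest request = HttpRequest.newBuilder(URI.create(functionUrl))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(payload))
                    .build();
            try {
                // trigger the function once per record
                client.send(request, HttpResponse.BodyHandlers.ofString());
            } catch (IOException | InterruptedException e) {
                throw new ConnectException("Failed to trigger the function", e);
            }
        }
    }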

Run Docker

docker run -it --rm --name fn-oci-connect-demo -p 8083:8083 -e GROUP_ID=1 \
    -e BOOTSTRAP_SERVERS="bootstrap_URL" \
    -e CONFIG_STORAGE_TOPIC="ID-config" \
    -e OFFSET_STORAGE_TOPIC="ID-offset" \
    -e STATUS_STORAGE_TOPIC="ID-status" \
    -v fn-sink-kafka-connector/target/fn-sink-kafka-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar/:/kafka/connect/fn-connector \
    debezium/connect:latest

Configure

curl -X POST \
    http://localhost:8083/connectors \
    -H 'content-type: application/json' \
    -d '{
    "name": "FnSinkConnector",
    "config": {
      "connector.class": "com.fn.sink.kafka.connect.FnSinkConnector",
      "tasks.max": "1",
      "topics": "test-sink-topic",
      "tenant_ocid": "<tenant_ocid>",
      "user_ocid": "<user_ocid>",
      "public_fingerprint": "<public_fingerprint>",
      "private_key_location": "/path/to/kafka-connect/secrets/<private_key_name>",
      "function_url": "<FUNCTION_URL>"
    }
  }'

I created a package “http” where I added the OCI FN part; the code is based on this one here.

GitHub
You can check the full code in my GitHub.

3. Links

https://docs.confluent.io/current/connect/devguide.html

https://github.com/oracle/oci-java-sdk/blob/master/bmc-examples/src/main/java/InvokeFunctionExample.java

https://docs.cloud.oracle.com/en-us/iaas/Content/Functions/Concepts/functionsoverview.htm

Happy Coding.

Source Kafka Connect

How’s the man?

This blog post is part of my series of posts regarding Kafka Connect.
If you’re not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture
Kafka Connect

This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion.

TL;DR

This post is about code: I’ll show all the steps to develop your own Source Kafka Connector, and in the end, I’ll show an easy way to deploy your code.
The example is an OpenWeatherMap source Kafka connector.

Table of contents

1. Get Started
2. Use Case
3. Schema
4. AbstractConfig
5. Connector
6. Task
7. Code
8. SMTs
9. Converter
10. Transformation
11. Deploy
12. Links

1. Get Started

Kafka Connect aims to reduce the burden of connecting Kafka with external systems such as databases, key-value stores, search indexes, and even file systems. Connectors are the high-level abstractions that manage the streaming of data from and to Kafka. Each connector speaks the language of the external system it connects to, hiding this complexity from the application developer who can focus on more valuable business logic.

The best place to start when implementing your Source Connector is the Confluent Connector Development Guide.

There’s a code sample here;

This Maven quickstart is used to generate a skeleton plugin for Kafka Connect. Here.

mvn archetype:generate \
    -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
    -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=2.4.0

mvn archetype:generate \
    -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
    -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=2.4.0 \
    -Dpackage=com.github.jcustenborder.kafka.connect.test \
    -DgroupId=com.github.jcustenborder.kafka.connect \
    -DartifactId=testconnect \
    -DpackageName=com.github.jcustenborder.kafka.connect.test \
    -Dversion=1.0-SNAPSHOT

The nice thing about the Archetype is that it will create the boilerplate code for your connector, some basic properties, and some empty tests.

The dependencies that I used are;

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>com.konghq</groupId>
            <artifactId>unirest-java</artifactId>
            <version>3.1.02</version>
        </dependency>

And we also need the Maven assembly plugin to generate a jar with all dependencies.

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>

                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

2. Use Case

OpenWeatherMap is a real-time weather data website. Its API is already used in a lot of examples around development; I have used it a few times for IoT and data analysis demos.
They have a free API tier, which is limited to 1 request per second, quite enough for tracking changes in the weather. Their different API schemas return plenty of numeric and textual data, all interesting for analysis. This makes it a perfect example for us. I tried to choose an API that returns a more complex JSON result, to show the Schema part in a more didactic way.

Check the API.

A partition is a division of source records that usually depends on the source medium.
In our case, there is only one API URL and we are only ever requesting current data, so a very good partitioning would be to query weather data for different cities, with each partition processed by a separate task.

3. Schema

The Schema interface is one of the most important components of Kafka Connect. This allows you to define what the data looks like without having to worry about the way the data is stored.

Docs here.

The best way to start is to have a look at the API result, and you can see a sample at

https://samples.openweathermap.org/data/2.5/weather?q=London,uk&appid=b6907d289e10d714a6e88b30761fae22

{
  "coord": {
    "lon": -0.13,
    "lat": 51.51
  },
  "weather": [
    {
      "id": 300,
      "main": "Drizzle",
      "description": "light intensity drizzle",
      "icon": "09d"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 280.32,
    "pressure": 1012,
    "humidity": 81,
    "temp_min": 279.15,
    "temp_max": 281.15
  },
  "visibility": 10000,
  "wind": {
    "speed": 4.1,
    "deg": 80
  },
  "clouds": {
    "all": 90
  },
  "dt": 1485789600,
  "sys": {
    "type": 1,
    "id": 5091,
    "message": 0.0103,
    "country": "GB",
    "sunrise": 1485762037,
    "sunset": 1485794875
  },
  "id": 2643743,
  "name": "London",
  "cod": 200
}

If you have a look at the JSON you are going to see that it has some structures inside; for example, weather contains id, main, description and so on.

We can map this in an easy way to a schema;

public static Schema WEATHER_SCHEMA = SchemaBuilder.struct()
        .name("Weather")
        .version(1)
        .field(ID, Schema.INT64_SCHEMA)
        .field(MAIN, Schema.STRING_SCHEMA)
        .field(DESCRIPTION, Schema.STRING_SCHEMA)
        .field(ICON, Schema.STRING_SCHEMA)
        .build();

But sometimes the JSON can contain a more complex structure, and for this we can create a schema that uses another schema as a type.
Let’s see wind;

public static Schema WIND_SCHEMA = SchemaBuilder.struct()
        .name("Wind")
        .version(1)
        .field(SPEED, Schema.OPTIONAL_FLOAT32_SCHEMA)
        .field(DEG, Schema.OPTIONAL_INT32_SCHEMA)
        .optional()
        .build();
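
The weather field in the payload is a JSON array, so the value schema below references a WEATHER_ARRAY_SCHEMA. As an assumption (this constant is not shown in the snippets here), it can be built by wrapping WEATHER_SCHEMA with SchemaBuilder.array;

public static Schema WEATHER_ARRAY_SCHEMA = SchemaBuilder.array(WEATHER_SCHEMA)
        .optional()
        .build();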

So now I can create a schema using the others that I created before.

public static Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("Value Schema")
        .version(1)
        .field(NAME, Schema.STRING_SCHEMA)
        .field(WEATHER, WEATHER_ARRAY_SCHEMA)
        .field(MAIN, MAIN_SCHEMA)
        .field(WIND, WIND_SCHEMA)
        .build();
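
When the task later turns an API response into a record, these schemas are populated through a Struct. A minimal sketch, assuming the field name constants resolve to the JSON keys ("name", "wind", "speed", "deg") and hypothetical getters on the Weather DTO;

Struct wind = new Struct(WIND_SCHEMA)
        .put("speed", weather.getWind().getSpeed())
        .put("deg", weather.getWind().getDeg());

// the weather array and the main struct are populated the same way
Struct value = new Struct(VALUE_SCHEMA)
        .put("name", weather.getName())
        .put("wind", wind);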

4. AbstractConfig

AbstractConfig is an abstract class that defines an interface that our WeatherAPIConfig
needs to adhere to, and here is where we get the configuration for our connector.

We need to pass the openWeatherApiKey, a list of cities that we want to get weather data for, and the Kafka topic. I added pollFrequency as a hardcoded value because the free tier of the API limits the number of requests.

public WeatherAPIConfig(Map<String, ?> originals) {
    super(config(), originals);
    this.openWeatherApiKey = this.getString(OPEN_WEATHER_API_KEY);
    this.cities = this.getString(CITIES);
    this.pollFrequency = this.getLong(POLL_FREQUENCY);
    this.kafkaTopic = this.getString(KAFKA_TOPIC);
}
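
The config() method passed to super(config(), originals) defines which properties are accepted. A minimal sketch, assuming the constant names used in the constructor above and a hard-coded default for the poll frequency;

public static ConfigDef config() {
    return new ConfigDef()
            .define(OPEN_WEATHER_API_KEY, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "OpenWeatherMap API key")
            .define(CITIES, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Comma-separated list of cities")
            .define(POLL_FREQUENCY, ConfigDef.Type.LONG, 60000L, ConfigDef.Importance.LOW, "Poll frequency in milliseconds")
            .define(KAFKA_TOPIC, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Kafka topic to write the weather data to");
}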

5. Connector

SourceConnector is an abstract class that defines an interface that our WeatherAPIConnector
needs to adhere to. There are six methods.

@Override
public String version() {
    // you can add your own version strategy
    return null;
}

@Override
public void start(Map<String, String> props) {
    // here is where we instantiate the configuration class
}

@Override
public Class<? extends Task> taskClass() {
    // just need to return the Task class
    return null;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // here is where a partition strategy splits the list of cities across the different tasks
    return null;
}

@Override
public void stop() {
    // since we don't start anything in the start method, we don't need to do anything here
}

@Override
public ConfigDef config() {
    // the ConfigDef is already defined in the WeatherAPIConfig class
    return null;
}
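
To make the partition strategy concrete, here is a minimal sketch of taskConfigs that spreads the configured cities round-robin over at most maxTasks task configurations. It assumes the original connector properties were kept in a configProps map during start and that the property key for the city list is "cities";

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<String> cities = Arrays.asList(config.getCities().split(","));
    int groups = Math.min(cities.size(), maxTasks);

    // group the cities round-robin, one group per task
    List<List<String>> grouped = new ArrayList<>();
    for (int i = 0; i < groups; i++) {
        grouped.add(new ArrayList<>());
    }
    for (int i = 0; i < cities.size(); i++) {
        grouped.get(i % groups).add(cities.get(i).trim());
    }

    // each task gets the original properties with only its share of the cities
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    for (List<String> group : grouped) {
        Map<String, String> taskConfig = new HashMap<>(configProps);
        taskConfig.put("cities", String.join(",", group));
        taskConfigs.add(taskConfig);
    }
    return taskConfigs;
}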

6. Task

In a similar manner to the Source Connector class, we implement the SourceTask abstract class. Here we have only four methods.

@Override
public String version() {
    // you can add your own version strategy
    return null;
}

@Override
public void start(Map<String, String> props) {
    // initialize the configuration and the weather API client
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // this method is going to be called continuously, and because of the API limitation we need to add a sleep to decrease the poll frequency.
    return null;
}

@Override
public void stop() {
    // nothing to clean up here
}
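
A minimal sketch of what poll could return, assuming pollFrequency and kafkaTopic were read from the config in start, getName/getDt are getters on the Weather DTO, and toStruct is a hypothetical helper that maps the DTO onto VALUE_SCHEMA from section 3;

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // respect the free-tier rate limit before calling the API again
    Thread.sleep(pollFrequency);

    return client.getCurrentWeather().stream()
            .map(weather -> new SourceRecord(
                    Collections.singletonMap("city", weather.getName()),    // source partition
                    Collections.singletonMap("timestamp", weather.getDt()), // source offset
                    kafkaTopic,
                    VALUE_SCHEMA,
                    toStruct(weather)))
            .collect(Collectors.toList());
}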

7. Code

I created a package model where I added the DTOs for the fields that I used in my schema, and a package schema.

WeatherAPIClient is the class that just calls the API, passing in the list of cities.
I use the WeatherAPIConfig config to get all the configurations;

public List<Weather> getCurrentWeather() {
    return Arrays.stream(config.getCities().split(","))
            .map(city -> Unirest.get(BASE_URL)
                    .queryString("q", city)
                    .queryString("APPID", config.getOpenWeatherApiKey())
                    .asObject(Weather.class))
            .map(HttpResponse::getBody)
            .collect(Collectors.toList());
}

8. SMTs

From an architectural point of view, SMTs reside exactly between the connectors and the converters; they apply their data transformation just before the data is stored into Kafka (source connectors) and right after it is read from Kafka (sink connectors).

The Kafka Connect framework comes with many built-in SMTs to address the most common transformation use cases, for example:

  • mask sensitive message fields,
  • add identifiers,
  • tag events,
  • cast data types to comply with the destination,
  • convert data (e.g. timestamp conversion like time-based data conversion standardization),
  • slim down messages,
  • or re-route them via re-partitioning.

Full documentation here.

SMTs are configured right beside connectors, as their configuration properties reside in the same file.

Let’s see one example using the FileSource connector;

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

As you can see, SMTs can be combined and chained pretty easily by simply configuring them next to the Kafka Connect sinks and sources.

But the most important thing that should always be kept in mind is “one message at a time”.
This is particularly crucial because it strongly limits SMT usage. In particular, it doesn’t permit any kind of transformation that requires aggregation logic. Processing one message at a time is mandatory due to how partitions work and how the offset semantics are implemented in Kafka.

9. Converter

Here I created a package converter where I added an example of a converter, a serializer, and a deserializer. Here we have only three methods to implement.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    // get the properties
}

@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
    // convert to array of bytes
    return null;
}

@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
    // convert to schema value
    return null;
}

StringConverter implements Converter
StringDeserializer implements Deserializer
StringSerializer implements Serializer
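
For the string-based example, a minimal sketch of the two conversion methods could look like this, hard-coding UTF-8 for brevity (the real class reads the encoding passed to configure);

@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
    // serialize the value as a UTF-8 string
    return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
}

@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
    // deserialize the bytes back into an optional string schema and value
    if (value == null) {
        return SchemaAndValue.NULL;
    }
    return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, new String(value, StandardCharsets.UTF_8));
}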

10. Transformation

Here I just add an example of a transformation.

IntegrityCheck<R extends ConnectRecord<R>> implements Transformation<R>

@Override
public void configure(Map<String, ?> props) {
    // read the transformation configuration
}

@Override
public R apply(R record) {
    // check messages with a null key, with schema and without schema
    return record;
}

@Override
public ConfigDef config() {
    return CONFIG_DEF;
}
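
A minimal sketch of what apply could do for schemaless records: add the configured integrity field with an MD5 hash of the value. The fieldName is assumed to come from configure, and DigestUtils is from the commons-codec dependency added earlier; records with a schema would need the equivalent logic on a Struct.

@Override
@SuppressWarnings("unchecked")
public R apply(R record) {
    if (record.value() instanceof Map) {
        // schemaless value: copy it and add the integrity hash
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        value.put(fieldName, DigestUtils.md5Hex(value.toString()));
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                null, value, record.timestamp());
    }
    return record;
}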

11. Deploy

There is an official Kafka Connect Docker image here, but you can use any Kafka Connect Docker image, like Debezium’s.

docker run -it --rm --name weather-connect-demo -p 8083:8083 -e GROUP_ID=1 \
    -e BOOTSTRAP_SERVERS="bootstrap_URL" \
    -e CONFIG_STORAGE_TOPIC="ID-config" \
    -e OFFSET_STORAGE_TOPIC="ID-offset" \
    -e STATUS_STORAGE_TOPIC="ID-status" \
    -v openweathermap-connector/target/openweathermap-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar/:/kafka/connect/openweathermap-connector \
    debezium/connect:latest

And we need to call the REST API with this JSON to start the connector;

{
  "name": "weathermap-connector",
  "config": {
    "connector.class": "com.openweathermap.kafka.connect.WeatherAPIConnector",
    "value.converter": "com.openweathermap.kafka.connect.converter.StringConverter",
    "value.converter.encoding": "UTF-8",
    "tasks.max": "1",
    "open.weather.api.key": "12312312312312313123123123123",
    "cities": "Ireland, Brazil",
    "kafka.topic": "weather",
    "name": "weather-connector",
    "transforms": "ReplaceField,IntegrityCheck",
    "transforms.ReplaceField.type": "com.openweathermap.kafka.connect.transform.ReplaceField$Value",
    "transforms.ReplaceField.blacklist": "main",
    "transforms.IntegrityCheck.type": "com.openweathermap.kafka.connect.transform.IntegrityCheck$Value",
    "transforms.IntegrityCheck.field": "integrity"
  }
}

GitHub
You can check the full code in my GitHub, and I did a Scala demo as well, where I just followed the same structure of folders, packages and classes.

12. Links

https://docs.confluent.io/current/connect/devguide.html

https://www.confluent.io/wp-content/uploads/Partner-Dev-Guide-for-Kafka-Connect.pdf?x18424

https://docs.confluent.io/current/connect/userguide.html

https://docs.confluent.io/3.1.1/connect/userguide.html

https://www.confluent.jp/blog/create-dynamic-kafka-connect-source-connectors/

Stay tuned! In the next blog post I’ll show how to code a Sink Kafka connector.

Kafka Connect

How are you?

This blog post is part of my series of posts regarding Kafka Connect.
If you’re not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture

This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion.

Table of contents

1. Source Standalone mode
2. Source Distributed mode
3. Sink
4. Don’t use Docker Compose
5. Lensesio
6. Strimzi
7. Debezium
8. JDBC
9. Link

1. Source Standalone mode

Standalone mode is the best way to get started with minimal config and infrastructure setup. In this section, we will see how to configure a connector in standalone mode and transfer file content to a Kafka topic.

Here I’m going to show how to use the FileSource Connector

Let’s create a Docker Compose file, “docker-compose.yml”.

version: '2'
services:
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
      - kafka

In the same directory where we created the yaml file, execute the following command to start the Kafka cluster. When the command below runs for the very first time it downloads the images. Once the images are downloaded it creates the Kafka cluster.

docker-compose up

To start any Kafka Connect cluster we require a workers config and a connector (file-stream) config.
Create two files: workers-config.properties and file-stream-connector-properties.

Workers-config.properties:

bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal key to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000

File-stream-connector-properties:

# These are standard kafka connect parameters, need for ALL connectors
name=file-stream-kafka-connect-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=source-input.txt
topic=kafka-connect-standalone

You should see three files in the folder; docker-compose.yml, file-stream-connector-properties and workers-config.properties

Create an input file source-input.txt; the content of this file will be transferred to the Kafka topic.

touch source-input.txt

Mount a host directory in a Docker container: make sure we are in the directory where we created the files. After mounting, we are automatically switched into the cp-kafka-connect container.

docker run --rm -it -v "$(pwd)":/kafka-connect/ --net=host confluentinc/cp-kafka-connect

This Docker image expects the connectors in this folder;
/etc/kafka-connect/jars

Create Kafka topic “kafka-connect-standalone” with 3 partitions and replication factor 1.

kafka-topics --create --topic kafka-connect-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181

Create standalone connector using workers-config.properties and file-stream-connector-properties.

connect-standalone workers-config.properties file-stream-connector-properties

Now you can test: open the file source-input.txt, type some messages into it and save it. The messages should be transferred to the Kafka topic.

What happened in the background: we wrote data to the source file and Kafka Connect standalone pushed the data to the topic. No programming, just configs.

Now stop Kafka Connect (press Ctrl/Command + C). Once Kafka Connect has gracefully stopped, list all files in the given directory. We have a new guest in the form of a file “standalone.offsets”.
This file is created by Kafka Connect to keep track of where it should resume reading messages from the source file on restarts.
When Kafka Connect starts again it should resume reading without publishing duplicate messages to the topic.

Try to execute the above command (connect-standalone workers-config.properties file-stream-connector-properties) again and validate that you do not have duplicate messages.

2. Source Distributed mode

Now let’s configure a connector in distributed mode.

Create Kafka topic “kafka-connect-distributed” with 3 partitions and replication factor 1.

kafka-topics --create --topic kafka-connect-distributed --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181

Create a config file “connect-source.json”.

{
  "name": "file-stream-kafka-connect-distributed",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "source-input.txt",
    "topic": "kafka-connect-distributed",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}

and run;

curl -d @<path-to-config-file>/connect-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Now write some messages in the source file; once the file is saved, all messages are posted to the topic “kafka-connect-distributed”.

Check for the messages copied from the file to the topic by the FileSourceConnector. Note that messages are stored in JSON format, as the connector was created earlier with config value.converter.schemas.enable=true.

We can run the command to check the topic;

kafka-console-consumer --topic kafka-connect-distributed --from-beginning --bootstrap-server 127.0.0.1:9092

3. Sink

Now we need a sink example; let’s look at a MongoDB one.
We can add this to the Docker Compose file.

 mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

Create a connect-mongodb-sink.json file and register it with;

curl -d @<path-to-config-file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

The connect-mongodb-sink.json:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

We have the following MongoDB-specific properties here:

  • mongodb.connection.uri contains the connection string for our MongoDB instance
  • mongodb.collection defines the collection
  • Since the MongoDB connector is expecting JSON, we have to set JsonConverter for key.converter and value.converter

  • And we also need schemaless JSON for MongoDB, so we have to set key.converter.schemas.enable and value.converter.schemas.enable to false

Elasticsearch

elasticsearch:
    image: itzg/elasticsearch:2.4.3
    environment:
      PLUGINS: appbaseio/dejavu
      OPTS: -Dindex.number_of_shards=1 -Dindex.number_of_replicas=0
    ports:
      - "9200:9200"

Create a config file “connect-elasticsearch-sink.json”.

{
  "name": "sink-elastic-twitter-distributed",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "topics": "kafka-connect-distributed-twitter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "kafka-connect",
    "key.ignore": "true"
  }
}

Here is a nice link to check about Docker options and images.

I found this one with a nice and simple Docker Compose setup for single or multiple nodes here.

4. Don’t use Docker Compose

#Start Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper:3.5.5

#Start Kafka Broker
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0

#Start Kafka Connect
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=kafka_connect_configs \
-e OFFSET_STORAGE_TOPIC=kafka_connect_offsets \
-e STATUS_STORAGE_TOPIC=kafka_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
debezium/connect:1.0

5. Lensesio

Another option is to use the lenses.io image, which already contains several Kafka connectors pre-installed and a nice user interface.

services:
  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker

https://lenses.io/
https://github.com/lensesio/fast-data-dev

6. Strimzi

Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster. It provides container images and Operators for running Kafka on Kubernetes.

https://strimzi.io/

https://medium.com/@sincysebastian/setup-kafka-with-debezium-using-strimzi-in-kubernetes-efd494642585

https://medium.com/@vamsiramakrishnan/strimzi-on-oracle-kubernetes-engine-oke-kafka-connect-on-oracle-streaming-service-oss-a-c73cc9714e90

https://itnext.io/kafka-connect-on-kubernetes-the-easy-way-b5b617b7d5e9

7. Debezium

I mentioned Debezium before; it is an open-source project for applying the Change Data Capture (CDC) pattern to your applications using Kafka.

https://debezium.io/

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  postgres:
    image: debezium/postgres:9.6
    ports:
     - "5432:5432"
    environment:
     - POSTGRES_USER=postgresuser
     - POSTGRES_PASSWORD=postgrespw
     - POSTGRES_DB=inventory
  elastic:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.5.2
    ports:
     - "9200:9200"
    environment:
     - http.host=0.0.0.0
     - transport.host=127.0.0.1
     - xpack.security.enabled=false
  connect:
    image: debezium/connect-jdbc-es:${DEBEZIUM_VERSION}
    build:
      context: debezium-jdbc-es
    ports:
     - 8083:8083
     - 5005:5005
    links:
     - kafka
     - mysql
     - postgres
     - elastic
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_source_connect_statuses

And of course, you can use Strimzi and Debezium together here.

8. JDBC

One of the most common integrations that people want to do with Kafka is getting data into or out of a database.

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/

https://dev.to/rmoff/streaming-data-from-kafka-to-s3-video-walkthrough-2elh

9. Link

https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html

https://docs.confluent.io/3.1.2/cp-docker-images/docs/quickstart.html

https://github.com/confluentinc/cp-docker-images/tree/5.3.1-post/examples

https://www.baeldung.com/kafka-connectors-guide

https://blog.softwaremill.com/do-not-reinvent-the-wheel-use-kafka-connect-4bcabb143292

https://www.confluent.io/blog/webify-event-streams-using-kafka-connect-http-sink/

Stay tuned! In the next blog post I’ll show how to code your own Kafka connector.

Kafka Connector Architecture

What’s the story Rory?

This blog post is part of my series of posts regarding “Kafka Connect Overview“.
If you’re not familiar with Kafka, I suggest you have a look at my previous post “What is Kafka?” before.
This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion.

Table of contents

1. Kafka Connect
2. Source & Sink Connectors
3. Standalone & Distributed
4. Converters & Transforms
5. Life cycle
6. Code
7. Books
8. Link

1. Kafka Connect

Kafka Connect’s goal of copying data between systems has been tackled by a variety of frameworks, many of them still actively developed and maintained. This section explains the motivation behind Kafka Connect, where it fits in the design space, and its unique features and design decisions.

Kafka Connect has three major models in its design:

  • Connector model
  • Worker model
  • Data model

The connector model addresses three key user requirements. First, Kafka Connect performs broad copying by default by having users define jobs at the level of Connectors which then break the job into smaller Tasks. This two level scheme strongly encourages connectors to use configurations that encourage copying broad swaths of data since they should have enough inputs to break the job into smaller tasks. It also provides one point of parallelism by requiring Connectors to immediately consider how their job can be broken down into subtasks, and select an appropriate granularity to do so. Finally, by specializing source and sink interfaces, Kafka Connect provides an accessible connector API that makes it very easy to implement connectors for a variety of systems.

The worker model allows Kafka Connect to scale to the application. It can run scaled down to a single worker process that also acts as its own coordinator, or in clustered mode where connectors and tasks are dynamically scheduled on workers. However, it assumes very little about the process management of the workers, so it can easily run on a variety of cluster managers or using traditional service supervision. This architecture allows scaling up and down, but Kafka Connect’s implementation also adds utilities to support both modes well. The REST interface for managing and monitoring jobs makes it easy to run Kafka Connect as an organization-wide service that runs jobs for many users. Command line utilities specialized for ad hoc jobs make it easy to get up and running in a development environment, for testing, or in production environments where an agent-based approach is required.

The data model addresses the remaining requirements. Many of the benefits come from coupling tightly with Kafka. Kafka serves as a natural buffer for both streaming and batch systems, removing much of the burden of managing data and ensuring delivery from connector developers. Additionally, by always requiring Kafka as one of the endpoints, the larger data pipeline can leverage the many tools that integrate well with Kafka. This allows Kafka Connect to focus only on copying data because a variety of stream processing tools are available to further process the data, which keeps Kafka Connect simple, both conceptually and in its implementation. This differs greatly from other systems where ETL must occur before hitting a sink. In contrast, Kafka Connect can bookend an ETL process, leaving any transformation to tools specifically designed for that purpose. Finally, Kafka includes partitions in its core abstraction, providing another point of parallelism

In simple words

Kafka Connect is a distributed, scalable, fault-tolerant service designed to reliably stream data between Kafka and other data systems. Data is produced from a source and consumed to a sink.

Connect tracks the offset that was last consumed for a source, to restart the task at the correct starting point after a failure. These offsets are different from Kafka offsets; they are based on the source system, like a database, a file, etc.

In standalone mode, the source offset is tracked in a local file and in a distributed mode, the source offset is tracked in a Kafka topic.

2. Source & Sink connectors

Producers and Consumers provide complete flexibility to send any data to Kafka or process it in any way. This flexibility means you do everything yourself.

Kafka Connect’s simple framework allows;

  • Developers to create connectors that copy data to or from other systems.
  • Operators to use said connectors just by writing configuration files and submitting them to Connect. (No code)
  • Community and 3rd-party engineers to build reliable plugins for common data sources and sinks.
  • Deployments to deliver fault tolerance and automated load balancing out-of-the-box.

And the framework does the hard work;

  • Serialization and deserialization.
  • Schema registry integration.
  • Fault tolerance and failover.
  • Partitioning and scale-out.
  • And lets developers focus on domain-specific details.

3. Standalone & Distributed

In standalone mode we have a source or a sink and a Kafka broker. When we deploy Kafka Connect in standalone mode we actually need to pass a configuration file containing all the connection properties that we need to run. So standalone’s main way of providing configuration to our connector is by using properties files, not the REST API.

In distributed mode we usually have more than one worker. Since these workers can be on different machines or containers, they cannot share the same storage space, so a properties file is out of the question. Instead, Kafka Connect in distributed mode leverages Kafka topics in order to sync between themselves.

4. Converters & Transforms

Converters are a pluggable API to convert data between native formats and Kafka. Just like the name says, converters are used to convert data from one format to another.
In source connectors, converters are invoked after the data has been fetched from the source and before it is published to Kafka.
In sink connectors, converters are invoked after the data has been consumed from Kafka and before it is stored in the sink.

Apache Kafka ships with a JSON converter.

A transform is a simple operation that can be applied at the message level.

There’s a nice blog post about Single message transforms here.

Single Message Transforms (SMTs) modify events before they are stored in Kafka: mask sensitive information, add identifiers, tag events, remove unnecessary columns and more. They also modify events going out of Kafka: route high-priority events to a faster datastore, cast data types to match the destination and more.

5. Life cycle

I built this animation using the slides from (Randall Hauch, Confluent) Kafka Summit SF 2018, here.

Sequence diagram

6. Code

Confluent makes available a code example that you can find here.

Confluent hub

7. Books

Modern Big Data Processing with Hadoop

8. Links

https://docs.confluent.io/current/connect/managing/confluent-hub/component-archive.html

https://docs.confluent.io/current/connect/design.html

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

Kafka Connect Overview

How’s it going horse?

If you’re not familiar with Kafka, I suggest you have a look at my previous post “What is Kafka?” before.
This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion. The idea is that I’ll create a series of posts regarding Kafka Connect. Stay tuned!

Table of contents

1. What is Kafka Connect?
2. Concepts
3. Features
4. Why Kafka connect?
5. Courses
6. Books
7. Influencers List
8. Link

1. What is Kafka Connect?

Kafka Connect (or Connect API) is a framework to import/export data from/to other systems. It was added in the Kafka 0.9.0.0 release and uses the Producer and Consumer API internally. The Connect framework itself executes so-called “connectors” that implement the actual logic to read/write data from other systems. The Connect API defines the programming interface that must be implemented to build a custom connector. Many open source and commercial connectors for popular data systems are available already. However, Apache Kafka itself does not include production ready connectors.

Connectors are meant to provide a simple way of connecting to external systems, only requiring a configuration file, while the scaling, distribution and persistence of state is handled by the framework for you. Connectors for common things like JDBC exist already at the Confluent Hub.
Official blog announcement and overview

For example;
Sometimes you need to process streams of data that are not in your Kafka cluster. This data may be located in a SQL database like SQL Server or MySQL, or in a simple CSV file. In order to process that data, you have to move it from your database to the Kafka cluster. To this end, you have some options, and two of them are:

  1. Create an application that reads data from your source storage system and produces them to Kafka cluster.
  2. Or use Kafka Connect to move your data easily from source storage system to your Kafka cluster.

If you choose the first option you need to write code that moves your data to the Kafka cluster. Your code must deal with the failure of your application (for example, it must store the offset of the last record of the tables that are moved to Kafka, so it can continue to copy the records that were not inserted into Kafka), scalability, polling and much more.

But if you choose the second option you can move data without writing any code. Kafka Connect does the same job as the first option but in a scalable and fault-tolerant way. The process of copying data from a storage system and moving it to a Kafka cluster is so common that the Kafka Connect tool was created to address this problem.

Kafka connectors provide some powerful features. They can be easily configured to route unprocessable or invalid messages to a dead letter queue, apply Single Message Transforms before a message is written to Kafka by a source connector or before it is consumed from Kafka by a sink connector, integrate with Confluent Schema Registry for automatic schema registration and management, and convert data into types such as Avro or JSON. By leveraging existing connectors, developers can quickly create fault-tolerant data pipelines that reliably stream data from an external source into records in Kafka topics or from Kafka topics into an external sink, all with mere configuration and no code!

2. Concepts

To efficiently discuss the inner workings of Kafka Connect, it is helpful to establish a few major concepts, and of course, I suggest a look in the official docs, here.

  • Connectors – the high level abstraction that coordinates data streaming by managing tasks
  • Tasks – the implementation of how data is copied to or from Kafka
  • Workers – the running processes that execute connectors and tasks
  • Converters – the code used to translate data between Connect and the system sending or receiving data
  • Transforms – simple logic to alter each message produced by or sent to a connector
  • Dead Letter Queue – how Connect handles connector errors

Each connector instance can break down its job into multiple tasks, thereby parallelizing the work of copying data and providing scalability. When a connector instance starts up a task, it passes along the configuration properties that each task will need. The task stores this configuration as well as the status and the latest offsets for the records it has produced or consumed externally in Kafka topics. Since the task does not store any state, tasks can be stopped, started, or restarted at any time. Newly started tasks will simply pick up the latest offsets from Kafka and continue on their merry way.

Workers are a physical concept. They are processes that run inside a JVM. In Kafka Connect concepts, your job is called a connector. It is something like this:

  • Copy records from table ‘accounts’ of my MySQL to Kafka topic ‘accounts’. It is called a source connector because you move data from external storage to Kafka.
  • Copy each message from Kafka topic ‘product-events’ to a CSV file ‘myfile.csv’. It is called a sink connector because you move data from Kafka to external storage.

Kafka Connect uses workers for moving data. Workers are just simple Linux (or any other OS) processes. Kafka Connect can create a cluster of workers to make the copying data process scalable and fault tolerant. Workers need to store some information about their status, their progress in reading data from external storage and so on. To store that data, they use Kafka as their storage. Note that Kafka Connect cluster (which is a cluster of workers) is completely different from the Kafka cluster (which is a cluster of Kafka brokers). More workers mean that your copying process is more fault tolerant.

Standalone vs. Distributed Mode

There are two modes for running workers: standalone mode and distributed mode. You should identify which mode works best for your environment before getting started.

Standalone mode is useful for development and testing Kafka Connect on a local machine. It can also be used for environments that typically use single agents (for example, sending web server logs to Kafka).

Distributed mode runs Connect workers on multiple machines (nodes). These form a Connect cluster. Kafka Connect distributes running connectors across the cluster. You can add more nodes or remove nodes as your needs evolve.

Distributed mode is also more fault tolerant. If a node unexpectedly leaves the cluster, Kafka Connect automatically distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster where it is safely replicated, losing the node where a Connect worker runs does not result in any lost data.

Converters are necessary to have a Kafka Connect deployment support a particular data format when writing to or reading from Kafka. Tasks use converters to change the format of data from bytes to a Connect internal data format and vice versa.

3. Features

A common framework for Kafka connectors
It standardizes the integration of other data systems with Kafka. Also, it simplifies connector development, deployment, and management.

Distributed and standalone modes
Scale up to a large, centrally managed service supporting an entire organization or scale down to development, testing, and small production deployments.

REST interface
By an easy to use REST API, we can submit and manage connectors to our Kafka Connect cluster.

Automatic offset management
Kafka Connect can manage the offset commit process automatically, even with just a little information from connectors. Hence, connector developers do not need to worry about this error-prone part of connector development.

Distributed and scalable by default
It builds upon the existing group management protocol. And to scale up a Kafka Connect cluster we can add more workers.

Streaming/batch integration
We can say that Kafka Connect is an ideal solution for bridging streaming and batch data systems.

Transformations
Enables simple and lightweight modifications to individual messages.


From Zero to Hero with Kafka Connect by Robin Moffatt

4. Why Kafka Connect

Auto-recovery After Failure
To each record, a “source” connector can attach arbitrary “source location” information which it passes to Kafka Connect. Hence, at the time of failure Kafka Connect will automatically provide this information back to the connector. In this way, it can resume where it failed. Additionally, auto recovery for “sink” connectors is even easier.

Auto-failover
Auto-failover is possible because the Kafka Connect nodes build a Kafka Connect cluster. That means that if one node fails, the work that it is doing is redistributed to other nodes.

Simple Parallelism
A connector can define data import or export tasks which execute in parallel.

5. Courses

https://www.udemy.com/course/kafka-connect/

https://www.pluralsight.com/courses/kafka-connect-fundamentals

6. Books

Kafka: The Definitive Guide is the best option to start. There is a chapter “Kafka Connect”

Building Data Streaming Applications with Apache Kafka – there is a nice chapter “Deep dive into Kafka Connect”

7. Influencers List

@rmoff
@tlberglund

8. Links

https://www.confluent.io/blog/announcing-kafka-connect-building-large-scale-low-latency-data-pipelines/

https://docs.confluent.io/current/connect/concepts.html

What is Kafka?

How’s the form?

If you’re not familiar with Big Data or Data Lakes, I suggest you have a look at my previous posts “What is Big Data?” and “What is data lake?” before.
This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion.

Table of contents

1. What is Kafka?
2. Architecture
3. History
4. Courses
5. Books
6. Influencers List
7. Link

1. What is Kafka?

In simple terms, Kafka is a messaging system that is designed to be fast, scalable, and durable. It is an open-source stream processing platform. Kafka is a distributed publish-subscribe messaging system that maintains feeds of messages in partitioned and replicated topics.

Wikipedia definition: Apache Kafka is an open-source stream-processing software platform developed by LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its storage layer is essentially a “massively scalable pub/sub message queue designed as a distributed transaction log”, making it highly valuable for enterprise infrastructures to process streaming data. Additionally, Kafka connects to external systems (for data import/export) via Kafka Connect and provides Kafka Streams, a Java stream processing library.

I Googled about it and I found …
Kafka is designed for distributed high throughput systems. Kafka tends to work very well as a replacement for a more traditional message broker. In comparison to other messaging systems, Kafka has better throughput, built-in partitioning, replication and inherent fault-tolerance, which makes it a good fit for large-scale message processing applications.

Other …
Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and enables you to pass messages from one end-point to another. Kafka is suitable for both offline and online message consumption. Kafka messages are persisted on the disk and replicated within the cluster to prevent data loss. Kafka is built on top of the ZooKeeper synchronization service. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.

Producers

Producers produce messages to a topic of their choice. It is possible to attach a key to each message, in which case the producer guarantees that all messages with the same key will arrive at the same partition.
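
As a minimal sketch (topic name and connection settings are just placeholders), this is how a keyed message is sent with the Java producer API; every message sent with the key "user-42" will land on the same partition:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // same key -> same partition, so per-key ordering is preserved
    producer.send(new ProducerRecord<>("orders", "user-42", "order created"));
}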

Consumers

Consumers read the messages of a set of partitions of a topic of their choice at their own pace. If the consumer is part of a consumer group, i.e. a group of consumers subscribed to the same topic, they can commit their offset. This can be important if you want to consume a topic in parallel with different consumers.

Topics and Logs

A topic is a feed name or category to which records are published. Topics in Kafka are always multi-subscriber — that is, a topic can have zero, one, or many consumers that subscribe to the data written to it. For each topic, the Kafka cluster maintains a partition log that looks like this:

Topics are logs that receive data from the producers and store them across their partitions. Producers always write new messages at the end of the log.

Partitions

A topic may have many partitions so that it can handle an arbitrary amount of data. In the above diagram, the topic is configured into three partitions (partition{0,1,2}). Partition 0 has 13 offsets, Partition 1 has 10 offsets, and Partition 2 has 13 offsets.

Partition Offset

Each partitioned message has a unique sequence ID called an offset. For example, in Partition 1, the offset is marked from 0 to 9. The offset is the position in the log where the consumer last consumed or read a message.

Distribution

The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

Each partition has one server which acts as the “leader” and zero or more servers which act as “followers”. The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.

Geo-Replication

Kafka MirrorMaker provides geo-replication support for your clusters. With MirrorMaker, messages are replicated across multiple datacenters or cloud regions. You can use this in active/passive scenarios for backup and recovery; or in active/active scenarios to place data closer to your users, or support data locality requirements.

Replicas

Replicas are nothing but backups of a partition. If the replication factor of the above topic is set to 4, then Kafka will create four identical replicas of each partition and place them in the cluster to make them available for all its operations. Replicas are never used to read or write data. They are used to prevent data loss.

Messaging System

A messaging system is a system that is used for transferring data from one application to another so that the applications can focus on data and not on how to share it. Kafka is a distributed publish-subscribe messaging system. In a publish-subscribe system, messages are persisted in a topic. Message producers are called publishers and message consumers are called subscribers. Consumers can subscribe to one or more topic and consume all the messages in that topic.

Two types of messaging patterns are available − one is point to point and the other is publish-subscribe (pub-sub) messaging system. Most of the messaging patterns follow pub-sub.

  • Point to Point Messaging System – In a point-to-point system, messages are persisted in a queue. One or more consumers can consume the messages in the queue, but a particular message can be consumed by at most one consumer. Once a consumer reads a message from the queue, it disappears from that queue. A typical example is an order processing system, where each order is processed by one order processor, although multiple order processors can work at the same time.
  • Publish-Subscribe Messaging System – In a publish-subscribe system, messages are persisted in a topic. Unlike a point-to-point system, consumers can subscribe to one or more topics and consume all the messages in those topics. Message producers are called publishers and message consumers are called subscribers. A real-life example is Dish TV, which publishes different channels like sports, movies, and music; anyone can subscribe to their own set of channels and watch them whenever their subscribed channels are available.

Brokers

Brokers are simple systems responsible for maintaining published data. Kafka brokers are stateless, so they use ZooKeeper for maintaining their cluster state. Each broker may have zero or more partitions per topic. For example, if there are 10 partitions on a topic and 10 brokers, then each broker will have one partition. If there are 10 partitions and 15 brokers, the first 10 brokers will have one partition each and the remaining five won't have any partition for that particular topic. However, if there are 15 partitions and only 10 brokers, some brokers will hold more than one partition, leading to unequal load distribution among the brokers. Try to avoid this scenario.

Cluster

When Kafka has more than one broker, it is called a Kafka cluster. A Kafka cluster can be expanded without downtime. These clusters are used to manage the persistence and replication of message data.

Multi-tenancy

You can deploy Kafka as a multi-tenant solution. Multi-tenancy is enabled by configuring which topics can produce or consume data. There is also operations support for quotas. Administrators can define and enforce quotas on requests to control the broker resources that are used by clients. For more information, see the security documentation.

Zookeeper

ZooKeeper is used for managing and coordinating Kafka brokers. It is mainly used to notify producers and consumers when a new broker joins the Kafka system or when an existing broker fails. Based on these notifications, producers and consumers decide how to proceed and start coordinating their work with another broker.

2. Architecture

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams (a minimal sketch follows this list).
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
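
Here is the minimal Kafka Streams sketch mentioned in the Streams API bullet: it reads from one topic, transforms each value, and writes the result to another topic. The application id and the topic names are assumptions for the example.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo");     // assumed application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");        // consume the input stream
input.mapValues(value -> value.toUpperCase()).to("output-topic");     // transform and produce

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();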

Apache describes Kafka as a distributed streaming platform that lets us:

  • Publish and subscribe to streams of records.
  • Store streams of records in a fault-tolerant way.
  • Process streams of records as they occur.

Apache.org states that:

  • Kafka runs as a cluster on one or more servers.
  • The Kafka cluster stores a stream of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

3. History

Kafka was developed around 2010 at LinkedIn by a team that included Jay Kreps, Jun Rao, and Neha Narkhede. The problem they originally set out to solve was low-latency ingestion of large amounts of event data from the LinkedIn website and infrastructure into a lambda architecture that harnessed Hadoop and real-time event processing systems. The key was the “real-time” processing. At the time, there weren’t any solutions for this type of ingress for real-time applications.

There were good solutions for ingesting data into offline batch systems, but they exposed implementation details to downstream users and used a push model that could easily overwhelm a consumer. Also, they were not designed for the real-time use case.

Kafka was developed to be the ingestion backbone for this type of use case. Back in 2011, Kafka was ingesting more than 1 billion events a day. Recently, LinkedIn has reported ingestion rates of 1 trillion messages a day.

https://www.confluent.io/blog/apache-kafka-hits-1-1-trillion-messages-per-day-joins-the-4-comma-club/

Why Kafka?

Big Data involves an enormous volume of data. But how do we collect that large volume of data and analyze it? To do so, we need a messaging system, and that is why we need Kafka. The functionality it provides is well suited to these requirements, and thus we use Kafka for:

  • Building real-time streaming data pipelines that can get data between systems and applications.
  • Building real-time streaming applications to react to the stream of data.

Kafka can work with Flume/Flafka, Spark Streaming, Storm, HBase, Flink, and Spark for real-time ingestion, analysis, and processing of streaming data. Kafka is often the data stream used to feed Hadoop big data lakes. Kafka brokers support massive message streams for low-latency follow-up analysis in Hadoop or Spark. Also, Kafka Streams (a subproject) can be used for real-time analytics.

Why is it so popular?

RedMonk.com published an article in February 2016 documenting some interesting stats around the “rise and rise” of a powerful asynchronous messaging technology called Apache Kafka.
https://redmonk.com/fryan/2016/02/04/the-rise-and-rise-of-apache-kafka/

Kafka has operational simplicity: it is easy to set up and use, and it is easy to figure out how it works. It is stable, provides reliable durability, has a flexible publish-subscribe/queue model that scales well with N consumer groups, has robust replication, gives producers tunable consistency guarantees, and preserves ordering at the shard level (i.e. the Kafka topic partition). In addition, Kafka works well with systems that have data streams to process and enables those systems to aggregate, transform, and load into other stores. But none of those characteristics would matter if Kafka were slow; the most important reason Kafka is popular is its exceptional performance.

Who Uses Kafka?

Many large companies that handle a lot of data use Kafka. LinkedIn, where it originated, uses it to track activity data and operational metrics. Twitter uses it as part of Storm to provide a stream processing infrastructure. Square uses Kafka as a bus to move all system events (logs, custom events, metrics, and so on) to various Square data centers, with outputs to Splunk and Graphite (dashboards), and to implement Esper-like/CEP alerting systems. It is also used by other companies such as Spotify, Uber, Tumblr, Goldman Sachs, PayPal, Box, Cisco, CloudFlare, and Netflix.

Why Is Kafka So Fast?

Kafka relies heavily on the OS kernel to move data around quickly and on the principle of zero copy. Kafka lets you batch data records into chunks, and these batches can be seen end-to-end from the producer, to the file system (the Kafka topic log), to the consumer. Batching allows more efficient data compression and reduces I/O latency. Kafka writes to an immutable commit log sequentially on disk, avoiding random disk access and slow disk seeks. Kafka provides horizontal scale through sharding: it shards a topic log into hundreds (potentially thousands) of partitions spread across thousands of servers. This sharding allows Kafka to handle massive load.
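
Batching and compression are controlled on the producer side through batch.size, linger.ms and compression.type. The values below are only illustrative, not recommendations:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);                // batch up to 64 KB per partition
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);                        // wait up to 20 ms to fill a batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");              // compress whole batches

KafkaProducer<String, String> producer = new KafkaProducer<>(props);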

Benefits of Kafka

Four main benefits of Kafka are:

  • Reliability. Kafka is distributed, partitioned, replicated, and fault tolerant. Kafka replicates data and is able to support multiple subscribers. Additionally, it automatically balances consumers in the event of failure.
  • Scalability. Kafka is a distributed system that scales quickly and easily without incurring any downtime.
  • Durability. Kafka uses a distributed commit log, which means messages persist on disk as fast as possible, with intra-cluster replication, hence it is durable.
  • Performance. Kafka has high throughput for both publishing and subscribing to messages. It maintains stable performance even when dealing with many terabytes of stored messages.

Use Cases

Kafka can be used in many use cases. Some of them are listed below:

  • Metrics − Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
  • Log Aggregation Solution − Kafka can be used across an organization to collect logs from multiple services and make them available in a standard format to multiple consumers.
  • Stream Processing − Popular frameworks such as Storm and Spark Streaming read data from a topic, process it, and write the processed data to a new topic where it becomes available for users and applications. Kafka’s strong durability is also very useful in the context of stream processing.

4. Courses

https://www.udemy.com/apache-kafka-tutorial-for-beginners/

5. Book

Kafka: The Definitive Guide is the best place to start.

oreilly
pdf
github

6. Influencers List

@nehanarkhede

@rmoff

@tlberglund

7. Link

Confluent

Apache Kafka

Thorough Introduction to Apache Kafka

A good Kafka explanation

What is Kafka

Kafka Architecture and Its Fundamental Concepts

Apache Kafka Tutorial — Kafka For Beginners

What to consider for painless Apache Kafka integration