Tag Archives: Kafka

OCI Dashboard with Smashing/Dashing and Kafka

What’s up?
2021/03/05

This is another blog about Raspberry PI, and today I want to show how I did a simple Kafka cluster demo. It’s kind of a continuation from my two previous blogs, Kafka at the edge with Raspberry PI and Real-Time Locating System with Kafka.

If you’re not familiar with Kafka, I suggest you have a look at my previous post What is Kafka? before, and you can have a look at how I created the Kafka cluster here.

Idea

I found the idea of an OCI dashboard with Smashing/Dashing in a colleague's blog some time ago, and I decided to pivot a little bit into a simple Raspberry Pi Kafka example where I get OCI data and combine it with the Smashing/Dashing dashboard idea.

For the producer part, you can use my Micronaut Sense HAT example as a starting point and just change it to fetch the OCI data. You can get the full Micronaut Kafka Producer code on my GitHub.

https://docs.oracle.com/en-us/iaas/Content/API/SDKDocs/javasdk.htm
https://github.com/oracle/oci-java-sdk

For the consumer part, you can use my Quarkus example. You can get the full Quarkus Kafka Consumer code on my GitHub.

Clone the code and change the update.py file to read the value from the REST interface that Quarkus generates.

Results

Because this is a kind of Oracle Cloud solution, you can use Oracle Streaming Service instead of hosting your own Kafka.

Oracle Streaming Service (OSS) is a real-time, serverless, Apache Kafka-compatible event streaming platform for developers and data scientists. It provides a fully managed, scalable, and durable solution for ingesting and consuming high-volume data streams in real-time. To learn more about Oracle Streaming Service, see the documentation overview.

You can check my GitHub for some OSS examples, in particular the Micronaut one.

You just need to change the "application.yml" file from my Micronaut Sense HAT example.

kafka:
  bootstrap:
    servers: streaming.{your-region}.oci.oraclecloud.com:9092
  security:
    protocol: SASL_SSL
  sasl:
    mechanism: PLAIN
  key:
    serializer: org.apache.kafka.common.serialization.StringSerializer
    deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value:
    serializer: org.apache.kafka.common.serialization.StringSerializer
    deserializer: org.apache.kafka.common.serialization.StringDeserializer
  retries: 5
  max:
    request:
      size: 1048576
    partition:
      fetch:
        bytes: 1048576
  group:
    id: group-0
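
Depending on how your stream pool is set up, you will most likely also need SASL credentials merged into the kafka section above. A rough sketch of the extra entry (the username format and the auth token are placeholders; check the OSS Kafka compatibility docs for your tenancy):

kafka:
  sasl:
    jaas:
      config: org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy>/<user>/<stream-pool-ocid>" password="<auth-token>";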

And you can run the whole solution in the cloud.

The Kafka consumer and the dashboard can be hosted in a cloud VM as well.

OCI Logging service is a highly scalable and fully managed single pane of glass for all the logs in your tenancy. Logging provides access to logs from Oracle Cloud Infrastructure resources. These logs include critical diagnostic information that describes how resources are performing and being accessed. To learn more about the Logging service, see the documentation overview.

OCI Service Connector Hub moves data, such as logs from Logging, to services, such as Object Storage, Streaming, and Monitoring. It triggers functions for custom data processing and sends notifications about changes to cloud resources. To learn more about the Service Connector Hub, see the documentation overview.

Here, using Service Connector Hub, I can get the logs from my tenancy and send them to OSS in a simple way.

Links

https://www.oc-blog.com/2019/05/13/oci-dashboard-with-smashing-dashing/

https://github.com/AnykeyNL/oci-smashing

Real-Time Locating System with Kafka

How’s the craic?
2021/03/03

This is another blog about Raspberry PI, and today I want to show how I did a simple Kafka cluster demo using the Inky pHAT. It’s kind of a continuation from my previous blog, Kafka at the edge with Raspberry PI.

If you’re not familiar with Kafka, I suggest you have a look at my previous post What is Kafka? before, and you can have a look at how I created the Kafka cluster here.

The Inky pHAT is an add-on board for Raspberry Pi, that has a low-energy, high-falutin, electronic paper (ePaper / eInk / EPD) display for your Pi, in three different color schemes: red/black/white, yellow/black/white, and black/white.
You can learn more about Inky pHAT here and check the API here.

Real-Time Locating System (RTLS) enables identifying and tracking the location of objects or people in real-time. It is used everywhere in transportation and logistics across industries.

Use Cases

  • Real-time alerting on a single event: Monitor assets and people and send an alert to a controller, mobile app, or any other interface if an issue happens.
  • Continuous real-time aggregation of multiple events: Correlate data while it is in motion. Calculate averages, enforce business rules, apply an analytic model for predictions on new events, or any other business logic.
  • Batch analytics on all historical events: Take all historical data to find insights, e.g., for analyzing issues of the past, planning future location requirements, or training analytic models.

This is not an exhaustive list.

A modern RTLS requires an open architecture and high scalability, and of course the implementation can rely on Kafka.

Idea

A simple Raspberry Pi Kafka example where I use one node to get the open Dublin Bus data and display it in real-time on the other node using the Inky pHAT.

For the producer part, you can use my Micronaut Sense HAT example as a starting point and just change it to use Dublin Bus data instead. You can get the full Micronaut Kafka Producer code on my GitHub.

https://data.smartdublin.ie/dataset/gtfs-r-real-time-passenger-information
https://developer.nationaltransport.ie/api-details#api=gtfsr&operation=gtfsr

For the consumer part, you can use my Quarkus example. You can get the full Quarkus Kafka Consumer code on my GitHub.

Results

Kafka can be deployed as a single broker in a vehicle and a global Kafka infrastructure can spread to multiple cloud providers, regions, countries, or even continents and integrate with tens or hundreds of factories or other edge locations.

Curiosity

1- Dublin Bus is already using an e-ink display to show bus stop data.

2- I did a demo for real-time data from connected vehicles some time ago.

3- I won a prize at Europe’s First government-funded Blockchain Hackathon with the idea of an app to track medical devices combining Kafka and blockchain.

Links

https://towardsdatascience.com/tracking-nyc-citi-bike-real-time-utilization-using-kafka-streams-1c0ea9e24e79

https://eng.lyft.com/a-new-real-time-map-matching-algorithm-at-lyft-da593ab7b006

https://github.com/ds4es/real-time-units-gps-tracking

Kafka at the edge with Raspberry PI

How’s the man?
2021/03/01

This is another blog about Raspberry PI, and today I want to show how I did a simple Kafka cluster demo using Sense Hat & GFX Hat.

If you’re not familiar with Kafka, I suggest you have a look at my previous post What is Kafka? before, and you can have a look at how I created the Kafka cluster here.

The Sense HAT is an add-on board for Raspberry Pi that has an 8×8 RGB LED matrix and a five-button joystick, and includes the following sensors: gyroscope, accelerometer, magnetometer, barometer, temperature, and relative humidity.
You can learn more about Sense hat in my previous blog.

The GFX HAT is an add-on board for Raspberry Pi that has a 128×64 pixel, 2.15″ LCD display with a snazzy six-zone RGB backlight and six capacitive touch buttons. GFX HAT makes an ideal display and interface for your headless Pi projects.
You can learn more about GFX hat here and check the API here.

The idea here is to focus on scenarios where the Kafka clients and the Kafka brokers are running on the edge. This enables edge processing, integration, decoupling, low latency, and cost-efficient data processing.

Edge Kafka is not simply yet another IoT project using Kafka in a remote location. Edge Kafka is an essential component of a streaming nervous system that spans IoT (or OT in Industrial IoT) and non-IoT (traditional data-center / cloud infrastructures).

Multi-cluster and cross-data center deployments of Apache Kafka have become the norm rather than an exception. A Kafka deployment at the edge can be an independent project. However, in most cases, Kafka at the edge is part of an overall Kafka architecture.
Apache Kafka is the New Black at the Edge in Industrial IoT, Logistics, and Retailing

https://www.kai-waehner.de/blog/2020/01/01/apache-kafka-edge-computing-industrial-iot-retailing-logistics/

Idea

A two-node Raspberry Pi Kafka cluster, with a Micronaut Kafka producer that gets Sense HAT data and a Quarkus Kafka consumer that exposes the result over REST, which the GFX HAT reads using the Python API.

The Micronaut producer gets the Sense HAT humidity, pressure, and temperature values and sends them to a Kafka topic.
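
Just to give an idea, here is a minimal sketch of what the Micronaut producer side looks like; the interface name and topic are placeholders, the real code is in the GitHub repo linked below:

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

// Micronaut generates the Kafka producer implementation for this interface at compile time
@KafkaClient
public interface SenseHatProducer {

    // key = sensor name (humidity, pressure, temperature), value = the reading as a String
    @Topic("sensehat")
    void send(@KafkaKey String sensor, String value);
}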

The Quarkus consumer reads a Kafka topic and generates a REST interface with the last topic value. I used the GFX Hat to display the result.
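
And a rough sketch of the Quarkus side, using the SmallRye Reactive Messaging Kafka and RESTEasy extensions; the class and channel names are illustrative, and the channel is bound to the Kafka topic in application.properties:

import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
@Path("/last")
public class LastValueResource {

    // keeps only the most recent record seen on the topic
    private volatile String lastValue = "";

    // the "sensehat" channel is mapped to the Kafka topic in application.properties
    @Incoming("sensehat")
    public void consume(String value) {
        this.lastValue = value;
    }

    // the GFX HAT Python script polls this endpoint
    @GET
    public String last() {
        return lastValue;
    }
}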

You can get the full Micronaut Kafka Producer code on my GitHub.
You can get the full Quarkus Kafka Consumer code on my GitHub.

Results

Kafka is a great solution for the edge. It enables deploying the same open, scalable, and reliable technology at the edge, data center, and the cloud. This is relevant across industries. Kafka is used in more and more places where nobody has seen it before. Edge sites include retail stores, restaurants, cell towers, trains, and many others.

Raspberry Pi Kafka cluster

How heya?
2020/10/09

This is another blog about Raspberry PI, and today I want to show how I did a simple Kafka cluster.

If you’re not familiar with Kafka, I suggest you have a look at my previous post “What is Kafka?” before.

This is a really simple tutorial and you can find similar instructions over the internet.

Pre-Requisites

  • Raspbian installed

Components

  • 3x Raspberry Pi3 B with power supply and micro SD card with Raspbian.

Although every cluster tutorial seems to add a switch, a router, and some Ethernet cables, I just used plain WiFi.

For the software part, we will download ZooKeeper and Kafka, and there's no installation: it's just a matter of untarring the archives and changing some settings files.
Download the latest Apache ZooKeeper & Apache Kafka.

Steps

You should do this on all of the Raspberry Pis.

1) Zookeeper

wget https://downloads.apache.org/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

Modify "conf/zoo.cfg" with:

dataDir=/opt/zookeeper_data
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
server.1=192.168.0.18:2888:3888
server.2=192.168.0.15:2888:3888
server.3=192.168.0.16:2888:3888

Create a file "myid" inside the dataDir; the file should contain only the id of the ZooKeeper node.
I used 1, 2 and 3.
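
For example, on the first node (assuming the dataDir configured above):

echo "1" > /opt/zookeeper_data/myid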

Run this from ZooKeeper's root folder to start the ZooKeeper service.

./bin/zkServer.sh start > /dev/null 2>&1 &

2) Kafka

I downloaded the most recent stable version.
Modify "config/server.properties" with:

broker.id=1
port=9092
host.name=192.168.0.10
zookeeper.connect=192.168.0.18:2181,192.168.0.15:2181,192.168.0.16:2181

broker.id should be 1, 2, and 3, one per Pi.
host.name is the machine's IP address.
zookeeper.connect should be the same on all nodes.

(The memory settings below depend on which Raspberry Pi version you are using.)

Update the “bin/kafka-server-start.sh” with:

export JMX_PORT=${JMX_PORT:-9999}
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

Otherwise, the JVM will complain that it is not able to allocate the specified memory.

Update “bin/kafka-run-class.sh” with:

KAFKA_JVM_PERFORMANCE_OPTS="-client -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true" # change -server to -client

You can run this to start Kafka

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties > /dev/null 2>&1 &

And that’s it! Now you can do some basic tests.

Create a topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.0.15:2181,192.168.0.16:2181,192.168.0.18:2181 --replication-factor 1 --partitions 1 --topic test

Describe this topic

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

Start a terminal producer

/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start a terminal consumer

/opt/kafka/bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic test --from-beginning

I put my ZooKeeper and Kafka in the "/opt" folder, but you can put them in any path.

Final Thoughts

I used a Raspberry Pi 3 B, but Raspberry Pi has already released the Raspberry Pi 4 with 8GB of RAM.
I don't think we are too far from seeing a new Raspberry Pi version, maybe a 16GB Pi 4 or a Pi 5, who knows.
I used Raspbian, which is a 32-bit OS, but Raspberry Pi has already released Raspberry Pi OS (yes, they changed the name), which is the new official operating system, and there's a 64-bit version.

With a more powerful computer, more memory, and a 64-bit OS, a lot more ideas and scenarios open up. Kubernetes, Strimzi, and KEDA are just some initial ones.

Another thing is that Kafka is about to remove the Apache ZooKeeper dependency, and this will bring a lot of changes.

I found a lot of tutorials about Kubernetes and 64-bit OSes for the Raspberry Pi, but I could not find any Kubernetes Kafka or Strimzi example. With the new OS and more powerful hardware, I think it's just a question of time.

MicroK8s – a powerful, lightweight, reliable production-ready Kubernetes distribution. It is an enterprise-grade Kubernetes distribution that has a small disk and memory footprint while offering carefully selected add-ons out of the box, such as Istio, Knative, Grafana, Cilium, and more.

Canonical has released Ubuntu 19.10 with a focus on accelerating developer productivity in AI/ML, new edge capabilities for MicroK8s, and delivering the fastest GNOME desktop performance.
The Raspberry Pi 4 Model B is supported by Ubuntu 19.10. The latest board from the Raspberry Pi Foundation offers a faster system-on-a-chip with a processor that uses the Cortex-A72 architecture (quad-core 64-bit ARMv8 at 1.5GHz) and offers up to 8GB of RAM. With the Raspberry Pi 4 Model B, developers get access to a low-cost board, powerful enough to orchestrate workloads at the edge with MicroK8s.

Canonical and Raspberry

Raspberry with Ubuntu 20.04

K3s – a flavor of Kubernetes that is highly optimized for the edge. Though K3s is a simplified, miniature version of Kubernetes, it doesn’t compromise the API conformance and functionality.

Links

Kafka
confluent
Raspberry PI
8gb Raspberry PI4
Raspberry Pi OS
Strimzi
kubernetes
Keda
K3s
Microk8s

Some similar tutorials:

https://towardsdatascience.com/kafka-and-zookeeper-over-ubuntu-in-a-3-node-cluster-a-data-science-big-data-laboratory-part-4-of-4-47631730d240

http://czcodezone.blogspot.com/2014/11/setup-kafka-in-cluster.html

https://pandariel.com/posts/kafka-cluster/

https://rdiot.tistory.com/329

Kafka Producer & Consumer Overview

How’s it going horse?
2020/08/19

If you’re not familiar with Kafka, I suggest you have a look at my previous post “What is Kafka?” before.

This blog is just a quick review of Kafka Producer and Consumer.

Table of contents

1. Producer
2. Consumer
3. Links

Kafka has a notion of producer and consumer. The first one pushes messages to Kafka, while the second one fetches them.

Official docs;
Producer
Consumer

1. Producer

The primary role of a Kafka producer is to take the producer properties and a record as inputs and write it to an appropriate Kafka broker. Producers serialize, partition, compress, and load balance data across brokers based on partitions.

In order to send the producer record to the appropriate broker, the producer first establishes a connection to one of the bootstrap servers. The bootstrap server returns a list of all the brokers available in the cluster and all the metadata details such as topics, partitions, replication factor, and so on. Based on the list of brokers and the metadata, the producer identifies the leader broker that hosts the leader partition of the producer record and writes to that broker.

Important Producer Settings:

Acks
The acks setting specifies acknowledgements that the producer requires the leader to receive before considering a request complete. This setting defines the durability level for the producer.

Max.in.flight.requests.per.connection
The maximum number of unacknowledged requests the client will send on a single connection before blocking. If this setting is greater than 1, pipelining is used when the producer sends the grouped batch to the broker. This improves throughput, but if there are failed sends there is a risk of out-of-order delivery due to retries (if retries are enabled). Note also that excessive pipelining reduces throughput.

Compression.type
Compression is an important part of a producer’s work, and the speed of different compression types differs a lot.

To specify the compression type, use the compression.type property. It accepts standard compression codecs (‘gzip’, ‘snappy’, ‘lz4’), as well as ‘uncompressed’ (equivalent to no compression) and ‘producer’ (which keeps the compression codec set by the producer).

Compression is handled by the user thread. If compression is slow it can help to add more threads. In addition, batching efficiency impacts the compression ratio: more batching leads to more efficient compression.

Batch.size
Larger batches typically have better compression ratios and higher throughput, but they have higher latency.

Linger.ms
There is no simple guideline for setting linger.ms values; you should test settings on specific use cases. For small events (100 bytes or less), this setting does not appear to have much impact.
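
Just to make these settings concrete, here is a sketch of how they could be added to the Properties used in the producer example further down; the values are illustrative, not recommendations, so measure them against your own workload:

// durability: wait for all in-sync replicas to acknowledge
props.put(ProducerConfig.ACKS_CONFIG, "all");
// keep ordering even when retries happen
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
// batching and compression work together: bigger batches compress better
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // 32 KB batches
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);         // wait up to 20 ms to fill a batch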

Producer performance tuning

Kafka tutorial advanced producers

Code

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

Java Doc

Simple Java Code

package com.codingharbour.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Instant;
import java.util.Properties;

public class SimpleKafkaProducer {

    public static void main(String[] args) {
        //create kafka producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> producer = new KafkaProducer<>(properties);

        //prepare the record
        String recordValue = "Current time is " + Instant.now().toString();
        System.out.println("Sending message: " + recordValue);
        ProducerRecord<String, String> record = new ProducerRecord<>("java_topic", null, recordValue);

        //produce the record
        producer.send(record);
        producer.flush();

        //close the producer at the end
        producer.close();
    }
}


High-level overview of Kafka producer components – Kafka the Definitive Guide Book

2. Consumer

The primary role of a Kafka consumer is to read data from an appropriate Kafka broker. In order to understand how to read data from Kafka, you first need to understand its consumers and consumer groups.

A consumer group is a group of consumers that share the same group id. When a topic is consumed by consumers in the same group, every record will be delivered to only one consumer. As the official documentation states: “If all the consumer instances have the same consumer group, then the records will effectively be load-balanced over the consumer instances.”
This way you can ensure parallel processing of records from a topic and be sure that your consumers won't be stepping on each other's toes.
Each topic consists of one or more partitions. When a new consumer is started it will join a consumer group and Kafka will then ensure that each partition is consumed by only one consumer from that group.


Kafka the Definitive Guide Book


Kafka the Definitive Guide Book

You can have many consumers reading the same records from the topic, as long as they all have different group ids.


Kafka the Definitive Guide Book

Consumer groups provide the following advantages:

  • Each instance receives messages from one or more partitions (which are “automatically” assigned to it), and the same messages won’t be received by the other instances (assigned to different partitions). In this way, we can scale the number of the instances up to the number of the partitions (having one instance reading only one partition). In this case, a new instance joining the group is in an idle state without being assigned to any partition.
  • Having instances as part of different consumer groups means providing a publish/subscribe pattern in which the messages from partitions are sent to all the instances across the different groups. Inside the same consumer group, the rules are as shown in the second image, but across different groups, the instances receive the same messages (as shown in the third image). This is useful when the messages inside a partition are of interest for different applications that will process them in different ways. We want all the interested applications to receive all the same messages from the partition.
  • Another advantage of consumer groups is the rebalancing feature. When an instance joins a group, if enough partitions are available (that is, the limit of one instance per partition hasn’t been reached), a rebalancing starts. The partitions are reassigned to the current instances, plus the new one. In the same way, if an instance leaves a group, the partitions are reassigned to the remaining instances.
  • Offset commits are managed automatically.

Code

Java Doc

Simple Java Code

package com.codingharbour.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {
        //create kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        //subscribe to topic
        consumer.subscribe(Collections.singleton("java_topic"));

        //poll the record from the topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Message received: " + record.value());
            }
            consumer.commitAsync();
        }
    }
}

3. Links

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://www.linkedin.com/pulse/kafka-producer-overview-sylvester-daniel

https://www.linkedin.com/pulse/kafka-producer-delivery-semantics-sylvester-daniel/

https://www.linkedin.com/pulse/kafka-consumer-overview-sylvester-daniel/

https://www.linkedin.com/pulse/kafka-consumer-delivery-semantics-sylvester-daniel

A Twitter follow list for Kafka

What’s the crack jack?
2020/05/17

If you, like me, use Twitter as a source of information, this may help you. I just created a Twitter follow list for Apache Kafka.

I came up with this post idea after I saw the Confluent Community Catalyst program, which of course gives us a nice list to start with.

This is not an exhaustive list, so if you know someone that you think should be here, please post a comment. And of course, some of the accounts here are only Kafka-related.

Starting with the basics, I'll add the official ones:

@apachekafka
@confluentinc

Confluent

@jaykreps
@nehanarkhede
@tlberglund
@miguno
@gAmUssA
@riferrei

Kafka connect

@rmoff

Debezium

@debezium
@gunnarmorling

Strimzi

@strimziio

With Spark relations

@jaceklaskowski
@matei_zaharia

With Flink relations

@StephanEwen

Flink and Apache Nifi

@PaaSDev

Cloudera

@cloudera

MapR

@caroljmcdonald
@Ellen_Friedman
@ted_dunning

Azkarra
@fhussonnois

Azure

@abhi_tweeter

Oracle

@igfasouza

without category

@martinkl
@KaiWaehner
@myfear
@hguerreroo
@gwenshap
@AdiPolak
@MichaelDrogalis

Feel free to suggest anyone that you think should be here.
I couldn’t find anyone for IBM and Google clouds.
I couldn’t find anyone for Python Faust as well.

Sink Kafka connect

Well?
2020/05/05

This blog post is part of my series of posts regarding Kafka Connect.
If you're not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture
Kafka Connect
Source Kafka Connect

This post is a collection of links, videos, tutorials, blogs, and books that I found, mixed with my opinion.

TL;DR

This post is about code; I'll show all the steps to develop your own Sink Kafka Connector.

Table of contents

1. Use Case
2. Code
3. Links

1. Use Case

One scenario that has become popular, and that I have been asked about a few times in the last months, is using Kafka to trigger functions as a service.

In simple words: send a message to your Kafka topic and your function(s) get invoked with the payload you sent to Kafka. You can now have a function triggered in response to messages in Kafka topics.

My example is using OCI functions.

2. Code

The connector needs to extend SinkConnector. It's almost the same as the source connector, and you need to implement six methods here.

@Override
    public String version() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // TODO Auto-generated method stub
       
    }

    @Override
    public Class<? extends Task> taskClass() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void stop() {
        // TODO Auto-generated method stub
       
    }

    @Override
    public ConfigDef config() {
        // TODO Auto-generated method stub
        return null;
    }

And the Task needs to extend SinkTask. It's almost the same as the source task, but now you don't have the poll method; instead, you have put.

@Override
    public String version() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // TODO Auto-generated method stub
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // here is where the magic happens.
        // just add a boolean triggerFn method that triggers the function.
    }

    @Override
    public void stop() {
        // TODO Auto-generated method stub
       
    }
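
As a rough illustration of what put() can do, here is a sketch of a triggerFn helper that POSTs each record value to the function endpoint using Java 11's built-in HTTP client. The real connector signs the request with the OCI SDK (see the InvokeFunctionExample linked below); functionUrl comes from the connector config:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

    private final HttpClient client = HttpClient.newHttpClient();

    private boolean triggerFn(String functionUrl, String payload) {
        try {
            // plain POST; the real code adds the OCI request-signing headers
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(functionUrl))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(payload))
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200;
        } catch (Exception e) {
            // a real connector would throw a RetriableException here
            throw new RuntimeException("Failed to invoke function", e);
        }
    }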

Run Docker

docker run -it --rm --name fn-oci-connect-demo -p 8083:8083 -e GROUP_ID=1 \
    -e BOOTSTRAP_SERVERS="bootstrap_URL" \
    -e CONFIG_STORAGE_TOPIC="ID-config" \
    -e OFFSET_STORAGE_TOPIC="ID-offset" \
    -e STATUS_STORAGE_TOPIC="ID-status" \
    -v fn-sink-kafka-connector/target/fn-sink-kafka-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar/:/kafka/connect/fn-connector \
    debezium/connect:latest

Configure

curl -X POST \
    http://localhost:8083/connectors \
    -H 'content-type: application/json' \
    -d '{
    "name": "FnSinkConnector",
    "config": {
      "connector.class": "com.fn.sink.kafka.connect.FnSinkConnector",
      "tasks.max": "1",
      "topics": "test-sink-topic",
      "tenant_ocid": "<tenant_ocid>",
      "user_ocid": "<user_ocid>",
      "public_fingerprint": "<public_fingerprint>",
      "private_key_location": "/path/to/kafka-connect/secrets/<private_key_name>",
      "function_url": "<FUNCTION_URL>"
    }
  }'

I created a package "http" where I added the OCI Functions part; the code is based on this one here.

GitHub
You can check the full code in my GitHub.

3. Links

https://docs.confluent.io/current/connect/devguide.html

https://github.com/oracle/oci-java-sdk/blob/master/bmc-examples/src/main/java/InvokeFunctionExample.java

https://docs.cloud.oracle.com/en-us/iaas/Content/Functions/Concepts/functionsoverview.htm

Happy Coding.

Source Kafka connect

How’s the man?
2020/04/29

This blog post is part of my series of posts regarding Kafka Connect.
If you're not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture
Kafka Connect

This post is a collection of links, videos, tutorials, blogs, and books that I found, mixed with my opinion.

TL;DR

This post is about code; I'll show all the steps to develop your own Source Kafka Connector, and in the end, I'll show an easy way to deploy your code.
It's an OpenWeatherMap source Kafka connector.

Table of contents

1. Get Started
2. Use Case
3. Schema
4. AbstractConfig
5. Connector
6. Task
7. Code
8. SMTs
9. Converter
10. Transformation
11. Deploy
12. Links

1. Get Started

Kafka Connect aims to reduce the burden of connecting Kafka with external systems such as databases, key-value stores, search indexes, and even file systems. Connectors are the high-level abstractions that manage the streaming of data from and to Kafka. Each connector speaks the language of the external system it connects to, hiding this complexity from the application developer who can focus on more valuable business logic.

The best place to start when implementing your Source Connector is the Confluent Connector Development Guide.

There's a code sample here.

This Maven quickstart is used to generate a skeleton plugin for Kafka Connect. Here:

mvn archetype:generate \
    -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
    -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=2.4.0

mvn archetype:generate \
    -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
    -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=2.4.0 \
    -Dpackage=com.github.jcustenborder.kafka.connect.test \
    -DgroupId=com.github.jcustenborder.kafka.connect \
    -DartifactId=testconnect \
    -DpackageName=com.github.jcustenborder.kafka.connect.test \
    -Dversion=1.0-SNAPSHOT

The nice thing about the Archetype is that it will create the boilerplate code for your connector, some basic properties, and some empty tests.

The dependencies that I used are;

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>com.konghq</groupId>
            <artifactId>unirest-java</artifactId>
            <version>3.1.02</version>
        </dependency>

And we also need the maven plugin to generate a jar with all dependencies.

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>

                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

2. Use Case

OpenWeatherMap is a real-time weather data website. Its API is already used in a lot of development examples, and I have already used it a few times for IoT and data analysis demos.
They have a free API tier, which is limited to 1 request per second, quite enough for tracking changes in weather. Their different API schemas return plenty of numeric and textual data, all interesting for analysis. This is a perfect example for us. I tried to choose an API that returns a more complex JSON, to show the Schema part in a more didactic way.

Check the API.

A partition is a division of source records that usually depends on the source medium.
In our case, there is only one API URL and we are only ever requesting current data, so a very good partitioning is to query weather data for different cities, with each partition processed by a separate task.

3. Schema

The Schema interface is one of the most important components of Kafka Connect. This allows you to define what the data looks like without having to worry about the way the data is stored.

Docs here.

The best way to start is to have a look at the API result, and you can see a sample at

https://samples.openweathermap.org/data/2.5/weather?q=London,uk&appid=b6907d289e10d714a6e88b30761fae22

{
  "coord": {
    "lon": -0.13,
    "lat": 51.51
  },
  "weather": [
    {
      "id": 300,
      "main": "Drizzle",
      "description": "light intensity drizzle",
      "icon": "09d"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 280.32,
    "pressure": 1012,
    "humidity": 81,
    "temp_min": 279.15,
    "temp_max": 281.15
  },
  "visibility": 10000,
  "wind": {
    "speed": 4.1,
    "deg": 80
  },
  "clouds": {
    "all": 90
  },
  "dt": 1485789600,
  "sys": {
    "type": 1,
    "id": 5091,
    "message": 0.0103,
    "country": "GB",
    "sunrise": 1485762037,
    "sunset": 1485794875
  },
  "id": 2643743,
  "name": "London",
  "cod": 200
}

If you have a look at the JSON, you can see that it has some structures inside; for example, weather contains id, main, description, and so on.

We can map this to a schema in an easy way:

public static Schema WEATHER_SCHEMA = SchemaBuilder.struct()
        .name("Weather")
        .version(1)
        .field(ID, Schema.INT64_SCHEMA)
        .field(MAIN, Schema.STRING_SCHEMA)
        .field(DESCRIPTION, Schema.STRING_SCHEMA)
        .field(ICON, Schema.STRING_SCHEMA)
        .build();

But sometimes the JSON contains a more complex structure, and for this we can create a schema that uses another schema as a type.
Let's see wind:

public static Schema WIND_SCHEMA = SchemaBuilder.struct()
        .name("Wind")
        .version(1)
        .field(SPEED, Schema.OPTIONAL_FLOAT32_SCHEMA)
        .field(DEG, Schema.OPTIONAL_INT32_SCHEMA)
        .optional()
        .build();

So now I can create a schema using the others that I created before.

public static Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("Value Schema")
        .version(1)
        .field(NAME, Schema.STRING_SCHEMA)
        .field(WEATHER, WEATHER_ARRAY_SCHEMA)
        .field(MAIN, MAIN_SCHEMA)
        .field(WIND, WIND_SCHEMA)
        .build();
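
With the schemas in place, the task can build a Struct that matches VALUE_SCHEMA from the parsed API response. A minimal sketch, assuming 'weather' is the DTO from the model package:

import org.apache.kafka.connect.data.Struct;

// nested structs are built first, then put into the top-level value
Struct wind = new Struct(WIND_SCHEMA)
        .put(SPEED, weather.getWind().getSpeed())
        .put(DEG, weather.getWind().getDeg());

// the WEATHER array and MAIN struct are built the same way
Struct value = new Struct(VALUE_SCHEMA)
        .put(NAME, weather.getName())
        .put(WIND, wind);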

4. AbstractConfig

AbstractConfig is an abstract class that defines an interface that our WeatherAPIConfig needs to adhere to, and it is where we get the configuration for our connector.

We need to pass the openWeatherApiKey, the list of cities we want weather data for, and the Kafka topic. I added pollFrequency as a hardcoded value because the API limits the number of requests in the free tier.

public WeatherAPIConfig(Map<String, ?> originals) {
    super(config(), originals);
    this.openWeatherApiKey = this.getString(OPEN_WEATHER_API_KEY);
    this.cities = this.getString(CITIES);
    this.pollFrequency = this.getLong(POLL_FREQUENCY);
    this.kafkaTopic = this.getString(KAFKA_TOPIC);
}
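
The matching config() is a plain ConfigDef. A sketch, using the property names that show up later in the connector JSON (the defaults are just placeholders):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public static ConfigDef config() {
    return new ConfigDef()
            .define(OPEN_WEATHER_API_KEY, Type.STRING, Importance.HIGH, "OpenWeatherMap API key")
            .define(CITIES, Type.STRING, Importance.HIGH, "Comma-separated list of cities")
            .define(KAFKA_TOPIC, Type.STRING, "weather", Importance.HIGH, "Target Kafka topic")
            .define(POLL_FREQUENCY, Type.LONG, 1000L, Importance.LOW, "Poll frequency in milliseconds");
}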

5. Connector

SourceConnector is an abstract class that defines an interface that our WeatherAPIConnector needs to adhere to. There are six methods.

@Override
public String version() {
    // you can add your own version strategy
    return null;
}

@Override
public void start(Map<String, String> props) {
    // here is where we implement the configuration class
}

@Override
public Class<? extends Task> taskClass() {
    // just need to pass the name of the Task class
    return null;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // here is the partition strategy to split the list of cities across the different tasks
    return null;
}

@Override
public void stop() {
    // since we don't start anything in the start method, we don't need to do anything here
}

@Override
public ConfigDef config() {
    // here we return the ConfigDef already defined in the WeatherAPIConfig class
    return null;
}
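
For example, the partition strategy in taskConfigs() can simply split the configured city list into one slice per task. A sketch, assuming 'props' is the map saved in start():

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<String> cities = Arrays.asList(props.get("cities").split(","));
    int groups = Math.min(cities.size(), maxTasks);

    // one config map per task, each with its own slice of the city list
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    for (int i = 0; i < groups; i++) {
        Map<String, String> taskConfig = new HashMap<>(props);
        taskConfig.put("cities", String.join(",",
                cities.subList(i * cities.size() / groups, (i + 1) * cities.size() / groups)));
        taskConfigs.add(taskConfig);
    }
    return taskConfigs;
}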

6. Task

In a similar manner to the Source Connector class, we implement the SourceTask abstract class. Here we have only four methods.

@Override
public String version() {
    // you can add your own version strategy
    return null;
}

@Override
public void start(Map<String, String> props) {
    // initialize the configuration and the weather API client
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // this method is going to be called continuously, and because of the API limitation we need to add a sleep to decrease the poll frequency.
    return null;
}

@Override
public void stop() {
    // TODO Auto-generated method stub      
}
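
A sketch of what poll() can look like, turning each Weather DTO into a SourceRecord; pollFrequency, kafkaTopic, weatherAPIClient and buildStruct() are assumed to be set up in start() and in the schema code shown earlier:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // respect the free-tier rate limit before calling the API again
    Thread.sleep(pollFrequency);

    List<SourceRecord> records = new ArrayList<>();
    for (Weather weather : weatherAPIClient.getCurrentWeather()) {
        // the city identifies the source partition, the timestamp is used as the offset
        Map<String, String> sourcePartition = Collections.singletonMap("city", weather.getName());
        Map<String, Long> sourceOffset = Collections.singletonMap("timestamp", System.currentTimeMillis());
        records.add(new SourceRecord(sourcePartition, sourceOffset, kafkaTopic,
                VALUE_SCHEMA, buildStruct(weather)));
    }
    return records;
}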

7. Code

I created a package model where I added the DTOs for the fields that I used in my schema, and a package schema.

WeatherAPIClient is the class that just calls the API and passes the list of cities.
I use the WeatherAPIConfig config to get all configurations:

public List<Weather> getCurrentWeather() {
    return Arrays.stream(config.getCities().split(","))
            .map(city -> Unirest.get(BASE_URL)
                    .queryString("q", city)
                    .queryString("APPID", config.getOpenWeatherApiKey())
                    .asObject(Weather.class))
            .map(HttpResponse::getBody)
            .collect(Collectors.toList());
}

8. SMTs

From an architectural point of view, SMTs reside exactly between the connectors and the converters: they apply their transformations right before the data is written to Kafka (for source connectors) and right after it is read from Kafka (for sink connectors).

The Kafka Connect framework comes with many built-in SMTs to address the most common transformation use cases, for example:

  • mask sensitive message fields,
  • add identifiers,
  • tag events,
  • cast data types to comply with the destination,
  • convert data (e.g. timestamp conversion like time-based data conversion standardization),
  • slim down messages
  • or re-routing them via re-partitioning.

Full documentation here.

SMTs are configured right beside connectors, as their configuration properties reside in the same file.

Let's see one example using the FileSource connector:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

As you can see SMT can be combined and chained pretty easily by simply configuring them next to the Kafka Connect sinks and sources.

But the most important thing that should always be kept in mind is that SMTs process one message at a time.
This is particularly crucial because it strongly limits SMT’s usage. In particular, it doesn’t permit any kind of transformation that is characterized by any kind of aggregation logic. Processing one message at a time is mandatory due to how partitions work and how the offset semantic is implemented in Kafka.

9. Converter

Here I created a package converter where I added an example of a Converter, a Serializer, and a Deserializer. The Converter has only three methods to implement.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    // get the properties
}

@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
    // convert to array of bytes
    return null;
}

@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
    // convert to schema value
    return null;
}

StringConverter implements Converter
StringDeserializer implements Deserializer
StringSerializer implements Serializer

10. Transformation

Here I just add an example of a transformation.

IntegrityCheck implements Transformation

@Override
public void configure(Map<String, ?> props) {
    //
}

@Override
public R apply(R record) {
   // check messages with null key with schema and without schema
}

@Override
public ConfigDef config() {
    return CONFIG_DEF;
}
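
A sketch of the usual apply() pattern, branching on whether the record carries a schema (the same structure the built-in SMTs use); applySchemaless and applyWithSchema are private helpers not shown here:

@Override
public R apply(R record) {
    if (record.valueSchema() == null) {
        // schemaless record: the value is a Map
        return applySchemaless(record);
    }
    // record with a schema: the value is a Struct
    return applyWithSchema(record);
}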

11. Deploy

There is an official Kafka Connect Docker image here, but you can use any Kafka Connect Docker image, like Debezium's.

docker run -it --rm --name weather-connect-demo -p 8083:8083 -e GROUP_ID=1 \
    -e BOOTSTRAP_SERVERS="bootstrap_URL" \
    -e CONFIG_STORAGE_TOPIC="ID-config" \
    -e OFFSET_STORAGE_TOPIC="ID-offset" \
    -e STATUS_STORAGE_TOPIC="ID-status" \
    -v openweathermap-connector/target/openweathermap-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar/:/kafka/connect/openweathermap-connector \
    debezium/connect:latest

And we need to call the REST API with this JSON to start the connector:

{
  "name": "weathermap-connector",
  "config": {
    "connector.class": "com.openweathermap.kafka.connect.WeatherAPIConnector",
    "value.converter": "com.openweathermap.kafka.connect.converter.StringConverter",
    "value.converter.encoding": "UTF-8",
    "tasks.max": "1",
    "open.weather.api.key": "12312312312312313123123123123",
    "cities": "Ireland, Brazil",
    "kafka.topic": "weather",
    "name": "weather-connector",
    "transforms": "ReplaceField,IntegrityCheck",
    "transforms.ReplaceField.type": "com.openweathermap.kafka.connect.transform.ReplaceField$Value",
    "transforms.ReplaceField.blacklist": "main",
    "transforms.IntegrityCheck.type": "com.openweathermap.kafka.connect.transform.IntegrityCheck$Value",
    "transforms.IntegrityCheck.field": "integrity"
  }
}

GitHub
You can check the full code in my GitHub, and I did a Scala demo as well, where I just followed the same structure of folders, packages, and classes.

12. Links

https://docs.confluent.io/current/connect/devguide.html

https://www.confluent.io/wp-content/uploads/Partner-Dev-Guide-for-Kafka-Connect.pdf?x18424

https://docs.confluent.io/current/connect/userguide.html

https://docs.confluent.io/3.1.1/connect/userguide.html

https://www.confluent.jp/blog/create-dynamic-kafka-connect-source-connectors/

Stay tuned! In the next blog post I'll show how to code a Sink Kafka connector.

Kafka Connect

How are you?
2020/04/21

This blog post is part of my series of posts regarding Kafka Connect.
If you're not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture

This post is a collection of links, videos, tutorials, blogs, and books that I found, mixed with my opinion.

Table of contents

1. Source Standalone mode
2. Source Distributed mode
3. Sink
4. Don't use Docker Compose
5. Lensesio
6. Strimzi
7. Debezium
8. JDBC
9. Links

1. Source Standalone mode

Standalone mode is the best way to get started with minimum config and infrastructure setup. In this section, we will see how to configure a connector in Standalone mode and transfer file content to Kafka topic.

Here I’m going to show how to use the FileSource Connector

Let's create a Docker Compose file, "docker-compose.yml":

version: '2'
services:
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
      - kafka

In the same directory where we created the YAML file, execute the following command to start the Kafka cluster. When the command runs for the very first time it downloads the images; once they are downloaded it creates the Kafka cluster.

docker-compose up

To start any Kafka Connect cluster we require a workers config and a connector (file-stream) config.
Create two files: workers-config.properties and file-stream-connector-properties.

Workers-config.properties:

bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal key to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000

File-stream-connector-properties:

# These are standard kafka connect parameters, need for ALL connectors
name=file-stream-kafka-connect-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=source-input.txt
topic=kafka-connect-standalone

You should see three files in the folder; docker-compose.yml, file-stream-connector-properties and workers-config.properties

Create an input file source-input.txt; the content of this file will be transferred to the Kafka topic.

touch source-input.txt

Mount a host directory in a Docker container: make sure we are in the directory where we created the files. After mounting, we automatically switch into the cp-kafka-connect container.

docker run --rm -it -v "$(pwd)":/kafka-connect/ --net=host confluentinc/cp-kafka-connect

This Docker image expects the connectors in this folder:
/etc/kafka-connect/jars

Create Kafka topic “kafka-connect-standalone” with 3 partitions and replication factor 1.

kafka-topics --create --topic kafka-connect-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181

Create standalone connector using workers-config.properties and file-stream-connector-properties.

connect-standalone workers-config.properties file-stream-connector-properties

Now you can test it: open the file source-input.txt, type some message into it, and save it. The message should be transferred to the Kafka topic.

What happened in the background: we wrote data to the source file and Kafka Connect standalone pushed the data to the topic. No programming, just configs.

Now stop Kafka Connect (press Ctrl/Command + C). Once Kafka Connect has gracefully stopped, list all files in the given directory. We have a new guest in the form of a file "standalone.offsets".
This file is created by Kafka Connect to keep track of where it should resume reading messages from the source file on restarts.
When Kafka Connect starts again it should resume reading without publishing duplicate messages to the topic.

Try to execute the above command (connect-standalone workers-config.properties file-stream-connector-properties) again and validate that you do not get duplicate messages.

2. Source Distributed mode

Now let’s configure a connector in distributed mode.

Create Kafka topic "kafka-connect-distributed" with 3 partitions and replication factor 1.

kafka-topics --create --topic kafka-connect-distributed --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181

Create a config file “connect-source.json”.

{
  "name": "file-stream-kafka-connect-distributed",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "source-input.txt",
    "topic": "kafka-connect-distributed",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}

and run;

curl -d @<path-to-config-file>/connect-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Now write some messages in the source file; once the file is saved, all messages are posted to the topic "kafka-connect-distributed".

Check for the messages copied from the file to the topic by the FileSourceConnector. Note that messages are stored in JSON format, as the connector was created earlier with value.converter.schemas.enable=true.

We can run the command to check the topic;

kafka-console-consumer --topic kafka-connect-distributed --from-beginning --bootstrap-server 127.0.0.1:9092

3. Sink

Now we need a sink example; let's look at MongoDB.
We can add this to the Docker Compose file.

  mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

Create a file "connect-mongodb-sink.json" (shown below) and run:

curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

connect-mongodb-sink.json:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

We have the following MongoDB-specific properties here:

  • mongodb.connection.uri contains the connection string for our MongoDB instance
  • mongodb.collection defines the collection
  • Since the MongoDB connector is expecting JSON, we have to set JsonConverter for key.converter and value.converter

  • And we also need schemaless JSON for MongoDB, so we have to set key.converter.schemas.enable and value.converter.schemas.enable to false

Elasticsearch

elasticsearch:
    image: itzg/elasticsearch:2.4.3
    environment:
      PLUGINS: appbaseio/dejavu
      OPTS: -Dindex.number_of_shards=1 -Dindex.number_of_replicas=0
    ports:
      - "9200:9200"

Create a config file “connect-elasticsearch-sink.json”.

{
  "name": "sink-elastic-twitter-distributed",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "2",
    "topics": "kafka-connect-distributed-twitter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "kafka-connect",
    "key.ignore": "true"
  }
}

Here is a nice link about Docker options and images.

I found a nice and simple Docker Compose setup for single or multiple nodes here.

4. Don't use Docker Compose

#Start Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper:3.5.5

#Start Kafka Broker
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0

#Start Kafka Connect
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=kafka_connect_configs \
-e OFFSET_STORAGE_TOPIC=kafka_connect_offsets \
-e STATUS_STORAGE_TOPIC=kafka_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
debezium/connect:1.0

5. Lensesio

Another option is to use the lenses.io image, which already comes with several Kafka connectors pre-installed and a nice user interface.

services:
  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker
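
A minimal way to try it, assuming the snippet above is saved as docker-compose.yml:

docker-compose up -d
# Landoop UI:             http://127.0.0.1:3030
# Kafka Connect REST API: http://127.0.0.1:8083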

https://lenses.io/
https://github.com/lensesio/fast-data-dev

6. Strimzi

Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster. It provides container images and Operators for running Kafka on Kubernetes.

https://strimzi.io/
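
As a rough sketch of how little is needed to get a cluster running (based on the Strimzi quickstart; the exact URLs and resource names may change between releases):

# Install the Strimzi operator into a "kafka" namespace
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

# Deploy a single-node Kafka cluster from the provided example custom resource
kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka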

https://medium.com/@sincysebastian/setup-kafka-with-debezium-using-strimzi-in-kubernetes-efd494642585

https://medium.com/@vamsiramakrishnan/strimzi-on-oracle-kubernetes-engine-oke-kafka-connect-on-oracle-streaming-service-oss-a-c73cc9714e90

https://itnext.io/kafka-connect-on-kubernetes-the-easy-way-b5b617b7d5e9

7. Debezium

I have mentioned Debezium before; it is an open source project for applying the Change Data Capture (CDC) pattern to your applications using Kafka.

https://debezium.io/

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  postgres:
    image: debezium/postgres:9.6
    ports:
     - "5432:5432"
    environment:
     - POSTGRES_USER=postgresuser
     - POSTGRES_PASSWORD=postgrespw
     - POSTGRES_DB=inventory
  elastic:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.5.2
    ports:
     - "9200:9200"
    environment:
     - http.host=0.0.0.0
     - transport.host=127.0.0.1
     - xpack.security.enabled=false
  connect:
    image: debezium/connect-jdbc-es:${DEBEZIUM_VERSION}
    build:
      context: debezium-jdbc-es
    ports:
     - 8083:8083
     - 5005:5005
    links:
     - kafka
     - mysql
     - postgres
     - elastic
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_source_connect_statuses
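
With the stack running, you can register a source connector against the example MySQL database. This is a sketch based on the Debezium MySQL tutorial (the credentials come from the debezium/example-mysql image; property names may differ slightly between Debezium versions):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'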

And of course, you can use Strimzi and Debezium together here.

8. JDBC

One of the most common integrations that people want to do with Kafka is getting data into or out of a database.

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/

https://dev.to/rmoff/streaming-data-from-kafka-to-s3-video-walkthrough-2elh
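
As an illustration, a JDBC source connector config could look roughly like this (a hypothetical sketch, assuming the Confluent JDBC connector is installed and a PostgreSQL database named inventory is reachable):

name=jdbc-source-inventory
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:postgresql://postgres:5432/inventory
connection.user=postgresuser
connection.password=postgrespw
mode=incrementing
incrementing.column.name=id
table.whitelist=customers
topic.prefix=jdbc-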

9. Link

https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html

https://docs.confluent.io/3.1.2/cp-docker-images/docs/quickstart.html

https://github.com/confluentinc/cp-docker-images/tree/5.3.1-post/examples

https://www.baeldung.com/kafka-connectors-guide

https://blog.softwaremill.com/do-not-reinvent-the-wheel-use-kafka-connect-4bcabb143292

https://www.confluent.io/blog/webify-event-streams-using-kafka-connect-http-sink/

Stay tuned! In the next blog post I’ll show how to code your own Kafka connector.

Kafka Connector Architecture

What’s the story Rory?
2020/04/05

This blog post is part of my series of posts regarding “Kafka Connect Overview“.
If you’re not familiar with Kafka, I suggest you have a look at my previous post “What is Kafka?” before.
This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my opinion.

Table of contents

1. Kafka Connect
2. Source & Sink Connectors
3. Standalone & Distributed
4. Converters & Transforms
5. Life cycle
6. Code
7. Books
8. Links

1. Kafka Connect

Kafka Connect’s goal of copying data between systems has been tackled by a variety of frameworks, many of them still actively developed and maintained. This section explains the motivation behind Kafka Connect, where it fits in the design space, and its unique features and design decisions.

Kafka Connect has three major models in its design:

  • Connector model
  • Worker model
  • Data model

The connector model addresses three key user requirements. First, Kafka Connect performs broad copying by default by having users define jobs at the level of Connectors which then break the job into smaller Tasks. This two level scheme strongly encourages connectors to use configurations that encourage copying broad swaths of data since they should have enough inputs to break the job into smaller tasks. It also provides one point of parallelism by requiring Connectors to immediately consider how their job can be broken down into subtasks, and select an appropriate granularity to do so. Finally, by specializing source and sink interfaces, Kafka Connect provides an accessible connector API that makes it very easy to implement connectors for a variety of systems.

The worker model allows Kafka Connect to scale to the application. It can run scaled down to a single worker process that also acts as its own coordinator, or in clustered mode where connectors and tasks are dynamically scheduled on workers. However, it assumes very little about the process management of the workers, so it can easily run on a variety of cluster managers or using traditional service supervision. This architecture allows scaling up and down, but Kafka Connect’s implementation also adds utilities to support both modes well. The REST interface for managing and monitoring jobs makes it easy to run Kafka Connect as an organization-wide service that runs jobs for many users. Command line utilities specialized for ad hoc jobs make it easy to get up and running in a development environment, for testing, or in production environments where an agent-based approach is required.

The data model addresses the remaining requirements. Many of the benefits come from coupling tightly with Kafka. Kafka serves as a natural buffer for both streaming and batch systems, removing much of the burden of managing data and ensuring delivery from connector developers. Additionally, by always requiring Kafka as one of the endpoints, the larger data pipeline can leverage the many tools that integrate well with Kafka. This allows Kafka Connect to focus only on copying data because a variety of stream processing tools are available to further process the data, which keeps Kafka Connect simple, both conceptually and in its implementation. This differs greatly from other systems where ETL must occur before hitting a sink. In contrast, Kafka Connect can bookend an ETL process, leaving any transformation to tools specifically designed for that purpose. Finally, Kafka includes partitions in its core abstraction, providing another point of parallelism

In simple words:

Kafka Connect is a distributed, scalable, fault-tolerant service designed to reliably stream data between Kafka and other data systems. Data is produced from a source and consumed into a sink.

Connect tracks the offset that was last consumed for a source, so it can restart a task at the correct starting point after a failure. These offsets are different from Kafka offsets; they are based on the source system (database, file, etc.).

In standalone mode, the source offset is tracked in a local file; in distributed mode, it is tracked in a Kafka topic.
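
For illustration, these are the relevant worker properties in each mode (a sketch with example values; the file path and topic names are up to you):

# connect-standalone.properties (standalone mode)
bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets

# connect-distributed.properties (distributed mode)
bootstrap.servers=localhost:9092
group.id=connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status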

2. Source & Sink connectors

Producers and Consumers provide complete flexibility to send any data to Kafka or process it in any way. This flexibility means you do everything yourself.

Kafka Connect’s simple framework allows:

  • Developers to create connectors that copy data to or from other systems.
  • Operators to use said connectors just by writing configuration files and submitting them to Connect (no code).
  • Community and 3rd-party engineers to build reliable plugins for common data sources and sinks.
  • Deployments to deliver fault tolerance and automated load balancing out of the box.

And the framework does the hard work:

  • Serialization and deserialization.
  • Schema registry integration.
  • Fault tolerance and failover.
  • Partitioning and scale-out.
  • And lets developers focus on domain-specific details.

3. Standalone & Distributed

In standalone mode we have a source or a sink and a Kafka broker. When we deploy Kafka Connect in standalone mode we need to pass a configuration file containing all the connection properties required to run, so standalone’s main way of providing configuration to our connector is properties files, not the REST API.

In distributed mode we usually have more than one worker. Since these workers can be on different machines or in different containers, they cannot share the same storage space, so a properties file is out of the question. Instead, Kafka Connect in distributed mode leverages Kafka topics to stay in sync, and connectors are managed through the REST API. A quick sketch of both modes is shown below.
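
A minimal sketch, assuming a local Apache Kafka installation (the script names come from the standard Kafka distribution; the properties files are the ones discussed above):

# Standalone: pass the worker config and the connector properties on the command line
bin/connect-standalone.sh config/connect-standalone.properties my-connector.properties

# Distributed: start one or more workers, then manage connectors via the REST API
bin/connect-distributed.sh config/connect-distributed.properties
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @my-connector.json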

4. Converters & Transforms

Converters are a pluggable API to convert data between native formats and Kafka. Just like the name says, converters are used to convert data from one format to another.
In source connectors, converters are invoked after the data has been fetched from the source and before it is published to Kafka.
In sink connectors, converters are invoked after the data has been consumed from Kafka and before it is stored in the sink.

Apache Kafka ships with a JSON converter (JsonConverter).

A transform is a simple operation that can be applied at the message level.

There’s a nice blog post about Single message transforms here.

Single Message Transforms (SMTs) modify events before they are stored in Kafka: mask sensitive information, add identifiers, tag events, remove unnecessary columns and more. They can also modify events going out of Kafka: route high-priority events to a faster datastore, cast data types to match the destination and more. A small example is sketched below.
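
For example, the built-in InsertField and MaskField transforms could be added to a connector config like this (a sketch with hypothetical field names; the transform classes ship with Apache Kafka):

transforms=addSource,maskSsn
# Add a static field recording where the record came from
transforms.addSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addSource.static.field=data_source
transforms.addSource.static.value=my-source-system
# Mask a sensitive column before it reaches the sink
transforms.maskSsn.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskSsn.fields=ssn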

5. Life cycle

I built this animation using the slides from Kafka Summit SF 2018 (Randall Hauch, Confluent) here.

Sequence diagram

6. Code

Confluent makes available a code example that you can find here.

Confluent hub

7. Books

Modern Big Data Processing with Hadoop

8. Links

https://docs.confluent.io/current/connect/managing/confluent-hub/component-archive.html

https://docs.confluent.io/current/connect/design.html

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/