Sink Kafka Connect

Well?

This blog post is part of my series on Kafka Connect.
If you’re not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture
Kafka Connect
Source Kafka Connect

This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my own opinion.

TL;DR

This post is about code. I’ll show the steps to develop your own Sink Kafka Connector.

Table of contents

1. Use Case
2. Code
3. Links

1. Use Case

One scenario that has become popular, and that I have been asked about several times in recent months, is using Kafka to trigger functions as a service.

In simple words: send a message to your Kafka topic and your function(s) get invoked with the payload you sent. A function is triggered in response to messages in Kafka topics.

My example uses OCI Functions.

2. Code

The connector needs to extend SinkConnector. It is almost the same as the source connector, and you need to implement six methods.

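import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class FnSinkConnector extends SinkConnector {

    @Override
    public String version() {
        // TODO: return the connector version string
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // TODO: read and keep the connector settings passed in props
    }

    @Override
    public Class<? extends Task> taskClass() {
        // TODO: return the SinkTask implementation that does the work
        return null;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // TODO: return one settings map per task, at most maxTasks entries
        return null;
    }

    @Override
    public void stop() {
        // TODO: release anything acquired in start
    }

    @Override
    public ConfigDef config() {
        // TODO: return the ConfigDef that describes the connector settings
        return null;
    }
}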
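
To give a rough idea of what start and taskConfigs usually end up doing, here is a minimal sketch that leaves out the Kafka Connect classes so it can run on its own. The class name ConnectorConfigSketch and the decision to treat all five settings as required are my assumptions for illustration, not the code from the repository. The setting names are the ones used in the connector configuration in this post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free stand-in for the connector class.
class ConnectorConfigSketch {

    // Setting names come from the connector configuration in this post.
    // Treating every one of them as required is an assumption.
    static final List<String> REQUIRED_KEYS = List.of(
            "tenant_ocid", "user_ocid", "public_fingerprint",
            "private_key_location", "function_url");

    private Map<String, String> props = new HashMap<>();

    // Mirrors SinkConnector.start(props): validate the settings and keep a copy.
    public void start(Map<String, String> props) {
        for (String key : REQUIRED_KEYS) {
            String value = props.get(key);
            if (value == null || value.isBlank()) {
                throw new IllegalArgumentException("Missing required setting: " + key);
            }
        }
        this.props = new HashMap<>(props);
    }

    // Mirrors SinkConnector.taskConfigs(maxTasks): each task gets its own copy
    // of the connector settings.
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(props));
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        for (String key : REQUIRED_KEYS) {
            props.put(key, "<" + key + ">"); // placeholder values only
        }
        ConnectorConfigSketch connector = new ConnectorConfigSketch();
        connector.start(props);
        System.out.println(connector.taskConfigs(2).size() + " task configs");
    }
}
```

Handing every task the same settings is the simplest choice for a sink, since Kafka Connect itself spreads the topic partitions across the tasks.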

The task needs to extend SinkTask. It is almost the same as the source task, but instead of the poll method you now have put.

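import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// The class name FnSinkTask is assumed here for illustration.
public class FnSinkTask extends SinkTask {

    @Override
    public String version() {
        // TODO: return the same version string as the connector
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // TODO: read the task settings and prepare the function client
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // This is where the magic happens.
        // Add a boolean triggerFn method that invokes the function for each record.
    }

    @Override
    public void stop() {
        // TODO: release anything acquired in start
    }
}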
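
To make the idea behind put more concrete, the following sketch loops over a batch and calls a trigger once per record. It is only an approximation: plain strings stand in for the SinkRecord values, and the FunctionTrigger interface and the PutLoopSketch class are hypothetical names. Only triggerFn comes from the comment in the task code. Throwing an exception when an invocation fails is also an assumption about how errors should be handled.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, dependency-free stand-in for the sink task.
class PutLoopSketch {

    // Hypothetical abstraction over the real function call, so the loop
    // can be exercised without any network access.
    interface FunctionTrigger {
        boolean triggerFn(String payload);
    }

    private final FunctionTrigger trigger;

    PutLoopSketch(FunctionTrigger trigger) {
        this.trigger = trigger;
    }

    // Stand-in for SinkTask.put: invoke the function once per record value
    // and return how many invocations succeeded.
    int put(Collection<String> recordValues) {
        int invoked = 0;
        for (String value : recordValues) {
            if (!trigger.triggerFn(value)) {
                // Assumption: a failed invocation aborts the whole batch.
                throw new IllegalStateException("Function invocation failed for payload: " + value);
            }
            invoked++;
        }
        return invoked;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // Fake trigger that only records the payloads it receives.
        PutLoopSketch task = new PutLoopSketch(payload -> seen.add(payload));
        int count = task.put(List.of("{\"id\":1}", "{\"id\":2}"));
        System.out.println(count + " invocations, payloads: " + seen);
    }
}
```

In a real task you would probably throw Kafka Connect's RetriableException instead, which tells the framework to retry the same put call.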

Run Docker

docker run -it --rm --name fn-oci-connect-demo -p 8083:8083 -e GROUP_ID=1 \
    -e BOOTSTRAP_SERVERS="bootstrap_URL" \
    -e CONFIG_STORAGE_TOPIC="ID-config" \
    -e OFFSET_STORAGE_TOPIC="ID-offset" \
    -e STATUS_STORAGE_TOPIC="ID-status" \
    -v "$(pwd)/fn-sink-kafka-connector/target/fn-sink-kafka-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/kafka/connect/fn-connector/fn-sink-kafka-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar" \
    debezium/connect:latest

Configure

curl -X POST \
    http://localhost:8083/connectors \
    -H 'content-type: application/json' \
    -d '{
    "name": "FnSinkConnector",
    "config": {
      "connector.class": "com.fn.sink.kafka.connect.FnSinkConnector",
      "tasks.max": "1",
      "topics": "test-sink-topic",
      "tenant_ocid": "<tenant_ocid>",
      "user_ocid": "<user_ocid>",
      "public_fingerprint": "<public_fingerprint>",
      "private_key_location": "/path/to/kafka-connect/secrets/<private_key_name>",
      "function_url": "<FUNCTION_URL>"
    }
  }'
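
The same registration can also be done from Java. This is a sketch using the JDK's built-in java.net.http client (Java 11 or later). The class and method names are hypothetical, the placeholder values have to be replaced just like in the curl call, and the URL assumes the Connect REST API is reachable on localhost:8083, the port published by the Docker command.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that registers the connector through the Connect REST API.
class RegisterConnectorSketch {

    // Assumption: Connect listens on the port published by the Docker command.
    static final String CONNECT_URL = "http://localhost:8083/connectors";

    // Same settings as the curl call; values in <...> are placeholders.
    static String connectorJson() {
        return "{"
            + "\"name\": \"FnSinkConnector\","
            + "\"config\": {"
            + "\"connector.class\": \"com.fn.sink.kafka.connect.FnSinkConnector\","
            + "\"tasks.max\": \"1\","
            + "\"topics\": \"test-sink-topic\","
            + "\"tenant_ocid\": \"<tenant_ocid>\","
            + "\"user_ocid\": \"<user_ocid>\","
            + "\"public_fingerprint\": \"<public_fingerprint>\","
            + "\"private_key_location\": \"/path/to/kafka-connect/secrets/<private_key_name>\","
            + "\"function_url\": \"<FUNCTION_URL>\""
            + "}}";
    }

    // Builds the POST request without sending it.
    static HttpRequest buildRequest(String connectUrl, String json) {
        return HttpRequest.newBuilder(URI.create(connectUrl))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Needs a running Kafka Connect worker to succeed.
        HttpRequest request = buildRequest(CONNECT_URL, connectorJson());
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```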

I created a package “http” that holds the OCI Functions part. The code is based on the InvokeFunctionExample from the OCI Java SDK (see Links).

GitHub
You can check the full code in my GitHub.

3. Links

https://docs.confluent.io/current/connect/devguide.html

https://github.com/oracle/oci-java-sdk/blob/master/bmc-examples/src/main/java/InvokeFunctionExample.java

https://docs.cloud.oracle.com/en-us/iaas/Content/Functions/Concepts/functionsoverview.htm

Happy Coding.