{"id":759,"date":"2020-05-05T18:34:25","date_gmt":"2020-05-05T17:34:25","guid":{"rendered":"http:\/\/www.igfasouza.com\/blog\/?p=759"},"modified":"2021-05-20T14:20:33","modified_gmt":"2021-05-20T13:20:33","slug":"sink-kafka-connect","status":"publish","type":"post","link":"http:\/\/www.igfasouza.com\/blog\/sink-kafka-connect\/","title":{"rendered":"Sink Kafka connect"},"content":{"rendered":"<p><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_kafka_connect.jpg\" alt=\"\" width=\"703\" height=\"354\" class=\"alignnone size-full wp-image-760\" srcset=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_kafka_connect.jpg 703w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_kafka_connect-300x151.jpg 300w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_kafka_connect-624x314.jpg 624w\" sizes=\"auto, (max-width: 703px) 100vw, 703px\" \/><\/p>\n<p><b>Well?<\/b><\/p>\n<p>This blog post is part of my series of posts regarding Kafka Connect.<br \/>\nIf you&#8217;re not familiar with Kafka, I suggest you have a look at some of my previous posts:<\/p>\n<p><a href=\"http:\/\/www.igfasouza.com\/blog\/what-is-kafka\/\" rel=\"noopener\" target=\"_blank\">What is Kafka?<\/a><br \/>\n<a href=\"http:\/\/www.igfasouza.com\/blog\/kafka-connect-overview\/\" rel=\"noopener\" target=\"_blank\">Kafka Connect Overview<\/a><br \/>\n<a href=\"http:\/\/www.igfasouza.com\/blog\/kafka-connector-architecture\/\" rel=\"noopener\" target=\"_blank\">Kafka Connector Architecture<\/a><br \/>\n<a href=\"http:\/\/www.igfasouza.com\/blog\/kafka-connect\/\" rel=\"noopener\" target=\"_blank\">Kafka Connect<\/a><br \/>\n<a href=\"http:\/\/www.igfasouza.com\/blog\/source-kafka-connect\/\" rel=\"noopener\" target=\"_blank\">Source Kafka Connect<\/a><\/p>\n<p>This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my own opinion. 
<\/p>\n<p><b>TL;DR<\/b><\/p>\n<p>This post is about code: I\u2019ll show all the steps to develop your own Kafka Connect sink connector.<\/p>\n<h2>Table of contents<\/h2>\n<p>1. Use Case<br \/>\n2. Code<br \/>\n3. Links<\/p>\n<h2>1. Use Case<\/h2>\n<p>One scenario that has become popular, and that I have received several questions about in recent months, is using Kafka to trigger functions as a service.<\/p>\n<p>In simple words: send a message to your Kafka topic and your function(s) get invoked with the payload you sent to Kafka, so a function can be triggered in response to messages in Kafka topics.<\/p>\n<p>My example uses OCI Functions.<\/p>\n<h2>2. Code<\/h2>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_class.jpg\" alt=\"\" width=\"924\" height=\"819\" class=\"alignnone size-full wp-image-761\" srcset=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_class.jpg 924w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_class-300x266.jpg 300w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_class-768x681.jpg 768w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/05\/sink_class-624x553.jpg 624w\" sizes=\"auto, (max-width: 924px) 100vw, 924px\" \/><\/p>\n<p>The connector needs to extend SinkConnector. 
It is almost the same as the source connector, and you need to implement six methods here.<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;height:300px;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td><div class=\"java codecolorer\">@Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> <a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a> version<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> start<span class=\"br0\">&#40;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, String<span class=\"sy0\">&gt;<\/span> props<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span 
class=\"kw1\">public<\/span> Class<span class=\"sy0\">&lt;?<\/span> <span class=\"kw1\">extends<\/span> Task<span class=\"sy0\">&gt;<\/span> taskClass<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> List<span class=\"sy0\">&lt;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, String<span class=\"sy0\">&gt;&gt;<\/span> taskConfigs<span class=\"br0\">&#40;<\/span><span class=\"kw4\">int<\/span> maxTasks<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> stop<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> ConfigDef config<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br 
\/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>The task needs to extend SinkTask. It is almost the same as the source task, but instead of the poll method you now have put.<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;height:300px;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td><div class=\"java codecolorer\">@Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> <a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a> version<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> start<span class=\"br0\">&#40;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, String<span class=\"sy0\">&gt;<\/span> props<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br 
\/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> put<span class=\"br0\">&#40;<\/span>Collection<span class=\"sy0\">&lt;<\/span>SinkRecord<span class=\"sy0\">&gt;<\/span> records<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ This is where the magic happens.<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ Just add a boolean triggerFn method that triggers the function.<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; @Override<br \/>\n&nbsp; &nbsp; <span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> stop<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p><b>Run Docker<\/b><\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td><div class=\"java codecolorer\">docker run <span class=\"sy0\">-<\/span>it <span class=\"sy0\">--<\/span>rm <span class=\"sy0\">--<\/span>name fn<span class=\"sy0\">-<\/span>oci<span class=\"sy0\">-<\/span>connect<span class=\"sy0\">-<\/span>demo <span class=\"sy0\">-<\/span>p <span class=\"nu0\">8083<\/span><span class=\"sy0\">:<\/span><span class=\"nu0\">8083<\/span> <span class=\"sy0\">-<\/span>e GROUP_ID<span class=\"sy0\">=<\/span><span 
class=\"nu0\">1<\/span> \\<br \/>\n&nbsp; &nbsp; <span class=\"sy0\">-<\/span>e BOOTSTRAP_SERVERS<span class=\"sy0\">=<\/span><span class=\"st0\">&quot;bootstrap_URL&quot;<\/span> \\<br \/>\n&nbsp; &nbsp; <span class=\"sy0\">-<\/span>e CONFIG_STORAGE_TOPIC<span class=\"sy0\">=<\/span><span class=\"st0\">&quot;ID-config&quot;<\/span> \\<br \/>\n&nbsp; &nbsp; <span class=\"sy0\">-<\/span>e OFFSET_STORAGE_TOPIC<span class=\"sy0\">=<\/span><span class=\"st0\">&quot;ID-offset&quot;<\/span> \\<br \/>\n&nbsp; &nbsp; <span class=\"sy0\">-<\/span>e STATUS_STORAGE_TOPIC<span class=\"sy0\">=<\/span><span class=\"st0\">&quot;ID-status&quot;<\/span> \\<br \/>\n&nbsp; &nbsp; <span class=\"sy0\">-<\/span>v fn<span class=\"sy0\">-<\/span>sink<span class=\"sy0\">-<\/span>kafka<span class=\"sy0\">-<\/span>connector<span class=\"sy0\">\/<\/span>target<span class=\"sy0\">\/<\/span>fn<span class=\"sy0\">-<\/span>sink<span class=\"sy0\">-<\/span>kafka<span class=\"sy0\">-<\/span>connector<span class=\"sy0\">-<\/span>0.0.1<span class=\"sy0\">-<\/span>SNAPSHOT<span class=\"sy0\">-<\/span>jar<span class=\"sy0\">-<\/span>with<span class=\"sy0\">-<\/span>dependencies.<span class=\"me1\">jar<\/span><span class=\"sy0\">\/:\/<\/span>kafka<span class=\"sy0\">\/<\/span>connect<span class=\"sy0\">\/<\/span>fn<span class=\"sy0\">-<\/span>connector \\<br \/>\n&nbsp; &nbsp; debezium<span class=\"sy0\">\/<\/span>connect<span class=\"sy0\">:<\/span>latest<\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p><b>Configure<\/b><\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td><div class=\"java codecolorer\">curl <span class=\"sy0\">-<\/span>X POST \\<br \/>\n&nbsp; &nbsp; http<span class=\"sy0\">:<\/span><span 
class=\"co1\">\/\/localhost:8083\/connectors \\<\/span><br \/>\n&nbsp; &nbsp; <span class=\"sy0\">-<\/span>H <span class=\"st0\">'content-type: application\/json'<\/span> \\<br \/>\n&nbsp; &nbsp; <span class=\"sy0\">-<\/span>d <span class=\"st0\">'{<br \/>\n&nbsp; &nbsp; &quot;name&quot;: &quot;FnSinkConnector&quot;,<br \/>\n&nbsp; &nbsp; &quot;config&quot;: {<br \/>\n&nbsp; &nbsp; &nbsp; &quot;connector.class&quot;: &quot;com.fn.sink.kafka.connect.FnSinkConnector&quot;,<br \/>\n&nbsp; &nbsp; &nbsp; &quot;tasks.max&quot;: &quot;1&quot;,<br \/>\n&nbsp; &nbsp; &nbsp; &quot;topics&quot;: &quot;test-sink-topic&quot;,<br \/>\n&nbsp; &nbsp; &nbsp; &quot;tenant_ocid&quot;: &quot;&lt;tenant_ocid&gt;&quot;,<br \/>\n&nbsp; &nbsp; &nbsp; &quot;user_ocid&quot;: &quot;&lt;user_ocid&gt;&quot;,<br \/>\n&nbsp; &nbsp; &nbsp; &quot;public_fingerprint&quot;: &quot;&lt;public_fingerprint&gt;&quot;,<br \/>\n&nbsp; &nbsp; &nbsp; &quot;private_key_location&quot;: &quot;\/path\/to\/kafka-connect\/secrets\/&lt;private_key_name&gt;&quot;,<br \/>\n&nbsp; &nbsp; &nbsp; &quot;function_url&quot;: &quot;&lt;FUNCTION_URL&gt;&quot;<br \/>\n&nbsp; &nbsp; }<br \/>\n&nbsp; }'<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>I created an &#8220;http&#8221; package that contains the OCI Functions part. The code is based on the Java example <a href=\"https:\/\/docs.cloud.oracle.com\/en-us\/iaas\/Content\/API\/Concepts\/signingrequests.htm?TocPath=Developer%20Tools%20|REST%20APIs%20|_____4#Java\" rel=\"noopener\" target=\"_blank\">here<\/a>.<\/p>\n<p><b>GitHub<\/b><br \/>\nYou can find the full code on my <a href=\"https:\/\/github.com\/igfasouza\/oci-fn-connector-sink\" rel=\"noopener\" target=\"_blank\">GitHub<\/a>.<\/p>\n<h2>3. 
Links<\/h2>\n<p><a href=\"https:\/\/docs.confluent.io\/current\/connect\/devguide.html\" rel=\"noopener\" target=\"_blank\">https:\/\/docs.confluent.io\/current\/connect\/devguide.html<\/a><\/p>\n<p><a href=\"https:\/\/github.com\/oracle\/oci-java-sdk\/blob\/master\/bmc-examples\/src\/main\/java\/InvokeFunctionExample.java\" rel=\"noopener\" target=\"_blank\">https:\/\/github.com\/oracle\/oci-java-sdk\/blob\/master\/bmc-examples\/src\/main\/java\/InvokeFunctionExample.java<\/a><\/p>\n<p><a href=\"https:\/\/docs.cloud.oracle.com\/en-us\/iaas\/Content\/Functions\/Concepts\/functionsoverview.htm\" rel=\"noopener\" target=\"_blank\">https:\/\/docs.cloud.oracle.com\/en-us\/iaas\/Content\/Functions\/Concepts\/functionsoverview.htm<\/a><\/p>\n<p><b>Happy Coding.<\/b><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Well? This blog post is part of my series of posts regarding Kafka Connect. If you&#8217;re not familiar with Kafka, I suggest you have a look at some of my previous posts: What is Kafka? 
Kafka Connect Overview Kafka Connector&hellip; <a href=\"http:\/\/www.igfasouza.com\/blog\/sink-kafka-connect\/\" class=\"more-link\">Continue Reading <span class=\"meta-nav\">&rarr;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":760,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[25],"tags":[7,11],"class_list":["post-759","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-kafka","tag-kafka","tag-kafka-connect"],"_links":{"self":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts\/759","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/comments?post=759"}],"version-history":[{"count":6,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts\/759\/revisions"}],"predecessor-version":[{"id":1247,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts\/759\/revisions\/1247"}],"wp:featuredmedia":[{"embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/media\/760"}],"wp:attachment":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/media?parent=759"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/categories?post=759"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/tags?post=759"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}