{"id":748,"date":"2020-04-29T21:17:05","date_gmt":"2020-04-29T20:17:05","guid":{"rendered":"http:\/\/www.igfasouza.com\/blog\/?p=748"},"modified":"2021-05-20T14:21:04","modified_gmt":"2021-05-20T13:21:04","slug":"source-kafka-connect","status":"publish","type":"post","link":"http:\/\/www.igfasouza.com\/blog\/source-kafka-connect\/","title":{"rendered":"Source Kafka connect"},"content":{"rendered":"<p><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/source-connect-kafka.png\" alt=\"\" width=\"752\" height=\"499\" class=\"alignnone size-full wp-image-749\" srcset=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/source-connect-kafka.png 752w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/source-connect-kafka-300x199.png 300w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/source-connect-kafka-624x414.png 624w\" sizes=\"auto, (max-width: 752px) 100vw, 752px\" \/><\/p>\n<p><b>How\u2019s the man?<\/b><\/p>\n<p>This blog post is part of my series of posts regarding Kafka Connect.<br \/>\nIf you&#8217;re not familiar with Kafka, I suggest you have a look at some of my previous post;<\/p>\n<p><a href=\"http:\/\/www.igfasouza.com\/blog\/what-is-kafka\/\" rel=\"noopener\" target=\"_blank\">What is Kafka?<\/a><br \/>\n<a href=\"http:\/\/www.igfasouza.com\/blog\/kafka-connect-overview\/\" rel=\"noopener\" target=\"_blank\">Kafka Connect Overview<\/a><br \/>\n<a href=\"http:\/\/www.igfasouza.com\/blog\/kafka-connector-architecture\/\" rel=\"noopener\" target=\"_blank\">Kafka Connector Architecture<\/a><br \/>\n<a href=\"http:\/\/www.igfasouza.com\/blog\/kafka-connect\/\" rel=\"noopener\" target=\"_blank\">Kafka Connect<\/a><\/p>\n<p>This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion. <\/p>\n<p><b>TL;DR<\/b><\/p>\n<p>This post is about code, I\u2019ll show all steps to develop your own Source Kafka Connector, and in the end, I\u2019ll show an easy way to deploy your code.<br \/>\n<a href=\"https:\/\/openweathermap.org\/\" rel=\"noopener\" target=\"_blank\">Openweathermap<\/a> source kafka Connector.<\/p>\n<h2>Table of contents<\/h2>\n<p>1. Get Started<br \/>\n2. Use Case<br \/>\n3. Shema<br \/>\n4. AbstractConfig<br \/>\n5. Connector<br \/>\n6. Task<br \/>\n7. Code<br \/>\n8. SMTs<br \/>\n9. Converter<br \/>\n10. Transformation<br \/>\n11. Deploy<br \/>\n12. Links<\/p>\n<h2>1. Get Started<\/h2>\n<p>Kafka Connect aims to reduce the burden of connecting Kafka with external systems such as databases, key-value stores, search indexes, and even file systems. Connectors are the high-level abstractions that manage the streaming of data from and to Kafka. Each connector speaks the language of the external system it connects to, hiding this complexity from the application developer who can focus on more valuable business logic.<\/p>\n<p><iframe loading=\"lazy\" width=\"560\" height=\"315\" src=\"https:\/\/www.youtube.com\/embed\/YyTebiMx6Gc\" frameborder=\"0\" allow=\"accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture\" allowfullscreen><\/iframe><\/p>\n<p>The best place to start when implementing your Source Connector is the Confluent Connector <a href=\"https:\/\/docs.confluent.io\/current\/connect\/devguide.html\" rel=\"noopener\" target=\"_blank\">Development Guide<\/a>.<\/p>\n<p>There\u2019s a code sample <a href=\"https:\/\/github.com\/apache\/kafka\/tree\/trunk\/connect\/file\/src\/main\/java\/org\/apache\/kafka\/connect\/file\" rel=\"noopener\" target=\"_blank\">here<\/a>;<\/p>\n<p>This maven quickstart is used to generate a skeleton plugin for Kafka Connect. <a href=\"https:\/\/github.com\/jcustenborder\/kafka-connect-archtype\" rel=\"noopener\" target=\"_blank\">Here<\/a>.<\/p>\n<div class=\"codecolorer-container python blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/><\/div><\/td><td><div class=\"python codecolorer\">mvn archetype:generate \\<br \/>\n&nbsp; &nbsp; -DarchetypeGroupId<span class=\"sy0\">=<\/span>com.<span class=\"me1\">github<\/span>.<span class=\"me1\">jcustenborder<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span> \\<br \/>\n&nbsp; &nbsp; -DarchetypeArtifactId<span class=\"sy0\">=<\/span>kafka-connect-quickstart \\<br \/>\n&nbsp; &nbsp; -DarchetypeVersion<span class=\"sy0\">=<\/span>2.4.0<br \/>\n<br \/>\nmvn archetype:generate \\<br \/>\n&nbsp; &nbsp; -DarchetypeGroupId<span class=\"sy0\">=<\/span>com.<span class=\"me1\">github<\/span>.<span class=\"me1\">jcustenborder<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span> \\<br \/>\n&nbsp; &nbsp; -DarchetypeArtifactId<span class=\"sy0\">=<\/span>kafka-connect-quickstart \\<br \/>\n&nbsp; &nbsp; -DarchetypeVersion<span class=\"sy0\">=<\/span>2.4.0 \\<br \/>\n&nbsp; &nbsp; -Dpackage<span class=\"sy0\">=<\/span>com.<span class=\"me1\">github<\/span>.<span class=\"me1\">jcustenborder<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span>.<span class=\"kw3\">test<\/span> \\<br \/>\n&nbsp; &nbsp; -DgroupId<span class=\"sy0\">=<\/span>com.<span class=\"me1\">github<\/span>.<span class=\"me1\">jcustenborder<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span> \\<br \/>\n&nbsp; &nbsp; -DartifactId<span class=\"sy0\">=<\/span>testconnect \\<br \/>\n&nbsp; &nbsp; -DpackageName<span class=\"sy0\">=<\/span>com.<span class=\"me1\">github<\/span>.<span class=\"me1\">jcustenborder<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span>.<span class=\"kw3\">test<\/span> \\<br \/>\n&nbsp; &nbsp; -Dversion<span class=\"sy0\">=<\/span><span class=\"nu0\">1.0<\/span>-SNAPSHOT<\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>The nice thing about the Archetype is that it will create the boilerplate code for your connector, some basic properties, and some empty tests.<\/p>\n<p>The dependencies that I used are;<\/p>\n<div class=\"codecolorer-container python blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/>15<br \/>16<br \/>17<br \/>18<br \/>19<br \/>20<br \/><\/div><\/td><td><div class=\"python codecolorer\">&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>dependency<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>groupId<span class=\"sy0\">&gt;<\/span>org.<span class=\"me1\">apache<\/span>.<span class=\"me1\">kafka<\/span><span class=\"sy0\">&lt;<\/span>\/groupId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>artifactId<span class=\"sy0\">&gt;<\/span>connect-api<span class=\"sy0\">&lt;<\/span>\/artifactId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>version<span class=\"sy0\">&gt;<\/span>2.3.1<span class=\"sy0\">&lt;<\/span>\/version<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/dependency<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>dependency<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>groupId<span class=\"sy0\">&gt;<\/span>org.<span class=\"me1\">apache<\/span>.<span class=\"me1\">kafka<\/span><span class=\"sy0\">&lt;<\/span>\/groupId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>artifactId<span class=\"sy0\">&gt;<\/span>connect-transforms<span class=\"sy0\">&lt;<\/span>\/artifactId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>version<span class=\"sy0\">&gt;<\/span>2.3.1<span class=\"sy0\">&lt;<\/span>\/version<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/dependency<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>dependency<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>groupId<span class=\"sy0\">&gt;<\/span>commons-codec<span class=\"sy0\">&lt;<\/span>\/groupId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>artifactId<span class=\"sy0\">&gt;<\/span>commons-codec<span class=\"sy0\">&lt;<\/span>\/artifactId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>version<span class=\"sy0\">&gt;<\/span><span class=\"nu0\">1.13<\/span><span class=\"sy0\">&lt;<\/span>\/version<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/dependency<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>dependency<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>groupId<span class=\"sy0\">&gt;<\/span>com.<span class=\"me1\">konghq<\/span><span class=\"sy0\">&lt;<\/span>\/groupId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>artifactId<span class=\"sy0\">&gt;<\/span>unirest-java<span class=\"sy0\">&lt;<\/span>\/artifactId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>version<span class=\"sy0\">&gt;<\/span>3.1.02<span class=\"sy0\">&lt;<\/span>\/version<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/dependency<span class=\"sy0\">&gt;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>And we also need the maven plugin to generate a jar with all dependencies.<\/p>\n<div class=\"codecolorer-container python blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/>15<br \/>16<br \/>17<br \/>18<br \/>19<br \/><\/div><\/td><td><div class=\"python codecolorer\">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>plugin<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>artifactId<span class=\"sy0\">&gt;<\/span>maven-assembly-plugin<span class=\"sy0\">&lt;<\/span>\/artifactId<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>version<span class=\"sy0\">&gt;<\/span>3.2.0<span class=\"sy0\">&lt;<\/span>\/version<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>configuration<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>descriptorRefs<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>descriptorRef<span class=\"sy0\">&gt;<\/span>jar-with-dependencies<span class=\"sy0\">&lt;<\/span>\/descriptorRef<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/descriptorRefs<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/configuration<span class=\"sy0\">&gt;<\/span><br \/>\n<br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>executions<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>execution<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>id<span class=\"sy0\">&gt;<\/span>make-assembly<span class=\"sy0\">&lt;<\/span>\/id<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>phase<span class=\"sy0\">&gt;<\/span>package<span class=\"sy0\">&lt;<\/span>\/phase<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>goals<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>goal<span class=\"sy0\">&gt;<\/span>single<span class=\"sy0\">&lt;<\/span>\/goal<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/goals<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/execution<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/executions<span class=\"sy0\">&gt;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <span class=\"sy0\">&lt;<\/span>\/plugin<span class=\"sy0\">&gt;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<h2>2. Use Case<\/h2>\n<p>OpenWeatherMap real-time weather data website. This API is already used in a lot of examples around Development. I already used some time for IoT and Data analyse demos.<br \/>\nThey have a free API tier, which is limited to 1 request per second, which is quite enough for tracking changes in weather. Their different API schemas return plenty of numeric and textual data, all interesting for analysis. This is a perfect example for us. I tried to choose an API that contains a more complex Json, as a result, to show more in a didactic way the Shema part.<\/p>\n<p>Check the <a href=\"https:\/\/openweathermap.org\/api\" rel=\"noopener\" target=\"_blank\">API<\/a>.<\/p>\n<p>A partition is a division of source records that usually depends on the source medium.<br \/>\nIn our case, there is only one API URL and we are only ever requesting current data, so a very good partition would be query weather data for different cities and each partition would be processed by a separate task.<\/p>\n<h2>3. Schema<\/h2>\n<p>The Schema interface is one of the most important components of Kafka Connect. This allows you to define what the data looks like without having to worry about the way the data is stored.<\/p>\n<p>Docs <a href=\"https:\/\/gist.github.com\/jcustenborder\/b9b1518cc794e1c1895c3da7abbe9c08#schema\" rel=\"noopener\" target=\"_blank\">here<\/a>.<\/p>\n<p>The best way to start is to have a look at the API result, and you can see a sample at <\/p>\n<p><a href=\"https:\/\/samples.openweathermap.org\/data\/2.5\/weather?q=London,uk&#038;appid=b6907d289e10d714a6e88b30761fae22\" rel=\"noopener\" target=\"_blank\">https:\/\/samples.openweathermap.org\/data\/2.5\/weather?q=London,uk&#038;appid=b6907d289e10d714a6e88b30761fae22<\/a><\/p>\n<div class=\"codecolorer-container python blackboard\" style=\"overflow:auto;white-space:nowrap;height:300px;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/>15<br \/>16<br \/>17<br \/>18<br \/>19<br \/>20<br \/>21<br \/>22<br \/>23<br \/>24<br \/>25<br \/>26<br \/>27<br \/>28<br \/>29<br \/>30<br \/>31<br \/>32<br \/>33<br \/>34<br \/>35<br \/>36<br \/>37<br \/>38<br \/>39<br \/>40<br \/>41<br \/>42<br \/><\/div><\/td><td><div class=\"python codecolorer\"><span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;coord&quot;<\/span>: <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;lon&quot;<\/span>: -<span class=\"nu0\">0.13<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;lat&quot;<\/span>: <span class=\"nu0\">51.51<\/span><br \/>\n&nbsp; <span class=\"br0\">&#125;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;weather&quot;<\/span>: <span class=\"br0\">&#91;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; <span class=\"st0\">&quot;id&quot;<\/span>: <span class=\"nu0\">300<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; <span class=\"st0\">&quot;main&quot;<\/span>: <span class=\"st0\">&quot;Drizzle&quot;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; <span class=\"st0\">&quot;description&quot;<\/span>: <span class=\"st0\">&quot;light intensity drizzle&quot;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; <span class=\"st0\">&quot;icon&quot;<\/span>: <span class=\"st0\">&quot;09d&quot;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"br0\">&#125;<\/span><br \/>\n&nbsp; <span class=\"br0\">&#93;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;base&quot;<\/span>: <span class=\"st0\">&quot;stations&quot;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;main&quot;<\/span>: <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;temp&quot;<\/span>: <span class=\"nu0\">280.32<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;pressure&quot;<\/span>: <span class=\"nu0\">1012<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;humidity&quot;<\/span>: <span class=\"nu0\">81<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;temp_min&quot;<\/span>: <span class=\"nu0\">279.15<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;temp_max&quot;<\/span>: <span class=\"nu0\">281.15<\/span><br \/>\n&nbsp; <span class=\"br0\">&#125;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;visibility&quot;<\/span>: <span class=\"nu0\">10000<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;wind&quot;<\/span>: <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;speed&quot;<\/span>: <span class=\"nu0\">4.1<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;deg&quot;<\/span>: <span class=\"nu0\">80<\/span><br \/>\n&nbsp; <span class=\"br0\">&#125;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;clouds&quot;<\/span>: <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;all&quot;<\/span>: <span class=\"nu0\">90<\/span><br \/>\n&nbsp; <span class=\"br0\">&#125;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;dt&quot;<\/span>: <span class=\"nu0\">1485789600<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;sys&quot;<\/span>: <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;type&quot;<\/span>: <span class=\"nu0\">1<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;id&quot;<\/span>: <span class=\"nu0\">5091<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;message&quot;<\/span>: <span class=\"nu0\">0.0103<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;country&quot;<\/span>: <span class=\"st0\">&quot;GB&quot;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;sunrise&quot;<\/span>: <span class=\"nu0\">1485762037<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; <span class=\"st0\">&quot;sunset&quot;<\/span>: <span class=\"nu0\">1485794875<\/span><br \/>\n&nbsp; <span class=\"br0\">&#125;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;id&quot;<\/span>: <span class=\"nu0\">2643743<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;name&quot;<\/span>: <span class=\"st0\">&quot;London&quot;<\/span><span class=\"sy0\">,<\/span><br \/>\n&nbsp; <span class=\"st0\">&quot;cod&quot;<\/span>: <span class=\"nu0\">200<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>Here if you have a look in the json you are going to see that the json has some structures inside, for example, you can see weather that contains, id, main, descriptions and so on.<\/p>\n<p>We can map this in an easy way to a schema;<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/><\/div><\/td><td><div class=\"java codecolorer\"><span class=\"kw1\">public<\/span> <span class=\"kw1\">static<\/span> Schema WEATHER_SCHEMA <span class=\"sy0\">=<\/span> SchemaBuilder.<span class=\"me1\">struct<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">name<\/span><span class=\"br0\">&#40;<\/span><span class=\"st0\">&quot;Weather&quot;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">version<\/span><span class=\"br0\">&#40;<\/span><span class=\"nu0\">1<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>ID, Schema.<span class=\"me1\">INT64_SCHEMA<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>MAIN, Schema.<span class=\"me1\">STRING_SCHEMA<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>DESCRIPTION, Schema.<span class=\"me1\">STRING_SCHEMA<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>ICON, Schema.<span class=\"me1\">STRING_SCHEMA<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">build<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>But sometimes the json can contain a more complex structure and for this, we can create a schema that uses another schema as a type.<br \/>\nLet\u2019s see wind<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/><\/div><\/td><td><div class=\"java codecolorer\"><span class=\"kw1\">public<\/span> <span class=\"kw1\">static<\/span> Schema WIND_SCHEMA <span class=\"sy0\">=<\/span> SchemaBuilder.<span class=\"me1\">struct<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">name<\/span><span class=\"br0\">&#40;<\/span><span class=\"st0\">&quot;Wind&quot;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">version<\/span><span class=\"br0\">&#40;<\/span><span class=\"nu0\">1<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>SPEED, Schema.<span class=\"me1\">OPTIONAL_FLOAT32_SCHEMA<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>DEG, Schema.<span class=\"me1\">OPTIONAL_INT32_SCHEMA<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">optional<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">build<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>So now I can create a schema using the others that I created before.<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/><\/div><\/td><td><div class=\"java codecolorer\"><span class=\"kw1\">public<\/span> <span class=\"kw1\">static<\/span> Schema VALUE_SCHEMA <span class=\"sy0\">=<\/span> SchemaBuilder.<span class=\"me1\">struct<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">name<\/span><span class=\"br0\">&#40;<\/span><span class=\"st0\">&quot;Value Schema&quot;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">version<\/span><span class=\"br0\">&#40;<\/span><span class=\"nu0\">1<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>NAME, Schema.<span class=\"me1\">STRING_SCHEMA<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>WEATHER, WEATHER_ARRAY_SCHEMA<span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>MAIN, MAIN_SCHEMA<span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">field<\/span><span class=\"br0\">&#40;<\/span>WIND, WIND_SCHEMA<span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">build<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<h2>4. AbstractConfig<\/h2>\n<p>AbstractConfig is an abstract class that defines an interface that our WeatherAPIConfig<br \/>\nneeds to adhere to, and here is where we get the configuration for our connector.<\/p>\n<p>We need to pass the openWeatherApiKey, a list of cities, that we want to get the weather data and the kafka topic. I add pollFrequency as a hardcoded value because the API has a limitation of the number of requests in the free tier.<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/><\/div><\/td><td><div class=\"java codecolorer\"><span class=\"kw1\">public<\/span> WeatherAPIConfig<span class=\"br0\">&#40;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, <span class=\"sy0\">?&gt;<\/span> originals<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">super<\/span><span class=\"br0\">&#40;<\/span>config<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span>, originals<span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">this<\/span>.<span class=\"me1\">openWeatherApiKey<\/span> <span class=\"sy0\">=<\/span> <span class=\"kw1\">this<\/span>.<span class=\"me1\">getString<\/span><span class=\"br0\">&#40;<\/span>OPEN_WEATHER_API_KEY<span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">this<\/span>.<span class=\"me1\">cities<\/span> <span class=\"sy0\">=<\/span> <span class=\"kw1\">this<\/span>.<span class=\"me1\">getString<\/span><span class=\"br0\">&#40;<\/span>CITIES<span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">this<\/span>.<span class=\"me1\">pollFrequency<\/span> <span class=\"sy0\">=<\/span> <span class=\"kw1\">this<\/span>.<span class=\"me1\">getLong<\/span><span class=\"br0\">&#40;<\/span>POLL_FREQUENCY<span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">this<\/span>.<span class=\"me1\">kafkaTopic<\/span> <span class=\"sy0\">=<\/span> <span class=\"kw1\">this<\/span>.<span class=\"me1\">getString<\/span><span class=\"br0\">&#40;<\/span>KAFKA_TOPIC<span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<h2>5. Connector<\/h2>\n<p>SourceConnector is an abstract class that defines an interface that our WeatherAPIConnector<br \/>\nneeds to adhere to. There are six methods.<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;height:300px;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/>15<br \/>16<br \/>17<br \/>18<br \/>19<br \/>20<br \/>21<br \/>22<br \/>23<br \/>24<br \/>25<br \/>26<br \/>27<br \/>28<br \/>29<br \/>30<br \/>31<br \/>32<br \/>33<br \/><\/div><\/td><td><div class=\"java codecolorer\">@Override<br \/>\n<span class=\"kw1\">public<\/span> <a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a> version<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ you can add your own version strategy <\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> start<span class=\"br0\">&#40;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, String<span class=\"sy0\">&gt;<\/span> props<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ here is were we implement the configuration class <\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> Class<span class=\"sy0\">&lt;?<\/span> <span class=\"kw1\">extends<\/span> Task<span class=\"sy0\">&gt;<\/span> taskClass<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ just need pass the name of the class <\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> List<span class=\"sy0\">&lt;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, String<span class=\"sy0\">&gt;&gt;<\/span> taskConfigs<span class=\"br0\">&#40;<\/span><span class=\"kw4\">int<\/span> maxTasks<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ here is a function partition strategy to split the list o cities to a different task<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> stop<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp;<span class=\"co1\">\/\/ since we don't start anything on Star method we don't need do anything here<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> ConfigDef config<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ here we already defined in the weatherAPI class<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<h2>6. Task<\/h2>\n<p>In a similar manner to the Source Connector class, we implement the Source Task abstract class. Here we have only 4 methods.<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;height:300px;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/>15<br \/>16<br \/>17<br \/>18<br \/>19<br \/>20<br \/>21<br \/><\/div><\/td><td><div class=\"java codecolorer\">@Override<br \/>\n<span class=\"kw1\">public<\/span> <a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a> version<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ you can add your own version strategy <\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> start<span class=\"br0\">&#40;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, String<span class=\"sy0\">&gt;<\/span> props<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ initialize the configuration and the weather API client<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> List<span class=\"sy0\">&lt;<\/span>SourceRecord<span class=\"sy0\">&gt;<\/span> poll<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"kw1\">throws<\/span> <a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+interruptedexception\"><span class=\"kw3\">InterruptedException<\/span><\/a> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ this method is going to be called continuous and because of the API limitation, we need to add a sleep to decrease the poll frequency.<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> stop<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ TODO Auto-generated method stub &nbsp; &nbsp; &nbsp; <\/span><br \/>\n<span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<h2>7. Code<\/h2>\n<p>I create one package model where I add my DTO to the fields that I used in my schema and package shema.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/class.jpg\" alt=\"\" width=\"824\" height=\"821\" class=\"alignnone size-full wp-image-750\" srcset=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/class.jpg 824w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/class-150x150.jpg 150w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/class-300x300.jpg 300w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/class-768x765.jpg 768w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/class-624x622.jpg 624w\" sizes=\"auto, (max-width: 824px) 100vw, 824px\" \/><\/p>\n<p>WeatherAPIClient this class just calls the API and passes the list of cities.<br \/>\nI use the WeatherAPIConfig config to get all configurations;<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/><\/div><\/td><td><div class=\"java codecolorer\"><span class=\"kw1\">public<\/span> List<span class=\"sy0\">&lt;<\/span>Weather<span class=\"sy0\">&gt;<\/span> getCurrentWeather<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+arrays\"><span class=\"kw3\">Arrays<\/span><\/a>.<span class=\"me1\">stream<\/span><span class=\"br0\">&#40;<\/span>config.<span class=\"me1\">getCities<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span>.<span class=\"me1\">split<\/span><span class=\"br0\">&#40;<\/span><span class=\"st0\">&quot;,&quot;<\/span><span class=\"br0\">&#41;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">map<\/span><span class=\"br0\">&#40;<\/span>city <span class=\"sy0\">-&gt;<\/span> Unirest.<span class=\"me1\">get<\/span><span class=\"br0\">&#40;<\/span>BASE_URL<span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">queryString<\/span><span class=\"br0\">&#40;<\/span><span class=\"st0\">&quot;q&quot;<\/span>, city<span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">queryString<\/span><span class=\"br0\">&#40;<\/span><span class=\"st0\">&quot;APPID&quot;<\/span>, config.<span class=\"me1\">getOpenWeatherApiKey<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">asObject<\/span><span class=\"br0\">&#40;<\/span>Weather.<span class=\"kw1\">class<\/span><span class=\"br0\">&#41;<\/span><span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">map<\/span><span class=\"br0\">&#40;<\/span>HttpResponse<span class=\"sy0\">::<\/span>getBody<span class=\"br0\">&#41;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .<span class=\"me1\">collect<\/span><span class=\"br0\">&#40;<\/span>Collectors.<span class=\"me1\">toList<\/span><span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span><span class=\"br0\">&#41;<\/span><span class=\"sy0\">;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<h2>8. SMTs<\/h2>\n<p>From an architectural point of view, SMT resides exactly between the connectors and the converters and they apply their data transformation just before storing the data into Kafka and right before extracting the data from it.<\/p>\n<p>The Kafka Connect frameworks come with many built-in SMT to address the most common transformations use cases like for example:<\/p>\n<ul>\n<li>\nmask sensitive message fields,\n<\/li>\n<li>\nadd identifiers,\n<\/li>\n<li>\ntag events,\n<\/li>\n<li>\ncast data types to comply to the destination,\n<\/li>\n<li>\nconvert data (e.g. timestamp conversion like time-based data conversion standardization),\n<\/li>\n<li>\nslim down messages\n<\/li>\n<li>\nor re-routing them via re-partitioning.\n<\/li>\n<\/ul>\n<p>Full documentation <a href=\"http:\/\/kafka.apache.org\/documentation.html#connect_transforms\" rel=\"noopener\" target=\"_blank\">here<\/a>.<\/p>\n<p>SMT is configured right beside connectors as their configuration properties reside inside their same file.<\/p>\n<p>Let\u2019s see one example using FileSource Connector<\/p>\n<div class=\"codecolorer-container python blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/><\/div><\/td><td><div class=\"python codecolorer\">name<span class=\"sy0\">=<\/span>local-file-source<br \/>\nconnector.<span class=\"kw1\">class<\/span><span class=\"sy0\">=<\/span>FileStreamSource<br \/>\ntasks.<span class=\"kw2\">max<\/span><span class=\"sy0\">=<\/span><span class=\"nu0\">1<\/span><br \/>\n<span class=\"kw2\">file<\/span><span class=\"sy0\">=<\/span><span class=\"kw3\">test<\/span>.<span class=\"me1\">txt<\/span><br \/>\ntopic<span class=\"sy0\">=<\/span>connect-<span class=\"kw3\">test<\/span><br \/>\ntransforms<span class=\"sy0\">=<\/span>MakeMap<span class=\"sy0\">,<\/span> InsertSource<br \/>\ntransforms.<span class=\"me1\">MakeMap<\/span>.<span class=\"kw2\">type<\/span><span class=\"sy0\">=<\/span>org.<span class=\"me1\">apache<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span>.<span class=\"me1\">transforms<\/span>.<span class=\"me1\">HoistField<\/span>$Value<br \/>\ntransforms.<span class=\"me1\">MakeMap<\/span>.<span class=\"me1\">field<\/span><span class=\"sy0\">=<\/span>line<br \/>\ntransforms.<span class=\"me1\">InsertSource<\/span>.<span class=\"kw2\">type<\/span><span class=\"sy0\">=<\/span>org.<span class=\"me1\">apache<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span>.<span class=\"me1\">transforms<\/span>.<span class=\"me1\">InsertField<\/span>$Value<br \/>\ntransforms.<span class=\"me1\">InsertSource<\/span>.<span class=\"me1\">static<\/span>.<span class=\"me1\">field<\/span><span class=\"sy0\">=<\/span>data_source<br \/>\ntransforms.<span class=\"me1\">InsertSource<\/span>.<span class=\"me1\">static<\/span>.<span class=\"me1\">value<\/span><span class=\"sy0\">=<\/span>test-file-source<\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>As you can see SMT can be combined and chained pretty easily by simply configuring them next to the Kafka Connect sinks and sources.<\/p>\n<p>But the most important thing that should always be kept in mind is the \u201cone message at a time\u201d.<br \/>\nThis is particularly crucial because it strongly limits SMT\u2019s usage. In particular, it doesn\u2019t permit any kind of transformation that is characterized by any kind of aggregation logic. Processing one message at a time is mandatory due to how partitions work and how the offset semantic is implemented in Kafka.<\/p>\n<h2>9. Converter<\/h2>\n<p>Here I create a package converter where I add an example of a converter, Serialize, and Deserializer. Here we have only three methods to implement.<\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/>15<br \/>16<br \/><\/div><\/td><td><div class=\"java codecolorer\">@Override<br \/>\n<span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> configure<span class=\"br0\">&#40;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, <span class=\"sy0\">?&gt;<\/span> configs, <span class=\"kw4\">boolean<\/span> isKey<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ get the properties<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> <span class=\"kw4\">byte<\/span><span class=\"br0\">&#91;<\/span><span class=\"br0\">&#93;<\/span> fromConnectData<span class=\"br0\">&#40;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a> topic, Schema schema, <a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+object\"><span class=\"kw3\">Object<\/span><\/a> value<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ convert to array of bytes<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> SchemaAndValue toConnectData<span class=\"br0\">&#40;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a> topic, <span class=\"kw4\">byte<\/span><span class=\"br0\">&#91;<\/span><span class=\"br0\">&#93;<\/span> value<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/ convert to schema value<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> <span class=\"kw2\">null<\/span><span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>StringConverter implements Converter<br \/>\nStringDeserializer implements Deserializer<String><br \/>\nStringSerializer implements Serializer<String><\/p>\n<h2>10. Transformation<\/h2>\n<p>Here I just add an example of transformation.<\/p>\n<p>IntegrityCheck<R extends ConnectRecord<R>> implements Transformation<R><\/p>\n<div class=\"codecolorer-container java blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/><\/div><\/td><td><div class=\"java codecolorer\">@Override<br \/>\n<span class=\"kw1\">public<\/span> <span class=\"kw4\">void<\/span> configure<span class=\"br0\">&#40;<\/span>Map<span class=\"sy0\">&lt;<\/span><a href=\"http:\/\/www.google.com\/search?hl=en&amp;q=allinurl%3Adocs.oracle.com+javase+docs+api+string\"><span class=\"kw3\">String<\/span><\/a>, <span class=\"sy0\">?&gt;<\/span> props<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"co1\">\/\/<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> R apply<span class=\"br0\">&#40;<\/span>R record<span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp;<span class=\"co1\">\/\/ check messages with null key with schema and without schema<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><br \/>\n<br \/>\n@Override<br \/>\n<span class=\"kw1\">public<\/span> ConfigDef config<span class=\"br0\">&#40;<\/span><span class=\"br0\">&#41;<\/span> <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; <span class=\"kw1\">return<\/span> CONFIG_DEF<span class=\"sy0\">;<\/span><br \/>\n<span class=\"br0\">&#125;<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p><img loading=\"lazy\" decoding=\"async\" src=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/Kafka-Connect-Source.png\" alt=\"\" width=\"880\" height=\"415\" class=\"alignnone size-full wp-image-751\" srcset=\"http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/Kafka-Connect-Source.png 880w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/Kafka-Connect-Source-300x141.png 300w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/Kafka-Connect-Source-768x362.png 768w, http:\/\/www.igfasouza.com\/blog\/wp-content\/uploads\/2020\/04\/Kafka-Connect-Source-624x294.png 624w\" sizes=\"auto, (max-width: 880px) 100vw, 880px\" \/><\/p>\n<h2>11. Deploy<\/h2>\n<p>There is an official Kafka Connect Docker image <a href=\"https:\/\/hub.docker.com\/r\/confluentinc\/cp-kafka-connect\" rel=\"noopener\" target=\"_blank\">here<\/a>, but you can use any kafka connect docker image like <a href=\"https:\/\/hub.docker.com\/r\/debezium\/connect\" rel=\"noopener\" target=\"_blank\">Debezium<\/a><\/p>\n<div class=\"codecolorer-container python blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/><\/div><\/td><td><div class=\"python codecolorer\">docker run -it --rm --name weather-connect-demo -p <span class=\"nu0\">8083<\/span>:<span class=\"nu0\">8083<\/span> -e GROUP_ID<span class=\"sy0\">=<\/span><span class=\"nu0\">1<\/span> \\<br \/>\n&nbsp; &nbsp; -e BOOTSTRAP_SERVERS<span class=\"sy0\">=<\/span><span class=\"st0\">&quot;bootstrap_URL&quot;<\/span> \\<br \/>\n&nbsp; &nbsp; -e CONFIG_STORAGE_TOPIC<span class=\"sy0\">=<\/span>\u201dID-config\u201d \\<br \/>\n&nbsp; &nbsp; -e OFFSET_STORAGE_TOPIC<span class=\"sy0\">=<\/span>\u201dID-offset\u201d \\<br \/>\n&nbsp; &nbsp; -e STATUS_STORAGE_TOPIC<span class=\"sy0\">=<\/span>\u201dID-status\u201d \\<br \/>\n&nbsp; &nbsp; -v openweathermap-connector\/target\/openweathermap-connector-0.0.1-SNAPSHOT-jar-with-dependencies.<span class=\"me1\">jar<\/span>\/:\/kafka\/connect\/openweathermap-connector \\<br \/>\n&nbsp; &nbsp; debezium\/connect:latest<\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p>And we need call the rest api with this Json to start the connector;<\/p>\n<div class=\"codecolorer-container python blackboard\" style=\"overflow:auto;white-space:nowrap;\"><table cellspacing=\"0\" cellpadding=\"0\"><tbody><tr><td class=\"line-numbers\"><div>1<br \/>2<br \/>3<br \/>4<br \/>5<br \/>6<br \/>7<br \/>8<br \/>9<br \/>10<br \/>11<br \/>12<br \/>13<br \/>14<br \/>15<br \/>16<br \/>17<br \/>18<br \/><\/div><\/td><td><div class=\"python codecolorer\"><span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp;\u201cname\u201d: \u201cweathermap-connector\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; \u201cConfig\u201d: <span class=\"br0\">&#123;<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201cconnector-<span class=\"kw1\">class<\/span>\u201d:\u201dcom.<span class=\"me1\">openweathermap<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span>.<span class=\"me1\">WeatherAPIConnector<\/span>\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201cValue.<span class=\"me1\">converter<\/span>\u201d:\u201dcom.<span class=\"me1\">openweathermap<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span>.<span class=\"me1\">converter<\/span>.<span class=\"me1\">StringConverter<\/span>\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201cvalue.<span class=\"me1\">converter<\/span>.<span class=\"me1\">encoding<\/span>\u201d:\u201dUTF-<span class=\"nu0\">8<\/span>\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201ctasks.<span class=\"kw2\">max<\/span>\u201d: \u201c<span class=\"nu0\">1<\/span>\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201c<span class=\"kw2\">open<\/span>.<span class=\"me1\">weather<\/span>.<span class=\"me1\">api<\/span>.<span class=\"me1\">key<\/span>\u201d:\u201d<span class=\"nu0\">12312312312312313123123123123<\/span>\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201ccities\u201d: \u201cIreland<span class=\"sy0\">,<\/span> Brazil\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201ckafka.<span class=\"me1\">topic<\/span>\u201d:\u201dweather\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201cname\u201d:\u201dweather-connector\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201ctransform\u201d:\u201dReplaceField<span class=\"sy0\">,<\/span>IntegrityCheck\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201ctransform.<span class=\"me1\">ReplaceField<\/span>.<span class=\"kw2\">type<\/span>\u201d:\u201dcom.<span class=\"me1\">openweathermap<\/span>.<span class=\"me1\">kafka<\/span>.<span class=\"me1\">connect<\/span>.<span class=\"me1\">transform<\/span>.<span class=\"me1\">ReplaceField<\/span>$Value\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp; \u201ctransform.<span class=\"me1\">ReplaceField<\/span>.<span class=\"me1\">blacklist<\/span>\u201d:\u201dmain\u201d<span class=\"sy0\">,<\/span><br \/>\n&nbsp; &nbsp; &nbsp;<span class=\"st0\">&quot;transform.IntegrityCheck.type\u201d:\u201dcom.openweathermap.kafka.connect.transform.IntegrityCheck$Value\u201d,<br \/>\n&nbsp; &nbsp; &nbsp; \u201ctransform.IntegrityCheck.field\u201d:\u201dintegrity\u201d,<br \/>\n&nbsp; }<br \/>\n}<\/span><\/div><\/td><\/tr><\/tbody><\/table><\/div>\n<p><b>GitHub<\/b><br \/>\nYou can check the full code in my <a href=\"https:\/\/github.com\/igfasouza\/openweathermap-connector-source\" rel=\"noopener\" target=\"_blank\">Github<\/a>, and I did a Scala demo as well, where I just follow the same structure of folders, packages and classes.<\/p>\n<h2>12. Links<\/h2>\n<p><a href=\"https:\/\/docs.confluent.io\/current\/connect\/devguide.html\" rel=\"noopener\" target=\"_blank\">https:\/\/docs.confluent.io\/current\/connect\/devguide.html<\/a><\/p>\n<p><a href=\"https:\/\/www.confluent.io\/wp-content\/uploads\/Partner-Dev-Guide-for-Kafka-Connect.pdf?x18424\" rel=\"noopener\" target=\"_blank\">https:\/\/www.confluent.io\/wp-content\/uploads\/Partner-Dev-Guide-for-Kafka-Connect.pdf?x18424<\/a><\/p>\n<p><a href=\"https:\/\/docs.confluent.io\/current\/connect\/userguide.html\" rel=\"noopener\" target=\"_blank\">https:\/\/docs.confluent.io\/current\/connect\/userguide.html<\/a><\/p>\n<p><a href=\"https:\/\/docs.confluent.io\/3.1.1\/connect\/userguide.html\" rel=\"noopener\" target=\"_blank\">https:\/\/docs.confluent.io\/3.1.1\/connect\/userguide.html<\/a><\/p>\n<p><a href=\"https:\/\/www.confluent.jp\/blog\/create-dynamic-kafka-connect-source-connectors\/\" rel=\"noopener\" target=\"_blank\">https:\/\/www.confluent.jp\/blog\/create-dynamic-kafka-connect-source-connectors\/<\/a><\/p>\n<p><b>Stay tuned!<\/b> Next blog post I\u2019ll show how to code a Sink Kafka connector.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>How\u2019s the man? This blog post is part of my series of posts regarding Kafka Connect. If you&#8217;re not familiar with Kafka, I suggest you have a look at some of my previous post; What is Kafka? Kafka Connect Overview&hellip; <a href=\"http:\/\/www.igfasouza.com\/blog\/source-kafka-connect\/\" class=\"more-link\">Continue Reading <span class=\"meta-nav\">&rarr;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":749,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[25],"tags":[7,11],"class_list":["post-748","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-kafka","tag-kafka","tag-kafka-connect"],"_links":{"self":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts\/748","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/comments?post=748"}],"version-history":[{"count":9,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts\/748\/revisions"}],"predecessor-version":[{"id":1248,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/posts\/748\/revisions\/1248"}],"wp:featuredmedia":[{"embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/media\/749"}],"wp:attachment":[{"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/media?parent=748"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/categories?post=748"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/www.igfasouza.com\/blog\/wp-json\/wp\/v2\/tags?post=748"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}