
A Twitter follow list for Kafka

What’s the crack jack?

If you, like me, use Twitter as a source of information, this may help you. I just created a Twitter follow list for Apache Kafka.

I came up with the idea for this post after I saw the Confluent Community Catalyst program, which of course already gives us a nice list to start from.

This is not an exhaustive list, so if you know someone you think should be here, please post a comment. And of course, not everyone here is strictly Kafka related.

Starting with the basics, I'll add the official ones:

@apachekafka
@confluentinc

Confluent

@jaykreps
@nehanarkhede
@tlberglund
@miguno
@gAmUssA
@riferrei

Kafka connect

@rmoff

Debezium

@debezium
@gunnarmorling

Strimzi

@strimziio

Spark related

@jaceklaskowski
@matei_zaharia

Flink related

@StephanEwen

Flink and Apache Nifi

@PaaSDev

Cloudera

@cloudera

MapR

@caroljmcdonald
@Ellen_Friedman
@ted_dunning

Azkarra
@fhussonnois

Azure

@abhi_tweeter

Oracle

@igfasouza

Without a category

@martinkl
@KaiWaehner
@myfear
@hguerreroo
@gwenshap
@AdiPolak
@MichaelDrogalis

Feel free to suggest anyone else that you think should be here.
I couldn't find anyone for the IBM and Google clouds.
I couldn't find anyone for Python Faust either.

Sink Kafka connect

Well?

This blog post is part of my series of posts regarding Kafka Connect.
If you're not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture
Kafka Connect
Source Kafka Connect

This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my opinion.

TL;DR

This post is about code; I'll show all the steps to develop your own sink Kafka connector.

Table of contents

1. Use Case
2. Code
3. Links

1. Use Case

One scenario that has become popular, and that I have been asked about a few times in the last months, is using Kafka to trigger functions as a service.

In simple words: send a message to your Kafka topic and your function(s) get invoked with the payload you sent to Kafka. You can now have a function triggered in response to messages in Kafka topics.

My example uses OCI Functions.

2. Code

The connector needs to extend SinkConnector. It is almost the same as the source connector, and you need to implement six methods here.

@Override
    public String version() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // TODO Auto-generated method stub
       
    }

    @Override
    public Class<? extends Task> taskClass() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void stop() {
        // TODO Auto-generated method stub
       
    }

    @Override
    public ConfigDef config() {
        // TODO Auto-generated method stub
        return null;
    }
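To give an idea of what goes into each of these methods, here is a minimal sketch of how they could be filled in for this connector. The names configProps, FnSinkTask and FnSinkConnectorConfig are illustrative, not necessarily the ones in the repository:

private Map<String, String> configProps;

@Override
public String version() {
    return "0.0.1";
}

@Override
public void start(Map<String, String> props) {
    // keep the raw properties so we can hand them to each task
    this.configProps = props;
}

@Override
public Class<? extends Task> taskClass() {
    return FnSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // every task gets the same configuration; Connect decides how many tasks to start
    List<Map<String, String>> configs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) {
        configs.add(new HashMap<>(configProps));
    }
    return configs;
}

@Override
public void stop() {
    // nothing to clean up, we only hold configuration
}

@Override
public ConfigDef config() {
    // the ConfigDef that declares tenant_ocid, user_ocid, public_fingerprint, etc.
    return FnSinkConnectorConfig.CONFIG_DEF;
}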

And the task needs to extend SinkTask. It is almost the same as the source task, but now you don't have the poll method; instead, you have put.

@Override
    public String version() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
        // TODO Auto-generated method stub
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // here is where the magic happens.
        // just add a boolean triggerFn method that triggers the function.
    }

    @Override
    public void stop() {
        // TODO Auto-generated method stub
       
    }
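A simplified put could look like the sketch below. To keep it short, I stand in for the OCI call with a plain HTTP POST (java.net.http, Java 11+); in the real connector the invocation goes through the OCI Java SDK, as in the InvokeFunctionExample linked in section 3. The client and functionUrl fields are assumed to be set up in start():

// needs: java.net.http.HttpClient, HttpRequest, HttpResponse, java.net.URI,
// and org.apache.kafka.connect.errors.RetriableException
private HttpClient client;   // created in start()
private String functionUrl;  // taken from the connector config

@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        String payload = String.valueOf(record.value());
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(functionUrl))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        try {
            // trigger the function once per record; a real connector would batch and retry
            client.send(request, HttpResponse.BodyHandlers.ofString());
        } catch (IOException e) {
            throw new RetriableException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RetriableException(e);
        }
    }
}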

Run Docker

docker run -it --rm --name fn-oci-connect-demo -p 8083:8083 -e GROUP_ID=1 \
    -e BOOTSTRAP_SERVERS="bootstrap_URL" \
    -e CONFIG_STORAGE_TOPIC="ID-config" \
    -e OFFSET_STORAGE_TOPIC="ID-offset" \
    -e STATUS_STORAGE_TOPIC="ID-status" \
    -v fn-sink-kafka-connector/target/fn-sink-kafka-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar/:/kafka/connect/fn-connector \
    debezium/connect:latest

Configure

curl -X POST \
    http://localhost:8083/connectors \
    -H 'content-type: application/json' \
    -d '{
    "name": "FnSinkConnector",
    "config": {
      "connector.class": "com.fn.sink.kafka.connect.FnSinkConnector",
      "tasks.max": "1",
      "topics": "test-sink-topic",
      "tenant_ocid": "<tenant_ocid>",
      "user_ocid": "<user_ocid>",
      "public_fingerprint": "<public_fingerprint>",
      "private_key_location": "/path/to/kafka-connect/secrets/<private_key_name>",
      "function_url": "<FUNCTION_URL>"
    }
  }'

I created a package "http" where I added the OCI Functions part; the code is based on this example here.

GitHub
You can check the full code in my GitHub.

3. Links

https://docs.confluent.io/current/connect/devguide.html

https://github.com/oracle/oci-java-sdk/blob/master/bmc-examples/src/main/java/InvokeFunctionExample.java

https://docs.cloud.oracle.com/en-us/iaas/Content/Functions/Concepts/functionsoverview.htm

Happy Coding.

Source Kafka connect

How’s the man?

This blog post is part of my series of posts regarding Kafka Connect.
If you're not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture
Kafka Connect

This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my opinion.

TL;DR

This post is about code; I'll show all the steps to develop your own source Kafka connector, and in the end, I'll show an easy way to deploy your code.
The example is an OpenWeatherMap source Kafka connector.

Table of contents

1. Get Started
2. Use Case
3. Schema
4. AbstractConfig
5. Connector
6. Task
7. Code
8. SMTs
9. Converter
10. Transformation
11. Deploy
12. Links

1. Get Started

Kafka Connect aims to reduce the burden of connecting Kafka with external systems such as databases, key-value stores, search indexes, and even file systems. Connectors are the high-level abstractions that manage the streaming of data from and to Kafka. Each connector speaks the language of the external system it connects to, hiding this complexity from the application developer who can focus on more valuable business logic.

The best place to start when implementing your Source Connector is the Confluent Connector Development Guide.

There’s a code sample here;

This maven quickstart is used to generate a skeleton plugin for Kafka Connect. Here.

mvn archetype:generate \
    -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
    -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=2.4.0

mvn archetype:generate \
    -DarchetypeGroupId=com.github.jcustenborder.kafka.connect \
    -DarchetypeArtifactId=kafka-connect-quickstart \
    -DarchetypeVersion=2.4.0 \
    -Dpackage=com.github.jcustenborder.kafka.connect.test \
    -DgroupId=com.github.jcustenborder.kafka.connect \
    -DartifactId=testconnect \
    -DpackageName=com.github.jcustenborder.kafka.connect.test \
    -Dversion=1.0-SNAPSHOT

The nice thing about the Archetype is that it will create the boilerplate code for your connector, some basic properties, and some empty tests.

The dependencies that I used are;

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-transforms</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>com.konghq</groupId>
            <artifactId>unirest-java</artifactId>
            <version>3.1.02</version>
        </dependency>

And we also need the Maven assembly plugin to generate a jar with all the dependencies.

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>

                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

2. Use Case

OpenWeatherMap is a real-time weather data website. Its API is already used in a lot of development examples; I have used it myself a few times for IoT and data analysis demos.
They have a free API tier, which is limited to 1 request per second; that is quite enough for tracking changes in the weather. Their different API schemas return plenty of numeric and textual data, all interesting for analysis. This is a perfect example for us. I tried to choose an API that returns a reasonably complex JSON, so I can show the Schema part in a more didactic way.

Check the API.

A partition is a division of source records that usually depends on the source medium.
In our case, there is only one API URL and we are only ever requesting current data, so a good partitioning is to query weather data for different cities, with each partition processed by a separate task.

3. Schema

The Schema interface is one of the most important components of Kafka Connect. This allows you to define what the data looks like without having to worry about the way the data is stored.

Docs here.

The best way to start is to have a look at the API result, and you can see a sample at

https://samples.openweathermap.org/data/2.5/weather?q=London,uk&appid=b6907d289e10d714a6e88b30761fae22

{
  "coord": {
    "lon": -0.13,
    "lat": 51.51
  },
  "weather": [
    {
      "id": 300,
      "main": "Drizzle",
      "description": "light intensity drizzle",
      "icon": "09d"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 280.32,
    "pressure": 1012,
    "humidity": 81,
    "temp_min": 279.15,
    "temp_max": 281.15
  },
  "visibility": 10000,
  "wind": {
    "speed": 4.1,
    "deg": 80
  },
  "clouds": {
    "all": 90
  },
  "dt": 1485789600,
  "sys": {
    "type": 1,
    "id": 5091,
    "message": 0.0103,
    "country": "GB",
    "sunrise": 1485762037,
    "sunset": 1485794875
  },
  "id": 2643743,
  "name": "London",
  "cod": 200
}

If you have a look at the JSON, you will see that it has some structures inside; for example, weather contains id, main, description and so on.

We can map this to a schema in an easy way:

public static Schema WEATHER_SCHEMA = SchemaBuilder.struct()
        .name("Weather")
        .version(1)
        .field(ID, Schema.INT64_SCHEMA)
        .field(MAIN, Schema.STRING_SCHEMA)
        .field(DESCRIPTION, Schema.STRING_SCHEMA)
        .field(ICON, Schema.STRING_SCHEMA)
        .build();

But sometimes the JSON can contain a more complex structure, and for this we can create a schema that uses another schema as a type.
Let's see wind:

public static Schema WIND_SCHEMA = SchemaBuilder.struct()
        .name("Wind")
        .version(1)
        .field(SPEED, Schema.OPTIONAL_FLOAT32_SCHEMA)
        .field(DEG, Schema.OPTIONAL_INT32_SCHEMA)
        .optional()
        .build();
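The full value schema below also needs an array wrapper around the Weather schema; I assume it is defined with SchemaBuilder.array, something like:

public static Schema WEATHER_ARRAY_SCHEMA = SchemaBuilder.array(WEATHER_SCHEMA)
        .optional()
        .build();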

So now I can create a schema using the others that I created before.

public static Schema VALUE_SCHEMA = SchemaBuilder.struct()
        .name("Value Schema")
        .version(1)
        .field(NAME, Schema.STRING_SCHEMA)
        .field(WEATHER, WEATHER_ARRAY_SCHEMA)
        .field(MAIN, MAIN_SCHEMA)
        .field(WIND, WIND_SCHEMA)
        .build();
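To show how these schemas are used later on, this is roughly how the task turns a parsed API response into a Connect value (the literal values are just the ones from the sample JSON above; the real mapping code lives in the repository):

Struct wind = new Struct(WIND_SCHEMA)
        .put(SPEED, 4.1f)
        .put(DEG, 80);

Struct value = new Struct(VALUE_SCHEMA)
        .put(NAME, "London")
        .put(WIND, wind);
// the WEATHER array and the MAIN struct are filled in the same way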

4. AbstractConfig

AbstractConfig is an abstract class that defines an interface that our WeatherAPIConfig
needs to adhere to, and here is where we get the configuration for our connector.

We need to pass the openWeatherApiKey, the list of cities that we want weather data for, and the Kafka topic. I add pollFrequency as a hardcoded value because the API limits the number of requests in the free tier.

public WeatherAPIConfig(Map<String, ?> originals) {
    super(config(), originals);
    this.openWeatherApiKey = this.getString(OPEN_WEATHER_API_KEY);
    this.cities = this.getString(CITIES);
    this.pollFrequency = this.getLong(POLL_FREQUENCY);
    this.kafkaTopic = this.getString(KAFKA_TOPIC);
}
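The config() side of this class is a plain ConfigDef. A trimmed-down version could look like the sketch below; the documentation strings are shortened and the default values are my own guesses:

public static ConfigDef config() {
    return new ConfigDef()
            .define(OPEN_WEATHER_API_KEY, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "OpenWeatherMap API key")
            .define(CITIES, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Comma-separated list of cities to query")
            .define(POLL_FREQUENCY, ConfigDef.Type.LONG, 60_000L, ConfigDef.Importance.LOW,
                    "Poll frequency in ms, kept as a default because of the free-tier limit")
            .define(KAFKA_TOPIC, ConfigDef.Type.STRING, "weather", ConfigDef.Importance.HIGH,
                    "Target Kafka topic");
}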

5. Connector

SourceConnector is an abstract class that defines an interface that our WeatherAPIConnector
needs to adhere to. There are six methods.

@Override
public String version() {
    // you can add your own version strategy
    return null;
}

@Override
public void start(Map<String, String> props) {
    // here is where we instantiate the configuration class
}

@Override
public Class<? extends Task> taskClass() {
    // just need to return the task class
    return null;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // here is the partition strategy that splits the list of cities across the different tasks
    return null;
}

@Override
public void stop() {
    // since we don't start anything in the start method, we don't need to do anything here
}

@Override
public ConfigDef config() {
    // the ConfigDef is already defined in the WeatherAPIConfig class
    return null;
}
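As a concrete example, the partition strategy mentioned above can be written with ConnectorUtils.groupPartitions, which splits the list of cities into at most maxTasks groups; configProps here is assumed to be the map received in start(), and config the WeatherAPIConfig built from it:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<String> cities = Arrays.asList(config.getCities().split(","));
    int groupCount = Math.min(cities.size(), maxTasks);

    List<Map<String, String>> taskConfigs = new ArrayList<>();
    for (List<String> group : ConnectorUtils.groupPartitions(cities, groupCount)) {
        Map<String, String> taskProps = new HashMap<>(configProps);
        // each task only sees its own subset of cities
        taskProps.put("cities", String.join(",", group));
        taskConfigs.add(taskProps);
    }
    return taskConfigs;
}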

6. Task

In a similar manner to the SourceConnector class, we implement the SourceTask abstract class. Here we have only four methods.

@Override
public String version() {
    // you can add your own version strategy
    return null;
}

@Override
public void start(Map<String, String> props) {
    // initialize the configuration and the weather API client
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // this method is going to be called continuously, and because of the API limitation we need to add a sleep to decrease the poll frequency.
    return null;
}

@Override
public void stop() {
    // TODO Auto-generated method stub      
}
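Putting the pieces together, a trimmed-down poll could look like the sketch below. The Weather getters, the WeatherSchemas holder and the toStruct helper are illustrative names; the partition per city and the offset based on the dt timestamp follow the partitioning idea from section 2:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // respect the free-tier rate limit before hitting the API again
    Thread.sleep(pollFrequency);

    List<SourceRecord> records = new ArrayList<>();
    for (Weather weather : weatherAPIClient.getCurrentWeather()) {
        Map<String, String> sourcePartition = Collections.singletonMap("city", weather.getName());
        Map<String, String> sourceOffset = Collections.singletonMap("dt", String.valueOf(weather.getDt()));
        records.add(new SourceRecord(sourcePartition, sourceOffset, kafkaTopic,
                WeatherSchemas.VALUE_SCHEMA, toStruct(weather)));
    }
    return records;
}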

7. Code

I created a package model, where I added the DTOs for the fields that I used in my schema, and a package schema.

WeatherAPIClient: this class just calls the API and passes the list of cities.
I use the WeatherAPIConfig class to get all the configuration:

public List<Weather> getCurrentWeather() {
    return Arrays.stream(config.getCities().split(","))
            .map(city -> Unirest.get(BASE_URL)
                    .queryString("q", city)
                    .queryString("APPID", config.getOpenWeatherApiKey())
                    .asObject(Weather.class))
            .map(HttpResponse::getBody)
            .collect(Collectors.toList());
}

8. SMTs

From an architectural point of view, SMTs sit exactly between the connectors and the converters; they apply their data transformation just before the data is stored in Kafka (source side) and right after it is read from Kafka (sink side).

The Kafka Connect framework comes with many built-in SMTs to address the most common transformation use cases, for example:

  • mask sensitive message fields,
  • add identifiers,
  • tag events,
  • cast data types to comply to the destination,
  • convert data (e.g. timestamp conversion like time-based data conversion standardization),
  • slim down messages
  • or re-routing them via re-partitioning.

Full documentation here.

SMTs are configured right beside connectors, as their configuration properties reside in the same file.

Let's see one example using the FileStreamSource connector:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

As you can see, SMTs can be combined and chained pretty easily, simply by configuring them next to the Kafka Connect sinks and sources.

But the most important thing that should always be kept in mind is the “one message at a time”.
This is particularly crucial because it strongly limits SMT’s usage. In particular, it doesn’t permit any kind of transformation that is characterized by any kind of aggregation logic. Processing one message at a time is mandatory due to how partitions work and how the offset semantic is implemented in Kafka.

9. Converter

Here I created a package converter, where I added an example of a Converter, a Serializer and a Deserializer. For the converter we have only three methods to implement.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    // get the properties
}

@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
    // convert to array of bytes
    return null;
}

@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
    // convert to schema value
    return null;
}

StringConverter implements Converter
StringDeserializer implements Deserializer
StringSerializer implements Serializer
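For a string-based converter like this one, the two conversion methods can stay very small; a sketch using the UTF-8 encoding that the connector configuration sets (it needs java.nio.charset.StandardCharsets):

@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
    // serialize: anything we receive is written out as UTF-8 text
    return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
}

@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
    // deserialize: wrap the raw bytes back into a string with its schema
    return value == null
            ? SchemaAndValue.NULL
            : new SchemaAndValue(Schema.STRING_SCHEMA, new String(value, StandardCharsets.UTF_8));
}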

10. Transformation

Here I just add an example of a transformation.

IntegrityCheck<R> implements Transformation<R>

@Override
public void configure(Map<String, ?> props) {
    //
}

@Override
public R apply(R record) {
   // check messages with null key with schema and without schema
}

@Override
public ConfigDef config() {
    return CONFIG_DEF;
}
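For the schemaless case, apply can simply copy the value map and add the integrity field; a rough outline, where fieldName comes from the transform configuration and DigestUtils is the commons-codec dependency added earlier:

@Override
@SuppressWarnings("unchecked")
public R apply(R record) {
    if (record.valueSchema() == null) {
        // schemaless: the value is a Map, add the integrity hash to a copy of it
        Map<String, Object> updated = new HashMap<>((Map<String, Object>) record.value());
        updated.put(fieldName, DigestUtils.md5Hex(String.valueOf(record.value())));
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), null, updated, record.timestamp());
    }
    // with a schema, the Struct would be rebuilt against an updated schema instead
    return record;
}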

11. Deploy

There is an official Kafka Connect Docker image here, but you can use any Kafka Connect Docker image, like Debezium's:

docker run -it --rm --name weather-connect-demo -p 8083:8083 -e GROUP_ID=1 \
    -e BOOTSTRAP_SERVERS="bootstrap_URL" \
    -e CONFIG_STORAGE_TOPIC="ID-config" \
    -e OFFSET_STORAGE_TOPIC="ID-offset" \
    -e STATUS_STORAGE_TOPIC="ID-status" \
    -v openweathermap-connector/target/openweathermap-connector-0.0.1-SNAPSHOT-jar-with-dependencies.jar/:/kafka/connect/openweathermap-connector \
    debezium/connect:latest

And we need to call the REST API with this JSON to start the connector:

{
  "name": "weathermap-connector",
  "config": {
    "connector.class": "com.openweathermap.kafka.connect.WeatherAPIConnector",
    "value.converter": "com.openweathermap.kafka.connect.converter.StringConverter",
    "value.converter.encoding": "UTF-8",
    "tasks.max": "1",
    "open.weather.api.key": "12312312312312313123123123123",
    "cities": "Ireland, Brazil",
    "kafka.topic": "weather",
    "name": "weather-connector",
    "transforms": "ReplaceField,IntegrityCheck",
    "transforms.ReplaceField.type": "com.openweathermap.kafka.connect.transform.ReplaceField$Value",
    "transforms.ReplaceField.blacklist": "main",
    "transforms.IntegrityCheck.type": "com.openweathermap.kafka.connect.transform.IntegrityCheck$Value",
    "transforms.IntegrityCheck.field": "integrity"
  }
}

GitHub
You can check the full code in my GitHub, and I did a Scala demo as well, where I just followed the same structure of folders, packages and classes.

12. Links

https://docs.confluent.io/current/connect/devguide.html

https://www.confluent.io/wp-content/uploads/Partner-Dev-Guide-for-Kafka-Connect.pdf?x18424

https://docs.confluent.io/current/connect/userguide.html

https://docs.confluent.io/3.1.1/connect/userguide.html

https://www.confluent.jp/blog/create-dynamic-kafka-connect-source-connectors/

Stay tuned! Next blog post I’ll show how to code a Sink Kafka connector.

Kafka Connect

How are you?

This blog post is part of my series of posts regarding Kafka Connect.
If you're not familiar with Kafka, I suggest you have a look at some of my previous posts:

What is Kafka?
Kafka Connect Overview
Kafka Connector Architecture

This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my opinion.

Table of contents

1. Source Standalone mode
2. Source Distributed mode
3. Sink
4. Don't use Docker Compose
5. Lensesio
6. Strimzi
7. Debezium
8. JDBC
9. Link

1. Source Standalone mode

Standalone mode is the best way to get started with minimum config and infrastructure setup. In this section, we will see how to configure a connector in Standalone mode and transfer file content to Kafka topic.

Here I’m going to show how to use the FileSource Connector

Let's create a Docker Compose file, "docker-compose.yml":

services:
  zookeeper:
    image: zookeeper:3.4.9
    restart: unless-stopped
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zookeeper:2888:3888
  kafka:
    image: confluentinc/cp-kafka:5.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  kafka-connect:
    image: confluentinc/cp-kafka-connect:5.1.0
    hostname: kafka-connect
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
      - kafka

In the same directory where we created the yaml file, execute the following command to start the Kafka cluster. When the command runs for the very first time it downloads the images; once the images are downloaded it creates the Kafka cluster.

docker-compose up

To start any Kafka Connect cluster we require a workers config and a connector (file-stream) config.
Create two files: workers-config.properties and file-stream-connector-properties.

Workers-config.properties:

bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# we always leave the internal key to JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
# Rest API
rest.port=8086
rest.host.name=127.0.0.1
# this config is only for standalone workers
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000

File-stream-connector-properties:

# These are standard kafka connect parameters, need for ALL connectors
name=file-stream-kafka-connect-standalone
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=source-input.txt
topic=kafka-connect-standalone

You should see three files in the folder: docker-compose.yml, file-stream-connector-properties and workers-config.properties.

Create an input file source-input.txt; the content of this file will be transferred to the Kafka topic.

touch source-input.txt

Mount a host directory in a Docker container: make sure we are in the directory where we created the files. After mounting, we automatically switch into the cp-kafka-connect container.

docker run --rm -it -v "$(pwd)":/kafka-connect/ --net=host confluentinc/cp-kafka-connect

This Docker image expects the connectors in this folder:
/etc/kafka-connect/jars

Create Kafka topic “kafka-connect-standalone” with 3 partitions and replication factor 1.

kafka-topics --create --topic kafka-connect-standalone --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181

Create standalone connector using workers-config.properties and file-stream-connector-properties.

connect-standalone workers-config.properties file-stream-connector-properties

Now you can test: open the file source-input.txt, type some messages into it and save it. The messages should be transferred to the Kafka topic.

What happened in the background: we wrote data to the source file and Kafka Connect standalone pushed the data to the topic. No programming, just configs.

Now stop Kafka Connect (press Ctrl/Command + C). Once Kafka Connect has gracefully stopped, list all files in the directory. We have a new guest in the form of a file "standalone.offsets".
This file is created by Kafka Connect to keep track of where it should resume reading messages from the source file on restarts.
When Kafka Connect starts again it should resume reading without publishing duplicate messages to the topic.

Try to execute the above command (connect-standalone workers-config.properties file-stream-connector-properties) again and validate if you do not have a duplicate message.

2. Source Distributed mode

Now let’s configure a connector in distributed mode.

Create a Kafka topic "kafka-connect-distributed" with 3 partitions and replication factor 1.

kafka-topics --create --topic kafka-connect-distributed --partitions 3 --replication-factor 1 --zookeeper 127.0.0.1:2181

Create a config file “connect-source.json”.

{
  "name": "file-stream-kafka-connect-distributed",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "source-input.txt",
    "topic": "kafka-connect-distributed",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}

and run;

curl -d @<path-to-config-file>/connect-source.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Now write some messages in the source file; once the file is saved, all the messages are posted to the topic "kafka-connect-distributed".

Check the messages copied from the file to the topic by the FileStreamSourceConnector. Note that the messages are stored in JSON format, because the connector was created earlier with value.converter.schemas.enable=true.
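For example, a line "hello" in source-input.txt ends up in the topic wrapped in the JsonConverter schema/payload envelope, roughly like this:

{"schema":{"type":"string","optional":false},"payload":"hello"}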

We can run the command to check the topic;

kafka-console-consumer --topic kafka-connect-distributed --from-beginning --bootstrap-server 127.0.0.1:9092

3. Sink

Now we need a sink example; let's look at MongoDB.
We can add this to the Docker Compose file.

 mongo-db:
    image: mongo:4.0.5
    hostname: mongo-db
    container_name: mongo-db
    expose:
      - "27017"
    ports:
      - "27017:27017"
    command: --bind_ip_all --smallfiles
    volumes:
      - ./mongo-db:/data
  mongoclient:
    image: mongoclient/mongoclient:2.2.0
    container_name: mongoclient
    hostname: mongoclient
    depends_on:
      - mongo-db
    ports:
      - 3000:3000
    environment:
      MONGO_URL: "mongodb://mongo-db:27017"
      PORT: 3000
    expose:
      - "3000"

then, with the connect-mongodb-sink.json file shown below, run:

curl -d @<path-to-config file>/connect-mongodb-sink.json -H "Content-Type: application/json" -X POST http://localhost:8083/connectors

Here is connect-mongodb-sink.json:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": 1,
        "topics": "connect-custom",
        "mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
        "mongodb.collection": "MyCollection",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

We have the following MongoDB-specific properties here:

  • mongodb.connection.uri contains the connection string for our MongoDB instance
  • mongodb.collection defines the collection
  • Since the MongoDB connector is expecting JSON, we have to set JsonConverter for key.converter and value.converter

  • And we also need schemaless JSON for MongoDB, so we have to set key.converter.schemas.enable and value.converter.schemas.enable to false

Elasticsearch

elasticsearch:
    image: itzg/elasticsearch:2.4.3
    environment:
      PLUGINS: appbaseio/dejavu
      OPTS: -Dindex.number_of_shards=1 -Dindex.number_of_replicas=0
    ports:
      - "9200:9200"

Create a config file “connect-elasticsearch-sink.json”.

name=sink-elastic-twitter-distributed
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=2
topics=kafka-connect-distributed-twitter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connection.url=http://elasticsearch:9200
type.name=kafka-connect
key.ignore=true

Here is a nice link to check for Docker options and images.

I found a nice and simple Docker Compose setup for single or multiple nodes here.

4. Don't use Docker Compose

#Start Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper:3.5.5

#Start Kafka Broker
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0

#Start Kafka Connect
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=kafka_connect_configs \
-e OFFSET_STORAGE_TOPIC=kafka_connect_offsets \
-e STATUS_STORAGE_TOPIC=kafka_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
debezium/connect:1.0

5. Lensesio

Another option is to use the lenses.io image, which already contains several Kafka connectors pre-installed and a nice user interface.

services:
  # this is our kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker

https://lenses.io/
https://github.com/lensesio/fast-data-dev

6. Strimzi

Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster. It provides container images and Operators for running Kafka on Kubernetes.

https://strimzi.io/

https://medium.com/@sincysebastian/setup-kafka-with-debezium-using-strimzi-in-kubernetes-efd494642585

https://medium.com/@vamsiramakrishnan/strimzi-on-oracle-kubernetes-engine-oke-kafka-connect-on-oracle-streaming-service-oss-a-c73cc9714e90

https://itnext.io/kafka-connect-on-kubernetes-the-easy-way-b5b617b7d5e9

7. Debezium

I mentioned Debezium before; it is an open source project for applying the Change Data Capture (CDC) pattern to your applications using Kafka.

https://debezium.io/

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  postgres:
    image: debezium/postgres:9.6
    ports:
     - "5432:5432"
    environment:
     - POSTGRES_USER=postgresuser
     - POSTGRES_PASSWORD=postgrespw
     - POSTGRES_DB=inventory
  elastic:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.5.2
    ports:
     - "9200:9200"
    environment:
     - http.host=0.0.0.0
     - transport.host=127.0.0.1
     - xpack.security.enabled=false
  connect:
    image: debezium/connect-jdbc-es:${DEBEZIUM_VERSION}
    build:
      context: debezium-jdbc-es
    ports:
     - 8083:8083
     - 5005:5005
    links:
     - kafka
     - mysql
     - postgres
     - elastic
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_source_connect_statuses

And of course, you can use Strimzi and Debezium together here.

8. JDBC

One of the most common integrations that people want to do with Kafka is getting data into or out of a database.

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/

https://dev.to/rmoff/streaming-data-from-kafka-to-s3-video-walkthrough-2elh

9. Link

https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html

https://docs.confluent.io/3.1.2/cp-docker-images/docs/quickstart.html

https://github.com/confluentinc/cp-docker-images/tree/5.3.1-post/examples

https://www.baeldung.com/kafka-connectors-guide

https://blog.softwaremill.com/do-not-reinvent-the-wheel-use-kafka-connect-4bcabb143292

https://www.confluent.io/blog/webify-event-streams-using-kafka-connect-http-sink/

Stay tuned! Next blog post I’ll show how to code your own Kafka connector.

Kafka Connector Architecture

What’s the story Rory?

This blog post is part of my series of posts regarding “Kafka Connect Overview“.
If you’re not familiar with Kafka, I suggest you have a look at my previous post “What is Kafka?” before.
This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my opinion.

Table of contents

1. Kafka Connect
2. Source & Sink Connectors
3. Standalone & Distributed
4. Converters & Transforms
5. Life cycle
6. Code
7. Books
8. Link

1. Kafka Connect

Kafka Connect's goal of copying data between systems has been tackled by a variety of frameworks, many of them still actively developed and maintained. This section explains the motivation behind Kafka Connect, where it fits in the design space, and its unique features and design decisions.

Kafka Connect has three major models in its design:

  • Connector model
  • Worker model
  • Data model

The connector model addresses three key user requirements. First, Kafka Connect performs broad copying by default by having users define jobs at the level of Connectors which then break the job into smaller Tasks. This two level scheme strongly encourages connectors to use configurations that encourage copying broad swaths of data since they should have enough inputs to break the job into smaller tasks. It also provides one point of parallelism by requiring Connectors to immediately consider how their job can be broken down into subtasks, and select an appropriate granularity to do so. Finally, by specializing source and sink interfaces, Kafka Connect provides an accessible connector API that makes it very easy to implement connectors for a variety of systems.

The worker model allows Kafka Connect to scale to the application. It can run scaled down to a single worker process that also acts as its own coordinator, or in clustered mode where connectors and tasks are dynamically scheduled on workers. However, it assumes very little about the process management of the workers, so it can easily run on a variety of cluster managers or using traditional service supervision. This architecture allows scaling up and down, but Kafka Connect’s implementation also adds utilities to support both modes well. The REST interface for managing and monitoring jobs makes it easy to run Kafka Connect as an organization-wide service that runs jobs for many users. Command line utilities specialized for ad hoc jobs make it easy to get up and running in a development environment, for testing, or in production environments where an agent-based approach is required.

The data model addresses the remaining requirements. Many of the benefits come from coupling tightly with Kafka. Kafka serves as a natural buffer for both streaming and batch systems, removing much of the burden of managing data and ensuring delivery from connector developers. Additionally, by always requiring Kafka as one of the endpoints, the larger data pipeline can leverage the many tools that integrate well with Kafka. This allows Kafka Connect to focus only on copying data because a variety of stream processing tools are available to further process the data, which keeps Kafka Connect simple, both conceptually and in its implementation. This differs greatly from other systems where ETL must occur before hitting a sink. In contrast, Kafka Connect can bookend an ETL process, leaving any transformation to tools specifically designed for that purpose. Finally, Kafka includes partitions in its core abstraction, providing another point of parallelism

In simple words

Kafka Connect is a distributed, scalable, fault-tolerant service designed to reliably stream data between Kafka and other data systems. Data is produced from a source and consumed to a sink.

Connect tracks the offset that was last consumed for a source, to restart the task at the correct starting point after a failure. These offsets are different from Kafka offsets; they are based on the source system, like a database, a file, etc.

In standalone mode the source offset is tracked in a local file, and in distributed mode the source offset is tracked in a Kafka topic.

2. Source & Sink connectors

Producers and consumers provide complete flexibility to send any data to Kafka or process it in any way, but this flexibility means you do everything yourself.

Kafka Connect's simple framework allows:

  • developers to create connectors that copy data to or from other systems.
  • Operators to use said connectors just by writing configuration files and submitting them to Connect. (No code)
  • Community and 3rd-party engineers to build reliable plugins for common data sources and sinks.
  • Deployments to deliver fault tolerance and automated load balancing out of the box.

And the framework does the hard work:

  • Serialization and deserialization.
  • Schema registry integration.
  • Fault tolerance and failover.
  • Partitioning and scale-out.
  • And lets the developers focus on domain-specific details.

3. Standalone & Distributed

In standalone mode we have a source or a sink and a Kafka broker; when we deploy Kafka Connect in standalone mode we actually need to pass a configuration file containing all the connection properties that we need to run. So standalone's main way of providing configuration to our connector is properties files, not the REST API.

In distributed mode we usually have more than one worker; since these workers can be on different machines or in different containers, they cannot share the same storage space, so a properties file is out of the question. Instead, Kafka Connect in distributed mode leverages Kafka topics in order to sync between themselves.

4. Converters & Transforms

Converters are a pluggable API to convert data between native formats and Kafka. Just like the name says, converters are used to convert data from one format to another.
In source connectors, converters are invoked after the data has been fetched from the source and before it is published to Kafka.
In sink connectors, converters are invoked after the data has been consumed from Kafka and before it is stored in the sink.

Apache Kafka ships with a JSON converter.

A transform is a simple operation that can be applied at the message level.

There’s a nice blog post about Single message transforms here.

Single message transforms (SMTs) modify events before they are stored in Kafka: mask sensitive information, add identifiers, tag events, remove unnecessary columns and more. They can also modify events going out of Kafka: route high-priority events to a faster datastore, cast data types to match the destination and more.

5. Life cycle

I built this animation using the slides from Randall Hauch (Confluent) at Kafka Summit SF 2018, here.

Sequence diagram

6. Code

Confluent makes available a code example that you can find here.

Confluent hub

7. Books

Modern Big Data Processing with Hadoop

8. Links

https://docs.confluent.io/current/connect/managing/confluent-hub/component-archive.html

https://docs.confluent.io/current/connect/design.html

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

Kafka Connect Overview

How’s it going horse?

If you’re not familiar with Kafka, I suggest you have a look at my previous post “What is Kafka?” before.
This post is a collection of links, videos, tutorials, blogs and books that I found, mixed with my opinion. The idea is that I'll create a series of posts regarding Kafka Connect. Stay tuned!

Table of contents

1. What is Kafka Connect?
2. Concepts
3. Features
4. Why Kafka connect?
5. Courses
6. Books
7. Influencers List
8. Link

1. What is Kafka Connect?

Kafka Connect (or Connect API) is a framework to import/export data from/to other systems. It was added in the Kafka 0.9.0.0 release and uses the Producer and Consumer API internally. The Connect framework itself executes so-called “connectors” that implement the actual logic to read/write data from other systems. The Connect API defines the programming interface that must be implemented to build a custom connector. Many open source and commercial connectors for popular data systems are available already. However, Apache Kafka itself does not include production ready connectors.

Connectors are meant to provide a simple way of connecting to external systems, only requiring a configuration file, while the scaling, distribution and persistence of state is handled by the framework for you. Connectors for common things like JDBC exist already at the Confluent Hub.
Official blog announcement and overview

For example:
Sometimes you need to process streams of data that are not in your Kafka cluster. The data may be located in a SQL database like SQL Server or MySQL, or in a simple CSV file. In order to process that data, you have to move it from your database to the Kafka cluster. To this end, you have some options, and two of them are:

  1. Create an application that reads data from your source storage system and produces them to Kafka cluster.
  2. Or use Kafka Connect to move your data easily from source storage system to your Kafka cluster.

If you choose the first option you need to write codes that move your data to the Kafka cluster. Your code must deal with the failure of your application (for example it must store the offset of the last record of tables that are moved to Kafka, so it can continue to copy the records that were not inserted into Kafka), scalability, polling and much more.

But if you choose the second option you can move the data without writing any code. Kafka Connect does the same job as the first option, but in a scalable and fault-tolerant way. The process of copying data from a storage system and moving it to the Kafka cluster is so common that the Kafka Connect tool was created to address this problem.

Kafka connectors provide some powerful features. They can be easily configured to route unprocessable or invalid messages to a dead letter queue, apply Single Message Transforms before a message is written to Kafka by a source connector or before it is consumed from Kafka by a sink connector, integrate with Confluent Schema Registry for automatic schema registration and management, and convert data into types such as Avro or JSON. By leveraging existing connectors, developers can quickly create fault-tolerant data pipelines that reliably stream data from an external source into records in Kafka topics or from Kafka topics into an external sink, all with mere configuration and no code!

2. Concepts

To efficiently discuss the inner workings of Kafka Connect, it is helpful to establish a few major concepts, and of course, I suggest a look in the official docs, here.

  • Connectors – the high level abstraction that coordinates data streaming by managing tasks
  • Tasks – the implementation of how data is copied to or from Kafka
  • Workers – the running processes that execute connectors and tasks
  • Converters – the code used to translate data between Connect and the system sending or receiving data
  • Transforms – simple logic to alter each message produced by or sent to a connector
  • Dead Letter Queue – how Connect handles connector errors

Each connector instance can break down its job into multiple tasks, thereby parallelizing the work of copying data and providing scalability. When a connector instance starts up a task, it passes along the configuration properties that each task will need. The task stores this configuration as well as the status and the latest offsets for the records it has produced or consumed externally in Kafka topics. Since the task does not store any state, tasks can be stopped, started, or restarted at any time. Newly started tasks will simply pick up the latest offsets from Kafka and continue on their merry way.

Workers are a physical concept: they are processes that run inside a JVM. Your job, in Kafka Connect terms, is called a connector. It is something like this:

  • Copy records from table ‘accounts’ of my MySQL to Kafka topic ‘accounts’. It is called a source connector because you move data from external storage to Kafka.
  • Copy each message from Kafka topic ‘product-events’ to a CSV file ‘myfile.csv’. It is called a sink connector because you move data from Kafka to external storage.

Kafka Connect uses workers for moving data. Workers are just simple Linux (or any other OS) processes. Kafka Connect can create a cluster of workers to make the copying data process scalable and fault tolerant. Workers need to store some information about their status, their progress in reading data from external storage and so on. To store that data, they use Kafka as their storage. Note that Kafka Connect cluster (which is a cluster of workers) is completely different from the Kafka cluster (which is a cluster of Kafka brokers). More workers mean that your copying process is more fault tolerant.

Standalone vs. Distributed Mode

There are two modes for running workers: standalone mode and distributed mode. You should identify which mode works best for your environment before getting started.

Standalone mode is useful for development and testing Kafka Connect on a local machine. It can also be used for environments that typically use single agents (for example, sending web server logs to Kafka).

Distributed mode runs Connect workers on multiple machines (nodes). These form a Connect cluster. Kafka Connect distributes running connectors across the cluster. You can add more nodes or remove nodes as your needs evolve.

Distributed mode is also more fault tolerant. If a node unexpectedly leaves the cluster, Kafka Connect automatically distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster, where it is safely replicated, losing the node where a Connect worker runs does not result in any lost data.

Converters are necessary to have a Kafka Connect deployment support a particular data format when writing to or reading from Kafka. Tasks use converters to change the format of data from bytes to a Connect internal data format and vice versa.

3. Features

A common framework for Kafka connectors
It standardizes the integration of other data systems with Kafka. Also, it simplifies connector development, deployment, and management.

Distributed and standalone modes
Scale up to a large, centrally managed service supporting an entire organization or scale down to development, testing, and small production deployments.

REST interface
By an easy to use REST API, we can submit and manage connectors to our Kafka Connect cluster.

Automatic offset management
Kafka Connect can manage the offset commit process automatically, even with just a little information from connectors. Hence, connector developers do not need to worry about this error-prone part of connector development.

Distributed and scalable by default
It builds upon the existing group management protocol. And to scale up a Kafka Connect cluster we can add more workers.

Streaming/batch integration
We can say that Kafka Connect is an ideal solution for bridging streaming and batch data systems.

Transformations
Enables simple and lightweight modifications to individual messages.


From Zero to Hero with Kafka Connect by Robin Moffatt

4. Why Kafka Connect

Auto-recovery After Failure
To each record, a “source” connector can attach arbitrary “source location” information which it passes to Kafka Connect. Hence, at the time of failure Kafka Connect will automatically provide this information back to the connector. In this way, it can resume where it failed. Additionally, auto recovery for “sink” connectors is even easier.

Auto-failover
Auto-failover is possible because the Kafka Connect nodes build a Connect cluster. That means that if one node fails, the work that it is doing is redistributed to other nodes.

Simple Parallelism
A connector can define data import or export tasks that execute in parallel.

5. Courses

https://www.udemy.com/course/kafka-connect/

https://www.pluralsight.com/courses/kafka-connect-fundamentals

6. Books

Kafka: The Definitive Guide is the best option to start. There is a chapter on Kafka Connect.

Building Data Streaming Applications with Apache Kafka – there is a nice chapter “Deep dive into Kafka Connect”

7. Influencers List

@rmoff
@tlberglund

8. Links

https://www.confluent.io/blog/announcing-kafka-connect-building-large-scale-low-latency-data-pipelines/

https://docs.confluent.io/current/connect/concepts.html

Get Started COVID-19

How’s the craic?


You are probably aware of the Coronavirus, or Covid-19, pandemic.

There is a good chance that you are reading this while self-quarantined at home, social distancing from the public and, like many, with a lot of extra time on your hands.

TL;DR
I came up with the idea for this blog post to maybe help or give you some ideas to start something. If you are familiar with programming (developer, coder, data something), you should give it a go and try or start something.
This post is a collection of links, videos, tutorials and blogs that I found, mixed with my opinion.

Table of contents

1. Some articles to start
2. Datasets
3. APIs
4. Dashboards
5. Ideas
6. Code
7. Link

1. Some articles to start

You can start by reading "Coronavirus: Why You Must Act Now", an article that has received over 40 million views in the last week, and "Coronavirus: The Hammer and the Dance".

2. Datasets

In the wake of the Coronavirus outbreak, many data sources have been made available to the public in an effort to encourage research in the field. Recently, the White House and a group of leading researchers published the COVID-19 Open Research Dataset (CORD-19), which is available on Kaggle.
Kaggle calls data scientists to action on COVID-19

I found these three links

https://www.kaggle.com/sudalairajkumar/novel-corona-virus-2019-dataset
https://www.kaggle.com/allen-institute-for-ai/CORD-19-research-challenge
https://www.kaggle.com/kimjihoo/coronavirusdataset

This link is the most famous or most quoted data source:
https://github.com/CSSEGISandData/COVID-19

3. APIs

But I found some APIs as well;

https://covid-19-apis.postman.com/
https://thevirustracker.com/api
https://covid19api.com/
https://github.com/NovelCOVID/API
https://github.com/ahmadawais/corona-cli
https://github.com/ExpDev07/coronavirus-tracker-api

4. Dashboards

List of Dashboards;

Maybe the most famous one:
https://coronavirus.jhu.edu/map.html

A complete and interactive one:
http://gabgoh.github.io/COVID/index.html

A really nice one about "Flatten the curve":
https://www.washingtonpost.com/graphics/2020/world/corona-simulator/

https://towardsdatascience.com/covid-19-open-source-dashboard-fa1d2b4cd985

https://towardsdatascience.com/covid-19-dashboard-b7f8b7c59431

https://towardsdatascience.com/covid-19-interactive-power-bi-map-of-total-cases-by-us-state-and-county-e4ad7fdd0f10

https://www.covidvisualizer.com/

5. Ideas

There are several ideas around, and because the ideas are based on data analysis and similar topics, Oracle OCI Data Science is a good place to give it a go.
Start free
Oracle DataScience Docs
video

You can use an open-source OCI Marketplace image as well (GPU optional), and talking about GPUs, NVIDIA provides a free 90-day license for Parabricks.

I created a list of links with some ideas that I found;

Just a simple starting point to read the data and start playing with Python and Pandas:
https://towardsdatascience.com/how-to-get-started-analyzing-covid-19-data-808822437c32

https://towardsdatascience.com/exploring-covid-19-research-publications-407f8c2aa842

Text Analysis
https://towardsdatascience.com/using-topological-text-analysis-for-covid-19-open-research-challenge-184d44bb92a6

https://towardsdatascience.com/machine-learning-the-coronavirus-9cb8352e1b36

https://medium.com/@noahhaber/flatten-the-curve-of-armchair-epidemiology-9aa8cf92d652

Apache Spark
https://medium.com/@sunetrobanerjee/understanding-covid-19-coronavirus-with-apache-spark-and-zeppelin-70c285097b68

SNA
https://www.againstcovid19.com/singapore/clusters

https://theconversation.com/how-to-model-a-pandemic-134187

There are some initiatives in the maker community as well;

https://www.forbes.com/sites/alexandrasternlicht/2020/03/18/theres-a-shortage-of-ventilators-for-coronavirus-patients-so-this-international-group-invented-an-open-source-alternative-thats-being-tested-next-week/#58ddd9493ba0

ventilator hackathon

Ultimate Medical Hackathon: How Fast Can We Design and Deploy an Open Source Ventilator?

Facebook group – Open Source COVID19 Medical Supplies

And some ideas regarding image analysis

https://www.pyimagesearch.com/2020/03/16/detecting-covid-19-in-x-ray-images-with-keras-tensorflow-and-deep-learning/

https://towardsdatascience.com/detecting-covid-19-induced-pneumonia-from-chest-x-rays-with-transfer-learning-an-implementation-311484e6afc1

https://github.com/ieee8023/covid-chestxray-dataset

There are some Hackathons around as well;

https://covid-global-hackathon.devpost.com

https://www.codevscovid19.org/

You can search #BuildforCOVID19

6. Code

1) You can build your own epidemic model at home.

I just made some small changes:

import pandas as pd
import matplotlib.pyplot as plt
import math
import glob
import random
from PIL import Image


def point(xlimit,ylimit):
    x = random.uniform(0,xlimit)
    y = random.uniform(0,ylimit)
    return x,y


def Generate(GrupSize,xlimit,ylimit):
    df = pd.DataFrame(columns='X,Y,Covid-19,Day'.split(','))
   
    for i in range(GrupSize):
        df.loc[i,'X'], df.loc[i,'Y'] = point(xlimit,ylimit)
        df.loc[i,'Covid-19'] = False
   
    samplesize = math.floor(GrupSize/100)
    MoversList = df.sample(n = samplesize).index.values.tolist()
   
    StatofDay = pd.DataFrame(columns='Healthy,Covid-19(+),Hospitalized,Cured,Dead'.split(','))
    return df, StatofDay, MoversList

   
def plt1color(df):
    cols=[]
    for l in df.index:
        if df.loc[l,'Covid-19']==True: #Infected
            cols.append('red')
        elif df.loc[l,'Covid-19']==666: #Dead
            cols.append('black')
        elif df.loc[l,'Covid-19']==115: #Hospitalized
            cols.append('yellow')
        elif df.loc[l,'Covid-19']==7: #Cured
            cols.append('green')
        else:
            cols.append('blue') #Healthy
    return cols


def plt2color(Stat):
    cols=[]
    for i in Stat.columns:
        if i=='Covid-19(+)': #Infected
            cols.append('red')
        elif i=='Dead': #Dead
            cols.append('black')
        elif i=='Hospitalized': #Hospitalized
            cols.append('yellow')
        elif i=='Cured': #Cured
            cols.append('green')
        else:
            cols.append('blue') #Healthy
    return cols


def Plot():
    global df, fig, Stat, Day, MoversList
    cols=plt1color(df)
    ld = ['Healthy','Covid-19(+)','Hospitalized','Cured','Death Toll']
    axs[0].cla()
    axs[0].scatter(df['X'],df['Y'],s=1,c=cols)
    for i in MoversList:
        axs[0].scatter(df.loc[i,'X'],df.loc[i,'Y'],s=6,facecolors='none', edgecolors='black')
        axs[0].text(df.loc[i,'X']+0.02, df.loc[i,'Y']+0.02, str(i), fontsize=5)
    cols=plt2color(Stat)
    sDay = str(Day)
    title = 'Day' + sDay
    axs[0].set_title(title,loc='left')
    axs[0].set_yticklabels([])
    axs[0].set_xticklabels([])
    axs[0].tick_params(
#    axis='both',       # changes apply to the x-axis
    which='both',      # both major and minor ticks are affected
    bottom=False,      # ticks along the bottom edge are off
    top=False,         # ticks along the top edge are off
    right=False,      # ticks along the right edge are off
    left=False,         # ticks along the left edge are off
    labelbottom=False) # labels along the bottom edge are off
    axs[1].cla()
    axs[1].plot(Stat.Healthy,label=ld[0],color=cols[0])
    axs[1].plot(Stat['Covid-19(+)'],label=ld[1],color=cols[1])
    axs[1].plot(Stat.Hospitalized,label=ld[2],color=cols[2])
    axs[1].plot(Stat.Cured,label=ld[3],color=cols[3])
    axs[1].plot(Stat.Dead,label=ld[4],color=cols[4])
#    axs[1].set_prop_cycle(color=cols)
    axs[1].legend(bbox_to_anchor=(0, 1), loc='upper left', borderaxespad=0.)
    plt.xlabel('Days')
#    plt.show()
    if Day<10 : sDay = '0' + sDay
    title = 'Day' + sDay + '.png'
    plt.savefig(title)

    return


def Png_to_gif():  
    # Create frames
    frames = []
    imgs = sorted(glob.glob("*.png"))
    for i in imgs:
        new_frame = Image.open(i)
        frames.append(new_frame)
       
    # Save into GIF
    frames[0].save('png_to_gif.gif', format='GIF',
          append_images=frames[1:],
          save_all=True,
          duration=500, loop=0)


def infect(Person):
    global df,Day
    if random.random()>0.25 and Day>3 : return
    if df.loc[Person,'Covid-19']==False:
        df.loc[Person,'Covid-19'], df.loc[Person,'Day'] = True, Day
   
   
def Move(xlimit,ylimit):
    """
    Move Movers Randomly
    """

    global df, MoversList
    for i in MoversList[:]:  # iterate over a copy so removing items is safe
        if (df.loc[i,'Covid-19']==115) or (df.loc[i,'Covid-19']==666):
            MoversList.remove(i)  # hospitalized or dead people stop moving
            continue
        df.loc[i,'X'], df.loc[i,'Y'] = (df.loc[i,'X']+random.uniform(1,xlimit/3))%xlimit, (df.loc[i,'Y']+random.uniform(1,ylimit/3))%ylimit


def check(i,j):
    global df, YesterdayPatients, Distlimit
    Dist = math.sqrt((df.loc[i,'X']-df.loc[j,'X'])**2+(df.loc[i,'Y']-df.loc[j,'Y'])**2)
    flag = ((YesterdayPatients[i]==True) ^ (YesterdayPatients[j]==True)) and Dist<Distlimit
    return flag
   

def interact():
    global Day, df
    for i in range(len(df)):
        for j in range(i):
            if check(i,j):
                if (df.loc[i,'Covid-19']==False) :
                    infect(i)
                else:
                    infect(j)


def kill():
    global df
    samplesize = math.floor(len(df[df['Covid-19']==True])*.005+len(df[df['Covid-19']==115])*.005)
    if samplesize>len(df[df['Covid-19']==True]):
        return
    df.loc[df[df['Covid-19']==True].sample(n = samplesize).index.values.tolist(),'Covid-19']=666
    return


def hospitalize():
    global df
    samplesize = math.floor(len(df[df['Covid-19']==True])*0.03)
    if samplesize>len(df[df['Covid-19']==True]):
        return
    df.loc[df[df['Covid-19']==True].sample(n = samplesize).index.values.tolist(),'Covid-19']=115
    return


def cure():
    global df, Day
    df.loc[(df['Day']<Day-10) & (df['Covid-19']==True) ,'Covid-19'] = 7
    df.loc[(df['Day']<Day-21) & (df['Covid-19']==115) ,'Covid-19'] = 7
    return
   
   
def Tomorrow(): # To Be checked and Resolved!!!
    global df, Day
    Day +=1
    kill()
    hospitalize()
    cure()
    Move(xlimit,ylimit)
    interact()


def Count(Day):
    global df, Stat
    List = list(df['Covid-19'])
    Stat.loc[Day,'Healthy'] = List.count(False)
    Stat.loc[Day,'Covid-19(+)'] = List.count(True)    
    Stat.loc[Day,'Hospitalized'] = List.count(115)    
    Stat.loc[Day,'Cured'] = List.count(7)
    Stat.loc[Day,'Dead'] = List.count(666)
     
    return
   

def write_log(*args):
    global log_file
    line = ' '.join([str(a) for a in args])
    log_file.write(line+'\n')
    print(line)
   

# Main ---
log_file = open("Log.txt","w+")

n = 1000
xlimit,ylimit=30,30
Distlimit = 1.5

write_log(31*'-')
write_log("Here's the Input Data:")
write_log(8*'- - ')
write_log('Number of Samples:',n)
write_log('X & Y limits: ',xlimit,', ',ylimit)
write_log('Distance required for Contamination:', Distlimit)

# Day = 0,  Generating Model...
Day = 0

df, Stat, MoversList = Generate(n,xlimit,ylimit)
infect(random.randrange(n))
fig, axs = plt.subplots(2)
fig.suptitle('Covid-19 Epidemic Sample Model', fontsize=16)
Plot()
Count(Day)
write_log(31*'-')
write_log('Day:',Day)
write_log(8*'- - ')    
write_log(Stat.loc[Day])

# Day=1
YesterdayPatients = list(df['Covid-19'])
Tomorrow()
Plot()
Count(Day)
write_log(31*'-')
write_log('Day:',Day)
write_log(8*'- - ')    
write_log(Stat.loc[Day])
   
#Main Loop ---

countsames = 0
while Stat.loc[Day, 'Healthy']>0 and Day<100:
    log_file = open("Log.txt","a+")
    if (list(Stat.loc[Day])==list(Stat.loc[Day-1])):
        countsames +=1
        if countsames>2 : break
    else :
        countsames = 0
   
    YesterdayPatients = list(df['Covid-19'])
    Tomorrow()
    Plot()
    Count(Day)
    write_log(31*'-')
    write_log('Day:',Day)
    write_log(8*'- - ')    
    write_log(Stat.loc[Day])
    log_file.close()

Png_to_gif()
Stat.to_excel('Stat.xlsx')
Stat.plot(title='Statistical Data Vs. Days Passed')
plt.savefig('Stat')

2) You can visualize the impact of social distancing

I just made some small changes

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


DAYS = 180
POPULATION = 100000
SPREAD_FACTOR = 0.05
DAYS_TO_RECOVER = 10
INITIALLY_AFFECTED = 4

city = pd.DataFrame(data={'id': np.arange(POPULATION), 'infected': False, 'recovery_day': None, 'recovered': False})
city = city.set_index('id')

firstCases = city.sample(INITIALLY_AFFECTED, replace=False)
city.loc[firstCases.index, 'infected'] = True
city.loc[firstCases.index, 'recovery_day'] = DAYS_TO_RECOVER

stat_active_cases = [INITIALLY_AFFECTED]
stat_recovered = [0]

for today in range(1, DAYS):
    city.loc[city['recovery_day'] == today, 'recovered'] = True
    city.loc[city['recovery_day'] == today, 'infected'] = False

    spreadingPeople = city[ (city['infected'] == True)]
    totalCasesToday = round(len(spreadingPeople) * SPREAD_FACTOR)
    casesToday = city.sample(totalCasesToday, replace=True)
    # Ignore already infected or recovered people
    casesToday = casesToday[ (casesToday['infected'] == False) & (casesToday['recovered'] == False) ]
    # Mark the new cases as infected
    city.loc[casesToday.index, 'infected'] = True
    city.loc[casesToday.index, 'recovery_day'] = today + DAYS_TO_RECOVER

    stat_active_cases.append(len(city[city['infected'] == True]))
    stat_recovered.append(len(city[city['recovered'] == True]))


title = "Spread Factor " + str(SPREAD_FACTOR).replace('.', '')
fig = plt.figure(figsize=(16, 8))
plt.bar(np.arange(DAYS), stat_active_cases, color="red")
plt.text(145, 90000, title, fontsize=14)
#plt.show()
plt.savefig(title)

3) You can use this code in both ideas to generate the animated GIF.

import glob
from PIL import Image


def Png_to_gif():  
    # Create frames
    frames = []
    imgs = sorted(glob.glob("*.png"))
    for i in imgs:
        new_frame = Image.open(i)
        frames.append(new_frame)
       
    # Save into GIF
    frames[0].save('result.gif', format='GIF',
          append_images=frames[1:],
          save_all=True,
          duration=500, loop=0)

# Main ---
Png_to_gif()

4) And of course, I did something with the Raspberry Pi as well:

A simple example based on the GFX HAT “Hello World”, showing global COVID-19 stats:

import requests
import time
import signal
from gfxhat import touch, lcd, backlight, fonts
from PIL import Image, ImageFont, ImageDraw

resp = requests.get('https://thevirustracker.com/free-api?global=stats')
if resp.status_code != 200:
    # This means something went wrong.
    raise RuntimeError('GET /free-api?global=stats returned {}'.format(resp.status_code))

dic = resp.json()
total_cases = 'total cases {}'.format(dic['results'][0]['total_cases'])
total_recovered = 'total recovered {}'.format(dic['results'][0]['total_recovered'])
total_deaths = 'total deaths {}'.format(dic['results'][0]['total_deaths'])
new_cases = 'total new cases today {}'.format(dic['results'][0]['total_new_cases_today'])

led_states = [False for _ in range(6)]

width, height = lcd.dimensions()

image = Image.new('P', (width, height))

draw = ImageDraw.Draw(image)

font = ImageFont.truetype(fonts.AmaticSCBold, 12)

text1 = total_cases
text2 = total_recovered
text3 = total_deaths
text4 = new_cases
#text = "Hello World"

w, h = font.getsize(text1)

x = (width - w) // 2
y = (height - h) // 2

draw.text((x, y-25), text1, 1, font)
draw.text((x, y-13), text2, 1, font)
draw.text((x, y), text3, 1, font)
draw.text((x, y+13), text4, 1, font)

def handler(ch, event):
    if event == 'press':
        led_states[ch] = not led_states[ch]
        touch.set_led(ch, led_states[ch])
        if led_states[ch]:
            backlight.set_pixel(ch, 0, 255, 255)
        else:
            backlight.set_pixel(ch, 0, 255, 0)
        backlight.show()

for x in range(6):
    touch.set_led(x, 1)
    time.sleep(0.1)
    touch.set_led(x, 0)

for x in range(6):
    backlight.set_pixel(x, 0, 255, 0)
    touch.on(x, handler)

backlight.show()

for x in range(128):
    for y in range(64):
        pixel = image.getpixel((x, y))
        lcd.set_pixel(x, y, pixel)


lcd.show()

try:
    signal.pause()
except KeyboardInterrupt:
    for x in range(6):
        backlight.set_pixel(x, 0, 0, 0)
        touch.set_led(x, 0)
    backlight.show()
    lcd.clear()
    lcd.show()

7. Links

These are the main places to get help:

https://www.who.int/emergencies/diseases/novel-coronavirus-2019/situation-reports

https://covid-oss-help.org/

Github
https://github.blog/2020-03-23-open-collaboration-on-covid-19/

While You are at Home
https://makeymakey.com/blogs/blog/creativity-matters-free-resources-from-authors-while-you-are-at-home

City mobility
https://citymapper.com/

https://www.visualcapitalist.com/global-pandemic-preparedness-ranked/

This one I found to be an amazing idea:
https://howmuchtoiletpaper.com/

I suggest having a look at “towardsdatascience” and Twitter, because there is new material about COVID-19 every day.

OGB Appreciation Day – Face Recognition in an easy way

How’s it going horse?

It’s Oracle Groundbreaker’s Appreciation Day!

Today it’s #ThanksOGB day and I decided to join the idea with a post about Face Recognition in an easy way using Oracle OCI Marketplace Nvidia image.

What is OGB Appreciation day?
OGB Appreciation day

Don’t forget to search for tweets with #ThanksOGB.

The Oracle blog published my article about Face Recognition in 4 lines of code.

Today I want to show how you can create an Oracle OCI instance using the Marketplace and configure the environment to run this example easily with the Nvidia image.

Oracle OCI Marketplace

You can follow this link on how to create an instance using the Marketplace; just change it to use the NVIDIA image!

Once you have the instance running, just SSH into it and run:

sudo apt install python3-pip
sudo apt-get install python3-setuptools
wget http://dlib.net/files/dlib-19.17.tar.bz2
tar jxvf dlib-19.17.tar.bz2
cd dlib-19.17
sudo python3 setup.py install
sudo pip3 install face_recognition

Now you are ready to play with Face Recognition.
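As a quick sanity check, here is a minimal sketch using the face_recognition library; known.jpg and unknown.jpg are placeholder file names for photos you supply yourself.

import face_recognition

# Placeholder images: supply your own photos.
known_image = face_recognition.load_image_file("known.jpg")
unknown_image = face_recognition.load_image_file("unknown.jpg")

# Each call returns a list of 128-d encodings, one per face found in the image.
known_encoding = face_recognition.face_encodings(known_image)[0]
unknown_encoding = face_recognition.face_encodings(unknown_image)[0]

# Prints [True] if the two photos show the same person (within the default tolerance).
print(face_recognition.compare_faces([known_encoding], unknown_encoding))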

You can use my Python code to stream your Raspberry Pi camera feed.

import io
import picamera
import logging
import socketserver
from threading import Condition
from http import server

PAGE="""\
<html>
<head>
<title>Raspberry Pi - Surveillance Camera</title>
</head>
<body>
<center><h1>Raspberry Pi - Surveillance Camera</h1></center>
<center><img src="stream.mjpg" width="640" height="480"></center>
</body>
</html>
"""


class StreamingOutput(object):
    def __init__(self):
        self.frame = None
        self.buffer = io.BytesIO()
        self.condition = Condition()

    def write(self, buf):
        if buf.startswith(b'\xff\xd8'):
            # New frame, copy the existing buffer's content and notify all
            # clients it's available
            self.buffer.truncate()
            with self.condition:
                self.frame = self.buffer.getvalue()
                self.condition.notify_all()
            self.buffer.seek(0)
        return self.buffer.write(buf)

class StreamingHandler(server.BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            content = PAGE.encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Content-Length', len(content))
            self.end_headers()
            self.wfile.write(content)
        elif self.path == '/stream.mjpg':
            self.send_response(200)
            self.send_header('Age', 0)
            self.send_header('Cache-Control', 'no-cache, private')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
            self.end_headers()
            try:
                while True:
                    with output.condition:
                        output.condition.wait()
                        frame = output.frame
                    self.wfile.write(b'--FRAME\r\n')
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', len(frame))
                    self.end_headers()
                    self.wfile.write(frame)
                    self.wfile.write(b'\r\n')
            except Exception as e:
                logging.warning(
                    'Removed streaming client %s: %s',
                    self.client_address, str(e))
        else:
            self.send_error(404)
            self.end_headers()

class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    allow_reuse_address = True
    daemon_threads = True

with picamera.PiCamera(resolution='640x480', framerate=24) as camera:
    output = StreamingOutput()
    #Uncomment the next line to change your Pi's Camera rotation (in degrees)
    #camera.rotation = 90
    camera.start_recording(output, format='mjpeg')
    try:
        address = ('', 8000)
        server = StreamingServer(address, StreamingHandler)
        server.serve_forever()
    finally:
        camera.stop_recording()

Just change the face recognition code to use the streaming URL instead of camera 0; with the script above that would be http://ipaddress:8000/stream.mjpg

video_capture = cv2.VideoCapture(0)
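For example, a minimal sketch of that change (assuming OpenCV was built with FFmpeg/network support; replace "ipaddress" with your Pi's address):

import cv2

# Assumption: the Raspberry Pi streaming script above is running on this host.
STREAM_URL = "http://ipaddress:8000/stream.mjpg"

# Open the network stream instead of the local camera (index 0).
video_capture = cv2.VideoCapture(STREAM_URL)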

Happy face recognition.

Link

pyimagesearch

What is Stream processing?

Hey you!

If you’re not familiar with Big Data or Data Lakes, I suggest you have a look at my previous posts “What is Big Data?” and “What is data lake?” first.
This post is a collection of links, videos, tutorials, blogs and books that I found mixed with my opinion.

Table of contents

01. What is Stream processing?
02. Martin Kleppmann
03. Typical use cases
04. Pattern
05. Evaluation: Choose a Stream Processing Framework or a Product or Both?
06. Vertical vs. Horizontal Scaling
07. Streaming is better with SQL
08. Streaming Windows
09. Why Stream Processing
10. Final considerations
11. Book
12. Influencers List
13. Links

Stream processing is becoming something like a “grand unifying paradigm” for data processing. Outgrowing its original space of real-time data processing, stream processing is becoming a technology that offers new approaches to data processing (including batch processing), real-time applications, and even distributed transactions.

1. What is Stream Processing?

Stream processing is the act of continuously incorporating new data to compute a result. In stream processing, the input data is unbounded and has no predetermined beginning or end. It simply forms a series of events that arrive at the stream processing system, e.g. credit card transactions, clicks on a website, or sensor readings from Internet of Things devices.

Streaming is a data distribution technique where data producers write data records into an ordered data stream, from which data consumers can read that data in the same order: a data producer, a data stream and a data consumer.
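In code, that shape looks roughly like the sketch below. It assumes a local Kafka broker on localhost:9092 and the kafka-python client, purely for illustration; the same producer/stream/consumer pattern applies to any streaming platform.

from kafka import KafkaProducer, KafkaConsumer  # assumption: kafka-python client

# Producer: appends records to an ordered stream (a Kafka topic here).
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("clicks", b'{"user": 42, "page": "/home"}')
producer.flush()

# Consumer: reads the same records, in order, independently of the producer.
consumer = KafkaConsumer("clicks",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for record in consumer:  # blocks and keeps reading as new records arrive
    print(record.offset, record.value)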

Each data streaming product makes a certain set of assumptions about the use cases and processing techniques to support. These assumptions lead to certain design choices, which affect what types of stream processing behaviour you can implement with them.

From Wikipedia:

Stream processing is a computer programming paradigm, equivalent to dataflow programming, event stream processing, and reactive programming, that allows some applications to more easily exploit a limited form of parallel processing.
Stream Processing is a powerful technology that can scan huge volumes of data coming from sensors, credit card swipes, clickstreams and other inputs, and find actionable insights nearly instantaneously. For example, Stream Processing can detect a single fraudulent transaction in a stream containing millions of legitimate purchases, act as a recommendation engine to determine what ad or promotion to display for a particular customer while he or she is actually shopping or compute the optimal price for a car service ride in only a few seconds.

The term “Stream Processing” means that the data is coming into the processing engine as a continuous “stream” of events produced by some outside system or systems, and the processing engine works so fast that all decisions are made without stopping the data stream and storing the information first.

Streaming data and event-driven architectures are rising in popularity. The ideas have been around for a while, but technological and architectural advances have turned capabilities like stream processing and even function-based (aka “serverless”) computing into reality. In many cases, the ability to act on data quickly is more valuable than a new method for batch processing or historical data analysis.

I Googled around and found:

Streaming is processing of data in motion.
Streaming is data that is continuously generated by different sources.
Streaming is the continuous high-speed transfer of large amounts of data from a source system to a target.
Programming paradigm that allows some applications to more easily exploit a limited form of parallel processing.

Streaming decouples data producers and data consumers from each other. A data producer simply writes its data to a data stream and does not need to know the consumers that read that data. Consumers can be added and removed independently of the producer. Consumers can also start and stop, or pause and resume, their consumption without the data producer needing to know about it. This decoupling simplifies the implementation of both data producers and consumers.

A data stream can be persistent, in which case it is sometimes referred to as a log or a journal. A persistent data stream has the advantage that the data in the stream can survive a shutdown of the data streaming service, so no data records are lost.
Persistent data streaming services can typically hold larger amounts of historic data than a data streaming service that only holds records in memory. Some data streaming services can even hold historic data all the way back to the first record written to the data stream. Others only hold e.g. a number of days of historic data.
In the cases where a persistent data stream holds the full history of records, consumers can replay all these records and recreate their internal state based on these records. In case a consumer discovers a bug in its own code, it can correct that code and replay the data stream to recreate its internal database.
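A minimal sketch of that replay idea, using a made-up, in-memory event log:

# Illustrative only: the consumer's internal table is just a projection
# of the ordered event history, so it can always be rebuilt by replaying.
events = [
    {"type": "account_opened", "id": 1, "balance": 0},
    {"type": "deposit", "id": 1, "amount": 100},
    {"type": "deposit", "id": 1, "amount": 50},
]

def rebuild_state(event_log):
    accounts = {}
    for event in event_log:  # replay from the first record
        if event["type"] == "account_opened":
            accounts[event["id"]] = event["balance"]
        elif event["type"] == "deposit":
            accounts[event["id"]] += event["amount"]
    return accounts

print(rebuild_state(events))  # {1: 150} -- the same state, recomputed from history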

2. Martin Kleppmann

Martin Kleppmann is the author of the book “Designing Data Intensive Applications”, and he has some nice papers/presentations;

https://www.confluent.io/blog/turning-the-database-inside-out-with-apache-samza/

https://martin.kleppmann.com/2015/01/29/stream-processing-event-sourcing-reactive-cep.html

https://www.oreilly.com/learning/making-sense-of-stream-processing

Two ideas came from this;

  • All the BuzzWords are the same thing;
  • The concept of Streaming came from Database “Replication”;

1. BuzzWords

Some people call it stream processing. Others call it Event Sourcing or CQRS. Some even call it Complex Event Processing. Sometimes, such self-important buzzwords are just smoke and mirrors, invented by companies who want to sell you stuff. But sometimes, they contain a kernel of wisdom which can really help us design better systems.

The idea of structuring data as a stream of events is nothing new, and it is used in many different fields. Even though the underlying principles are often similar, the terminology is frequently inconsistent across different fields, which can be quite confusing. Although the jargon can be intimidating when you first encounter it, don’t let that put you off; many of the ideas are quite simple when you get down to the core.

But there are some differences.

In this article you can see some differences and similarities.
https://iwringer.wordpress.com/2015/12/15/cep-vs-streaming-processing-vs-cep-engines-vs-streaming-analytic-engines/

2. Replication

What if we took that replication stream and made it a first-class citizen in our data architecture? What if we changed our infrastructure so that the replication stream was not an implementation detail, but a key part of the public interface of the database? What if we turned the database inside out, took the implementation detail that was previously hidden, and made it a top-level concern? What would that look like?

3. Typical use cases

Stream Processing is rapidly gaining popularity and finding applications in various business domains. It found its first uses in the finance industry, as stock exchanges moved from floor-based trading to electronic trading. Today, it makes sense in almost every industry – anywhere you generate stream data through human activity, machine data or sensor data. Assuming it takes off, the Internet of Things will increase the volume, variety and velocity of data, leading to a dramatic increase in the applications for stream processing technologies.

Some use cases where stream processing can solve business problems include:

  • Network monitoring
  • Intelligence and surveillance
  • Risk management
  • E-commerce
  • Fraud detection
  • Smart order routing
  • Transaction cost analysis
  • Pricing and analytics
  • Market data management
  • Algorithmic trading
  • Data warehouse augmentation

Here is a short list of well-known, proven applications of Stream Processing:

  • Clickstream analytics can act as a recommendation engine providing actionable insights used to personalize offers, coupons and discounts, customize search results, and guide targeted advertisements — all of which help retailers enhance the online shopping experience, increase sales, and improve conversion rates.
  • Preventive maintenance allows equipment manufacturers and service providers to monitor quality of service, detect problems early, notify support teams, and prevent outages.
  • Fraud detection alerts banks and service providers of suspected frauds in time to stop bogus transactions and quickly notify affected accounts.
  • Emotions analytics can detect an unhappy customer and help customer service augment the response to prevent escalations before the customer’s unhappiness boils over into anger.
  • A dynamic pricing engine determines the price of a product on the fly based on factors such as current customer demand, product availability, and competitive prices in the area.

Common Usage Pattern for In-Stream Analytics

4. Pattern

Writing Streaming Applications requires very different thinking patterns from writing code with a language like Java. A better understanding of common patterns in Stream Processing will let us understand the domain better and build tools that handle those scenarios.

Pattern 1: Preprocessing

Preprocessing is often done as a projection from one data stream to the other or through filtering. Potential operations include

  • Filtering and removing some events
  • Reshaping a stream by removing, renaming, or adding new attributes to a stream
  • Splitting and combining attributes in a stream
  • Transforming attributes

For example, from a twitter data stream, we might choose to extract the fields: author, timestamp, location, and then filter them based on the location of the author.
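A minimal, illustrative sketch of this projection-and-filter step over a made-up stream of tweet-like events:

# Illustrative only: "tweets" stands in for any incoming event stream.
tweets = [
    {"author": "alice", "timestamp": 1, "location": "Dublin", "text": "..."},
    {"author": "bob",   "timestamp": 2, "location": "Lisbon", "text": "..."},
]

def preprocess(stream, wanted_location):
    for event in stream:
        # Projection: keep only the attributes we care about.
        projected = {k: event[k] for k in ("author", "timestamp", "location")}
        # Filter: drop events from other locations.
        if projected["location"] == wanted_location:
            yield projected

for event in preprocess(tweets, "Dublin"):
    print(event)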

Pattern 2: Alerts and Thresholds

This pattern detects a condition and generates alerts based on a condition. (e.g. Alarm on high temperature). These alerts can be based on a simple value or more complex conditions such as rate of increase etc.

For example, in the TFL (Transport for London) demo video based on transit data from London, we trigger a speed alert when a bus exceeds a given speed limit.

We can also generate alerts for scenarios such as the server room temperature continually increasing for the last 5 minutes.
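A small sketch of that “continually increasing” alert, using a fixed-size window of the most recent readings (the window of five readings and the sample values are assumptions for illustration):

from collections import deque

WINDOW = 5  # number of most recent readings to look at (e.g. one per minute)
recent = deque(maxlen=WINDOW)

def on_reading(temperature):
    recent.append(temperature)
    # Alert only when we have a full window and every step went up.
    if len(recent) == WINDOW and all(b > a for a, b in zip(recent, list(recent)[1:])):
        print("ALERT: server room temperature rising for the last", WINDOW, "readings")

for t in [21.0, 21.4, 21.9, 22.3, 22.8, 22.8]:
    on_reading(t)  # fires once, after the fifth strictly increasing reading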

Pattern 3: Simple Counting and Counting with Windows

This pattern includes aggregate functions like Min, Max, Percentiles etc, and they can be counted without storing any data. (e.g. counting the number of failed transactions).

However, counts are often used with a time window attached (e.g. failure count in the last hour). There are many types of windows: sliding vs. batch (tumbling) windows, and time vs. length windows. There are four main variations.

  • Time, Sliding window: keeps each event for the given time window, and produces an output whenever an event is added or removed.
  • Time, Batch window: also called tumbling windows; they only produce output at the end of the time window.
  • Length, Sliding window: same as the time, sliding window, but keeps a window of n events instead of selecting them by time.
  • Length, Batch window: same as the time, batch window, but keeps a window of n events instead of selecting them by time.

There are special windows like decaying windows and unique windows.

Pattern 4: Joining Event Streams

The main idea behind this pattern is to match up multiple data streams and create a new event stream. For example, let’s assume we play a football game with both the players and the ball having sensors that emit events with current location and acceleration. We can use “joins” to detect when a player has kicked the ball. To that end, we can join the ball location stream and the player stream on the condition that they are within one meter of each other and the ball’s acceleration has increased by more than 55 m/s^2.

Among other use cases are combining data from two sensors, and detecting the proximity of two vehicles. Please refer to Stream Processing 101: From SQL to Streaming SQL in 10 Minutes for more details.

Pattern 5: Data Correlation, Missing Events, and Erroneous Data

This pattern has a lot in common with pattern four: here too we match up multiple streams. In addition, we also correlate the data within the same stream. This is needed because different data sensors can send events at different rates, and many use cases require this fundamental operator.

Following are some possible scenarios.

  • Matching up two data streams that send events at different speeds
  • Detecting a missing event in a data stream (e.g. detect a customer request that has not been responded to within 1 hour of its reception)
  • Detecting erroneous data (e.g. Detect failed sensors using a set of sensors that monitor overlapping regions and using those redundant data to find erroneous sensors and removing their data from further processing)

Pattern 6: Interacting with Databases

Often we need to combine the real time data against the historical data stored in a disk. Following are a few examples.

  • When a transaction happens, look up the customer’s age by customer ID in the customer database, to be used for fraud detection (enrichment)
  • Checking a transaction against blacklists and whitelists in the database
  • Receive an input from the user (e.g. Daily discount amount may be updated in the database, and then the query will pick it automatically without human intervention.)

Pattern 7: Detecting Temporal Event Sequence Patterns

Using regular expressions with strings, we detect a pattern of characters within a sequence of characters. Similarly, given a sequence of events, we can write a regular expression to detect a temporal sequence of events arranged in time, where each event, or a condition about an event, plays the role of a character in the string.

A frequently cited example, although a bit simplistic, is that a thief, having stolen a credit card, would try a small transaction to make sure it works and then make a large transaction. Here the small transaction followed by a large transaction is a temporal sequence of events arranged in time, and it can be detected using a regular expression written on top of the event sequence.
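A toy sketch of that small-then-large check (the thresholds are made up; a real CEP engine would express this as a declarative pattern rather than hand-written state):

# Illustrative thresholds only.
SMALL, LARGE = 5, 1000

def detect_small_then_large(transactions):
    """Flag a small 'test' charge followed later by a large charge on the same card."""
    saw_small = set()
    for card, amount in transactions:
        if amount <= SMALL:
            saw_small.add(card)
        elif amount >= LARGE and card in saw_small:
            yield card, amount

txns = [("card-1", 2), ("card-2", 40), ("card-1", 1500)]
print(list(detect_small_then_large(txns)))  # [('card-1', 1500)]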

Such temporal sequence patterns are very powerful. For example, the following video shows a real time analytics done using the data collected from a real football game. This was the dataset taken from DEBS 2013 Grand Challenge.

In the video, we used patterns on event sequence to detect the ball possession, the time period a specific player controlled the ball. A player possessed the ball from the time he hits the ball until someone else hits the ball. This condition can be written as a regular expression: a hit by me, followed by any number of hits by me, followed by a hit by someone else. (We already discussed how to detect the hits on the ball in Pattern 4: Joins).

Pattern 8: Tracking

The eighth pattern tracks something over space and time and detects given conditions.
Following are a few examples:

  • Tracking a fleet of vehicles, making sure that they adhere to speed limits, routes, and geo-fences.
  • Tracking wildlife, making sure they are alive (they will not move if they are dead) and making sure they will not go out of the reservation.
  • Tracking airline luggage and making sure it is not sent to the wrong destination
  • Tracking a logistics network to figure out bottlenecks and unexpected conditions.

For example, the TFL demo we discussed under Pattern 2 shows an application that tracks and monitors London buses using the open data feeds exposed by TFL (Transport for London).

Pattern 9: Detecting Trends

We often encounter time series data. Detecting patterns from time series data and bringing them to the operator’s attention are common use cases.
Following are some examples of trends:

  • Rise, Fall
  • Turn (switch from a rise to a fall)
  • Outliers
  • Complex trends like triple bottom etc.

These trends are useful in a wide variety of use cases such as

  • Stock markets and Algorithmic trading
  • Enforcing SLA (Service Level Agreement), Auto Scaling, and Load Balancing
  • Predictive maintenance ( e.g. guessing the Hard Disk will fill within next week)

Pattern 10: Running the same Query in Batch and Realtime Pipelines

This pattern runs the same query in both the real-time and batch pipelines. It is often used to fill the gap left in the data due to batch processing. For example, if batch processing takes 15 minutes, the results would lack the data for the last 15 minutes.

The idea of this pattern, which is sometimes called the “Lambda Architecture”, is to use real-time analytics to fill the gap. Jay Kreps’s article “Questioning the Lambda Architecture” discusses this pattern in detail.

Pattern 11: Detecting and switching to Detailed Analysis

The main idea of the pattern is to detect a condition that suggests some anomaly, and further analyze it using historical data. This pattern is used for use cases where we cannot analyze all the data in full detail; instead, we analyze only the anomalous cases in full detail. Following are a few examples.

  • Use basic rules to detect fraud (e.g. a large transaction), then pull out all transactions done against that credit card for a larger time period (e.g. 3 months of data) from a batch pipeline and run a detailed analysis
  • While monitoring weather, detect conditions like high temperature or low pressure in a given region and then start a high resolution localized forecast on that region.
  • Detect good customers, for example through the expenditure of more than $1000 within a month, and then run a detailed model to decide the potential of offering a deal.

Pattern 12: Using a Model

The idea is to train a model (often a Machine Learning model), and then use it within the real-time pipeline to make decisions. For example, you can build a model using R, export it as PMML (Predictive Model Markup Language) and use it within your real-time pipeline.

Examples include fraud detection, segmentation, predicting the next value, and predicting churn. Also see the InfoQ article, Machine Learning Techniques for Predictive Maintenance, for a detailed example of this pattern.

Pattern 13: Online Control

There are many use cases where we need to control something online. The classical use cases are the autopilot, self-driving, and robotics. These would involve problems like current situation awareness, predicting the next value(s), and deciding on corrective actions.

You can implement most of these use cases with a Stream Processor that supports a Streaming SQL language.

This pattern list came from the 9th ACM International Conference on Distributed Event-Based Systems, in a paper describing a set of real-time analytics patterns.

You can find details about pattern implementations and source code from here.

Monal Daxini presents a blueprint for streaming data architectures and a review of desirable features of a streaming engine. He also talks about streaming application patterns and anti-patterns, and use cases and concrete examples using Apache Flink.
Patterns of Streaming Applications

5. Evaluation: Choose a Stream Processing Framework or a Product or Both?

There are many different data streaming products, and it can be hard to know where to start studying them, and which products do what etc.

The typical evaluation process (long list, short list, proof of concept) is obligatory before making a decision. Typical evaluation criteria include:

  • A stream processing programming language for streaming analytics
  • Visual development and debugging instead of coding
  • Real-time analytics
  • Monitoring and alerts
  • Support for fault tolerance, and highly optimized performance
  • Product maturity
  • In the case of TIBCO, a live data mart and operational command and control center for business users
  • Out-of-the-box connectivity to plenty of streaming data sources
  • Commercial support
  • Professional services and training.

Think about which of the above features you need for your project. In addition, you have to evaluate the costs of using a framework against productivity, reduced effort and time-to-market using a product before making your choice.

Besides evaluating the core features of stream processing products, you also have to check integration with other products. Can a product work together with messaging, Enterprise Service Bus (ESB), Master Data Management (MDM), in-memory stores, etc. in a loosely coupled, but highly integrated way? If not, there will be a lot of integration time and high costs.

6. Vertical vs. Horizontal Scaling

Vertical scaling means running your data streaming storage and processors on a more powerful computer. Vertical scaling is also sometimes referred to as scaling up. You scale up the size and speed of the disk, the amount of memory, the speed of the CPUs, possibly the number of CPU cores, the graphics cards, etc.

Horizontal scaling means distributing the workload among multiple computers. Thus, the data in the data stream is distributed among multiple computers, and the applications processing the data streams are too (or at least they can be). Horizontal scaling is also sometimes referred to as scaling out. You scale out from a single computer to multiple computers.

Distributing the messages of a data stream onto multiple computers is also referred to as partitioning the data stream.

1. Round Robin Partitioning

Round robin data stream partitioning is the simplest way to partition the messages of a data stream across multiple computers. The round robin partitioning method simply distributes the messages evenly and sequentially among the computers. In other words, the first message is stored on the first computer, the second message on the second computer etc. When all computers have received a message from the stream, the round robin method starts from the first computer again.

2. Key Based Partitioning

Key based partitioning distributes the message across different computers based on a certain key value read from each message. Commonly the identifying id (e.g. primary key) is used as key to distribute the messages. Typically, a hash value is calculated from each key value, and that hash value is then used to map the message to one of the computers in the cluster.
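A minimal sketch of both partitioning strategies (three partitions assumed; production systems typically use a stable hash such as murmur2 rather than Python’s built-in hash, which is randomized between runs):

from itertools import count

NUM_PARTITIONS = 3

# Round robin: messages are spread evenly and sequentially across partitions.
_rr = count()
def round_robin_partition(_message):
    return next(_rr) % NUM_PARTITIONS

# Key based: the same key always lands on the same partition (within one run).
def key_partition(key):
    return hash(key) % NUM_PARTITIONS

print([round_robin_partition(m) for m in ["a", "b", "c", "d"]])  # [0, 1, 2, 0]
print(key_partition("customer-42") == key_partition("customer-42"))  # True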

Stream Processing and DWH

A DWH is a great tool to store and analyze structured data. You can store terabytes of data and get answers to your queries about historical data within seconds. DWH products such as Teradata or HP Vertica were built for this use case. However the ETL processes often take too long. Business wants to query up-to-date information instead of using an approach where you may only get information about what happened yesterday. This is where stream processing comes in and feeds all new data into the DWH immediately.

Stream Processing and Hadoop

A big data architecture contains stream processing for real-time analytics and Hadoop for storing all kinds of data and long-running computations.

Hadoop initially started with MapReduce, which offers batch processing where queries take hours, minutes or at best seconds. This is, and will remain, great for complex transformations and computations over big data volumes. However, it is not so good for ad hoc data exploration and real-time analytics. Multiple vendors have, though, made improvements and added capabilities to Hadoop that make it capable of being more than just a batch framework.

DWH, Hadoop and stream processing complement each other very well. Therefore, the integration layer is even more important in the big data era, because you have to combine more and more different sinks and sources.

Since 2016, a new idea called Streaming SQL has emerged. We call a language that enables users to write SQL-like queries against streaming data a “Streaming SQL” language. Almost all Stream Processors now support Streaming SQL.

7. Streaming is better with SQL

Let’s assume that you picked a stream processor, implemented some use cases, and it’s working. Now you sit down to savor the win. However, given that you can simply write SQL or something like it when doing batch processing, why should you have to write all this code? Shouldn’t you be able to do streaming with SQL? The answer is yes, you should. Such streaming SQL exists. Again, there are many offerings. Unfortunately, unlike SQL, there is no standard streaming SQL syntax; there are many flavors, which follow SQL but have variations.

SQL is a powerful language for querying structured data. It is designed as a set of independent operators: projection, filter, joins, and grouping, which can be recombined to create very powerful queries.

Following are some advantages of streaming SQL languages:

  • It’s easy to follow and learn for the many people who know SQL.
  • It’s expressive, short, sweet and fast!!
  • It defines core operations that cover 90% of problems.
  • Streaming SQL language experts can dig in when they like by writing extensions!
  • A query engine can better optimize the executions with a streaming SQL model. Most optimizations are already studied under SQL, and there is much we can simply borrow from database optimizations.

Let us walk through a few of the key operators. Just as SQL can cover most data queries on data stored in a disk, streaming SQL can cover most of the queries on streaming data. Without streaming SQL, programmers would have to hand code each operator, which is very complicated and hard work.

Concepts in SQL, such as “group by” and “having” clauses, usually work similarly with streaming SQL languages.

Streaming SQL has two additional concepts not covered by SQL: windows and joins, which handle the complexities of streaming. Let’s understand each of them.

8. Streaming Windows

Although batch can be handled as a special case of stream processing, analyzing never-ending streaming data often requires a shift in the mindset and comes with its own terminology (for example, “windowing” and “at-least-once”/”exactly-once” processing). This shift and the new terminology can be quite confusing for people being new to the space of stream processing.

Consider the example of a traffic sensor that counts, every 15 seconds, the number of vehicles passing a certain location. The resulting stream is an endless sequence of counts, one every 15 seconds.

If you would like to know how many vehicles passed that location, you would simply sum the individual counts. However, the nature of a sensor stream is that it continuously produces data. Such a stream never ends, and it is not possible to compute a final sum that can be returned. Instead, it is possible to compute rolling sums, i.e., return for each input event an updated sum record. This would yield a new stream of partial sums.

However, a stream of partial sums might not be what we are looking for, because it constantly updates the count and, even more important, some information such as variation over time is lost. Hence, we might want to rephrase our question and ask for the number of cars that pass the location every minute. This requires us to group the elements of the stream into finite sets, each set corresponding to sixty seconds. This operation is called a tumbling window operation.

Tumbling windows discretize a stream into non-overlapping windows. For certain applications it is important that windows are not disjoint, because an application might require smoothed aggregates. For example, we can compute every thirty seconds the number of cars passed in the last minute. Such windows are called sliding windows.

In practice, each element of a stream must be processed by the same window operator that decides which windows the element should be added to. For many applications, a data stream needs to be grouped into multiple logical streams, on each of which a window operator can be applied. Think for example about a stream of vehicle counts from multiple traffic sensors (instead of only one sensor as in our previous example), where each sensor monitors a different location. By grouping the stream by sensor id, we can compute windowed traffic statistics for each location in parallel, for example with tumbling windows over a stream of (sensorId, count) pairs.

Generally speaking, a window defines a finite set of elements on an unbounded stream. This set can be based on time (as in our previous examples), element counts, a combination of counts and time, or some custom logic to assign elements to windows.
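A minimal sketch of keyed tumbling windows over a (sensorId, count) stream. For simplicity this version batches the whole stream; a real engine would emit each window’s result as the window closes, and the sample events are made up.

from collections import defaultdict

# Stream of (sensor_id, timestamp_seconds, vehicle_count) events.
events = [
    ("s1", 0, 3), ("s2", 5, 1), ("s1", 20, 4),
    ("s1", 65, 2), ("s2", 70, 5), ("s1", 80, 1),
]

WINDOW_SECONDS = 60  # tumbling, non-overlapping one-minute windows

def tumbling_sums(stream):
    sums = defaultdict(int)
    for sensor_id, ts, n in stream:
        window_start = (ts // WINDOW_SECONDS) * WINDOW_SECONDS
        sums[(sensor_id, window_start)] += n  # keyed by sensor and window
    return dict(sums)

print(tumbling_sums(events))
# {('s1', 0): 7, ('s2', 0): 1, ('s1', 60): 3, ('s2', 60): 5}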

Streaming Joins

If we want to handle data from multiple tables, we use the JOIN operator in SQL. Similarly, if you want to handle data from multiple streams, there are two options: the first is to join the two and create one stream, while the second is to write patterns across multiple streams.

9. Why Stream Processing

https://towardsdatascience.com/introduction-to-stream-processing-5a6db310f1b4

https://medium.com/stream-processing/what-is-stream-processing-1eadfca11b97

10. Final considerations

We have entered an era where competitive advantage comes from analyzing, understanding, and responding to an organization’s data. When doing this, time is of the essence, and speed will decide the winners and losers.

Stream processing is required when data has to be processed fast and/or continuously, i.e. reactions have to be computed and initiated in real time. This requirement is showing up in more and more verticals. Many different frameworks and products are already available on the market.

Many use cases need fast, real-time decisions. Although it is possible to implement them using databases or batch processing, these technologies quickly introduce complexities because there is a fundamental impedance mismatch between the use cases and the tools employed. In contrast, streaming provides a much more natural model to think about, capture, and implement those real-time streaming use cases. Streaming SQL provides a simple yet powerful language to program streaming use cases.

The reality is that the value of most data degrades with time. It’s interesting to know that yesterday there was a traffic jam, or 10 fraud incidents, or 10 people who had heart attacks. From that knowledge, we can learn how to mitigate or prevent those incidents in the future. However, it is much better if we can gain those insights at the time they are occurring so that we can intervene and manage the situation.

The most popular Stream processing framework is Kafka. You can check my previous post here

What is Kafka?

11. Book

Designing Data-Intensive Applications

Stream processing book bundle

Streaming Systems

Event Streams in action

12. Influencers List

@martinkl
@paasdev

13. Links

The Log: What every software engineer should know about real-time data’s unifying abstraction

Oracle understanding stream analytics

Class 101

Class 102

Stream processing myths debunked – Six Common Streaming Misconceptions
Myth 1: There’s no streaming without batch (the Lambda Architecture)
Myth 2: Latency and Throughput: Choose One
Myth 3: Micro-batching means better throughput
Myth 4: Exactly once? Completely impossible.
Myth 5: Streaming only applies to “real-time”
Myth 6: So what? Streaming is too hard anyway.

The data processing evolution a potted history

Choosing a stream processor is challenging because there are many options to choose from and the best choice depends on end-user use cases.

How to choose stream processor

Streaming first architecture

Migrating to an event driven system

More details about Stream and SQL

Predict Conference 2019

What’s the crack jack?

Yesterday I went to Predict Conference, and I need to say thanks to Oracle for providing this experience for me.

Takeaways

My first time at Predict Conference, my first impression: it is a nice conference to get what are the trending topics in the area of Data in Ireland.
With nearly 1,000 attendees and more than 30 speakers across 2 stages.
Women are more and more present in tech.

But what is Predict Conference?

The Predict Conference is unique in its approach to combining business, data science and technology under one roof.

Predict is a holistic and continuous experience. It combines great talks, inspiration, face-to-face serendipity, experience zone, and hands-on data modeling innovation to equip you for decision making in the data age.
Predict is Europe’s leading data conference. It is designed to bring together thought leaders and innovators in the fields of data science, predictive analytics, artificial intelligence and technology.

Tickets include:
– Full access to the main event on the 1st of October 2019: all talks, access to the Predict experience area, all-day coffee, lunch and evening drinks reception.

www.predictconference.com

How many years has it been going?

Creme Global started Predict in 2015; it is now in its 5th year and has gone from strength to strength.
What exciting things can people look forward to?
The major themes at Predict this year were Health & Life Sciences, AI and Machine Learning, along with Technology and Society (Sustainability / Government / Cities / Privacy).

It’s not a very deep dive tech conference

The Predict Conference is all about talks around data topics, but it isn’t the most techie one for sure. The idea of this conference was to bring the tech people and the industries together, and this objective is being realized year after year with great success.

From video analysis to APIs and IOT

The range of topics was very broad, but for me everything revolved around 3 main areas: video analysis, text analysis and IoT.

Talks
The talks were divided into some main categories;
Health;
Applied;
Sustainability & Cities;
Customer first;
Experiments;

I created a list of the talks I had the opportunity to see there. This is not an exhaustive list!

Talk:
From Cups to Consciousness: The Roadmap to AGI – Ben Duffy
He talked about AGI (Artificial General Intelligence), the intelligence of a machine that has the capacity to understand or learn any intellectual task that a human being can. It is a primary goal of some artificial intelligence research and a common topic in science fiction and futures studies. He also opened the floor for debate about what the development of AI and robotics will look like over the next few years.

Talk:
Onboard Artificial Intelligence: train, deploy and use Deep Learning on an edge device (Raspberry Pi) – Constant Bridon
He showed a live demo of a model designed to recognise car drawings via the camera of a Raspberry Pi. He also talked about an IoT car competition in the US where the cars run using the camera feed.
There’s a nice one in the UK called FormulaPi.
https://www.formulapi.com/

Talk:
The history of Creative AI – Eric Risser
He talked about combining artificial intelligence and computer graphics, leading both the vision and the core technology.

There’s a nice video from a colleague showing how to do a similar demo using Oracle Cloud and GPUs.

Talks:
Top 3 things I’ve learned in 3 decades of Data Science – Dr. John F Elder
He talked about Ensembles (we can’t do it alone, we need alternative perspectives), Target Shuffling (some of our success is luck, and simulation can find out how much), and Leaks from the Future (we can’t blindly trust results: if it looks too good to be true, it is).

Talk:
Uncertainty and Interpretability – Kevin Kuo

Talk:
Low-Latency Model Prediction with Video
He talked about TensorFlow and, by the way, there’s a 2.0 version now.

Talk:
The Fast Track to AI with Serverless – Peter Elger
https://www.manning.com/books/ai-as-a-service

Talk:
AI Live: No Experience Required
He showed one Wolfram notebook.

But the main topic, in my opinion, was ML & DL buzzwords

A lot of presentations were trying to explain that Machine Learning and Deep Learning are not the same thing, and that there are a lot of other buzzwords around.

Partners

A stands area with some nice content: the Experience Zone.

Wolfram – they were showcasing the computing program Wolfram Mathematica.

Equinix – specializes in internet connectivity and data centers

RStudio – they were showcasing the IDE and the language. I had a nice talk with the guy there about how R is not dying and it is Python that is getting stronger.

Coder Dojo – https://coderdojo.com/

CeADAR – they had a nice tic tac toe game.

Links

https://irishtechnews.ie/predict-conference-2019-smartcities-ai/amp/