
References:

PNDA-guide/streamingest

PNDA-guide/streamingest/topic-preparation

Confluent Kafka-Connect Deep Dive/Converters and Serialization

Overview

Kafka is the "front door" of PNDA, allowing the ingest of high-velocity data streams, distributing data to all interested consumers and decoupling data sources from data processing applications and platform clients.

Kafka Connect, an open source component of Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. Using Kafka Connect you can use existing connector implementations for common data sources and sinks to move data into and out of Kafka.

We want to store the events that arrive at the Kafka topics in HDFS datasets. Currently PNDA uses Apache Gobblin to ingest data streams from Kafka to HDFS.

However, Apache Gobblin has some disadvantages:

  • Apache Incubator Project.
  • Configuration complexity.
  • Disadvantages of job distribution through MapReduce (MR) mode (from the Gobblin documentation): 
    • The MR mode suffers from problems such as large overhead mostly due to the overhead of submitting and launching a MR job and poor cluster resource usage.
    • The MR mode is also fundamentally not appropriate for real-time data ingestion given its batch nature.

NOTE: Apache Gobblin recently added support for Apache YARN as an orchestrator for job distribution, which may solve some of the previous disadvantages.

Here we explore Kafka Connect as an alternative: a component of Kafka that would let us remove the YARN dependency and simplify configuration, management and scaling of the stream ingest jobs.

Getting Started

We will set up a local deployment of PNDA to test how Kafka Connect works. Follow the 'Testing locally with microk8s' instructions, using the following YAML profile to enable the Kafka Connect related components:

Code Block
languageyml
# Enabling Kafka and Kafka-connect related components:
cp-zookeeper:
  enabled: true
cp-kafka:
  enabled: true
cp-schema-registry:
  enabled: true
schema-registry-ui:
  enabled: true
cp-kafka-connect:
  enabled: true
kafka-connect-ui:
  enabled: true
kafka-manager:
  enabled: true

# Enabling HDFS related components:
hdfs:
  enabled: true

# Disabling the rest of the external components
grafana:
  enabled: false
jupyterhub:
  enabled: false
cp-kafka-rest:
  enabled: false
cp-ksql-server:
  enabled: false
hbase:
  enabled: false
opentsdb:
  enabled: false
spark-standalone:
  enabled: false
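
If the microk8s guide deploys PNDA with Helm (an assumption here; follow the guide's actual commands), the profile above would typically be passed as a values file, for example:

Code Block
languagebash
# Hypothetical example: pass the profile above as a Helm values file.
# The chart path and release name are placeholders; use the ones from the microk8s guide.
helm upgrade --install pnda <path-to-pnda-chart> -n pnda -f kafka-connect-profile.yaml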

Kafka-connect-ui can be accessed through the pnda-kafka-connect-ui service IP:

Code Block
languagebash
kubectl get service -o wide -n pnda pnda-kafka-connect-ui
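
If the service IP is not directly reachable from your host, a port-forward is an alternative way to open the UI (a sketch; port 8000 is the kafka-connect-ui default and the actual service port is an assumption about this deployment):

Code Block
languagebash
# Forward the Kafka Connect UI to localhost:8000
# (assumes the pnda-kafka-connect-ui service exposes port 8000)
kubectl port-forward -n pnda svc/pnda-kafka-connect-ui 8000:8000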

Access the kafka-connect-ui service IP through your browser:

(Screenshot: Kafka Connect UI)

Now we will configure some HDFS Sink Connectors to ingest events from Kafka topics into HDFS.

Following the instructions in PNDA-guide/topic-preparation, we find different kinds of topics:

  • Matching PNDA schema topic
  • Avro configured topic
  • Un-configured topic

TBD

Matching schema topic

This is the case when the data being streamed to the platform already happens to adhere to the PNDA schema.

Let's divide the test in two parts, depending on how the data is encoded in the Kafka topic:

AvroEncoded

Reference: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html

We use the kafka-avro-console-producer tool to produce some Avro-encoded data in a topic. The tool is available in the cp-schema-registry pod, so we open a bash terminal inside the cp-schema-registry container of the pod:

Code Block
languagebash
kubectl exec -ti svc/pnda-cp-schema-registry -- /bin/bash


JMX_PORT=5559 kafka-avro-console-producer --broker-list $SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS --topic pnda.avro.test --property value.schema='{"namespace": "pnda.entity","type": "record","name": "event","fields": [ {"name": "timestamp", "type": "long"}, {"name": "source", "type": "string"}, {"name": "rawdata", "type": "bytes"}]}'

Then copy the following lines into the topic. If you do not respect the PNDA Avro schema, kafka-avro-console-producer will exit with an error.

Code Block
languagejs
{"timestamp":1567516179000,"source":"test","rawdata":"200"}
{"timestamp":1567516180000,"source":"test","rawdata":"300"}
{"timestamp":1567516181000,"source":"test","rawdata":"150"}
{"timestamp":1567516182000,"source":"test","rawdata":"100"}
{"timestamp":1567516183000,"source":"test","rawdata":"230"}
{"timestamp":1567516184000,"source":"test","rawdata":"250"}

Press CTRL+C to exit kafka-avro-console-producer and type 'exit' to close the container terminal.
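
Optionally, you can verify that the events reached the topic with kafka-avro-console-consumer, run from the same cp-schema-registry container (a sketch; the JMX_PORT override and the explicit schema registry URL are assumptions mirroring the producer command above):

Code Block
languagebash
# Consume the Avro events back from the topic to check they were produced correctly
JMX_PORT=5560 kafka-avro-console-consumer --bootstrap-server $SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS --topic pnda.avro.test --from-beginning --property schema.registry.url=http://pnda-cp-schema-registry:8081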

Go back to the kafka-connect-ui and create a new HDFS Sink connector with the following config properties:

Code Block
name=pnda_avro
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=pnda.avro.test
hdfs.url=hdfs://pnda-hdfs-namenode:8020
tasks.max=3
flush.size=2
format.class=io.confluent.connect.hdfs.avro.AvroFormat
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://pnda-cp-schema-registry:8081
logs.dir=/kafkaconnectlogs
topics.dir=/user/PNDA/datasets
partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=ES
timezone=CET
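
Alternatively, the same connector can be created through the Kafka Connect REST API instead of the UI. A minimal sketch, assuming the worker's REST endpoint is exposed by a pnda-cp-kafka-connect service on the default port 8083 and that the command is run from inside the cluster (or via a port-forward):

Code Block
languagebash
# Create the connector by POSTing its configuration to the Kafka Connect REST API
# (service name and port 8083 are assumptions about this local deployment)
curl -X POST http://pnda-cp-kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "pnda_avro",
        "config": {
          "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
          "topics": "pnda.avro.test",
          "hdfs.url": "hdfs://pnda-hdfs-namenode:8020",
          "tasks.max": "3",
          "flush.size": "2",
          "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://pnda-cp-schema-registry:8081",
          "topics.dir": "/user/PNDA/datasets"
        }
      }'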

A new Avro file containing flush.size events should be written to HDFS. The dataset folder is named after the Kafka topic, and can be further partitioned with partitioner classes (more info at hdfs-connector-doc).

You can check whether the files are correctly written through "Utilities" > "Browse the Filesystem" in the HDFS namenode UI:

Code Block
kubectl get service pnda-hdfs-namenode

The default port is 50070.
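
Alternatively, you can list the dataset directory from inside the namenode pod (a sketch, assuming the hdfs client is available on the PATH in that container and the pnda namespace used earlier):

Code Block
languagebash
# List the files written by the connector for the pnda.avro.test topic
kubectl exec -ti -n pnda svc/pnda-hdfs-namenode -- hdfs dfs -ls -R /user/PNDA/datasets/pnda.avro.test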

(Screenshot: HDFS namenode file browser)

JsonEncoded

WARNING: Kafka Connect issue! Kafka Connect logs an error when writing to HDFS in Avro format from events encoded as JSON in Kafka. With a flush.size of 1, it will correctly write a single Avro file per event (which does not make much sense). The issue describing this problem is written here.
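
For reference, a sketch of the value-converter settings this scenario refers to (schemas.enable=false assumes schemaless JSON events; these are illustrative properties, not a working configuration given the issue above):

Code Block
# Illustrative converter settings for JSON-encoded events written out as Avro
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
format.class=io.confluent.connect.hdfs.avro.AvroFormat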

Avro configured topic

For a source that ingests events in Avro format with a schema different from the PNDA schema, an HDFS sink connector must be created with a custom configuration that uses transformations to extract/rename the timestamp, source and rawdata fields.

Reference: https://docs.confluent.io/current/connect/transforms/index.html.
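
As an illustration, assume a hypothetical source topic whose Avro records carry ts, origin and payload fields; a ReplaceField transformation could rename them to the PNDA field names (everything except the topic name and the transforms mirrors the connector above):

Code Block
name=pnda-avro-custom
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# Hypothetical topic with a non-PNDA Avro schema
topics=custom.avro.topic
hdfs.url=hdfs://pnda-hdfs-namenode:8020
flush.size=2
format.class=io.confluent.connect.hdfs.avro.AvroFormat
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://pnda-cp-schema-registry:8081
topics.dir=/user/PNDA/datasets
# Rename the hypothetical source fields to the PNDA schema field names
transforms=Rename
transforms.Rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.Rename.renames=ts:timestamp,origin:source,payload:rawdata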

Un-configured topic

For topics that do not follow a schema, the connector configuration below will transform each event into a record that follows the PNDA schema before storing it in HDFS:

  • The source field will be set to the topic name.
  • The timestamp field will be set to the Kafka ingestion time.
  • The rawdata field will contain the original event.

We use the kafka-console-producer tool to produce some schemaless data in a topic. The tool is available in the cp-kafka pod, so we open a bash terminal inside the cp-kafka container of the pod:

Code Block
languagebash
kubectl exec -ti svc/pnda-cp-kafka -- /bin/bash


KAFKA_OPTS='' kafka-console-producer --topic unconfigured --broker-list=localhost:9092

Then copy the following lines into the topic.

Code Block
languagejs
This is event one.
Second event without a schema.

Press CTRL+C to exit kafka-console-producer and type 'exit' to close the container terminal.

Go back to the kafka-connect-ui and create a new HDFS Sink connector with the following config properties:

Code Block
name=pnda-unconfigured
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=unconfigured
hdfs.url=hdfs://pnda-hdfs-namenode:8020
tasks.max=3
flush.size=2
format.class=io.confluent.connect.hdfs.avro.AvroFormat
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
logs.dir=/kafkaconnectlogs
topics.dir=/user/PNDA/datasets
partitioner.class=io.confluent.connect.storage.partitioner.HourlyPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=ES
timezone=CET
transforms=MakeMap,AddSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=rawdata
transforms.AddSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.AddSource.topic.field=source
transforms.AddSource.timestamp.field=timestamp

A new Avro file containing flush.size events should be written to HDFS. The dataset folder is named after the Kafka topic, and can be further partitioned with partitioner classes (more info at hdfs-connector-doc).
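
For illustration, after the HoistField and InsertField transformations each record stored in HDFS should look roughly like this (the timestamp value below is only an example of the Kafka ingestion time):

Code Block
languagejs
{"rawdata":"This is event one.","source":"unconfigured","timestamp":1567516200000}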