Manage Apache Kafka Connect connectors with kcctl

title: Manage Apache Kafka Connect connectors with kcctl
published: true
date: 2021-10-13 00:00:00 UTC
tags: apachekafka, kafkaconnect, kcctl
canonical_url: aiven.io/blog/manage-apache-kafka-connect-c..

Image showing a terminal with a floating astronaut and the wording "cli power"

Apache Kafka is widely used as a company's data backbone, with Kafka Connect acting as a bridge to other systems. This enables easy, reliable and scalable integration of Kafka with existing technologies. The Kafka Connect REST API provides a way to manage connectors via web calls, but crafting URLs in a terminal can be tricky.
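To make the "crafting URLs" point concrete, here is a minimal sketch of the kind of raw REST calls involved. The paths are standard Kafka Connect REST endpoints, while the `CONNECT_URL` variable, its `http://localhost:8083` default, the `connect_endpoint` helper and the `my-connector` name are illustrative assumptions only.

```shell
# Hypothetical base URL of a Kafka Connect REST endpoint; adjust to your setup.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Illustrative helper: join the base URL and a REST path with exactly one slash.
connect_endpoint() {
    printf '%s/%s\n' "${CONNECT_URL%/}" "${1#/}"
}

# Example calls (commented out because they need a reachable Connect cluster):
# curl -s "$(connect_endpoint connectors)"                            # list connectors
# curl -s "$(connect_endpoint connectors/my-connector/status)"        # connector status
# curl -s -X PUT "$(connect_endpoint connectors/my-connector/pause)"  # pause a connector
# curl -s "$(connect_endpoint connector-plugins)"                     # list available plugins
```

Each operation needs its own path and HTTP method, which is the bookkeeping a dedicated command line tool takes off our hands.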

In this blog post we will explore kcctl, a new open source command line tool for Kafka Connect. We'll show how we can integrate it with Apache Kafka and manage our connections to other systems.

At Aiven, we offer similar functionality in the Aiven command line interface, which you can use to create, drop, change, verify, pause and restore any Kafka Connect connector running on Aiven services. If all your Kafka Connect instances are Aiven services, then the Aiven command line interface is all you need. If, on the other hand, you want to use the same tool for any Kafka Connect instance, be it on-premises, on Aiven or in other cloud providers, then kcctl is your friend.

Create an Apache Kafka instance with Kafka Connect

To follow the steps in this article, you need an Apache Kafka environment with Kafka Connect up and running. If you don't have one, don't worry: Aiven can provide one in minutes. Just create one in the Aiven Console, or in the Aiven Command Line Interface with the following command:

avn service create demo-kafka               \
    --service-type kafka                    \
    --cloud google-europe-west3             \
    --plan business-4                       \
    -c kafka_connect=true                   \
    -c kafka.auto_create_topics_enable=true

The above command creates a three-node Aiven for Apache Kafka cluster named demo-kafka (using the business-4 plan) in the google-europe-west3 region, and enables both Kafka Connect and the automatic creation of topics. With Aiven, you can deploy Kafka Connect as part of your Kafka cluster for business and premium plans, or as a separate, standalone cluster. To read more about Aiven for Apache Kafka and related Kafka Connect topics, check out the dedicated page.

We can wait until the service is ready with:

avn service wait demo-kafka

Install kcctl

At the time of writing, kcctl is in an early access release. The current installation instructions can be found in its GitHub repository.

Once kcctl is installed, we can test that it is working in our terminal by adding the bin subdirectory to our PATH and executing:

kcctl

If we did everything correctly, then we should see the usage information. Now it's time to connect to our Kafka Connect cluster.

Connect

To plug in to Kafka Connect, first retrieve the cluster URL. This can be done using the Aiven CLI, with jq to parse the JSON output:

avn service get demo-kafka --json | jq '.connection_info.kafka_connect_uri'

We can now create a kcctl configuration context by executing the following command, replacing the cluster parameter accordingly:

kcctl config set-context \
    --cluster https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443 \
    my_kafka_cluster

The above creates a context named my_kafka_cluster pointing to the demo-kafka instance. To verify the configuration:

kcctl info

This retrieves the definition of the current kcctl configuration context:

URL:               https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443
Version:           2.7.2-SNAPSHOT
Commit:            d15ddddd3ef3f5ef
Kafka Cluster ID:  -DvILyiXQxSpnFSK9M1qgQ
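The URL lookup and context creation steps above can also be combined, so the URL never has to be copied by hand. This is only a sketch: `set_context_from_aiven` is a hypothetical helper name, and it assumes that avn, jq and kcctl are all on the PATH and that the returned URI already embeds the credentials. The `-r` flag makes jq print the raw string without surrounding quotes.

```shell
# Hypothetical helper: look up the Kafka Connect URI of an Aiven service ($1)
# and store it in a kcctl configuration context ($2).
set_context_from_aiven() {
    # jq -r prints the raw string, without the JSON double quotes
    uri="$(avn service get "$1" --json | jq -r '.connection_info.kafka_connect_uri')" || return 1
    kcctl config set-context --cluster "$uri" "$2"
}

# Usage:
# set_context_from_aiven demo-kafka my_kafka_cluster
```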

Create a data source in PostgreSQL

To see the connectors in action, let's create a PostgreSQL database and configure a Kafka Connect JDBC source connector to bring the data into Kafka. The connector will take rows from a table named pasta stored in a PostgreSQL database and write them to a Kafka topic. If you don't have a PostgreSQL database handy, you can create one at Aiven with the following Aiven CLI command:

avn service create demo-pg               \
    --service-type pg                    \
    --cloud google-europe-west3          \
    --plan hobbyist

Once the demo-pg PostgreSQL instance is up and running (use avn service wait demo-pg to wait for it), connect to it:

avn service cli demo-pg

Now we can create our sample pasta table and fill it with data, using the following statements in our terminal:

create table pasta (id serial, name varchar, cooking_minutes int);
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('spaghettini', 6);
insert into pasta (name, cooking_minutes) values ('fusilli', 9);
insert into pasta (name, cooking_minutes) values ('trofie', 5);

Create a new Kafka Connect connector

Once the source data is available, we can create the Kafka Connect JDBC source connector, which reads the pasta table in incrementing mode, using the id column to detect new rows. To get the required PostgreSQL connection details such as hostname, port, user and password, use this command:

avn service get demo-pg --format '{service_uri_params}'

Create a file named my_jdbc_connect_source.json with the following JSON content (substituting the <HOST>, <PORT> and <PASSWORD> with the actual information retrieved in the previous step):

{
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOST>:<PORT>/defaultdb?sslmode=require",
    "connection.user": "avnadmin",
    "connection.password": "<PASSWORD>",
    "table.whitelist": "pasta",
    "mode": "incrementing",
    "incrementing.column.name":"id",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}
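As an alternative to editing the placeholders by hand, the same file can be generated from shell variables. The sketch below uses a hypothetical `jdbc_source_config` helper and assumes the host, port and password contain no characters that need JSON escaping (such as double quotes or backslashes).

```shell
# Illustrative helper (not part of kcctl or the Aiven CLI): print the JDBC source
# connector configuration for the given PostgreSQL host ($1), port ($2) and
# password ($3). Assumes the values need no JSON escaping.
jdbc_source_config() {
    cat <<EOF
{
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://$1:$2/defaultdb?sslmode=require",
    "connection.user": "avnadmin",
    "connection.password": "$3",
    "table.whitelist": "pasta",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}
EOF
}

# Usage with placeholder values; replace them with the details of your own service:
# jdbc_source_config "<HOST>" "<PORT>" "<PASSWORD>" > my_jdbc_connect_source.json
```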

Now we can invoke the connector creation via kcctl in a new terminal window:

kcctl apply -f my_jdbc_connect_source.json --name pg-incremental-source

We can verify that the connector was successfully created:

kcctl describe connector pg-incremental-source

The command output shows the pg-incremental-source connector in RUNNING state and all the details associated with it.

Check the data in Apache Kafka with kcat

We can also verify, using kcat, that a new Kafka topic called pg_source_pasta contains the same data stored in PostgreSQL. Start by downloading the required certificates:

avn service user-creds-download demo-kafka \
    --username avnadmin                    \
    -d certs

Then create a kcat.config file containing the following entries, replacing <HOST> and <PORT> with the host and port of the Kafka service (not those of PostgreSQL):

bootstrap.servers=<HOST>:<PORT>
security.protocol=ssl
ssl.key.location=certs/service.key
ssl.certificate.location=certs/service.cert
ssl.ca.location=certs/ca.pem
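The configuration above can also be written by a small shell function, which helps when the certificates live in a different directory. `kcat_config` is a hypothetical helper name, and the sketch assumes the certificate file names shown above.

```shell
# Illustrative helper (not part of kcat or the Aiven CLI): print a kcat configuration
# for an SSL-secured cluster. $1 is the bootstrap server as host:port, $2 is the
# directory holding service.key, service.cert and ca.pem (defaults to "certs").
kcat_config() {
    certs_dir="${2:-certs}"
    cat <<EOF
bootstrap.servers=$1
security.protocol=ssl
ssl.key.location=$certs_dir/service.key
ssl.certificate.location=$certs_dir/service.cert
ssl.ca.location=$certs_dir/ca.pem
EOF
}

# Usage with a placeholder address; replace it with your Kafka service's host and port:
# kcat_config "<HOST>:<PORT>" certs > kcat.config
```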

Then read from the pg_source_pasta topic with the following kcat invocation:

kcat -F kcat.config -C -t pg_source_pasta

If we now insert some rows in the PostgreSQL pasta table, we can see the same changes appearing in Kafka via kcat:

Gif showing Postgresql insert and related rows flowing in Kafka via kcat

Managing Kafka Connect connectors with kcctl

Creating connectors is only part of the game with kcctl - we can also manage them! Need a list of all the connectors deployed? Just run the following command:

kcctl get connectors

Need to pause and resume connectors? The code below, for example, pauses the one named pg-incremental-source:

kcctl pause connector pg-incremental-source
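Resuming works the same way, with resume in place of pause. The sketch below wraps both commands in a small helper that can act on several connectors in turn; `connector_action` is a hypothetical name, and it assumes kcctl is on the PATH with a configuration context already set.

```shell
# Hypothetical helper: run a kcctl action ("pause" or "resume") on each named connector.
# Assumes kcctl is on the PATH and a configuration context is already set.
connector_action() {
    action="$1"
    shift
    case "$action" in
        pause|resume) ;;
        *) echo "unsupported action: $action" >&2; return 1 ;;
    esac
    for name in "$@"; do
        # Stop at the first connector for which kcctl reports a failure
        kcctl "$action" connector "$name" || return 1
    done
}

# Usage:
# connector_action pause pg-incremental-source
# connector_action resume pg-incremental-source
```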

What type of connectors can we create? Glad you asked! We can find the full plugin list with:

kcctl get plugins

The command shows all the connector plugins available with the related type (source or sink) and version. With this command you'll be able to check the list of the managed Kafka Connect connector types you can create with Aiven for Apache Kafka.

TYPE     CLASS                                                                           VERSION
 source   com.couchbase.connect.kafka.CouchbaseSourceConnector                            4.0.6
 source   com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceCon   2.1.3
         nector
 source   com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector       2.1.3
 source   com.google.pubsub.kafka.source.CloudPubSubSourceConnector                       2.7.2-SNAPSHOT
 source   com.google.pubsublite.kafka.source.PubSubLiteSourceConnector                    2.7.2-SNAPSHOT

 ...
 sink     io.aiven.kafka.connect.gcs.GcsSinkConnector                                     0.9.0
 sink     io.aiven.kafka.connect.http.HttpSinkConnector                                   0.4.0
 sink     io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector                      2.12.0
 ...

Wrapping up

Managing Kafka Connect connectors from the terminal is only a few commands away. kcctl makes it easy to inspect, deploy, update, pause and resume any connector in our environments. This unifies the end-user experience for Apache Kafka instances deployed on-premises, self-hosted or in Aiven.

Further reading: