title: Manage Apache Kafka Connect connectors with kcctl
published: true
date: 2021-10-13 00:00:00 UTC
tags: apachekafka, kafkaconnect, kcctl
canonical_url: aiven.io/blog/manage-apache-kafka-connect-c..
Apache Kafka is widely used as a company's data backbone, with Kafka Connect acting as a bridge that enables easy, reliable and scalable integration of Kafka with other existing technologies. The Kafka Connect REST APIs provide a way to manage connectors via web calls, but crafting URLs in a terminal can sometimes be tricky.
In this blog post we will explore kcctl, a new open source command line tool for Kafka Connect. We'll show how we can integrate it with Apache Kafka and manage our connections to other systems.
At Aiven, we offer similar functionality in the Aiven command line interface, which you can use to create, drop, change, verify, pause and resume any Kafka Connect connector running on Aiven services. If all your Kafka Connect instances are Aiven services, then the Aiven command line interface is all you need. If, on the other hand, you want to use the same tool for any Kafka Connect instance, be it on-premises, on Aiven or in another cloud provider, then kcctl is your friend.
Create an Apache Kafka instance with Kafka Connect
To follow the process in this article, you'll need an Apache Kafka environment with Kafka Connect up and running. If you don't have one, don't worry: Aiven can provide one in minutes. Just create one in the Aiven Console, or with the following Aiven Command Line Interface command:
avn service create demo-kafka \
--service-type kafka \
--cloud google-europe-west3 \
--plan business-4 \
-c kafka_connect=true \
-c kafka.auto_create_topics_enable=true
The above command creates an Aiven for Apache Kafka cluster named `demo-kafka` with three nodes (the `business-4` plan) in the `google-europe-west3` region, and enables Kafka Connect and the automatic creation of topics. With Aiven, you can deploy Kafka Connect as part of your Kafka cluster on business and premium plans, or as a separate, standalone cluster. To read more about Aiven for Apache Kafka and related Kafka Connect topics, check out the dedicated page.
We can wait until the service is ready with:
avn service wait demo-kafka
Install kcctl
At the time of writing, `kcctl` is in an early access release. The current installation instructions can be found in its GitHub repository.
Once `kcctl` is installed, add its `bin` subdirectory to our `PATH` and test that it is working in our terminal by executing:
kcctl
If we did everything correctly, then we should see the usage information. Now it's time to connect to our Kafka Connect cluster.
Connect
In order to plug in to Kafka Connect, first retrieve the cluster URL. This can be done using the Aiven CLI and jq to parse the JSON output:
avn service get demo-kafka --json | jq '.connection_info.kafka_connect_uri'
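To illustrate what the pipeline above does, here is a minimal sketch of the same jq extraction run against a hypothetical, trimmed-down sample of the service JSON (the real `avn` output contains many more fields):

```shell
# Hypothetical, trimmed-down sample of the JSON returned by
# `avn service get --json`; the real output has many more fields.
echo '{"connection_info": {"kafka_connect_uri": "https://avnadmin:PASSWORD@demo-kafka-myproject.aivencloud.com:443"}}' \
  | jq -r '.connection_info.kafka_connect_uri'
```

The `-r` flag prints the raw string without surrounding quotes, which is handy when pasting the URI into the next command.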
We can now create a `kcctl` configuration context by executing the following command, replacing the `--cluster` parameter accordingly.
kcctl config set-context \
--cluster https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443 \
my_kafka_cluster
The above creates a context named `my_kafka_cluster` pointing to the `demo-kafka` instance. To verify the configuration:
kcctl info
This retrieves the definition of the current `kcctl` configuration context:
URL: https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443
Version: 2.7.2-SNAPSHOT
Commit: d15ddddd3ef3f5ef
Kafka Cluster ID: -DvILyiXQxSpnFSK9M1qgQ
Create a data source in PostgreSQL
To see the connectors in action, let's create a PostgreSQL database and configure a Kafka Connect JDBC source connector to bring the data into Kafka. The connector will take data from a table named `pasta` stored in a PostgreSQL database and push it to a Kafka topic. If you don't have a PostgreSQL database handy, you can create one on Aiven with the following Aiven CLI command:
avn service create demo-pg \
--service-type pg \
--cloud google-europe-west3 \
--plan hobbyist
Once the `demo-pg` PostgreSQL instance is up and running (use `avn service wait demo-pg` to wait for it), connect to it:
avn service cli demo-pg
Now we can create our sample `pasta` table and fill it with data, using the following statements in our terminal:
create table pasta (id serial, name varchar, cooking_minutes int);
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('spaghettini', 6);
insert into pasta (name, cooking_minutes) values ('fusilli', 9);
insert into pasta (name, cooking_minutes) values ('trofie', 5);
Create a new Kafka Connect connector
Once the source data is available, we can create the Kafka Connect JDBC source connector, sourcing the `pasta` table in `incrementing` mode based on the `id` column. To get the required PostgreSQL connection details such as hostname, port, user and password, use this command:
avn service get demo-pg --format '{service_uri_params}'
Create a file named `my_jdbc_connect_source.json` with the following JSON content (substituting `<HOST>`, `<PORT>` and `<PASSWORD>` with the actual information retrieved in the previous step):
{
"connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://<HOST>:<PORT>/defaultdb?sslmode=require",
"connection.user": "avnadmin",
"connection.password": "<PASSWORD>",
"table.whitelist": "pasta",
"mode": "incrementing",
"incrementing.column.name":"id",
"poll.interval.ms": "2000",
"topic.prefix": "pg_source_"
}
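Before handing the file to `kcctl`, it can be worth a quick sanity check that it parses as valid JSON; assuming jq is installed, a one-liner like this is enough:

```shell
# jq exits non-zero (with an error message) if the file is not valid JSON
jq empty my_jdbc_connect_source.json && echo "valid JSON"
```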
Now we can invoke the connector creation via `kcctl` in a new terminal window:
kcctl apply -f my_jdbc_connect_source.json --name pg-incremental-source
We can verify that the connector was successfully created:
kcctl describe connector pg-incremental-source
The command output shows the `pg-incremental-source` connector in the `RUNNING` state, along with all the details associated with it.
Check the data in Apache Kafka with kcctl
We can also check, via kcat, that we have a new Kafka topic called `pg_source_pasta` containing the same data stored in PostgreSQL. Start by downloading the required certificates:
avn service user-creds-download demo-kafka \
--username avnadmin \
-d certs
Then create a `kcat.config` file containing the following entries:
bootstrap.servers=<HOST>:<PORT>
security.protocol=ssl
ssl.key.location=certs/service.key
ssl.certificate.location=certs/service.cert
ssl.ca.location=certs/ca.pem
Then read from the `pg_source_pasta` topic with the following kcat invocation:
kcat -F kcat.config -C -t pg_source_pasta
If we now insert some rows into the PostgreSQL `pasta` table, we can see the same changes appearing in Kafka via `kcat`.
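For example, a sketch of the round trip (the exact serialization of the records printed by kcat depends on the converters configured for the connector):

```shell
# In the psql session opened with `avn service cli demo-pg`:
#   insert into pasta (name, cooking_minutes) values ('penne', 11);
# Within the 2-second poll interval configured above, the kcat
# consumer left running on the topic prints the new record:
kcat -F kcat.config -C -t pg_source_pasta
```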
Managing Kafka Connect connectors with kcctl
Creating connectors is only part of the game with `kcctl` - we can also manage them! Need a list of all the connectors deployed? Just run the following command:
kcctl get connectors
Need to pause and resume connectors? The command below, for example, pauses the one named `pg-incremental-source`:
kcctl pause connector pg-incremental-source
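To pick up where we left off, the paused connector can be brought back with the matching resume subcommand (mirroring the pause command above):

```shell
kcctl resume connector pg-incremental-source
```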
What type of connectors can we create? Glad you asked! We can find the full plugin list with:
kcctl get plugins
The command shows all the connector plugins available, with the related type (`source` or `sink`) and version. With this command you'll be able to check the list of managed Kafka Connect connector types you can create with Aiven for Apache Kafka.
TYPE CLASS VERSION
source com.couchbase.connect.kafka.CouchbaseSourceConnector 4.0.6
source com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector 2.1.3
source com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector 2.1.3
source com.google.pubsub.kafka.source.CloudPubSubSourceConnector 2.7.2-SNAPSHOT
source com.google.pubsublite.kafka.source.PubSubLiteSourceConnector 2.7.2-SNAPSHOT
...
sink io.aiven.kafka.connect.gcs.GcsSinkConnector 0.9.0
sink io.aiven.kafka.connect.http.HttpSinkConnector 0.4.0
sink io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector 2.12.0
...
Wrapping up
Managing Kafka Connect connectors from the terminal is only a few commands away. `kcctl` makes it easy to inspect, deploy, update, pause and resume any connector in our environments. This unifies the end-user experience for Apache Kafka instances deployed on-premises, self-hosted or on Aiven.
Further reading: