Confluent S3 connector


The Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in Avro, JSON, or Bytes format. The connector periodically polls data from Kafka and uploads it to S3. A partitioner splits the data of each Kafka partition into chunks, and every chunk is written as one S3 object. The key name encodes the topic, the Kafka partition, and the start offset of the data chunk.
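With the default partitioner, the resulting key names follow the pattern topics/<topic>/partition=<partition>/<topic>+<partition>+<start offset>.<extension>. The following sketch (values illustrative) shows how such a key is assembled:

```shell
# Illustrative only: assemble an S3 object key the way the default
# partitioner names chunks: topics/<topic>/partition=<p>/<topic>+<p>+<start-offset>.<ext>
topic="s3_testtopic"; partition=0; start_offset=3; ext="avro"
printf 'topics/%s/partition=%d/%s+%d+%010d.%s\n' \
  "$topic" "$partition" "$topic" "$partition" "$start_offset" "$ext"
# → topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro
```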

In this setup, we show you how to read and write topics in object storage from Kafka directly using the Kafka S3 sink connector. For this test, we used a stand-alone Confluent cluster, but the setup also applies to a distributed cluster.

  1. Download Confluent Kafka from the Confluent website.

  2. Unpack the package to a folder on your server.

  3. Export two variables.

    export CONFLUENT_HOME=/data/confluent/confluent-6.2.0
    export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
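    To keep the variables across shell sessions, you can optionally append the exports to your shell profile (bash assumed; adjust the path to your installation):

    ```shell
    # Append the exports to ~/.bashrc so new shells pick them up (bash assumed).
    cat >> ~/.bashrc <<'EOF'
    export CONFLUENT_HOME=/data/confluent/confluent-6.2.0
    export PATH=$PATH:$CONFLUENT_HOME/bin
    EOF
    ```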
  4. For a stand-alone Confluent Kafka setup, the cluster creates a temporary root folder in /tmp. It also creates zookeeper, kafka, schema-registry, kafka-rest, connect, ksql-server, and control-center folders there and copies their respective configuration files from $CONFLUENT_HOME. See the following example:

    root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/
    total 28
    drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper
    drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka
    drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry
    drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest
    drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect
    drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server
    drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center
  5. Configure ZooKeeper. You don’t need to change anything if you use the default parameters.

    root@stlrx2540m1-108:~# cat  /tmp/confluent.406980/zookeeper/  | grep -iv ^#

    In the above configuration, we updated the server.xxx properties. By default, you need three ZooKeeper servers for leader election.
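    In a three-node ensemble, the server.xxx entries typically look like the following sketch (host names and IDs here are illustrative, not taken from this setup):

    ```properties
    # Hypothetical three-node ZooKeeper ensemble in zookeeper.properties.
    dataDir=/tmp/confluent.406980/zookeeper/data
    clientPort=2181
    initLimit=5
    syncLimit=2
    server.106=stlrx2540m1-106:2888:3888
    server.107=stlrx2540m1-107:2888:3888
    server.108=stlrx2540m1-108:2888:3888
    ```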

  6. Create a myid file in /tmp/confluent.406980/zookeeper/data with a unique ID:

    root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid

    We used the last octet of each server’s IP address as the value in the myid file. We used default values for the kafka, connect, control-center, kafka-rest, ksql-server, and schema-registry configurations.
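    Creating the file can be done as follows (the ID value 108 is illustrative, matching this host's last octet):

    ```shell
    # Write this node's unique ZooKeeper ID into the myid file.
    mkdir -p /tmp/confluent.406980/zookeeper/data
    echo 108 > /tmp/confluent.406980/zookeeper/data/myid
    cat /tmp/confluent.406980/zookeeper/data/myid
    # → 108
    ```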

  7. Start the Kafka services.

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  start
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    ZooKeeper is [UP]
    Kafka is [UP]
    Schema Registry is [UP]
    Kafka REST is [UP]
    Connect is [UP]
    ksqlDB Server is [UP]
    Control Center is [UP]

    There is a log folder for each configuration, which helps troubleshoot issues. In some instances, services take more time to start. Make sure all services are up and running.

  8. Install the Kafka Connect S3 connector using confluent-hub.

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest
    The component can be installed in any of the following Confluent Platform installations:
      1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME)
      2. /data/confluent/confluent-6.2.0 (where this tool is installed)
    Choose one of these to continue the installation (1-2): 1
    Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y
    Component's license:
    Confluent Community License
    I agree to the software license agreement (yN) y
    Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components
    Do you want to uninstall existing version 10.0.3? (yN) y
    Detected Worker's configs:
      1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/
      2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/
      3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/
      4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/
      5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/
      6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/
    Do you want to update all detected configs? (yN) y
    Adding installation directory to plugin path in the following files:

    You can also install a specific version by using confluent-hub install confluentinc/kafka-connect-s3:10.0.3.

  9. By default, confluentinc-kafka-connect-s3 is installed in /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3.

  10. Update the plug-in path to include the new confluentinc-kafka-connect-s3 directory.

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/ | grep plugin.path
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
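    Uncomment the plugin.path line and append the confluent-hub components directory so that Connect can find the new plug-in. The exact line below is a sketch based on the paths used in this setup:

    ```properties
    plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,/data/confluent/confluent-6.2.0/share/confluent-hub-components
    ```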
  11. Stop the Confluent services and restart them.

    confluent local services  stop
    confluent local services  start
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  status
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    Connect is [UP]
    Control Center is [UP]
    Kafka is [UP]
    Kafka REST is [UP]
    ksqlDB Server is [UP]
    Schema Registry is [UP]
    ZooKeeper is [UP]
  12. Configure the access ID and secret key in the /root/.aws/credentials file.

    root@stlrx2540m1-108:~# cat /root/.aws/credentials
    aws_access_key_id = xxxxxxxxxxxx
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx
  13. Verify that the bucket is reachable.

    root@stlrx2540m4-01:~# aws s3 --endpoint-url ls kafkasgdbucket1-2
    2021-10-29 21:04:18       1388 1
    2021-10-29 21:04:20       1388 2
    2021-10-29 21:04:22       1388 3
  14. Configure the s3-sink properties file with the S3 endpoint and bucket details.

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/ | grep -v ^#
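    The connector’s quick-start properties file typically contains entries like the following sketch. The bucket name, region, and endpoint (store.url) are assumptions and must match your environment:

    ```properties
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=s3_testtopic
    s3.region=us-west-2
    s3.bucket.name=kafkasgdbucket1-2
    s3.part.size=5242880
    flush.size=3
    # Hypothetical on-premises S3 endpoint; replace with your object store's URL.
    store.url=http://s3.example.com:10080
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE
    ```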
  15. Produce a few records to the sink topic.

    kafka-avro-console-producer --broker-list localhost:9092 --topic s3_testtopic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
  16. Load the s3-sink connector.

    root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink  --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
      "name": "s3-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "flush.size": "3",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "partitioner.class": "",
        "": "kafkasgdbucket1-2",
        "s3.part.size": "5242880",
        "s3.region": "us-west-2",
        "schema.compatibility": "NONE",
        "storage.class": "",
        "store.url": "",
        "tasks.max": "1",
        "topics": "s3_testtopic",
        "name": "s3-sink"
      "tasks": [],
      "type": "sink"
  17. Check the s3-sink status.

    root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
      "name": "s3-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": ""
      "tasks": [
          "id": 0,
          "state": "RUNNING",
          "worker_id": ""
      "type": "sink"
  18. Check the log to make sure that s3-sink is ready to accept topics.

    root@stlrx2540m1-108:~# confluent local services connect log
  19. Check the topics in Kafka.

    kafka-topics --list --bootstrap-server localhost:9092
  20. Check the objects in the s3 bucket.

    root@stlrx2540m1-108:~# aws s3 --endpoint-url ls --recursive kafkasgdbucket1-2/topics/
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro
    2021-10-29 21:24:09        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro
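    The listing follows from flush.size=3: the sink rolls a new object every three records, so the start offsets of successive objects advance in steps of three. A quick sketch of the expected start offsets for the 18 record offsets covered above:

    ```shell
    # With flush.size=3, records at offsets 0..17 yield objects starting at 0, 3, ..., 15.
    records=18; flush_size=3
    echo $(seq 0 "$flush_size" $((records - 1)))
    # → 0 3 6 9 12 15
    ```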
  21. To verify the contents, copy each file from S3 to your local filesystem by running the following command:

    root@stlrx2540m1-108:~# aws s3 --endpoint-url cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro  tes.avro
    download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro
  22. To print the records, use avro-tools- (available in the Apache Archives).

    root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools- tojson tes.avro
    21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable