Skip to main content
NetApp artificial intelligence solutions
简体中文版经机器翻译而成,仅供参考。如与英语版出现任何冲突,应以英语版为准。

Confluent S3连接器

贡献者 kevin-hoke

Amazon S3 Sink 连接器将数据从 Apache Kafka 主题导出到 Avro、JSON 或 Bytes 格式的 S3 对象。 Amazon S3 接收器连接器定期从 Kafka 轮询数据,然后将其上传到 S3。分区器用于将每个 Kafka 分区的数据分成块。每个数据块都表示为一个 S3 对象。键名对主题、Kafka 分区和该数据块的起始偏移量进行编码。

在此设置中,我们向您展示如何使用 Kafka s3 接收器连接器直接从 Kafka 读取和写入对象存储中的主题。对于此测试,我们使用了独立的 Confluent 集群,但此设置适用于分布式集群。

  1. 从 Confluent 网站下载 Confluent Kafka。

  2. 将软件包解压到服务器上的文件夹中。

  3. 导出两个变量。

    Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0
    export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
  4. 对于独立的 Confluent Kafka 设置,集群会在 /tmp。它还创建 Zookeeper、Kafka、模式注册表、连接、ksql-server 和控制中心文件夹,并从复制它们各自的配置文件 $CONFLUENT_HOME。请参见以下示例:

    root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/
    total 28
    drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper
    drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka
    drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry
    drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest
    drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect
    drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server
    drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center
    root@stlrx2540m1-108:~#
  5. 配置 Zookeeper。如果使用默认参数,则无需更改任何内容。

    root@stlrx2540m1-108:~# cat  /tmp/confluent.406980/zookeeper/zookeeper.properties  | grep -iv ^#
    dataDir=/tmp/confluent.406980/zookeeper/data
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    tickTime=2000
    initLimit=5
    syncLimit=2
    server.179=controlcenter:2888:3888
    root@stlrx2540m1-108:~#

    在上面的配置中,我们更新了 `server. xxx`财产。默认情况下,您需要三个 Zookeeper 来选择 Kafka 领导者。

  6. 我们在 `/tmp/confluent.406980/zookeeper/data`具有唯一 ID:

    root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid
    179
    root@stlrx2540m1-108:~#

    我们将最后一个 IP 地址数字用于 myid 文件。我们对 Kafka、connect、control-center、Kafka、Kafka-rest、ksql-server 和 schema-registry 配置使用了默认值。

  7. 启动 Kafka 服务。

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  start
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    ZooKeeper is [UP]
    Kafka is [UP]
    Schema Registry is [UP]
    Kafka REST is [UP]
    Connect is [UP]
    ksqlDB Server is [UP]
    Control Center is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    每个配置都有一个日志文件夹,有助于解决问题。在某些情况下,服务需要更多时间才能启动。确保所有服务均已启动并正在运行。

  8. 使用以下方式安装 Kafka connect confluent-hub

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest
    The component can be installed in any of the following Confluent Platform installations:
      1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME)
      2. /data/confluent/confluent-6.2.0 (where this tool is installed)
    Choose one of these to continue the installation (1-2): 1
    Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y
    
    Component's license:
    Confluent Community License
    http://www.confluent.io/confluent-community-license
    I agree to the software license agreement (yN) y
    Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components
    Do you want to uninstall existing version 10.0.3? (yN) y
    Detected Worker's configs:
      1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties
      6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties
    Do you want to update all detected configs? (yN) y
    Adding installation directory to plugin path in the following files:
      /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      /tmp/confluent.406980/connect/connect.properties
      /tmp/confluent.406980/connect/connect.properties
    
    Completed
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    您还可以使用以下方式安装特定版本 confluent-hub install confluentinc/kafka-connect-s3:10.0.3

  9. 默认情况下, confluentinc-kafka-connect-s3`安装在 `/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3

  10. 使用新的 confluentinc-kafka-connect-s3

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
    root@stlrx2540m1-108:~#
  11. 停止 Confluent 服务并重新启动它们。

    confluent local services  stop
    confluent local services  start
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  status
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    Connect is [UP]
    Control Center is [UP]
    Kafka is [UP]
    Kafka REST is [UP]
    ksqlDB Server is [UP]
    Schema Registry is [UP]
    ZooKeeper is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
  12. 在 `/root/.aws/credentials`文件。

    root@stlrx2540m1-108:~# cat /root/.aws/credentials
    [default]
    aws_access_key_id = xxxxxxxxxxxx
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx
    root@stlrx2540m1-108:~#
  13. 验证存储桶是否可访问。

    root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2
    2021-10-29 21:04:18       1388 1
    2021-10-29 21:04:20       1388 2
    2021-10-29 21:04:22       1388 3
    root@stlrx2540m4-01:~#
  14. 为 s3 和 bucket 配置配置 s3-sink 属性文件。

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^#
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=s3_testtopic
    s3.region=us-west-2
    s3.bucket.name=kafkasgdbucket1-2
    store.url=http://kafkasgd.rtpppe.netapp.com:10444/
    s3.part.size=5242880
    flush.size=3
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE
    root@stlrx2540m1-108:~#
  15. 将一些记录导入到 s3 存储桶。

    kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
  16. 加载 s3-sink 连接器。

    root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink  --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "flush.size": "3",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "s3.bucket.name": "kafkasgdbucket1-2",
        "s3.part.size": "5242880",
        "s3.region": "us-west-2",
        "schema.compatibility": "NONE",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/",
        "tasks.max": "1",
        "topics": "s3_testtopic",
        "name": "s3-sink"
      },
      "tasks": [],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  17. 检查 s3-sink 状态。

    root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.63.150.185:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.63.150.185:8083"
        }
      ],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  18. 检查日志以确保 s3-sink 已准备好接受主题。

    root@stlrx2540m1-108:~# confluent local services connect log
  19. 检查 Kafka 中的主题。

    kafka-topics --list --bootstrap-server localhost:9092
    …
    connect-configs
    connect-offsets
    connect-statuses
    default_ksql_processing_log
    s3_testtopic
    s3_topic
    s3_topic_new
    root@stlrx2540m1-108:~#
  20. 检查 s3 存储桶中的对象。

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro
    2021-10-29 21:24:09        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro
    root@stlrx2540m1-108:~#
  21. 要验证内容,请运行以下命令将每个文件从 S3 复制到本地文件系统:

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro  tes.avro
    download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro
    root@stlrx2540m1-108:~#
  22. 要打印记录,请使用 avro-tools-1.11.0.1.jar(可在 "Apache 档案")。

    root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro
    21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    {"f1":"value1"}
    {"f1":"value2"}
    {"f1":"value3"}
    root@stlrx2540m1-108:~#