Confluent S3 connector


The Amazon S3 Sink connector exports data from Apache Kafka topics to S3 objects in either Avro, JSON, or Bytes format. The connector periodically polls data from Kafka and in turn uploads it to S3. A partitioner is used to split the data of every Kafka partition into chunks, and each chunk of data is represented as an S3 object. The key name encodes the topic, the Kafka partition, and the start offset of that data chunk.
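
With the default partitioner, the key name takes the following general form, in which `topics.dir` defaults to `topics` (step 20 below shows real keys produced by this convention):

    <topics.dir>/<topic>/partition=<partition>/<topic>+<partition>+<start offset>.<format extension>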

In this setup, we show you how to use the Kafka S3 sink connector to read and write topics in object storage directly from Kafka. For this test, we used a stand-alone Confluent cluster, but the setup is also applicable to a distributed cluster.

  1. Download Confluent Kafka from the Confluent website.

  2. Extract the package to a folder on your server, as sketched below.
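
    A minimal sketch of steps 1 and 2, assuming Confluent Platform 6.2.0 and the `/data/confluent` target folder used throughout this section (the download URL follows Confluent's public archive layout):

    # download the Confluent Platform 6.2.0 tarball and unpack it into /data/confluent
    wget https://packages.confluent.io/archive/6.2/confluent-6.2.0.tar.gz
    mkdir -p /data/confluent
    tar -xzf confluent-6.2.0.tar.gz -C /data/confluent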

  3. Export two variables.

    export CONFLUENT_HOME=/data/confluent/confluent-6.2.0
    export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
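
    Equivalently, the second line can reference the first variable: `export PATH=$PATH:$CONFLUENT_HOME/bin`.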
  4. For a stand-alone Confluent Kafka setup, the cluster is created in a temporary folder under `/tmp`. The setup also creates zookeeper, kafka, schema-registry, kafka-rest, connect, ksql-server, and control-center folders and copies their respective configuration files from `$CONFLUENT_HOME`. See the following example:

    root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/
    total 28
    drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper
    drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka
    drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry
    drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest
    drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect
    drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server
    drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center
    root@stlrx2540m1-108:~#
  5. Configure Zookeeper. You do not need to change anything if you use the default parameters.

    root@stlrx2540m1-108:~# cat  /tmp/confluent.406980/zookeeper/zookeeper.properties  | grep -iv ^#
    dataDir=/tmp/confluent.406980/zookeeper/data
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    tickTime=2000
    initLimit=5
    syncLimit=2
    server.179=controlcenter:2888:3888
    root@stlrx2540m1-108:~#

    In the configuration above, we updated the `server.xxx` property. By default, you need three Zookeeper servers for Kafka leader election; a three-node ensemble is sketched below.
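
    A minimal sketch of a three-node ensemble, assuming the hypothetical hostnames zk1, zk2, and zk3; each entry lists a server ID followed by its peer and leader-election ports:

    server.1=zk1:2888:3888
    server.2=zk2:2888:3888
    server.3=zk3:2888:3888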

  6. We created a `myid` file in `/tmp/confluent.406980/zookeeper/data` with a unique ID:

    root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid
    179
    root@stlrx2540m1-108:~#

    We used the last octet of the server's IP address for the `myid` file; it can be created as shown below. We used the default values for the kafka, connect, control-center, kafka-rest, ksql-server, and schema-registry configurations.
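
    The file can be created with a single command, for example:

    echo 179 > /tmp/confluent.406980/zookeeper/data/myid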

  7. Start the Kafka services.

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  start
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    ZooKeeper is [UP]
    Kafka is [UP]
    Schema Registry is [UP]
    Kafka REST is [UP]
    Connect is [UP]
    ksqlDB Server is [UP]
    Control Center is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    Each configuration has a log folder, which helps to troubleshoot issues; the log of an individual service can also be followed as shown below. In some cases, services need more time to start up; make sure all the services are up and running.
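
    For example, to follow the Kafka broker log, use the same `confluent local services <service> log` pattern that step 18 below uses for Connect:

    confluent local services kafka log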

  8. Install the Kafka Connect S3 sink connector by using `confluent-hub`.

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest
    The component can be installed in any of the following Confluent Platform installations:
      1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME)
      2. /data/confluent/confluent-6.2.0 (where this tool is installed)
    Choose one of these to continue the installation (1-2): 1
    Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y
    
    Component's license:
    Confluent Community License
    http://www.confluent.io/confluent-community-license
    I agree to the software license agreement (yN) y
    Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components
    Do you want to uninstall existing version 10.0.3? (yN) y
    Detected Worker's configs:
      1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties
      6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties
    Do you want to update all detected configs? (yN) y
    Adding installation directory to plugin path in the following files:
      /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      /tmp/confluent.406980/connect/connect.properties
      /tmp/confluent.406980/connect/connect.properties
    
    Completed
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    You can also install a specific version by using `confluent-hub install confluentinc/kafka-connect-s3:10.0.3`.

  9. By default, `confluentinc-kafka-connect-s3` is installed in `/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3`.

  10. Update the plugin path with the new `confluentinc-kafka-connect-s3` location.

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
    root@stlrx2540m1-108:~#
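
    Kafka Connect only scans the comma-separated `plugin.path` directories when the worker starts, which is why the services are restarted in the next step.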
  11. Stop the Confluent services and restart them.

    confluent local services  stop
    confluent local services  start
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  status
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    Connect is [UP]
    Control Center is [UP]
    Kafka is [UP]
    Kafka REST is [UP]
    ksqlDB Server is [UP]
    Schema Registry is [UP]
    ZooKeeper is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
  12. Configure the AWS credentials in the `/root/.aws/credentials` file.

    root@stlrx2540m1-108:~# cat /root/.aws/credentials
    [default]
    aws_access_key_id = xxxxxxxxxxxx
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx
    root@stlrx2540m1-108:~#
  13. Verify that the bucket is reachable.

    root@stlrx2540m4-01:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2
    2021-10-29 21:04:18       1388 1
    2021-10-29 21:04:20       1388 2
    2021-10-29 21:04:22       1388 3
    root@stlrx2540m4-01:~#
  14. Configure the s3-sink properties file with the S3 and bucket configuration.

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^#
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=s3_testtopic
    s3.region=us-west-2
    s3.bucket.name=kafkasgdbucket1-2
    store.url=http://kafkasgd.rtpppe.netapp.com:10444/
    s3.part.size=5242880
    flush.size=3
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE
    root@stlrx2540m1-108:~#
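
    With `flush.size=3`, the connector commits a new S3 object after every three records per partition, which is why the object keys in step 20 advance in offset increments of three.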
  15. Produce some records to the `s3_testtopic` topic so that they are imported into the S3 bucket.

    kafka-avro-console-producer --broker-list localhost:9092 --topic s3_testtopic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
  16. Load the s3-sink connector.

    root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink  --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "flush.size": "3",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "s3.bucket.name": "kafkasgdbucket1-2",
        "s3.part.size": "5242880",
        "s3.region": "us-west-2",
        "schema.compatibility": "NONE",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/",
        "tasks.max": "1",
        "topics": "s3_testtopic",
        "name": "s3-sink"
      },
      "tasks": [],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  17. Check the s3-sink status.

    root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.63.150.185:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.63.150.185:8083"
        }
      ],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
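
    The same status is also available from the Kafka Connect REST API, assuming the worker listens on the default port 8083 (as the worker_id above indicates):

    curl -s http://localhost:8083/connectors/s3-sink/status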
  18. Check the log to make sure the s3-sink is ready to accept topics.

    root@stlrx2540m1-108:~# confluent local services connect log
  19. Check the topics in Kafka.

    kafka-topics --list --bootstrap-server localhost:9092
    …
    connect-configs
    connect-offsets
    connect-statuses
    default_ksql_processing_log
    s3_testtopic
    s3_topic
    s3_topic_new
    root@stlrx2540m1-108:~#
  20. Check the objects in the S3 bucket.

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro
    2021-10-29 21:24:09        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro
    root@stlrx2540m1-108:~#
  21. To verify the content, copy each file from S3 to your local file system by running the following command:

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro  tes.avro
    download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro
    root@stlrx2540m1-108:~#
  22. To print the records, use avro-tools-1.11.0.1.jar (available from the "Apache Archives").

    root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro
    21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    {"f1":"value1"}
    {"f1":"value2"}
    {"f1":"value3"}
    root@stlrx2540m1-108:~#
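
    The object contains exactly three records (value1 through value3) because the connector rolls a new object after every `flush.size=3` records.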