Skip to main content
NetApp Solutions
本繁體中文版使用機器翻譯,譯文僅供參考,若與英文版本牴觸,應以英文版本為準。

Connent S3連接器

貢獻者

Amazon S3 Sink Connector會以Avro、Json或位元組格式、將Apache Kafka主題的資料匯出至S3物件。Amazon S3 Sink連接器會定期輪詢Kafka的資料、然後再將資料上傳至S3。分區工具用於將每個Kafka分區的資料分割成區塊。每一區塊的資料都會以S3物件表示。金鑰名稱會編碼主題、Kafka分割區、以及此資料區塊的起始偏移。

在此設定中、我們會示範如何使用Kafka S3接收器連接器、直接從Kafka讀取和寫入物件儲存區中的主題。在這項測試中、我們使用獨立的ConFluent叢集、但此設定適用於分散式叢集。

  1. 從Conflent網站下載Conflent Kafka。

  2. 將套件解壓縮至伺服器上的資料夾。

  3. 匯出兩個變數。

    Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0
    export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
  4. 對於獨立的Conflent Kafka設定、叢集會在「/tmp」中建立一個暫存根資料夾。它也會建立Zookkeeper、Kafka、架構登錄、Connect、ksql-server、 和控制中心資料夾、並從「$Confluent_home'複製各自的組態檔案。請參閱下列範例:

    root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/
    total 28
    drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper
    drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka
    drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry
    drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest
    drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect
    drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server
    drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center
    root@stlrx2540m1-108:~#
  5. 設定Zookkeeper。如果您使用預設參數、則不需要變更任何內容。

    root@stlrx2540m1-108:~# cat  /tmp/confluent.406980/zookeeper/zookeeper.properties  | grep -iv ^#
    dataDir=/tmp/confluent.406980/zookeeper/data
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    tickTime=2000
    initLimit=5
    syncLimit=2
    server.179=controlcenter:2888:3888
    root@stlrx2540m1-108:~#

    在上述組態中、我們更新了「伺服器」。xxx屬性。根據預設、您需要三位Zooka主管才能選擇Kafka。

  6. 我們在「/tmp/confine.406980/zookeeper / data」中建立一個具有唯一ID的MyID檔案:

    root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid
    179
    root@stlrx2540m1-108:~#

    我們使用最後數量的IP位址作為MyID檔案。我們使用卡夫卡、連線、控制中心、卡夫卡、卡夫卡、 ksql-server和schema-registry組態。

  7. 啟動Kafka服務。

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  start
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    ZooKeeper is [UP]
    Kafka is [UP]
    Schema Registry is [UP]
    Kafka REST is [UP]
    Connect is [UP]
    ksqlDB Server is [UP]
    Control Center is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    每個組態都有一個記錄資料夾、有助於疑難排解問題。在某些情況下、服務需要更多時間才能啟動。請確定所有服務均已啟動且正在執行。

  8. 使用「confhube-hub」安裝Kafka Connect。

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest
    The component can be installed in any of the following Confluent Platform installations:
      1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME)
      2. /data/confluent/confluent-6.2.0 (where this tool is installed)
    Choose one of these to continue the installation (1-2): 1
    Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y
    
    Component's license:
    Confluent Community License
    http://www.confluent.io/confluent-community-license
    I agree to the software license agreement (yN) y
    Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components
    Do you want to uninstall existing version 10.0.3? (yN) y
    Detected Worker's configs:
      1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties
      6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties
    Do you want to update all detected configs? (yN) y
    Adding installation directory to plugin path in the following files:
      /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      /tmp/confluent.406980/connect/connect.properties
      /tmp/confluent.406980/connect/connect.properties
    
    Completed
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    您也可以使用「conlume-hub install confluentinc / Kafka-connect:s3:10.0.3」來安裝特定版本。

  9. 依預設、「confluentinc、Kafka-connect、S3」安裝於「/data / conflue/conlume-6.2.0/share/conflum-hub-sue-subs/confluentinc、Kafka-Connect、S3」中。

  10. 使用全新的「confluentinc - Kafka-connect - S3」來更新外掛程式路徑。

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
    root@stlrx2540m1-108:~#
  11. 停止Conflent服務並重新啟動。

    confluent local services  stop
    confluent local services  start
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  status
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    Connect is [UP]
    Control Center is [UP]
    Kafka is [UP]
    Kafka REST is [UP]
    ksqlDB Server is [UP]
    Schema Registry is [UP]
    ZooKeeper is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
  12. 在「/root/.AWS/IDes'檔案中設定存取ID和秘密金鑰。

    root@stlrx2540m1-108:~# cat /root/.aws/credentials
    [default]
    aws_access_key_id = xxxxxxxxxxxx
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx
    root@stlrx2540m1-108:~#
  13. 確認鏟斗可到達。

    root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2
    2021-10-29 21:04:18       1388 1
    2021-10-29 21:04:20       1388 2
    2021-10-29 21:04:22       1388 3
    root@stlrx2540m4-01:~#
  14. 設定S3和Bucket組態的S3接收器內容檔。

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^#
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=s3_testtopic
    s3.region=us-west-2
    s3.bucket.name=kafkasgdbucket1-2
    store.url=http://kafkasgd.rtpppe.netapp.com:10444/
    s3.part.size=5242880
    flush.size=3
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE
    root@stlrx2540m1-108:~#
  15. 將幾筆記錄匯入S3儲存區。

    kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
  16. 裝入S3接收器連接器。

    root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink  --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "flush.size": "3",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "s3.bucket.name": "kafkasgdbucket1-2",
        "s3.part.size": "5242880",
        "s3.region": "us-west-2",
        "schema.compatibility": "NONE",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/",
        "tasks.max": "1",
        "topics": "s3_testtopic",
        "name": "s3-sink"
      },
      "tasks": [],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  17. 檢查S3接收器狀態。

    root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.63.150.185:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.63.150.185:8083"
        }
      ],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  18. 檢查記錄、確定S3接收器已準備好接受主題。

    root@stlrx2540m1-108:~# confluent local services connect log
  19. 請查看卡夫卡的主題。

    kafka-topics --list --bootstrap-server localhost:9092
    …
    connect-configs
    connect-offsets
    connect-statuses
    default_ksql_processing_log
    s3_testtopic
    s3_topic
    s3_topic_new
    root@stlrx2540m1-108:~#
  20. 檢查S3儲存區中的物件。

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro
    2021-10-29 21:24:09        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro
    root@stlrx2540m1-108:~#
  21. 若要驗證內容、請執行下列命令、將每個檔案從S3複製到您的本機檔案系統:

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro  tes.avro
    download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro
    root@stlrx2540m1-108:~#
  22. 若要列印記錄、請使用avro-tools-1.11.0.1.jar(可在 "Apache歸檔")。

    root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro
    21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    {"f1":"value1"}
    {"f1":"value2"}
    {"f1":"value3"}
    root@stlrx2540m1-108:~#