Skip to main content
NetApp artificial intelligence solutions
본 한국어 번역은 사용자 편의를 위해 제공되는 기계 번역입니다. 영어 버전과 한국어 버전이 서로 어긋나는 경우에는 언제나 영어 버전이 우선합니다.

Confluent S3 커넥터

Amazon S3 Sink 커넥터는 Apache Kafka 토픽의 데이터를 Avro, JSON 또는 Bytes 형식으로 S3 객체로 내보냅니다. Amazon S3 싱크 커넥터는 주기적으로 Kafka에서 데이터를 폴링하고 이를 다시 S3에 업로드합니다. 파티셔너는 모든 카프카 파티션의 데이터를 청크로 분할하는 데 사용됩니다. 각 데이터 덩어리는 S3 객체로 표현됩니다. 키 이름은 주제, 카프카 파티션, 이 데이터 청크의 시작 오프셋을 인코딩합니다.

이 설정에서는 Kafka S3 싱크 커넥터를 사용하여 Kafka에서 개체 스토리지의 주제를 직접 읽고 쓰는 방법을 보여드립니다. 이 테스트에서는 독립형 Confluent 클러스터를 사용했지만, 이 설정은 분산 클러스터에도 적용할 수 있습니다.

  1. Confluent 웹사이트에서 Confluent Kafka를 다운로드하세요.

  2. 서버의 폴더에 패키지를 압축 해제합니다.

  3. 두 개의 변수를 내보냅니다.

    Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0
    export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
  4. 독립 실행형 Confluent Kafka 설정의 경우 클러스터는 임시 루트 폴더를 생성합니다. /tmp 또한 Zookeeper, Kafka, 스키마 레지스트리, Connect, ksql-server 및 Control-Center 폴더를 생성하고 해당 구성 파일을 다음에서 복사합니다. $CONFLUENT_HOME . 다음 예를 참조하세요.

    root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/
    total 28
    drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper
    drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka
    drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry
    drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest
    drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect
    drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server
    drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center
    root@stlrx2540m1-108:~#
  5. Zookeeper를 구성합니다. 기본 매개변수를 사용하는 경우 아무것도 변경할 필요가 없습니다.

    root@stlrx2540m1-108:~# cat  /tmp/confluent.406980/zookeeper/zookeeper.properties  | grep -iv ^#
    dataDir=/tmp/confluent.406980/zookeeper/data
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    tickTime=2000
    initLimit=5
    syncLimit=2
    server.179=controlcenter:2888:3888
    root@stlrx2540m1-108:~#

    위의 구성에서 우리는 다음을 업데이트했습니다. server. xxx 재산. 기본적으로 Kafka 리더 선택을 위해서는 3명의 Zookeeper가 필요합니다.

  6. 우리는 myid 파일을 생성했습니다 /tmp/confluent.406980/zookeeper/data 고유 ID로:

    root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid
    179
    root@stlrx2540m1-108:~#

    우리는 myid 파일에 마지막 IP 주소 번호를 사용했습니다. Kafka, connect, control-center, Kafka, Kafka-rest, ksql-server 및 schema-registry 구성에 기본값을 사용했습니다.

  7. 카프카 서비스를 시작합니다.

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  start
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    ZooKeeper is [UP]
    Kafka is [UP]
    Schema Registry is [UP]
    Kafka REST is [UP]
    Connect is [UP]
    ksqlDB Server is [UP]
    Control Center is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    각 구성에는 문제 해결에 도움이 되는 로그 폴더가 있습니다. 어떤 경우에는 서비스를 시작하는 데 시간이 더 오래 걸립니다. 모든 서비스가 정상적으로 작동하고 있는지 확인하세요.

  8. Kafka Connect를 사용하여 설치하세요 confluent-hub .

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest
    The component can be installed in any of the following Confluent Platform installations:
      1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME)
      2. /data/confluent/confluent-6.2.0 (where this tool is installed)
    Choose one of these to continue the installation (1-2): 1
    Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y
    
    Component's license:
    Confluent Community License
    http://www.confluent.io/confluent-community-license
    I agree to the software license agreement (yN) y
    Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components
    Do you want to uninstall existing version 10.0.3? (yN) y
    Detected Worker's configs:
      1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties
      6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties
    Do you want to update all detected configs? (yN) y
    Adding installation directory to plugin path in the following files:
      /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      /tmp/confluent.406980/connect/connect.properties
      /tmp/confluent.406980/connect/connect.properties
    
    Completed
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    다음을 사용하여 특정 버전을 설치할 수도 있습니다. confluent-hub install confluentinc/kafka-connect-s3:10.0.3 .

  9. 기본적으로, confluentinc-kafka-connect-s3 에 설치됨 /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3 .

  10. 플러그인 경로를 새 것으로 업데이트합니다. confluentinc-kafka-connect-s3 .

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
    root@stlrx2540m1-108:~#
  11. Confluent 서비스를 중지했다가 다시 시작합니다.

    confluent local services  stop
    confluent local services  start
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  status
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    Connect is [UP]
    Control Center is [UP]
    Kafka is [UP]
    Kafka REST is [UP]
    ksqlDB Server is [UP]
    Schema Registry is [UP]
    ZooKeeper is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
  12. 액세스 ID 및 비밀 키를 구성하세요. /root/.aws/credentials 파일.

    root@stlrx2540m1-108:~# cat /root/.aws/credentials
    [default]
    aws_access_key_id = xxxxxxxxxxxx
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx
    root@stlrx2540m1-108:~#
  13. 버킷에 접근 가능한지 확인하세요.

    root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2
    2021-10-29 21:04:18       1388 1
    2021-10-29 21:04:20       1388 2
    2021-10-29 21:04:22       1388 3
    root@stlrx2540m4-01:~#
  14. S3 및 버킷 구성을 위해 S3-싱크 속성 파일을 구성합니다.

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^#
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=s3_testtopic
    s3.region=us-west-2
    s3.bucket.name=kafkasgdbucket1-2
    store.url=http://kafkasgd.rtpppe.netapp.com:10444/
    s3.part.size=5242880
    flush.size=3
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE
    root@stlrx2540m1-108:~#
  15. 몇 개의 레코드를 S3 버킷으로 가져옵니다.

    kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
  16. s3-싱크 커넥터를 로드합니다.

    root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink  --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "flush.size": "3",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "s3.bucket.name": "kafkasgdbucket1-2",
        "s3.part.size": "5242880",
        "s3.region": "us-west-2",
        "schema.compatibility": "NONE",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/",
        "tasks.max": "1",
        "topics": "s3_testtopic",
        "name": "s3-sink"
      },
      "tasks": [],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  17. s3-sink 상태를 확인하세요.

    root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.63.150.185:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.63.150.185:8083"
        }
      ],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  18. s3-sink가 토픽을 수락할 준비가 되었는지 확인하려면 로그를 확인하세요.

    root@stlrx2540m1-108:~# confluent local services connect log
  19. 카프카의 주제를 확인하세요.

    kafka-topics --list --bootstrap-server localhost:9092
    …
    connect-configs
    connect-offsets
    connect-statuses
    default_ksql_processing_log
    s3_testtopic
    s3_topic
    s3_topic_new
    root@stlrx2540m1-108:~#
  20. s3 버킷의 객체를 확인하세요.

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro
    2021-10-29 21:24:09        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro
    root@stlrx2540m1-108:~#
  21. 내용을 확인하려면 다음 명령을 실행하여 각 파일을 S3에서 로컬 파일 시스템으로 복사합니다.

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro  tes.avro
    download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro
    root@stlrx2540m1-108:~#
  22. 레코드를 인쇄하려면 avro-tools-1.11.0.1.jar(다음에서 사용 가능)를 사용하세요. "아파치 아카이브" ).

    root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro
    21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    {"f1":"value1"}
    {"f1":"value2"}
    {"f1":"value3"}
    root@stlrx2540m1-108:~#