Confluent S3 커넥터
Amazon S3 싱크 커넥터는 Apache Kafka 항목의 데이터를 Avro, JSON 또는 바이트 형식의 S3 오브젝트로 내보냅니다. Amazon S3 싱크 커넥터는 주기적으로 Kafka의 데이터를 폴링하여 S3로 업로드합니다. 분할자는 모든 Kafka 파티션의 데이터를 청크로 분할하는 데 사용됩니다. 각 데이터 청크는 S3 오브젝트로 표시됩니다. 키 이름은 주제, Kafka 파티션 및 이 데이터 청크의 시작 오프셋을 인코딩합니다.
이 설정에서는 Kafka S3 싱크 커넥터를 사용하여 바로 Kafka에서 오브젝트 스토리지의 항목을 읽고 쓰는 방법을 보여 줍니다. 이 테스트에서는 독립 실행형 Confluent 클러스터를 사용했지만 이 설정은 분산 클러스터에 적용됩니다.
Confluent 웹 사이트에서 Confluent Kafka를 다운로드하십시오.
패키지를 서버의 폴더에 압축을 풉니다.
두 변수를 내보냅니다.
Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0 export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
독립 실행형 Confluorent Kafka 설정의 경우 클러스터는 "/tmp"에 임시 루트 폴더를 생성합니다. 또한 ZooKeeper, Kafka, 스키마 레지스트리, 연결, ksql-server를 생성합니다. 제어 센터 폴더를 만들고 해당 구성 파일을 '$CONFLUENT_HOME'에서 복사합니다. 다음 예를 참조하십시오.
root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/ total 28 drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center root@stlrx2540m1-108:~#
ZooKeeper를 구성합니다. 기본 매개 변수를 사용하는 경우 아무것도 변경할 필요가 없습니다.
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/ | grep -iv ^# dataDir=/tmp/confluent.406980/zookeeper/data clientPort=2181 maxClientCnxns=0 admin.enableServer=false tickTime=2000 initLimit=5 syncLimit=2 server.179=controlcenter:2888:3888 root@stlrx2540m1-108:~#
위 구성에서 서버를 업데이트했습니다. XXX'재산. 기본적으로 Kafka 리더 선택을 위해서는 Zookeepers가 세 개 필요합니다.
고유한 ID로 '/tmp/confluent.406980/zookeeper/data'에 myid 파일을 만들었습니다.
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid 179 root@stlrx2540m1-108:~#
myid 파일에 대한 마지막 IP 주소 수를 사용했습니다. Kafka, CONNECT, CONTROL-CENTER, Kafka, Kafka-Rest, ksql-server 및 schema-registry 구성.
Kafka 서비스를 시작합니다.
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services start The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 ZooKeeper is [UP] Kafka is [UP] Schema Registry is [UP] Kafka REST is [UP] Connect is [UP] ksqlDB Server is [UP] Control Center is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
각 구성에 대한 로그 폴더가 있어 문제를 해결하는 데 도움이 됩니다. 경우에 따라 서비스를 시작하는 데 더 많은 시간이 걸릴 수 있습니다. 모든 서비스가 실행 중인지 확인합니다.
'confluent-hub'을 이용하여 Kafka CONNECT를 설치한다.
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest The component can be installed in any of the following Confluent Platform installations: 1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME) 2. /data/confluent/confluent-6.2.0 (where this tool is installed) Choose one of these to continue the installation (1-2): 1 Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y Component's license: Confluent Community License I agree to the software license agreement (yN) y Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components Do you want to uninstall existing version 10.0.3? (yN) y Detected Worker's configs: 1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/ 2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/ 3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/ 4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/ 5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/ 6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/ Do you want to update all detected configs? (yN) y Adding installation directory to plugin path in the following files: /data/confluent/confluent-6.2.0/etc/kafka/ /data/confluent/confluent-6.2.0/etc/kafka/ /data/confluent/confluent-6.2.0/etc/schema-registry/ /data/confluent/confluent-6.2.0/etc/schema-registry/ /tmp/confluent.406980/connect/ /tmp/confluent.406980/connect/ Completed root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
confluent-hub install confluentinc/Kafka-connect-S3:10.0.3'을 사용하여 특정 버전을 설치할 수도 있습니다.
기본적으로 '/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentententinc-kafka-connect-s3'에 confluentinc-kafka-connect-s3이 설치됩니다.
새로운 'confluentinc-kafka-connect-s3'으로 플러그인 경로를 업데이트합니다.
root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/ | grep plugin.path # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3 root@stlrx2540m1-108:~#
Confluent 서비스를 중지하고 다시 시작합니다.
confluent local services stop confluent local services start root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services status The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 Connect is [UP] Control Center is [UP] Kafka is [UP] Kafka REST is [UP] ksqlDB Server is [UP] Schema Registry is [UP] ZooKeeper is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
'/root/.aws/credentials' 파일에서 액세스 ID와 비밀 키를 설정한다.
root@stlrx2540m1-108:~# cat /root/.aws/credentials [default] aws_access_key_id = xxxxxxxxxxxx aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx root@stlrx2540m1-108:~#
버킷에 도달할 수 있는지 확인합니다.
root@stlrx2540m4-01:~# aws s3 –endpoint-url ls kafkasgdbucket1-2 2021-10-29 21:04:18 1388 1 2021-10-29 21:04:20 1388 2 2021-10-29 21:04:22 1388 3 root@stlrx2540m4-01:~#
S3 및 버킷 구성에 대해 S3-싱크 속성 파일을 구성합니다.
root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/ | grep -v ^# name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=s3_testtopic s3.region=us-west-2 store.url= s3.part.size=5242880 flush.size=3 format.class=io.confluent.connect.s3.format.avro.AvroFormat schema.compatibility=NONE root@stlrx2540m1-108:~#
S3 버킷으로 몇 개의 레코드를 가져옵니다.
kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
S3 싱크 커넥터를 로드합니다.
root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/ The local commands are intended for a single-node development environment only, NOT for production usage. { "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "flush.size": "3", "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "partitioner.class": "", "": "kafkasgdbucket1-2", "s3.part.size": "5242880", "s3.region": "us-west-2", "schema.compatibility": "NONE", "storage.class": "", "store.url": "", "tasks.max": "1", "topics": "s3_testtopic", "name": "s3-sink" }, "tasks": [], "type": "sink" } root@stlrx2540m1-108:~#
S3 싱크 상태를 확인합니다.
root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink The local commands are intended for a single-node development environment only, NOT for production usage. { "name": "s3-sink", "connector": { "state": "RUNNING", "worker_id": "" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "" } ], "type": "sink" } root@stlrx2540m1-108:~#
로그를 확인하여 S3 싱크가 항목을 수락할 준비가 되었는지 확인합니다.
root@stlrx2540m1-108:~# confluent local services connect log
Kafka의 주제를 확인하십시오.
kafka-topics --list --bootstrap-server localhost:9092 … connect-configs connect-offsets connect-statuses default_ksql_processing_log s3_testtopic s3_topic s3_topic_new root@stlrx2540m1-108:~#
S3 버킷의 오브젝트를 확인합니다.
root@stlrx2540m1-108:~# aws s3 --endpoint-url ls --recursive kafkasgdbucket1-2/topics/ 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro 2021-10-29 21:24:09 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro root@stlrx2540m1-108:~#
내용을 확인하려면 다음 명령을 실행하여 각 파일을 S3에서 로컬 파일 시스템으로 복사합니다.
root@stlrx2540m1-108:~# aws s3 --endpoint-url cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro tes.avro download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro root@stlrx2540m1-108:~#
레코드를 인쇄하려면 avro-tools- (에서 사용 가능)를 사용합니다 "아파치 아카이브")를 클릭합니다.
root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools- tojson tes.avro 21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable {"f1":"value1"} {"f1":"value2"} {"f1":"value3"} root@stlrx2540m1-108:~#