融合 S3 连接器
Amazon S3 Sink Connector 以 Avro , JSON 或字节格式将数据从 Apache Kafka 主题导出到 S3 对象。Amazon S3 Sink Connector 会定期轮询 Kafka 中的数据,然后将其上传到 S3 。分区程序用于将每个 Kafka 分区的数据拆分为多个区块。每个数据区块都表示为 S3 对象。密钥名称会对主题, Kafka 分区以及此数据块的起始偏移进行编码。
在此设置中,我们将向您展示如何使用 Kafka S3 接收器连接器直接从 Kafka 读取和写入对象存储中的主题。在此测试中,我们使用了独立的 Consfluent 集群,但此设置适用于分布式集群。
-
从 Confluent 网站下载 Confluent Kafka 。
-
将软件包解压缩到服务器上的文件夹。
-
导出两个变量。
Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0 export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
-
对于独立的 Confluent Kafka 设置,集群会在 ` /tmp` 中创建一个临时根文件夹。它还会创建 Zookeeper , Kafka ,模式注册表, connect , ksql-server , 和控制中心文件夹,并从 ` $confuent_home` 复制其各自的配置文件。请参见以下示例:
root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/ total 28 drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center root@stlrx2540m1-108:~#
-
配置 Zookeeper 。如果使用默认参数,则无需更改任何内容。
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/zookeeper.properties | grep -iv ^# dataDir=/tmp/confluent.406980/zookeeper/data clientPort=2181 maxClientCnxns=0 admin.enableServer=false tickTime=2000 initLimit=5 syncLimit=2 server.179=controlcenter:2888:3888 root@stlrx2540m1-108:~#
在上述配置中,我们更新了
s服务器。xxx
属性。默认情况下,您需要三个 zookepers 来选择 Kafka 领导者。 -
我们在 ` /tmp/confuent.406980/zookeeper /data` 中创建了一个 myid 文件,其唯一 ID 为:
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid 179 root@stlrx2540m1-108:~#
我们使用 myid 文件的最后一个 IP 地址数。我们使用了 Kafka , connect , control-center , Kafka , Kafka-REST , ksql-server 和模式注册表配置。
-
启动 Kafka 服务。
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services start The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 ZooKeeper is [UP] Kafka is [UP] Schema Registry is [UP] Kafka REST is [UP] Connect is [UP] ksqlDB Server is [UP] Control Center is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
每个配置都有一个日志文件夹,可帮助您解决问题。在某些情况下,服务需要较长时间才能启动。确保所有服务均已启动且正在运行。
-
使用
Confluent-hub
安装 Kafka 连接。root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest The component can be installed in any of the following Confluent Platform installations: 1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME) 2. /data/confluent/confluent-6.2.0 (where this tool is installed) Choose one of these to continue the installation (1-2): 1 Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y Component's license: Confluent Community License http://www.confluent.io/confluent-community-license I agree to the software license agreement (yN) y Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components Do you want to uninstall existing version 10.0.3? (yN) y Detected Worker's configs: 1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties 2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties 3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties 4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties 5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties 6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties Do you want to update all detected configs? (yN) y Adding installation directory to plugin path in the following files: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties /tmp/confluent.406980/connect/connect.properties /tmp/confluent.406980/connect/connect.properties Completed root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
您也可以使用
Confluent-hub install conflientint/Kafka-connect-S3 : 10.0.3
来安装特定版本。 -
默认情况下,
confuentine-Kafka-connect-S3
安装在 ` /data/confuent/confuent-6.2.0/share/confuent-hub-components/confuentine-Kafka-connect-S3` 中。 -
使用新的
Confluentine-Kafka-connect-S3
更新插件路径。root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3 root@stlrx2540m1-108:~#
-
停止并重新启动 Consfluent 服务。
confluent local services stop confluent local services start root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services status The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 Connect is [UP] Control Center is [UP] Kafka is [UP] Kafka REST is [UP] ksqlDB Server is [UP] Schema Registry is [UP] ZooKeeper is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
-
在 ` /root/.AWS/credentials` 文件中配置访问 ID 和机密密钥。
root@stlrx2540m1-108:~# cat /root/.aws/credentials [default] aws_access_key_id = xxxxxxxxxxxx aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx root@stlrx2540m1-108:~#
-
验证存储分段是否可访问。
root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2 2021-10-29 21:04:18 1388 1 2021-10-29 21:04:20 1388 2 2021-10-29 21:04:22 1388 3 root@stlrx2540m4-01:~#
-
为 S3 和存储分段配置 S3-sink 属性文件。
root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^# name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=s3_testtopic s3.region=us-west-2 s3.bucket.name=kafkasgdbucket1-2 store.url=http://kafkasgd.rtpppe.netapp.com:10444/ s3.part.size=5242880 flush.size=3 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.avro.AvroFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE root@stlrx2540m1-108:~#
-
将一些记录导入到 S3 存储分段中。
kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
-
加载 S3-sink 连接器。
root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "flush.size": "3", "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "s3.bucket.name": "kafkasgdbucket1-2", "s3.part.size": "5242880", "s3.region": "us-west-2", "schema.compatibility": "NONE", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/", "tasks.max": "1", "topics": "s3_testtopic", "name": "s3-sink" }, "tasks": [], "type": "sink" } root@stlrx2540m1-108:~#
-
检查 S3-sink 状态。
root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "connector": { "state": "RUNNING", "worker_id": "10.63.150.185:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.63.150.185:8083" } ], "type": "sink" } root@stlrx2540m1-108:~#
-
检查日志以确保 S3-sink 已准备好接受主题。
root@stlrx2540m1-108:~# confluent local services connect log
-
查看 Kafka 中的主题。
kafka-topics --list --bootstrap-server localhost:9092 … connect-configs connect-offsets connect-statuses default_ksql_processing_log s3_testtopic s3_topic s3_topic_new root@stlrx2540m1-108:~#
-
检查 S3 存储分段中的对象。
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/ 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro 2021-10-29 21:24:09 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro root@stlrx2540m1-108:~#
-
要验证内容,请运行以下命令将每个文件从 S3 复制到本地文件系统:
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro tes.avro download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro root@stlrx2540m1-108:~#
-
要打印记录,请使用 avro-tools-1.11.0.1.jar (可在中找到 "Apache 归档")。
root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro 21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable {"f1":"value1"} {"f1":"value2"} {"f1":"value3"} root@stlrx2540m1-108:~#