Fließender s3-Anschluss
Der Amazon S3 Sink Connector exportiert Daten von Apache Kafka Themen entweder im Avro-, JSON- oder Byte-Format in S3-Objekte. Der Amazon S3 Sink Connector fragt regelmäßig Daten aus Kafka ab und lädt sie wiederum nach S3 hoch. Ein Partitioner wird verwendet, um die Daten jeder Kafka-Partition in Stücke zu teilen. Jeder Datenblock wird als S3-Objekt dargestellt. Der Schlüsselname kodiert das Thema, die Kafka-Partition und den Start-Offset dieses Datenblocks.
In diesen Einstellungen zeigen wir Ihnen, wie man Themen aus Kafka direkt über den Kafka s3 Waschbecken im Objekt-Storage liest und schreibt. Für diesen Test haben wir einen eigenständigen Confluent Cluster verwendet, dieses Setup gilt jedoch für einen verteilten Cluster.
-
Laden Sie Confluent Kafka von der Confluent Website herunter.
-
Packen Sie das Paket in einen Ordner auf Ihrem Server aus.
-
Exportieren Sie zwei Variablen.
Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0 export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
-
Für ein eigenständiges Confluent Kafka Setup erstellt der Cluster einen temporären Stammordner in
/tmp
. Es erstellt auch Zookeeper, Kafka, eine Schemaregistrierung, connect, einen ksql-Server, Control-Center Ordner und kopiert ihre jeweiligen Konfigurationsdateien von$CONFLUENT_HOME
. Das folgende Beispiel zeigt:root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/ total 28 drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center root@stlrx2540m1-108:~#
-
Zookeeper Konfigurieren. Sie müssen nichts ändern, wenn Sie die Standardparameter verwenden.
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/zookeeper.properties | grep -iv ^# dataDir=/tmp/confluent.406980/zookeeper/data clientPort=2181 maxClientCnxns=0 admin.enableServer=false tickTime=2000 initLimit=5 syncLimit=2 server.179=controlcenter:2888:3888 root@stlrx2540m1-108:~#
In der obigen Konfiguration haben wir die aktualisiert
server. xxx
Eigenschaft. Standardmäßig benötigen Sie drei Zookeeper für die Auswahl von Kafka Leader. -
Wir haben eine myid-Datei in erstellt
/tmp/confluent.406980/zookeeper/data
Mit einer eindeutigen ID:root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid 179 root@stlrx2540m1-108:~#
Wir haben die letzte Anzahl von IP-Adressen für die myid-Datei verwendet. Für Kafka wurden Standardwerte verwendet: Connect, Control Center, Kafka, Kafka-Rest, Konfiguration von ksql-Server und Schema-Registry
-
Starten Sie die Kafka-Dienste.
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services start The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 ZooKeeper is [UP] Kafka is [UP] Schema Registry is [UP] Kafka REST is [UP] Connect is [UP] ksqlDB Server is [UP] Control Center is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
Für jede Konfiguration gibt es einen Protokollordner, der die Fehlerbehebung erleichtert. In einigen Fällen nehmen Services mehr Zeit zum Start in Anspruch. Stellen Sie sicher, dass alle Services betriebsbereit sind.
-
Installieren Sie Kafka Connect mit
confluent-hub
.root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest The component can be installed in any of the following Confluent Platform installations: 1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME) 2. /data/confluent/confluent-6.2.0 (where this tool is installed) Choose one of these to continue the installation (1-2): 1 Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y Component's license: Confluent Community License http://www.confluent.io/confluent-community-license I agree to the software license agreement (yN) y Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components Do you want to uninstall existing version 10.0.3? (yN) y Detected Worker's configs: 1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties 2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties 3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties 4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties 5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties 6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties Do you want to update all detected configs? (yN) y Adding installation directory to plugin path in the following files: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties /tmp/confluent.406980/connect/connect.properties /tmp/confluent.406980/connect/connect.properties Completed root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
Sie können eine bestimmte Version auch mithilfe von installieren
confluent-hub install confluentinc/kafka-connect-s3:10.0.3
. -
Standardmäßig ist
confluentinc-kafka-connect-s3
Ist in installiert/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
. -
Aktualisieren Sie den Plug-in-Pfad mit dem neuen
confluentinc-kafka-connect-s3
.root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3 root@stlrx2540m1-108:~#
-
Beenden Sie die Confluent-Dienste, und starten Sie sie neu.
confluent local services stop confluent local services start root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services status The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 Connect is [UP] Control Center is [UP] Kafka is [UP] Kafka REST is [UP] ksqlDB Server is [UP] Schema Registry is [UP] ZooKeeper is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
-
Konfigurieren Sie die Zugriffs-ID und den geheimen Schlüssel im
/root/.aws/credentials
Datei:root@stlrx2540m1-108:~# cat /root/.aws/credentials [default] aws_access_key_id = xxxxxxxxxxxx aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx root@stlrx2540m1-108:~#
-
Vergewissern Sie sich, dass der Bucket erreichbar ist.
root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2 2021-10-29 21:04:18 1388 1 2021-10-29 21:04:20 1388 2 2021-10-29 21:04:22 1388 3 root@stlrx2540m4-01:~#
-
Konfigurieren der s3-Sink-Eigenschaftendatei für die s3- und Bucket-Konfiguration
root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^# name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=s3_testtopic s3.region=us-west-2 s3.bucket.name=kafkasgdbucket1-2 store.url=http://kafkasgd.rtpppe.netapp.com:10444/ s3.part.size=5242880 flush.size=3 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.avro.AvroFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE root@stlrx2540m1-108:~#
-
Importieren Sie einige Datensätze in den s3-Bucket.
kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
-
Den s3-Spülkörper-Anschluss einlegen.
root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "flush.size": "3", "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "s3.bucket.name": "kafkasgdbucket1-2", "s3.part.size": "5242880", "s3.region": "us-west-2", "schema.compatibility": "NONE", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/", "tasks.max": "1", "topics": "s3_testtopic", "name": "s3-sink" }, "tasks": [], "type": "sink" } root@stlrx2540m1-108:~#
-
Überprüfen Sie den s3-Sink-Status.
root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "connector": { "state": "RUNNING", "worker_id": "10.63.150.185:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.63.150.185:8083" } ], "type": "sink" } root@stlrx2540m1-108:~#
-
Prüfen Sie das Protokoll, um sicherzustellen, dass s3-Sink bereit ist, Themen zu akzeptieren.
root@stlrx2540m1-108:~# confluent local services connect log
-
Informieren Sie sich in Kafka über die Themen.
kafka-topics --list --bootstrap-server localhost:9092 … connect-configs connect-offsets connect-statuses default_ksql_processing_log s3_testtopic s3_topic s3_topic_new root@stlrx2540m1-108:~#
-
Überprüfen Sie die Objekte im s3-Bucket.
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/ 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro 2021-10-29 21:24:09 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro root@stlrx2540m1-108:~#
-
Um den Inhalt zu überprüfen, kopieren Sie jede Datei von S3 in Ihr lokales Dateisystem, indem Sie den folgenden Befehl ausführen:
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro tes.avro download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro root@stlrx2540m1-108:~#
-
Um die Datensätze zu drucken, verwenden Sie avro-tools-1.11.0.1.jar (verfügbar im "Apache Archives").
root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro 21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable {"f1":"value1"} {"f1":"value2"} {"f1":"value3"} root@stlrx2540m1-108:~#