Connecteur s3 confluent
Le connecteur d'évier Amazon S3 exporte les données des sujets Apache Kafka vers des objets S3 au format Avro, JSON ou octets. Le connecteur d'évier Amazon S3 interroge régulièrement les données depuis Kafka et les télécharge à son tour sur S3. Un partitionneur est utilisé pour diviser les données de chaque partition Kafka en segments. Chaque bloc de données est représenté en tant qu'objet S3. Le nom de clé encode le sujet, la partition Kafka et le décalage de début de ce segment de données.
Dans ce configuration, nous vous montrons comment lire et écrire des sujets dans le stockage objet depuis Kafka directement à l'aide du connecteur lavabo Kafka s3. Pour ce test, nous avons utilisé un cluster Confluent autonome, mais cette configuration s'applique à un cluster distribué.
-
Téléchargez le livre confluent Kafka depuis le site Web confluent.
-
Déballez le paquet dans un dossier de votre serveur.
-
Exporter deux variables.
Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0 export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
-
Pour une configuration autonome que Kafka confluent, le cluster crée un dossier racine temporaire dans
/tmp
. Cette solution crée également Zookeeper, Kafka, un registre de schéma, Connect, un serveur ksql, et les dossiers du centre de contrôle et copie leurs fichiers de configuration respectifs à partir de$CONFLUENT_HOME
. Voir l'exemple suivant :root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/ total 28 drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center root@stlrx2540m1-108:~#
-
Configurer le Zookeeper. Vous n’avez rien à changer si vous utilisez les paramètres par défaut.
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/zookeeper.properties | grep -iv ^# dataDir=/tmp/confluent.406980/zookeeper/data clientPort=2181 maxClientCnxns=0 admin.enableServer=false tickTime=2000 initLimit=5 syncLimit=2 server.179=controlcenter:2888:3888 root@stlrx2540m1-108:~#
Dans la configuration ci-dessus, nous avons mis à jour le
server. xxx
propriété. Par défaut, vous avez besoin de trois zoopers pour la sélection du leader Kafka. -
Nous avons créé un fichier myID dans
/tmp/confluent.406980/zookeeper/data
Avec un ID unique :root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid 179 root@stlrx2540m1-108:~#
Nous avons utilisé le dernier nombre d'adresses IP pour le fichier myID. Nous avons utilisé des valeurs par défaut pour Kafka, Connect, control-Center, Kafka, Kafka-REST, configurations de serveur ksql et de registre de schéma.
-
Démarrer les services Kafka
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services start The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 ZooKeeper is [UP] Kafka is [UP] Schema Registry is [UP] Kafka REST is [UP] Connect is [UP] ksqlDB Server is [UP] Control Center is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
Il existe un dossier journal pour chaque configuration, ce qui permet de résoudre les problèmes. Dans certains cas, le démarrage des services prend plus de temps. Assurez-vous que tous les services sont opérationnels.
-
Installez Kafka Connect à l'aide de
confluent-hub
.root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest The component can be installed in any of the following Confluent Platform installations: 1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME) 2. /data/confluent/confluent-6.2.0 (where this tool is installed) Choose one of these to continue the installation (1-2): 1 Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y Component's license: Confluent Community License http://www.confluent.io/confluent-community-license I agree to the software license agreement (yN) y Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components Do you want to uninstall existing version 10.0.3? (yN) y Detected Worker's configs: 1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties 2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties 3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties 4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties 5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties 6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties Do you want to update all detected configs? (yN) y Adding installation directory to plugin path in the following files: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties /tmp/confluent.406980/connect/connect.properties /tmp/confluent.406980/connect/connect.properties Completed root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
Vous pouvez également installer une version spécifique en utilisant
confluent-hub install confluentinc/kafka-connect-s3:10.0.3
. -
Par défaut,
confluentinc-kafka-connect-s3
est installé dans/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
. -
Mettez à jour le chemin du plug-in avec le nouveau
confluentinc-kafka-connect-s3
.root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3 root@stlrx2540m1-108:~#
-
Arrêtez les services de confluent et redémarrez-les.
confluent local services stop confluent local services start root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services status The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 Connect is [UP] Control Center is [UP] Kafka is [UP] Kafka REST is [UP] ksqlDB Server is [UP] Schema Registry is [UP] ZooKeeper is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
-
Configurez l'ID d'accès et la clé secrète dans le
/root/.aws/credentials
fichier.root@stlrx2540m1-108:~# cat /root/.aws/credentials [default] aws_access_key_id = xxxxxxxxxxxx aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx root@stlrx2540m1-108:~#
-
Vérifier que le godet est accessible.
root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2 2021-10-29 21:04:18 1388 1 2021-10-29 21:04:20 1388 2 2021-10-29 21:04:22 1388 3 root@stlrx2540m4-01:~#
-
Configurez le fichier de propriétés s3-lavabo pour s3 et la configuration de compartiment.
root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^# name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=s3_testtopic s3.region=us-west-2 s3.bucket.name=kafkasgdbucket1-2 store.url=http://kafkasgd.rtpppe.netapp.com:10444/ s3.part.size=5242880 flush.size=3 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.avro.AvroFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE root@stlrx2540m1-108:~#
-
Importez quelques enregistrements dans le compartiment s3.
kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
-
Chargez le connecteur de l'évier s3.
root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "flush.size": "3", "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "s3.bucket.name": "kafkasgdbucket1-2", "s3.part.size": "5242880", "s3.region": "us-west-2", "schema.compatibility": "NONE", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/", "tasks.max": "1", "topics": "s3_testtopic", "name": "s3-sink" }, "tasks": [], "type": "sink" } root@stlrx2540m1-108:~#
-
Vérifiez l'état de l'évier s3.
root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "connector": { "state": "RUNNING", "worker_id": "10.63.150.185:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.63.150.185:8083" } ], "type": "sink" } root@stlrx2540m1-108:~#
-
Vérifiez le journal pour vous assurer que s3-lavabo est prêt à accepter les rubriques.
root@stlrx2540m1-108:~# confluent local services connect log
-
Vérifiez les sujets dans Kafka.
kafka-topics --list --bootstrap-server localhost:9092 … connect-configs connect-offsets connect-statuses default_ksql_processing_log s3_testtopic s3_topic s3_topic_new root@stlrx2540m1-108:~#
-
Vérification des objets dans le compartiment s3
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/ 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro 2021-10-29 21:24:09 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro root@stlrx2540m1-108:~#
-
Pour vérifier le contenu, copiez chaque fichier depuis S3 vers votre système de fichiers local à l'aide de la commande suivante :
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro tes.avro download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro root@stlrx2540m1-108:~#
-
Pour imprimer les enregistrements, utilisez avro-tools-1.11.0.1.jar (disponible dans le "Archives Apache").
root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro 21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable {"f1":"value1"} {"f1":"value2"} {"f1":"value3"} root@stlrx2540m1-108:~#