Conector confluent s3
El conector Amazon S3 Sink exporta datos de temas de Apache Kafka a objetos de S3 en formato Avro, JSON o bytes. El conector hunk de Amazon S3 sondea periódicamente datos de Kafka y, a su vez, los carga en S3. Se utiliza un particionador para dividir los datos de cada partición de Kafka en fragmentos. Cada fragmento de datos se representa como un objeto S3. El nombre de clave codifica el tema, la partición Kafka y el desplazamiento inicial de este fragmento de datos.
En esta configuración, le mostramos cómo leer y escribir temas en almacenamiento de objetos de Kafka directamente con el conector Kafka s3 sink. Para esta prueba, utilizamos un clúster Confluent independiente, pero esta configuración se aplica a un clúster distribuido.
-
Descargar Confluent Kafka desde el sitio web de Confluent.
-
Desembale el paquete en una carpeta del servidor.
-
Exportar dos variables.
Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0 export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
-
Para una configuración independiente Confluent Kafka, el clúster crea una carpeta raíz temporal en
/tmp
. También crea Zookeeper, Kafka, un registro de esquemas, Connect, un ksql-Server, y las carpetas del centro de control y copia sus archivos de configuración respectivos desde$CONFLUENT_HOME
. Consulte el siguiente ejemplo:root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/ total 28 drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center root@stlrx2540m1-108:~#
-
Configure Zookeeper. No es necesario cambiar nada si utiliza los parámetros predeterminados.
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/zookeeper.properties | grep -iv ^# dataDir=/tmp/confluent.406980/zookeeper/data clientPort=2181 maxClientCnxns=0 admin.enableServer=false tickTime=2000 initLimit=5 syncLimit=2 server.179=controlcenter:2888:3888 root@stlrx2540m1-108:~#
En la configuración anterior, actualizamos la
server. xxx
propiedad. De forma predeterminada, se necesitan tres Zookeepers para la selección de líder de Kafka. -
Hemos creado un archivo myid en
/tmp/confluent.406980/zookeeper/data
Con un ID único:root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid 179 root@stlrx2540m1-108:~#
Utilizamos el último número de direcciones IP para el archivo myid. Hemos utilizado los valores predeterminados para Kafka, Connect, control-centre, Kafka, Kafka-Rest, ksql-server y configuraciones de registro de esquema.
-
Inicie los servicios de Kafka.
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services start The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 ZooKeeper is [UP] Kafka is [UP] Schema Registry is [UP] Kafka REST is [UP] Connect is [UP] ksqlDB Server is [UP] Control Center is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
Hay una carpeta de registro para cada configuración, lo que ayuda a solucionar problemas. En algunos casos, los servicios tardan más tiempo en iniciarse. Asegúrese de que todos los servicios estén activos y en funcionamiento.
-
Instale Kafka Connect con
confluent-hub
.root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest The component can be installed in any of the following Confluent Platform installations: 1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME) 2. /data/confluent/confluent-6.2.0 (where this tool is installed) Choose one of these to continue the installation (1-2): 1 Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y Component's license: Confluent Community License http://www.confluent.io/confluent-community-license I agree to the software license agreement (yN) y Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components Do you want to uninstall existing version 10.0.3? (yN) y Detected Worker's configs: 1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties 2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties 3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties 4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties 5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties 6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties Do you want to update all detected configs? (yN) y Adding installation directory to plugin path in the following files: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties /tmp/confluent.406980/connect/connect.properties /tmp/confluent.406980/connect/connect.properties Completed root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
También puede instalar una versión específica mediante
confluent-hub install confluentinc/kafka-connect-s3:10.0.3
. -
De forma predeterminada,
confluentinc-kafka-connect-s3
está instalado en/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
. -
Actualice la ruta de plugins con el nuevo
confluentinc-kafka-connect-s3
.root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3 root@stlrx2540m1-108:~#
-
Detenga los servicios Confluent y reinícielos.
confluent local services stop confluent local services start root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services status The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 Connect is [UP] Control Center is [UP] Kafka is [UP] Kafka REST is [UP] ksqlDB Server is [UP] Schema Registry is [UP] ZooKeeper is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
-
Configure el ID de acceso y la clave secreta en la
/root/.aws/credentials
archivo.root@stlrx2540m1-108:~# cat /root/.aws/credentials [default] aws_access_key_id = xxxxxxxxxxxx aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx root@stlrx2540m1-108:~#
-
Verifique que se pueda acceder al cucharón.
root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2 2021-10-29 21:04:18 1388 1 2021-10-29 21:04:20 1388 2 2021-10-29 21:04:22 1388 3 root@stlrx2540m4-01:~#
-
Configure el archivo de propiedades s3-hunden para la configuración de bloques y s3.
root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^# name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=s3_testtopic s3.region=us-west-2 s3.bucket.name=kafkasgdbucket1-2 store.url=http://kafkasgd.rtpppe.netapp.com:10444/ s3.part.size=5242880 flush.size=3 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.avro.AvroFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE root@stlrx2540m1-108:~#
-
Importe unos pocos registros en el bloque de s3.
kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
-
Cargue el conector del receptor s3.
root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "flush.size": "3", "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "s3.bucket.name": "kafkasgdbucket1-2", "s3.part.size": "5242880", "s3.region": "us-west-2", "schema.compatibility": "NONE", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/", "tasks.max": "1", "topics": "s3_testtopic", "name": "s3-sink" }, "tasks": [], "type": "sink" } root@stlrx2540m1-108:~#
-
Compruebe el estado del receptor s3.
root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "connector": { "state": "RUNNING", "worker_id": "10.63.150.185:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.63.150.185:8083" } ], "type": "sink" } root@stlrx2540m1-108:~#
-
Revise el registro para asegurarse de que el receptor s3 esté listo para aceptar temas.
root@stlrx2540m1-108:~# confluent local services connect log
-
Consulta los temas en Kafka.
kafka-topics --list --bootstrap-server localhost:9092 … connect-configs connect-offsets connect-statuses default_ksql_processing_log s3_testtopic s3_topic s3_topic_new root@stlrx2540m1-108:~#
-
Compruebe los objetos en el bloque de s3.
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/ 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro 2021-10-29 21:24:09 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro root@stlrx2540m1-108:~#
-
Para verificar el contenido, copie cada archivo desde S3 en el sistema de archivos local ejecutando el siguiente comando:
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro tes.avro download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro root@stlrx2540m1-108:~#
-
Para imprimir los registros, utilice avro-tools-1.11.0.1.jar (disponible en "Archivos Apache").
root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro 21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable {"f1":"value1"} {"f1":"value2"} {"f1":"value3"} root@stlrx2540m1-108:~#