Conetor S3 confluent
O Amazon S3 Sink Connector exporta dados de tópicos Apache Kafka para objetos S3 nos formatos Avro, JSON ou bytes. O Amazon S3 Sink Connector faz pesquisas periódicas de dados do Kafka e, por sua vez, os carrega para S3. Um particionador é usado para dividir os dados de cada partição Kafka em pedaços. Cada pedaço de dados é representado como um objeto S3D. O nome da chave codifica o tópico, a partição Kafka e o deslocamento inicial deste bloco de dados.
Nesta configuração, mostramos como ler e escrever tópicos no armazenamento de objetos do Kafka diretamente usando o conetor do dissipador kafka S3. Para esse teste, usamos um cluster confluente autônomo, mas essa configuração é aplicável a um cluster distribuído.
-
Faça o download do Confluent Kafka a partir do site da Confluent.
-
Descompacte o pacote para uma pasta no servidor.
-
Exportar duas variáveis.
Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0 export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
-
Para uma configuração autônoma do Kafka confluente, o cluster cria uma pasta raiz temporária no
/tmp
. também cria pastas Zookeeper, Kafka, um Registro de esquema, connect, um servidor ksql e control-center e copia seus respetivos arquivos de configuração$CONFLUENT_HOME
do . Veja o exemplo a seguir:root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/ total 28 drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center root@stlrx2540m1-108:~#
-
Configurar o Zookeeper. Você não precisa alterar nada se você usar os parâmetros padrão.
root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/zookeeper.properties | grep -iv ^# dataDir=/tmp/confluent.406980/zookeeper/data clientPort=2181 maxClientCnxns=0 admin.enableServer=false tickTime=2000 initLimit=5 syncLimit=2 server.179=controlcenter:2888:3888 root@stlrx2540m1-108:~#
Na configuração acima, atualizamos a
server. xxx
propriedade. Por padrão, você precisa de três Zookeepers para a seleção de líder Kafka. -
Criamos um arquivo myid
/tmp/confluent.406980/zookeeper/data
com um ID exclusivo:root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid 179 root@stlrx2540m1-108:~#
Usamos o último número de endereços IP para o arquivo myid. Usamos valores padrão para as configurações Kafka, connect, control-center, Kafka, Kafka-rest, ksql-server e schema-registry.
-
Inicie os serviços Kafka.
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services start The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 ZooKeeper is [UP] Kafka is [UP] Schema Registry is [UP] Kafka REST is [UP] Connect is [UP] ksqlDB Server is [UP] Control Center is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
Há uma pasta de log para cada configuração, que ajuda a solucionar problemas. Em alguns casos, os serviços demoram mais tempo para começar. Certifique-se de que todos os serviços estão em funcionamento.
-
Instale o Kafka Connect usando `confluent-hub`o .
root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest The component can be installed in any of the following Confluent Platform installations: 1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME) 2. /data/confluent/confluent-6.2.0 (where this tool is installed) Choose one of these to continue the installation (1-2): 1 Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y Component's license: Confluent Community License http://www.confluent.io/confluent-community-license I agree to the software license agreement (yN) y Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components Do you want to uninstall existing version 10.0.3? (yN) y Detected Worker's configs: 1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties 2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties 3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties 4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties 5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties 6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties Do you want to update all detected configs? (yN) y Adding installation directory to plugin path in the following files: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties /tmp/confluent.406980/connect/connect.properties /tmp/confluent.406980/connect/connect.properties Completed root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
Você também pode instalar uma versão específica
confluent-hub install confluentinc/kafka-connect-s3:10.0.3
usando o . -
Por padrão,
confluentinc-kafka-connect-s3
o é instalado no/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
. -
Atualize o caminho do plug-in com o novo
confluentinc-kafka-connect-s3
.root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3 root@stlrx2540m1-108:~#
-
Pare os serviços confluentes e reinicie-os.
confluent local services stop confluent local services start root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services status The local commands are intended for a single-node development environment only, NOT for production usage. Using CONFLUENT_CURRENT: /tmp/confluent.406980 Connect is [UP] Control Center is [UP] Kafka is [UP] Kafka REST is [UP] ksqlDB Server is [UP] Schema Registry is [UP] ZooKeeper is [UP] root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
-
Configure a ID de acesso e a chave secreta
/root/.aws/credentials
no arquivo.root@stlrx2540m1-108:~# cat /root/.aws/credentials [default] aws_access_key_id = xxxxxxxxxxxx aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx root@stlrx2540m1-108:~#
-
Verifique se o balde está acessível.
root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2 2021-10-29 21:04:18 1388 1 2021-10-29 21:04:20 1388 2 2021-10-29 21:04:22 1388 3 root@stlrx2540m4-01:~#
-
Configure o arquivo de propriedades do S3-Sink para a configuração do S3 e do bucket.
root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^# name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector tasks.max=1 topics=s3_testtopic s3.region=us-west-2 s3.bucket.name=kafkasgdbucket1-2 store.url=http://kafkasgd.rtpppe.netapp.com:10444/ s3.part.size=5242880 flush.size=3 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.avro.AvroFormat partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility=NONE root@stlrx2540m1-108:~#
-
Importe alguns Registros para o bucket do S3.
kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \ --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' {"f1": "value1"} {"f1": "value2"} {"f1": "value3"} {"f1": "value4"} {"f1": "value5"} {"f1": "value6"} {"f1": "value7"} {"f1": "value8"} {"f1": "value9"}
-
Carregue o conetor do dissipador de S3 V.
root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "flush.size": "3", "format.class": "io.confluent.connect.s3.format.avro.AvroFormat", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "s3.bucket.name": "kafkasgdbucket1-2", "s3.part.size": "5242880", "s3.region": "us-west-2", "schema.compatibility": "NONE", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/", "tasks.max": "1", "topics": "s3_testtopic", "name": "s3-sink" }, "tasks": [], "type": "sink" } root@stlrx2540m1-108:~#
-
Verifique o estado do dissipador de S3.
root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html { "name": "s3-sink", "connector": { "state": "RUNNING", "worker_id": "10.63.150.185:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.63.150.185:8083" } ], "type": "sink" } root@stlrx2540m1-108:~#
-
Verifique o log para se certificar de que o S3-Sink está pronto para aceitar tópicos.
root@stlrx2540m1-108:~# confluent local services connect log
-
Confira os tópicos no Kafka.
kafka-topics --list --bootstrap-server localhost:9092 … connect-configs connect-offsets connect-statuses default_ksql_processing_log s3_testtopic s3_topic s3_topic_new root@stlrx2540m1-108:~#
-
Verifique os objetos no balde S3.
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/ 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro 2021-10-29 21:24:00 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro 2021-10-29 21:24:08 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro 2021-10-29 21:24:09 213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro root@stlrx2540m1-108:~#
-
Para verificar o conteúdo, copie cada arquivo do S3 para o seu sistema de arquivos local executando o seguinte comando:
root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro tes.avro download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro root@stlrx2540m1-108:~#
-
Para imprimir os Registros, use avro-tools-1.11.0.1.jar (disponível no "Arquivos Apache").
root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro 21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable {"f1":"value1"} {"f1":"value2"} {"f1":"value3"} root@stlrx2540m1-108:~#