Skip to main content
NetApp Solutions
Se proporciona el idioma español mediante traducción automática para su comodidad. En caso de alguna inconsistencia, el inglés precede al español.

Conector confluent s3

Colaboradores

El conector Amazon S3 Sink exporta datos de temas de Apache Kafka a objetos de S3 en formato Avro, JSON o bytes. El conector hunk de Amazon S3 sondea periódicamente datos de Kafka y, a su vez, los carga en S3. Se utiliza un particionador para dividir los datos de cada partición de Kafka en fragmentos. Cada fragmento de datos se representa como un objeto S3. El nombre de clave codifica el tema, la partición Kafka y el desplazamiento inicial de este fragmento de datos.

En esta configuración, le mostramos cómo leer y escribir temas en almacenamiento de objetos de Kafka directamente con el conector Kafka s3 sink. Para esta prueba, utilizamos un clúster Confluent independiente, pero esta configuración se aplica a un clúster distribuido.

  1. Descargar Confluent Kafka desde el sitio web de Confluent.

  2. Desembale el paquete en una carpeta del servidor.

  3. Exportar dos variables.

    Export CONFLUENT_HOME=/data/confluent/confluent-6.2.0
    export PATH=$PATH:/data/confluent/confluent-6.2.0/bin
  4. Para una configuración independiente Confluent Kafka, el clúster crea una carpeta raíz temporal en /tmp. También crea Zookeeper, Kafka, un registro de esquemas, Connect, un ksql-Server, y las carpetas del centro de control y copia sus archivos de configuración respectivos desde $CONFLUENT_HOME. Consulte el siguiente ejemplo:

    root@stlrx2540m1-108:~# ls -ltr /tmp/confluent.406980/
    total 28
    drwxr-xr-x 4 root root 4096 Oct 29 19:01 zookeeper
    drwxr-xr-x 4 root root 4096 Oct 29 19:37 kafka
    drwxr-xr-x 4 root root 4096 Oct 29 19:40 schema-registry
    drwxr-xr-x 4 root root 4096 Oct 29 19:45 kafka-rest
    drwxr-xr-x 4 root root 4096 Oct 29 19:47 connect
    drwxr-xr-x 4 root root 4096 Oct 29 19:48 ksql-server
    drwxr-xr-x 4 root root 4096 Oct 29 19:53 control-center
    root@stlrx2540m1-108:~#
  5. Configure Zookeeper. No es necesario cambiar nada si utiliza los parámetros predeterminados.

    root@stlrx2540m1-108:~# cat  /tmp/confluent.406980/zookeeper/zookeeper.properties  | grep -iv ^#
    dataDir=/tmp/confluent.406980/zookeeper/data
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    tickTime=2000
    initLimit=5
    syncLimit=2
    server.179=controlcenter:2888:3888
    root@stlrx2540m1-108:~#

    En la configuración anterior, actualizamos la server. xxx propiedad. De forma predeterminada, se necesitan tres Zookeepers para la selección de líder de Kafka.

  6. Hemos creado un archivo myid en /tmp/confluent.406980/zookeeper/data Con un ID único:

    root@stlrx2540m1-108:~# cat /tmp/confluent.406980/zookeeper/data/myid
    179
    root@stlrx2540m1-108:~#

    Utilizamos el último número de direcciones IP para el archivo myid. Hemos utilizado los valores predeterminados para Kafka, Connect, control-centre, Kafka, Kafka-Rest, ksql-server y configuraciones de registro de esquema.

  7. Inicie los servicios de Kafka.

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  start
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    ZooKeeper is [UP]
    Kafka is [UP]
    Schema Registry is [UP]
    Kafka REST is [UP]
    Connect is [UP]
    ksqlDB Server is [UP]
    Control Center is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    Hay una carpeta de registro para cada configuración, lo que ayuda a solucionar problemas. En algunos casos, los servicios tardan más tiempo en iniciarse. Asegúrese de que todos los servicios estén activos y en funcionamiento.

  8. Instale Kafka Connect con confluent-hub.

    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# ./confluent-hub install confluentinc/kafka-connect-s3:latest
    The component can be installed in any of the following Confluent Platform installations:
      1. /data/confluent/confluent-6.2.0 (based on $CONFLUENT_HOME)
      2. /data/confluent/confluent-6.2.0 (where this tool is installed)
    Choose one of these to continue the installation (1-2): 1
    Do you want to install this into /data/confluent/confluent-6.2.0/share/confluent-hub-components? (yN) y
    
    Component's license:
    Confluent Community License
    http://www.confluent.io/confluent-community-license
    I agree to the software license agreement (yN) y
    Downloading component Kafka Connect S3 10.0.3, provided by Confluent, Inc. from Confluent Hub and installing into /data/confluent/confluent-6.2.0/share/confluent-hub-components
    Do you want to uninstall existing version 10.0.3? (yN) y
    Detected Worker's configs:
      1. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      2. Standard: /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      3. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      4. Standard: /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      5. Based on CONFLUENT_CURRENT: /tmp/confluent.406980/connect/connect.properties
      6. Used by Connect process with PID 15904: /tmp/confluent.406980/connect/connect.properties
    Do you want to update all detected configs? (yN) y
    Adding installation directory to plugin path in the following files:
      /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties
      /data/confluent/confluent-6.2.0/etc/kafka/connect-standalone.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-distributed.properties
      /data/confluent/confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties
      /tmp/confluent.406980/connect/connect.properties
      /tmp/confluent.406980/connect/connect.properties
    
    Completed
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#

    También puede instalar una versión específica mediante confluent-hub install confluentinc/kafka-connect-s3:10.0.3.

  9. De forma predeterminada, confluentinc-kafka-connect-s3 está instalado en /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3.

  10. Actualice la ruta de plugins con el nuevo confluentinc-kafka-connect-s3.

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/etc/kafka/connect-distributed.properties | grep plugin.path
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    plugin.path=/usr/share/java,/data/zookeeper/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components,/data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3
    root@stlrx2540m1-108:~#
  11. Detenga los servicios Confluent y reinícielos.

    confluent local services  stop
    confluent local services  start
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin# confluent local services  status
    The local commands are intended for a single-node development environment only,
    NOT for production usage.
     
    Using CONFLUENT_CURRENT: /tmp/confluent.406980
    Connect is [UP]
    Control Center is [UP]
    Kafka is [UP]
    Kafka REST is [UP]
    ksqlDB Server is [UP]
    Schema Registry is [UP]
    ZooKeeper is [UP]
    root@stlrx2540m1-108:/data/confluent/confluent-6.2.0/bin#
  12. Configure el ID de acceso y la clave secreta en la /root/.aws/credentials archivo.

    root@stlrx2540m1-108:~# cat /root/.aws/credentials
    [default]
    aws_access_key_id = xxxxxxxxxxxx
    aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxx
    root@stlrx2540m1-108:~#
  13. Verifique que se pueda acceder al cucharón.

    root@stlrx2540m4-01:~# aws s3 –endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls kafkasgdbucket1-2
    2021-10-29 21:04:18       1388 1
    2021-10-29 21:04:20       1388 2
    2021-10-29 21:04:22       1388 3
    root@stlrx2540m4-01:~#
  14. Configure el archivo de propiedades s3-hunden para la configuración de bloques y s3.

    root@stlrx2540m1-108:~# cat /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties | grep -v ^#
    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=s3_testtopic
    s3.region=us-west-2
    s3.bucket.name=kafkasgdbucket1-2
    store.url=http://kafkasgd.rtpppe.netapp.com:10444/
    s3.part.size=5242880
    flush.size=3
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility=NONE
    root@stlrx2540m1-108:~#
  15. Importe unos pocos registros en el bloque de s3.

    kafka-avro-console-producer --broker-list localhost:9092 --topic s3_topic \
    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
    {"f1": "value1"}
    {"f1": "value2"}
    {"f1": "value3"}
    {"f1": "value4"}
    {"f1": "value5"}
    {"f1": "value6"}
    {"f1": "value7"}
    {"f1": "value8"}
    {"f1": "value9"}
  16. Cargue el conector del receptor s3.

    root@stlrx2540m1-108:~# confluent local services connect connector load s3-sink  --config /data/confluent/confluent-6.2.0/share/confluent-hub-components/confluentinc-kafka-connect-s3/etc/quickstart-s3.properties
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "flush.size": "3",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "s3.bucket.name": "kafkasgdbucket1-2",
        "s3.part.size": "5242880",
        "s3.region": "us-west-2",
        "schema.compatibility": "NONE",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "store.url": "http://kafkasgd.rtpppe.netapp.com:10444/",
        "tasks.max": "1",
        "topics": "s3_testtopic",
        "name": "s3-sink"
      },
      "tasks": [],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  17. Compruebe el estado del receptor s3.

    root@stlrx2540m1-108:~# confluent local services connect connector status s3-sink
    The local commands are intended for a single-node development environment only,
    NOT for production usage. https://docs.confluent.io/current/cli/index.html
    {
      "name": "s3-sink",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.63.150.185:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.63.150.185:8083"
        }
      ],
      "type": "sink"
    }
    root@stlrx2540m1-108:~#
  18. Revise el registro para asegurarse de que el receptor s3 esté listo para aceptar temas.

    root@stlrx2540m1-108:~# confluent local services connect log
  19. Consulta los temas en Kafka.

    kafka-topics --list --bootstrap-server localhost:9092
    …
    connect-configs
    connect-offsets
    connect-statuses
    default_ksql_processing_log
    s3_testtopic
    s3_topic
    s3_topic_new
    root@stlrx2540m1-108:~#
  20. Compruebe los objetos en el bloque de s3.

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 ls --recursive kafkasgdbucket1-2/topics/
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000003.avro
    2021-10-29 21:24:00        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000006.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000009.avro
    2021-10-29 21:24:08        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000012.avro
    2021-10-29 21:24:09        213 topics/s3_testtopic/partition=0/s3_testtopic+0+0000000015.avro
    root@stlrx2540m1-108:~#
  21. Para verificar el contenido, copie cada archivo desde S3 en el sistema de archivos local ejecutando el siguiente comando:

    root@stlrx2540m1-108:~# aws s3 --endpoint-url http://kafkasgd.rtpppe.netapp.com:10444 cp s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro  tes.avro
    download: s3://kafkasgdbucket1-2/topics/s3_testtopic/partition=0/s3_testtopic+0+0000000000.avro to ./tes.avro
    root@stlrx2540m1-108:~#
  22. Para imprimir los registros, utilice avro-tools-1.11.0.1.jar (disponible en "Archivos Apache").

    root@stlrx2540m1-108:~# java -jar /usr/src/avro-tools-1.11.0.1.jar tojson tes.avro
    21/10/30 00:20:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    {"f1":"value1"}
    {"f1":"value2"}
    {"f1":"value3"}
    root@stlrx2540m1-108:~#