
Importing Data from Kafka

Last updated: 2024-06-27 11:04:49
    Users can subscribe to message data in Kafka by submitting a routine import job for near real-time data synchronization.
    Doris itself can guarantee the subscription of messages in Kafka without loss or duplication, that is, Exactly-Once consumption semantics.

    Subscribing to Kafka Messages

    Subscription to Kafka messages uses the Routine Load feature in Doris. Users first need to create a routine import job. The job continuously generates a series of tasks through routine scheduling, and each task consumes a certain number of messages from Kafka. Please note the following usage restrictions:
    1. Both unauthenticated and SSL-authenticated Kafka clusters are supported.
    2. The supported message formats are as follows:
    CSV text format: each message is one line, and the line does not end with a newline character.
    JSON format.
    3. Only Kafka 0.10.0.0 and later versions are supported.
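    The import examples in this document load into a table named test_1 in database demo. As a minimal sketch, a target table for CSV messages of the form `id,name` might be created as follows; the column names, types, and table properties are assumptions, not part of the original examples:

```sql
-- Hypothetical target table for the Routine Load examples below.
-- Adjust columns, keys, and replication to match your own messages and cluster.
CREATE TABLE demo.test_1
(
    id   BIGINT,
    name VARCHAR(64)
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES ("replication_num" = "3");
```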

    Accessing SSL-authenticated Kafka Cluster

    The Routine Import feature supports both unauthenticated and SSL-authenticated Kafka clusters. To access an SSL-authenticated Kafka cluster, users need to provide a certificate file (ca.pem) used to authenticate the Kafka broker's public key. If client authentication is also enabled in the Kafka cluster, the client's public key (client.pem), key file (client.key), and key password must also be provided. The required files must be uploaded to Doris in advance through the CREATE FILE command, with the catalog named kafka. For detailed help with the CREATE FILE command, see the CREATE FILE command manual. Here is an example:

    Upload Files

    CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
    CREATE FILE "client.key" PROPERTIES("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka");
    CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    Once the upload is complete, you can view the uploaded files with the SHOW FILES command.
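    As a sketch (the exact statement form and output columns may vary by Doris version):

```sql
-- List the files uploaded to the demo database; look for catalog "kafka"
SHOW FILE FROM demo;
```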

    Creating a Routine Import Job

    For the full syntax for creating a routine import job, please see the ROUTINE LOAD command manual. Here are two examples:
    1. Access the unauthenticated Kafka cluster.
    CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
    COLUMNS TERMINATED BY ","
    PROPERTIES
    (
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    max_batch_interval, max_batch_rows, and max_batch_size together bound the runtime of a subtask: a subtask finishes as soon as it reaches the maximum run time (in seconds), the maximum number of rows consumed, or the maximum data volume consumed (in bytes), whichever comes first.
    2. Access the SSL-authenticated Kafka cluster.
    CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
    COLUMNS TERMINATED BY ","
    PROPERTIES
    (
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9091,broker2:9091",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"
    );
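    Both examples above consume CSV messages. For JSON-formatted messages (the second supported format listed above), a job can declare "format" = "json"; the sketch below assumes a topic name, column names, and jsonpaths that are not part of the original examples:

```sql
-- Hypothetical JSON-format job; column names and jsonpaths are assumptions.
CREATE ROUTINE LOAD demo.my_json_routine_load_job ON test_1
COLUMNS(id, name)
PROPERTIES
(
    "format" = "json",
    "jsonpaths" = "[\"$.id\", \"$.name\"]",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_json_topic",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```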

    Viewing Import Job Status

    For the specific command and examples for checking the status of a job, please see the SHOW ROUTINE LOAD command document.
    For the specific command and example of viewing the status of a task for a job, please see the SHOW ROUTINE LOAD TASK command document.
    You can only view running tasks. You cannot view tasks that have ended or have not started.
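    For instance, using the job name from the earlier example:

```sql
-- Show the state of a specific routine load job
SHOW ROUTINE LOAD FOR demo.my_first_routine_load_job;

-- Show the currently running tasks of that job
SHOW ROUTINE LOAD TASK WHERE JobName = "my_first_routine_load_job";
```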

    Modifying Job Properties

    Users can modify some properties of already created jobs. For specific instructions, please see the ALTER ROUTINE LOAD command manual.
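    As a sketch, using the job from the earlier example (the property value here is illustrative):

```sql
-- The job generally needs to be paused before ALTER and resumed afterwards;
-- see the ALTER ROUTINE LOAD manual for which properties can be changed.
ALTER ROUTINE LOAD FOR demo.my_first_routine_load_job
PROPERTIES ("max_batch_rows" = "200000");
```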

    Job Control

    Users can stop, pause, and resume a job with the STOP, PAUSE, and RESUME commands. For the specific syntax, please see the STOP ROUTINE LOAD, PAUSE ROUTINE LOAD, and RESUME ROUTINE LOAD command documents.
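    Using the job from the earlier example:

```sql
-- Pause the job; a paused job can be resumed later
PAUSE ROUTINE LOAD FOR demo.my_first_routine_load_job;

-- Resume a paused job
RESUME ROUTINE LOAD FOR demo.my_first_routine_load_job;

-- Stop the job permanently; a stopped job cannot be resumed
STOP ROUTINE LOAD FOR demo.my_first_routine_load_job;
```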

    More help

    For more detailed syntax and best practices about ROUTINE LOAD, please see the ROUTINE LOAD command manual.