Last updated: 2024-09-09 21:27:07

    Overview

    This document describes how to use the PHP client to connect to an elastic Topic of CKafka and to send and receive messages.

    Prerequisites

    Directions

    Step 1: Preparing the Environment

    1. Find the latest version of the rdkafka PHP extension package on the rdkafka Official Page.
    Note
    Different package versions require different PHP versions. Version 4.1.2 is used here as an example.
    2. Install the rdkafka extension.
    wget --no-check-certificate https://pecl.php.net/get/rdkafka-4.1.2.tgz
    pear install rdkafka-4.1.2.tgz
    # If the installation succeeds, the system prompts "install ok" and "You should add extension=rdkafka.so to php.ini".
    # If the installation fails with "could not extract the package.xml file from rdkafka-4.1.2.tgz", extract the archive manually, copy the package.xml file into the rdkafka directory, and run `pear install package.xml` to continue the installation.
    # Resolve other errors according to the prompts.
    # After successful installation, add `extension=rdkafka.so` to `php.ini`.
    # To locate `php.ini`, run `php --ini` and check the `Loaded Configuration File:` line.
    echo 'extension=rdkafka.so' >> /etc/php.ini
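
Before moving on, it can help to confirm that PHP actually loads the extension. The following is a minimal sketch that uses only built-in PHP functions; the file name `check_rdkafka.php` and the helper `rdkafkaStatus()` are hypothetical names chosen for illustration.

```php
<?php
// check_rdkafka.php (hypothetical file name)

// Returns a one-line description of whether the rdkafka extension is loaded.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        // php_ini_loaded_file() returns false when no php.ini was loaded.
        $ini = php_ini_loaded_file() ?: '(none)';
        return "rdkafka is NOT loaded; check that $ini contains extension=rdkafka.so";
    }
    return 'rdkafka is loaded, version ' . phpversion('rdkafka');
}

echo rdkafkaStatus(), "\n";
```

Run it with `php check_rdkafka.php`. Note that the command-line PHP and a web server's PHP may load different `php.ini` files.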

    Step 2: Creating a Topic and Subscription Relationship

    1. On the Elastic Topic list page of the console, create a Topic.
    
    2. Click the ID of the Topic to enter the Basic Information page and obtain the username, password, and address information.
    
    3. In the Subscription Relationships tab, create a subscription relationship (consumption group).
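
The programs in Step 3 and Step 4 load their connection parameters from a file named `CKafkaSetting.php`, whose contents are not shown in this document. A minimal sketch of such a file follows, assuming it simply returns an array. The keys match the ones read by `Producer.php` and `Consumer.php`; every value is a placeholder to be replaced with the information obtained from the console.

```php
<?php
// CKafkaSetting.php: assumed layout; all values below are placeholders.
return [
    // Access address shown on the Basic Information page of the Topic.
    'bootstrap_servers' => 'your-access-address:port',
    // Username and password shown on the Basic Information page.
    'sasl_username' => 'your-username',
    'sasl_password' => 'your-password',
    // Name of the Topic created in this step.
    'topic_name' => 'your-topic-name',
    // Name of the consumption group created in the Subscription Relationships tab.
    'group_id' => 'your-group-id',
];
```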
    

    Step 3: Producing Messages

    1. Write the message production program Producer.php.
    <?php
    
    $setting = require __DIR__ . '/CKafkaSetting.php';
    
    $conf = new RdKafka\Conf();
    // Set the entry service, and get the corresponding service address from the console.
    $conf->set('bootstrap.servers', $setting['bootstrap_servers']);
    // ---------- Set this part if SASL authentication is enabled. ----------
    // The SASL authentication mechanism is PLAIN by default.
    $conf->set('sasl.mechanism', 'PLAIN');
    // Set `username`: The username configured in **User Management**.
    $conf->set('sasl.username', $setting['sasl_username']);
    // Set `password`: The password configured in **User Management**.
    $conf->set('sasl.password', $setting['sasl_password']);
    // Set the security protocol. `SASL_PLAINTEXT` uses SASL authentication over an unencrypted connection.
    $conf->set('security.protocol', 'SASL_PLAINTEXT');
    // ---------- Set this part if SASL authentication is enabled. ----------
    // A Kafka producer supports three ack mechanisms:
    // -1 or all: The broker responds to the producer only after the leader has received the data and synchronized it to all followers in the ISR. The producer then sends the next message (or batch).
    // This configuration offers the highest data reliability. No message is lost as long as one in-sync replica remains active. Note: This configuration does not guarantee that every replica has written the data before the broker responds, because only replicas in the ISR are counted.
    // It can be used together with the topic-level `min.insync.replicas` parameter.
    // 0: The producer does not wait for any acknowledgment from the broker and continues to send the next message (or batch). This configuration offers the highest production performance but the lowest data reliability.
    // (Data may be lost when the server fails. For example, if the leader is down but the producer is unaware of it, messages sent in the meantime never reach the broker.)
    // 1: The producer sends the next message (or batch) only after the leader has received the data and acknowledged it. This configuration balances production throughput and data reliability.
    // (If the leader goes down before the followers have replicated the message, the message may be lost.)
    // This example sets the value to 1. Adjust it according to your business needs.
    $conf->set('acks', '1');
    // The number of retries upon request error. A value greater than 0 is recommended so that failed requests are retried, which reduces the chance of message loss. This example uses 3.
    $conf->set('retries', '3');
    // The time interval between a failed request and the next retry attempt, in milliseconds.
    $conf->set('retry.backoff.ms', '100');
    // Timeout period for network requests made by the producer, in milliseconds.
    $conf->set('socket.timeout.ms', '6000');
    // Maximum backoff time before the client retries connecting to a broker, in milliseconds.
    $conf->set('reconnect.backoff.max.ms', '3000');
    
    // Register the delivery report callback, which is invoked once for each message after delivery succeeds or fails.
    $conf->setDrMsgCb(function ($kafka, $message) {
        echo '**Producer** sent a message: message=' . var_export($message, true) . "\n";
    });
    // Register the error callback, which is invoked when the client encounters an error.
    $conf->setErrorCb(function ($kafka, $err, $reason) {
        echo "**Producer** runs into an error while sending the message: err=$err reason=$reason \n";
    });
    
    $producer = new RdKafka\Producer($conf);
    // Uncomment the next line to enable debug-level logging.
    //$producer->setLogLevel(LOG_DEBUG);
    $topicConf = new RdKafka\TopicConf();
    $topic = $producer->newTopic($setting['topic_name'], $topicConf);
    // Produce and send five messages.
    for ($i = 0; $i < 5; $i++) {
        // `RD_KAFKA_PARTITION_UA` (unassigned) lets the client's partitioner choose the partition.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
        // Serve queued delivery report callbacks without blocking.
        $producer->poll(0);
    }
    
    // Keep polling until all queued messages have been handled.
    while ($producer->getOutQLen() > 0) {
        $producer->poll(50);
    }
    
    echo "**Producer** sent the message successfully\n";
    
    2. Run Producer.php to send a message.
    php Producer.php
    3. View the execution result.
    >**Producer** sent a message: message=RdKafka\Message::__set_state(array(
    > 'err' => 0,
    > 'topic_name' => 'topic_name',
    > 'timestamp' => 1618800895159,
    > 'partition' => 0,
    > 'payload' => 'Message 0',
    > 'len' => 9,
    > 'key' => NULL,
    > 'offset' => 0,
    > 'headers' => NULL,
    >))
    >**Producer** sent a message: message=RdKafka\Message::__set_state(array(
    > 'err' => 0,
    > 'topic_name' => 'topic_name',
    > 'timestamp' => 1618800895159,
    > 'partition' => 0,
    > 'payload' => 'Message 1',
    > 'len' => 9,
    > 'key' => NULL,
    > 'offset' => 1,
    > 'headers' => NULL,
    >))
    
    ...
    
    >**Producer** sent the message successfully
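
The loop on `getOutQLen()` in `Producer.php` keeps polling for as long as messages remain queued, which can take a long time if the broker stays unreachable. As an alternative, php-rdkafka also provides `Producer::flush()`, which waits up to a timeout and returns `RD_KAFKA_RESP_ERR_NO_ERROR` (0) once the queue is empty. The following is a sketch and not part of the original program: the helper name `flushWithRetries()`, the attempt count, and the timeout are assumptions, and `FakeProducer` is a stand-in so the snippet can be tried without a broker.

```php
<?php
// Calls $producer->flush() up to $attempts times.
// Returns true once flush() reports success (0), false if the queue never drained.
function flushWithRetries($producer, int $attempts = 10, int $timeoutMs = 1000): bool
{
    for ($i = 0; $i < $attempts; $i++) {
        // 0 is the value of RD_KAFKA_RESP_ERR_NO_ERROR.
        if ($producer->flush($timeoutMs) === 0) {
            return true;
        }
    }
    return false;
}

// Stand-in for RdKafka\Producer, used only to exercise the helper without a broker.
class FakeProducer
{
    private $pendingCalls;

    public function __construct(int $pendingCalls)
    {
        $this->pendingCalls = $pendingCalls;
    }

    public function flush(int $timeoutMs): int
    {
        // Report a timeout (-185, RD_KAFKA_RESP_ERR__TIMED_OUT) until the fake queue drains.
        return $this->pendingCalls-- > 0 ? -185 : 0;
    }
}

var_dump(flushWithRetries(new FakeProducer(2)));    // drains on the third call
var_dump(flushWithRetries(new FakeProducer(5), 3)); // gives up after three attempts
```

With the real client, the call would be `flushWithRetries($producer)` in place of the `getOutQLen()` loop. A `false` result means that some messages may not have been delivered.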

    Step 4: Consuming Messages

    1. Write the message subscription and consumption program Consumer.php.
    <?php
    
    $setting = require __DIR__ . '/CKafkaSetting.php';
    
    $conf = new RdKafka\Conf();
    $conf->set('group.id', $setting['group_id']);
    // Set the entry service, and get the corresponding service address from the console.
    $conf->set('bootstrap.servers', $setting['bootstrap_servers']);
    // ---------- Set this part if SASL authentication is enabled. ----------
    // The SASL authentication mechanism is PLAIN by default.
    $conf->set('sasl.mechanism', 'PLAIN');
    // Set `username`: The username configured in **User Management**.
    $conf->set('sasl.username', $setting['sasl_username']);
    // Set `password`: The password configured in **User Management**.
    $conf->set('sasl.password', $setting['sasl_password']);
    // Set the security protocol. `SASL_PLAINTEXT` uses SASL authentication over an unencrypted connection.
    $conf->set('security.protocol', 'SASL_PLAINTEXT');
    // ---------- Set this part if SASL authentication is enabled. ----------
    // When you use the Kafka consumer group mechanism, set the consumer session timeout, in milliseconds. If the broker does not receive a heartbeat from the consumer within this period,
    // the consumer is considered failed and the broker initiates a rebalance.
    $conf->set('session.timeout.ms', '10000');
    // Client request timeout period, in milliseconds. If no response is received within this period, the request times out and fails.
    $conf->set('request.timeout.ms', '305000');
    // Maximum backoff time before the client retries connecting to a broker, in milliseconds.
    $conf->set('reconnect.backoff.max.ms', '3000');
    
    // Offset reset policy. Set it as appropriate for the business scenario. An inappropriate setting may cause messages to be missed during consumption.
    // Topic-level properties are set directly on `$conf` here because `setDefaultTopicConf()` is deprecated in rdkafka 4.x.
    //$conf->set('auto.commit.interval.ms', '100');
    $conf->set('auto.offset.reset', 'earliest');
    
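    // For debugging, uncomment the next two lines to set the log level on `$conf` before the consumer is created.
    //$conf->set('log_level', (string) LOG_DEBUG);
    //$conf->set('debug', 'all');
    $consumer = new RdKafka\KafkaConsumer($conf);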
    $consumer->subscribe([$setting['topic_name']]);
    
    $isConsuming = true;
    while ($isConsuming) {
        // Wait up to 10 seconds for a message.
        $message = $consumer->consume(10 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo "**Consumer** receives a message: " . var_export($message, true) . "\n";
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "**Consumer** is waiting for messages\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "**Consumer** timed out while waiting for messages\n";
                $isConsuming = false;
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
        }
    }
    2. Run Consumer.php to consume the message.
    php Consumer.php
    3. View the execution result.
    >**Consumer** receives a message: RdKafka\Message::__set_state(array(
    > 'err' => 0,
    > 'topic_name' => 'topic_name',
    > 'timestamp' => 1618800895159,
    > 'partition' => 0,
    > 'payload' => 'Message 0',
    > 'len' => 9,
    > 'key' => NULL,
    > 'offset' => 0,
    > 'headers' => NULL,
    >))
    >**Consumer** receives a message: RdKafka\Message::__set_state(array(
    > 'err' => 0,
    > 'topic_name' => 'topic_name',
    > 'timestamp' => 1618800895159,
    > 'partition' => 0,
    > 'payload' => 'Message 1',
    > 'len' => 9,
    > 'key' => NULL,
    > 'offset' => 1,
    > 'headers' => NULL,
    >))
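
The `var_export()` output above is verbose. If only a few fields are needed, they can be read directly from the message object. The following is a minimal sketch that assumes the property names shown in the output above; `formatMessage()` is a hypothetical helper, and the sample object stands in for an `RdKafka\Message` so the snippet runs without a broker.

```php
<?php
// Formats a consumed message as "topic[partition]@offset: payload".
// $message is expected to expose the properties shown in the var_export() output.
function formatMessage($message): string
{
    return sprintf(
        '%s[%d]@%d: %s',
        $message->topic_name,
        $message->partition,
        $message->offset,
        $message->payload
    );
}

// Sample object with the same field values as the second message in the output above.
$sample = (object) [
    'topic_name' => 'topic_name',
    'partition'  => 0,
    'offset'     => 1,
    'payload'    => 'Message 1',
];

echo formatMessage($sample), "\n"; // prints "topic_name[0]@1: Message 1"
```

In `Consumer.php`, the `RD_KAFKA_RESP_ERR_NO_ERROR` branch could echo `formatMessage($message)` instead of the full `var_export()` dump.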
    
    