wget --no-check-certificate https://pecl.php.net/get/rdkafka-4.1.2.tgzpear install rdkafka-4.1.2.tgz# If the installation succeeds, the system will prompt "install ok" and "You should add "extension=rdkafka.so" to php.ini"# If the installation fails, please follow the prompts to troubleshoot# After successful installation, add `extension=rdkafka.so` to `php.ini`# After `php --ini` is executed, `Loaded Configuration File:` shows the location of `php.ini`echo 'extension=rdkafka.so' >> /etc/php.ini
phpkafkademo
in the downloaded demo to the Linux server.phpkafkademo
directory, and modify the CKafkaSetting.php
configuration file.<?phpreturn ['bootstrap_servers' => 'bootstrap_servers1:port,bootstrap_servers2:port','topic_name' => 'topic_name','group_id' => 'php-demo',];
Parameter | Description |
bootstrap_servers | Accessed network, which can be copied from the Network column in the Access Mode section on the Instance Details page in the console. |
topic_name | Topic name, which can be copied from the Topic Management page in the console. |
group_id | Consumer group ID. You can customize it. After the demo runs successfully, you can see the consumer on the Consumer Group page. |
Producer.php
.<?php$setting = require __DIR__ . '/CKafkaSetting.php';$conf = new RdKafka\\Conf();// To set the entry service, please get the corresponding service address in the console$conf->set('bootstrap.servers', $setting['bootstrap_servers']);// There are three ack mechanisms for a Kafka producer as described below:// -1 or all: the broker responds to the producer and continues to send the next message or next batch of messages only after the leader receives the data and syncs it to the followers in all ISRs.// This configuration provides the highest data reliability, and messages will never be lost as long as one synced replica survives. Note: this configuration cannot ensure that replicas will be returned after the data is written to all of them.// It can be used together with the `min.insync.replicas` parameter at the topic level.// 0: the producer does not wait for acknowledgment of sync completion from the broker before it continues to send the next message or next batch of messages. This configuration has the highest production performance but lowest data reliability.// (Data loss may occur when the server fails. If the leader is dead but the producer is unaware of that, the broker cannot receive messages.)// 1: the producer sends the next message or next batch of messages after it receives an acknowledgment that the leader has successfully received the data. This configuration is a balance between production throughput and data reliability.// (Messages may be lost if the leader is dead but has not been replicated yet.)// If you do not explicitly configure this, the value of 1 will be used by default. You can customize this according to your business conditions$conf->set('acks', '1');// Number of retries upon request error. We recommend you set the value to be greater than 0. Retries can ensure as much as possible that the message will not be lost$conf->set('retries', '0');// The time between when a request fails and the next time the request is retried$conf->set('retry.backoff.ms', 100);// Timeout period of the producer network request$conf->set('socket.timeout.ms', 6000);$conf->set('reconnect.backoff.max.ms', 3000);// Register the callback for message sending$conf->setDrMsgCb(function ($kafka, $message) {echo '[Producer] sent a message: message=' . var_export($message, true) . "\\n";});// Register the callback for message sending error$conf->setErrorCb(function ($kafka, $err, $reason) {echo "[Producer] run into an error while sending the message: err=$err reason=$reason \\n";});$producer = new RdKafka\\Producer($conf);// Please set `LOG_DEBUG` when debugging//$producer->setLogLevel(LOG_DEBUG);$topicConf = new RdKafka\\TopicConf();$topic = $producer->newTopic($setting['topic_name'], $topicConf);// Produce a message and send itfor ($i = 0; $i < 5; $i++) {// `RD_KAFKA_PARTITION_UA` lets Kafka select a partition freely$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");$producer->poll(0);}while ($producer->getOutQLen() > 0) {$producer->poll(50);}echo "[Producer] sent the message successfully\\n";
Producer.php
to send the message.php Producer.php
>[Producer] sent a message: message=RdKafka\\Message::__set_state(array(> 'err' => 0,> 'topic_name' => 'topic_name',> 'timestamp' => 1618800895159,> 'partition' => 0,> 'payload' => 'Message 0',> 'len' => 9,> 'key' => NULL,> 'offset' => 0,> 'headers' => NULL,>))>[Producer] sent a message: message=RdKafka\\Message::__set_state(array(> 'err' => 0,> 'topic_name' => 'topic_name',> 'timestamp' => 1618800895159,> 'partition' => 0,> 'payload' => 'Message 1',> 'len' => 9,> 'key' => NULL,> 'offset' => 1,> 'headers' => NULL,>))...>[Producer] sent the message successfully
Consumer.php
.<?php$setting = require __DIR__ . '/CKafkaSetting.php';$conf = new RdKafka\\Conf();$conf->set('group.id', $setting['group_id']);// To set the entry service, please get the corresponding service address in the console$conf->set('bootstrap.servers', $setting['bootstrap_servers']);// Consumer timeout period when the Kafka consumer grouping mechanism is used. If the broker does not receive the heartbeat of the consumer within this period,// the consumer will be considered to have failed and the broker will initiate rebalance$conf->set('session.timeout.ms', 10000);// Client request timeout period. If no response is received after this time elapses, the request will time out and fail$conf->set('request.timeout.ms', 305000);// Set the internal retry interval of the client$conf->set('reconnect.backoff.max.ms', 3000);$topicConf = new RdKafka\\TopicConf();#$topicConf->set('auto.commit.interval.ms', 100);// Offset reset policy. Please set as appropriate according to the business scenario. Improper setting may result in the loss of consumed data.$topicConf->set('auto.offset.reset', 'earliest');$conf->setDefaultTopicConf($topicConf);$consumer = new RdKafka\\KafkaConsumer($conf);// Please set `LOG_DEBUG` when debugging//$consumer->setLogLevel(LOG_DEBUG);$consumer->subscribe([$setting['topic_name']]);$isConsuming = true;while ($isConsuming) {$message = $consumer->consume(10 * 1000);switch ($message->err) {case RD_KAFKA_RESP_ERR_NO_ERROR:echo "[Consumer] received a message:" . var_export($message, true) . "\\n";break;case RD_KAFKA_RESP_ERR__PARTITION_EOF:echo "[Consumer] is waiting for messages\\n";break;case RD_KAFKA_RESP_ERR__TIMED_OUT:echo "[Consumer] waiting timed out\\n";$isConsuming = false;break;default:throw new \\Exception($message->errstr(), $message->err);break;}}
Consumer.php
to consume the message.php Consumer.php
>[Consumer] received a message: RdKafka\\Message::__set_state(array(> 'err' => 0,> 'topic_name' => 'topic_name',> 'timestamp' => 1618800895159,> 'partition' => 0,> 'payload' => 'Message 0',> 'len' => 9,> 'key' => NULL,> 'offset' => 0,> 'headers' => NULL,>))>[Consumer] received a message: RdKafka\\Message::__set_state(array(> 'err' => 0,> 'topic_name' => 'topic_name',> 'timestamp' => 1618800895159,> 'partition' => 0,> 'payload' => 'Message 1',> 'len' => 9,> 'key' => NULL,> 'offset' => 1,> 'headers' => NULL,>))...
Was this page helpful?