tencent cloud

Feedback

Client Connection and Producer/Consumer

Last updated: 2024-07-04 16:06:41
    This document describes the relationship between the TDMQ for Apache Pulsar client and connection and between the client and producer/consumer, as well as how to use the client reasonably for higher efficiency and stability.
    Core principles:
    One PulsarClient is sufficient for one process.
    Producers and consumers are thread-safe. They can be reused and are recommended to be reused for the same topic.

    Client and Connection

    The TDMQ for Apache Pulsar client (PulsarClient) is a basic unit that connects an application to TDMQ for Apache Pulsar, and one PulsarClient corresponds to one TCP connection. Generally, one application or process on the user side uses one PulsarClient, and the number of clients is equal to the number of application nodes. If an application node of the TDMQ for Apache Pulsar service is unused for a long time, its client should be repossessed to reduce the resource usage (currently, the connection limit in TDMQ for Apache Pulsar is 200 client connections per topic).
    Note:
    If there are many business topics, and multiple clients need to be created, reuse client objects in the following way:
    1. Reuse the same client for producers or consumers of the same topic respectively.
    2. If the above approach fails to meet the needs, try reusing the same client for multiple topics.
    3. This limit only applies to virtual clusters. In pro clusters, the limit is still 200 by default but can be adjusted as needed.

    Client and Producer/Consumer

    You can create multiple producers and consumers under one client to produce and consume quickly. Typically, this is done by using multiple threads, which can isolate data among different producers and consumers.
    Currently, the limits on the number of producers/consumers in TDMQ for Apache Pulsar are as follows:
    Up to 1,000 producers per topic.
    Up to 2,000 consumers per topic.

    Practical Tutorial

    The number of producers/consumers does not necessarily depend on the business object. They can be reused and are uniquely identified by name.

    Producer

    For 1,000 business objects to produce messages simultaneously, you don't necessarily need to create 1,000 producers. Since one single producer can often get the most out of the hardware configuration of each application node, it can be shared by multiple application nodes for production (singleton mode) if the messages are delivered to the same topic.
    Below is the sample code in Java for message production.
    // Get the `serviceURL` access address, token, full topic name, and subscription name from the configuration file (all of which can be copied from the console)
    @Value("${tdmq.serviceUrl}")
    private String serviceUrl;
    @Value("${tdmq.token}")
    private String token;
    @Value("${tdmq.topic}")
    private String topic;
    
    // Declare a client object and producer object
    private PulsarClient pulsarClient;
    private Producer<String> producer;
    
    // Create client and producer objects in an initialization program
    public void init() throws Exception {
    pulsarClient = PulsarClient.builder()
    .serviceUrl(serviceUrl)
    .authentication(AuthenticationFactory.token(token))
    .build();
    producer = pulsarClient.newProducer(Schema.STRING)
    .topic(topic)
    .create();
    }
    Directly import the producer for message sending in the business logic of message production.
    // Directly import the following code into the business logic of message production. Note that the schema type declared by the producer through the paradigm must match the object passed in
    public void onProduce(Producer<String> producer){
    // Add the business logic
    String msg = "my-message";// Simulate getting a message from the business logic
    try {
    // Schema verification is enabled in TDMQ for Apache Pulsar by default. The message object must match the schema type declared by the producer
    MessageId messageId = producer.newMessage()
    .key("msgKey")
    .value(msg)
    .send();
    System.out.println("delivered msg " + msgId + ", value:" + value);
    } catch (PulsarClientException e) {
    System.out.println("delivered msg failed, value:" + value);
    e.printStackTrace();
    }
    }
    
    public void onProduceAsync(Producer<String> producer){
    // Add the business logic
    String msg = "my-asnyc-message";// Simulate getting a message from the business logic
    // Send the message asynchronously, which avoids thread jamming and improves the sending speed
    CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
    .key("msgKey")
    .value(msg)
    .sendAsync();
    // Check whether the message has been sent successfully from the async callback
    messageIdFuture.whenComplete(((messageId, throwable) -> {
    if( null != throwable ) {
    System.out.println("delivery failed, value: " + msg );
    // You can add the logic of delayed retry here
    } else {
    System.out.println("delivered msg " + messageId + ", value:" + msg);
    }
    }));
    }
    You need to call the close method to disable a long idle producer or client instance, in case they consume resources or jam up the connection pool.
    public void destroy(){
    if (producer != null) {
    producer.close();
    }
    if (pulsarClient != null) {
    pulsarClient.close();
    }
    }

    Consumer

    Like producers, consumers are recommended to be used in singleton mode, and a single consumption node needs only one client instance and one consumer instance. Generally, the consumption performance bottleneck of a message queue occurs somewhere in the consumer's own message handling process, rather than in the action of receiving messages. Therefore, when the consumption performance is poor, check the consumer's network bandwidth usage first. If the upper limit is not reached as seen from the trend, you should then analyze the message processing duration based on the logs and message trace information.
    Note:
    In shared or key-shared mode, the number of consumers is not necessarily less than or equal to the number of partitions. A module on the server responsible for delivering messages will deliver the messages to all consumers in a particular way (round robin by default in shared mode and round robin in the same key by default in key-shared mode).
    In shared mode, if production is suspended on the producer, messages at the end may be distributed unevenly.
    When multithreaded consumption is used, even if one consumer object is reused, the order of messages cannot be guaranteed.
    Below is the complete sample code in Java for multithreaded consumption by using a thread pool based on the Spring Boot framework.
    import org.apache.pulsar.client.api.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    @Service
    public class ConsumerService implements Runnable {
    
    // Get the `serviceURL` access address, token, full topic name, and subscription name from the configuration file (all of which can be copied from the console)
    @Value("${tdmq.serviceUrl}")
    private String serviceUrl;
    @Value("${tdmq.token}")
    private String token;
    @Value("${tdmq.topic}")
    private String topic;
    @Value("${tdmq.subscription}")
    private String subscription;
    
    private volatile boolean start = false;
    private PulsarClient pulsarClient;
    private Consumer<String> consumer;
    private static final int corePoolSize = 10;
    private static final int maximumPoolSize = 10;
    
    private ExecutorService executor;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
    
    @PostConstruct
    public void init() throws Exception {
    pulsarClient = PulsarClient.builder()
    .serviceUrl(serviceUrl)
    .authentication(AuthenticationFactory.token(token))
    .build();
    consumer = pulsarClient.newConsumer(Schema.STRING)
    .topic(topic)
    //.subscriptionType(SubscriptionType.Shared)
    .subscriptionName(subscription)
    .subscribe();
    executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.AbortPolicy());
    start = true;
    }
    
    @PreDestroy
    public void destroy() throws Exception {
    start = false;
    if (consumer != null) {
    consumer.close();
    }
    if (pulsarClient != null) {
    pulsarClient.close();
    }
    if (executor != null) {
    executor.shutdownNow();
    }
    }
    
    @Override
    public void run() {
    logger.info("tdmq consumer started...");
    for (int i = 0; i < maximumPoolSize; i++) {
    executor.submit(() -> {
    while (start) {
    try {
    Message<String> message = consumer.receive();
    if (message == null) {
    continue;
    }
    onConsumer(message);
    } catch (Exception e) {
    logger.warn("tdmq consumer business error", e);
    }
    }
    });
    }
    logger.info("tdmq consumer stopped...");
    }
    
    /**
    * Write the consumption business logic here
    *
    * @param message
    * @return return true: message ack; return false: message nack
    * @throws Exception Message nack
    */
    private void onConsumer(Message<String> message) {
    // Business logic of delay operation
    try {
    System.out.println(Thread.currentThread().getName() + " - message receive: " + message.getValue());
    Thread.sleep(1000);// Simulate business logic processing
    consumer.acknowledge(message);
    logger.info(Thread.currentThread().getName() + " - message processing succeed:" + message.getValue());
    } catch (Exception exception) {
    consumer.negativeAcknowledge(message);
    logger.error(Thread.currentThread().getName() + " - message processing failed:" + message.getValue());
    }
    }
    }
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support