// Get the `serviceURL` access address, token, full topic name, and subscription name from the configuration file (all of which can be copied from the console)@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic}")private String topic;// Declare a client object and producer objectprivate PulsarClient pulsarClient;private Producer<String> producer;// Create client and producer objects in an initialization programpublic void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();}
producer
for message sending in the business logic of message production.// Directly import the following code into the business logic of message production. Note that the schema type declared by the producer through the paradigm must match the object passed inpublic void onProduce(Producer<String> producer){// Add the business logicString msg = "my-message";// Simulate getting a message from the business logictry {// Schema verification is enabled in TDMQ for Apache Pulsar by default. The message object must match the schema type declared by the producerMessageId messageId = producer.newMessage().key("msgKey").value(msg).send();System.out.println("delivered msg " + msgId + ", value:" + value);} catch (PulsarClientException e) {System.out.println("delivered msg failed, value:" + value);e.printStackTrace();}}public void onProduceAsync(Producer<String> producer){// Add the business logicString msg = "my-asnyc-message";// Simulate getting a message from the business logic// Send the message asynchronously, which avoids thread jamming and improves the sending speedCompletableFuture<MessageId> messageIdFuture = producer.newMessage().key("msgKey").value(msg).sendAsync();// Check whether the message has been sent successfully from the async callbackmessageIdFuture.whenComplete(((messageId, throwable) -> {if( null != throwable ) {System.out.println("delivery failed, value: " + msg );// You can add the logic of delayed retry here} else {System.out.println("delivered msg " + messageId + ", value:" + msg);}}));}
close
method to disable a long idle producer or client instance, in case they consume resources or jam up the connection pool.
/**
 * Closes the producer and then the client, skipping whichever of them is {@code null}.
 *
 * <p>Each close is attempted independently, so a failure while closing the producer does not
 * prevent the client from being closed. Failures are printed rather than propagated, which
 * keeps the method free of checked exceptions.
 */
public void destroy() {
    if (producer != null) {
        try {
            producer.close();
        } catch (PulsarClientException e) {
            System.out.println("close producer failed");
            e.printStackTrace();
        }
    }
    if (pulsarClient != null) {
        try {
            pulsarClient.close();
        } catch (PulsarClientException e) {
            System.out.println("close client failed");
            e.printStackTrace();
        }
    }
}
import org.apache.pulsar.client.api.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;@Servicepublic class ConsumerService implements Runnable {// Get the `serviceURL` access address, token, full topic name, and subscription name from the configuration file (all of which can be copied from the console)@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic}")private String topic;@Value("${tdmq.subscription}")private String subscription;private volatile boolean start = false;private PulsarClient pulsarClient;private Consumer<String> consumer;private static final int corePoolSize = 10;private static final int maximumPoolSize = 10;private ExecutorService executor;private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);@PostConstructpublic void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)//.subscriptionType(SubscriptionType.Shared).subscriptionName(subscription).subscribe();executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),new ThreadPoolExecutor.AbortPolicy());start = true;}@PreDestroypublic void destroy() throws Exception {start = false;if (consumer != null) {consumer.close();}if (pulsarClient != null) {pulsarClient.close();}if (executor != null) {executor.shutdownNow();}}@Overridepublic void run() {logger.info("tdmq consumer started...");for (int i = 0; i < maximumPoolSize; i++) {executor.submit(() 
-> {while (start) {try {Message<String> message = consumer.receive();if (message == null) {continue;}onConsumer(message);} catch (Exception e) {logger.warn("tdmq consumer business error", e);}}});}logger.info("tdmq consumer stopped...");}/*** Write the consumption business logic here** @param message* @return return true: message ack; return false: message nack* @throws Exception Message nack*/private void onConsumer(Message<String> message) {// Business logic of delay operationtry {System.out.println(Thread.currentThread().getName() + " - message receive: " + message.getValue());Thread.sleep(1000);// Simulate business logic processingconsumer.acknowledge(message);logger.info(Thread.currentThread().getName() + " - message processing succeed:" + message.getValue());} catch (Exception exception) {consumer.negativeAcknowledge(message);logger.error(Thread.currentThread().getName() + " - message processing failed:" + message.getValue());}}}
Was this page helpful?