import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Service
public class ConsumerService implements Runnable {
@Value("${tdmq.serviceUrl}")
private String serviceUrl;
@Value("${tdmq.token}")
private String token;
@Value("${tdmq.topic}")
private String topic;
@Value("${tdmq.subscription}")
private String subscription;
private volatile boolean start = false;
private PulsarClient pulsarClient;
private Consumer<String> consumer;
private static final int corePoolSize = 10;
private static final int maximumPoolSize = 10;
private ExecutorService executor;
private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
@PostConstruct
public void init() throws Exception {
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subscription)
.subscribe();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.AbortPolicy());
start = true;
}
@PreDestroy
public void destroy() throws Exception {
start = false;
if (consumer != null) {
consumer.close();
}
if (pulsarClient != null) {
pulsarClient.close();
}
if (executor != null) {
executor.shutdownNow();
}
}
@Override
public void run() {
logger.info("tdmq consumer started...");
for (int i = 0; i < maximumPoolSize; i++) {
executor.submit(() -> {
while (start) {
try {
Message<String> message = consumer.receive();
if (message == null) {
continue;
}
onConsumer(message);
} catch (Exception e) {
logger.warn("tdmq consumer business error", e);
}
}
});
}
logger.info("tdmq consumer stopped...");
}
private void onConsumer(Message<String> message) {
try {
System.out.println(Thread.currentThread().getName() + " - message receive: " + message.getValue());
Thread.sleep(1000);
consumer.acknowledge(message);
logger.info(Thread.currentThread().getName() + " - message processing succeed:" + message.getValue());
} catch (Exception exception) {
consumer.negativeAcknowledge(message);
logger.error(Thread.currentThread().getName() + " - message processing failed:" + message.getValue());
}
}
}
本页内容是否解决了您的问题?