<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.7</version></dependency></dependencies>
public class NormalMessageSyncProducer {private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);private NormalMessageSyncProducer() {}public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 添加配置的ak和skString accessKey = "yourAccessKey"; //akString secretKey = "yourSecretKey"; //skSessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 填写腾讯云提供的接入地址String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).enableSsl(false).setCredentialProvider(sessionCredentialsProvider).build();String topic = "yourNormalTopic";// In most case, you don't need to create too many producers, singleton pattern is recommended.final Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration)// Set the topic name(s), which is optional but recommended. It makes producer could prefetch the topic// route before message publishing..setTopics(topic)// May throw {@link ClientException} if the producer is not initialized..build();// Define your message body.byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);String tag = "yourMessageTagA";final Message message = provider.newMessageBuilder()// Set topic for the current message..setTopic(topic)// Message secondary classifier of message besides topic..setTag(tag)// Key(s) of the message, another way to mark message besides message id..setKeys("yourMessageKey-1c151062f96e").setBody(body).build();try {final SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {log.error("Failed to send message", t);}// Close the producer when you don't need it anymore.producer.close();}}
public class NormalPushConsumer {private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);private NormalPushConsumer() {}public static void main(String[] args) throws ClientException, IOException, InterruptedException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 添加配置的ak和skString accessKey = "yourAccessKey"; //akString secretKey = "yourSecretKey"; //skSessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 填写腾讯云提供的接入地址String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).enableSsl(false).setCredentialProvider(sessionCredentialsProvider).build();String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);String consumerGroup = "yourConsumerGroup";String topic = "yourTopic";// In most case, you don't need to create too many consumers, singleton pattern is recommended.PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// Set the consumer group name..setConsumerGroup(consumerGroup)// Set the subscription for the consumer..setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {// Handle the received message and return consume result.log.info("Consume message={}", messageView);return ConsumeResult.SUCCESS;}).build();// Block the main thread, no need for production environment.Thread.sleep(Long.MAX_VALUE);// Close the push consumer when you don't need it anymore.pushConsumer.close();}}
本页内容是否解决了您的问题?