import com.aliyun.mq.http.MQClient;import com.aliyun.mq.http.MQProducer;public class Producer {public static void main(String[] args) {MQClient mqClient = new MQClient(// HTTP接入点"${HTTP_ENDPOINT}",// TDMQ RocketmQ access key, 从腾讯云控制台创建获取"${ACCESS_KEY}",// TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取"${SECRET_KEY}");// 消息发送的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数final String topic = "${TOPIC}";// Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数final String instanceId = "${INSTANCE_ID}";// 创建生产者MQProducer producer = mqClient.getProducer(instanceId, topic);// 发送消息mqClient.close();}}
require "vendor/autoload.php";use MQ\\MQClient;class ProducerTest{private $client;private $producer;public function __construct(){$this->client = new MQClient(// HTTP接入点"${HTTP_ENDPOINT}",// TDMQ RocketmQ access key, 从腾讯云控制台创建获取"${ACCESS_KEY}",// TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取"${SECRET_KEY}");// 消息发送的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数$topic = "${TOPIC}";// Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数$instanceId = "${INSTANCE_ID}";$this->producer = $this->client->getProducer($instanceId, $topic);}public function run(){// 发送消息}}$instance = new ProducerTest();$instance->run();
const {MQClient,MessageProperties} = require('@aliyunmq/mq-http-sdk');// 设置HTTP接入域名const endpoint = "{Endpoint}";// AccessKeyconst accessKeyId = "{Accesskey}";// SecretKeyconst accessKeySecret = "rop";var client = new MQClient(endpoint, accessKeyId, accessKeySecret);// 所属的 Topicconst topic = "TopicA";// Topic所属实例IDconst instanceId = "MQ_INST_xxxxx";const producer = client.getProducer(instanceId, topic);(async function(){try {// 循环发送4条消息for(var i = 0; i < 4; i++) {let res;if (i % 2 == 0) {msgProps = new MessageProperties();// 设置属性msgProps.putProperty("key", i);// 设置KEYmsgProps.messageKey("MessageKey");res = await producer.publishMessage("hello mq.", "", msgProps);} else {msgProps = new MessageProperties();// 设置属性msgProps.putProperty("key", i);// 定时消息, 定时时间为10s后msgProps.startDeliverTime(Date.now() + 10 * 1000);res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);}console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);}} catch(e) {// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理console.log(e)}})();
import com.aliyun.mq.http.MQClient;import com.aliyun.mq.http.MQConsumer;public class Consumer {public static void main(String[] args) {MQClient mqClient = new MQClient(// HTTP接入点"${HTTP_ENDPOINT}",// TDMQ RocketmQ access key, 从腾讯云控制台创建获取"${ACCESS_KEY}",// TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取"${SECRET_KEY}");// 消费的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数final String topic = "${TOPIC}";// 消费组名称, 从腾讯云TDMQ控制台创建获取, 必传参数final String groupId = "${GROUP_ID}";// Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数final String instanceId = "${INSTANCE_ID}";final MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);do {// 消费消息} while (true);}}
require "vendor/autoload.php";use MQ\\MQClient;class ConsumerTest{private $client;private $consumer;public function __construct(){$this->client = new MQClient(// HTTP接入点"${HTTP_ENDPOINT}",// TDMQ RocketmQ access key, 从腾讯云控制台创建获取"${ACCESS_KEY}",// TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取"${SECRET_KEY}");// 消费的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数$topic = "${TOPIC}";// 消费组名称, 从腾讯云TDMQ控制台创建获取, 必传参数$groupId = "${GROUP_ID}";// Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数$instanceId = "${INSTANCE_ID}";$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);}public function run(){while (True) {// 消费消息}}}$instance = new ConsumerTest();$instance->run();
const {MQClient} = require('@aliyunmq/mq-http-sdk');// 设置HTTP接入域名const endpoint = "{Endpoint}";// AccessKeyconst accessKeyId = "{Accesskey}";// SecretKeyconst accessKeySecret = "rop";var client = new MQClient(endpoint, accessKeyId, accessKeySecret);// 所属的 Topicconst topic = "TopicA";// Topic所属实例IDconst instanceId = "MQ_INST_xxxxx";// 您在控制台创建的 Consumer Groupconst groupId = "GID_xxx";const consumer = client.getConsumer(instanceId, topic, groupId);(async function(){// 循环消费消息while(true) {try {// 长轮询消费消息// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回res = await consumer.consumeMessage(3, // 一次最多消费3条(最多可设置为16条)3 // 长轮询时间3秒(最多可设置为30秒));if (res.code == 200) {// 消费消息,处理业务逻辑console.log("Consume Messages, requestId:%s", res.requestId);const handles = res.body.map((message) => {console.log("\\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +",Props:%j,MessageKey:%s,Prop-A:%s",message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);return message.ReceiptHandle;});// message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样res = await consumer.ackMessage(handles);if (res.code != 204) {// 某些消息的句柄可能超时了会导致确认不成功console.log("Ack Message Fail:");const failHandles = res.body.map((error)=>{console.log("\\tErrorHandle:%s, Code:%s, Reason:%s\\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);return error.ReceiptHandle;});handles.forEach((handle)=>{if (failHandles.indexOf(handle) < 0) {console.log("\\tSucHandle:%s\\n", handle);}});} else {// 消息确认消费成功console.log("Ack Message suc, RequestId:%s\\n\\t", res.requestId, handles.join(','));}}} catch(e) {if (e.Code.indexOf("MessageNotExist") > -1) {// 没有消息,则继续长轮询服务器console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);} else {console.log(e);}}}})();
本页内容是否解决了您的问题?