import com.aliyun.mq.http.MQClient;import com.aliyun.mq.http.MQProducer;public class Producer {public static void main(String[] args) {MQClient mqClient = new MQClient(// HTTP access point"${HTTP_ENDPOINT}",// Access key, which can be created and obtained in the TDMQ for RocketMQ console."${ACCESS_KEY}",// Role name, which can be created and obtained in the TDMQ for RocketMQ console."${SECRET_KEY}");// The topic used for sending messages, which is required and can be obtained in the TDMQ for RocketMQ console.final String topic = "${TOPIC}";// The namespace of the topic, which is required and can be obtained in the TDMQ console.final String instanceId = "${INSTANCE_ID}";// Create a producerMQProducer producer = mqClient.getProducer(instanceId, topic);// Send the messagemqClient.close();}}
require "vendor/autoload.php";use MQ\\MQClient;class ProducerTest{private $client;private $producer;public function __construct(){$this->client = new MQClient(// HTTP access point"${HTTP_ENDPOINT}",// Access key, which can be created and obtained in the TDMQ for RocketMQ console."${ACCESS_KEY}",// Role name, which can be created and obtained in the TDMQ for RocketMQ console."${SECRET_KEY}");// The topic used for sending messages, which is required and can be obtained in the TDMQ for RocketMQ console.$topic = "${TOPIC}";// The namespace of the topic, which is required and can be obtained in the TDMQ console.$instanceId = "${INSTANCE_ID}";$this->producer = $this->client->getProducer($instanceId, $topic);}public function run(){// Send the message}}$instance = new ProducerTest();$instance->run();
const {MQClient,MessageProperties} = require('@aliyunmq/mq-http-sdk');// Set HTTP access endpointsconst endpoint = "{Endpoint}";// AccessKeyconst accessKeyId = "{Accesskey}";// SecretKeyconst accessKeySecret = "rop";var client = new MQClient(endpoint, accessKeyId, accessKeySecret);// Its Topicconst topic = "TopicA";// ID of the instance to which the topic belongsconst instanceId = "MQ_INST_xxxxx";const producer = client.getProducer(instanceId, topic);(async function(){try {// Send 4 messages in a loopfor(var i = 0; i < 4; i++) {let res;if (i % 2 == 0) {msgProps = new MessageProperties();// Set attributesmsgProps.putProperty("key", i);// Set keysmsgProps.messageKey("MessageKey");res = await producer.publishMessage("hello mq.", "", msgProps);} else {msgProps = new MessageProperties();// Set attributesmsgProps.putProperty("key", i);// Timed message, with the time being 10s latermsgProps.startDeliverTime(Date.now() + 10 * 1000);res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);}console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);}} catch(e) {// The message failed to be sent and needs to be retried. You can resend this message or persist this data entry.console.log(e)}
import com.aliyun.mq.http.MQClient;import com.aliyun.mq.http.MQConsumer;public class Consumer {public static void main(String[] args) {MQClient mqClient = new MQClient(// HTTP access point"${HTTP_ENDPOINT}",// Access key, which can be created and obtained in the TDMQ for RocketMQ console."${ACCESS_KEY}",// Role name, which can be created and obtained in the TDMQ for RocketMQ console."${SECRET_KEY}");// The topic used for consuming messages, which is required and can be obtained in the TDMQ console.final String topic = "${TOPIC}";// Consumer group name, which is required and can be obtained in the TDMQ console.final String groupId = "${GROUP_ID}";// The namespace of the topic, which is required and can be obtained in the TDMQ console.final String instanceId = "${INSTANCE_ID}";final MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);do {// Consume a message} while (true);}}
require "vendor/autoload.php";use MQ\\MQClient;class ConsumerTest{private $client;private $consumer;public function __construct(){$this->client = new MQClient(// HTTP access point"${HTTP_ENDPOINT}",// Access key, which can be created and obtained in the TDMQ for RocketMQ console."${ACCESS_KEY}",// Role name, which can be created and obtained in the TDMQ for RocketMQ console."${SECRET_KEY}");// The topic used for consuming messages, which is required and can be obtained in the TDMQ console.$topic = "${TOPIC}";// Consumer group name, which is required and can be obtained in the TDMQ console.$groupId = "${GROUP_ID}";// The namespace of the topic, which is required and can be obtained in the TDMQ console.$instanceId = "${INSTANCE_ID}";$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);}public function run(){while (True) {// Consume a message}}}$instance = new ConsumerTest();$instance->run();
const {MQClient} = require('@aliyunmq/mq-http-sdk');// Set HTTP access endpointsconst endpoint = "{Endpoint}";// AccessKeyconst accessKeyId = "{Accesskey}";// SecretKeyconst accessKeySecret = "rop";var client = new MQClient(endpoint, accessKeyId, accessKeySecret);// Its Topicconst topic = "TopicA";// ID of the instance to which the topic belongsconst instanceId = "MQ_INST_xxxxx";// The consumer group you created in the consoleconst groupId = "GID_xxx";const consumer = client.getConsumer(instanceId, topic, groupId);(async function(){// Consume messages in loopwhile(true) {try {// long polling of consumption messages// Long polling means that if the topic has no messages, the request will hang on the server for 3 seconds. If there is a message that can be consumed within 3 seconds, it will return immediately.res = await consumer.consumeMessage(3, // This indicates a maximum of 3 messages can be consumed at a time. Up to 16 messages can be set.3 // Long polling lasts 3 seconds, which can be set up to 30 seconds.);if (res.code == 200) {// Consume messages based on business processing logicconsole.log("Consume Messages, requestId:%s", res.requestId);const handles = res.body.map((message) => {console.log("\\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +",Props:%j,MessageKey:%s,Prop-A:%s",message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);return message.ReceiptHandle;});// If a message is not acked for successful consumption before `message.NextConsumeTime`, it will be consumed repeatedly.// The message handle has a timestamp that changes each time the same message is consumed.res = await consumer.ackMessage(handles);if (res.code != 204) {// The handle of some messages may time out, which will cause the acknowledgement to fail.console.log("Ack Message Fail:");const failHandles = res.body.map((error)=>{console.log("\\tErrorHandle:%s, Code:%s, Reason:%s\\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);return error.ReceiptHandle;});handles.forEach((handle)=>{if (failHandles.indexOf(handle) < 0) {console.log("\\tSucHandle:%s\\n", handle);}});} else {// The message is acked for successful consumptionconsole.log("Ack Message suc, RequestId:%s\\n\\t", res.requestId, handles.join(','));}}} catch(e) {if (e.Code.indexOf("MessageNotExist") > -1) {// If there is no message, long polling will continue on the server.console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);} else {console.log(e);}}}})();
Was this page helpful?