消息类型 | 消费顺序 | 性能 | 适用场景 |
普通消息 | 无顺序 | 高 | 适用于对吞吐量要求高,且对生产和消费顺序无要求 |
顺序消息 | 指定的 Topic 内的消息遵循先入先出(FIFO)规则 | 一般 | 吞吐量要求一般,但是要求特定的 Topic 严格地按照 FIFO 原则进行消息发布和消费的场景 |
public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}}
SendResult send(Message msg, MessageQueueSelector selector, Object arg)
方法,MessageQueueSelector
是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。MessageQueueSelector
的接口如下:public interface MessageQueueSelector {MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
本页内容是否解决了您的问题?