1.创建队列
endpoint='' //cmq的域名
secretId ='' // 用户的id和key
secretKey = ''
account = Account(endpoint,secretId,secretKey)
queueName = 'QueueForTest'
queue=account.get_queue(queueName)
queue_meta = QueueMeta()
queue_meta.queueName = queueName
queue_meta.visibilityTimeout = 10
queue_meta.maxMsgSize = 65536
queue_meta.pollingWaitSeconds = 10
try:
queue.create(queue_meta)
except CMQExceptionBase,e:
print e
创建队列成功之后,可以在从控制台查看创建出来的队列信息。
2.生产消息
用户只要取得队列对象,就可以调用队列的发送消息接口,将消息发送到队列中,发送消息有两种接口:单独发送和批量发送。
生产消息:
msg_body = "I am test message."
msg = Message(msg_body)
re_msg = my_queue.send_message(msg)
可以直接从控制台看到消息的属性。
批量生产消息:
msg_count=3
messages=[]
for i in range(msg_count):
msg_body = "I am test message %s." % i
msg = Message(msg_body)
messages.append(msg)
re_msg_list = my_queue.batch_send_message(messages)
3.消费消息
消费消息中有个默认的参数 pollingWaitSeconds,这个参数表示消费消息愿意等待的时间,如果不填该参数,默认的也会使用队列中的属性值。
消费消息:
wait_seconds=3
recv_msg = my_queue.receive_message(wait_seconds)
批量消费消息:
wait_seconds = 3
num_of_msg = 3
recv_msg_list = my_queue.batch_receive_message(num_of_msg, wait_seconds)
注意事项
设置恰当的 pollingWaitSeconds
这里的 pollingWaitSeconds,可以使用自定义的值,也可以使用队列默认值。如果将值设置为0,表示不等待消息。但设置为0有可能会造成空消息的现象(但实际队列中存在消息),这是由于当大量的消费者同时消费消息时有可能需要排队等待队列服务,如果设置了等待时间为0,可能根本没有等到队列服务就已经超时,返回 no message 这个异常了,因此建议消息的等待时间不要设置为0。
队列消息数 < 批量消费数,不会阻塞本次消费操作
批量消费消息的时候,需要填写本次接收消息的个数,如果队列中的消息数小于本次需要消费的消息数,不会阻塞本次操作。
队列属性中 设置 不可见时间 > 消息堆积周期,实现每个消息均会消费且只消费一次
当不可见时间 > 消息堆积周期时,消息被消费之后会永远不可见,直到超过堆积周期被队列删除,这样看着确实消息只被消费了一次,且不会被再消费了。
但是生产和消费的过程中有可能存在重复生产和消费失败等现象,只通过修改队列属性无法保证队列只消费一次,这里需要业务方来实现生产消费的去重和容错。可参见消息去重>> 4.消息回溯
这里结合一个场景来描述消息回溯的使用:
假设有 A/B 两个业务,正常的生产消费场景,A 生产消息投递到队列中,B 从队列中消费消息,A/B 这时已经实现互相解耦,双方互不关心。A 只需要生产消息投递即可,B 从队列中拿到消息,然后将消息从队列中删除,接着在本地消费消息。
如果出现了异常,例如 B 业务虽然进行了消费,但是在一段时间内消费情况都出现了异常。这时,已经删除的消息已经被删除,无法重新消费,会对业务造成影响,且需要暂停 B 业务,等开发运维人员修复 Bug 之后才能重新上线 B 业务。而且运维人员也无法实时监控 B 业务的情况,等到发现异常场景,可能已经过去一段时间。
为了防止这种情况出现,A 业务需要关心 B 业务的处理情况,需要对生产消息进行备份,确保B 业务正常才能删除备份,保证现网正常。
这种情况下,您可以使用 消息回溯 的功能,开发人员对 B 业务进行修复,然后根据 B 业务消费正常的最近一个时间点,将消息回溯到该时间点。这时候,B 业务获取到的消息就会从指定的时间点开始,A 业务完全不用关心 B 业务的异常情况。这里 B 需要注意要对消费进行幂等处理。
开启消息回溯功能
endpoint='' //cmq的域名
secretId ='' // 用户的id和key
secretKey = ''
account = Account(endpoint,secretId,secretKey)
queueName = 'QueueTest'
my_queue = account.get_queue(queueName)
queue_meta = QueueMeta()
queue_meta.rewindSeconds = 43200 //消息允许回溯的时间,单位为秒
my_queue.create(queue_meta)
使用消息回溯
my_queue.rewindQueue(1488718862) //本次消息回溯的时间点,为unix时间戳
5.延时消息
延时消息 是指生产消息的时候,可以指定一个飞行时间,即消息投递到队列中的花费时间。只有过了该消费时间,消息才会被消费者消费。
很多业务都有失败的场景出现,而失败之后需要进行重试,大多数业务不会立刻进行消费重试,而会隔一段时间再重试消费,这时就可以使用延时消息的功能。
举例:
message_body='i am test'
msg = Message(message_body)
my_queue.send_message(msg)
//这时候发现消息消费失败。可以重新投递消息并设置消息的飞行时间。
my_queue.send_message(msg,600) //这里设置飞行时间为10分钟。
//可以通过消息属性查看当前队列中的延时消息数量
queue_meta = my_queue.get_attributes()
print queue_meta.delayMsgNum
本页内容是否解决了您的问题?