简介
在NSQ中,使用inflight机制来保证NSQ中消息”at least once”(至少被消费一次)。
在消息发送给Client之后,会将消息以及消息的timeout时间存储到优先级队列中pqueue。
如果客户端收到该消息,可以使用如下三个命令对此进行回复:
- FIN: Finish a message,表示成功处理完成。
- REQ: Re-queue a message,表示消息处理失败,需要重新入队再次进行处理。
- TOUCH: Reset the timeout for an in-flight message,表示需要重置消息的timeout时间。
如果客户端没有收到消息或是收到消息后没有进行任何的回复,则随着到达消息的超时时间,NSQD会将超时的消息重新入队,再次发送给客户端。
NSQD只能保证消息的”at least once”,至于消息的”exactly once”则需要业务端配合来实现。例如通过messageID来判断消息是否被消费过。
NSQ消息Inflight机制流程图
相关源码
首先在Client在连接NSQD成功时,会向NSQD发送SUB命令,订阅想要消费的Topic和channel。
1 |
|
在SubEventChan中收到订阅channel的信息后,messagePump开始不断向Client发送消息。发送之后,在StartInFlightTimeout中将消息写入优先级队列中。
1 |
|
如果NSQD收到了来自客户端的FIN命令,会把消息从inflight队列中删除。
如果NSQD收到了来自客户端的REQ命令,会把消息重新放入channel的内存或磁盘队列中。
如果NSQD收到了来自客户端的TOUCH命令,会把消息的超时时间进行重置。
如果在超时时间段范围内,没有收到任何来自客户端的消息,NSQD会在queueScanLoop中会启动多个queueScanWorker协程来对消息重新进行处理。
1 |
|