我需要为我的新项目选择一个新的队列代理。
这次我需要一个支持 pub/sub 的可扩展队列,并且保持消息排序是必须的。
我读了亚历克西斯的评论:他写道:
“的确,我们认为 RabbitMQ 提供比 Kafka 更强的排序”
我阅读了 rabbitmq 文档中的消息排序部分:
“消息可以使用具有重新排队参数(basic.recover、basic.reject 和 basic.nack)的 AMQP 方法返回到队列,或者由于通道关闭而持有未确认的消息...使用 2.7.0 及更高版本如果队列有多个订阅者,个别消费者仍然有可能观察到消息乱序。这是由于可能重新排队消息的其他订阅者的行为。从队列的角度来看,消息始终按发布顺序保存。 "
如果我需要按顺序处理消息,我只能使用rabbitMQ和每个消费者的专属队列吗?
RabbitMQ 是否仍然被认为是有序消息队列的良好解决方案?
好吧,让我们仔细看看你上面描述的场景。我认为将 the documentation 粘贴在您问题中的代码段之前以提供上下文非常重要:
AMQP 0-9-1 核心规范的第 4.7 节解释了保证排序的条件:在一个通道中发布的消息,通过一个交换和一个队列以及一个传出通道将按照它们发送的顺序被接收。自 2.7.0 版以来,RabbitMQ 提供了更强的保证。消息可以使用具有重新排队参数(basic.recover、basic.reject 和 basic.nack)的 AMQP 方法返回到队列,或者由于通道关闭而持有未确认的消息。对于 2.7.0 之前的 RabbitMQ 版本,任何这些情况都会导致消息在队列后面重新排队。从 RabbitMQ 2.7.0 版开始,消息始终按发布顺序保存在队列中,即使存在重新排队或通道关闭。 (重点补充)
因此,很明显,从 2.7.0 开始,RabbitMQ 在消息排序方面对原始 AMQP 规范进行了相当大的改进。
对于多个(并行)消费者,无法保证处理顺序。第三段(粘贴在问题中)继续给出免责声明,我将解释:“如果队列中有多个处理器,则不再保证消息将按顺序处理。”他们在这里所说的只是 RabbitMQ 不能违背数学定律。
考虑一家银行的一系列客户。这家特殊的银行以帮助客户按照进入银行的顺序而自豪。顾客排队等候,由 3 位可用的柜员中的下一位提供服务。
今天早上,碰巧三个柜员都同时有空,接下来的三个顾客就上来了。突然,三个柜员中的第一个病得很重,无法为排队的第一个顾客服务。发生这种情况时,柜员 2 已经结束了与客户 2 的合作,而柜员 3 已经开始为客户 3 服务。
现在,可能会发生两种情况之一。 (1)第一个排队的顾客可以回到队首或(2)第一个顾客可以抢占第三个顾客,导致柜员停止为第三个顾客工作,而开始为第一个顾客工作。 RabbitMQ 不支持这种类型的抢占逻辑,我知道的任何其他消息代理也不支持。在任何一种情况下,第一个客户实际上最终都不会首先得到帮助——第二个客户会得到帮助,很幸运能够立即得到一个好的、快速的出纳员。保证客户得到帮助的唯一方法是让一名柜员一次帮助一名客户,这将给银行带来重大的客户服务问题。
我希望这有助于说明您所询问的问题。鉴于您有多个消费者,不可能确保在每种可能的情况下都按顺序处理消息。如果您有多个队列、多个独占消费者、不同的代理等,都没有关系 - 没有办法先验地保证多个消费者按顺序回答消息。但 RabbitMQ 会尽力而为。
消息排序保留在 Kafka 中,但仅在分区内而不是全局内。如果您的数据需要全局排序和分区,这确实会使事情变得困难。但是,如果您只需要确保同一用户的所有相同事件等......最终都在同一个分区中以便正确排序,您可以这样做。生产者负责他们写入的分区,因此如果您能够对数据进行逻辑分区,这可能会更可取。
我认为这个问题有两件事不相似,消费订单和加工订单。
消息队列在一定程度上可以保证消息将按顺序被使用,但是它们不能保证消息的处理顺序。
这里的主要区别在于消息处理的某些方面在消费时无法确定,例如:
如前所述,消费者在处理时可能会失败,这里消息的消费顺序是正确的,但是,消费者未能正确处理它,这将使它回到队列中,直到现在消费顺序仍然完好无损,但我们没有不知道现在的处理顺序如何
如果通过“处理”我们的意思是消息现在被丢弃并完全完成处理,那么请考虑您的处理时间不是线性的情况,换句话说,处理一条消息比另一条消息花费更长的时间,所以如果消息 3 的处理时间比处理时间长预期,则消息 4 和 5 可能会在消息 3 之前被消耗并完成处理
因此,即使您设法将消息返回到队列的前面(顺便说一句,这违反了消费顺序),您仍然不能保证在下一条消息之前的所有消息都已完成处理。
如果要确保处理顺序,则:
始终只有一个消费者实例或者不使用消息队列并以同步阻塞方法进行处理,这听起来可能很糟糕,但在许多情况下,业务需求是完全有效的,有时甚至是关键的
有适当的方法来保证 RabbitMQ 订阅中的消息顺序。
如果您使用多个使用者,他们将使用共享的 ExecutorService
处理消息。另见ConnectionFactory.setSharedExecutor(...)
。您可以设置 Executors.newSingleThreadExecutor()
。
如果您将一个 Consumer
与单个队列一起使用,则可以使用多个 bindingKeys 绑定此队列(它们可能具有通配符)。消息将按照消息代理接收它们的顺序放入队列中。
例如,您有一个发布者发布订单很重要的消息:
try (Connection connection2 = factory.newConnection();
Channel channel2 = connection.createChannel()) {
// publish messages alternating to two different topics
for (int i = 0; i < messageCount; i++) {
final String routingKey = i % 2 == 0 ? routingEven : routingOdd;
channel2.basicPublish(exchange, routingKey, null, ("Hello" + i).getBytes(UTF_8));
}
}
您现在可能希望按照发布顺序从队列中的两个主题接收消息:
// declare a queue for the consumer
final String queueName = channel.queueDeclare().getQueue();
// we bind to queue with the two different routingKeys
final String routingEven = "even";
final String routingOdd = "odd";
channel.queueBind(queueName, exchange, routingEven);
channel.queueBind(queueName, exchange, routingOdd);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) { ... });
Consumer
现在将按照消息发布的顺序接收消息,而不管您使用了不同的主题。
RabbitMQ 文档中有一些不错的 5 分钟教程可能会有所帮助:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
redelivered
将是false
而不是true
,就像正常的重新排队一样。