RabbitMQ Stream 插件的初始消费者支持
现在提供了对 RabbitMQ Stream 插件 的基本支持。要启用此功能,您必须将 spring-rabbit-stream
jar 添加到类路径中 - 它必须与 spring-amqp
和 spring-rabbit
版本相同。
当您将 containerType 属性设置为 stream 时,上面描述的消费者属性不受支持;仅超级流支持 concurrency 。每个绑定只能消费一个流队列。 |
要配置绑定器以使用 containerType=stream
,Spring Boot 将自动从应用程序属性配置一个 Environment
@Bean
。您可以选择添加一个自定义程序来自定义侦听器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
传递给自定义程序的 name
参数为 destination + '.' + group + '.container'
。
流 name()
(用于偏移量跟踪)设置为绑定 destination + '.' + group
。可以使用上面显示的 ConsumerCustomizer
更改它。如果您决定使用手动偏移量跟踪,则 Context
可作为消息头使用。
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有关配置环境和消费者构建器的信息,请参阅 RabbitMQ Stream Java 客户端文档。
RabbitMQ 超级流的消费者支持
有关超级流的信息,请参阅 超级流。
使用超级流允许通过超级流的每个分区的单个活动消费者实现自动扩展和缩减。
配置示例
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
框架将创建一个名为 super
的超级流,包含 9 个分区。最多可以部署此应用程序的 3 个实例。