使用 RabbitMQ Stream 插件
版本 2.4 引入了对 RabbitMQ Stream 插件 Java 客户端 的初始支持,该客户端用于 RabbitMQ Stream 插件。
-
RabbitStreamTemplate
-
StreamListenerContainer
将 spring-rabbit-stream
依赖项添加到您的项目中
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.1.5</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:3.1.5'
您可以像往常一样使用 RabbitAdmin
bean 来配置队列,使用 QueueBuilder.stream()
方法来指定队列类型。例如
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
但是,只有在您也使用非流组件(例如 SimpleMessageListenerContainer
或 DirectMessageListenerContainer
)时,此方法才有效,因为当 AMQP 连接打开时,会触发管理员来声明定义的 bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应配置 StreamAdmin
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
有关 StreamCreator
的更多信息,请参阅 RabbitMQ 文档。
发送消息
RabbitStreamTemplate
提供了 RabbitTemplate
(AMQP)功能的子集。
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
RabbitStreamTemplate
实现具有以下构造函数和属性
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
MessageConverter
用于 convertAndSend
方法将对象转换为 Spring AMQP Message
。
StreamMessageConverter
用于将 Spring AMQP Message
转换为原生流 Message
。
您也可以直接发送原生流 Message
;messageBuilder()
方法提供了对 Producer
的消息构建器的访问。
ProducerCustomizer
提供了一种机制,可以在构建生产者之前对其进行自定义。
有关自定义 Environment
和 Producer
的信息,请参阅 Java 客户端文档。
从 3.0 版本开始,方法返回值类型为 CompletableFuture 而不是 ListenableFuture 。
|
接收消息
StreamListenerContainer
(以及使用 @RabbitListener
时的 StreamRabbitListenerContainerFactory
)提供异步消息接收。
监听器容器需要一个 Environment
以及一个流名称。
您可以使用经典的 MessageListener
接收 Spring AMQP Message
,也可以使用新的接口接收原生流 Message
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
有关支持属性的信息,请参阅 消息监听器容器配置。
与模板类似,容器具有 ConsumerCustomizer
属性。
有关自定义 Environment
和 Consumer
的信息,请参阅 Java 客户端文档。
使用 @RabbitListener
时,请配置一个 StreamRabbitListenerContainerFactory
;目前,大多数 @RabbitListener
属性(concurrency
等)都被忽略。仅支持 id
、queues
、autoStartup
和 containerFactory
。此外,queues
只能包含一个流名称。
示例
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
版本 2.4.5 在 StreamListenerContainer
(及其工厂)中添加了 adviceChain
属性。还提供了一个新的工厂 Bean 来创建一个无状态重试拦截器,该拦截器具有可选的 StreamMessageRecoverer
,用于在使用原始流消息时使用。
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
此容器不支持有状态重试。 |
超级流
超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数 x-super-stream: true
的交换机来实现。
配置
为了方便起见,可以通过定义一个类型为 SuperStream
的单个 Bean 来配置超级流。
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
RabbitAdmin
检测到此 Bean,并将声明交换机(my.super.stream
)和 3 个队列(分区) - my.super-stream-n
,其中 n
为 0
、1
、2
,并使用等于 n
的路由键绑定。
如果您也希望通过 AMQP 发布到交换机,您可以提供自定义路由键。
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
键的数量必须等于分区的数量。
发布到 SuperStream
您必须在 RabbitStreamTemplate
中添加一个 superStreamRoutingFunction
。
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
您也可以通过 AMQP 发布,使用 RabbitTemplate
。
使用单个活动消费者消费 Super Stream
在监听器容器上调用 superStream
方法,以在 Super Stream 上启用单个活动消费者。
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
目前,当并发性大于 1 时,实际并发性将由 Environment 进一步控制;要实现完全并发,请将环境的 maxConsumersByConnection 设置为 1。请参阅 配置环境。
|
Micrometer 观测
从 3.0.5 版本开始,现在支持使用 Micrometer 对 RabbitStreamTemplate
和流监听器容器进行观测。容器现在还支持 Micrometer 定时器(当未启用观测时)。
在每个组件上设置 observationEnabled
以启用观测;这将禁用 Micrometer 定时器,因为定时器现在将与每个观测一起管理。当使用带注释的监听器时,在容器工厂上设置 observationEnabled
。
有关更多信息,请参阅 Micrometer 跟踪。
要向定时器/跟踪添加标签,请分别将自定义 RabbitStreamTemplateObservationConvention
或 RabbitStreamListenerObservationConvention
配置到模板或监听器容器。
默认实现为模板观测添加 name
标签,为容器添加 listener.id
标签。
您可以子类化 DefaultRabbitStreamTemplateObservationConvention
或 DefaultStreamRabbitListenerObservationConvention
,也可以提供全新的实现。
有关更多详细信息,请参阅 Micrometer 观测文档。