重新平衡监听器
ContainerProperties
有一个名为 consumerRebalanceListener
的属性,它接受 Kafka 客户端的 ConsumerRebalanceListener
接口的实现。如果未提供此属性,则容器会配置一个日志记录监听器,该监听器以 INFO
级别记录重新平衡事件。该框架还添加了一个子接口 ConsumerAwareRebalanceListener
。以下清单显示了 ConsumerAwareRebalanceListener
接口定义
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
请注意,当分区被撤销时,有两个回调。第一个立即被调用。第二个在任何挂起的偏移量被提交后被调用。如果您希望在某些外部存储库中维护偏移量,这将很有用,如下面的示例所示
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从版本 2.4 开始,添加了一个新方法 onPartitionsLost() (类似于 ConsumerRebalanceLister 中具有相同名称的方法)。ConsumerRebalanceLister 上的默认实现只是调用 onPartionsRevoked 。ConsumerAwareRebalanceListener 上的默认实现什么也不做。当使用自定义监听器(两种类型之一)为监听器容器提供监听器时,重要的是您的实现不要从 onPartitionsLost 调用 onPartitionsRevoked 。如果您实现 ConsumerRebalanceListener ,您应该覆盖默认方法。这是因为监听器容器将在调用您实现上的方法后,从其 onPartitionsLost 的实现中调用自己的 onPartitionsRevoked 。如果您的实现委托给默认行为,onPartitionsRevoked 将在每次 Consumer 在容器的监听器上调用该方法时被调用两次。
|