提示、技巧和秘诀

Kafka 的简单 DLQ

问题陈述

作为一名开发者,我想编写一个消费者应用程序,用于处理来自 Kafka 主题的记录。但是,如果在处理过程中发生某些错误,我不想让应用程序完全停止。相反,我想将出错的记录发送到 DLT(死信主题),然后继续处理新记录。

解决方案

此问题的解决方案是使用 Spring Cloud Stream 中的 DLQ 功能。为了讨论的目的,让我们假设以下内容是我们的处理器函数。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

这是一个非常简单的函数,它会对处理的所有记录抛出一个异常,但你可以采用此函数并将其扩展到任何其他类似情况。

为了将出错的记录发送到 DLT,我们需要提供以下配置。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

为了激活 DLQ,应用程序必须提供一个组名。匿名消费者无法使用 DLQ 功能。我们还需要通过将 Kafka 消费者绑定上的 enableDLQ 属性设置为 true 来启用 DLQ。最后,我们可以通过在 Kafka 消费者绑定上提供 dlqName 来选择性地提供 DLT 名称,否则在这种情况下默认为 error.input-topic.my-group

请注意,在上面提供的示例消费者中,有效负载的类型为 byte[]。默认情况下,Kafka 绑定器中的 DLQ 生产者期望有效负载的类型为 byte[]。如果不是这种情况,那么我们需要提供适当序列化程序的配置。例如,让我们将消费者函数重新编写如下

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

现在,我们需要告诉 Spring Cloud Stream,当写入 DLT 时我们希望如何序列化数据。以下是针对此方案的修改后的配置

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

具有高级重试选项的 DLQ

问题陈述

这与上面的方法类似,但作为一名开发者,我希望配置重试处理方式。

解决方案

如果你遵循了上面的方法,那么当处理遇到错误时,你将获得内置于 Kafka 绑定器中的默认重试选项。

默认情况下,绑定器最多重试 3 次,初始延迟为 1 秒,每次后退乘数为 2.0,最大延迟为 10 秒。你可以按如下方式更改所有这些配置

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果你愿意,还可以通过提供布尔值映射来提供可重试异常的列表。例如,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

默认情况下,上面映射中未列出的任何异常都将被重试。如果不需要,则可以通过提供以下内容来禁用它,

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

你还可以提供自己的 RetryTemplate 并将其标记为 @StreamRetryTemplate,它将被绑定器扫描和使用。当需要更复杂的重试策略和策略时,这很有用。

如果你有多个 @StreamRetryTemplate bean,则可以使用属性指定绑定所需的那个,

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

使用 DLQ 处理反序列化错误

问题陈述

我有一个在 Kafka 消费者中遇到反序列化异常的处理器。我希望 Spring Cloud Stream DLQ 机制能捕获该场景,但它没有。我该如何处理这个问题?

解决方案

Spring Cloud Stream 提供的常规 DLQ 机制在 Kafka 消费者抛出不可恢复的反序列化异常时不会提供帮助。这是因为,此异常甚至在消费者的 poll() 方法返回之前就发生了。Spring for Apache Kafka 项目提供了一些极好的方法来帮助绑定器解决此情况。让我们来探索一下这些方法。

假设这是我们的函数

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

这是一个接受 String 参数的简单函数。

我们希望绕过 Spring Cloud Stream 提供的消息转换器,而希望改用本机反序列化器。对于 String 类型,这样做没有多大意义,但对于 AVRO 等更复杂的类型,您必须依赖于外部反序列化器,因此希望将转换委托给 Kafka。

现在,当消费者接收数据时,让我们假设有一条错误的记录导致反序列化错误,例如,有人传递了一个 Integer 而不是一个 String。在这种情况下,如果您不在应用程序中执行任何操作,异常将通过链传播,最终您的应用程序将退出。

为了处理此问题,您可以添加一个配置了 DefaultErrorHandlerListenerContainerCustomizer @Bean。此 DefaultErrorHandler 使用 DeadLetterPublishingRecoverer 进行配置。我们还需要为消费者配置一个 ErrorHandlingDeserializer。这听起来像很多复杂的事情,但实际上,在这种情况下,它归结为这 3 个 bean。

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

让我们分析一下它们中的每一个。第一个是接受 DefaultErrorHandlerListenerContainerCustomizer bean。现在使用该特定错误处理程序自定义了容器。您可以在 此处 了解有关容器自定义的更多信息。

第二个 bean 是使用发布到 DLT 进行配置的 DefaultErrorHandler。有关 DefaultErrorHandler 的更多详细信息,请参阅 此处

第三个 bean 是最终负责发送到 DLTDeadLetterPublishingRecoverer。默认情况下,DLT 主题被命名为 ORIGINAL_TOPIC_NAME.DLT。不过,您可以更改它。有关更多详细信息,请参阅 文档

我们还需要通过应用程序配置来配置一个 ErrorHandlingDeserializer

ErrorHandlingDeserializer 委托给实际的反序列化器。如果发生错误,它会将记录的键/值设置为 null,并包含消息的原始字节。然后,它在标头中设置异常,并将此记录传递给侦听器,然后侦听器调用注册的错误处理程序。

以下是必需的配置

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我们通过绑定上的 configuration 属性提供 ErrorHandlingDeserializer。我们还指示要委托的实际反序列化器是 StringDeserializer

请记住,上述 dlq 属性与本配方中的讨论无关。它们纯粹是为了解决任何应用程序级别错误。

Kafka binder 中的基本偏移管理

问题陈述

我想编写一个 Spring Cloud Stream Kafka 消费者应用程序,但不知道它如何管理 Kafka 消费者偏移。你能解释一下吗?

解决方案

我们建议你阅读 文档 中的这一部分,以全面了解它。

以下是要点

默认情况下,Kafka 支持两种类型的偏移量来开始 - earliestlatest。它们的语义从它们的名称中不言自明。

假设你第一次运行消费者。如果你在 Spring Cloud Stream 应用程序中错过了 group.id,那么它将成为匿名消费者。每当你有一个匿名消费者时,在这种情况下,Spring Cloud Stream 应用程序默认将从主题分区中可用的 latest 偏移量开始。另一方面,如果你明确指定了一个 group.id,那么默认情况下,Spring Cloud Stream 应用程序将从主题分区中可用的 earliest 偏移量开始。

在上述两种情况下(具有显式组和匿名组的消费者),可以通过使用属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 并将其设置为 earliestlatest 来切换起始偏移量。

现在,假设你之前已经运行过消费者,现在再次启动它。在这种情况下,上述情况中的起始偏移语义不适用,因为消费者找到了消费者组已经提交的偏移量(对于匿名消费者,尽管应用程序没有提供 group.id,但 binder 会自动为你生成一个)。它只是从最后提交的偏移量开始。即使你提供了 startOffset 值,这也是正确的。

但是,你可以通过使用 resetOffsets 属性来覆盖消费者从最后提交的偏移量开始的默认行为。要做到这一点,将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 设置为 true(默认值为 false)。然后确保你提供了 startOffset 值(earliestlatest)。当你这样做然后启动消费者应用程序时,每次启动时,它都会像第一次启动一样启动,并忽略分区的所有已提交偏移量。

在 Kafka 中寻求任意偏移量

问题陈述

使用 Kafka 绑定器,我知道它可以将偏移量设置为 earliestlatest,但我需要将偏移量寻求到中间的某个位置,即任意偏移量。有没有办法使用 Spring Cloud Stream Kafka 绑定器实现这一点?

解决方案

之前我们了解了 Kafka 绑定器如何允许你处理基本的偏移量管理。默认情况下,绑定器不允许你倒回到任意偏移量,至少通过我们在该配方中看到的机制不行。但是,绑定器提供了一些低级别的策略来实现此用例。让我们来探索一下。

首先,当你想重置到 earliestlatest 以外的任意偏移量时,请确保将 resetOffsets 配置保留为其默认值,即 false。然后,你必须提供一个类型为 KafkaBindingRebalanceListener 的自定义 bean,该 bean 将注入到所有消费者绑定中。这是一个带有几个默认方法的接口,但这里是我们感兴趣的方法

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

让我们看看细节。

从本质上讲,此方法将在每次对主题分区进行初始分配或重新平衡后调用。为了更好地说明,让我们假设我们的主题是 foo,它有 4 个分区。最初,我们只在组中启动一个消费者,此消费者将从所有分区消费。当消费者第一次启动时,所有 4 个分区都将被最初分配。但是,我们不想让分区从默认值开始消费(earliest,因为我们定义了一个组),而是希望它们在寻求任意偏移量后对每个分区进行消费。想象一下,你有如下业务用例,需要从某些偏移量开始消费。

Partition   start offset

0           1000
1           2000
2           2000
3           1000

可以通过如下实现上述方法来实现这一点。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

这只是一个基本实现。实际用例比这复杂得多,你需要根据实际情况进行调整,但这肯定能给你一个基本框架。当消费者seek失败时,它可能会抛出一些运行时异常,你需要决定在这些情况下做什么。

[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我们使用相同的组 ID 启动第二个消费者会怎样?

当我们添加第二个消费者时,将发生重新平衡,并且一些分区将被移动。假设新消费者获得分区23。当这个新的 Spring Cloud Stream 消费者调用这个onPartitionsAssigned方法时,它会看到这是这个消费者上分区23的初始分配。因此,它将执行 seek 操作,因为对initial参数进行了条件检查。对于第一个消费者,它现在只有分区01。但是,对于这个消费者来说,它只是一个重新平衡事件,而不是被视为初始分配。因此,它不会重新寻找到给定的偏移量,因为对initial参数进行了条件检查。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka binder 手动确认?

问题陈述

使用 Kafka binder,我想在我的消费者中手动确认消息。我该怎么做?

解决方案

默认情况下,Kafka binder 委托给 Spring for Apache Kafka 项目中的默认提交设置。Spring Kafka 中的默认ackModebatch。有关更多详细信息,请在此处查看。

在某些情况下,你希望禁用此默认提交行为并依赖于手动提交。以下步骤允许你执行此操作。

将属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode设置为MANUALMANUAL_IMMEDIATE。当这样设置时,在消费者方法收到的消息中将存在一个名为kafka_acknowledgment(来自KafkaHeaders.ACKNOWLEDGMENT)的标头。

例如,将其想象为你的消费者方法。

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然后,将属性spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode设置为MANUALMANUAL_IMMEDIATE

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何覆盖 Spring Cloud Stream 中的默认绑定名称?

问题陈述

Spring Cloud Stream 根据函数定义和签名创建默认绑定,但我如何将其覆盖为更友好的域名?

解决方案

假设以下为您的函数签名。

@Bean
public Function<String, String> uppercase(){
...
}

默认情况下,Spring Cloud Stream 将按如下方式创建绑定。

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下属性将这些绑定覆盖为其他内容。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

在此之后,所有绑定属性都必须在新的名称 my-transformer-inmy-transformer-out 上进行。

以下是使用 Kafka Streams 和多个输入的另一个示例。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次要对这些绑定设置一些配置时,您都必须使用这些绑定名称。您不喜欢这样,并且希望使用更友好的域名和可读的绑定名称,例如类似以下内容。

  1. orders

  2. accounts

  3. enrichedOrders

通过简单设置这三个属性,您可以轻松做到这一点

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

完成此操作后,它将覆盖默认绑定名称,您要对它们设置的任何属性都必须在这些新的绑定名称上。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何将消息密钥作为记录的一部分发送?

问题陈述

我需要将密钥与记录的有效负载一起发送,Spring Cloud Stream 中是否有办法做到这一点?

解决方案

通常需要将关联数据结构(如具有键和值的映射)作为记录发送。Spring Cloud Stream 允许您以直接的方式执行此操作。以下是执行此操作的基本蓝图,但您可能需要根据具体用例进行调整。

以下是一个示例生产者方法(又名 Supplier)。

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

这是一个简单的函数,它发送带有 String 有效负载的消息,但也带有密钥。请注意,我们使用 KafkaHeaders.MESSAGE_KEY 将密钥设置为消息头。

如果您想从默认 kafka_messageKey 更改密钥,那么在配置中,我们需要指定此属性

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

请注意,我们使用绑定名称 supplier-out-0,因为这是我们的函数名称,请相应地更新。

然后,我们在生成消息时使用此新密钥。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用本机序列化程序和反序列化程序,而不是 Spring Cloud Stream 完成的消息转换?

问题陈述

我想在 Kafka 中使用本机序列化程序和反序列化程序,而不是使用 Spring Cloud Stream 中的消息转换器。默认情况下,Spring Cloud Stream 使用其内部内置的消息转换器来处理此转换。如何绕过此问题并将此责任委托给 Kafka?

解决方案

这很容易做到。

您只需提供以下属性即可启用本机序列化。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然后,您还需要设置序列化程序。有几种方法可以做到这一点。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或使用绑定器配置。

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

使用绑定器方式时,它应用于所有绑定,而将它们设置在绑定处则针对每个绑定。

在反序列化方面,您只需将反序列化程序作为配置提供即可。

例如,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您还可以在绑定器级别设置它们。

有一个可选属性,您可以将其设置为强制执行本机解码。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,在 Kafka 绑定器的情况下,这是不必要的,因为当它到达绑定器时,Kafka 已使用配置的反序列化程序对其进行反序列化。

解释 Kafka Streams 绑定器中偏移重置的工作原理

问题陈述

默认情况下,Kafka Streams 绑定器始终从新消费者的最早偏移量开始。有时,应用程序需要或需要从最新偏移量开始。Kafka Streams 绑定器允许您这样做。

解决方案

在了解解决方案之前,让我们看一下以下场景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我们有一个 BiConsumer bean,它需要两个输入绑定。在这种情况下,第一个绑定用于 KStream,第二个绑定用于 KTable。首次运行此应用程序时,默认情况下,两个绑定都从 earliest 偏移量开始。由于某些要求,我想从 latest 偏移量开始,该怎么办?您可以通过启用以下属性来实现此目的。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想从 latest 偏移量开始一个绑定,而从默认 earliest 偏移量开始另一个绑定,那么请将后一个绑定从配置中排除。

请记住,一旦有提交的偏移量,这些设置将得到遵守,并且提交的偏移量优先。

跟踪 Kafka 中发送记录的成功情况(生成)

问题陈述

我有一个 Kafka 生产者应用程序,我想跟踪我所有成功的发送。

解决方案

让我们假设我们在应用程序中具有以下供应商。

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然后,我们需要定义一个新的 MessageChannel bean 来捕获所有成功的发送信息。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下来,在应用程序配置中定义此属性,以提供 recordMetadataChannel 的 bean 名称。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此时,成功的发送信息将被发送到 fooRecordChannel

您可以编写一个 IntegrationFlow,如下所示,以查看信息。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle 方法中,有效负载是发送到 Kafka 的内容,消息头包含一个名为 kafka_recordMetadata 的特殊键。它的值是一个 RecordMetadata,其中包含有关主题分区、当前偏移量等的信息。

在 Kafka 中添加自定义头映射器

问题陈述

我有一个 Kafka 生产者应用程序,它设置了一些头,但在消费者应用程序中却没有。这是为什么?

解决方案

在正常情况下,这应该没问题。

想象一下,您有以下生产者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消费者端,您仍然应该看到头“foo”,并且以下内容不应给您带来任何问题。

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在应用程序中提供了一个 自定义头映射器,那么这将不起作用。假设您在应用程序中有一个空的 KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果这是您的实现,那么您将在消费者上错过 foo 头。很有可能,您在这些 KafkaHeaderMapper 方法中有一些逻辑。您需要以下内容来填充 foo 头。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

这将正确地填充从生产者到消费者的 foo 头。

关于 id 头的特殊说明

在 Spring Cloud Stream 中,id 标头是一个特殊标头,但某些应用程序可能希望拥有特殊的自定义 id 标头,例如 custom-idIDId。第一个(custom-id)将在没有任何自定义标头映射器的情况下从生产者传播到消费者。但是,如果你使用框架保留的 id 标头(例如 IDIdiD 等)进行生产,那么你将遇到框架内部的问题。请参阅 StackOverflow 线程 以了解此用例的更多背景。在这种情况下,你必须使用自定义 KafkaHeaderMapper 来映射区分大小写的 id 标头。例如,假设你有以下生产者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面的标头 Id 将从消费方消失,因为它与框架 id 标头冲突。你可以提供一个自定义 KafkaHeaderMapper 来解决此问题。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

通过执行此操作,idId 标头都将从生产者提供给消费者方。

在事务中生成到多个主题

问题陈述

如何将事务消息生成到多个 Kafka 主题?

有关更多背景,请参阅 StackOverflow 问题

解决方案

在 Kafka 绑定器中使用事务支持进行事务,然后提供一个 AfterRollbackProcessor。为了生成到多个主题,请使用 StreamBridge API。

以下是此代码的代码片段

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

必需的配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

为了进行测试,你可以使用以下内容

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要的注意事项

请确保应用程序配置中没有任何 DLQ 设置,因为我们手动配置了 DLT(默认情况下,它将发布到名为 input.DLT 的主题,基于初始使用者函数)。此外,将使用者绑定上的 maxAttempts 重置为 1,以避免绑定器重试。在上面的示例中,它将总共尝试 3 次(初始尝试 + FixedBackoff 中的 2 次尝试)。

请参阅 StackOverflow 线程,了解有关如何测试此代码的更多详细信息。如果你正在使用 Spring Cloud Stream 通过添加更多使用者函数来测试它,请确保将使用者绑定上的 isolation-level 设置为 read-committed

StackOverflow 线程 也与此讨论相关。

运行多个可轮询使用者时避免的陷阱

问题陈述

如何运行可轮询使用者的多个实例,并为每个实例生成唯一的 client.id

解决方案

假设我有以下定义

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

在运行应用程序时,Kafka 消费者会生成一个 client.id(类似于 consumer-my-group-1)。对于正在运行的应用程序的每个实例,此 client.id 将相同,从而导致意外问题。

为了解决此问题,您可以在应用程序的每个实例上添加以下属性

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

请参阅此 GitHub 问题 了解更多详细信息。