消息头
0.11.0.0 客户端引入了对消息头部的支持。从 2.0 版本开始,Spring for Apache Kafka 现在支持在 spring-messaging
MessageHeaders
之间映射这些头部。
以前的版本将 ConsumerRecord 和 ProducerRecord 映射到 spring-messaging Message<?> ,其中 value 属性映射到有效负载,其他属性(例如 topic、partition 等)映射到头部。这种情况仍然存在,但现在可以映射额外的(任意)头部。 |
Apache Kafka 头部具有简单的 API,如下面的接口定义所示
public interface Header {
String key();
byte[] value();
}
提供 KafkaHeaderMapper
策略来在 Kafka Headers
和 MessageHeaders
之间映射头部条目。其接口定义如下
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
SimpleKafkaHeaderMapper
将原始头部映射为 byte[]
,并提供配置选项以将其转换为 String
值。
DefaultKafkaHeaderMapper
将键映射到 MessageHeaders
头部名称,并且为了支持出站消息的丰富头部类型,执行 JSON 转换。“special
”头部(其键为 spring_json_header_types
)包含 <key>:<type>
的 JSON 映射。此头部在入站侧用于提供每个头部值的适当转换,使其恢复为原始类型。
在入站侧,所有 Kafka Header
实例都映射到 MessageHeaders
。在出站侧,默认情况下,将映射所有 MessageHeaders
,除了 id
、timestamp
和映射到 ConsumerRecord
属性的头部。
您可以通过向映射器提供模式来指定要为出站消息映射哪些头部。以下列表显示了一些示例映射
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用默认的 Jackson ObjectMapper 并映射大多数头部,如示例之前的讨论。 |
2 | 使用提供的 Jackson ObjectMapper 并映射大多数头部,如示例之前的讨论。 |
3 | 使用默认的 Jackson ObjectMapper 并根据提供的模式映射头部。 |
4 | 使用提供的 Jackson ObjectMapper 并根据提供的模式映射头部。 |
模式非常简单,可以包含前导通配符(*
)、后缀通配符或两者(例如 *.cat.*
)。您可以使用前导 !
来否定模式。匹配头部名称(无论是正向还是负向)的第一个模式将获胜。
当您提供自己的模式时,建议包含 !id
和 !timestamp
,因为这些头部在入站侧是只读的。
默认情况下,映射器仅反序列化 java.lang 和 java.util 中的类。您可以通过使用 addTrustedPackages 方法添加受信任的包来信任其他(或所有)包。如果您从不受信任的来源接收消息,则可能希望仅添加您信任的那些包。要信任所有包,您可以使用 mapper.addTrustedPackages("*") 。 |
以原始形式映射 String 头部值在与不知道映射器 JSON 格式的系统通信时很有用。 |
从 2.2.5 版本开始,您可以指定某些字符串值头部不应使用 JSON 映射,而是映射到/从原始 byte[]
。AbstractKafkaHeaderMapper
具有新的属性;mapAllStringsOut
设置为 true 时,所有字符串值头部都将使用 charset
属性(默认为 UTF-8
)转换为 byte[]
。此外,还有一个属性 rawMappedHeaders
,它是一个 header name : boolean
的映射;如果映射包含头部名称,并且头部包含 String
值,则它将作为原始 byte[]
使用字符集进行映射。此映射还用于使用字符集将传入的原始 byte[]
头部映射到 String
,当且仅当映射值中的布尔值为 true
时。如果布尔值为 false
,或者头部名称不在具有 true
值的映射中,则传入的头部将简单地映射为原始的未映射头部。
以下测试用例说明了这种机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个头部映射器都映射所有入站头部。从 2.8.8 版本开始,模式也可以应用于入站映射。要为入站映射创建映射器,请在相应的映射器上使用其中一个静态方法
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以 abc
开头的头部,并包含所有其他头部。
默认情况下,只要 Jackson 在类路径上,DefaultKafkaHeaderMapper
就会在 MessagingMessageConverter
和 BatchMessagingMessageConverter
中使用。
使用批处理转换器,转换后的头部在 KafkaHeaders.BATCH_CONVERTED_HEADERS
中可用,作为一个 List<Map<String, Object>>
,其中列表中某个位置的映射对应于有效负载中的数据位置。
如果不存在转换器(可能是因为 Jackson 不存在或显式设置为 null
),则来自消费者记录的头部将在 KafkaHeaders.NATIVE_HEADERS
头部中以未转换的形式提供。此头部是一个 Headers
对象(或在批处理转换器的情况下为 List<Headers>
),其中列表中的位置对应于有效负载中的数据位置。
某些类型不适合 JSON 序列化,对于这些类型,可能更倾向于简单的 toString() 序列化。DefaultKafkaHeaderMapper 有一个名为 addToStringClasses() 的方法,允许您提供应以这种方式处理的类的名称,以便进行出站映射。在入站映射期间,它们被映射为 String 。默认情况下,只有 org.springframework.util.MimeType 和 org.springframework.http.MediaType 以这种方式映射。 |
从 2.3 版本开始,简化了对字符串值头的处理。默认情况下,此类头不再进行 JSON 编码(即,不会添加包含的 "..." )。类型仍然添加到 JSON_TYPES 头部,以便接收系统可以转换回字符串(从 byte[] )。映射器可以处理(解码)旧版本产生的头部(它检查前导 " );这样,使用 2.3 的应用程序就可以使用旧版本的记录。 |
为了与早期版本兼容,如果使用 2.3 版本生成的记录可能会被使用早期版本的应用程序使用,请将 encodeStrings 设置为 true 。当所有应用程序都使用 2.3 或更高版本时,您可以将属性保留为其默认值 false 。 |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将此转换器 Bean 配置到自动配置的 KafkaTemplate
中;否则,您应该将此转换器添加到模板中。