数据缓冲区和编解码器
Java NIO 提供了 ByteBuffer
,但许多库在它之上构建了自己的字节缓冲区 API,尤其是在网络操作中,其中重用缓冲区和/或使用直接缓冲区有利于性能。例如,Netty 有 ByteBuf
层次结构,Undertow 使用 XNIO,Jetty 使用带回调的池化字节缓冲区以供释放,等等。spring-core
模块提供了一组抽象来处理各种字节缓冲区 API,如下所示
-
DataBufferFactory
抽象了数据缓冲区的创建。 -
DataBuffer
表示一个字节缓冲区,它可能是 池化的。 -
DataBufferUtils
提供用于数据缓冲区的实用方法。 -
编解码器 将数据缓冲区流解码或编码为更高级别的对象。
DataBufferFactory
DataBufferFactory
用于以两种方式之一创建数据缓冲区
-
分配一个新的数据缓冲区,可以选择在事先指定容量,如果已知,这更有效,即使
DataBuffer
的实现可以根据需要增长和缩小。 -
包装现有的
byte[]
或java.nio.ByteBuffer
,它使用DataBuffer
实现来装饰给定数据,并且不涉及分配。
请注意,WebFlux 应用程序不会直接创建DataBufferFactory
,而是通过客户端的ServerHttpResponse
或ClientHttpRequest
访问它。工厂的类型取决于底层客户端或服务器,例如,NettyDataBufferFactory
用于 Reactor Netty,DefaultDataBufferFactory
用于其他服务器。
DataBuffer
DataBuffer
接口提供与java.nio.ByteBuffer
相似的操作,但也带来了一些额外的优势,其中一些灵感来自 Netty 的ByteBuf
。以下是部分优势列表
-
使用独立的位置进行读写,即不需要调用
flip()
来在读写之间切换。 -
容量根据需要扩展,如
java.lang.StringBuilder
。 -
通过
PooledDataBuffer
实现池化缓冲区和引用计数。 -
将缓冲区视为
java.nio.ByteBuffer
、InputStream
或OutputStream
。 -
确定给定字节的索引或最后一个索引。
PooledDataBuffer
如ByteBuffer 的 Javadoc 中所述,字节缓冲区可以是直接的或非直接的。直接缓冲区可能驻留在 Java 堆之外,从而消除了对本机 I/O 操作的复制需求。这使得直接缓冲区特别适用于通过套接字接收和发送数据,但它们创建和释放的成本也更高,这导致了缓冲区池化的想法。
PooledDataBuffer
是DataBuffer
的扩展,它有助于引用计数,这对字节缓冲区池化至关重要。它是如何工作的?当分配PooledDataBuffer
时,引用计数为 1。调用retain()
会增加计数,而调用release()
会减少计数。只要计数大于 0,就可以保证缓冲区不会被释放。当计数减少到 0 时,池化缓冲区可以被释放,在实践中,这可能意味着为缓冲区保留的内存将被返回到内存池。
请注意,在大多数情况下,最好使用DataBufferUtils
中的便利方法来操作PooledDataBuffer
,这些方法仅在DataBuffer
是PooledDataBuffer
的实例时才对它应用释放或保留。
DataBufferUtils
DataBufferUtils
提供了许多实用方法来操作数据缓冲区
-
将数据缓冲区流合并成单个缓冲区,如果底层字节缓冲区 API 支持,则可能使用零拷贝,例如通过复合缓冲区。
-
将
InputStream
或 NIOChannel
转换为Flux<DataBuffer>
,反之亦然,将Publisher<DataBuffer>
转换为OutputStream
或 NIOChannel
。 -
如果缓冲区是
PooledDataBuffer
的实例,则可以使用方法来释放或保留DataBuffer
。 -
从字节流中跳过或获取字节,直到达到特定字节数。
编解码器
org.springframework.core.codec
包提供以下策略接口
-
Encoder
用于将Publisher<T>
编码为数据缓冲区流。 -
Decoder
用于将Publisher<DataBuffer>
解码为更高层级对象的流。
spring-core
模块提供 byte[]
、ByteBuffer
、DataBuffer
、Resource
和 String
编码器和解码器实现。spring-web
模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他编码器和解码器。请参阅 WebFlux 部分中的 编解码器。
使用 DataBuffer
在使用数据缓冲区时,必须特别注意确保缓冲区被释放,因为它们可能是 池化的。我们将使用编解码器来说明它是如何工作的,但这些概念更普遍适用。让我们看看编解码器在内部必须做些什么来管理数据缓冲区。
Decoder
是最后一个读取输入数据缓冲区的,在创建更高层级对象之前,因此它必须按如下方式释放它们
-
如果
Decoder
只读取每个输入缓冲区并准备立即释放它,它可以通过DataBufferUtils.release(dataBuffer)
来做到这一点。 -
如果
Decoder
使用Flux
或Mono
运算符,例如flatMap
、reduce
和其他在内部预取和缓存数据项的运算符,或者使用filter
、skip
和其他省略项的运算符,那么必须将doOnDiscard(DataBuffer.class, DataBufferUtils::release)
添加到组合链中,以确保在丢弃之前释放这些缓冲区,可能也是由于错误或取消信号导致的。 -
如果
Decoder
以任何其他方式保留一个或多个数据缓冲区,它必须确保在完全读取时释放它们,或者在发生错误或取消信号之前释放它们,这些信号发生在缓存的数据缓冲区被读取和释放之前。
请注意,DataBufferUtils#join
提供了一种安全高效的方式将数据缓冲区流聚合到单个数据缓冲区中。同样,skipUntilByteCount
和 takeUntilByteCount
是解码器可以使用的一些其他安全方法。
Encoder
分配数据缓冲区,其他组件必须读取(并释放)这些缓冲区。因此,Encoder
的工作量并不大。但是,Encoder
必须注意在填充缓冲区时,如果发生序列化错误,则释放数据缓冲区。例如
-
Java
-
Kotlin
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
// serialize and populate buffer..
release = false
} finally {
if (release) {
DataBufferUtils.release(buffer)
}
}
return buffer
Encoder
的使用者负责释放它接收到的数据缓冲区。在 WebFlux 应用程序中,Encoder
的输出用于写入 HTTP 服务器响应或客户端 HTTP 请求,在这种情况下,释放数据缓冲区的责任在于写入服务器响应或客户端请求的代码。
请注意,在 Netty 上运行时,有一些用于 调试缓冲区泄漏 的选项。