Java NIO、Channel、Selector 详解

Posted by Yano on October 14, 2019

公众号

coding 笔记、点滴记录,以后的文章也会同步到公众号(Coding Insight)中,希望大家关注^_^

Java NIO 有三大组件:

  1. Buffer
  2. Channel
  3. Selector

Buffer

Buffer 是一个特定原始类型的容器。Buffer 是一个原始类型的线性的、有限序列,除了 Buffer 存储的内容外,关键属性还包括:capacity, limit 和 position。

  • capacity:Buffer 包含的元素的数量,capacity 永远不会为负,也不会改变。
  • limit:Buffer 中第一个不能读取或写入的元素索引。limit 永远不会为负,且永远小于等于 capacity
  • position:下一个待读取、写入的元素索引。position 永远不会为负,且永远小于等于 limit

每个基本类型(布尔类型除外),都有一个 Buffer 的子类。Java NIO 中 Buffer 的一些实现,其中最重要的是 ByteBuffer,其余类如 IntBuffer 的实现类未画出。

我个人理解,Buffer 就是一个内存数组,并通过 capacity, limit 和 position 三个变量对读写操作进行控制。

position、limit、capacity

Buffer 的属性主要有:

// 恒等式: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

// 仅在 direct buffers 中使用
long address;

ByteBuffer 中额外定义了字节数组(其余 Buffer 的子类同理):

// 该字节数组仅在分配在堆上时才非空(参考下面的 Direct vs. non-direct buffers)
final byte[] hb;

Buffer 就是根据这 4 个 int 型字段来配合内存数组的读写。这 4 个属性分别为:

  • mark:临时保存 position 的值,每次调用 mark() 方法都会将 mark 设值为当前的 position
  • capacity:Buffer 缓冲区容量,capacity 永远不会为负,也不会改变。
  • limit:Buffer 中第一个不能读取或写入的元素索引。limit 永远不会为负,且永远小于等于 capacity。写模式下,limit 代表的是最大能写入的数据,limit = capacity;读模式下,limit = Buffer 实际写入的数据大小。
  • position:下一个待读取、写入的元素索引。position 永远不会为负,且永远小于等于 limit。

ByteBuffer

从上图中我们可以看到,ByteBuffer 类有 2 个实现类:

  1. MappedByteBuffer:DirectByteBuffer 的抽象类,JVM 会尽可能交给本地方法操作 I/O,其内存不会分配在堆上,不会占用应用程序的内存。
  2. HeapByteBuffer:顾名思义是存储在堆上的 Buffer,我们直接调用 ByteBuffer.allocate(1024); 时会创建此类 Buffer。
public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    return new HeapByteBuffer(capacity, capacity);
}

Direct vs. non-direct buffers

一个 byte buffer 可以是 direct,也可以是非 direct 的。对于 direct byte buffer,JVM 将尽量在本机上执行 I/O 操作。也就是说,JVM 尽量避免每次在调用操作系统 I/O 操作前,将缓冲区内容复制到中间缓冲区。

可以通过类中的 allocateDirect 工厂方法创建 direct buffer,这个方法创建的 direct buffer 通常比 non-direct buffer 具有更高的分配和释放成本。Direct buffer 内存可能分配在 GC 堆的外部,所以对应用程序的内存占用影响并不明显。所以建议将 direct buffer 分配给大型、寿命长的、受底层操作系统 I/O 操作约束的缓冲区。

可以通过调用 isDirect 方法判断 byte buffer 是否是 direct 的。

Buffer 初始化

Buffer 可以通过 allocation 方法创建,也可以通过字节数组的 wrapping 方法创建并填充。

ByteBuffer byteBuf = ByteBuffer.allocate(1024);
public static ByteBuffer wrap(byte[] array) {
    ...
}

填充 Buffer

// 填充一个 byte
public abstract ByteBuffer put(byte b);

// 在指定位置填充一个 byte
public abstract ByteBuffer put(int index, byte b);

// 批量将 src buffer 填充到本 buffer
public ByteBuffer put(ByteBuffer src) {
    ...
}

// 批量将 src 数组的特定区间填充到本 buffer
public ByteBuffer put(byte[] src, int offset, int length) {
    ...
}

// 批量将 src 数组填充到本 buffer
public final ByteBuffer put(byte[] src) {
    ...
}

我们还可以将 Channel 的数据填充到 Buffer 中,数据是从外部(文件、网络)读到内存中。

int read = channel.read(buffer);

读取 Buffer

对于前面的写操作,每写一个值,position 都会自增 1,所以 position 会指向最后写入位置的后面一位。

如果要读取 Buffer 的值,需要调用 flip() 方法,从写模式切换到读模式

public final Buffer flip() {
    limit = position; // 将 limit 设置为实际写入的数据数量
    position = 0; // 重置 position
    mark = -1; // 将 mark 设置为未标记
    return this;
}

读操作的 get 方法如下:

// 读取当前 position 的字节,然后 position 自增 1
public abstract byte get();

// 读取 index 的字节( position 不会自增!)
public abstract byte get(int index);

// 批量将缓冲区数据传递到 dst 数组中,position 自增 dst 的长度
public ByteBuffer get(byte[] dst) {
    ...
}

// 批量将缓冲区数据传递到 dst 数组中
public ByteBuffer get(byte[] dst, int offset, int length) {
    ...
}

我们可以将缓冲区的数据传输到 Channel 中:

  1. 通过 FileChannel 将数据写到文件中
  2. 通过 SocketChannel 将数据写入网络,发送到远程机器
int write = channel.write(buffer);

mark(), reset()

mark 用于临时保存 position 的值,每次调用 mark() 方法都会将 position 赋值给 mark。

public final Buffer mark() {
    mark = position;
    return this;
}

reset() 方法就是将 position 赋值到上次 mark 的位置上(也就是上一次调用 mark() 方法的时候),通过 mark(), reset() 两个方法的配合,我们可以重复读取某个区间的数据。

public final Buffer reset() {
    int m = mark;
    if (m < 0)
        throw new InvalidMarkException();
    position = m;
    return this;
}

注意 mark 构造初始化时数值是 -1,如果 >= 0 则表示可以读取。

rewind(), clear(), compact()

rewind() 重置 position 为 0。通常在 channel-write 和 get 前调用此方法。

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

clear() 会重置 position,将 limit 设置为最大值 capacity,并将 mark 置成 -1。通常在 channel-read 和填充此 buffer 时,会先调用此方法。

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

compact() 方法并不常用,忽略。

public abstract ByteBuffer compact();

恒等式

mark, position, limit和 capacity 永远遵循以下关系:

0 <= mark <= position <= limit <= capacity

新创建的 buffer position = 0,mark 是未定义的(-1)。limit 的初始值可能是 0,也可能是构造时传入的其他值。新分配的缓冲区元素都初始化为 0。

Channel

Channel 是 I/O 操作的「桥梁」。Channel 可以是对硬件设备、文件、网络套接字、程序组件等实体的连接,该实体能够执行不同的 I/O 操作(读取或写入)。

Channel 只有 2 种状态:开启和关闭。在创建时就是开启的,一旦关闭就不会再回到打开状态。Channel 一旦关闭,任何对 Channel 调用的 I/O 操作都会抛出 ClosedChannelException 异常,可以通过方法 isOpen() 来检测 Channel 是否开启。

Channel 接口定义如下:

public interface Channel extends Closeable {

    public boolean isOpen();

    public void close() throws IOException;
}

  • FileChannel:文件通道,用于文件读写
  • DatagramChannel:UDP 连接
  • SocketChannel:TCP 连接通道,TCP 客户端
  • ServerSocketChannel:TCP 对应的服务端,用于监听某个端口进来的请求

读操作:将数据从 Channel 读取到 Buffer 中

int read = channel.read(buffer);

写操作:将数据从 Buffer 写入到 Channel 中

int write = channel.write(buffer);

FileChannel

读取文件内容,详细说明见注释。

@Test
public void testFileChannelRead() throws IOException {
    // 获取文件的 FileChannel
    FileInputStream fileInputStream = new FileInputStream("/abc");
    FileChannel channel = fileInputStream.getChannel();

    // 创建 ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(30);

    // 将文件内容读取到 buffer 中
    channel.read(buffer);

    // buffer 从写模式,切换到读模式
    buffer.flip();

    // 打印 buffer(文件)的内容
    while (buffer.hasRemaining()) {
        System.out.print((char)buffer.get());
    }
}

写入文件内容,详细说明见注释。

@Test
public void testFileChannelWrite() throws IOException {
    // 获取文件的 FileChannel
    FileOutputStream fileOutputStream = new FileOutputStream("/abc");
    FileChannel channel = fileOutputStream.getChannel();

    // 创建 ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(30);
    buffer.put("123456".getBytes());

    // Buffer 切换为读模式
    buffer.flip();
    while(buffer.hasRemaining()) {
        // 将 Buffer 中的内容写入文件
        channel.write(buffer);
    }
}

SocketChannel

SocketChannel 顾名思义,就是 Socket 的 Channel,能够读写 Socket。操作缓冲区同 FileChannel。

@Test
public void testSocketChannel() throws IOException {
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 80));

    // 创建 ByteBuffer
    ByteBuffer buffer = ByteBuffer.allocate(30);
    // 读取数据
    socketChannel.read(buffer);

    // 写入数据到网络连接中
    while(buffer.hasRemaining()) {
        socketChannel.write(buffer);
    }
}

ServerSocketChannel

ServerSocketChannel 用于监听机器端口,管理从这个端口进来的 TCP 连接。

// 实例化
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 监听 8080 端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));

while (true) {
    // 一旦有一个 TCP 连接进来,就对应创建一个 SocketChannel 进行处理
    SocketChannel socketChannel = serverSocketChannel.accept();
}
这里可以看到 SocketChannel 的另一种实例化方式,SocketChannel 可读可写,操作一个网络通道。

ServerSocketChannel 不和 Buffer 打交道了,因为它并不实际处理数据,它一旦接收到请求后,实例化 SocketChannel,之后在这个连接通道上的数据传递它就不管了,因为它需要继续监听端口,等待下一个连接。

DatagramChannel

处理 UDP 连接(面向无连接的,不需要握手,只要把数据丢出去就好了),操作字节数组,同 FileChannel,不作过多介绍。

Selector

Selector 是非阻塞的,多路复用就是基于 Selector 的,Java 能通过 Selector 实现一个线程管理多个 Channel。

基本操作

  1. 开启一个 Selector(经常被翻译成选择器、多路复用器)
Selector selector = Selector.open();
  1. 将 Channel 注册到 Selector 上。前面我们说了,Selector 建立在非阻塞模式之上,所以注册到 Selector 的 Channel 必须要支持非阻塞模式,FileChannel 不支持非阻塞,我们这里讨论最常见的 SocketChannel 和 ServerSocketChannel。
// 将通道设置为非阻塞模式,因为默认都是阻塞模式的
channel.configureBlocking(false);
// 注册
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

register 方法的第二个参数是 SelectionKey 中的常量,代表要监听感兴趣的事件,总共有以下 4 种:

// 通道有数据可读
public static final int OP_READ = 1 << 0;
// 可以向通道中写数据
public static final int OP_WRITE = 1 << 2;
// 成功建立 TCP 连接
public static final int OP_CONNECT = 1 << 3;
// 接受 TCP 连接
public static final int OP_ACCEPT = 1 << 4;

注册方法返回值是 SelectionKey 实例,它包含了 Channel 和 Selector 信息,也包括了一个叫做 Interest Set 的信息,即我们设置的我们感兴趣的正在监听的事件集合。

  1. 调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。

基本用法

Selector selector = Selector.open();

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

while(true) {
  // 判断是否有事件准备好
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;

  // 遍历
  Set<SelectionKey> selectedKeys = selector.selectedKeys();
  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
  }
}

I/O 多路复用原理

这里放上一张原来总结的思维导图截图,具体原理需要另行写篇文章。

总结

  • Buffer 和数组差不多,它有 position、limit、capacity 几个重要属性。put() 一下数据、flip() 切换到读模式、然后用 get() 获取数据、clear() 一下清空数据、重新回到 put() 写入数据。

  • Channel 基本上只和 Buffer 打交道,最重要的接口就是 channel.read(buffer) 和 channel.write(buffer)。

  • Selector 用于实现非阻塞 IO,这里仅仅介绍接口使用,后续请关注非阻塞 IO 的介绍。