Netty

参考链接:

1.什么是Netty

Netty是一个高性能事件驱动、异步非堵塞的IO(NIO)Java开源框架,Jboss提供,用于建立TCP等底层的连接,基于Netty可以建立高性能的Http服务器,快速开发高性能、高可靠性的网络服务器和客户端程序。支持HTTP、 WebSocket 、Protobuf、 Binary TCP |和UDP,Netty已经被很多高性能项目作为其Socket底层基础,如HornetQ Infinispan Vert.x Play Framework Finangle和 Cassandra。其竞争对手是:Apache MINA和 Grizzly。

也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

“快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

1.1 目的

快速开发高性能、高可靠性的网络服务器和客户端程序

1.2 优点

提供异步的、事件驱动的网络应用程序框架和工具

2. 为什么选择 Netty

Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架Avro就使用了Netty作为底层通信框架,其他还有业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。

通过对Netty的分析,我们将它的优点总结如下。

  • API使用简单,开发门槛低;

  • 功能强大,预置了多种编解码功能,支持多种主流协议;

  • 定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;

  • 性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;

  • 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;

  • 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入;

  • 经历了大规模的商业应用考验,质量得到验证。Netty在互联网、大数据、网络游戏、企业应用、电信软件等众多行业已经得到了成功商用,证明它已经完全能够满足不同行业的商业应用了。

3. 说说业务中,Netty 的使用场景

​ 互联网行业:随着网站规模的不断扩大,系统并发访问量也越来越高,传统基于 Tomcat 等 Web 容器的垂直架构已经无法满足需求,需要拆分应用进行服务化,以提高开发和维护效率。从组网情况看,垂直的架构拆分之后,系统采用分布式部署,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。

  典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。

4. 原生的 NIO 在 JDK 1.7 版本存在 epoll bug

它会导致Selector空轮询,最终导致CPU 100%

5. 什么是TCP 粘包/拆包

TCP是个”流”协议,所谓流,就是没有界限没有分割的一串数据。TCP会根据缓冲区的实际情况进行包划分,一个完整的包可能会拆分成多个包进行发送,也用可能把多个小包封装成一个大的数据包发送。这就是TCP粘包/拆包。

5.1 发生TCP粘包/拆包,主要是由于下面一些原因:
  • 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
  • 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
  • 进行MSS(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将发生拆包。
  • 接收方法不及时读取套接字缓冲区数据,这将发生粘包。
5.2 对于什么是粘包、拆包问题,我想先举两个简单的应用场景:
  • 客户端和服务器建立一个连接,客户端发送一条消息,客户端关闭与服务端的连接。

  • 客户端和服务器简历一个连接,客户端连续发送两条消息,客户端关闭与服务端的连接。

对于第一种情况,服务端的处理流程可以是这样的:当客户端与服务端的连接建立成功之后,服务端不断读取客户端发送过来的数据,当客户端与服务端连接断开之后,服务端知道已经读完了一条消息,然后进行解码和后续处理…。对于第二种情况,如果按照上面相同的处理逻辑来处理,那就有问题了,我们来看看第二种情况下客户端发送的两条消息递交到服务端有可能出现的情况:

第一种情况:

服务端一共读到两个数据包,第一个包包含客户端发出的第一条消息的完整信息,第二个包包含客户端发出的第二条消息,那这种情况比较好处理,服务器只需要简单的从网络缓冲区去读就好了,第一次读到第一条消息的完整信息,消费完再从网络缓冲区将第二条完整消息读出来消费。

没有发生粘包、拆包示意图
没有发生粘包、拆包示意图

没有发生粘包、拆包示意图

第二种情况:

服务端一共就读到一个数据包,这个数据包包含客户端发出的两条消息的完整信息,这个时候基于之前逻辑实现的服务端就蒙了,因为服务端不知道第一条消息从哪儿结束和第二条消息从哪儿开始,这种情况其实是发生了TCP粘包。

TCP粘包示意图
TCP粘包示意图.png

TCP粘包示意图

第三种情况:

服务端一共收到了两个数据包,第一个数据包只包含了第一条消息的一部分,第一条消息的后半部分和第二条消息都在第二个数据包中,或者是第一个数据包包含了第一条消息的完整信息和第二条消息的一部分信息,第二个数据包包含了第二条消息的剩下部分,这种情况其实是发送了TCP拆,因为发生了一条消息被拆分在两个包里面发送了,同样上面的服务器逻辑对于这种情况是不好处理的。

TCP拆包示意图
TCP拆包示意图

TCP拆包示意图

6. TCP粘包/拆包的解决办法

  • 设置定长消息,服务端每次读取既定长度的内容作为一条完整消息。
  • 设置消息边界,服务端从网络流中按消息编辑分离出消息内容。
  • 使用带消息头的协议、消息头存储消息开始标识及消息长度信息,服务端获取消息头的时候解析出消息长度,然后向后读取该长度的内容。
  • 更复杂的应用层协议

7. Netty解决TCP粘包/拆包问题

7.1 LineBasedFrameDecoder
  • 解码器说明:文本解码器
  • 参数说明:
1
maxLength:解码的帧的最大长度
  • 代码案例
1
//设置定长解码器 长度设置为30ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new StringDecoder());
7.2 FixedLengthFrameDecoder
  • 解码器说明:定长解码器
  • 参数说明:
1
frameLength:帧的固定长度
  • 代码案例
1
//设置定长解码器 长度设置为30ch.pipeline().addLast(new FixedLengthFrameDecoder(30));
7.3 DelimiterBasedFrameDecoder
  • 解码器说明:特殊分隔符解码器
  • 参数说明:
1
- maxFrameLength:解码的帧的最大长度- stripDelimiter:解码时是否去掉分隔符- failFast:为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常- delimiter:分隔符
  • 代码案例
1
2
String message = "netty is a nio server framework &"                +"which enables quick and easy development &"                +"of net applications such as protocol &"                +"servers and clients!";
ByteBuf delimiter = Unpooled.copiedBuffer("&".getBytes());//1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
7.4 LengthFieldBasedFrameDecoder
  • 解码器说明:基于包头不固定长度的解码器
  • 参数说明:
1
- maxFrameLength:解码的帧的最大长度- lengthFieldOffset:长度属性的起始位(偏移位),包中存放有整个大数据包长度的字节,这段字节的其实位置- lengthFieldLength:长度属性的长度,即存放整个大数据包长度的字节所占的长度- lengthAdjustmen:长度调节值,在总长被定义为包含包头长度时,修正信息长度。- initialBytesToStrip:跳过的字节数,根据需要我们跳过lengthFieldLength个字节,以便接收端直接接受到不含“长度属性”的内容- failFast :为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
  • 代码案例
1
ch.pipeline().addFirst(new LengthFieldBasedFrameDecoder(100000000,0,4,0,4));

8. Netty 线程模型

9. 说说 Netty 的零拷贝

9.1 零拷贝的定义
  • Zero-copy, 就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 因为少了一次内存的拷贝, 因此 CPU 的效率就得到的提升.

  • 在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据。

  • 但Netty 中的 Zero-copy 与 OS 的 Zero-copy 不太一样, Netty的 Zero-coyp 完全是在用户态(Java 层面)的, 它的 Zero-copy 的更多的是偏向于 优化数据操作 。
9.2 Netty的“零拷贝”主要体现以下几个方面:
  • Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

  • Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer.

    1
    2
    3
    //定义两个ByteBuf类型的 body 和 header 
    CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
    compositeByteBuf.addComponents(true, header, body);
  • 通过 FileRegion 包装的FileChannel.tranferTo方法 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环write方式导致的内存拷贝问题。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public static void copyFileWithFileChannel(String srcFileName, String destFileName) throws Exception {
    RandomAccessFile srcFile = new RandomAccessFile(srcFileName, "r");
    FileChannel srcFileChannel = srcFile.getChannel();

    RandomAccessFile destFile = new RandomAccessFile(destFileName, "rw");
    FileChannel destFileChannel = destFile.getChannel();

    long position = 0;
    long count = srcFileChannel.size();

    srcFileChannel.transferTo(position, count, destFileChannel);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    RandomAccessFile raf = null;
    long length = -1;
    try {
    // 1. 通过 RandomAccessFile 打开一个文件.
    raf = new RandomAccessFile(msg, "r");
    length = raf.length();
    } catch (Exception e) {
    ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n');
    return;
    } finally {
    if (length < 0 && raf != null) {
    raf.close();
    }
    }

    ctx.write("OK: " + raf.length() + '\n');
    if (ctx.pipeline().get(SslHandler.class) == null) {
    // SSL not enabled - can use zero-copy file transfer.
    // 2. 调用 raf.getChannel() 获取一个 FileChannel.
    // 3. 将 FileChannel 封装成一个 DefaultFileRegion
    ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));
    } else {
    // SSL enabled - cannot use zero-copy file transfer.
    ctx.write(new ChunkedFile(raf));
    }
    ctx.writeAndFlush("\n");
    }
  • 通过 wrap 操作, 我们可以将 byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作。

    1
    ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);

10. Netty 内部执行流程

11. Netty 重连实现

11.1 心跳机制检测连接存活

长连接是指建立的连接长期保持,不管有无数据包的发送都要保持连接通畅。心跳是用来检测一个系统是否存活或者网络链路是否通畅的一种方式,一般的做法是客户端定时向服务端发送心跳包,服务端收到心跳包后进行回复,客户端收到回复说明服务端存活。

通过心跳检测机制,可以检测客户端与服务的长连接是否保持,当客户端发送的心跳包没有收到服务端的响应式,可以认为服务端已经出故障了,这个时候可以重新连接或者选择其他的可用的服务进行连接。

在Netty中提供了一个IdleStateHandler类用于心跳检测,用法如下:

1
ch.pipeline().addLast("ping", new IdleStateHandler(60, 20, 60 * 10, TimeUnit.SECONDS));
  • 第一个参数 60 表示读操作空闲时间
  • 第二个参数 20 表示写操作空闲时间
  • 第三个参数 60*10 表示读写操作空闲时间
  • 第四个参数 单位/秒

在处理数据的ClientPoHandlerProto中增加userEventTriggered用来接收心跳检测结果,event.state()的状态分别对应上面三个参数的时间设置,当满足某个时间的条件时会触发事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter {
private ImConnection imConnection = new ImConnection();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
MessageProto.Message message = (MessageProto.Message) msg;
System.out.println("client:" + message.getContent());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
System.out.println("长期没收到服务器推送数据");
//可以选择重新连接
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
System.out.println("长期未向服务器发送数据");
//发送心跳包
ctx.writeAndFlush(MessageProto.Message.newBuilder().setType(1));
} else if (event.state().equals(IdleState.ALL_IDLE)) {
System.out.println("ALL");
}
}
}
}

服务端收到客户端发送的心跳消息后,回复一条信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ServerPoHandlerProto extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
MessageProto.Message message = (MessageProto.Message) msg;
if (ConnectionPool.getChannel(message.getId()) == null) {
ConnectionPool.putChannel(message.getId(), ctx);
}
System.err.println("server:" + message.getId());
// ping
if (message.getType() == 1) {
ctx.writeAndFlush(message);
}

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

当客户端20秒没往服务端发送过数据,就会触发IdleState.WRITER_IDLE事件,这个时候我们就像服务端发送一条心跳数据,跟业务无关,只是心跳。服务端收到心跳之后就会回复一条消息,表示已经收到了心跳的消息,只要收到了服务端回复的消息,那么就不会触发IdleState.READER_IDLE事件,如果触发了IdleState.READER_IDLE事件就说明服务端没有给客户端响应,这个时候可以选择重新连接。

11.2 启动时连接重试

在Netty中实现重连的操作比较简单,Netty已经封装好了,我们只需要稍微扩展一下即可。

连接的操作是客户端这边执行的,重连的逻辑也得加在客户端,首先我们来看启动时要是连接不上怎么去重试

增加一个负责重试逻辑的监听器,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.TimeUnit;

import com.netty.im.client.ImClientApp;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
/**
* 负责监听启动时连接失败,重新连接功能
*/
public class ConnectionListener implements ChannelFutureListener {

private ImConnection imConnection = new ImConnection();

@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
System.err.println("服务端链接不上,开始重连操作...");
imConnection.connect(ImClientApp.HOST, ImClientApp.PORT);
}
}, 1L, TimeUnit.SECONDS);
} else {
System.err.println("服务端链接成功...");
}
}
}

通过channelFuture.isSuccess()可以知道在连接的时候是成功了还是失败了,如果失败了我们就启动一个单独的线程来执行重新连接的操作。

只需要在ConnectionListener添加到ChannelFuture中去即可使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ImConnection {
private Channel channel;

public Channel connect(String host, int port) {
doConnect(host, port);
return this.channel;
}

private void doConnect(String host, int port) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {

// 实体类传输数据,protobuf序列化
ch.pipeline().addLast("decoder",
new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
ch.pipeline().addLast("encoder",
new ProtobufEncoder());
ch.pipeline().addLast(new ClientPoHandlerProto());

}
});

ChannelFuture f = b.connect(host, port);
f.addListener(new ConnectionListener());
channel = f.channel();
} catch(Exception e) {
e.printStackTrace();
}
}
}

可以按照如下步骤进行测试:

直接启动客户端,不启动服务端

当连接失败的时候会进入ConnectionListener中的operationComplete方法执行我们的重连逻辑

11.3 运行中连接断开时重试

使用的过程中服务端突然挂了,就得用另一种方式来重连了,可以在处理数据的Handler中进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter {
private ImConnection imConnection = new ImConnection();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
MessageProto.Message message = (MessageProto.Message) msg;
System.out.println("client:" + message.getContent());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("掉线了...");
//使用过程中断线重连
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
imConnection.connect(ImClientApp.HOST, ImClientApp.PORT);
}
}, 1L, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
}

在连接断开时都会触发 channelInactive 方法, 处理重连的逻辑跟上面的一样。

可以按照如下步骤进行测试:

启动服务端
启动客户端,连接成功

停掉服务端就会触发channelInactive进行重连操作

12. Netty的特性

  • 设计

    统一的API,支持多种传输类型,阻塞的和非阻塞的
    简单而强大的线程模型
    真正的无连接数据报套接字支持
    链接逻辑组件以支持复用

  • 易于使用

    详实的Javadoc和大量的示例集
    不需要超过JDK 1.6+[7]的依赖。(一些可选的特性可能需要Java 1.7+和/或额外的依赖)

  • 性能

    拥有比Java的核心API更高的吞吐量以及更低的延迟
    得益于池化和复用,拥有更低的资源消耗
    最少的内存复制

  • 健壮性

    不会因为慢速、快速或者超载的连接而导致OutOfMemoryError
    消除在高速网络中NIO应用程序常见的不公平读/写比率

  • 安全性

    完整的SSL/TLS以及StartTLS支持
    可用于受限环境下,如Applet和OSGI

  • 社区驱动

    发布快速而且频繁

13. Netty的核心组件

Channel、回调、Future、事件和ChannelHandler,这些构建块代表了不同类型的构造:资源、逻辑以及通知。应用程序将使用它们来访问网络以及流经网络的数据。

13.1 Channel

Channel:Socket

Channel是Java Nio的一个基本构造。

目前,可以把Channel看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭, 连接或者断开连接。

13.2 回调

一个回调其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。这使得后者可以在适当的时候调用前者。回调在广泛的编程场景中都有应用,而且也是在操作完成后通知相关方最常见的方式之一。

Netty在内部使用了回调来处理事件;当一个回调被触发时,相关的事件可以被一个interface-ChannelHandler的实现处理。当一个新的连接已经被建立时,ChannelHandler的channelActive()回调方法将会被调用,并将打印出一条信息。

1
2
3
4
5
6
7
8
public class ConnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 当一个新的连接已经被建立时,channelActive(ChannelHandlerContext)将会被调用
System.out.println(
"Client " + ctx.channel().remoteAddress() + " connected");
}
}
13.3 Future

Future:异步通知

Future提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。

JDK预置了interface java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以Netty提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用。

ChannelFuture提供了几种额外的方法,这些方法使得我们能够注册一个或者多个ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的操作完成时被调用。然后监听器可以判断该操作是成功地完成了还是出错了。如果是后者,我们可以检索产生的Throwable。简而言之,由ChannelFutureListener提供的通知机制消除了手动检查对应的操作是否完成的必要。

每个Netty的出站I/O操作都将返回一个ChannelFuture;也就是说,它们都不会阻塞。正如我们前面所提到过的一样,Netty完全是异步和事件驱动的。

如下代码清单,展示了一个ChannelFuture作为一个I/O操作的一部分返回的例子。这里,connect()方法将会直接返回,而不会阻塞,该调用将会在后台完成。这究竟什么时候会发生则取决于若干的因素,但这个关注点已经从代码中抽象出来了。因为线程不用阻塞以等待对应的操作完成,所以它可以同时做其他的工作,从而更加有效地利用资源。

1
2
3
4
Channel channel = ...;
// Does not block
// 异步地连接到远程节点
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));

代码清单1-4显示了如何利用ChannelFutureListener。首先,要连接到远程节点上。然后,要注册一个新的ChannelFutureListener到对connect()方法的调用所返回的ChannelFuture上。当该监听器被通知连接已经建立的时候,要检查对应的状态❶。如果该操作是成功的,那么将数据写到该Channel。否则,要从ChannelFuture中检索对应的Throwable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Channel channel = ...;
// Does not block
// 异步地连接到远程节点
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));
// 注册一个ChannelFutureListener,以便在操作完成时获得通知
future.addListener(new ChannelFutureListener() {
@Override
// ❶ 检查操作的状态
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()){
// 如果操作是成功的,则创建一个ByteBuf以持有数据
ByteBuf buffer = Unpooled.copiedBuffer("Hello",Charset.defaultCharset());
// 将数据异步地发送到远程节点。返回一个ChannelFuture
ChannelFuture wf = future.channel().writeAndFlush(buffer);
....
} else {
Throwable cause = future.cause(); 
cause.printStackTrace();
}
}
});

需要注意的是,对错误的处理完全取决于你、目标,当然也包括目前任何对于特定类型的错误加以的限制。例如,如果连接失败,你可以尝试重新连接或者建立一个到另一个远程节点的连接。

如果你把ChannelFutureListener看作是回调的一个更加精细的版本,那么你是对的。事实上,回调和Future是相互补充的机制;它们相互结合,构成了Netty本身的关键构件块之一。

13.4 事件和ChannelHandler

Netty使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。这些动作可能是:

  • 记录日志;

  • 数据转换;

  • 流控制;

  • 应用程序逻辑。

Netty是一个网络编程框架,所以事件是按照它们与入站或出站数据流的相关性进行分类的。

可能由入站数据或者相关的状态更改而触发的事件包括:

  • 连接已被激活或者连接失活;

  • 数据读取;

  • 用户事件;

  • 错误事件。

出站事件是未来将会触发的某个动作的操作结果,这些动作包括:

  • 打开或者关闭到远程节点的连接;

  • 将数据写到或者冲刷到套接字。

每个事件都可以被分发给ChannelHandler类中的某个用户实现的方法。这是一个很好的将事件驱动范式直接转换为应用程序构件块的例子。下图展示了一个事件是如何被一个这样的ChannelHandler链处理的。

流经ChannelHandler链的入站事件和出站事件
流经ChannelHandler链的入站事件和出站事件

Netty的ChannelHandler为处理器提供了基本的抽象,如上图所示的那些。我们会在适当的时候对ChannelHandler进行更多的说明,但是目前你可以认为每个ChannelHandler的实例都类似于一种为了响应特定事件而被执行的回调。

Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议(如HTTP和SSL/TLS)的ChannelHandler。在内部,ChannelHandler自己也使用了事件和Future,使得它们也成为了你的应用程序将使用的相同抽象的消费者。

13.5 ByteBuf

ByteBuf:Netty的数据容器

13.6 核心组件的组合使用
13.6.1 Future、回调和ChannelHandler

Netty的异步编程模型是建立在Future和回调的概念之上的, 而将事件派发到ChannelHandler的方法则发生在更深的层次上。结合在一起,这些元素就提供了一个处理环境,使你的应用程序逻辑可以独立于任何网络操作相关的顾虑而独立地演变。这也是Netty的设计方式的一个关键目标。

拦截操作以及高速地转换入站数据和出站数据,都只需要你提供回调或者利用操作所返回的Future。这使得链接操作变得既简单又高效,并且促进了可重用的通用代码的编写。

13.6.2 选择器、事件和EventLoop

EventLoop:控制流、多线程、并发

Netty通过触发事件将Selector从应用程序中抽象出来,消除了所有本来将需要手动编写的派发代码。在内部,将会为每个Channel分配一个EventLoop,用以处理所有事件,包括:

  • 注册感兴趣的事件;

  • 将事件派发给ChannelHandler;

  • 安排进一步的动作。

EventLoop本身只由一个线程驱动,其处理了一个Channel的所有I/O事件,并且在该EventLoop的整个生命周期内都不会改变。这个简单而强大的设计消除了你可能有的在ChannelHandler实现中需要进行同步的任何顾虑,因此,你可以专注于提供正确的逻辑,用来在有感兴趣的数据要处理的时候执行。如同我们在详细探讨Netty的线程模型时将会看到的,该API是简单而紧凑的。