已经有的轮子:KryoNet
KryoNet is a Java library that provides a clean and simple API for efficient TCP and UDP client/server network communication using NIO. KryoNet uses the Kryo serialization library to automatically and efficiently transfer object graphs across the network.
正如KryoNet的描述,这个框架已经提供了可用的udp服务器和客户端实现,但是使用的是Kryo来做字节的序列化和反序列化。 我只是想要一个UDP服务器的demo,不想绑定Kryo,而且由于项目需求,字节码解析协议需要定制,于是就有了这篇博客。
一个完整的UDP服务器Demo应该有什么?
先帖一发代码库。这里使用的bibucket建仓库,实在是点不开github… 一个完整的UDP服务器应该能做到下面的事情:
Server端:
- 端口监听。防止dos攻击、报文过滤等等都应该做在这一层。
- 连接池。接收到UDP报文后肯定需要创建连接然后发送ACK或者其他数据,如果不考虑性能,连接池可以省略,我的demo里面也没有实现,都是直接new出来的。
- 连接记录。虽然都说UDP是无连接的协议,但是大部分需求都是需要记录UDP连接的,这样才能发送广播之类的消息;同时如果想要使用UDP来实现一套IM系统,肯定还是需要连接记录的。
- 数据解析处理。
- 可靠的数据传递。(超时重发、超重发次数离线)
- 心跳检测。独立的线程遍历连接记录,发现有心跳超时的记录就”断开连接”,连接池回收。
Client端:
- 端口监听。逻辑跟服务器一样一样。
- 建立连接。不要以为UDP服务无连接就不需要这一步,一般都是发送一个心跳包,等待服务器的ACK,如果等到了就说明链路通了,否则需要提示用户无法建立连接
- 可靠的数据传递。超时重发、超重发次数离线并且重连、超重连次数通知上层逻辑处理。
- 数据解析处理。
- 心跳。
目前我的Demo里面实现的功能很简陋,Server端什么都没有,收到消息直接返回ACK。这个Server本身也只是在项目中用来测试客户端是否正确,并没有花时间来做。Client端相对丰满点,实现了:
- 端口监听。没有防DOS攻击等复杂逻辑,简单地收取报文。
- 建立连接。
- 可靠的数据传递。只做了超时重发、超重发次数重连,没有保存本地数据库。
- 数据解析处理。
- 心跳
UDP连接框架
首先需要熟悉下Java里面的NIO,有一个对应的翻译,最主要的是要看到第9篇, 这样就知道了如果搭建一个简单地UDP服务器了。 这些最基础的UDP连接,对应封装在UDPConnection类中:
从成员变量里面就可以看出来,与NIO的教程里面说的一样有三个角色:buffer、Channel和SelectionKey。同时还有另一个角色:Serialization,也就是我们下面着重会讲的数据解析协议,这个类规定了
- 如何从ByteBuffer转化为我们需要的Java对象
- 如何从Java对象转化为ByteBuffer
先看构造函数和bind。构造函数中传入了具体的Serialization,以及各个缓存的大小。 bind函数作用就是在给定的端口上监听UDP报文,这里使用selector.provider().openDatagramChannel()而不是DatagramChannel.open()还是有一点区别的,方便了我们扩展代码。
看到这个方法可能会让人有点困惑,UDP是无连接的,为何还会有connect方法?这里的connect并不是TCP中的建立连接的意思,而是把Channel与该地址绑定,这样以后写数据的时候就会默认写给connect中传入的地址。具体描述可参见官方文档
这三个方法中readFromAddress是从通道中读取数据到ByteBuffer,readObject使用规定的Serialization从ByteBuffer转为Java对象,send将Java对象写入通道。有了这些基础方法后就可以在上层设计UDP连接框架了。
这里使用了静态代理的设计思路,UDPConnection是IConnection的一个实现,ConnectionWrapper是IConnection的一个代理;ConnectionWrapper中使用的是UDPConnection的实现。ConnectionWrapper中还有一个属性是EndPoint类型的对象,ConnectionWrapper中主要使用的是EndPoint的reconnect实现。
EndPoint定义了一个UDP端的控制接口。
- Serialization getSerialization(); 获取数据序列化的实现
- addListener(Listener listener); 添加上面所说的五中状态的监听
- removeListener(Listener listener);删除状态监听
- run();继承的Runnable,读取消息的循环,运行在独立线程中
- reconnect();重连
- start();开启一个线程运行run()
- stop();停止run()并重置Selector状态
- close();状态置为关闭,关闭通道,重置Selector
- update(int timeout);读取一次数据的实现,run中会循环调用该方法
- getUpdateThread();获取读取数据的线程,这是为了判断建立连接的线程与读取数据的线程是否相同,建立连接的过程会block线程,因此不能使用同一个线程,否则会卡死。
这些接口里面只有1、2、3、5、6、7、8是提供给外部控制Client状态的。
下面我们来走一遍建立连接的过程:
-
创建Client
client = new Client() {};
-
配置状态回调,启动Client,开始监听UDP报文
client.start();
client.addListener(new com.transport.Listener());
-
开启另一个线程开始连接服务器
需要注意的是必须要先启动Client后在建立连接,因为我们这里建立连接做的事情是1.给Channel绑定服务器的地址 2.给服务器发送一个心跳包,然后block线程,等待心跳ACK,超时断开连接;收到ACK唤醒线程,通知状态监听
服务器的逻辑与之类似:
-
创建Server
server = new Server() {};
-
配置状态回调,启动Client,开始监听UDP报文,收到报文后默认返回ACK
server.addListener(new Listener());
server.start();
-
绑定端口,服务器没有建立连接的步骤,不会造成线程block
server.bind(54555);
可靠地数据传输
要实现可靠地数据传输需要实现两个特性:1.断线重连 2.数据包ACK,超时重发 3.心跳
断线重连
断线重连发生在下面几种情况下:
- 从通道读取数据失败。我们认为数据通道被破坏了,需要重新创建。
- ACK检查时发现了超出重发次数的数据包。我们认为服务器失联了,需要重联,确认连接可用。
- 发送数据包失败。与1一样,认为数据通道破坏了。
对应的代码调用可以全局搜索reconnect();的调用
超时重发
上面在介绍EndPoint的时候说过了update(int timeout),运行在独立的线程中,这个方法做了3件事情:
- 检测Selector是否有数据通道处于ready状态,如果有,进入读取数据的逻辑。
- 检测并发送心跳包。
- 检测等待ACK的包,是否有等待的包超时了,有并且已经超出重发次数上线,重连;没有超出重发次数上线则重发数据包,并将该数据包移出等待ACK的列表。
我们在发送数据包时,需要将每一个发送成功的数据包保存在等待ACK的列表中,在收到了对应的ACK后才能将之移除;重连时需要把还在等待ACK的数据包重发一次。
心跳
心跳也是在update函数中检测的,目前定义的是一分钟一次,在update函数中判断距离上次发送心跳的时间,到了一分钟就发送一次心跳,然后重置最近一次发送心跳的时间。这里需要注意的 是重置发送心跳的时间是写在发送数据包的函数中的,因为我们有可能会出现心跳包重发的情况,因此在发送数据包的时候来判断是否属心跳包,以此为依据来更新最新一次数据包的发送时间。 重连的时候不会发送还在等待ACK的心跳包,心跳包会直接抛弃。
数据解析协议
首先需要看下序列化接口的定义:
这里我们规定了ByteBuffer与PacketMessage之间的互相转化接口,具体实现写在PacketSerialization中。这里不太想赘述了,代码中已经写得很详细了。
如何运行Demo
我们虽然导入Android Studio后是一个Android工程,但是不能启动模拟器来运行,因为Android部分的代码都被我删掉了。我们可以运行代码里面的ChatClient 和ChatServer两个Java勒种的main方法,运行方法不需要我再说了吧,附上截图: