在我的初识NIO——1中,我讲到了Selector管理器和Channel管道以及Buffer的应用方法和简要工作原理,今天来模拟一下java网络通信中的粘包半包问题以及当单次消息过大时Buffer缓冲区不能一次读完,需要扩容的情况,最后,我们再来使用多线程结合多路复用IO对进行改进。
1.模拟粘包半包问题
1>以'\n'为耽搁消息分隔符,进行拆包
public class TestBuffer1 {
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(10);
source.put("hello,wor9".getBytes());
split(source);
source.put(" I'm zh\n".getBytes());
split(source);
}
public static void split(ByteBuffer source){
source.flip();
for(int i=0;i<source.limit();i++){
if(source.get(i)=='\n'){
int length=i-source.position()+1;
ByteBuffer target=ByteBuffer.allocate(length);
for(int j=0;j<length;j++){
target.put(source.get());
}
target.flip();
System.out.println(Charset.defaultCharset().decode(target));
}
}
source.compact();
}
}
2>将以上代码应用到服务器端,处理事务的主要代码如下:
if (key.isAcceptable()) {
ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();
SocketChannel sChannel = ssChannel1.accept();
sChannel.configureBlocking(false);
ByteBuffer buffer =ByteBuffer.allocate(10);
sChannel.register(selector, SelectionKey.OP_READ,buffer);
} else if (key.isReadable()) {
SocketChannel sChannel = (SocketChannel) key.channel();
int read=0;
ByteBuffer buffer=(ByteBuffer) key.attachment();
if(sChannel.isOpen()){
read=sChannel.read(buffer);
}
if(read==-1){
key.cancel();
}else {
TestBuffer1.split(buffer);
//判断buffer中能否拆出一条完整的消息,若不能,则将buffer扩容,扩容之后继续进行读取
if(buffer.position()==buffer.limit()){
buffer.flip();
ByteBuffer buff=ByteBuffer.allocate(buffer.capacity()*2);
buff.put(buffer);
key.attach(buff);
}
}
}
2.缓冲区扩容
我们在split()方法结束时,对buffer调用了compact()方法,这个方法会压缩buffer空间(我们在读取buffer时,如果将所有数据都读取到那这个方法的效果就和clear()方法的效果一样如果我们只读取了一部分数据,buffer会将这些数据保存在缓冲区中,下一次向缓冲区中写入数据时,position索引指向这些数据之后的第一个可用空间)注:如果原本buffer是满的,但是本次一个数据都没有读取,那么compact()方法执行之后,buffer缓冲区还是满的,即 buffer.position()==buffer.limit() 。
当我们发现当前buffer的容量不足以存储一条完整的消息时,我们需要对buffer进行扩容,但我们应该做的是只对当前Channel的buffer进行扩容,在连接到多条Channel时需要将不同的buffer区分开,我们需要借助一个叫做附件的东西将buffer和Channel绑定在一起。(Channel注册到Selector之后,在Selector中会形成一个密钥集,其中有一个个密钥,即SelectionKey,这个对象通过attach()方法可以将buffer绑定到其身上,之后该SelectionKey对象调用attachment()方法,可以获得该buffer)
该示例实现的完整代码如下所示:(笔者实现的缓冲区扩容还有一个弊端,就是扩容之后无法根据下一次读取内容的多少自动缩小缓冲区)
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Logger;
public class NIOServer {
public static final Logger log=Logger.getLogger(EchoServer.class.toString());
public static void main(String[] args) throws IOException ,InterruptedException{
Selector selector = Selector.open();
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
SelectionKey ssKey=ssChannel.register(selector, SelectionKey.OP_ACCEPT);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 6660);
ssChannel.bind(address);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
try{
if (key.isAcceptable()) {
ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();
SocketChannel sChannel = ssChannel1.accept();
sChannel.configureBlocking(false);
ByteBuffer buffer =ByteBuffer.allocate(10);
sChannel.register(selector, SelectionKey.OP_READ,buffer);
} else if (key.isReadable()) {
SocketChannel sChannel = (SocketChannel) key.channel();
int read=0;
ByteBuffer buffer=(ByteBuffer) key.attachment();
if(sChannel.isOpen()){
read=sChannel.read(buffer);
}
if(read==-1){
log.info("read false... read ="+read);
key.cancel();
}else {
TestBuffer1.split(buffer);
if(buffer.position()==buffer.limit()){
buffer.flip();
ByteBuffer buff=ByteBuffer.allocate(buffer.capacity()*2);
buff.put(buffer);
key.attach(buff);
}
log.info("write success... read ="+read);
}
}
}catch(IOException ex){
log.info("IOException...");
ex.printStackTrace();
key.cancel();
key.channel().close();
}
}
}
}
}
这个示例中,我们使用一个线程和多路复用IO相结合的方法,对此,我们可以再做出一些改进:
boss线程负责接受客户端的连接,worker线程负责执行IO读写操作。
boss线程接收到一个来自客户端的连接之后,将该管道中的OP_READ事件注册到worker线程的Selector管理器上,worker线程中的Selector使用select()方法进行等待感兴趣事件的发生。
在这里,我创建了多个worker线程,worker线程数和我的cpu核心数一致,这样可以有效发挥我多核cpu的优势。当有多个客户端连接到服务器时,我采取了一种很朴素的负载均衡方案,即,将多个连接上来的客户端均匀分配到每一个worker上。
看上去他们似乎能很好的工作,但是有一个问题需要注意,如果worker线程中的select()方法首先阻塞到了那里,之后boss线程中的register()方法才开始执行,那这个兴趣事件就无法被成功注册到worker线程中的Selector管理器中,这样就算客户端发起了多次写入请求服务端仍然无法进行处理。
我们可以思考一下如何解决这个问题:
首先,我们要想办法让select()方法在boss线程中register()方法执行时不再阻塞,也就是要暂时破除select()的阻塞状态,我们可以使用wakeup()方法唤醒select()的阻塞,这样注册事件就可以正常地完成了。
完整代码如下:
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
public class MultiThreadServer {
public static final Logger log=Logger.getLogger(EchoServer.class.toString());
public static void main(String[] args)throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
Selector boss = Selector.open();
ssChannel.register(boss, SelectionKey.OP_ACCEPT);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8887);
ssChannel.bind(address);
Worker[] workers=new Worker[6];
for(int i=0;i<workers.length;i++){
workers[i]=new Worker("worker-"+i);
}
AtomicInteger index=new AtomicInteger();
while (true) {
boss.select();
Set<SelectionKey> keys = boss.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssChannel.accept();
log.info("connected... "+sc.getRemoteAddress());
sc.configureBlocking(false);
log.info("before register... "+sc.getRemoteAddress());
workers[index.getAndIncrement()% workers.length].register(sc);//????????worker???????select??????????????socketChannel?????worker??Selector?
log.info("after register... "+sc.getRemoteAddress());
}
}
}
}
//selector worker???????
static class Worker implements Runnable{
private Thread thread;
private Selector worker;
private volatile boolean start=false;
private String name;
public Worker(String name){
this.name=name;
}
public void register(SocketChannel sc)throws IOException{
if(!start){//对于每个worker对象只执行一次
worker=Selector.open();
thread=new Thread(this,name);
thread.start();
start=true;
}
worker.wakeup();
ByteBuffer buffer=ByteBuffer.allocate(16);
sc.register(worker,SelectionKey.OP_READ,buffer);
}
@Override
public void run() {
while(true){
try{
worker.select();
Iterator<SelectionKey> iterator=worker.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key=iterator.next();
iterator.remove();
if(key.isReadable()){
ByteBuffer buffer=(ByteBuffer) key.attachment();
SocketChannel channel=(SocketChannel) key.channel();
int read=channel.read(buffer);
if(read!=-1){
TestBuffer1.split(buffer);
if(buffer.position()==buffer.limit()){
buffer.flip();
ByteBuffer buff=ByteBuffer.allocate(buffer.capacity()*2);
buff.put(buffer);
key.attach(buff);
}
log.info("read... Thread = "+Thread.currentThread());
}else{
log.info("read false... read ="+read);
key.cancel();
}
}
}
}catch(IOException ex){
ex.printStackTrace();
}
}
}
}
}
写在最后:由于笔者目前NIO了解程度较浅,这篇文章若有不足之处,欢迎读者进行斧正。