今天早上关注了这个问题,刚抽出时间大概整理下,以下仅是个人理解:
一定要多看几遍代码并结合文字理解下
引
0、从I/O说起
这些概念之所以容易令人迷惑,在于很多人对I/O就没有清晰准确的理解,后面的理解自然不可能正确。我想用一个具体的例子来说明一下I/O。
设想自己是一个进程,就叫小进吧。小进需要接收一个输入,我们不管这个输入是从网络套接字来,还是键盘,鼠标来,输入的来源可以千千万万。但是,都必须由内核来帮小进完成,为啥内核这么霸道?因为计算机上运行的可不只是咱小进一个进程,还有很多进程。这些进程兄弟也可能需要从这些输入设备接收输入,没有内核居中协调,岂不是乱套。
从小进的角度看,内核帮助它完成输入,其实包括三个步骤:
- 内核替小进接收好数据,这些数据暂时存在内核的内存空间
- 内核将数据从自己的内存空间复制到小进的内存空间
- 告诉小进,输入数据来了,赶快读吧
这三步看似挺简单,其实在具体实现时,有很多地方需要考虑:
- 小进如何告诉内核自己要接收一个输入?
- 内核接到小进的请求,替小进接收好数据这段时间, 小进咋办?
- 内核在将数据复制到小进的内存空间这段时间,小进咋办?
- 到底什么时候告诉小进数据准备好了,是在内核接收好数据之后就告诉小进,还是在将数据复制到小进的内存空间之后再告诉他?
- 内核以什么样的方式告诉小进,数据准备好了?
1、阻塞式I/O模型
对上面5个问题,最简单的解决方案就是阻塞式I/O模型,它的过程是这样的:
小进:内核内核,我要接收一个键盘输入,快点帮我完成!
内核:好咧!biubiu!一个阻塞丢给小进,小进顿时石化,就像被孙悟空点了定一样。
就这样,小进在石化中,时间一点点流逝。终于,内核收到了数据。
内核:数据终于来了,我要开干了!duang duang duang,先把数据存在自己的内核空间,然后又复制到小进的用户空间。
内核:biubiu!一个解除阻塞丢给小进,小进瞬间复活,小进的记忆还是停留在让内核帮他接收输入时。
小进:哇!内核真靠谱,数据已经有了!干活去!
我们可以看到,小进发出接收输入的请求给内核开始,就处于阻塞状态,直到内核将数据复制到小进的用户空间,小进才解除阻塞。
2、非阻塞式I/O
小进发现,阻塞式I/O中,自己总要被阻塞好久,好不爽啊,于是小进改用了非阻塞式I/O,其过程是这样的:
小进:内核内核,我要接收一个输入,赶紧帮我看看,数据到了没有,先说好,不要阻塞我。
内核:查看了一下自己的内核空间,没有发现数据,于是迅速告诉小进,没有呢!并继续帮小进等着数据。
如此这样,小进不断地问内核,终于,过了一段时间,小进再一次询问时,内核往自己的空间中一查,呦!数据来了,不胜其烦的内核迅速告诉小进,数据好了!
小进:快给我!
内核:biu!一个阻塞丢给小进,悲催的小进还是石化了!
内核赶紧将自己空间的输入数据复制到小进的用户空间,复制好后。
内核:biu!一个非阻塞丢给小进,小进立马复活
小进:哇!数据来了,啥也不说,干活!
我们看到,所谓的非阻塞I/O,其实在内核将数据从内核空间复制到小进的用户空间时,小进还是被阻塞的。
3、信号驱动式I/O
非阻塞I/O中,小进不停地问内核,数据好了没有啊,内核感觉太烦了,于是想出一个好办法。
内核告诉小进,本内核升级了,如果想要我替你接收输入,请先注册一个信号处理函数,等数据准备好时,我会发信号给你。于是,现在的流程是这样的:
小进:注册信号处理函数,告诉内核,自己要接收一个输入,然后继续干活!
内核:收到函数,开始执行数据接收
接收完成时,给小进发送信号,信号处理函数收到信号,开始向内核发送读数据请求
内核:biu!阻塞了小进,并把数据从内核空间复制到小进的用户空间。
内核:biu!解除了阻塞
小进:哇!数据来了!啥也不说,干活去!
4、异步I/O
上面的三种I/O解决方案中,小进都被阻塞了,只不过是阻塞时间长短不一样,第一种方案中小进被阻塞的时间长一些,在内核接收数据以及将数据复制到小进的用户空间时,都被阻塞。
第二、第三种方案中,只在内核将数据从内核空间复制到小进的用户空间时,小进才被阻塞。
我们现在说的异步I/O,目的就是让小进绝对不被阻塞。其过程是这样的:
小进:内核内核,我要接收一个输入,弄好了告诉我。同时将一个信号和信号处理函数告诉内核,然后继续干自己的活了。
内核:得了您嘞,您先忙。
一直到内核接收到数据并将数据从内核空间复制到小进的用户空间后,内核才给小进发送信号。小进在信号处理函数中可以直接处理数据。
引自
践
1、阻塞式I/O式
客户端代码
public class Client {
public static void main(String[] args) {
Socket socket = null;
try {
System.out.println("socket begin " + System.currentTimeMillis());
// 随机绑定本地地址与端口
socket = new Socket("localhost", 8888);
System.out.println("socket end " + System.currentTimeMillis());
OutputStream os = socket.getOutputStream();
Random ran = new Random();
for (int n = 0; n < 10; n++) {
System.out.println("send message " + n);
os.write(("hello server form " + socket.getLocalAddress().getHostAddress() + " - " + n).getBytes());
try {
TimeUnit.SECONDS.sleep(ran.nextInt(10));
} catch (InterruptedException e) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null) {
// 自动关闭绑定流
socket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
服务端代码
public class Server {
public static void main(String[] args) {
ServerSocket serverSocket = null;
Socket socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化线程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), new ThreadBuilder());
// 监听通配符地址
serverSocket = new ServerSocket(8888);
System.out.println("accept begin " + System.currentTimeMillis());
while ((socket = serverSocket.accept()) != null) {
executor.execute(new Task(socket));
}
System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private byte[] buffer = new byte[10 * 1024];
private Socket socket;
public Task(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
int len = is.read(buffer);// 呈阻塞效果
while (len != -1) {
String str = new String(buffer, 0, len);
System.out.println(str);
len = is.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自动关闭绑定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
2、非阻塞式I/O
客户端代码同上
服务端代码
public class Server {
public static void main(String[] args) {
ServerSocketChannel serverSocket = null;
SocketChannel socket = null;
ThreadPoolExecutor executor = null;
try {
// 初始化线程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), new ThreadBuilder());
serverSocket = ServerSocketChannel.open();
// 设置阻塞
serverSocket.configureBlocking(true);
// 监听通配符地址
serverSocket.bind(new InetSocketAddress(8888));
System.out.println("accept begin " + System.currentTimeMillis());
while ((socket = serverSocket.accept()) != null) {
// 设置非阻塞
socket.configureBlocking(false);
executor.execute(new Task(socket));
}
System.out.println("accept end " + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
} finally {
executor.shutdown();
try {
if (serverSocket != null) {
serverSocket.close();
}
System.out.println("exit");
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class ThreadBuilder implements ThreadFactory {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "server-thread-" + counter.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> {
if (e instanceof TaskException) {
System.err.println(t.getName() + "|" + e.getCause().getMessage());
} else {
System.err.println(t.getName() + "|" + e.getMessage());
}
});
return thread;
}
}
static class Task implements Runnable {
private ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
private SocketChannel socket;
public Task(SocketChannel socket) {
this.socket = socket;
}
@Override
public void run() {
try {
System.out.println("--------------------------------------------------");
System.out.println("read begin " + System.currentTimeMillis());
System.out.println("***");
socket.read(buffer);// 呈阻塞效果
while (true) {
if (buffer.position() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
continue;
}
} else {
buffer.flip();
String str = new String(buffer.array(), 0, buffer.limit());
System.out.println(str);
if ("exit".equals(str)) {
break;
}
buffer.clear();
}
socket.read(buffer);
}
System.out.println("***");
System.out.println("read end " + System.currentTimeMillis());
System.out.println("--------------------------------------------------");
} catch (IOException e) {
throw new TaskException(e);
} finally {
if (socket != null) {
try {
// 自动关闭绑定流
socket.close();
System.out.println("socket closed");
} catch (IOException e) {
System.err.println("socket close failed");
}
}
}
}
}
static class TaskException extends RuntimeException {
public TaskException(Throwable cause) {
super(cause);
}
}
}
3、多路复用式I/O(基于非阻塞式I/O)
客户端代码
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…