java.io
ByteBuffer
ByteBuffer 可以在堆外分配内存,减少垃圾回收。
1 | ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100); |
ByteBuffer vs ByteBuf(Netty)
Channel
1 | // Files.copy(Paths.get(args[0]), Paths.get(args[1])); |
ECHO
OIO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63public class OIOEchoServer {
private final String host;
private final int port;
private final int poolSize;
private final ExecutorService executor;
public OIOEchoServer(String host, int port) {
this(host, port, Runtime.getRuntime().availableProcessors());
}
public OIOEchoServer(String host, int port, int poolSize) {
this.host = host;
this.port = port;
this.poolSize = poolSize;
this.executor = Executors.newFixedThreadPool(poolSize);
}
static class EchoHandler implements Runnable {
private final Socket socket;
public EchoHandler(Socket socket) {
this.socket = socket;
}
public void run() {
try (InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream();){
byte[] buffer = new byte[1024];
int read = 0;
while (-1 != (read = in.read(buffer))) {
out.write(buffer, 0, read);
}
} catch (IOException ignored) { }
}
}
public void start() {
try (ServerSocket server = new ServerSocket()) {
server.bind(new InetSocketAddress(host, port));
// accept 阻塞 SO_TIMEOUT 2000ms
//server.setSoTimeout(2000);
while (true) {
try {
Socket socket = server.accept();
executor.submit(new EchoHandler(socket));
} catch (SocketTimeoutException ignored) {
System.out.println("timeout");
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new OIOEchoServer("127.0.0.1", 8080).start();
}
}NIO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81public class NIOEchoServer {
private final String host;
private final int port;
private volatile boolean shutdown = false;
public NIOEchoServer(String host, int port) {
this.host = host;
this.port = port;
}
public void shutdown() {
System.out.println("shutdown");
shutdown = true;
}
public void start() {
try (ServerSocketChannel server = ServerSocketChannel.open();
Selector selector = Selector.open();) {
server.bind(new InetSocketAddress(host, port));
server.configureBlocking(false);
// 第二个参数是感兴趣的事件,默认常量有4个(连接、接受、读、写),
// 定义在SelectionKey类中,但并不是所有Channel都一定支持,可以用validOps()判断。
// ServerSocketChannel 默认只支持 {@link SelectionKey#OP_ACCEPT} 事件
server.register(selector, SelectionKey.OP_ACCEPT);
// 主循环 主循环阻塞会影响后面所有事件
while (!shutdown) {
System.out.println(Thread.currentThread().isInterrupted());
if (0 == selector.select(2000)) continue;
// try...catch
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (!key.isValid()) continue;
// 客户端发起连接
if (key.isValid() && key.isConnectable()) {
//System.out.println("connect");
}
// 服务器接收连接
if (key.isValid() && key.isAcceptable()) {
//System.out.println("accept");
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel socket = serverChannel.accept();
socket.configureBlocking(false);
// 共用同一个selector
socket.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
if (key.isValid() && key.isReadable()) {
//System.out.println("read");
SocketChannel socket = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
// 0 or -1
while (socket.read(buffer) > 0) {
buffer.flip();
if (key.isWritable()) socket.write(buffer);
buffer.clear();
}
socket.close();
}
if (key.isValid() && key.isWritable()) {
//System.out.println("write");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new NIOEchoServer("127.0.0.1", 8080).start();
}
}AIO
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89public class NIO2EchoServer {
private final String host;
private final int port;
public NIO2EchoServer(String host, int port) {
this.host = host;
this.port = port;
}
public void start() {
try {
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newCachedThreadPool());
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(host, 8080));
// 复用ServerAcceptCompletionHandler,避免创建过多对象。
ServerAcceptCompletionHandler serverHandler = new ServerAcceptCompletionHandler();
server.accept(server, serverHandler);
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class ClientReadCompletionHandler implements CompletionHandler<Integer, AsynchronousSocketChannel> {
private final ByteBuffer buffer;
public ClientReadCompletionHandler(int bufferCapacity) {
this.buffer = ByteBuffer.allocateDirect(bufferCapacity);
}
public ByteBuffer getBuffer() {
return buffer;
}
public void completed(Integer read, final AsynchronousSocketChannel socket) {
buffer.flip();
// 错误示例:
// write 是非阻塞的,我们不能想当然认为后面的buffer.clear后于write执行。
// 如果先于write执行,那么write时将无数据可写。
// socket.write(buffer);
// buffer.clear();
// socket.read(buffer, socket, this);
// ClientWriteCompletionHandler
socket.write(buffer, socket, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
public void completed(Integer write, AsynchronousSocketChannel socket) {
buffer.clear();
socket.read(buffer, socket, ClientReadCompletionHandler.this);
}
public void failed(Throwable exc, AsynchronousSocketChannel socket) {
}
});
}
public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
}
}
static class ServerAcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
public void completed(AsynchronousSocketChannel socket, AsynchronousServerSocketChannel server) {
// 开始监听新的请求,回调处理有ServerCompletionHandler处理
server.accept(server, this);
// 开始读数据,读到的数据有ClientCompletionHandler处理
ClientReadCompletionHandler clientHandler = new ClientReadCompletionHandler(3);
socket.read(clientHandler.getBuffer(), socket, clientHandler);
}
public void failed(Throwable exc, AsynchronousServerSocketChannel server) {
System.out.println("fail");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
new NIO2EchoServer("127.0.0.1", 8080).start();
}
}Netty
1
Mina、Netty、Grizzly
本站采用「署名 4.0 国际」进行许可。