Home > Java > javaTutorial > body text

Java NIO Reactor Pattern

巴扎黑
Release: 2016-12-19 11:46:01
Original
1688 people have browsed it

Java NIO Reactor Mode Simple Model

Generally, the reactor pattern in NIO works like this: one Acceptor (more than one also works, but one is enough in typical scenarios) is responsible for accept events. It registers each accepted SocketChannel with a Reactor taken from the Reactor pool according to some selection algorithm; the events registered there are read, write, etc. From then on, all I/O events of that SocketChannel have nothing to do with the Acceptor and are handled entirely by the Reactor it was registered with.

Each Acceptor and each Reactor holds its own Selector.

Of course, each Acceptor and Reactor must be a thread (at least logically it must be a thread)

A simple implementation consists of three classes: NioAcceptor, NioReactor and ReactorPool.

package cc.lixiaohui.demo.dp.reator;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Acceptor that handles {@link SelectionKey#OP_ACCEPT} events and hands every accepted
 * {@link SocketChannel} over to a reactor obtained from the {@link ReactorPool}.
 *
 * <p>The acceptor owns one {@link Selector} and one {@link ServerSocketChannel}. Both are closed
 * when the accept loop terminates, or directly by {@link #stop()} if the loop was never started.
 *
 * <p>NOTE(review): the reactor pool is not stopped by this class; ReactorPool's API is not
 * visible here, so confirm who is responsible for shutting the reactors down.
 */
public class NioAcceptor {

    private static final Logger logger = LoggerFactory.getLogger(NioAcceptor.class);

    private final int port;
    private final String host;
    private final Selector selector; // Java NIO Selector
    private final ServerSocketChannel serverChannel; // Java NIO ServerSocketChannel
    private final ReactorPool reactorPool; // pool of NioReactor instances
    private Thread thread; // worker thread running the accept loop
    private volatile boolean stop = false;

    /**
     * Opens the selector and a non-blocking server socket bound to {@code host:port}, and creates
     * the reactor pool.
     *
     * @param port            port to listen on
     * @param host            host name or address to bind to; must not be {@code null}
     * @param reactorPoolSize size passed to the {@link ReactorPool} constructor
     * @throws IOException          if opening, binding or registering the server channel fails
     * @throws NullPointerException if {@code host} is {@code null}
     */
    public NioAcceptor(int port, String host, int reactorPoolSize) throws IOException {
        this.port = port;
        this.host = Objects.requireNonNull(host);

        Selector openedSelector = null;
        ServerSocketChannel openedChannel = null;
        ReactorPool pool;
        try {
            openedSelector = Selector.open();
            openedChannel = ServerSocketChannel.open();
            openedChannel.configureBlocking(false); // non-blocking mode is required for select()
            openedChannel.bind(new InetSocketAddress(host, port));
            openedChannel.register(openedSelector, SelectionKey.OP_ACCEPT);
            // Created last so that a failed bind does not leave a pool behind.
            pool = new ReactorPool(reactorPoolSize);
        } catch (IOException | RuntimeException e) {
            // Do not leak the half-initialised selector / server socket.
            closeQuietly(openedChannel);
            closeQuietly(openedSelector);
            throw e;
        }
        this.selector = openedSelector;
        this.serverChannel = openedChannel;
        this.reactorPool = pool;
    }

    /**
     * Requests the accept loop to finish and waits for the worker thread to exit. If the acceptor
     * was never started, the selector and server channel are closed directly.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop() throws InterruptedException {
        stop = true;
        Thread worker = thread;
        if (worker == null) {
            // Never started: there is no loop that would release the resources for us.
            closeResources();
            return;
        }
        selector.wakeup(); // leave select() immediately instead of waiting for its timeout
        worker.join();
    }

    /** Starts the worker thread that runs the accept loop. */
    public void start() {
        thread = new Thread(new AcceptTask(this));
        thread.start();
    }

    /**
     * Accepts one pending connection, if any, and posts it to the next reactor of the pool.
     *
     * @throws IOException if accepting or configuring the channel fails
     */
    private void acceptAndDispatch() throws IOException {
        SocketChannel channel = serverChannel.accept();
        if (channel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        try {
            channel.configureBlocking(false);
            // Take the next reactor and add the channel to its registration queue.
            reactorPool.nextReactor().postRegistry(channel);
        } catch (IOException | RuntimeException e) {
            closeQuietly(channel); // nobody else holds a reference to this channel yet
            throw e;
        }
    }

    /** Closes the server channel and the selector; calling it more than once is harmless. */
    private void closeResources() {
        closeQuietly(serverChannel);
        closeQuietly(selector);
    }

    /** Closes {@code resource} if it is non-null, logging instead of propagating any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            logger.error("Failed to close resource", e);
        }
    }

    /** Accept loop executed on the acceptor's worker thread. */
    private static class AcceptTask implements Runnable {

        final NioAcceptor acceptor;

        AcceptTask(NioAcceptor acceptor) {
            this.acceptor = acceptor;
        }

        @Override
        public void run() {
            final Selector selector = acceptor.selector;
            try {
                while (!acceptor.stop) { // still running
                    try {
                        selector.select(1000L); // wait at most one second
                        Set<SelectionKey> keys = selector.selectedKeys();
                        try {
                            for (SelectionKey key : keys) {
                                if (key.isValid() && key.isAcceptable()) {
                                    acceptor.acceptAndDispatch();
                                } else {
                                    key.cancel();
                                }
                            }
                        } finally {
                            // The selector never removes keys from the selected set itself.
                            keys.clear();
                        }
                    } catch (IOException e) {
                        logger.error("", e);
                    }
                }
            } finally {
                // Release the listening socket and selector however the loop ends.
                acceptor.closeResources();
            }
        }
    }
}
Copy after login
rrree

ReactorPool manages the Reactors. Each Reactor it manages is implemented by NioReactor:

/**
 * Reactor that services I/O events such as {@link SelectionKey#OP_READ} and
 * {@link SelectionKey#OP_WRITE} for the {@link SocketChannel}s posted to it.
 *
 * <p>Channels are handed over through {@link #postRegistry(SocketChannel)} and registered with
 * the reactor's own {@link Selector} on the reactor thread. When the reactor loop terminates, the
 * selector, every channel registered with it and every channel still waiting in the queue are
 * closed.
 */
public class NioReactor {

    private static final Logger logger = LoggerFactory.getLogger(NioReactor.class);

    /** Queue of {@link SocketChannel}s waiting to be registered with the selector. */
    private final Queue<SocketChannel> registerQueue = new ConcurrentLinkedQueue<SocketChannel>();
    private final Selector selector;
    private volatile boolean stop = false;
    private Thread thread;

    /**
     * Creates a reactor with a freshly opened selector.
     *
     * @throws IOException if the selector cannot be opened
     */
    public NioReactor() throws IOException {
        selector = Selector.open();
    }

    /**
     * Queues {@code channel} for registration and wakes the selector so that the reactor thread
     * processes the registration promptly.
     *
     * @param channel the accepted channel to be serviced by this reactor
     */
    public void postRegistry(SocketChannel channel) {
        registerQueue.add(channel);
        selector.wakeup();
    }

    /**
     * Starts the worker thread that runs the reactor loop.
     *
     * @return this reactor
     */
    public NioReactor start() {
        thread = new Thread(new ReactTask(this));
        thread.start();
        return this;
    }

    /**
     * Requests the reactor loop to finish and waits for the worker thread to exit. If the reactor
     * was never started, its resources are released directly.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop() throws InterruptedException {
        stop = true;
        Thread worker = thread;
        if (worker == null) {
            // Never started: there is no loop that would release the resources for us.
            shutdown();
            return;
        }
        selector.wakeup(); // leave select() immediately instead of waiting for its timeout
        worker.join();
    }

    /**
     * Registers every channel currently waiting in the queue with {@code selector}.
     *
     * @param selector the selector the channels are registered with
     */
    private void doRegister(Selector selector) {
        SocketChannel channel;
        while ((channel = registerQueue.poll()) != null) {
            try {
                // Only read interest is registered up front. Write interest is driven by the
                // business logic: it is registered when data written to the channel could not
                // be written completely.
                channel.register(selector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                logger.error("", e);
            }
        }
    }

    private void handleWrite(SelectionKey key) {
        // TODO business write
    }

    private void handleRead(SelectionKey key) {
        // TODO business read
    }

    /** Closes queued channels, registered channels and finally the selector itself. */
    private void shutdown() {
        SocketChannel pending;
        while ((pending = registerQueue.poll()) != null) {
            closeQuietly(pending);
        }
        if (selector.isOpen()) {
            // Snapshot the key set so that closing channels cannot interfere with the iteration.
            SelectionKey[] registered = selector.keys().toArray(new SelectionKey[0]);
            for (SelectionKey key : registered) {
                closeQuietly(key.channel());
            }
        }
        closeQuietly(selector);
    }

    /** Closes {@code resource} if it is non-null, logging instead of propagating any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            logger.error("Failed to close resource", e);
        }
    }

    /** Reactor loop executed on the reactor's worker thread. */
    private static class ReactTask implements Runnable {

        final NioReactor reactor;

        ReactTask(NioReactor reactor) {
            this.reactor = reactor;
        }

        @Override
        public void run() {
            final Selector selector = reactor.selector;
            try {
                while (!reactor.stop) {
                    try {
                        selector.select(500L);
                        reactor.doRegister(selector); // process pending registrations
                        Set<SelectionKey> keys = selector.selectedKeys();
                        try {
                            for (SelectionKey key : keys) {
                                dispatch(key);
                            }
                        } finally {
                            // The selector never removes keys from the selected set itself;
                            // without this, stale keys would be dispatched on every pass.
                            keys.clear();
                        }
                    } catch (IOException e) {
                        logger.error("", e);
                    }
                }
            } finally {
                reactor.shutdown();
            }
        }

        /** Routes one selected key to the read/write handlers; a failure is logged, not thrown. */
        private void dispatch(SelectionKey key) {
            try {
                if (!key.isValid()) {
                    key.cancel();
                    return;
                }
                if (key.isReadable()) {
                    reactor.handleRead(key);
                }
                // Re-check validity: the read handler may have cancelled the key.
                if (key.isValid() && key.isWritable()) {
                    reactor.handleWrite(key);
                }
            } catch (Throwable t) {
                logger.error("", t);
            }
        }
    }
}
Copy after login


source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template