« | August 2025 | » | 日 | 一 | 二 | 三 | 四 | 五 | 六 | | | | | | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | | | | | | | |
| 公告 |
戒除浮躁,读好书,交益友 |
Blog信息 |
blog名称:邢红瑞的blog 日志总数:523 评论数量:1142 留言数量:0 访问次数:9697752 建立时间:2004年12月20日 |

| |
[java语言]java nio的典型例子 文章收藏, 网上资源, 软件技术, 电脑与网络
邢红瑞 发表于 2006/3/25 10:31:44 |
主要是OP_WRITE和Reactor pattern的用法。import java.io.IOException;import java.nio.channels.SelectionKey;
/** * Created by IntelliJ IDEA. * User: hongruixing * Date: 2008-3-31 * Time: 14:49:31 * To change this template use File | Settings | File Templates. */public interface Reactor { /** * React to an NIO event. * * @param key SelectionKey which has a ready operation. */ void react(SelectionKey key) throws IOException;}
import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;
/** * Created by IntelliJ IDEA. * User: hongruixing * Date: 2008-3-31 * Time: 14:49:59 * To change this template use File | Settings | File Templates. */public class Acceptor implements Reactor { public void react(SelectionKey key) throws IOException { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); if (sc != null) {
sc.configureBlocking(false); sc.register(key.selector(), SelectionKey.OP_READ, new Reader(ByteBuffer.allocate(1024))); } }}import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;
/** * Created by IntelliJ IDEA. * User: hongruixing * Date: 2008-3-31 * Time: 14:50:39 * To change this template use File | Settings | File Templates. */public class Reader implements Reactor { ByteBuffer buffer;
/** * Constructs a Reader. * * @param buffer ByteBuffer for I/O. */ public Reader(ByteBuffer buffer) { this.buffer = buffer; }
public void react(SelectionKey key) throws IOException { SocketChannel sc = (SocketChannel) key.channel(); int count; while ((count = sc.read(buffer)) > 0 || buffer.position() > 0) {
try { buffer.flip(); count = sc.write(buffer);
if (count == 0) { // Stop reading this channel and wait for an OP_WRITE sc.register(key.selector(), SelectionKey.OP_WRITE, new Writer(buffer)); return; } } finally { buffer.compact(); } } if (count < 0) {
sc.close(); } }}
import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;
/** * Created by IntelliJ IDEA. * User: hongruixing * Date: 2008-3-31 * Time: 14:51:18 * To change this template use File | Settings | File Templates. */public class Writer implements Reactor { ByteBuffer buffer;
/** * Constructs a Writer. * * @param buffer ByteBuffer for I/O. */ public Writer(ByteBuffer buffer) { this.buffer = buffer; }
public void react(SelectionKey key) throws IOException { try { SocketChannel sc = (SocketChannel) key.channel(); buffer.flip(); int count = sc.write(buffer);
if (!buffer.hasRemaining()) // Nothing left to write: OK to start reading again sc.register(key.selector(), SelectionKey.OP_READ, new Reader(buffer)); } finally { buffer.compact(); } }}
import java.beans.PropertyChangeSupport;import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;
import java.nio.channels.ClosedSelectorException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.logging.Level;import java.util.logging.Logger;
public class EchoServer implements Closeable, Runnable { private static final Map<Integer, String> operations = new HashMap<Integer, String>(); private Logger logger = Logger.getLogger(this.getClass().getName()); private ServerSocketChannel ssc; private Selector selector;
/** * Creates a new instance of EchoServer. * * @param port Port number to listen at. */ public EchoServer(int port) throws IOException { this.ssc = ServerSocketChannel.open(); this.selector = Selector.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT, new Acceptor()); }
static { operations.put(SelectionKey.OP_ACCEPT, "OP_ACCEPT"); operations.put(SelectionKey.OP_CONNECT, "OP_CONNECT"); operations.put(SelectionKey.OP_READ, "OP_READ"); operations.put(SelectionKey.OP_WRITE, "OP_WRITE"); }
static String getOperations(int ops) { String result = ""; for (int i = 1; i < 32; i <<= 1) { if ((ops & i) == 0) continue; if (result.length() > 0) result += ","; result += operations.get(i); } return result; }
public void close() throws IOException { ssc.close(); selector.close(); }
public void run() { while (selector.isOpen()) { try { int nsel = selector.select(timeout); if (nsel == 0) { // TODO: timeout for (SelectionKey key : selector.keys()) { logger.log(Level.INFO, "channel " + key.channel() + " waiting for " + getOperations(key.interestOps())); } continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (!key.isValid() || !key.channel().isOpen()) // e.g.OP_ACCEPT is triggered once when the channel is closed. continue; Reactor reactor = (Reactor) key.attachment(); try { reactor.react(key); } catch (IOException exc) { logger.log(Level.SEVERE, "calling Reactor.react()", exc); } } } catch (ClosedSelectorException exc) { logger.log(Level.FINE, "select loop", exc); } catch (IOException exc) { logger.log(Level.SEVERE, "select loop", exc); } } }
/** * Utility field used by bound properties. */ private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
/** * Adds a PropertyChangeListener to the listener list. * * @param l The listener to add. */ public void addPropertyChangeListener(java.beans.PropertyChangeListener l) {
propertyChangeSupport.addPropertyChangeListener(l); }
/** * Removes a PropertyChangeListener from the listener list. * * @param l The listener to remove. */ public void removePropertyChangeListener(java.beans.PropertyChangeListener l) {
propertyChangeSupport.removePropertyChangeListener(l); }
/** * Holds value of property timeout. */ private int timeout;
/** * Getter for property timeout. * * @return Value of property timeout. */ public int getTimeout() { return this.timeout; }
/** * Setter for property timeout. * * @param timeout New value of property timeout. */ public void setTimeout(int timeout) { int oldTimeout = this.timeout; this.timeout = timeout; propertyChangeSupport.firePropertyChange("timeout", new Integer(oldTimeout), new Integer(timeout)); }
/** * Main method, for testing. * * @param args Command line arguments. */ public static void main(String[] args) throws IOException { int port = 8082; EchoServer server = new EchoServer(port); server.setTimeout(10 * 1000); new Thread(server).start(); while (true) { } }} |
|
|