ソケットのスケーラビリティを向上させる
生成
- open
public static Selector open()
Selectorオブジェクトを生成するためのファクトリメソッドです。
実行可能なキーの選択
- ServerSocketChannel#select
public int select()
public int select(long timeout)
登録されたキーの中で実行可能なものを選択します。このメソッドはキーが実行可能になるまでブロックされます。引数でtimeoutを指定した場合は、timeoutまでに実行可能なキーがなければメソッドを抜けてしまいます。
selectメソッドの戻り値は実行可能なキーの個数です。タイムアウトした場合は実行可能なキーはないので、戻り値は0になります。
実行可能なキーの取得
- selectedKeys
public Set selectedKeys()
selectメソッドで選択された実行可能なキーのセットを返します。セットの要素はSelectionKeyオブジェクトです。
キーを処理する時には、まずセットからそのキーを削除します。SelectionKeyクラスは入出力操作を調べるためのメソッドがあるので、それを使用して入出力操作を特定します。その後、その操作を処理します。
リスト 1 はServerSocketChannelクラスのサンプルをSelectorクラスを使用して書き直したものです。アクセプトと入力操作の監視にSelectorクラスを使用しています。
ServerSocketChannel オブジェクトをオープンした後に configureBlocking メソッドを使用して非同期モードにし、アクセプト操作を selector に登録します。
while ループでは選択されたキーの処理を行います。SelectionKey#channel メソッドを使用して、操作が行えるソケットチャネルを取得します。
アクセプトの場合は ServerSocketChannel#accept メソッドを使用して SocketChannel オブジェクトを取得します。accept メソッドを使用して取得した SocketChannel オブジェクトは同期モードになので、非同期モードに変更し、読み込み処理を selector に登録します。
import java.net.*; import java.nio.*; import java.nio.channels.*; import java.nio.charset.*; import java.util.*; public class NonblockServer { private Selector selector; private Charset charset = Charset.forName("UTF-16"); public NonblockServer() { try { selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 非同期モードにする InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 9000); serverSocketChannel.socket().bind(address); // Selector にアクセプトを登録 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { // 選択された SelectionKey オブジェクトをまとめて取得する Set keys = selector.selectedKeys(); Iterator keyIterator = keys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = (SelectionKey)keyIterator.next(); keyIterator.remove(); // 選択されたキーを削除 if (key.isAcceptable()) { // アクセプトの場合 ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel(); SocketChannel socketChannel = serverChannel.accept(); socketChannel.configureBlocking(false); // 非同期に変更 // Selector に読み込みを登録 socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel + " connect."); } else if (key.isReadable()) { // 入力 SocketChannel channel = (SocketChannel)key.channel(); sendBack(channel); } } } serverSocketChannel.close(); } catch (Exception ex) { ex.printStackTrace(); } } private void sendBack(SocketChannel socketChannel) throws Exception { // データの読み込み ByteBuffer buffer = ByteBuffer.allocate(2048); socketChannel.read(buffer); // 読み込んだデータをそのまま送り返す buffer.flip(); System.out.println("Recieve: " + charset.decode(buffer.duplicate())); socketChannel.write(buffer); socketChannel.close(); } public static void main(String[] args){ new NonblockServer(); } }
(2003.03)