ソケットのスケーラビリティを向上させる
生成
- open
public static Selector open()
Selectorオブジェクトを生成するためのファクトリメソッドです。
実行可能なキーの選択
- ServerSocketChannel#select
public int select()
public int select(long timeout)
登録されたキーの中で実行可能なものを選択します。このメソッドはキーが実行可能になるまでブロックされます。引数でtimeoutを指定した場合は、timeoutまでに実行可能なキーがなければメソッドを抜けてしまいます。
selectメソッドの戻り値は実行可能なキーの個数です。タイムアウトした場合は実行可能なキーはないので、戻り値は0になります。
実行可能なキーの取得
- selectedKeys
public Set selectedKeys()
selectメソッドで選択された実行可能なキーのセットを返します。セットの要素はSelectionKeyオブジェクトです。
キーを処理する時には、まずセットからそのキーを削除します。SelectionKeyクラスは入出力操作を調べるためのメソッドがあるので、それを使用して入出力操作を特定します。その後、その操作を処理します。
リスト 1 はServerSocketChannelクラスのサンプルをSelectorクラスを使用して書き直したものです。アクセプトと入力操作の監視にSelectorクラスを使用しています。
ServerSocketChannel オブジェクトをオープンした後に configureBlocking メソッドを使用して非同期モードにし、アクセプト操作を selector に登録します。
while ループでは選択されたキーの処理を行います。SelectionKey#channel メソッドを使用して、操作が行えるソケットチャネルを取得します。
アクセプトの場合は ServerSocketChannel#accept メソッドを使用して SocketChannel オブジェクトを取得します。accept メソッドを使用して取得した SocketChannel オブジェクトは同期モードになので、非同期モードに変更し、読み込み処理を selector に登録します。
リスト 1 Selectorクラスの使用例
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;
public class NonblockServer {
private Selector selector;
private Charset charset = Charset.forName("UTF-16");
public NonblockServer() {
try {
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 非同期モードにする
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 9000);
serverSocketChannel.socket().bind(address);
// Selector にアクセプトを登録
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
// 選択された SelectionKey オブジェクトをまとめて取得する
Set keys = selector.selectedKeys();
Iterator keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = (SelectionKey)keyIterator.next();
keyIterator.remove(); // 選択されたキーを削除
if (key.isAcceptable()) {
// アクセプトの場合
ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false); // 非同期に変更
// Selector に読み込みを登録
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(socketChannel + " connect.");
} else if (key.isReadable()) {
// 入力
SocketChannel channel = (SocketChannel)key.channel();
sendBack(channel);
}
}
}
serverSocketChannel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
private void sendBack(SocketChannel socketChannel) throws Exception {
// データの読み込み
ByteBuffer buffer = ByteBuffer.allocate(2048);
socketChannel.read(buffer);
// 読み込んだデータをそのまま送り返す
buffer.flip();
System.out.println("Recieve: " + charset.decode(buffer.duplicate()));
socketChannel.write(buffer);
socketChannel.close();
}
public static void main(String[] args){
new NonblockServer();
}
}
(2003.03)