初出 JAVA PRESS Vol.29

Java API ダイジェスト

java.nio.channels.Slector

ソケットチャネルを非同期で使用する時、ともに使われるのがSelectorクラスです。

Selectorクラスは複数のチャネルのブロッキング操作をまとめて行うことができます。このため、オープンしているソケット数にかかわらずシングルスレッドでブロッキング操作の監視が行えます。

したがって、スケーラビリティを高くすることが可能です。

Selectorオブジェクトにはノンブロッキングとして扱う入出力処理がキーとして登録されます。キーはSelectionKeyクラスを使用して表されます。Selectorオブジェクトは登録されたキーを監視し、操作可能なキーを選択します。


入出力操作のマルチプレクサ

キーの登録はSelectableChannelクラスで行われるため、Selectorクラスではキーの管理を主に行います。

Selectorオブジェクトの生成はファクトリメソッドopenを使用します。

生成

selectメソッドは登録されたキーの中で実行可能なものを選択します。このメソッドは実行可能なキーが選択できるまでブロックします。

実行可能なキーの選択

selectメソッドでキーの選択後、selectedKeysメソッドを使用して実行可能なキーのセットを取得します。

実行可能なキーの取得

ソケットのスケーラビリティを向上させる

生成

public static Selector open()

Selectorオブジェクトを生成するためのファクトリメソッドです。

実行可能なキーの選択

public int select()
public int select(long timeout)

登録されたキーの中で実行可能なものを選択します。このメソッドはキーが実行可能になるまでブロックされます。引数でtimeoutを指定した場合は、timeoutまでに実行可能なキーがなければメソッドを抜けてしまいます。

selectメソッドの戻り値は実行可能なキーの個数です。タイムアウトした場合は実行可能なキーはないので、戻り値は0になります。

実行可能なキーの取得

public Set selectedKeys()

selectメソッドで選択された実行可能なキーのセットを返します。セットの要素はSelectionKeyオブジェクトです。

キーを処理する時には、まずセットからそのキーを削除します。SelectionKeyクラスは入出力操作を調べるためのメソッドがあるので、それを使用して入出力操作を特定します。その後、その操作を処理します。

リスト 1 はServerSocketChannelクラスのサンプルをSelectorクラスを使用して書き直したものです。アクセプトと入力操作の監視にSelectorクラスを使用しています。

ServerSocketChannel オブジェクトをオープンした後に configureBlocking メソッドを使用して非同期モードにし、アクセプト操作を selector に登録します。

while ループでは選択されたキーの処理を行います。SelectionKey#channel メソッドを使用して、操作が行えるソケットチャネルを取得します。

アクセプトの場合は ServerSocketChannel#accept メソッドを使用して SocketChannel オブジェクトを取得します。accept メソッドを使用して取得した SocketChannel オブジェクトは同期モードになので、非同期モードに変更し、読み込み処理を selector に登録します。

 

リスト 1 Selectorクラスの使用例

import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.*;
 
public class NonblockServer {
    private Selector selector;
    private Charset charset = Charset.forName("UTF-16");
         
    public NonblockServer() {
        try {
            selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false); // 非同期モードにする
 
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 9000);
            serverSocketChannel.socket().bind(address);
 
            // Selector にアクセプトを登録
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  
            while (selector.select() > 0) {
                // 選択された SelectionKey オブジェクトをまとめて取得する
                Set keys = selector.selectedKeys();
                Iterator keyIterator = keys.iterator();
                 
                while (keyIterator.hasNext()) {
                    SelectionKey key = (SelectionKey)keyIterator.next();
                    keyIterator.remove(); // 選択されたキーを削除
                     
                    if (key.isAcceptable()) {
                        // アクセプトの場合
                        ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
                        SocketChannel socketChannel = serverChannel.accept();
                        socketChannel.configureBlocking(false); // 非同期に変更
 	
                        // Selector に読み込みを登録
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println(socketChannel + " connect.");
                    } else if (key.isReadable()) {
                        // 入力
                        SocketChannel channel = (SocketChannel)key.channel();
                        sendBack(channel);
                    }
                }
            }
         
            serverSocketChannel.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
 
    private void sendBack(SocketChannel socketChannel) throws Exception {
        // データの読み込み
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        socketChannel.read(buffer);
         
        // 読み込んだデータをそのまま送り返す
        buffer.flip();
        System.out.println("Recieve: " + charset.decode(buffer.duplicate()));
        socketChannel.write(buffer);
        socketChannel.close();
    }
 
    public static void main(String[] args){
        new NonblockServer();
    }
}

(2003.03)