Go to Previous Page Go to Contents Go to Java Page Go to Next Page
New Features of Java2 SDK, Standard Edition, v1.4
 
 

New I/O SocketChannel と Selector

 
 

ソケットと select

 
 

Java では RMI や CORBA などの ORB を使うことができますが、かといってソケットを利用した通信がなくなってしまうことはありません。異なるプラットフォームや Java 以外の言語で記述されたアプリケーションと通信を行うときなどはソケットがよく使われます。

Java 2 SE, v1.4 ではソケットも Channel として実装され、パフォーマンスが向上しています。

以前から存在した Socket クラスと ServerSocket クラスに相当するのが、SocketChannel クラスと ServerSocketClass です。また、DatagramSocket クラスに相当するのが、DatagramChannel クラスになります。

これらのクラスは単に既存のクラスを Channel として書き直したものではなく、新しい機能が付け加えられています。その機能の 1 つがノンブロッキングモードの導入です。これと一緒に Selector クラスが新たに付け加えられました。

これは簡単にいえば UNIX のシステムコール select や WinSock の WSAAsyncSelect の機能が Java でも使えるようになったということです (といっても、知らない人にはさっぱり分からないですね ^ ^;;)。

今までできなかったことが不思議なくらいだったのですが、ようやく Java でも使えるようになったわけです。

さて、それでは実際に使ってみたいのですが、今回は DatagramChannel は扱わずに SocketChannel/ServerSocketChannel に絞って使ってみましょう。

 

 
  基本的な使い方  
 

Socket クラスが SocketChannel クラスに変わったからといっても、使い方はそれほど変わりはありません。

基本的な使い方は次のようになります。

サーバ クライアント
  1. ServerSocketChannel オブジェクトを生成
  2. バインドを行う
  3. accept メソッドでクライアントからの接続を待つ
 
 
  1. SocketChannel オブジェクトを生成
  2. connect メソッドによりサーバに接続
  1. accept メソッドの戻り値は Socket オブジェクトなので、そこから SocketChannel オブジェクトを得る
  2. スレッドを生成して、read/write を行う
 
 
  1. SocketChannel の read/write を行う
  1. close
  1. close

 

上の使い方に相当するコードを示していきましょう。ここで使用するコードはクライアントから送信される文字列をそのまま送り返すというサンプルで使ったものです。

アプリケーションのソース
サーバ SocketTestServer1.java
クライアント SocketTestClient1.java

はじめにサーバから見ていきましょう。

まず、ServerSocketChannel オブジェクトを生成します。とはいうものの、ServerSocketChannel のコンストラクタは protected なので、直接 new を使って生成をすることはできません。そこで、使用するのは static メソッドである open メソッドです。

次に行うのがバインドですが、ServerSocketChannel では直接バインドができません。Channel ではなくて ServerSocket クラスの bind メソッドを使用してバインドを行います。

この 2 つのメソッドとも IOException が throw されますので、例外処理は忘れずに。

 1:        try {
 2:            serverSocketChannel = ServerSocketChannel.open();
 3: 
 4:            InetSocketAddress address
 5:                = new InetSocketAddress(InetAddress.getLocalHost(), PORT);
 6: 
 7:            serverSocketChannel.socket().bind(address);
 8:        } catch (IOException ex) {
 9:            ex.printStackTrace();
10:            System.exit(1);
11:        }

2 行目で ServerSocketChannel のオブジェクトを取得しています。4 から 5 行目が自分の IP アドレスとポートを指定します。PORT は定数で、このサンプルで 9000 にしてあります。

そして、このアドレスをバインドしているのが 7 行目です。socket メソッドを使用して ServerSocket オブジェクトを取得し、bind メソッドでバインドを行います。

なんで、ServerSocketChannel クラスに bind メソッドがないのかが理解に苦しむところなんですけど。なんででしょうね?

次に行うのがアクセプトです。

 1:        while (true) {
 2:            try {
 3:                Socket socket = serverSocketChannel.accept();
 4:                SocketChannel socketChannel = socket.getChannel();
 5:                
 6:                System.out.println(socket.getInetAddress() + " connect.");
 7:
 8:                new MessageRepeater(socketChannel).start();
 9:            } catch (IOException ex) {
10:                ex.printStackTrace();
11:                return;
12:            }
13:        }

アクセプトは accept メソッドを使用します。accept メソッドは通常はクライアントが接続してくるまで処理をブロックします。クライアントが接続してくると、accept メソッドを抜けて戻り値として Socket オブジェクトを返します。

戻り値が Socket オブジェクトなので、そこから SocketChannel クラスを取り出します。これには getChannel メソッドを使用します。

なんで、accept の戻り値が SocketChannel オブジェクトではなくて、Socket オブジェクトなのかぜんぜん理由がわからないのですが、そういうものなのでしょう。

それにしても、bind といい、accept といい首尾が一貫していないような気がします。

(補) と書きましたが、FCS では accept メソッドの戻り値は SocketChannel オブジェクトになりました。bind メソッドはそのままですが。このため、ソースも次のように変更しています。

 1:        while (true) {
 2:            try {
 3:                SocketChannel socketChannel = serverSocketChannel.accept();
 4:                
 5:                System.out.println(socketChannel.socket().getInetAddress() + " connect.");
 6:
 7:                new MessageRepeater(socketChannel).start();
 8:            } catch (IOException ex) {
 9:                ex.printStackTrace();
10:                return;
11:            }
12:        }

(Apr. 2002)

それはさておき、普通は実際の読み書きは別のスレッドを用意して、そちらで行うようにします。というのも、このスレッドでは接続要求を待っていなくてはいけないからです。そのために、この部分は while 文で無限ループにしてあります。

さて、別のスレッドですが、このサンプルで使用しているのは SocketChannelServer1 クラスの内部クラスである MessageRepeater クラスです。このクラスはスレッドを持っているので、そちらで読み書きを行うようにしました。

MessageRepeater クラスの定義とコンストラクタ、start メソッド、run メソッドは次のようになっています。

 1:    class MessageRepeater implements Runnable {
 2:        private ByteBuffer buffer = ByteBuffer.allocateDirect(2048);
 3:        private SocketChannel socketChannel;
 4: 
 5:        public MessageRepeater(SocketChannel socketChannel){
 6:            this.socketChannel = socketChannel;
 7:        }
 8: 
 9:        public void start(){
10:            Thread thread = new Thread(this);
11:            thread.start();
12:        }
13: 
14:        public void run(){
15:            try {
16:                sendBack();
17:            } catch (IOException ex){
18:                ex.printStackTrace();
19:            }
20:        }

MessageRepeater クラスはスレッドを使用して読み書きを行うので、Runnable インタフェースをインプリメントしています。

プロパティの buffer は読み書きを行うときのバッファとして使用します。socketChannel はコンストラクタで初期化します (6 行目)。

start メソッドで読み書き用のスレッドを生成し (10 行目)、それを次の行でスタートさせています。スレッドがスタートすると run メソッドがコールされますが、ここでは読み書きを行うメソッド sendBack を実行するだけです。

sendBack メソッドを次に示しましょう。

 1:        private void sendBack() throws IOException {
 2:            try {
 3:                while (socketChannel.isConnected()) {
 4:                    buffer.clear();
 5:
 6:                    if (socketChannel.read(buffer) < 0){
 7:                        break;
 8:                    }
 9:
10:                    buffer.flip();
11:                    System.out.println(socketChannel.socket().getInetAddress()
12:                                + " : " + decoder.decode(buffer.duplicate()));
13:
14:                    socketChannel.write(buffer);
15:                }
16:            } finally {
17:                System.out.println(socketChannel.socket().getInetAddress()
18:                                   + " closed.");
19:                socketChannel.close();
20:            }
21:        }

読み書きはくり返し行うため、while ループを使用しています。

まず読み込みを行います。バッファを 4 行目で clear メソッドを使用してクリアします。そして、6 行目で read メソッドで読み込みを行います。read メソッドの戻り値は、読み込んだバイト数ですが、Channel がクローズされたときなどは -1 が返ってくるので、これをチェックするために if 文を使用しています。

次に書き込みを行います。バッファに実際に読み込んだ部分だけを書き込むために flip メソッドで limit を position の位置に移動させ、position を 0 にしておきます。

そして、14 行目で書き込みを行います。

忘れてならないのが、例外処理です。IOException が発生すると、sendBack メソッドを抜けて run メソッドの catch 節に処理が移ります。ここを抜けると、一回メソッドを抜けたにもかかわらず 16 行目の finnaly 節に戻ってきます。

通常に while ループを抜けても finnaly 節に実行が移るので、必ず finally 節は実行されることになります。

finnaly 節では Channel をクローズしています (19 行目)。必ず Channel をクローズする癖をつけておいたほうがいいと思います。

ところで、なぜこんな書き方をしているかというと、Channel インタフェースの close メソッドは IOException を throw するからです。

普通に書くと次のように catch 節の中の close メソッドを try ... catch で囲んでいって、無限にそれにつながってしまいます。

そこで、sendBack メソッドでは IOException は throw してしまい、finnaly 節でクローズを行っているのです。もし、finnaly 節の close メソッドで IOException が発生したら、また throw するだけです。

    try {
        while (channel.isConnected()) {
              // 何らかの Channel に対する処理
        }
    } catch (IOException ex) {
        try{
            channel.close();       
        } catch (IOException ex) {
            try{
                channel.close();       
            } catch (IOException ex) {

                    以下無限につづく

さて、次はクライアントの方です。

クライアントでは SocketChannel オブジェクトを生成して、サーバにコネクトすることから始めます。これは SocketTestClient1 クラスのコンストラクタで行っています。

 1:    public SocketTestClient1(String host) throws IOException {
 2:        InetSocketAddress address
 3:            = new InetSocketAddress(InetAddress.getByName(host), PORT);
 4:
 5:        socketChannel = SocketChannel.open();
 6:        System.out.println("Channel " + socketChannel.isBlocking());
 7:        socketChannel.connect(address);
 8:    }

SocketChannel オブジェクトの生成には SocketChannel クラスの static なメソッド open を使用します (5 行目)。直接 new を使用してオブジェクトを生成することはできません。

そして、2, 3 行目で作成したアドレスに connect メソッドを使用してコネクトします (7 行目)。

読み書きは communicate メソッドで行っています。

 1:    public void communicate() throws IOException {
 2:        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
 3:        
 4:        try{
 5:            while (socketChannel.isConnected()) {
 6:                System.out.print(">");
 7:                String inputText = reader.readLine(); 
 8:                if (inputText.equalsIgnoreCase("q")) {
 9:                    break;
10:                }
11:
12:                socketChannel.write(encoder.encode(CharBuffer.wrap(inputText)));
13:
14:                buffer.clear();
15:                socketChannel.read(buffer);
16:                buffer.flip();
17:                System.out.println(socketChannel.socket().getInetAddress()
18:                                   + " : " + decoder.decode(buffer));
19:            }
20:        } finally {
21:            if (socketChannel != null) {
22:                System.out.println(socketChannel.socket().getInetAddress()
23:                                   + " cloesed.");
24:                socketChannel.close();
25:            }
26:        }
27:    }

変数 reader は標準入出力からの入力を扱うために使用します。入力は inputText という String オブジェクトに格納されるので (7 行目)、それを 12 行目で Channel に書き出しています。ここで送信するのは文字列なので、文字コードに合わせて行わなくてはなりません。そこで、Charset/CharsetEncoder/CharsetDecoder クラスを使用して文字コードの変換を行っています。ここでは UTF-16 を使用しています。

書き込みを行ったらサーバから戻ってくる文字列を読み込みます。読み込みを行うために、バッファのクリアを行っておきます (14 行目)。そして、15 行目で読み込みを行います。

17, 18 行目で読み込んだ文字列を出力していますが、このときも文字コードの変換を行っています。

最後の finnaly 節はサーバの時と同様で、ここで Channel をクローズしています。

 

 
  ノンブロッキングモードの使い方  
 

ソケット通信を使っていると、通信ができるまで待っている部分が何ヶ所かあります。例えば、accept メソッドや read メソッドです。

accept メソッドはクライアントがコネクトするまで、ずーっと待ち状態のままになり、コネクトしてやっとメソッドから抜けてきます。同じように、read メソッドはデータがないときには、相手がデータを書き込みを行わない限り待ち状態のままです。

このような状態がブロッキング状態です。ブロックしていると処理が止まってしまうので、処理を続けて行いたいときには別のスレッドを立ち上げる必要があります。例えば、SocketTestServer1 クラスではアクセプトでブロックしてしまうので、読み書きには別のスレッドを使用しています。

しかし、スレッドを使用することはコンピュータのメモリなどのリソースを消費してしまいます。少数のスレッドであればいいのですが、数千、数万の接続があればそれと同じだけのスレッドが必要になります。でも、そんなにスレッドが作れるかどうか、また作れたとしても正常に動作するかどうかは分かりません。

そこで、登場するのがノンブロッキングの通信です。

ノンブロッキングモードでは accept メソッドも read メソッドもブロックしません。例えば、accept メソッドをコールしたときにクライアントからのコネクト要求がなければそのまま抜けてしまいます。ブロックしないので、スレッドを使わなくても処理を続けることが可能になります。

とすると、複数の接続を同一のスレッドで処理することができるのです。

でも、ちょっと待ってください。

それじゃ、いつ accept メソッドをコールすればいいのでしょうか。接続要求がないときに accept メソッドをコールしてもすぐに返ってきてしまうということは、接続要求があったときに accept メソッドをコールする必要があります。

しかし、接続要求があることをどうやって知ればいいのでしょうか。

おなじように read メソッドはいつコールすればいいのでしょうか。

そんなときに使用するのが java.nio.channels.Selector クラスです。

もともと、UNIX には接続要求やデータの有無を監視するために select というシステムコールが用意されていました。同じように Windows で使われている WinSock には WSAAsyncSelect という Win32 メソッドが用意されています。

Selector はこれらの Java 版に相当します。それも、単に Java で select が使えるようになっただけではなく、使い方も簡単になっています。

さっそく、使い方を見ていきましょう。

サーバで使用したほうが効果が大きいので、クライアントは先ほどの SocketTestClient1 クラスをそのまま使用して、サーバだけ書きかえてみました。

アプリケーションのソース
サーバ SelectSocketTestServer1.java

接続要求やデータの有無を監視するということは、何らかのブロッキングの処理が必要になります。これは、Selector クラスの select メソッドで行います。select メソッドは準備ができている I/O 処理を選択するという意味です。

ここで選択できる I/O 処理は java.nio.channels.SelectionKey クラスで定義されています。

名前 説明
SelectionKey.OP_ACCEPT ソケットのアクセプト
SelectionKey.OP_CONNECT ソケットのコネクト
SelectionKey.OP_READ 読み込み
SelectionKey.OP_WRITE 書き込み

基本的な使い方は次のようになります。

  1. Channel を生成し、ノンブロッキングモードに設定する
  2. Selector への登録
  3. select メソッドのコール
  4. 何らかの I/O 処理が発生し、select メソッドから戻る
  5. I/O 処理の種類を調べ、その種類に応じた処理を行う
  6. 再び select メソッドをコール

Selector に登録できるのは SelecatbleChannel クラスの派生クラスです。SelectableChannel の派生クラスには、SocketChannel, ServerSocketChannel, DatagramChannel の 3 つのクラスがあります。

Selector への登録には SelectableChannel クラスの register メソッドを使用します。SelectorSocketServer1 クラスではコンストラクタで登録を行っています。

 1:    public SelectorSocketServer1() {
 2:        try {
 3:            selector = SelectorProvider.provider().openSelector();
 4: 
 5:            ServerSocketChannel serverSocketChannel 
 6:                = SelectorProvider.provider().openServerSocketChannel();
 7: 
 8:            // Non-Blocking モードにする
 9:            serverSocketChannel.configureBlocking(false);
10: 
11:            InetSocketAddress address
12:                = new InetSocketAddress(InetAddress.getLocalHost(), PORT);
13:            serverSocketChannel.socket().bind(address);
14: 
15:            // Selector への登録
16:            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
17:        } catch (IOException ex) {
18:            System.exit(1);
19:        }
20:    }

変数 selector は Selector オブジェクトで、プロパティとなっています。Selector オブジェクトは 3 行目に示したように SelectorProvider クラスを使用する方法と、じかに Selector クラスの open メソッドを使用する方法がありますが、どちらをつかっても OK です。

同様に ServerSocketChannel オブジェクトもここでは SelectorProvider クラスを使用して生成しています (5, 6 行目)。

9 行目で ServerSocketChannel オブジェクトをノンブロッキングモードに設定しています。Selector と一緒に使うときには、ブロッキングモードではダメなのでご注意ください。

Selector オブジェクトへの登録は 16 行目で示したように、SelectableChannel クラスの register メソッドを使用します。Selector オブジェクトへの登録なので、Selector クラスのメソッドを使うような気がするのですが、実際には逆で SelectableChannel のメソッドになのです。ですから、Selector クラスの JavaDoc を見て、「登録メソッドがない !」と早とちりしてしまいそうです (実際に、私は早とちりしてしまいました ^^;;)。

Selector オブジェクトへの登録には、どの I/O 処理に Selector を使用するか指定します。それが、前述した SelectionKey の定数です。16 行目ではアクセプトを指定しています。

登録ができたので、selector メソッドを使用したブロッキング処理の部分を見ていきましょう。大まかな処理は次のようになります。

            while (selector.select() > 0) {
                // セレクトされた SelectionKey オブジェクトをまとめて取得する
                Iterator keyIterator = selector.selectedKeys().iterator();
                
                while (keyIterator.hasNext()) {
                    // Iterator から要素を取り出し、取り出した要素は Iterator から削除する
                    SelectionKey key = (SelectionKey)keyIterator.next();
                    keyIterator.remove();
                    
                    // セレクトされた SelectionKey の状態に応じて処理を決める
                    if(key.isAcceptable()) {
                        // accept の場合の処理
                        ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                                  .......
                    } else if (key.isReadable()) {
                        // データが送られてきたときの処理
                        SocketChannel channel = (SocketChannel)key.channel();
                                  .......
                    } else if ......
                    }
                }
            }

slectr メソッドは Selector オブジェクトに登録された I/O 処理が起こるまで処理をブロッキングします。select メソッドの戻り値は I/O 処理が行われた (セレクトされた) SelectableChannel オブジェクトの個数となります。

セレクトされた SelectableChannel オブジェクトは slectedKeys メソッドで取りだせます。slectedKey メソッドの戻り値は java.util.Set オブジェクトです。Set クラスは List と異なって、インデックスで扱えないので、Iterator オブジェクトを使用することにしましょう。

次に、Iterator オブジェクトから要素 (SelectionKey オブジェクト) を 1 づつ取り出します。取り出した SelectionKey オブジェクトは Iterator から削除しておきます。

後は SelectionKey オブジェクトには、I/O 処理の種類と SelectableChannel オブジェクトが保持されているので、I/O 処理の種類に対応した処理を行います。SelectableChannel オブジェクトを取りだすには channel メソッド、I/O 処理の調べるには 4 つのメソッドが用意されています。

  • isAcceptable メソッド
  • isConnectable メソッド
  • isReadable メソッド
  • isWritable メソッド

それぞれのメソッドで I/O 処理の種類によって、channel メソッドの戻り値をキャストする必要があります。アクセプトだったら ServerSocketChannel オブジェクトにキャストし、その他は SocketChannel にキャストしてから I/O 処理を行います。

SelectorServerSocket1 クラスでは次のようになりました。

        try {
            while (selector.select() > 0) {
                // セレクトされた SelectionKey オブジェクトをまとめて取得する
                Iterator keyIterator = selector.selectedKeys().iterator();
                
                while (keyIterator.hasNext()) {
                    SelectionKey key = (SelectionKey)keyIterator.next();
                    keyIterator.remove();
                    
                    // セレクトされた SelectionKey の状態に応じて処理を決める
                    if (key.isAcceptable()) {
                        // accept の場合

                        ServerSocketChannel serverSocketChannel 
                            = (ServerSocketChannel)key.channel();
                        accept(serverSocketChannel);

                    } else if (key.isReadable()) {
                        // データが送られてきたとき

                        SocketChannel socketChannel = (SocketChannel)key.channel();
                        sendBack(socketChannel);
                    }
                }
            }
        } catch (IOException ex) {
            System.out.println(ex.getClass());
            ex.printStackTrace();
            return;
        }

アクセプトされたときは accept メソッドをコールし、データを受信したときには sendBack メソッドをコールしています。そういえば、Selector オブジェクトに登録した I/O 処理はアクセプトだけなのに、なぜデータ受信があるのか不思議かもしれません。その謎は accept メソッドにあります (えっ、謎でもなんでもないって、そりゃごもっとも)。

    private void accept(ServerSocketChannel serverSocketChannel) throws IOException {
        SocketChannel socketChannel = serverSocketChannel.accept();
 
        // Non-Blocking モードに変更
        socketChannel.configureBlocking(false);
 
        // Selector への登録
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println(socketChannel.socket().getInetAddress() + " connect.");
    }

アクセプトされたら、SocketChannel オブジェクトを取得して、それを再び Selector に登録しているのです。このときの I/O 処理の指定が、データ受信なのです。ですから、 selector メソッドのループでデータ受信の場合があったのです。

データ受信の時にコールされる sendBack メソッドは SocketTestServer1 クラスの時とそれほど違いません。一番大きな違いは while ループがなくなったことです。ループは slector メソッドのループが肩代わりしてくれるからですね。

いちおう、コードを示しておきます。

    private void sendBack(SocketChannel socketChannel) throws IOException {
 
        buffer.clear();
        
        // データの読み込み
        if (socketChannel.read(buffer) < 0){
            socketChannel.close();
            return;
        }
        
        // 読み込んだデータをそのまま送り返す
        buffer.flip();
        System.out.println(socketChannel.socket().getInetAddress()
                           + " : " + decoder.decode(buffer.duplicate()));
        
        socketChannel.write(buffer);
    }

このように Selector クラスを使用すると、データ受信用のスレッドなどを必要とせずに、1 つのスレッドでことが済んでしまいます。ぜひ、ノンブロックモードを使いこなしてみてください。

 

 
 

最後に

 
 

ノンブロックがサポートされたので、複数の接続であってもたった 1 つのスレッドで処理を行うことができます。複数のスレッドを使用すると、リソースを食いますから、できるだけ少ないスレッドで切り盛りできればそれにこしたことはありません。複数のソケット通信のパフォーマンスを測定する VolanoMark というベンチマークまであるのですから、サーバ系のソフトではとても重要になってきます。

Selector を使えば、そんな悩みもふっ飛ばしてくれます。

ただし、selector メソッドのループで行う I/O 処理は、時間がかからないようにする必要があります。これはイベント処理と同じなのですが、処理が長くなってしまうと、他の待ち状態にあるものがいつまでたっても処理されなくなってしまうからです。はてはセレクトされた SelectionKey の Map がどんどん大きくなって、いつかは OutOfMemoryError になってしまうかもしれません。

なるべく迅速な処理を行って、交通渋滞を引き起こさないように心がけるようにしましょう。

今回使用したサンプルはここからダウンロードできます。

参考 URL

(Nov. 2001)

 

 
 
Go to Previous Page Go to Contents Go to Java Page Go to Next Page