AsynchronousFileChannelを用いたファイルの読書き処理について
AsynchronousFileChannelを用いたファイルの非同期書き込みと、読み込み処理について記載します。
まずは書き込み処理を行う処理。
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class AsyncFileWriteMain { private static final int LINE_SIZE = 10; public static void main(String[] args) throws IOException { final Path path = Paths .get("path_to_file"); if (Files.exists(path)) { Files.delete(path); } try (AsynchronousFileChannel channel = AsynchronousFileChannel.open( path, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { System.out.println("Start to write"); final CompletionHandler<Integer, String> handler = new CompletionHandlerImpl(); for (int i = 0, position = 0; i < LINE_SIZE; i++) { final String message = "Hello World-" + Integer.toString(i) + "\n"; // Buffer、書き込み位置、アタッチメント、ハンドラを与えて書き込み処理実行 channel.write(ByteBuffer.wrap(message.getBytes()), position, getAttachment(i), handler); // 次にメッセージを書き込む位置を設定 position += message.length(); } } } /* * Attachmentはサンプルなので適当なメッセージを返す。 */ private static String getAttachment(int i) { return (i + 1) == 1 ? Integer.toString(i + 1) + " time" : Integer.toString(i + 1) + " times"; } private static final class CompletionHandlerImpl implements CompletionHandler<Integer, String> { @Override public void completed(Integer result, String attachment) { System.out.println("[Completed to write] Current thread : " + Thread.currentThread().getId() + " Attachment : " + attachment + " Written bytes : " + result); } @Override public void failed(Throwable e, String attachment) { System.out.println("[Failed to write] Current thread : " + Thread.currentThread().getId() + " Attachment : " + attachment + " Exception : " + e); } } }
try-with-resourcesブロック内で、AsynchronousFileChannelクラスのopenメソッドを用いてチャネルをオープンします。
あとはコメントにある通りAsynchronousFileChannel#writeメソッドで書き込み処理を行います。CompletionHandlerは書き込み完了時に実行されるハンドラクラスで、書き込みが正常に完了すると、completedメソッドが実行され、失敗するとfailedメソッドが実行されます。
上記プログラムを実行するとコンソールに以下のようなメッセージが出力されると思います。
Start to write
[Completed to write] Current thread : 16 Attachment : 2 times Written bytes : 14
[Completed to write] Current thread : 17 Attachment : 1 time Written bytes : 14
[Completed to write] Current thread : 16 Attachment : 7 times Written bytes : 14
[Completed to write] Current thread : 15 Attachment : 3 times Written bytes : 14
[Completed to write] Current thread : 17 Attachment : 8 times Written bytes : 14
[Completed to write] Current thread : 12 Attachment : 6 times Written bytes : 14
[Completed to write] Current thread : 14 Attachment : 4 times Written bytes : 14
[Completed to write] Current thread : 13 Attachment : 5 times Written bytes : 14
[Completed to write] Current thread : 17 Attachment : 10 times Written bytes : 14
[Completed to write] Current thread : 16 Attachment : 9 times Written bytes : 14
上記メッセージから当該書き込み処理は6つのスレッドにより処理されたことがわかると思います。
ファイルには次のようなメッセージが出力されていると思います。
Hello World-0
Hello World-1
Hello World-2
Hello World-3
Hello World-4
Hello World-5
Hello World-6
Hello World-7
Hello World-8
Hello World-9
次に、上で書き込みを行ったファイルから非同期で読み込む処理を紹介します。
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.EnumSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AsyncFileReadMain { private static final int LINE_SIZE = 10; private static final int READ_BUFFER_SIZE = 14; @SuppressWarnings("unchecked") public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { final Path path = Paths.get("path_to_file"); ExecutorService executorService = Executors.newFixedThreadPool(LINE_SIZE); try (AsynchronousFileChannel channel = AsynchronousFileChannel.open( path, EnumSet.of(StandardOpenOption.READ), executorService)) { System.out.println("Start to read"); ByteBuffer[] buffers = new ByteBuffer[LINE_SIZE]; Future<Integer>[] futures = new Future[LINE_SIZE]; for (int i = 0, position = 0; i < LINE_SIZE; i++, position += READ_BUFFER_SIZE) { buffers[i] = ByteBuffer.allocate(READ_BUFFER_SIZE); futures[i] = channel.read(buffers[i], position); } for(int i = 0; i < LINE_SIZE; i++) { Integer readByteSize = futures[i].get(); System.out.println("Read bytes : " + readByteSize + " Read message : " + new String(buffers[i].array())); } } executorService.shutdown(); executorService.shutdownNow(); } }
書き込み処理同様にtry-with-resourcesブロック内で、AsynchronousFileChannelクラスのopenメソッドを用いてチャネルをオープンしますが、ExecutorServiceを第3引数に渡しています。
このスレッドプール内で生成されたスレッドにより非同期読み込みが行われます。
AsynchronousFileChannelのreadメソッドをにBufferと読み込み開始位置を示す値を渡します。writeメソッド同様にCompletionHandlerを渡すことができるメソッドもありますがここでは省略します。
当該メソッドはFutureオブジェクトを返すので、読み込みループ後に、Future#getメソッドより読み込みが完了した際のバイト数を取得し、bufferに読み込まれたメッセージを出力します。
実行結果は以下のようになると思います。
Start to read
Read bytes : 14 Read message : Hello World-0Read bytes : 14 Read message : Hello World-1
Read bytes : 14 Read message : Hello World-2
Read bytes : 14 Read message : Hello World-3
Read bytes : 14 Read message : Hello World-4
Read bytes : 14 Read message : Hello World-5
Read bytes : 14 Read message : Hello World-6
Read bytes : 14 Read message : Hello World-7
Read bytes : 14 Read message : Hello World-8
Read bytes : 14 Read message : Hello World-9
これだと非同期で読み込まれていることがよくわからないと思うのですが、CompletionHandlerを引数に渡すreadメソッドを実行し書き込み時同様にスレッドのIDを出力すると異なるスレッドにより処理されていることがわかると思います。
このようにJava7では非同期処理が非常に簡単にプログラミング可能です。