HBaseとZookeeperの設定

完全分散モードによるHBaseとZookeeperに関する設定メモです。Hadoopについては以前のブログの内容に従ってインストールされていることが前提です。

今回利用したバージョンはhbase-0.94.11とzookeeper-3.4.5です。なお、hbase-0.94.11のデフォルトのHadoopバージョンは1.0.4で前回は1.1.2を設定する方法を紹介しましたが設定方法は同じです。

なお、今回紹介する方法はhbaseにzookeeperを管理させず、zookeeperノードを単体で起動する方法を紹介します。zookeeperは奇数台必要となります。

[ ]はそれぞれの環境で任意の設定値に読み替えてください。

■Zookeeperの設定
rootユーザでzookeeperを解凍し、ディレクトリのオーナーなどの設定を行います。


su -
cd /usr/local
tar -zxvf zookeeper-3.4.5.tar.gz
chown -R hadoop:hadoop zookeeper-3.4.5/
ln -s zookeeper-3.4.5 zookeeper

zookeeper用のデータディレクトリを作成します。


cd /opt
mkdir zookeeper
chown -R hadoop:hadoop zookeeper/

zookeeperの起動ユーザ(hadoopユーザ)にスイッチしzoo.cfgファイルの設定を行います。


su - hadoop
cd /usr/local/zookeeper/conf
cp -p zoo_sample.cfg zoo.cfg
vi zoo.cfg
/tmp/zookeeper

/opt/zookeeper #先ほど作成したディレクトリに変更
zookeeperが動作するノードリストを以下のように追加
#servers ↓zookeeperが動作するサーバ名とポート番号を設定
server.1=[ホスト名1]:[port1]:[port2]
server.2=[ホスト名2]:[port1]:[port2]
server.3=[ホスト名3]:[port1]:[port2]
...

各zookeeperのサーバででmyidファイルを/opt/zookeeper配下に作成します。


cd /opt/zookeeper
vi myid
1 ←server.1のノードでは1を設定。server.2の場合は2。

iptablesを次のように設定します。ここでは自分のネットワーク内からのアクセスのみを許可するようにします。


su -
cd /etc/sysconfg
vi iptables
-A INPUT -s [IPアドレス]/24 -p tcp -m state --state NEW -m tcp --dport 2181 -j ACCEPT
-A INPUT -s [IPアドレス]/24 -p tcp -m state --state NEW -m tcp --dport [zoo.cfgファイルのport1] -j ACCEPT
-A INPUT -s [IPアドレス]/24 -p tcp -m state --state NEW -m tcp --dport [zoo.cfgファイルのport2] -j ACCEPT

zookeeperの起動・停止・クライアントでのログイン

【起動】


cd /usr/local/zookeeper
./bin/zkServer.sh start

【停止】


./bin/zkServer.sh stop

【クライアントログイン】


./bin/zkCli.sh

■HBaseの設定
rootユーザでhbaseを解凍し、ディレクトリのオーナーなどの設定を行います。


su -
cd /usr/local
tar -zxvf hbase-0.94.11.tar.gz
ln -s hbase-0.94.11 hbase
chown -R hadoop:hadoop hbase-0.94.11

hbase用のディレクトリを作成します。


cd /opt
mkdir hbase
chown -R hadoop:hadoop hbase/

hadoopユーザでhbase/conf配下のhbase-site.xml、hbase-env.sh、regionservers、backup-mastersファイルをそれぞれ次のように編集します。

【hbase-site.xml

<configuration>
 <property>
   <name>hbase.rootdir</name>
   <value>hdfs://[hadoop_hdfsサーバ]:[port]/hbase</value>
   <description>
   ...
   </description>
 </property>
 <property>
   <name>hbase.cluster.distributed</name>
   <value>true</value>
   <description>
   ...
   </description>
 </property>
 <property>
   <name>hbase.tmp.dir</name>
   <value>/opt/hbase</value>
   <description>
   ...
   </description>
 </property>
 <property>
   <name>hbase.zookeeper.quorum</name>
   <value>[zookeeper_server_name1],[zookeeper_server_name1],[zookeeper_server_name1]</value>
   <description>
   ...
   </description>
 </property>
</configuration>

【hbase-env.sh】


export HBASE_PID_DIR=/var/run/hadoop
export HBASE_MANAGES_ZK=false #hbaseでzookeeperを起動しないようにする

【regionservers】
分散させるリージョンサーバ名をすべて記載します。


[regionserver_name1]
[regionserver_name2]
[regionserver_name3]
[regionserver_name4]
[regionserver_name5]
...

【backup-masters】
バックアップ用のHBaseマスタサーバ名を記載します。


[backup_masterserver_name1]

iptablesの設定
マスタサーバとリージョンサーバでそれぞれ以下のように設定します。ここでもローカルネットワークからのアクセスのみを許可するようにします。

【マスタサーバ】


su -
cd /etc/sysconfg
vi iptables
-A INPUT -s [IPアドレス]/24 -p tcp -m state --state NEW -m tcp --dport 60000 -j ACCEPT
-A INPUT -s [IPアドレス]/24 -p tcp -m state --state NEW -m tcp --dport 60010 -j ACCEPT

【リージョンサーバ】


su -
cd /etc/sysconfg
vi iptables
-A INPUT -s [IPアドレス]/24 -p tcp -m state --state NEW -m tcp --dport 60020 -j ACCEPT
-A INPUT -s [IPアドレス]/24 -p tcp -m state --state NEW -m tcp --dport 60030 -j ACCEPT

hbaseクラスタの起動と停止

【起動】


cd /usr/local/hbase
./bin/start-hbase.sh

【停止】


./bin/stop-hbase.sh

長くなりましたが以上です。

LVMのボリューム・グループ名の変更方法

CentOS-6をVirtualboxに特に何も考えずにインストールしたときに気付いたんですが、LVMのボリューム・グループ名にホスト名が付与されるようです。(記憶があいまいだけど、以前のバージョンとかはそうじゃなかった気がする。)

Vitualboxのクローン機能を使ってOSを複製すると、当然ながら複製先のLVMのボリューム・グループ名は複製元のホスト名付の名前のまま。

気持ち悪いので名前の変更を行った時の手順を、ほとんどやることがないから忘れると思ってメモ。

変更方法は以下の通り。ここではvg_hoge1というLVMボリューム・グループ名をvg_hoge2に変更する。

LVMのボリューム・グループ名の確認


#vgs
VG #PV #LV #SN Attr VSize VFree
vg_hoge1 1 2 0 wz--n- 199.51g 0

LVMのボリューム・グループ名の変更


#vgrename vg_hoge1 vg_hoge2
Volume group "vg_hoge1" successfully renamed to "vg_hoge2"

GRUBの設定ファイル修正


cd /boot/grub
cp -p grub.conf grub.conf.org
sed -i 's/vg_hoge1/vg_hoge2/g' grub.conf

/etc/fstabの修正


cd /etc
cp -p fstab fstab.org
sed -i 's/vg_hoge1/vg_hoge2/g' fstab

OS再起動後にdfコマンドなどで確認すると変更後の名前が適用されていると思います。

HadoopのWeb管理画面よりHDFSにアクセスする際のちょっとした設定

前回、「Hadoop-1.1.2を完全分散モードで動作させるまでの設」で環境を構築するととりあえず完全分散モードで動作するようになります。

Hadoopの便利なWeb管理画面なども利用できるようになるのですが、前回の設定のままだとWebからHDFSにアクセスした際にログファイルにスタックトレースが出力されると思います。

Hadoopにはdfs.web.ugiという設定があり、パーミッションに関する設定が有効な場合、デフォルトではこのプロパティ定義されたユーザ(webuser,webgroup)でHDFSにアクセスしているようで、要は権限エラーのような例外が発生するということです。

少し調べてみて、この例外を出さないようにするには、前回の手順で環境設定した場合、この定義をhadoopに変更すると発生しなくなります。

よって全ノードhdfs-site.xmlファイルに以下の設定を追加するとスタックトレースが出力されなくなります。

hdfs-site.xml

...
<pproperty>
    <name>dfs.web.ugi</name>
    <value>hadoop</value>
</property>
...

Hadoop-1.1.2を完全分散モードで動作させるまでの設定

CentOS6にHadoop-1.1.2を設定したのでその時のメモを記載します。
以下の手順は完全分散モードで動作するように設定を行った手順になります。[ ]はそれぞれの環境に合わせて調整してください。
ここでは、SELinuxはtargetedでファイアウォールiptables)は有効にしたままで動作する手順を紹介します。(ちなみにIPv6は無効)

■全環境でユーザ作成とHadoopのインストール、各種設定を実施

hadoopユーザの作成


groupadd hadoop
useradd -g hadoop hadoop
passwd hadoop

hadoop-1.1.2.tar.gzを解凍しオーナをhadoopに設定した後、シンボリックリンクを作成。


cd [path_to_install_dir]
tar -zxvf hadoop-1.1.2.tar.gz
chown -R hadoop:hadoop hadoop-1.1.2/
ln -s hadoop-1.1.2/ hadoop

hadoopユーザにsuしJAVA_HOME、HADOOP_HOMEの設定を行う。


vi .bash_profile
JAVA_HOME=[path_to_jdk]
HADOOP_HOME=[path_to_hadoop_home]
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export HADOOP_HOME
export PATH

Hadoopの設定

$HADOOP_HOME/conf/core-site.xmlの設定

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>[path_to_hadoop_tmp_dir]</value>
    </property>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://[master_host_name]:[port]</value>
    </property>
</configuration>

$HADOOP_HOME/conf/hdfs-site.xmlの設定

<configuration>
    <property>
        <name>dfs.name.dir</name>
        <value>${hadoop.tmp.dir}/dfs/name</value>
    </property>
    <property>
        <name>dfs.data.dir</name>
        <value>${hadoop.tmp.dir}/dfs/data</value>
    </property>
</configuration>

$HADOOP_HOME/conf/mapred-site.xmlの設定

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>[master_host_name]:[port]</value>
    </property>
</configuration>

$HADOOP_HOME/conf/hadoop-env.shの設定


export JAVA_HOME=[path_to_jdk]
export HADOOP_PID_DIR=[path_to_hadoop_pid_dir] #デフォルトでも良いが、他のサービスに習って/var/run/配下にhadoop用のディレクトリを作成するほうがわかりやすい?

ルートユーザでデータディレクトリとPIDファイル格納ディレクトリを作成


mkdir [path_to_hadoop_tmp_dir]
chown -R hadoop:hadoop [path_to_hadoop_tmp_dir]
mkdir [path_to_hadoop_pid_dir]
chown -R hadoop:hadoop [path_to_hadoop_pid_dir]

■マスタノードでhadoopユーザでssh用の公開キーを作成し各ノードに配布する


ssh-keygen -t rsa -P ""
cd .ssh
cat id_rsa.pub >> authorized_keys
chmod 600 authorized_keys

完全にローカルなネットワーク内であれば上記のような空パスフレーズを設定しても問題ないのですが、そうでない場合はセキュリティ上の問題が発生します。
何か適当なパスフレーズを設定し、毎回ssh-agentを起動してパスフレーズをキャッシュするのもいいのですが、今回はPAMを利用してhadoopユーザでssh可能なホストに制限をかける方法を紹介します。

SSHでPAM認証を利用するように設定し、hadoopユーザによるssh接続はマスタノードの場合は自ノードのみ、スレーブノードの場合はマスタノード及び、自ノードのみからアクセス可能にする。


cd /etc/security
vi access.conf
- : hadoop : ALL EXCEPT [マスタノードのIP] 127.0.0.1 [自ノードのIP(スレーブノードの場合のみ)] #定義追加

vi /etc/pam.d/sshd
account required pam_access.so shadow nullok #定義追加

cd /etc/ssh
vi sshd_config
#PermitEmptyPasswords no

PermitEmptyPasswords yes

UsePAM no

UsePAM yes

こうすることで多少セキュリティは向上すると思います。

なお、tcpwrapperを利用しhosts.denyですべて拒否し、hosts.allowで特定のネットワークおよび、サービスに対する制御を行っている場合、少なくともhosts.allowファイル内にsshdのサービスにlocalhostからの接続を許可するように設定する必要があります。
理由はマスタ側でhadoop起動時にセカンダリのNameNodeを起動する際にHadooplocalhostに対してsshログインを行いセカンダリの起動を行っているようなので、localhostを定義していないとサーバ側で接続が拒否されてしまうからです。


vi /etc/hosts.allow
sshd : [xxx.yyy.zzz.] localhost
or
ALL : [xxx.yyy.zzz.] localhost #簡単に設定するならこちらだが・・・

上記作業が各ノードで完了したらsshdを再起動し、マスタノードで作成した公開キーを各スレーブノードに登録しまます。


service sshd restart

公開キーを各ノードに登録する。


ssh-copy-id -i .ssh/id_rsa.pub hadoop@[slave_node1]
ssh-copy-id -i .ssh/id_rsa.pub hadoop@[slave_node2]
...

■マスターノードのiptablesにとりあえず最低限以下の定義を追加しiptablesを再起動


vi /etc/sysconfig/iptables
-A INPUT -s [ローカルネットワーク]/24 -p tcp -m state --state NEW -m tcp --dport [fs.default.name用のポート] -j ACCEPT
-A INPUT -s [ローカルネットワーク]/24 -p tcp -m state --state NEW -m tcp --dport [mapred.job.tracker用のポート] -j ACCEPT

service iptables restart

■スレーブノードのiptablesにとりあえず最低限以下の定義を追加しiptablesを再起動


vi /etc/sysconfig/iptables
-A INPUT -s [ローカルネットワーク]/24 -p tcp -m state --state NEW -m tcp --dport [dfs.datanode.address用のポート(デフォルト:50010)] -j ACCEPT
-A INPUT -s [ローカルネットワーク]/24 -p tcp -m state --state NEW -m tcp --dport [dfs.datanode.ipc.address用のポート(デフォルト:50020)] -j ACCEPT
-A INPUT -s [ローカルネットワーク]/24 -p tcp -m state --state NEW -m tcp --dport [dfs.datanode.http.address用のポート(デフォルト:50075)] -j ACCEPT
-A INPUT -s [ローカルネットワーク]/24 -p tcp -m state --state NEW -m tcp --dport [mapred.task.tracker.http.address用のポート(デフォルト:50060)] -j ACCEPT

service iptables restart

■マスターノードの$HADOOP_HOME/conf/slavesにslaveノードのホスト名を設定する


cd $HADOOP_HOME/conf
vi slaves
slave_node1
slave_node2
...

■NameNodeのフォーマット処理を行い、hadoopクラスタを起動


cd $HADOOP_HOME/bin
./hadoop namenode -format
./start-all.sh

hadoopユーザでHADOOP_HOMEを設定しているため、HADOOP_HOME環境変数は非推奨云々の警告が出ますが、とりあえず動作上は問題なさそうです。
(0.20のころは出てなかった気がするが、長い間使ってなかったんでいつの間にか変わった?)

ブラウザでhadoopのWeb管理画面(http://[master]:50070)にアクセスし、各ノードが起動されているか確認できます。
念のため各ノードのログファイルも参照しエラーが発生していないか確かめたほうがいいと思います。

マスタノードではNameNode、SecondaryNameNode、JobTrackerの3プロセスが、スレーブノードではDataNode、TaskTrackerの2プロセスが起動すればまずは問題ないと思います。

停止する場合は以下のコマンドで停止させるのが簡単です。


cd $HADOOP_HOME/bin
./stop-all.sh

これで準備完了です。あとはファイルディスクリプタの設定(/etc/security/limits.conf)やカーネルパラメータのチューニング(/etc/sysctl.conf)など、必要に応じて行えばいいと思います。

この辺についてはいろいろなサイトや、書籍を参考にすればいいので省略します。

なお、sshのアクセス制御などセキュリティにかかわる設定が関係しているため、参考にする場合は利用する環境に合わせて自己責任でお願いします。
(結構面倒なので完全にプライベートなネットワークで動作させることをおススメします。)

JavaEE7が正式に発表されたようです

JavaEE7が正式に発表されたようですね。

個人的な興味はWebSocketとConcurrency Utilities for JavaEEかな。

NetBeans7.3.1も早速リリースされているようなのですぐにでも試せそうです。

JBossASの次期バージョンWildfly8のApha1もリリースされているようだし、またやることが増えそうです。

PostgreSQL-9.2のレプリケーション構成における可用性の確認

■基本構成とシナリオ


master -----> slave1
| sync
|
+--------> slave2
potencial

slave1がダウンするとslave2は自動的に同期モードになるように設定。
以前の記事ではレプリケーションの設定でslave2を非同期(async)に設定しました。slave2をpotencialにするにはマスタのpostgresql.confのsynchronous_standby_namesにslave2を追加。


synchronous_standby_names='slave1,slave2'


select application_name, state, sync_state from pg_stat_replication;
application_name | state | sync_state
------------------+-----------+------------
slave1 | streaming | sync
slave2 | streaming | potencial

システム構成が上記構成であることを前提に、今回は以下3つの障害シナリオに沿って、PostgreSQL9.2のレプリケーション構成時の可用性の確認とその運用についてみていきます。

1.slave1のノードダウン時はslave2が自動的に同期モードによる接続となること。
2.masterノードダウン後にslave1をマスタに昇格させ、新マスタslave1を停止させることなくslave2と接続可能であること。
3.障害から復旧した旧マスタが新マスタslave1に(slave1を停止させることなく)接続可能であること。

■テスト用のテーブルを準備
各ステップの途中に適当にデータを投入しリカバリの状態を確認するために適当なテーブルを準備

【例】


CREATE TABLE TEST_TBL (ID INTEGER PRIMARY KEY, COL1 VARCHAR(16));

■slave1をダウンさせてslave2が同期モードに昇格することを確認
(1)slave1のサーバでimmediateモードで疑似的にクラッシュした状態で停止させる。


pg_ctl stop -m immediate -D /var/lib/pgsql/9.2/data

(2)masterでレプリケーションの状態を確認


select application_name, state, sync_state from pg_stat_replication;
application_name | state | sync_state
------------------+-----------+------------
slave2 | streaming | sync

(3)masterサーバで事前に準備しておいたテーブルに適当なデータを投入しmasterとslave2に反映されることを確認

(4)slave1を起動しmasterでレプリケーションの状態を確認


select application_name, state, sync_state from pg_stat_replication;
application_name | state | sync_state
------------------+-----------+------------
slave1 | streaming | sync
slave2 | streaming | potencial

(5)slave1で停止中に追加されたデータが反映されていることを確認

■masterをダウンさせslave1をマスタサーバへ昇格させる
これ以降の作業を行う前に念のため全ノードのデータベースを停止し、/var/lib/pgsql/9.2/dataディレクトリのバックアップを取得することをお勧めします。
なお、今回はpg_basebackupを利用した方法でリカバリを行います。

(1)masterでレプリケーションの状態を確認


select application_name, state, sync_state from pg_stat_replication;
application_name | state | sync_state
------------------+-----------+------------
slave1 | streaming | sync
slave2 | streaming | potential

(2)masterノードを停止し、slave1をマスタに昇格
masterサーバで下記コマンドにより擬似的にクラッシュさせる


pg_ctl stop -m immediate -D /var/lib/pgsql/9.2/data

(3)slave1のサーバをマスタに昇格(slave1のサーバで実施)


pg_ctl -D /var/lib/pgsql/9.2/data promote

■pg_basebackupによりslave2のリカバリを行い新マスタslave1に同期モードで接続する
事前にpostgresql.conf、pg_hba.conf、recovery.confは退避させておく。

(1)slave2のデータベースを停止

(2)slave2の/var/lib/pgsql/9.2/dataディレクトリの削除

(3)pg_basebackupよりslave1(新マスタ)のベースバックアップを取得し/var/lib/pgsql/9.2/dataに展開


pg_basebackup -h [slave1サーバ] -p [ポート] -U [レプリケーションユーザ] -D /var/lib/pgsql/9.2/data --xlog --progress --verbose

(4)退避させておいたpostgresql.conf、pg_hba.conf、recovery.confをdataディレクトリに戻し、recovery.confのprimary_conninfoのhostをslave1に変更後slave2のデータベースを再起動

(5)slave1でレプリケーションの状態を確認する


select application_name, state, sync_state from pg_stat_replication;
application_name | state | sync_state
------------------+-----------+------------
slave2 | streaming | async

(6)slave1のpostgresql.confのsynchronous_standby_namesにslave2を追加し設定の再読み込みを行う


pg_ctl -D /var/lib/pgsql/9.2/data reload

(7)slave1とslave2が同期モードでレプリケーションされていることを確認


select application_name, state, sync_state from pg_stat_replication;
application_name | state | sync_state
------------------+-----------+------------
slave2 | streaming | sync

(8)slave1のテーブルに適当なデータを投入してslave2に反映されることを確認

■ダウンした元マスタを復旧させ新マスタslave1に接続する
旧マスタのpostgresql.conf、pg_hba.confは事前に退避させておく。recovery.confはダウンするまではマスタであったため存在しない。

(1)masterの/var/lib/pgsql/9.2/dataディレクトリの削除

(2)pg_basebackupよりslave1(新マスタ)のベースバックアップを取得し/var/lib/pgsql/9.2/dataに展開


pg_basebackup -h [slave1サーバ] -p [ポート] -U [レプリケーションユーザ] -D /var/lib/pgsql/9.2/data --xlog --progress --verbose

(3)退避させたpostgresql.confとpg_hba.confを/var/lib/pgsql/9.2/data配下に戻す

(4)postgresql.confのsynchronous_standby_namesをコメントアウト、hot_standby = onとなっていない場合は合わせて設定を行う

(5)recovery.doneファイルをrecovery.confにリネームし、primary_conninfoのhostをslave1に、application_nameをmasterに設定した後、masterを再起動

(6)新マスタslave1でレプリケーションの状態を確認する


select application_name, state, sync_state from pg_stat_replication;
application_name | state | sync_state
------------------+-----------+------------
slave2 | streaming | sync
master | streaming | async

(7)slave1のpostgresql.confのsynchronous_standby_namesを以下のように設定し、再読み込みを行う。


synchronous_standby_names = 'master,slave2'


pg_ctl -D /var/lib/pgsql/9.2/data reload

(8)新マスタslave1で再度レプリケーションの状態を確認


select application_name, state, sync_priority, sync_state from pg_stat_replication;
application_name | state | sync_priority | sync_state
------------------+-----------+---------------+------------
slave2 | streaming | 2 | potential
master | streaming | 1 | sync

後から復旧してきた旧マスタが優先順位が上がり同期モードになり、slave2はpotencialに優先度が下がっています。

これは、synchronous_standby_namesに定義した順番が関係しています。

もし、synchronous_standby_names = 'slave2,master'と定義するとレプリケーションの状態は次のようになります。


select application_name, state, sync_priority, sync_state from pg_stat_replication;
application_name | state | sync_priority | sync_state
------------------+-----------+---------------+------------
slave2 | streaming | 1 | sync
master | streaming | 2 | potential

(9)新マスタslave1で適当なデータを投入するとレプリケーションをしている全ノードにデータが反映されることを確認

■まとめ
3台構成でレプリケーションをさせた状態で、各シナリオを検証しました。当たり前ですが全ノードがダウンしない限り最低1台で運転が可能です。
実運用ではPacemaker+Heartbeat(or Corosync)+pgpool-IIあたりを利用した運用が一般的かと思いますが、今回は動作確認が主なのでそういったソフトウェアは利用していません。

WatchServiceを利用したIOイベントハンドリング

Java7ではWatchServiceを利用して特定のディレクトリ内のイベントを監視するプログラムを簡単に記述することが可能です。

用途としては常駐プロセス系のプログラムで特定ディレクトリのファイル作成、更新、削除などのイベントをハンドリングし任意の処理を行うなど。
(たとえば設定ファイルを監視しておいて、更新されたらその内容を読み取り反映を行う、もしくは特定のファイルが作成されたことをトリガにして任意の処理を開始するといったことなど。)

実際の利用方法はFileSystemクラスのnewWatchService()メソッドよりWatchServiceを取得し、捕捉したいイベント(WatchEvent)とWatchServiceオブジェクトを監視対象となるディレクトリのPathオブジェクトに登録します。

その後は、どういったアプリケーションかにもよりますが、例えば無限ループなどでWatchServiceのtakeメソッドを呼び出し、イベントがキューイングされるまで待機し、イベント発生後takeメソッドの戻り値を利用して特定の処理を行うといったところかと思います。

【サンプルプログラム】

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class Main {

    private static volatile boolean stop = false;

    public static void main(String[] args) {
        FileSystem fileSystem = FileSystems.getDefault();
        // 監視対象ディレクトリのPathを
        Path path = fileSystem.getPath("path_to_dir");
        try {
            // WatchServiceの取得
            WatchService watchService = fileSystem.newWatchService();
            WatchEvent.Kind<?>[] events = {
                    StandardWatchEventKinds.ENTRY_CREATE,
                    StandardWatchEventKinds.ENTRY_MODIFY,
                    StandardWatchEventKinds.ENTRY_DELETE
            };
            // 捕捉したいイベント種別とWatchServiceをPathに登録
            path.register(watchService, events);
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    stop = false;
                }
            });
            while (!Thread.currentThread().isInterrupted() && !stop) {
                System.out.println("Watching...");
                try {
                    // イベントが発生するまで待機
                    WatchKey watchKey = watchService.take();
                    String watchableName = watchKey.watchable().toString();
                    System.out.println("Watchable : " + watchableName);
                    
                    if (watchKey.isValid()) {
                        for (WatchEvent<?> event : watchKey.pollEvents()) {
                            System.out.println("イベント種別 : " + event.kind());
                            System.out.println("対象コンテンツ : " + event.context());
                            System.out.println("イベント回数 : " + event.count());
                            Path targetPath = FileSystems.getDefault().getPath(watchableName + "\\" + event.context());
                            processContent(targetPath, event);
                            System.out.println();
                        }
                        if (!watchKey.reset()) {
                            // Is this right ?
                            System.out.println("The watch key might be invalid.");
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("See you.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    private static final void processContent(final Path targetPath, final WatchEvent<?> watchEvent) {
        // コンテンツがファイルかつ更新イベントの場合コンテンツ内容を出力
        if (Files.isRegularFile(targetPath)) {
            if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
                try {
                    if (Files.size(targetPath) > 0) {
                        try(BufferedReader reader = Files.newBufferedReader(targetPath, Charset.defaultCharset())) {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                System.out.println(line);
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

上記サンプルはループ内でイベントが発生するまで待機し、ファイル作成、削除の場合はその旨イベントが発生したことを表示し、更新された場合は更新内容も出力します。

サンプルプログラムなのでかなり適当に作成していますが、挙動を確かめるには十分かと。。。