0. 原生 ZooKeeper Java API 的用法参见:http://www.cnblogs.com/rocky-fang/p/9030438.html
1. 概述
Curator 用 Cache 封装了对 ZooKeeper 事件的监听,可以监听节点本身以及其子节点的变化。主要有以下三种:
NodeCache、PathChildrenCache、TreeCache
2. 例子
2.1 NodeCache
NodeCache 用于监听节点本身的变化:当节点的状态发生变更后,会回调 NodeCacheListener。
代码
package com.rocky.learn.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
// NOTE(review): ZooKeeper and Addressing are not referenced by this class; they are
// kept only so this edit does not drop existing imports. The javax.xml.ws packages
// are no longer bundled with the JDK from Java 11 onward -- confirm the target JDK
// and remove these two imports if they are truly unused.
import org.apache.zookeeper.ZooKeeper;

import javax.xml.ws.soap.Addressing;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

/**
 * Demonstrates Curator's {@link NodeCache}: a listener is registered on a single
 * node, the node's data is then modified, and the program exits once the listener
 * has been notified of the change.
 *
 * <p>Original author: rocky, created 2018/5/14.
 */
public class NodeCacheTest {

    /** Released by the listener once the first change notification has arrived. */
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-curator";
    private static final String NAMESPACE = "mybase";

    /** Placeholder printed when the cache currently holds no data for the node. */
    private static final String NO_DATA = "<none>";

    private static final CuratorFramework client;
    private static NodeCache nodeCache;

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace(NAMESPACE)
                .build();
        client.start();
    }

    /**
     * Ensures the watched node exists with its initial content, then builds and
     * starts the {@link NodeCache} and hands it to {@link #startCache(NodeCache)}.
     *
     * @throws Exception if any ZooKeeper/Curator operation fails
     */
    private static void initCache() throws Exception {
        byte[] initialData = "hello curator..".getBytes(StandardCharsets.UTF_8);
        try {
            client.create().forPath(PREFIX_SYNC, initialData);
        } catch (KeeperException.NodeExistsException alreadyThere) {
            // The node is never deleted by this program, so a second run would
            // otherwise fail here; reset its content instead.
            client.setData().forPath(PREFIX_SYNC, initialData);
        }

        nodeCache = new NodeCache(client, PREFIX_SYNC);
        // 'true' asks the cache to build its initial view before start() returns,
        // so getCurrentData() can be read immediately afterwards.
        nodeCache.start(true);
        startCache(nodeCache);
    }

    /**
     * Prints the cached content, registers a change listener and then updates the
     * node so that the listener fires.
     *
     * @param nodeCache an already started cache for {@code PREFIX_SYNC}
     * @throws Exception if any ZooKeeper/Curator operation fails or the thread is
     *                   interrupted while sleeping
     */
    private static void startCache(final NodeCache nodeCache) throws Exception {
        System.out.println("1111:" + dataAsString(nodeCache.getCurrentData()));

        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                // getCurrentData() may be null (e.g. the node was removed); the
                // helper guards against that so the latch is always released.
                System.out.println("data change..."
                        + dataAsString(nodeCache.getCurrentData()));
                countDownLatch.countDown();
            }
        });

        // Pause carried over from the original demo before triggering the change.
        Thread.sleep(2000);
        if (client.checkExists().forPath(PREFIX_SYNC) != null) {
            System.out.println("设置新内容。。。。");
            client.setData().forPath(PREFIX_SYNC, "2222".getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Decodes the payload of a cache entry as UTF-8 text.
     *
     * @param childData cache entry, possibly {@code null}
     * @return the decoded payload, or a placeholder when there is nothing to decode
     */
    private static String dataAsString(ChildData childData) {
        if (childData == null || childData.getData() == null) {
            return NO_DATA;
        }
        return new String(childData.getData(), StandardCharsets.UTF_8);
    }

    /** Closes the cache (if it was created) and then the client. */
    private static void closeResources() {
        if (nodeCache != null) {
            try {
                nodeCache.close();
            } catch (IOException e) {
                System.err.println("Failed to close NodeCache: " + e);
            }
        }
        client.close();
    }

    /**
     * Runs the demo and blocks until the listener has seen one change.
     *
     * @param args unused
     * @throws Exception if setup fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        try {
            initCache();
            countDownLatch.await();
        } finally {
            // Always release the ZooKeeper session and Curator's background threads.
            closeResources();
        }
    }
}