0. 原生 ZOOKEEPER JAVA API  http://www.cnblogs.com/rocky-fang/p/9030438.html

1. 概述

Curator采用cache封装对事件的监听,包括监听节点、子节点。主要有:

NodeCache、PathChildrenCache、TreeCache

 

2. 例子

2.1 NodeCache

监听节点本身的变化,当节点的状态发生变更后,回调NodeCacheListener

代码

package com.rocky.learn.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import javax.xml.ws.soap.Addressing;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/14.
 */
public class NodeCacheTest {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final String ADDRESS = "10.0.40.10:2181";
    private static final String PREFIX_SYNC = "/mytest-curator";
    private static final String NAMESPACE = "mybase";
    private static CuratorFramework client;
    private static NodeCache nodeCache;
    static {
//        client = CuratorFrameworkFactory.newClient(ADDRESS, 5000, 5000,
//                new ExponentialBackoffRetry(1000, 3));
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace(NAMESPACE)
                .build();
        client.start();
    }
    private static void initCache() throws Exception {
        client.create().forPath(PREFIX_SYNC);
        client.setData().forPath(PREFIX_SYNC,"hello curator..".getBytes());
        nodeCache = new NodeCache(client, PREFIX_SYNC);
        nodeCache.start(true);
        startCache(nodeCache);
    }

    private static void startCache(final NodeCache nodeCache) throws Exception {
        ChildData currentData = nodeCache.getCurrentData();
        System.out.println("1111:" + new String(currentData.getData()));
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("data change..." + new String(nodeCache.getCurrentData().getData()));
                countDownLatch.countDown();
            }
        });
        Thread.sleep(2000);
        if(client.checkExists().forPath(PREFIX_SYNC) != null){
            System.out.println("设置新内容。。。。");
            client.setData().forPath(PREFIX_SYNC, "2222".getBytes());
        }
    }

    public static void main(String[] args) throws Exception {
        initCache();
        countDownLatch.await();
    }
}
View Code

相关文章: