A Detailed Guide to Curator, the ZooKeeper Client

1. Creating a client with a static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
        connectionInfo,
        5000,   // sessionTimeoutMs
        3000,   // connectionTimeoutMs
        retryPolicy);

The newClient static factory method takes several key parameters:

Parameter            Description
connectionString     Server list, in the form host1:port1,host2:port2,...
retryPolicy          Retry policy; several policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs     Session timeout in milliseconds, default 60000 ms
connectionTimeoutMs  Connection creation timeout in milliseconds, default 15000 ms
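
For reference, a sketch of a few of the built-in retry policies (class names from the org.apache.curator.retry package in Curator 2.x; verify against your version):

// All of these implement RetryPolicy and can be passed to newClient():
RetryPolicy oneTime = new RetryOneTime(1000);                  // retry exactly once, after 1s
RetryPolicy nTimes = new RetryNTimes(5, 1000);                 // retry up to 5 times, 1s apart
RetryPolicy untilElapsed = new RetryUntilElapsed(60000, 1000); // retry every 1s for up to 60s
RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);    // sleep 1s, 2s, 4s... at most 3 retries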

Distributed long counter — DistributedAtomicLong

Next, a counter of type long. Besides supporting a larger range than
SharedCount, it first tries to set the count optimistically; if that fails
(for example, because another client updated the counter in the meantime), it
falls back to updating the value under an InterProcessMutex.

You can see this in its internal implementation, DistributedAtomicValue.trySet():

AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
    MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

    tryOptimistic(result, makeValue);
    if ( !result.succeeded() && (mutex != null) )
    {
        tryWithMutex(result, makeValue);
    }

    return result;
}

The counter supports a series of operations:

  • get(): read the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a given value
  • subtract(): subtract a given value
  • trySet(): try to set the value
  • forceSet(): forcibly set the value

You must check succeeded() on the returned result; it indicates whether the
operation succeeded. If it did, preValue() is the value before the operation
and postValue() the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

Distributed Queues

Curator also simplifies working with ephemeral nodes, and it provides the
distributed queue implementations from the ZK recipes. Using ZooKeeper's
PERSISTENT_SEQUENTIAL nodes, it can guarantee that items put into the queue
are ordered. If a single consumer takes items from the queue, it behaves
first-in first-out, as a queue should. If you strictly require ordering, you
must use a single consumer; you can use leader election to make only the
leader the sole consumer.

However, according to the authors of Netflix's Curator, ZooKeeper really is
not suited to being a queue; or rather, ZK does not make a good queue. For
details see Tech Note 4.

There are five reasons:

  1. ZK has a 1MB transport limit. In practice ZNodes must be relatively
     small, while a queue can hold many messages and grow very large.
  2. ZK starts slowly when there are many nodes, and using a queue creates
     many ZNodes. You need to know to increase initLimit and syncLimit.
  3. ZNodes are hard to clean up when they get very large. Netflix had to
     build a dedicated program just for that.
  4. ZK's performance degrades badly with ZNodes that contain very many
     children.
  5. ZK's database lives entirely in memory, so a large number of queue items
     eats a lot of memory.

Even so, Curator does provide several queue implementations. If the queue is
small in both item count and item size, you can, with due consideration, still
use them.

Transactions

CuratorFramework instances have an inTransaction() interface method that opens
a ZooKeeper transaction. You can compose create, setData, check, and/or delete
operations and then call commit() to submit them as a single atomic operation.
An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();
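
In Curator 2.x, commit() returns a collection with one result per operation; a
sketch of inspecting it (the path "/txn-demo" is made up for illustration;
CuratorTransactionResult comes from org.apache.curator.framework.api.transaction):

Collection<CuratorTransactionResult> results = client.inTransaction()
        .create().forPath("/txn-demo", "data".getBytes())
        .and()
        .setData().forPath("/txn-demo", "data2".getBytes())
        .and()
        .commit();
for (CuratorTransactionResult result : results) {
    // each result reports the operation type and the path it applied to
    System.out.println(result.getType() + " -> " + result.getForPath());
}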

Tree Cache

Tree Cache watches all nodes in an entire subtree; think of it as a
combination of Path Cache and Node Cache. It mainly involves four classes:

  • TreeCache – the Tree Cache implementation
  • TreeCacheListener – the listener interface
  • TreeCacheEvent – the event class that gets fired
  • ChildData – node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example uses no Thread.sleep(10) between operations, yet the events
still fire the expected number of times.

Note: when a TreeCache initializes (when start() is called), it delivers a
TreeCacheEvent to the TreeCacheListener whose Type is INITIALIZED and whose
ChildData is null. At that point event.getData().getPath() is very likely to
throw a NullPointerException; you should handle and guard against this case,
as in the sketch below.
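
A minimal null-safe listener sketch, assuming you only care about events that
carry node data:

TreeCacheListener safeListener = (curatorClient, event) -> {
    // INITIALIZED (and some connection events) carry no ChildData
    if (event.getData() == null) {
        System.out.println("event type: " + event.getType() + " (no data)");
        return;
    }
    System.out.println("event type: " + event.getType() + " | path: " + event.getData().getPath());
};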

Leader Election

In distributed computing, leader election is an important capability. The
election process goes like this: designate one process as the organizer that
distributes tasks to the nodes. Before the task begins, no node knows who the
leader (or coordinator) is. Once the election algorithm has run, every node
ends up agreeing on one unique node as the task leader. Besides that,
elections also frequently happen when the leader crashes unexpectedly and a
new leader must be elected.

In a ZooKeeper cluster, the leader is responsible for writes and syncs the
followers via the Zab protocol, while both leader and followers can serve
reads.

Curator has two leader election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients take turns at being leader without
interruption, an egalitarian society of sorts. With the latter, once a leader
is elected it does not give up leadership unless a client failure triggers a
new election.

Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes
care of much of the very low-level detail work of ZooKeeper client
development, including connection reconnection, repeated Watcher registration,
NodeExistsException handling, and so on. Patrick Hunt (of ZooKeeper) gave
Curator high praise with the line "Guava is to Java what Curator is to
ZooKeeper".

An aside on the name: the origin of the name ZooKeeper is rather amusing. The
following is adapted from the book "From Paxos to ZooKeeper: Distributed
Consistency Principles and Practice":
ZooKeeper originated in a research group at Yahoo! Research. At the time, the
researchers found that many large systems inside Yahoo depended on a similar
system for distributed coordination, but those systems tended to suffer from
distributed single points of failure. So Yahoo's developers set out to build a
general, SPOF-free distributed coordination framework. At project kickoff,
considering that many internal projects were named after animals (such as the
well-known Pig project), Yahoo's engineers wanted an animal name for this one
too. Raghu Ramakrishnan, then chief scientist of the research institute,
joked: "At this rate we'll turn into a zoo." Everyone promptly agreed to call
it the zookeeper, because with all the animal-named distributed components put
together, Yahoo's whole distributed infrastructure looked like a giant zoo,
and ZooKeeper was exactly what would coordinate that distributed environment.
Thus ZooKeeper got its name.

Curator is without doubt the Swiss Army knife among ZooKeeper clients. The
word translates as "curator" or "custodian"; whether or not the developers
intended it, my guess is that the name means Curator is ZooKeeper's keeper
(stretching a bit: the zoo's director).
Curator consists of several modules:
curator-framework: a wrapper over ZooKeeper's low-level APIs
curator-client: client-side utilities, such as retry policies
curator-recipes: packaged high-level features, such as cache-based event
listening, elections, distributed locks, distributed counters and distributed
barriers
Maven dependencies (this article uses Curator 2.12.0, which corresponds to
ZooKeeper 3.4.x; mixing incompatible versions can easily make node operations
fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Curator Recipes (advanced features)

Reminder: you must first add the curator-recipes dependency. The sections
below only explain and demonstrate how to use some of the recipes; they do not
attempt a source-level exploration.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important reminder: it is strongly recommended to use a
ConnectionStateListener to monitor the connection state. When the connection
state is LOST, all APIs under curator-recipes stop working or time out, even
though none of the examples below use a ConnectionStateListener.

自作者批评节点是或不是留存

client.checkExists().forPath("path");

Note: this method returns a Stat instance (null if the node does not exist)
and is used to check whether a ZNode exists. You can chain additional methods
(watching or background processing) and finally call forPath() to specify the
ZNode to operate on, as sketched below.
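
A short sketch of both uses, assuming a made-up path (Stat comes from
org.apache.zookeeper.data.Stat, CuratorWatcher from
org.apache.curator.framework.api; a watch registered this way fires once on
the next create/delete/data change of the node):

Stat stat = client.checkExists().forPath("/example/node");
if (stat == null) {
    System.out.println("node does not exist");
} else {
    System.out.println("node exists, version=" + stat.getVersion());
}
client.checkExists()
      .usingWatcher((CuratorWatcher) event -> System.out.println("watched: " + event.getType()))
      .forPath("/example/node");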

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

Once started, the LeaderLatch negotiates with any other LeaderLatches that use
the same latch path, and eventually one of them is elected leader. You can
check whether a LeaderLatch instance is the leader with hasLeadership:

leaderLatch.hasLeadership(); // returns true if this instance is the leader

Similar to the JDK's CountDownLatch, a LeaderLatch blocks while requesting
leadership. Once you no longer use a LeaderLatch, you must call its close
method. If it is the leader, close releases leadership and the other
participants elect a new leader.

public void await() throws InterruptedException, EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

Error handling:
LeaderLatch instances add a ConnectionStateListener to watch for connection
problems. On SUSPENDED or LOST, the leader must no longer consider itself the
leader until the connection is re-established. When a LOST connection is later
RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one.
LeaderLatch users must take into account connection problems that can cause
leadership to be lost. The use of a ConnectionStateListener is strongly
recommended.
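
A minimal sketch of such a listener (what you do on these states is
application-specific):

client.getConnectionStateListenable().addListener((curatorClient, newState) -> {
    if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
        // Do not assume leadership from here on; re-check after RECONNECTED.
        System.out.println("connection " + newState + ", leadership no longer guaranteed");
    }
});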

A LeaderLatch usage example:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the test module dependency to make testing easy, with no need to
run a real ZooKeeper server:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

First we created ten LeaderLatches; after starting, one of them is elected
leader. Because the election takes some time, start does not get you a leader
immediately.
hasLeadership checks whether this instance is the leader and returns true if
it is.
.getLeader().getId() returns the ID of the current leader.
Leadership can only be released through close.
await is a blocking method that tries to acquire leadership, but it will not
necessarily succeed.

Double barrier — DistributedDoubleBarrier

A double barrier lets clients synchronize on both the start and the end of a
computation. When enough processes have entered the double barrier, the
computation starts; when it finishes, the processes leave the barrier. The
double barrier class is DistributedDoubleBarrier. Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the member blocks
until all members have entered; when leave() is called, it likewise blocks the
calling thread until all members have left. It is like a 100-meter race: the
starting gun fires, all the runners start running, and only after all the
runners cross the finish line is the race over.

DistributedDoubleBarrier watches the connection state; enter() and leave()
throw an exception when the connection drops.

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

References:
"From Paxos to ZooKeeper: Distributed Consistency Principles and Practice"
The "Learning ZooKeeper usage by example" blog series

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article actually carries a [toc] navigation
directive, which makes it easy to jump between sections; Jianshu just does not
support it. The original Markdown is in the project's /resources/md directory,
free for the taking. It was written with Typora, and I suggest opening it with
Typora.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, grinding away in Guangzhou: working by day, with irregular
overtime at night and on weekends, and sticking to writing blog posts in my
free evenings.
I hope my articles bring you something of value. Onward together.

Multi shared lock — Multi Shared Lock

A Multi Shared Lock is a container of locks. When acquire() is called, all the
contained locks are acquired; if any acquisition fails, every lock is
released. Likewise, a call to release releases all the locks (failures are
ignored). Essentially it is the representative of a group of locks: acquire
and release requests on it are forwarded to all the locks it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructors take either the collection of locks, or a list of ZooKeeper
paths.

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as for Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

Here we create an InterProcessMultiLock containing a reentrant lock and a
non-reentrant lock. After acquire() you can see that the thread holds both
locks at once; after release() both locks are released.

Once more: it is strongly recommended to use a ConnectionStateListener to
monitor the connection state, because when the connection is LOST you no
longer hold the lock.

Distributed int counter — SharedCount

This class counts with an int. It mainly involves three classes.

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount is the counter. You can add a SharedCountListener to it, which is
notified whenever the count changes, while a SharedCountReader reads the
latest value, both the plain value and the versioned value (VersionedValue).

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example we use baseCount to listen for count changes (the addListener
method adds the SharedCountListener). Any SharedCount that uses the same path
can obtain this count value. We then use five threads, each adding a random
number below 10 to the count. When a SharedCount on the same path changes the
value, baseCount's SharedCountListener is called back.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to set the counter. The first parameter supplies the
current VersionedValue. If another client updated the count in the meantime,
your update may fail, but your client will then have fetched the latest value,
so you can simply try the update again. setCount, by contrast, sets the
counter unconditionally. A retry loop might look like the sketch below.
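
Such a loop could look like this (the bound of three attempts is an arbitrary
illustration):

for (int i = 0; i < 3; i++) {
    VersionedValue<Integer> current = count.getVersionedValue();
    if (count.trySetCount(current, current.getValue() + 1)) {
        break; // our update won against the version we read
    }
    // another client updated the count first; loop re-reads and retries
}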

Note: the counter must be started before use, and you must call close to shut
it down once you are done.

A ConnectionStateListener is strongly recommended here as well; in this
example, SharedCountListener extends ConnectionStateListener.

DistributedBarrier

The DistributedBarrier class implements the barrier. Its constructor is:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First you set the barrier; it will block the threads waiting on it:

setBarrier();

Threads that need to block then call the wait method and wait for the release
condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads
continue:

removeBarrier();

Error handling: DistributedBarrier watches the connection state;
waitOnBarrier() throws an exception when the connection drops.

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier to set and remove the barrier, and five
threads that wait on the same barrier. Only after the barrier is removed do
all the threads continue.

If you never set the barrier in the first place, none of the threads block.

Creating Sessions

3. Creating a session with an isolated namespace

To isolate different ZooKeeper business domains from one another, assign each
one an independent namespace, i.e. specify a root path for the client
(officially: adding the "chroot" feature to ZooKeeper). In the example below,
when a client specifies the independent namespace "/base", every data-node
operation that client performs is based on that path. Setting a chroot maps a
client application onto one subtree of the ZooKeeper server; when multiple
applications share one ZooKeeper cluster, this is very useful for achieving
mutual isolation between them.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .namespace("base")
        .build();
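
For illustration, with the namespace set to "base", a path passed to this
client is interpreted relative to /base on the server (the path below is made
up):

client.start();
client.create().forPath("/sub", "data".getBytes());
// the node actually created on the ZooKeeper server is /base/sub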

Creating nodes

ZooKeeper node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral with a sequence number

Create a node with empty initial content:

client.create().forPath("path");

Note: if no node attributes are set, the creation mode defaults to persistent
and the content defaults to empty.

Create a node with initial content:

client.create().forPath("path","init".getBytes());

Create a node with a specified mode (ephemeral) and empty content:

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified mode (ephemeral) and initial content:

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified mode (ephemeral) and initial content, and
automatically create parent nodes recursively:

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() interface is extremely useful: normally
a developer has to check whether the parent node exists before creating a
child, because creating one directly when the parent is missing throws a
NoNodeException. With creatingParentContainersIfNeeded(), Curator
automatically and recursively creates all required parent nodes.

Distributed Locks

Reminders:

1. It is recommended to use a ConnectionStateListener to monitor the
connection state, because when the connection goes LOST you no longer hold the
lock.

2. Distributed locks are globally synchronous: at any point in time, no two
clients hold the same lock at once.

Reentrant shared lock — Shared Reentrant Lock

Shared means the lock is globally visible and any client can request it.
Reentrant means the same as the JDK's ReentrantLock: the same client can
acquire the lock repeatedly while holding it, without blocking. It is
implemented by the class InterProcessMutex. Its constructor is:

public InterProcessMutex(CuratorFramework client, String path)

通过acquire()获得锁,并提供超时机制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

The lock is released via the release() method. InterProcessMutex instances are
reusable.

Revoking. The ZooKeeper recipes wiki defines a cooperative revocation
mechanism. To make a mutex revocable, call the method below:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or
thread wants you to release the lock.
Parameters:
listener - the listener

If you want to request that the current lock be revoked, call the
attemptRevoke() method; note that the RevocationListener is called back when
the lock is to be released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
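
A sketch of cooperative revocation under these APIs (the path and the listener
body are illustrative; in Curator 2.x the static attemptRevoke utility shown
above lives on the Revoker class):

InterProcessMutex mutex = new InterProcessMutex(client, "/examples/revocable-lock");
mutex.makeRevocable(forLock -> {
    try {
        forLock.release(); // another client asked us to give up the lock
    } catch (Exception e) {
        e.printStackTrace();
    }
});
mutex.acquire();
// elsewhere, another client can request revocation of a participant node:
// Revoker.attemptRevoke(client, participantNodePath);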

One more reminder on error handling: it is, again, strongly recommended to use
a ConnectionStateListener to react to connection state changes. When the
connection is LOST you no longer hold the lock.

First, let's create a simulated shared resource. The resource expects to be
accessed by only one thread at a time, otherwise there are concurrency
problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // In a real scenario we would access/maintain the shared resource here.
        // With the lock held, this example never throws the IllegalStateException below,
        // but without the lock it easily does, because of the sleep.
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then create an InterProcessMutexDemo class responsible for the complete cycle
of requesting the lock, using the resource, and releasing the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is simple: it spawns five clients, and each client repeats the
request-lock / access-resource / release-lock cycle fifty times, each client
in its own thread. The output shows that the lock is used exclusively by each
instance in turn.

Since the lock is reentrant, you can call acquire() multiple times in one
thread; while the thread holds the lock it always returns true.

You should not use the same InterProcessMutex from multiple threads. Instead,
create a new InterProcessMutex instance in each thread with the same path;
they then share the same lock.

Distributed queue with IDs — DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that you can assign
an ID to every element in the queue and remove arbitrary elements from the
queue by ID. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

It is created with:

builder.buildIdQueue()

Put elements with:

queue.put(aMessage, messageId);

Remove elements with:

int numberRemoved = queue.remove(messageId);

In this example, some elements are removed before any consumer has consumed
them, so the consumer never receives the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Updating node data

Update the data content of a node:

client.setData().forPath("path","data".getBytes());

Note: this interface returns a Stat instance.

Update the data content of a node, forcing the update against a specific
version:

client.setData().withVersion(10086).forPath("path","data".getBytes());
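
A sketch of a version-checked read-modify-write (the path is illustrative;
BadVersionException comes from org.apache.zookeeper.KeeperException):

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
try {
    client.setData().withVersion(stat.getVersion()).forPath("path", "data2".getBytes());
} catch (KeeperException.BadVersionException e) {
    // another client updated the node after we read it; re-read and retry if needed
}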

Node Cache

Node Cache is similar to Path Cache, except that it watches only one specific
node. It involves the following three classes:

  • NodeCache – the Node Cache implementation
  • NodeCacheListener – the node listener
  • ChildData – node data

Note: as with any cache, you must call its start() method to use it, and
close() when you are done.

getCurrentData() returns the node's current state, from which you can obtain
the current value.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep(10) calls in the example can be commented out, but the
listener will then miss some events. This probably relates to how NodeCache is
implemented: events must not be fired too rapidly!

Note: NodeCache can only watch the state changes of a single node.

Caches

ZooKeeper natively supports event listening through registered Watchers, but
developers have to re-register them repeatedly (a Watcher fires only once per
registration). Cache is Curator's wrapper over event listening; it can be seen
as a local cached view of the watched data, and it handles the repeated
registration for you automatically. Curator offers several kinds of caches to
watch for node changes.

Reentrant read-write lock — Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair
of related locks, one for read operations and one for writes. The read lock
can be held by multiple processes at the same time while the write lock is
unused, whereas the write lock, once held, allows no readers (they block).

The lock is reentrant. A thread that holds the write lock can re-enter the
read lock, but the read lock cannot enter the write lock. This means a write
lock can be downgraded into a read lock, e.g. acquire write lock -> acquire
read lock -> release read lock -> release write lock. Upgrading from a read
lock to a write lock is not possible.

The reentrant read-write lock is implemented by two classes:
InterProcessReadWriteLock and InterProcessMutex. To use it, first create an
InterProcessReadWriteLock instance, then obtain the read lock or write lock as
needed; both are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: you must take the write lock before the read lock, never the other way round!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " has the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " has the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Distributed barrier — Barrier

A distributed barrier is a class that blocks the waiting processes on all
nodes until a condition is satisfied, at which point all the nodes continue
together.

For example, in a horse race, the horses first assemble at the starting gate;
at the starting signal, all of them dash out at once.

Distributed Counters

As the name implies, a counter counts, and with ZooKeeper you can implement a
counter shared by a whole cluster. As long as you use the same path, you get
the latest counter value; this is guaranteed by ZooKeeper's consistency.
Curator has two counters: one counts with an int (SharedCount), the other with
a long (DistributedAtomicLong).

Semaphore — Shared Semaphore

A counting semaphore, similar to the JDK's Semaphore. Where the JDK's
Semaphore maintains a set of permits, Curator maintains leases. There are two
ways to control the semaphore's maximum number of leases. The first is for the
user to give the path and specify the maximum lease size. The second is for
the user to give the path and use a SharedCountReader. If you do not use a
SharedCountReader, you must ensure that all instances across all processes use
the same (maximum) lease count; otherwise you may end up with instances in
process A holding a maximum of 10 leases while those in process B hold 20, at
which point the leases lose their meaning.

A call to acquire() returns a lease object. The client must close the lease
object in a finally block, or the lease is lost. However, if the client
session is lost for some reason, a crash for instance, the leases held by that
client are closed automatically, so that other clients can go on using them.
A lease can also be returned with:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not
currently have enough, the requesting thread blocks. Overloads with a timeout
are also provided.

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used by Shared Semaphore are the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire five leases, leaving the semaphore with five. Then we acquire
one more lease, which can be satisfied, leaving four. A request for five more
leases cannot be satisfied and blocks until the timeout, returning null (when
leases are insufficient, acquire blocks until the timeout and then returns
null rather than throwing; with no timeout set, it blocks indefinitely).
Finally the single lease and the batch of five are returned to the semaphore.

All the locks discussed above are fair locks. From ZooKeeper's point of view,
every client obtains the lock in the order it requested it; there is no unfair
preemption.

Curator's Basic API

Starting the client

When the session has been created successfully, take the client instance and
simply call its start() method:

client.start();

Node operations

SimpleDistributedQueue

We have implemented a variety of queues above, but as you may have noticed,
none of them implements anything like the JDK interfaces.
SimpleDistributedQueue provides an interface basically consistent with the
JDK's (although it does not actually implement the Queue interface).
Creating one is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Add an element:

public boolean offer(byte[] data) throws Exception

Take an element:

public byte[] take() throws Exception

Additional methods are also provided:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method; there is a take method instead.

take blocks until it can return successfully, while poll returns null
immediately when the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

In practice, however, 100 messages are sent, yet after the first one is
consumed the remaining messages cannot be consumed; I have not found the cause
so far. The demo recommended by the official documentation uses the following
APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

But in actual use the consumption-blocking problem is still there.

Asynchronous interface

The create, delete, update and read methods above are all synchronous. Curator
also provides an asynchronous interface, introducing the BackgroundCallback
interface for processing the result information the server returns after an
asynchronous call. The key callback value in BackgroundCallback is a
CuratorEvent, which carries the event type, the response code and detailed
node information.

CuratorEventType

Event type   Corresponding CuratorFramework method
CREATE       #create()
DELETE       #delete()
EXISTS       #checkExists()
GET_DATA     #getData()
SET_DATA     #setData()
CHILDREN     #getChildren()
SYNC         #sync(String,Object)
GET_ACL      #getACL()
SET_ACL      #setACL()
WATCHED      #usingWatcher(Watcher)
CLOSING      #close()

Response codes (#getResultCode())

Code   Meaning
0      OK, the call succeeded
-4     ConnectionLoss, the client lost its connection to the server
-110   NodeExists, the node already exists
-112   SessionExpired, the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s",
                  curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if #inBackground() is not given an executor, Curator's EventThread is
used for the asynchronous processing by default.

Reading node data

Read the data content of a node:

client.getData().forPath("path");

Note that the return value of this method is byte[].

Read the data content of a node while also obtaining the node's stat:

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
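
For illustration, some of the bookkeeping the returned Stat carries (accessors
from org.apache.zookeeper.data.Stat):

Stat stat = new Stat();
byte[] data = client.getData().storingStatIn(stat).forPath("path");
System.out.println("value=" + new String(data)
        + ", version=" + stat.getVersion()
        + ", children=" + stat.getNumChildren());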

Path Cache

Path Cache watches the children of a ZNode. When a child is added, updated or
deleted, the Path Cache updates its state to contain the latest set of
children, together with their data and state; state changes are delivered
through a PathChildrenCacheListener.

In practice it involves four classes:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

Create a Path Cache with the constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache you must call its start method, and call close when you are
done. You can choose a StartMode to control the initialization behavior.

StartMode has the following three options:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called before start() returns.
  3. POST_INITIALIZED_EVENT: a PathChildrenCacheEvent.Type#INITIALIZED event
     is posted once the cache has finished initializing its data.

public void addListener(PathChildrenCacheListener listener) adds a listener
for changes to the cache.

The getCurrentData() method returns a List<ChildData> that you can iterate to
inspect all the children.

Setting/updating and removing are done through the client (CuratorFramework)
itself, not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter in new PathChildrenCache(client, PATH, true)
is set to false, then event.getData().getData() and data.getData() in the
example return null, and the cache does not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but the
listener will then miss some events. This probably relates to how PathCache is
implemented: events must not be fired too rapidly!

Non-reentrant shared lock — Shared Lock

Compared with InterProcessMutex above, this lock simply lacks the Reentrant
feature, which means it cannot be re-acquired within the same thread. The
class is InterProcessSemaphoreMutex, and it is used just like
InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

Running it, you find that one and only one client successfully acquires the
first lock (its first acquire() call returns true); it then blocks on its own
second acquire(), which times out. All the other clients block on the first
acquire() until it times out and throws an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is
not reentrant.

Getting all child node paths of a node

client.getChildren().forPath("path");

Note: the method returns a List<String>, the list of child paths of the ZNode.
You can call additional methods (watch, background, or get stat) and finally
call forPath() to specify the parent ZNode to operate on, as sketched below.
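
A sketch of chaining a one-shot watch onto the call (the watch fires on the
next child change; the path is illustrative):

List<String> children = client.getChildren()
        .usingWatcher((CuratorWatcher) event -> System.out.println("children changed: " + event.getType()))
        .forPath("path");
System.out.println("current children: " + children);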

Distributed queue — DistributedQueue

DistributedQueue is the most ordinary of the queues. It involves the following
four classes:

  • QueueBuilder – the builder for this queue, and for all the other queues
  • QueueConsumer – the interface for consumers of queue messages
  • QueueSerializer – the serialization interface, providing serialization and
    deserialization of the objects in the queue
  • DistributedQueue – the queue implementation

QueueConsumer is the consumer; it receives the queue's data. The logic that
processes queue data goes in QueueConsumer.consumeMessage().

Under normal circumstances a message is first removed from the queue and then
handed to the consumer. But those are two steps, not one atomic one. You can
call lockPath() on the builder to make the consumer hold a lock while
consuming, so that other consumers cannot consume the same message; if
consumption fails or the process dies, the message can be handed to another
process. This costs some performance. It is best to use the queue in
single-consumer mode anyway.

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for message consumption to finish
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Serializer implementation for queue messages
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Define the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; since the PATH
is the same, the consumers race each other for the messages.

Distributed delay queue — DistributedDelayQueue

The JDK has a DelayQueue too; you may know it. DistributedDelayQueue provides
similar functionality: each element has a delay value, and a consumer only
receives an element once its delay has elapsed. It involves the following four
classes.

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

It is created with:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element you can specify a delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now, such as 20
milliseconds, but a future epoch timestamp, such as
System.currentTimeMillis() + 10000 (ten seconds from now). If the
delayUntilEpoch has already passed, the message is received by the consumer
immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

Deleting nodes

Delete a node:

client.delete().forPath("path");

Note: this method can only delete leaf nodes, otherwise an exception is
thrown.

Delete a node and recursively delete all of its children:

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing a specific version for the deletion:

client.delete().withVersion(10086).forPath("path");

Delete a node with a deletion guarantee:

client.delete().guaranteed().forPath("path");

The guaranteed() interface is a safeguard: as long as the client session is
valid, Curator keeps retrying the deletion in the background until the node is
deleted successfully.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

Prioritized distributed queue — DistributedPriorityQueue

A priority queue sorts the elements in the queue by priority. The smaller the
priority value, the closer to the front the element sits and the earlier it is
consumed. It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

It is created with the builder.buildPriorityQueue(minItemsBeforeRefresh)
method. When the priority queue is notified that elements were added or
removed, it stops processing its current element queue and refreshes it.
minItemsBeforeRefresh specifies the minimum number of currently active items
to process before such a refresh; set it to the minimum amount of unordered
processing your program can tolerate.

Putting into the queue requires a priority:

queue.put(aMessage, priority);

An example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that setting priorities has no effect.
That is because priority only matters for elements backed up in the queue: if
consumption is fast enough, the previous element may already be consumed
before the next one is enqueued, in which case the DistributedPriorityQueue
degrades into a DistributedQueue.

2. Creating a session with the fluent API

The core parameters become fluent settings. An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(connectionInfo)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();

LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector. Its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Like LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, when this instance obtains leadership your listener's
takeLeadership() method is called, and takeLeadership() returns only when the
leadership is being released. When you no longer use the LeaderSelector
instance, you should call its close method.

Exception handling:
The LeaderSelectorListener class extends ConnectionStateListener. A
LeaderSelector must watch for connection state changes. If an instance becomes
leader, it should respond to SUSPENDED or LOST. When SUSPENDED appears, the
instance must assume that it may no longer be the leader until it has
reconnected successfully. If LOST appears, the instance is no longer the
leader and its takeLeadership method returns.

Important: the recommended handling is to throw a CancelLeadershipException
when you receive SUSPENDED or LOST. This causes the LeaderSelector instance to
interrupt and cancel the executing takeLeadership method. This is extremely
important; you should consider extending LeaderSelectorListenerAdapter, which
provides the recommended handling logic.

上边包车型客车一个例证摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can distribute tasks and so on inside takeLeadership and never return; if
you want this instance to stay leader forever, add an infinite loop there.
Calling leaderSelector.autoRequeue() ensures that the instance may obtain
leadership again after it has released it. Here we use an AtomicInteger to
record how many times this client has obtained leadership; the selection is
"fair", so every client has an equal chance of gaining leadership.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, a LeaderLatch only releases leadership when its close() method
is called, whereas with LeaderSelector you can control the leadership through
the LeaderSelectorListener and release it at the appropriate moment, so that
every node has a chance of obtaining leadership. LeaderSelector is therefore
more flexible and controllable; where you have a leader election scenario, it
is advisable to prefer LeaderSelector.
