A Detailed Guide to Curator, the ZooKeeper Client

[TOC]


Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes care of most of the fairly low-level details of ZooKeeper client development, including connection reconnection, repeated Watcher registration, handling of NodeExistsException, and so on. Patrick Hunt (of ZooKeeper) praised Curator highly with the sentence "Guava is to Java what Curator is to ZooKeeper".
An aside on the name: the origin of ZooKeeper's name is rather amusing. The following passage is adapted from the book From Paxos to ZooKeeper: Principles and Practice of Distributed Consistency:
ZooKeeper originated in a research group at Yahoo! Research. At the time, the researchers found that many large systems inside Yahoo relied on similar systems for distributed coordination, yet those systems usually suffered from distributed single points of failure. So Yahoo's developers set out to build a general-purpose distributed coordination framework free of single points of failure. At the start of the project, considering that many projects were named after animals (such as the well-known Pig project), Yahoo's engineers wanted an animal name for this project too. Raghu Ramakrishnan, then chief scientist of the research institute, joked: "At this rate we'll turn into a zoo." As soon as he said it, everyone agreed to call it the zookeeper: with all the animal-named distributed components put together, Yahoo's whole distributed system looked like a big zoo, and ZooKeeper was exactly what would coordinate that distributed environment. And so the name ZooKeeper was born.

Curator is without doubt the Swiss Army knife among ZooKeeper clients. The word translates as "curator" or "keeper"; whether or not the development team intended it, the author suspects the name was chosen to say that Curator is the keeper of ZooKeeper (stretching the idea a bit: Curator is the keeper of the zoo).
Curator consists of several modules:
curator-framework: a wrapper around ZooKeeper's low-level API
curator-client: provides client-side utilities such as retry policies
curator-recipes: packages higher-level features such as Cache event listening, leader election, distributed locks, distributed counters, distributed barriers, and so on
Maven dependencies (this article uses Curator 2.12.0, which corresponds to ZooKeeper 3.4.x; mixing other versions can cause compatibility problems and very likely make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Curator Basic API

Creating a Session

1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

The newClient static factory method takes the following important parameters:

Parameter            Description
connectionString     Server list, in the format host1:port1,host2:port2,...
retryPolicy          Retry policy; four policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs     Session timeout in milliseconds, default 60000 ms
connectionTimeoutMs  Connection creation timeout in milliseconds, default 15000 ms

2. Creating a session with the fluent-style API

The core parameters are supplied in a fluent style. An example:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

3. Creating a session with an isolated namespace

To isolate different ZooKeeper businesses from each other, each business is usually given its own namespace, i.e. a ZooKeeper root path is specified for it (in official terms, a "chroot" is added for ZooKeeper). For example, if a client specifies the namespace "/base" (as in the example below), then all of that client's operations on data nodes are performed relative to that path. Setting a chroot maps a client application onto one subtree of the ZooKeeper server; when multiple applications share one ZooKeeper cluster, this is very useful for isolating the applications from one another.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

Starting the Client

Once the session has been created successfully and you have the client instance, simply call its start() method:

client.start();

Data Node Operations

Creating data nodes

ZooKeeper node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral with a sequence number

**Create a node with empty initial content**

client.create().forPath("path");

Note: if no node attributes are set, the creation mode defaults to persistent and the content defaults to empty.

Create a node with initial content

client.create().forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and empty content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified creation mode (ephemeral) and initial content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral), initial content, and automatic recursive creation of parent nodes

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() interface is extremely useful. Normally a developer has to check whether the parent node exists before creating a child node, since creating the child directly throws NoNodeException when the parent does not exist; with creatingParentContainersIfNeeded(), Curator automatically creates all required parent nodes recursively.

Deleting data nodes

Delete a node

client.delete().forPath("path");

Note: this method can only delete leaf nodes, otherwise an exception is thrown.

Delete a node and recursively delete all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing a specific version

client.delete().withVersion(10086).forPath("path");

Delete a node with guaranteed deletion

client.delete().guaranteed().forPath("path");

guaranteed() is a safeguard: as long as the client session is valid, Curator keeps retrying the delete in the background until the node is successfully deleted.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

Reading node data

Read the data content of a node

client.getData().forPath("path");

Note: this method returns byte[].

Read a node's data and also obtain the node's Stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

Updating node data

Update a node's data content

client.setData().forPath("path","data".getBytes());

Note: this call returns a Stat instance.

Update a node's data, forcing a specific version

client.setData().withVersion(10086).forPath("path","data".getBytes());

Checking whether a node exists

client.checkExists().forPath("path");

Note: this method returns a Stat instance and is used to check whether a ZNode exists.
Additional methods (a watch or background processing) can be chained onto it, with forPath() called last to specify the target ZNode, as in the sketch below.
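For illustration only, a minimal sketch (not from the original article) of chaining a watcher onto checkExists(); the path and the printed messages are placeholders:

// Sketch: check existence and leave a watcher on the node.
// usingWatcher() accepts either org.apache.zookeeper.Watcher or CuratorWatcher;
// the cast selects the CuratorWatcher overload for the lambda.
Stat stat = client.checkExists()
        .usingWatcher((CuratorWatcher) watchedEvent ->
                System.out.println("watched event: " + watchedEvent.getType() + " on " + watchedEvent.getPath()))
        .forPath("path");
System.out.println(stat == null ? "node does not exist" : "node exists, version = " + stat.getVersion());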

Getting all child node paths of a node

client.getChildren().forPath("path");

Note: this method returns a List<String>, the list of child node paths of the ZNode.
Additional methods (watch, background processing, or getting the stat) can be chained onto it, with forPath() called last to specify the parent ZNode, as in the sketch below.
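Likewise, a minimal assumed sketch of listing children in the background; the callback body is a placeholder:

// Sketch: list the children asynchronously; the result is delivered to the callback.
client.getChildren()
        .inBackground((curatorFramework, curatorEvent) ->
                System.out.println("children of " + curatorEvent.getPath() + ": " + curatorEvent.getChildren()))
        .forPath("path");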

Transactions

A CuratorFramework instance provides the inTransaction() method, which starts a ZooKeeper transaction. You can compose create, setData,
check and/or delete
operations and then call commit() to submit them as a single atomic operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

Asynchronous Interface

The create, delete, update and read methods above are all synchronous. Curator also provides an asynchronous interface and introduces the BackgroundCallback interface for processing the result that the server returns after an asynchronous call. The key callback value in BackgroundCallback is the CuratorEvent, which contains the event type, the response code and the node details.

CuratorEventType

Event type    Corresponding CuratorFramework method
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #usingWatcher(Watcher) / #watched()
CLOSING #close()

Response codes (#getResultCode())

Code    Meaning
0       OK — the call succeeded
-4      ConnectionLoss — the client lost its connection to the server
-110    NodeExists — the node already exists
-112    SessionExpired — the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s", curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if #inBackground() is not given an executor, Curator's EventThread is used for the asynchronous processing by default.
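For comparison, a hedged sketch without an executor; the path and output are placeholders:

client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      // no executor supplied, so the callback below executes on Curator's EventThread
      .inBackground((curatorFramework, curatorEvent) ->
              System.out.println("created " + curatorEvent.getPath() + ", resultCode: " + curatorEvent.getResultCode()))
      .forPath("path");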

Curator Recipes (Advanced Features)

Reminder: first add the curator-recipes dependency. The rest of this article only explains and demonstrates the use of some recipe features; it does not attempt a source-level analysis.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important: it is strongly recommended to monitor the connection with a ConnectionStateListener. When the connection state is LOST, all of the curator-recipes APIs fail or time out, even though the examples below do not use a ConnectionStateListener. A registration sketch follows.
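As a hedged illustration (not one of the article's examples), registering such a listener on the client might look like this; the reactions inside the lambda are placeholders:

// Sketch: watch the connection state and react to SUSPENDED/LOST/RECONNECTED.
client.getConnectionStateListenable().addListener((curatorFramework, newState) -> {
    System.out.println("connection state changed to " + newState);
    if (newState == ConnectionState.LOST) {
        // locks, latches and leases held through this client should be treated as gone;
        // a real application would re-acquire them after RECONNECTED
    }
});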

Caches

ZooKeeper natively supports event listening by registering watchers, but a watcher can only be registered once and fired once, so developers have to re-register repeatedly. Cache is Curator's wrapper around event listening; it can be regarded as a local cached view of the watched data, and it re-registers the listeners for you automatically. Curator provides three kinds of watcher caches for listening to node changes.

Path Cache

Path Cache is used to watch the children of a ZNode. Whenever a child is added, updated or deleted,
the Path Cache changes its state to contain the latest set of children,
together with their data and state, and the state change is announced through a PathChildrenCacheListener.

In practice, the following classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

A Path Cache is created through the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache you must call its start() method, and call close() when you are done.
The StartMode controls how the cache is initialized at startup.

StartMode has the following options:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called before start() returns.
  3. POST_INITIALIZED_EVENT:
    a PathChildrenCacheEvent.Type#INITIALIZED event is sent once the cache has finished initializing.

public void addListener(PathChildrenCacheListener listener) adds a listener for changes to the cache.

getCurrentData() returns a List<ChildData> that can be iterated to visit all child nodes.

Setting/updating and removing nodes are done through the client (CuratorFramework) itself,
not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("event type: " + event.getType());
            if (null != event.getData()) {
                System.out.println("node data: " + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter of new PathChildrenCache(client, PATH,
true) is set to false, then event.getData().getData() and data.getData() in the example will return null, and the cache will not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but then the listener will not fire for every event. This is probably related to how PathCache is implemented internally: events must not be fired too frequently.

Node Cache

Node Cache is similar to Path Cache, but a Node
Cache watches only one specific node. It involves the following three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – the node data

Note: to use the cache you still have to call its start() method, and call close() when you are done.

getCurrentData() returns the node's current state, from which you can get the current value.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("node data: " + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("node deleted!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep calls in the example can be commented out, but then the listener will not fire for every event. This is probably related to how NodeCache is implemented: events must not be fired too frequently.

Note: NodeCache can only watch the state changes of a single node.

Tree Cache

Tree Cache can watch all the nodes of an entire subtree; it is something like a combination of PathCache and NodeCache. It mainly involves the following four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the event class that is fired
  • ChildData – the node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("event type: " + event.getType() +
                        " | path: " + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the number of triggered events is still correct.

Note: when TreeCache is initialized (when start() is called), it calls back the TreeCacheListener with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null; at that point event.getData().getPath() is very likely to throw a NullPointerException, so you should handle and guard against this case.

Leader Election

In distributed computing, leader election is an important capability. The election process goes like this: a process is designated as the organizer and distributes the tasks to the nodes.
Before the task begins, none of the nodes knows which one is the leader (or coordinator).
After the election algorithm has run, every node ends up recognizing a unique node as the task leader.
Besides that, elections also frequently happen when the leader crashes unexpectedly and a new leader has to be elected.

In a ZooKeeper cluster, the leader handles write operations and then synchronizes the followers through the ZAB protocol; both the leader and the followers can serve read operations.

Curator has two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients continuously take turns acting as the leader. With the latter, once a leader has been elected it keeps leadership until a client goes down and a new election is triggered.

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

Once started, the LeaderLatch negotiates with all other LeaderLatch instances that use the same latch
path; eventually one of them is elected leader. You can check whether a LeaderLatch instance is the leader with the hasLeadership() method:

leaderLatch.hasLeadership(); // returns true if the current instance is the leader

Similar to the JDK's CountDownLatch,
LeaderLatch blocks in await() while waiting for leadership. Once you no longer need the LeaderLatch, you must call its close() method.
If the instance is the leader, close() releases leadership and the other participants elect a new leader.

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException
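A minimal usage sketch of the blocking variants; the latch path and the timeout are placeholders:

// Sketch: start a latch and wait up to 10 seconds for leadership.
LeaderLatch latch = new LeaderLatch(client, "/some/latch/path");
latch.start();
try {
    if (latch.await(10, TimeUnit.SECONDS)) {
        // leader-only work goes here
    } else {
        // did not become leader within the timeout
    }
} finally {
    latch.close(); // releasing the latch lets another participant take leadership
}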

Exception handling:
a LeaderLatch instance can register a ConnectionStateListener to watch for connection problems. On
SUSPENDED or LOST,
the leader no longer considers itself the leader. If the connection is LOST and then RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. LeaderLatch users must take into account the connection problems that can cause leadership to be lost.
Using a ConnectionStateListener is strongly recommended.

A LeaderLatch example:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the test-module dependency to make testing easier, without starting a real ZooKeeper server:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

First we create 10 LeaderLatch instances; after they are started, one of them is elected leader.
Because the election takes some time, you cannot obtain leadership immediately after start().
hasLeadership() checks whether the instance itself is the leader and returns true if it is.
.getLeader().getId() returns the ID of the current leader.
Leadership can only be released by calling close().
await() is a blocking method that waits for leadership, which is not guaranteed to be obtained.

LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Similar to LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, when this instance acquires leadership your listener's takeLeadership() method is called; takeLeadership() returns only when leadership is released.
When you no longer need the LeaderSelector instance, call its close() method.

Exception handling:
LeaderSelectorListener extends ConnectionStateListener. The LeaderSelector must watch for connection state changes. If the instance becomes leader,
it should respond to SUSPENDED or LOST. When the SUSPENDED state appears,
the instance must assume that it may no longer be the leader until it has successfully reconnected. If the LOST state appears,
the instance is no longer the leader and its takeLeadership method should return.

Important: the recommended handling is to throw a CancelLeadershipException when SUSPENDED or
LOST is received. This causes the LeaderSelector instance to interrupt and cancel the thread that is executing takeLeadership(). This is so important that
you should consider extending LeaderSelectorListenerAdapter,
which provides the recommended handling logic.

The following example is adapted from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can distribute tasks and so on inside takeLeadership() and choose not to return; if you want this instance to remain leader you can add an infinite loop. Calling
leaderSelector.autoRequeue() ensures that the instance may acquire leadership again after releasing it.
Here we use an AtomicInteger to record how many times this client has obtained leadership; the election is "fair" —
every client has the same chance of obtaining leadership.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

Compared with LeaderLatch, which only releases leadership when close() is called, LeaderSelector lets you control leadership through the LeaderSelectorListener and
give it up at the appropriate moment, so that every node has a chance to obtain it. LeaderSelector therefore offers better flexibility and controllability, and for leader-election scenarios it is recommended to prefer LeaderSelector.

Distributed Locks

Reminders:

1. It is recommended to use a ConnectionStateListener to monitor the connection state, because when the connection is LOST you no longer hold the lock.

2. Distributed locks are globally synchronized,
which means that at any point in time no two clients both hold the same lock.

Reentrant shared lock — Shared Reentrant Lock

Shared means the lock is globally visible and any client can request it.
Reentrant is like the JDK's ReentrantLock: the same client can acquire the lock
multiple times while already holding it, without being blocked.
It is implemented by the class InterProcessMutex, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock with acquire(), which also has a timeout variant:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

Release the lock with the release() method. InterProcessMutex instances are reusable.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism.
To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. Your listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

If you want to ask for the current lock to be revoked,
call the static attemptRevoke() method; the holder's RevocationListener is then called back so the lock can be released (a sketch follows the javadoc below).

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
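A hedged sketch of both sides of the revocation handshake; the lock path is a placeholder, and the holder simply releases the lock when asked:

// Holder side: make the mutex revocable and release it when revocation is requested.
InterProcessMutex lock = new InterProcessMutex(client, "/examples/revocable-lock");
lock.makeRevocable(forLock -> {
    try {
        if (forLock.isAcquiredInThisProcess()) {
            forLock.release();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
});

// Requesting side (possibly another client): mark every participant node of the lock for revocation.
for (String participantNode : lock.getParticipantNodes()) {
    Revoker.attemptRevoke(client, participantNode);
}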

Second reminder — error handling:
it is again strongly recommended to handle connection state changes with a ConnectionStateListener.
When the connection is LOST, you no longer hold the lock.

First let's create a simulated shared resource.
It is supposed to be accessed by only one thread at a time, otherwise concurrency problems occur.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // in a real scenario we would access/maintain a shared resource here
        // with the lock, this example never throws the IllegalStateException below
        // without the lock, the sleep makes concurrent access very likely to throw it
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then create an InterProcessMutexDemo class that is responsible for the complete access cycle:
requesting the lock, using the resource, and releasing the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is straightforward: it creates 5 clients, and each client repeats the
request-lock / access-resource / release-lock cycle 50 times. Each client runs in its own thread.
The output shows that the lock is used exclusively by one instance at a time, in no fixed order.

Since it is reentrant, you can call acquire() repeatedly in the same thread; while the thread holds the lock, acquire() always returns true.

You should not use the same InterProcessMutex across multiple threads.
Instead, create a new InterProcessMutex instance in each thread, all with the same path; that way they share the same lock.

Non-reentrant shared lock — Shared Lock

Compared with InterProcessMutex above, this lock simply lacks the reentrant capability, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, and it is used in the same way as InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " could not acquire the mutex again");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

After running it you will find that each time only one client succeeds in acquiring the first lock (its first acquire() returns true); it then blocks on the second acquire(), which times out. All the other clients block on the first acquire(), time out, and throw exceptions.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Reentrant read-write lock — Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks, one for read operations and one for writes. The read lock can be held by multiple processes at the same time as long as the write lock is not in use, while the write lock blocks reads while it is held.

This lock is reentrant. A thread holding the write lock can re-enter the read lock, but the read lock cannot be promoted into the write lock. This also means the write lock can be downgraded to a read lock,
for example: acquire the write lock -> acquire the read lock -> release the read lock
-> release the write lock.
Upgrading from the read lock to the write lock is not allowed.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock as needed; the type of both locks is InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: you must acquire the write lock before the read lock, not the other way around!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Semaphore — Shared Semaphore

A counting semaphore similar to the JDK's Semaphore.
Where the JDK Semaphore maintains a set of permits, Curator maintains leases (Lease).
There are two ways to determine the maximum number of leases of a semaphore. The first is for the user to supply a path and specify the maximum lease count. The second is for the user to supply a path and use a SharedCountReader. If a SharedCountReader is not used,
you must make sure that all instances in all processes use the same (maximum) lease count; otherwise you might end up with process A's instances using a maximum of 10 while process B's use 20, and the leases lose their meaning.

Each call to acquire() returns a lease object.
The client must close the lease in a finally block, otherwise the lease is lost.
However, if the client session is lost for some reason, such as a crash,
the leases held by that client are closed automatically
so that other clients can use them. Leases can also be returned with the following methods:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request multiple leases at once; if the semaphore does not currently have enough, the requesting thread blocks.
Timeout overloads are provided as well:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used by Shared Semaphore are the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases, and at the end we return them to the semaphore.
Then we request one more lease; since the semaphore still has 5 leases left, the request can be satisfied and returns one lease, leaving 4.
We then request 5 more leases; since only 4 remain, the request blocks until the timeout and still cannot be satisfied, so acquire returns null (when leases are insufficient the call blocks until the timeout and then returns null instead of throwing; with no timeout it blocks forever).
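The second way of fixing the maximum lease count (through a SharedCountReader), mentioned earlier, is not shown in the demo. A minimal sketch with made-up paths could look like this:

// Sketch: the maximum number of leases is read from a SharedCount instead of being hard-coded,
// so every process that uses the same count path agrees on the same limit.
SharedCount maxLeases = new SharedCount(client, "/examples/leases/count", 10);
maxLeases.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/examples/leases/semaphore", maxLeases);
Lease lease = semaphore.acquire();
try {
    // use the guarded resource here
} finally {
    semaphore.returnLease(lease);
    maxLeases.close();
}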

All the locks discussed so far are fair locks. From ZooKeeper's point of view,
every client obtains the lock in the order in which it asked for it; there is no unfair preemption.

Multi-shared-lock object — Multi Shared Lock

A Multi Shared Lock is a container of locks. When acquire() is called,
all of the contained locks are acquired; if that fails, all of the acquired locks are released.
Likewise, when release() is called, all the locks are released (failures are ignored).
Basically, it is the representative of a group of locks: acquire and release requests on it are forwarded to all of the locks it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructors take either the collection of locks to contain, or a group of ZooKeeper paths.

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as for Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

We build a new InterProcessMultiLock containing one reentrant lock and one non-reentrant lock.
After acquire() you can see that the thread holds both locks at the same time.
After release() both locks are released.

One final reminder:
it is strongly recommended to monitor the connection state with a ConnectionStateListener; when the connection state is LOST, the lock is lost as well.

Distributed Counters

As the name suggests, a counter counts things;
with ZooKeeper you can implement a counter shared by the whole cluster.
As long as you use the same path you always get the latest value,
which is guaranteed by ZooKeeper's consistency. Curator has two counters:
one that counts with an int (SharedCount) and one that counts with a long (DistributedAtomicLong).

Distributed int counter — SharedCount

This recipe counts with the int type. It mainly involves three classes:

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter.
You can add a SharedCountListener to it, which is notified whenever the counter changes; a SharedCountReader reads the latest value,
both the plain value and a VersionedValue that carries version information.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example we use baseCount to listen to the count value (addListener() adds the SharedCountListener).
Any SharedCount using the same path can read this count value.
We then use 5 threads, each of which adds a random number under 10 to the count. Changes made by SharedCount instances with the same path are delivered to baseCount's SharedCountListener.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to set the counter.
The first parameter supplies the current VersionedValue; if another client has updated the value in the meantime,
your update may fail,
but by then your client has read the latest value, so it can simply retry.
setCount sets the counter unconditionally. A retry sketch follows below.
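A hedged sketch of the retry loop implied above, using the count variable from the example: re-read the versioned value and try again until the compare-and-set succeeds (the increment is arbitrary):

// Sketch: optimistic update of a SharedCount; retry while another client wins the race.
boolean updated = false;
while (!updated) {
    VersionedValue<Integer> current = count.getVersionedValue();
    updated = count.trySetCount(current, current.getValue() + 1);
}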

Note that the counter must be started, and close() must be called when you are done with it.

A ConnectionStateListener is strongly recommended.
In this example, SharedCountListener already extends ConnectionStateListener.

Distributed long counter — DistributedAtomicLong

Now let's look at a long counter. Besides the larger range compared with SharedCount,
it first tries to set the counter optimistically;
if that fails (for example because the counter was updated by another client in the meantime),
it falls back to an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

The counter offers a family of operations:

  • get(): get the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a specific value
  • subtract(): subtract a specific value
  • trySet(): try to set the value
  • forceSet(): forcibly set the value

You must check succeeded() on the returned result; it indicates whether the operation succeeded.
If it did, preValue() is the value before the operation
and postValue() is the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

Distributed Queues

Curator also simplifies working with ephemeral nodes, and it provides the ZK recipe implementations of distributed queues. Using ZooKeeper's
PERSISTENT_SEQUENTIAL nodes,
the items put into the queue are kept in order.
If a single consumer takes data from the queue, items come out first-in, first-out, which is the defining property of a queue.
If you strictly require ordering, you have to use a single consumer; you can use leader election so that only the leader acts as the single consumer.

However, according to Curator's authors at Netflix,
ZooKeeper really is not well suited to being a queue — or rather, ZK does not make a good queue. For details see
Tech Note
4

There are five reasons:

  1. ZK has a 1 MB transport limit.
    In practice ZNodes must be relatively small, while a queue may contain many messages, some of them large.
  2. If there are many nodes, ZK starts up very slowly, and using a queue creates many ZNodes.
    You need to significantly increase initLimit and syncLimit.
  3. Very large ZNodes are hard to clean up; Netflix had to write a dedicated program to do it.
  4. ZK performance degrades badly when a single ZNode has a very large number of children.
  5. ZK's database lives entirely in memory, so a large number of queued items means a lot of memory.

Nevertheless, Curator does provide implementations of various queues.
If the amount of data in the queue is not too large, they are still usable, at your own discretion.

Distributed queue — DistributedQueue

DistributedQueue is the most ordinary of the queues. It involves the following classes:

  • QueueBuilder – queues are created through the QueueBuilder, which is also the builder for the other queue types
  • QueueConsumer – the interface for consumers of queue messages
  • QueueSerializer – the serialization/deserialization interface for queue messages, providing (de)serialization of the objects in the queue
  • DistributedQueue – the queue implementation class

QueueConsumer is the consumer, which receives data from the queue. The logic that processes queue data goes in QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to the consumer. These are two separate steps and are not atomic. You can call the builder's lockPath() so that a consumer holds a lock while it is consuming a message; other consumers then cannot consume that message, and if consumption fails or the process dies the message can be handed to another process. This costs some performance. The single-consumer mode is still preferred for this queue. A lockPath sketch follows below.
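A hedged sketch of building the same kind of queue with a consumer lock; the lock path is made up, and the consumer/serializer helpers are the ones from the example that follows:

// Sketch: lockPath() makes remove-and-consume effectively atomic per message,
// at the cost of some throughput.
DistributedQueue<String> queue = QueueBuilder
        .builder(client, createQueueConsumer("A"), createQueueSerializer(), PATH)
        .lockPath("/example/queue-lock")
        .buildQueue();
queue.start();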

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection state changed: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume message (" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; because they share the same PATH, the consumers compete for the messages.

Distributed queue with IDs — DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that an ID can be set for every element in the queue,
and arbitrary elements can be removed from the queue by ID. It involves the classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

It is created with:

builder.buildIdQueue()

Putting an element:

queue.put(aMessage, messageId);

Removing an element:

int numberRemoved = queue.remove(messageId);

In this example,
some elements are removed before they have been consumed, so the consumer does not receive the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Distributed priority queue — DistributedPriorityQueue

The priority queue sorts the elements in the queue by priority. The smaller the priority value,
the closer the element is to the head of the queue and the sooner it is consumed.
It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

It is created with builder.buildPriorityQueue(minItemsBeforeRefresh).
When the priority queue is notified of element additions or removals, it pauses processing of the current batch of elements and refreshes the queue. minItemsBeforeRefresh specifies the minimum number of items of the currently active batch that are processed before a refresh.
It is essentially how much un-sorted processing your program can tolerate.

A priority is specified when an element is put into the queue:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that the priority setting has no effect. That is because priority only matters for elements that are backed up in the queue: if consumption is fast enough, an element may be consumed before the next put happens, in which case DistributedPriorityQueue degenerates into a DistributedQueue.

Distributed delay queue — DistributedDelayQueue

The JDK also has a DelayQueue; you may be familiar with it.
DistributedDelayQueue provides similar functionality: each element carries a delay value,
and the consumer only receives the element after that time. It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

It is created with the following statements:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element you can specify a delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now,
such as 20 milliseconds, but a future epoch timestamp, such as System.currentTimeMillis() + 10000.
If delayUntilEpoch is already in the past, the message is received by the consumer immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

The queues above cover various behaviours, but you may have noticed that none of them implements an interface like the JDK queues.
SimpleDistributedQueue provides an interface essentially matching the JDK's (although it does not implement the Queue interface).
Creating one is very simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Adding an element:

public boolean offer(byte[] data) throws Exception

Taking an element:

public byte[] take() throws Exception

Other methods are provided as well:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is a take method.

take() blocks until it returns successfully.
poll() returns null immediately when the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("message sent successfully: " + "zjc-" + i);
                    } else {
                        System.out.println("failed to send message: " + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("consumed a message: " + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

In practice, however, 100 messages are sent, but after the first one is consumed the rest are never consumed; I have not found the reason yet. Looking at the demo recommended by the official documentation, it uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

In actual use, however, the same consumption blocking problem still shows up.

Distributed Barriers — Barrier

A distributed barrier is a class that
blocks the waiting processes on all nodes until a certain condition is satisfied,
after which all of the nodes continue.

Think of a horse race: the horses arrive at the starting line one after another,
and at the starting signal all of them dash off.

DistributedBarrier

The DistributedBarrier class implements the barrier functionality. Its constructor is:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First you set the barrier; it will block the threads that wait on it:

setBarrier();

Then the threads that should block call the method that waits for the release condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads continue:

removeBarrier();

Exception handling: DistributedBarrier
monitors the connection state; when the connection drops, waitOnBarrier() throws an exception.

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier to set and remove the barrier.
We create 5 threads that wait on this barrier.
Only after the barrier is removed do all the threads continue to run.

If you never set the barrier in the first place, none of the threads will block.

Double barrier — DistributedDoubleBarrier

A double barrier lets clients synchronize at the start and at the end of a computation. When enough processes have entered the double barrier, the computation starts;
when it is done, they leave the barrier. The double barrier class is DistributedDoubleBarrier.
Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the member blocks until all members have called enter().
When leave() is called, it likewise blocks the calling thread until all members have called leave().
It is just like a 100-meter race: the starting gun fires,
all the runners take off, and only after all the runners have crossed the finish line is the race over.

DistributedDoubleBarrier monitors the connection state; when the connection drops, enter() and leave() throw exceptions.

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

References:
From Paxos to ZooKeeper: Principles and Practice of Distributed Consistency (《从PAXOS到ZOOKEEPER分布式一致性原理与实践》)
The blog series "Learning ZooKeeper by Example" (《跟着实例学习ZooKeeper的用法》)

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article actually has a [TOC] navigation header, which makes it easy to jump to each section, but Jianshu does not support it. The Markdown source lives in the project's /resources/md directory; help yourself. The article was written with Typora, and opening it with Typora is recommended.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, striving in Guangzhou: at the office during the day, with irregular overtime in the evenings and on weekends, and writing blog posts in whatever free time is left.
I hope my articles bring you something; let's keep encouraging each other.