【问题标题】:Zookeeper to get lock of Multiple resourcesZookeeper 获取多个资源的锁定
【发布时间】:2016-04-01 17:12:39
【问题描述】:

我的场景如下: 我有 5 台 linux 机器,我在 HDFS 中有 10 个(可能不止这个)文件。我的要求是单台机器应该锁定其中一个文件并处理它,而另一台机器不应该处理这个文件而是锁定另一个文件并处理这些文件。 例如: machine1 - 锁定 file2 并处理它 machine2 - 锁定 file3 并处理它 machine3 - 锁定 file1 并处理它

我已经编写了一个虚拟的多线程 java 程序来模拟它。但是它不起作用:

public class DistributedLock {

    private final ZooKeeper zk;
    private final String lockBasePath;
    private String lockPath;

    public DistributedLock(ZooKeeper zk, String lockBasePath) {
        this.zk = zk;
        this.lockBasePath = lockBasePath;
    }

    public boolean lock(String lockName) throws IOException {
        try {
            boolean locked = false;
            if(zk.exists(lockBasePath + "/" + lockName, false) == null){
            lockPath = zk.create(lockBasePath + "/" + lockName, null,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
             if(lockPath != null ){
                 locked =true;
             }
            }
            final Object lock = new Object();

            return locked;
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public void unlock() throws IOException {
        try {
            zk.delete(lockPath, -1);
            lockPath = null;
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

}

public class DistributedLockTest {

    public static void main(String[] args) throws Exception {
        new DistributedLockTest().run();
    }

    public void run() throws Exception {
        Thread t1 = new Thread(new Process(1));
        Thread t2 = new Thread(new Process(2));
        Thread t3 = new Thread(new Process(3));
        Thread t4 = new Thread(new Process(4));

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }

    class Process implements Runnable {

        int id;
        List<String> fileNames = new ArrayList<String>();

        public Process(int id) {
            this.id = id;
            for (int i = 1; i < 11; i++) {
                fileNames.add("file" + i);
            }
        }

        // @Override
        public void run() {
            try {
                System.out.println("machine " + id + " started");
                String resource = "resource";
                String path = "/LockDir";
                ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1", 2181, null);
                if (zooKeeper.exists(path, false) == null) {
                    zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
                DistributedLock lock = new DistributedLock(zooKeeper, path);
                String lockedFile;
                for (String fileName : fileNames) {
                    System.out.println("machine " + id + " Acquiring Lock on "+ fileName);
                    boolean locked = lock.lock(fileName);
                    if(locked){
                     System.out.println("machine " + id + "got Lock on "+ fileName);
                     lockedFile = fileName;
                    }
                    else continue;
                    Thread.sleep(500);
                }
                System.out.println("machine " + id + " Releasing Lock");
                lock.unlock();
                System.out.println("machine " + id + " Released Lock");

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

我得到的输出是:

machine 1 started
machine 2 started
machine 3 started
machine 4 started
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
machine 2 Acquiring Lock on file1
machine 1 Acquiring Lock on file1
machine 4 Acquiring Lock on file1
machine 3 Acquiring Lock on file1
machine 1got Lock on file1
machine 3got Lock on file1
machine 2got Lock on file1
machine 4got Lock on file1
machine 1 Acquiring Lock on file2
machine 3 Acquiring Lock on file2
machine 4 Acquiring Lock on file2
machine 2 Acquiring Lock on file2
machine 1got Lock on file2
machine 3got Lock on file2
machine 2got Lock on file2
machine 4got Lock on file2
machine 3 Acquiring Lock on file3
machine 1 Acquiring Lock on file3
machine 2 Acquiring Lock on file3
machine 4 Acquiring Lock on file3
machine 1got Lock on file3
machine 4got Lock on file3
machine 3got Lock on file3
machine 2got Lock on file3
machine 2 Acquiring Lock on file4
machine 4 Acquiring Lock on file4
machine 3 Acquiring Lock on file4
machine 1 Acquiring Lock on file4
machine 4got Lock on file4
machine 2got Lock on file4
machine 3got Lock on file4
machine 1got Lock on file4
machine 4 Acquiring Lock on file5
machine 3 Acquiring Lock on file5
machine 2 Acquiring Lock on file5
machine 1 Acquiring Lock on file5
machine 3got Lock on file5
machine 2got Lock on file5
machine 4got Lock on file5
machine 1got Lock on file5
machine 2 Acquiring Lock on file6
machine 4 Acquiring Lock on file6
machine 3 Acquiring Lock on file6
machine 1 Acquiring Lock on file6
machine 2got Lock on file6
machine 1got Lock on file6
machine 4got Lock on file6
machine 3got Lock on file6
machine 2 Acquiring Lock on file7
machine 4 Acquiring Lock on file7
machine 1 Acquiring Lock on file7
machine 3 Acquiring Lock on file7
machine 4got Lock on file7
machine 2got Lock on file7
machine 1got Lock on file7
machine 3got Lock on file7
machine 4 Acquiring Lock on file8
machine 3 Acquiring Lock on file8
machine 1 Acquiring Lock on file8
machine 2 Acquiring Lock on file8
machine 1got Lock on file8
machine 4got Lock on file8
machine 3got Lock on file8
machine 2got Lock on file8
machine 2 Acquiring Lock on file9
machine 4 Acquiring Lock on file9
machine 3 Acquiring Lock on file9
machine 1 Acquiring Lock on file9
machine 4got Lock on file9
machine 3got Lock on file9
machine 1got Lock on file9
machine 2got Lock on file9
machine 4 Acquiring Lock on file10
machine 3 Acquiring Lock on file10
machine 1 Acquiring Lock on file10
machine 2 Acquiring Lock on file10
machine 2got Lock on file10
machine 4got Lock on file10
machine 1got Lock on file10
machine 3got Lock on file10
machine 4 Releasing Lock
machine 1 Releasing Lock
machine 2 Releasing Lock
machine 3 Releasing Lock
machine 2 Released Lock
machine 1 Released Lock
machine 4 Released Lock
machine 3 Released Lock

这表明每个线程/机器都在尝试锁定每个文件并获取它。但我想要的是,如果一台机器没有锁定特定机器,它应该尝试锁定另一个文件并处理它。 对此有何建议?

【问题讨论】:

    标签: hdfs apache-zookeeper


    【解决方案1】:

    我在您的代码中发现了两个错误,第一个是您使用CreateMode.EPHEMERAL_SEQUENTIAL 作为锁定节点。当您想使用CreateMode.EPHEMERAL 时。 Sequential 主要用于队列而不是锁,它会创建名称类似于:file10000000000123file10000000000124 等的节点。因此,您永远不会创建用于检查锁是否被占用的节点。

    如果你解决了这个问题,你很可能会在线程之间遇到竞争条件,因为它们首先检查节点是否存在,然后创建它。使多个线程可以尝试创建同一个节点,因此我的解决方案如下所示:

    public class DistributedLock {  
      public static final String _LOCK = "lock";
    ...
      public boolean lock(String lockName) throws IOException {
        try {
            boolean locked = false;
            synchronized(_LOCK){
              if(zk.exists(lockBasePath + "/" + lockName, false) == null){
                lockPath = zk.create(lockBasePath + "/" + lockName, null,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL);
                if(lockPath != null ){
                    locked =true;
                }
              }
            }
            final Object lock = new Object();
    
            return locked;
    ...
      }
    ...
    

    输出看起来像这样,这是我假设你想要的:

    machine 1 Acquiring Lock on file1
    machine 4 Acquiring Lock on file1
    machine 2 Acquiring Lock on file1
    machine 3 Acquiring Lock on file1
    machine 1 got Lock on file1
    machine 3 Acquiring Lock on file2
    machine 3 got Lock on file2
    machine 2 Acquiring Lock on file2
    machine 4 Acquiring Lock on file2
    machine 2 Acquiring Lock on file3
    machine 2 got Lock on file3
    ...
    

    PS:作为旁注,我建议您使用 Apache Curator 而不是为 Zookeeper 编写自己的外观,这样会容易得多,而且它们已经涵盖了大多数边缘情况。

    【讨论】:

    • 谢谢 .. 它真的很有帮助。另外,因为我的方案是让这个程序在不同的机器上运行。我不会遇到竞争状况。对吗?
    • 还是可以的,如果两台机器同时检查节点是否存在,然后都尝试写入。由于简单的同步块不起作用,因此很难防范这种情况。这就是我建议使用 Curator 的原因,这是一个为处理此类行为而编写的框架等等。话虽如此,竞争条件不太可能发生,您可以通过巧妙地使用异常来处理它。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-06-08
    • 1970-01-01
    • 2018-01-13
    • 1970-01-01
    • 1970-01-01
    • 2019-01-04
    • 2015-01-18
    相关资源
    最近更新 更多