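【Posted】: 2016-04-01 17:12:39
【Question】:
My scenario is as follows: I have 5 Linux machines and 10 (possibly more) files in HDFS. My requirement is that a single machine should lock one of the files and process it, while the other machines should not process that file but should instead lock a different file and process that one. For example:
machine1 - locks file2 and processes it
machine2 - locks file3 and processes it
machine3 - locks file1 and processes it
I have written a dummy multithreaded Java program to simulate this, but it does not work: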
import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class DistributedLock {
    private final ZooKeeper zk;
    private final String lockBasePath;
    private String lockPath;

    public DistributedLock(ZooKeeper zk, String lockBasePath) {
        this.zk = zk;
        this.lockBasePath = lockBasePath;
    }

    // Tries to take the lock named lockName; returns true if a lock node was created.
    public boolean lock(String lockName) throws IOException {
        try {
            boolean locked = false;
            // Only create the lock node if no node is found at this exact path.
            if (zk.exists(lockBasePath + "/" + lockName, false) == null) {
                lockPath = zk.create(lockBasePath + "/" + lockName, null,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
                if (lockPath != null) {
                    locked = true;
                }
            }
            final Object lock = new Object();
            return locked;
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    // Deletes the most recently created lock node, whatever its version.
    public void unlock() throws IOException {
        try {
            zk.delete(lockPath, -1);
            lockPath = null;
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class DistributedLockTest {
    public static void main(String[] args) throws Exception {
        new DistributedLockTest().run();
    }

    // Starts four threads, each standing in for one machine.
    public void run() throws Exception {
        Thread t1 = new Thread(new Process(1));
        Thread t2 = new Thread(new Process(2));
        Thread t3 = new Thread(new Process(3));
        Thread t4 = new Thread(new Process(4));
        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }

    class Process implements Runnable {
        int id;
        List<String> fileNames = new ArrayList<String>();

        public Process(int id) {
            this.id = id;
            // Every simulated machine sees the same ten files: file1 .. file10.
            for (int i = 1; i < 11; i++) {
                fileNames.add("file" + i);
            }
        }

        // @Override
        public void run() {
            try {
                System.out.println("machine " + id + " started");
                String resource = "resource";
                String path = "/LockDir";
                ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1", 2181, null);
                // Make sure the parent node for all locks exists.
                if (zooKeeper.exists(path, false) == null) {
                    zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
                DistributedLock lock = new DistributedLock(zooKeeper, path);
                String lockedFile;
                // Try each file in turn; skip the ones that could not be locked.
                for (String fileName : fileNames) {
                    System.out.println("machine " + id + " Acquiring Lock on " + fileName);
                    boolean locked = lock.lock(fileName);
                    if (locked) {
                        System.out.println("machine " + id + "got Lock on " + fileName);
                        lockedFile = fileName;
                    }
                    else continue;
                    Thread.sleep(500);
                }
                System.out.println("machine " + id + " Releasing Lock");
                lock.unlock();
                System.out.println("machine " + id + " Released Lock");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
The output I get is:
machine 1 started
machine 2 started
machine 3 started
machine 4 started
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
machine 2 Acquiring Lock on file1
machine 1 Acquiring Lock on file1
machine 4 Acquiring Lock on file1
machine 3 Acquiring Lock on file1
machine 1got Lock on file1
machine 3got Lock on file1
machine 2got Lock on file1
machine 4got Lock on file1
machine 1 Acquiring Lock on file2
machine 3 Acquiring Lock on file2
machine 4 Acquiring Lock on file2
machine 2 Acquiring Lock on file2
machine 1got Lock on file2
machine 3got Lock on file2
machine 2got Lock on file2
machine 4got Lock on file2
machine 3 Acquiring Lock on file3
machine 1 Acquiring Lock on file3
machine 2 Acquiring Lock on file3
machine 4 Acquiring Lock on file3
machine 1got Lock on file3
machine 4got Lock on file3
machine 3got Lock on file3
machine 2got Lock on file3
machine 2 Acquiring Lock on file4
machine 4 Acquiring Lock on file4
machine 3 Acquiring Lock on file4
machine 1 Acquiring Lock on file4
machine 4got Lock on file4
machine 2got Lock on file4
machine 3got Lock on file4
machine 1got Lock on file4
machine 4 Acquiring Lock on file5
machine 3 Acquiring Lock on file5
machine 2 Acquiring Lock on file5
machine 1 Acquiring Lock on file5
machine 3got Lock on file5
machine 2got Lock on file5
machine 4got Lock on file5
machine 1got Lock on file5
machine 2 Acquiring Lock on file6
machine 4 Acquiring Lock on file6
machine 3 Acquiring Lock on file6
machine 1 Acquiring Lock on file6
machine 2got Lock on file6
machine 1got Lock on file6
machine 4got Lock on file6
machine 3got Lock on file6
machine 2 Acquiring Lock on file7
machine 4 Acquiring Lock on file7
machine 1 Acquiring Lock on file7
machine 3 Acquiring Lock on file7
machine 4got Lock on file7
machine 2got Lock on file7
machine 1got Lock on file7
machine 3got Lock on file7
machine 4 Acquiring Lock on file8
machine 3 Acquiring Lock on file8
machine 1 Acquiring Lock on file8
machine 2 Acquiring Lock on file8
machine 1got Lock on file8
machine 4got Lock on file8
machine 3got Lock on file8
machine 2got Lock on file8
machine 2 Acquiring Lock on file9
machine 4 Acquiring Lock on file9
machine 3 Acquiring Lock on file9
machine 1 Acquiring Lock on file9
machine 4got Lock on file9
machine 3got Lock on file9
machine 1got Lock on file9
machine 2got Lock on file9
machine 4 Acquiring Lock on file10
machine 3 Acquiring Lock on file10
machine 1 Acquiring Lock on file10
machine 2 Acquiring Lock on file10
machine 2got Lock on file10
machine 4got Lock on file10
machine 1got Lock on file10
machine 3got Lock on file10
machine 4 Releasing Lock
machine 1 Releasing Lock
machine 2 Releasing Lock
machine 3 Releasing Lock
machine 2 Released Lock
machine 1 Released Lock
machine 4 Released Lock
machine 3 Released Lock
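This shows that every thread/machine tries to lock every file and succeeds each time. What I want instead is that if a machine fails to lock a particular file, it should try to lock another file and process that one. Any suggestions?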
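One likely reason every machine "gets" every lock: with CreateMode.EPHEMERAL_SEQUENTIAL, ZooKeeper appends a numeric sequence suffix to the requested path, so the node that is actually created is never named exactly lockBasePath + "/" + lockName, and the exists() check on that exact path keeps returning null. Separately, checking exists() and then calling create() is not atomic. A common alternative is to create the node with CreateMode.EPHEMERAL and treat KeeperException.NodeExistsException as "someone else already holds this lock". The sketch below only simulates that behaviour in memory so that it runs without a ZooKeeper server: the class FileClaimSketch and its methods tryLock and run are hypothetical names, and ConcurrentHashMap.putIfAbsent stands in for the atomic "create the node or fail if it exists" step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the ZooKeeper lock directory (hypothetical, illustration only).
class FileClaimSketch {
    // lock name -> id of the worker that owns it
    private final ConcurrentHashMap<String, Integer> locks = new ConcurrentHashMap<>();

    // Atomic, non-blocking claim: true only for the first caller per file name.
    // Assumed ZooKeeper analogue: create(..., CreateMode.EPHEMERAL) succeeding,
    // with NodeExistsException mapped to "false".
    boolean tryLock(String fileName, int ownerId) {
        return locks.putIfAbsent(fileName, ownerId) == null;
    }

    // Runs `workers` threads over file1..fileN; each processes only the files it claimed.
    static Map<Integer, List<String>> run(int workers, int files) throws InterruptedException {
        FileClaimSketch registry = new FileClaimSketch();
        Map<Integer, List<String>> claimed = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 1; w <= workers; w++) {
            final int id = w;
            final List<String> mine = new ArrayList<>();
            claimed.put(id, mine);
            Thread t = new Thread(() -> {
                for (int i = 1; i <= files; i++) {
                    String name = "file" + i;
                    if (registry.tryLock(name, id)) {
                        mine.add(name); // placeholder for the real processing step
                    }
                    // Otherwise another worker owns this file: move on to the next one.
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // after join, each worker's list is safely visible here
        }
        return claimed;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Integer, List<String>> claimed = run(4, 10);
        claimed.forEach((id, names) ->
                System.out.println("machine " + id + " processed " + names));
    }
}
```

Which worker ends up with which file varies from run to run, but no file is claimed twice. In this sketch a claim is never released, so a processed file stays claimed. With real ephemeral nodes the lock disappears when the session ends, so some separate marker for finished files would probably be needed to avoid processing a file again.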
【Comments】:
Tags: hdfs apache-zookeeper