ZooKeeper分布式锁

ZooKeeper排它锁

排它锁(Exclusive Lock),又称为写锁和独占锁。在Java中排它锁可以用synchronized机制和JDK5提供的ReentrantLock。ZooKeeper中直接的API的实现排它锁,而是通过ZooKeeper的节点来实现锁。首先创建一个/Exclusive_Lock节点,然后创建一个其子节点 /Exclusive_Lock/lock,为临时(EPHEMERAL)节点。如下图

ZK排它锁

锁获取

需要获取锁的所有Client,都试图创建临时节点 /Exclusive_Lock/lock,ZK会保证只有一个client创建成功,创建成功的client获得锁。没有获得锁的节点需要到/Exclusive_Lock节点下面注册节点变更的Watcher监听,一旦lock节点变更,其他的client可以再次创建lock节点(抢占锁)。

锁释放

锁的释放,即是lock节点被删除,所以有两种情况可以释放锁

  • 持有锁的client正常完成业务,主动删除临时节点/Exclusive_Lock/lock
  • 持有锁的client发生宕机或者Session过期, ZK服务器会删除临时节点/Exclusive_Lock/lock

流程图

ZK排它锁流程图

排它锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
public class DistributedExcusiveLock implements Watcher{
private ZooKeeper zk;
private static String root = "/ExcusiveLock";
private static String lockName = root+"/lock";
private CountDownLatch connectedLatch; //用于连接初始化
private CountDownLatch latch; //用于wait
private String host;
private String port;
private int timeOut;

public DistributedExcusiveLock(String host, String port, int timeOut) {
this.host = host;
this.port = port;
this.timeOut = timeOut;
this.connectedLatch = new CountDownLatch(1);
}

public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected == event.getState()){
connectedLatch.countDown();
}
if(Event.EventType.NodeDeleted==event.getType()){
if(latch!=null){
latch.countDown();
}
}
}

public void connected() throws IOException {
zk = new ZooKeeper(host+":"+port,timeOut, this);
if (ZooKeeper.States.CONNECTING == zk.getState()) {
try {
connectedLatch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}

public void lock() {
try {
// 没有获得锁,一直try
while(!tryLock()){
// 等待锁
waitForLock();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public boolean tryLock() {
try {
//检查lock节点是否存在, 注册监听
Stat stat = zk.exists(lockName, true);
if(stat!=null){
// 该节点已经创建,表示其他client持有锁
System.out.println(lockName + " is hold by other client");
return false;
}
String cLockNode = zk.create(lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
if(cLockNode!=null && cLockNode.equals(lockName)){
System.out.println(lockName + " is created and hold the lock");
return true;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}

private boolean waitForLock() throws InterruptedException, KeeperException {
// 检查时候存在lockName,并注册监听
Stat stat = zk.exists(lockName ,true);
if(stat != null){
System.out.println("waiting for " + lockName);
this.latch = new CountDownLatch(1);
this.latch.await();
this.latch = null;
}
return true;
}

public void unlock() {
try {
zk.delete(lockName,-1);
System.out.println(lockName+" is release");
} catch (Exception e) {
e.printStackTrace();
}
}

public void disConnected() throws IOException {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, InterruptedException {
String host = "zkhost";
String port = "2181";
int timeOut = 2000;

DistributedExcusiveLock excusiveLock = new DistributedExcusiveLock(host, port, timeOut);
excusiveLock.connected();
try {
excusiveLock.lock();
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}finally {
excusiveLock.unlock();
excusiveLock.disConnected();
}
}
}

ZooKeeper共享锁

共享锁(Shared Locks),又称为读锁或乐观锁。在Java编程中单机共享锁很容易实现,但是分布式环境下的就不好实现了,基于ZooKeeper很容易实现这个功能。共享锁不同于排它锁在于,排它锁只能被一个线程或者Client持有,即是这两个线程都是读的。

ZooKeeper共享锁原理

Client在ZK服务器创建一个EPHEMERAL_SEQUENTIAL节点,例如/Shared_Lock/192.168.0.1-R-00001,其中R表示读类型,W表示写类型,00001是SEQUENTIAL产生的编号。

  1. 调用exists()方法监听/hared_Lock下节点的变化
  2. 调用getChildren()方法获取当前节点列表
  3. 读请求:如果列表中最小的节点是自己创建的,或者比自己编号的节点都是读的,那么获得锁;
  4. 写请求:如果列表中最小的节点是自己创建的,那么获得锁;

锁释放

共享锁的释放和排它锁的一样,在正常完成业务后删除节点,或者Client宕机、Session过期等异常
共享锁

共享锁流程图

共享锁流程图

共享锁的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
public class DistributedSharedLock implements Watcher{
private ZooKeeper zk;
private static final String root = "/SharedLock";
private static final String SPLIT = "-";
private static final String TYPE_WRITE = "W";
private static final String TYPE_READ = "R";
private CountDownLatch connectedLatch; //用于连接初始化
private CountDownLatch latch; //用于wait
private String lockName; //用于wait
private String host;
private String port;
private String ip;
private int timeOut;

public DistributedSharedLock(String host, String port, int timeOut) {
this.host = host;
this.port = port;
this.timeOut = timeOut;
this.connectedLatch = new CountDownLatch(1);
try {
this.ip = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}

public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected == event.getState()){
connectedLatch.countDown();
}
if(Event.EventType.NodeChildrenChanged==event.getType()){
if(latch!=null){
latch.countDown();
}
}
}

public void connected() throws IOException {
zk = new ZooKeeper(host+":"+port,timeOut, this);
if (ZooKeeper.States.CONNECTING == zk.getState()) {
try {
connectedLatch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
System.out.println("connected!!!!");
}

private String getSeq(){
if(lockName==null)
return null;
return lockName.substring(lockName.lastIndexOf(SPLIT));
}

public void lock(String type) {
try {
while(!tryLock(ip, type, getSeq())){
// 等待锁
waitForLock();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public class LockNode implements Comparable{
private String ip;
private String type;
private String seq;
private String node;
private int order;

public LockNode(String node,String root) {
if(node!=null){
String[] arr = node.split(SPLIT);
this.ip=arr[0];
this.type=arr[1];
this.seq=arr[2];
}
this.node = root+"/"+node;
}

@Override
public int compareTo(Object o) {
LockNode lockNode = (LockNode)o;
return Integer.valueOf(this.seq)-Integer.valueOf(lockNode.seq);
}

public String getType() {
return type;
}

public String getNode() {
return node;
}

public int getOrder() {
return order;
}

public void setOrder(int order) {
this.order = order;
}
}

/**
* node format: 192.168.0.1-R-000000012
* @param type
* @return
*/

public boolean tryLock(String ip, String type, String seq) {
try {
lockName = root+"/"+ip+SPLIT+type+SPLIT;

if(seq==null||seq.length()==0){
lockName=zk.create(lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}else{
lockName += seq;
}
List<String> childred = zk.getChildren(root,false);
List<LockNode> lockNodes = new ArrayList<LockNode>();
for(String child:childred){
lockNodes.add(new LockNode(child,root));
}
// 排序,排序完了存入map,便于查找
Collections.sort(lockNodes);
Map<String,LockNode> lockNodeMap = new HashMap<String, LockNode>();
for(int i=0;i<lockNodes.size();i++){
LockNode ln = lockNodes.get(i);
ln.setOrder(i);
lockNodeMap.put(ln.getNode(),ln);
}

LockNode lockNode = lockNodeMap.get(lockName);

// 对于读和写,如果都是第一个节点,持有锁
if(lockNode.getOrder()==0){
return true;
}
// 对于序号未排在第一的读请求,如果比其需要小的都是读请求,也能持有锁
if(TYPE_READ.equals(type)){
for(int i=0;i<lockNode.getOrder();i++){
LockNode ln = lockNodes.get(i);
if(TYPE_WRITE.equals(ln.getType())){
System.out.println(lockName + " is holded by other client");
return false;
}
}
System.out.println(lockName + " hold lock!!!!");
return true;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}

private boolean waitForLock() throws InterruptedException, KeeperException {
// 检查root下面子节点,并注册监听
List<String> children = zk.getChildren(root ,true);
if(children != null && !children.isEmpty()){
System.out.println("waiting for " + lockName);
this.latch = new CountDownLatch(1);
this.latch.await();
this.latch = null;
}
return true;
}

public void unlock() {
if(lockName==null || lockName.length()==0)
return;
try {
zk.delete(lockName, -1);
System.out.println(lockName + " is release");
this.lockName = null;
} catch (Exception e) {
e.printStackTrace();
}
}

public void disConnected() throws IOException {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String host = "zkhost";
String port = "2181";
int timeOut = 20000;

DistributedSharedLock excusiveLock = new DistributedSharedLock(host, port, timeOut);
excusiveLock.connected();

try {
excusiveLock.lock(TYPE_READ);
Thread.sleep(5000);
System.out.println("sleep end , start release lock");
}catch (Exception e){
e.printStackTrace();
}finally {
excusiveLock.unlock();
excusiveLock.disConnected();
}
}
}

Apache Curator

Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,提供了一套Fluent风格的操作API,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。

基于Curator实现一个分布式锁