为什么要设计分布式锁
在简单的单机系统中,当存在多个线程同时要修改某个共享变量时,为了数据的操作安全,往往需要通过加锁的方法,在同一时刻同一代码块只能有一个进程执行操作,存在很多加锁的方式,比如在java中有synchronize或Lock子类等。
但是在分布式中,会存在多个主机,即会存在多个jvm, 在jvm之间数据是不能共享的,上面的方法只能在一个jvm中执行有效,在多个jvm中同一变量可能会有不同的值。所以我们要设计一种跨jvm的共享互斥机制来控制共享变量资源的访问,这也是提出分布式锁的初衷。
需要解决的问题
为了将分布式锁实现较好的性能,我们需要解决下面几个重要的问题:
- 一个方法或代码片段在同一时刻只能被一个进程所执行。
- 高可用的获取锁与释放锁功能。
- 避免死锁
- 锁只能被持有该锁的客户端删除或者释放。
- 容错,在服务器宕机时,锁依然能得到释放或者其他服务器可以进行加锁。
下面分别利用redis和zookeeper来实现加锁和解锁机制。
基于Redis的加锁第一版
本版本通过变量sign设置锁的唯一标识,确保只有拥有该锁的客户端才能删除它,其他客户端不能删除。
利用阻塞锁的思想, 通过while(System.currentTimeMillis() < endTime)
和Thread.sleep()
相结合,在设置的规定时间内进行多次尝试。
但是setnx
操作和expire
分割开了,不具有原子性,可能会出现问题。
比如说,在执行到jedis.expire
时,可能系统发生了崩溃,导致锁没有设置过期时间,导致发生死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public String addLockVersion1(String key, int blockTime, int expireTime) { if (blockTime <=0 || expireTime <= 0) return null; Jedis jedis = null; try { jedis = jedisPool.getResource(); String sign = UUID.randomUUID().toString(); String token = null; long endTime = System.currentTimeMillis() + blockTime; while (System.currentTimeMillis() < endTime) { if (jedis.setnx(key, sign) == 1) { jedis.expire(key, expireTime); token = sign; return token; } try { Thread.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return null; }
|
基于Redis的加锁第二版
通过设置key对应的value值为锁的过期时间,当遇到系统崩溃,致使利用expire
设置锁过期时间失败时,通过获取value值,来判断当前锁是否过期,如果该锁已经过期了,则进行重新获取。
但是它也存在一些问题。当锁过期时,如果多个进程同时执行jedis.getSet
方法,虽然只有一个进程可以获得该锁,但是这个进程的锁的过期时间可能被其他进程的锁所覆盖。
该锁没有设置唯一标识,也会被其他客户端锁释放,不满足只能被锁的拥有者锁释放的条件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public boolean addLockVersion2(String key, int blockTime, int expireTime) { if (blockTime <=0 || expireTime <= 0) return false; Jedis jedis = null; try { jedis = jedisPool.getResource(); long endTime = System.currentTimeMillis() + blockTime; while (System.currentTimeMillis() < endTime) { long redisExpierTime = System.currentTimeMillis() + expireTime; if (jedis.setnx(key, redisExpierTime + "") == 1) { jedis.expire(key, expireTime); return true; } else { String oldRedisExpierTime = jedis.get(key); if (oldRedisExpierTime != null && Long.parseLong(oldRedisExpierTime) < System.currentTimeMillis()) { String lastRedisExpierTime = jedis.getSet(key, System.currentTimeMillis() + blockTime + ""); if (lastRedisExpierTime.equals(oldRedisExpierTime)) { jedis.expire(key, expireTime); return true; } } } try { Thread.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) { jedis.close(); } } return false; }
|
基于Redis的加锁第三版
具体通过set
方法来实现setnx
和expire
的相加功能,实现了原子操作。
如果key不存在时,就进行加锁操作,并对锁设置一个有效期,同时uniqueId表示加锁的客户端;如果key存在,不做任何操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public boolean addLockVersion3(String key, String uniqueId, int blockTime, int expireTime) { Jedis jedis = null; try { long endTime = System.currentTimeMillis() + blockTime; while (System.currentTimeMillis() < endTime) { jedis = jedisPool.getResource(); String result = jedis.set(key, uniqueId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_STATE.equals(result)) return true; try { Thread.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } return false; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; }
|
基于Redis的加锁第四版
为了使对同一个对象添加多次锁,并且不发生阻塞,即实现类似可重入锁,我们借鉴了ReetrantLock
的思想,添加了变量states
来控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public boolean addLockVersion4(String key, String uniqueId, int expireTime) { int state = states.get(); if (state > 1) { states.set(state+1); return true; } return doLock(key, uniqueId, expireTime); }
private boolean doLock(String key, String uniqueId, int expireTime) { Jedis jedis = null; if (expireTime <= 0) return false; try { jedis = jedisPool.getResource(); String result = jedis.set(key, uniqueId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_STATE.equals(result)) states.set(states.get() + 1); return true; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; }
|
基于Redis的加锁第五版
从上面可知,利用setnx
和expire
实现加锁机制时因为不是原子操作,会产生一些问题,我们可用lua脚本来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public boolean addLockVersion5(String key, String uniqueId, int expireTime) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String luaScript = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(uniqueId); values.add(String.valueOf(expireTime)); Object result = jedis.eval(luaScript, keys, values); if ((Long)result == 1L) return true; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) { jedis.close(); } } return false; }
|
基于Redis的释放锁第一版
在解锁时首先判断加速与解锁是否是同一个客户端,然后利用del
方法进行删除。
但是会出现一些问题。
当方法执行到判断内部时,即将要执行del
方法时,该锁已经过期了,并被其他的客户端所请求应有,此时执行del
会造成锁的误删。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public boolean releaseLockVersion1(String key, String uniqueId) { Jedis jedis = null; try { jedis = jedisPool.getResource(); String lockId = jedis.get(key); if (lockId != null && lockId.equals(uniqueId)) { jedis.del(key); return true; } } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; }
|
基于Redis的释放锁第二版
从上面的分析来看,我们要确保删除的原子性,利用lua脚本可以保证一点。
在脚本语言里,KEYS[1]和ARGV[1]分别表示传入的key名和唯一标识符。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public boolean releaseLockVersion2(String key, String uniqueId) { String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Jedis jedis = null; Object result = null; try{ jedis = jedisPool.getResource(); result = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(uniqueId)); if ((Long)result == 1) return true; } catch (JedisException e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; }
|
基于Redis的释放锁第三版
在利用可重入锁思想时,只有当states=1
时才能被释放,大于0时,只能进行减1操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public boolean releaseLockVersion3(String key, String uniqueId) { int state = states.get(); if (state > 1) { states.set(states.get() - 1); return false; } return this.doRelease(key, uniqueId);
} private boolean doRelease(String key, String uniqueId) { String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Jedis jedis = null; Object result = null; try{ jedis = jedisPool.getResource(); result = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(uniqueId)); if ((Long)result == 1) return true; } catch (JedisException e) { e.printStackTrace(); } finally { states.set(0); if (jedis != null) jedis.close(); } return false; }
|
利用Zookeeper实现分布式锁
Zookeeper提供一个多层次的节点命名空间,每个节点都用一个以斜杠(/)分割的路径表示,
而且每个节点都有父节点(根节点除外),非常类似于文件系统。
基本思想流程
- 在某父节点下添加创建一个节点,
- 获取该父节点下的所有子节点,并进行排序,获得有个有序序列
- 如果当前添加的节点是序列中序号最小的节点,表示获取锁成功
- 如果不是最小的节点,则对在有序列表中的它的前一个节点进行监听,当被监听的节点被删除后,会通知该节点获取锁。
- 解锁的时候删除当前节点。
实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| public class zklock { private ZkClient zkClient; private String name; private String currentLockPath; private CountDownLatch countDownLatch; private static final String PATENT_LOCK_PATH = "distribute_lock"; private static final int MAX_RETEY_TIMES = 3; private static final int DEFAULT_WAIT_TIME = 3; public zklock(ZkClient zkClient, String name) { this.zkClient = zkClient; this.name = name; } public void addLock() { if (!zkClient.exists(PATENT_LOCK_PATH)) { zkClient.createPersistent(PATENT_LOCK_PATH); } int count = 0; boolean iscompleted = false; while (!iscompleted) { iscompleted = true; try { currentLockPath = zkClient.createEphemeralSequential(PATENT_LOCK_PATH + "/", System.currentTimeMillis()); } catch (Exception e) { if (count++ < MAX_RETEY_TIMES) { iscompleted = false; } else throw e; } } } public void releaseLock() { zkClient.delete(currentLockPath); } private boolean checkMinNode(String localPath) { List<String> children = zkClient.getChildren(PATENT_LOCK_PATH); Collections.sort(children); int index = children.indexOf(localPath.substring(PATENT_LOCK_PATH.length()+1));
if (index == 0) { if (countDownLatch != null) { countDownLatch.countDown(); } return true; } else { String waitPath = PATENT_LOCK_PATH + "/" + children.get(index-1); waitForLock(waitPath, false); return false; }
} private void waitForLock(String waitPath, boolean useTime) { countDownLatch = new CountDownLatch(1); zkClient.subscribeDataChanges(waitPath, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception {
}
@Override public void handleDataDeleted(String s) throws Exception { checkMinNode(currentLockPath); } }); if (!zkClient.exists(waitPath)) { return; } try { if (useTime == true) countDownLatch.await(DEFAULT_WAIT_TIME, TimeUnit.SECONDS); else countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch = null; }
}
|
基于Redis和Zookeeper的分布式锁的优劣
- Redis是nosql数据库,主要特点是缓存;
- Zookeeper是分布式协调工具,主要用于分布式解决方案。
加锁机制
- Redis: 通过
set
方法创建key, 因为Redis的key是唯一的,谁先创建成功,谁能够先获得锁。
- Zookeeper: 会在Zookeeper上创建一个临时节点,因为Zookeeper节点命名路径保证唯一,只要谁先创建成功,谁能够获取到锁。
释放锁
- Redis: 为了确保锁的一致性问题,在删除的redis的key时,需要判断是否是之前拥有该锁的客户端;通过设置有效期解决死锁。
- Zookeeper: 直接关闭临时节点session会话连接,因为临时节点生命周期与session会话绑定在一块,如果session会话连接关闭的话,该临时节点也会被删除。
就性能而言,redis是Nosql数据库,性能优于zookeeper;就健壮性而言,zookeeper明显优于redis。