分布式应用中,需要如何解决资源同步的问题?
这篇文章讲解的很不错了。Redis 分布式锁的正确实现方式( Java 版 )。另外在Github上也有一个demo做的很不错。
什么时候需要用到分布式锁呢?比如:减库存。
假设某个商品的库存是N,有N+1个订单过来,那么势必只能有N个有效订单。那么如何保证库存不出现负数的情况呢?(多个减库存操作保证库存的同步)
图一:减库存流程图
图二:分布式锁减库操作
利用Redis就可以给库存加一个分布式锁。需要注意的是不要造成死锁,即保证持有锁有一定的时限制。
获取锁的过程可以是:在redis服务器set一个key-value,lock。如果此lock已经存在,则说明当前锁被其他线程持有。否则set此lock的值。lock需要有设置存活时间(保证不死锁)。另外需要注意手动释放锁的时候,需要保证是当前持有锁的线程去释放。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Collections; @RestController @RequestMapping public class RedisTemplateController { @Resource(name = "myRedisTemplate") private RedisTemplate<String, String> redisTemplate; @GetMapping(value = "set") public String set(String key, String value) { redisTemplate.opsForValue().set(key, value); return "success"; } @GetMapping(value = "get") public String get(String key) { return redisTemplate.opsForValue().get(key); } /** * 加锁 * @param key * @param value * @return */ @GetMapping(value = "redisTemplateLock") public Object lock(String key, String value) { String script = "local key = KEYS[1]; local value = ARGV[1]; if redis.call('set', key, value, 'NX' ,'PX', 5000) then return 1 else return 0 end"; DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class); Object execute = redisTemplate.execute(redisScript, Collections.singletonList(key), Collections.singletonList(value)); System.out.println(execute); return execute; } /** * 阻塞锁 */ @GetMapping(value = "blockLock") public String blockLock(String key, String value) throws InterruptedException { // 被阻塞的时间超过5秒就停止获取锁 int blockTime = 5000; // 默认的间隔时间 int defaultTime = 1000; for(;;) { if(blockTime >= 0) { String script = "local key = KEYS[1]; local value = ARGV[1]; if redis.call('set', key, value, 'NX' ,'PX', 5000) then return 1 else return 0 end"; DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class); Long result = redisTemplate.execute(redisScript, Collections.singletonList(key), value); System.out.println("try lock ... ,result: "+result); if(result != null && result == 1) { // 得到了锁 return "lock success"; } else { blockTime -= defaultTime; Thread.sleep(1000); } } else { // 已经超时 return "lock timeout..., please retry later..."; } } } /** * 解锁 * @param key * @param value */ @GetMapping("redisTemplateUnlock") public String unlock(String key, String value) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class); Long execute = redisTemplate.execute(redisScript, Collections.singletonList(key), value); System.out.println("unlock result: "+execute); if(execute != null && execute != 0) { // 解锁成功 return "unlock success"; } else { return "unlock failed"; } } }
另外延伸出一个问题:如何保证一个60秒的延迟。Thread.sleep(60000)是否可以保证延迟60s。答案是不行。解决办法:
复制代码
1
2
3
4
5
6
7
8
9
10public void sleep(long millis) { while (millis > 0) { long begin = System.currentTimeMillis(); try { Thread.sleep(millis); } catch (Exception e) { } millis -= System.currentTimeMillis() - begin; } }
最后
以上就是鳗鱼白昼最近收集整理的关于分布式锁之redis分布式锁的全部内容,更多相关分布式锁之redis分布式锁内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复