banner
NEWS LETTER

RedisProject

Scroll down

1. Redis相关

1.1 Redis常用命令

1.2 如何使用Redis

1.使用Redis做缓存

2.使用setIfAbsent来实现互斥锁(setnx命令,set if not exist)

3.使用increment自增实现ID生成器

1.3 使用了哪些Redis数据类型?

  • string:在使用验证码登录的时候,缓存手机号对应的验证码;在查询商店信息的时候,缓存商店信息
1
2
3
4
5
//手机号 对应 验证码
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);
stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);

//商店id 对应 商店信息
  • hash:缓存登录成功的用户信息,信息有id、昵称等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//缓存登录成功的用户 token 对应 用户信息(map类型)
User user = query().eq("phone", phone).one();

if (user == null) {
user = createUserWithPhone(phone);
}

String token = UUID.randomUUID().toString();
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));

String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
  • set:购买过优惠券的用户放在set中,判断当前用户是否购买过
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local voucherId = ARGV[1]
local userId = ARGV[2]

--- 库存key
local stockKey = 'seckill:stock:' .. voucherId
---·订单key
local orderKey = 'seckill:order:' .. voucherId

--- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
--- 判断用户是否重复购买
if(redis.call('sismember', orderKey, userId) == 1) then
return 2
end

--- 扣减库存,并记录用户购买信息
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)

return 0

1.4 如何保证Redis集群情况下分布式锁分可靠性?

  • RedLock算法:向所有Redis节点发送加锁请求,成功超半数才算成功加锁;释放锁的时候要对所有节点都发起释放锁的操作
  • 缺点:已经拿到锁的线程A由于GC暂停或业务超时导致锁被释放,后续线程成功拿到锁,但此时线程A还认为自己持有锁。
  • 改进:
    • 获取锁成功时给线程一个自增的令牌(Fencing Token),资源管理服务器在接受线程更新数据时校验令牌是否比当前已经记录的令牌大,如果大就正常接受,如果小说明是已经过期的令牌
    • 使用Raf共识算法

1.如何实现验证码登录功能的?

  • 首先用户输入手机号,然后点击发送验证码,服务端收到这个请求之后,校验手机号是否合法,然后生成验证码(随机6位数),以前缀和手机号为key保存验证码到Redis中
  • 客户端根据收到的验证码填入并登录,服务端收到请求之后,从Redis中校验验证码和手机号是否匹配,如果匹配就成功登录,则将用户的信息保存到Redis中,并返回随机token给客户端;客户端后续请求都带着这个token在作为autorization放在请求头里面。

img

2. 缓存更新策略

![image-20250225141924104](/Users/effy/Library/Application Support/typora-user-images/image-20250225141924104.png)

2.1 主动更新策略

  1. Cache Aside Pattern(旁路缓存):由缓存的调用者,在更新数据库的同时更新缓存
  2. Read/Write Through Pattern(读写穿透):缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题
  3. Write Behind Patter(异步缓存写入):调用者只更新缓存,由其他线程异步地将缓存数据持久化道数据库,保证最终一致性

2.2 删除缓存还是更新缓存?

  • 更新缓存:每次更新数据库都更新缓存,无效写操作较多 ❌
  • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存 ✅

2.3 如何保证缓存与数据库操作的同时成功或失败

  • 单体系统:将缓存与数据库操作放在一个事务
  • 分布式系统:利用TCC等分布式事务方案

2.4 先操作缓存还是先操作数据库

  • 先删除缓存,再操作数据库 ❌

    ![image-20250225144641785](/Users/effy/Library/Application Support/typora-user-images/image-20250225144641785.png)

  • 先操作数据库,再删除缓存 ✅

    ![image-20250225144716561](/Users/effy/Library/Application Support/typora-user-images/image-20250225144716561.png)

3. 如何解决缓存三大问题?

3.1 缓存穿透

  • 定义:请求大量缓存和数据库中都不存在的值,导致请求直接落到数据库

  • 解决方案

    • 缓存空值

      • 优点:实现简单,效率高

      • 缺点:存储大量null值,空间占用大

      • // 使用缓存空值解决商铺信息缓存穿透问题: 
        if (shop == null) {
          stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
          return Result.fail("店铺不存在");
        }
        
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32

        - **布隆过滤器**

        - 优点:空间占用少

        - 缺点:实现复杂,存在误判可能

        - ```java
        使用布隆过滤器解决商铺信息缓存穿透问题:
        @PostConstruct
        public void init() {
        RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter("shop");
        bloomFilter.tryInit(10000L, 0.02);
        List<Shop> ids = query().select("id").list();
        for (Shop shop: ids) {
        bloomFilter.add(shop.getId());
        }
        }

        @Override
        public Result queryById(Long id) {
        String key = CACHE_SHOP_KEY + id;
        //0. 先查布隆过滤器
        RBloomFilter<Object> bloomFilter = redissonClient.getBloomFilter("shop");
        if (!bloomFilter.contains(id)) {
        return Result.fail("shop doesn't exist!!");
        }

        //1. 布隆过滤器有,再查缓存
        ...
        //2. 缓存没有,再查数据库
        }
    • 增强id的复杂度,避免被猜测id规律

    • 做好数据的基础格式校验

    • 加强用户权限校验

    • 做好热点参数的限流

3.2 缓存雪崩

  • 定义:同一时段大量的缓存key失效或者Redis服务宕机,导致大量请求到达数据库
  • 解决方案:
    • 给不同的key的TTL添加随机值
    • 利用Redis集群提高服务可用性
    • 给缓存业务添加降级限流策略
    • 给业务添加多级缓存

3.3 缓存击穿

  • 定义:缓存击穿也叫热点key问题,即一个高并发访问并且缓存重建业务较复杂的key突然失效,无数请求直接打到数据库上。
  • 解决方案
互斥锁 逻辑过期
概念 线程A发现缓存过期,加锁重建缓存,后续线程获取锁失败自旋重试直至线程A释放锁,此时缓存重建成功 不设置过期时间,但在value中添加逻辑过期字段,线程A查询热点key发现value中逻辑时间过期,则线程A加锁并另起线程B重建缓存,线程A直接返回旧数据(不需等待线程B执行结束),后续线程C查询缓存发现过期后尝试加锁,如果加锁失败则说明有线程正在重建缓存,线程C也直接返回旧数据即可,线程B重建完缓存后释放锁
优点 没有额外的内存消耗
保证一致性
实现简单
线程无需等待,性能较好
缺点 线程需要等待,性能受到影响
可能有死锁风险
不保证一致性
有额外内存消耗
实现复杂
示意图 ![image-20250119151602588](/Users/effy/Library/Application Support/typora-user-images/image-20250119151602588.png) ![image-20250119151633156](/Users/effy/Library/Application Support/typora-user-images/image-20250119151633156.png)

4. 全局唯一ID生成器如何实现的?

  • 项目中使用的ID是一个64位的整数,其中第一位是符号位,永远为0,然后拼接31位的时间戳,再拼接32位Redis中存的自增值。

    • 时间戳以秒为单位,根据当前时间和基准时间的时间差计算得来,基准时间是预先设置好的,31位的时间戳最长可以支持69年左右;

    • 32位的自增值是秒内的计数器,支持每秒产生2^32个不同的ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class RedisIDWorker {
/**
* 开始时间戳,20220101
*/
private static final long BEGIN_STAMP = 1640995200L;
/**
* 序列号位数
*/
private static final int COUNT_BITS = 32;
private final StringRedisTemplate stringRedisTemplate;
public RedisIDWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public long nextId(String keyPrefix) {
//1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timeStamp = nowSecond - BEGIN_STAMP;
//2. 生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
System.out.println("icr:" + keyPrefix + ":" + date);
return timeStamp << COUNT_BITS | count;
}
}

补充:

  • 全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般需要满足:

    • 唯一性
    • 高可用
    • 高性能
    • 递增性
    • 安全性
  • 常见全局唯一ID生成策略:

    • UUID
    • Redis自增:时间戳 + 计数器
    • 雪花算法
    • 数据库自增

5. 如何解决超卖问题?

项目中参考乐观锁的思想解决超卖问题。一开始是以CAS的方式,扣减库存的时候判断库存和之前读出来的值是否一致,如果一致就可以正常扣减,如果不一致就不能扣减,实际应用中发现会有很多请求失败,因为扣减库存的时候发现和原来读的值不一样,结合实际业务需求,简化成只要大于0就可以成功扣减。

1
2
3
4
5
6
7
8
9
10
//乐观锁解决超卖
//原实现,请求失败率高
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).eq("stock", voucher.getStock()).update();

//简化后
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0).update();

补充:

  • 悲观锁:添加同步锁,让线程串行执行

    • 优点:简单粗暴

    • 缺点:性能一般

  • 乐观锁:不加锁,在更新数据时比较数据是否被修改过

    • CAS、版本号

    • 优点:性能好

    • 缺点:存在成功率低的问题

6. 怎么解决一人一单问题?

单体情况下:

  • 采用用悲观锁的方式,对同一用户userId.toString().intern()加synchronized关键字,确保同一用户的请求是串行执行,减少了锁定资源的范围,一定程度上提高了项目的并发性(加在方法上锁的是当前实例,变成所有用户都得串行执行)
  • 注意点一:加锁的范围没有包括整个事务,导致synchronized代码块执行完退出后,事务还没有提交,此时新的线程又进入到synchronized代码块,所以应该锁整个方法
  • 注意点二:还有一个问题是当使用return createVoucherOrder(voucherId)直接调用的是this的方法,由于Spring的事务是通过代理对象来实现的,所以会导致事务失效,应此应该调用代理对象的的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//用synchronized实现一人一单
//存在线程安全问题,synchronized代码块执行完退出后,事务还没有提交
@Transactional(rollbackFor = Exception.class)
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("已经购买过一次");
}
synchronized (userId.toString().intern()) {
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0).update();
if (!success) {
log.error("库存不足");
}

VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);
}
}

//加锁到整个事务,存在事务失效的问题
@Override
public Result seckillVoucher(Long voucherId) {
//判断秒杀是否开始、是否结束、判断库存是否充足
...
//
Long userId = UserHolder.getUSer().getId();
synchronized (userId.toString().intern()) {
return createVoucherOder(voucherId);
}
}

@Transactional(rollbackFor = Exception.class)
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("已经购买过一次");
}

boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0).update();
if (!success) {
log.error("库存不足");
}

VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIDWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);
}

//使用当前类的代理类
@Override
public Result seckillVoucher(Long voucherId) {
//判断秒杀是否开始、是否结束、判断库存是否充足
...
//
Long userId = UserHolder.getUSer().getId();
synchronized (userId.toString().intern()) {
//获取事务有关的代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return createVoucherOder(voucherId);
}
}

集群情况下:使用分布式锁

7. 你是如何实现分布式锁的?

主要是使用Redis的setnx命令来实现分布式锁的。setnx是互斥的,可以设置超时时间,当要set的key已经存在的时候,set就会失败,key不存在时才会成功,对应的方法是setIfAbsent。

自己实现的分布式锁有两个核心方法,获取锁tryLock释放锁unlock。获取锁的时候是互斥、非阻塞的,尝试一次,成功就返回true,失败就返回false。释放锁有两种方式,手动调用unlock释放或者当setnx时设置的超时时间到会自动释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class SimpleRedisLock {
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
public SimpleRedisLocl(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec);

return Boolean.TRUE.equals(success);
}

public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

7.1 自定义分布式锁的误删问题

自定义的分布式锁可能存在误删的问题,比如线程1在获取锁后,由于业务操作耗时久或,导致Redis超时释放,此时线程2来获取锁成功了,开始执行业务操作,线程1执行完后释放了Redis锁,但是此时锁已经被线程2获取了,导致线程1释放的是线程2的锁,就出现误删问题。

解决方案是在获取锁时存入线程标示,在释放锁时先获取锁中的线程标识,判断是否和当前线程标识一致,如果一致就释放锁,不一致就不释放锁。

线程不能直接使用线程ID,因为集群模式下容易出现冲突,而是用UUID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SimpleRedisLock implements ILock {
//线程标识前缀
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识 前缀 + 线程ID
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}

@Override
public void unlock() {
String threadId = ID_PREFIX + Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);

//线程标识解决分布式锁误删的问题
if (threadId.equals(id)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

7.2 自定义分布式锁的原子性问题

解决误删时由于 判断锁标识是否是自己释放锁 两步操作不是原子的,还是可能出现误删的问题,所以要借助Lua脚本保证一致性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

/**
* 调用lua脚本原子释放锁
*/
@Override
public void unlock() {
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}

//unlock.lua
--比较线程标示与锁中的标示是否一致
if (redis.call('get',KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del',KEYS[1])
end
return 0

补充:

分布式锁特点:

  • 多进程可见
  • 互斥
  • 高可用
  • 高性能
  • 安全性

7.3 基于setnx实现的分布式锁的问题

  1. 不可重入,同一个线程无法多次获取同一把锁
  2. 不可重试,获取锁只尝试一次就返回false,没有重试机制
  3. 超时释放,超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  4. 主从一致性,如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁失效

7.4 基于ZooKeeper实现分布式锁

  • 基于临时顺序节点Watcher(事件监听器)机制

8. Redisson有什么功能?

可重入锁、可重试、锁延时

img8.1 Redisson的可重入锁怎么实现的?

  • 用hash结构记录线程id和重入次数
  • 获取锁时,判断当前锁是否已被获取过,如果未获取,则锁记录线程标识,同时锁计数加一;如果获取过,判断线程标识是否一致,如果一致则是重入,锁计数加一,重新设置有效期,如果不一致,则获取锁失败
  • 释放锁时,判断锁是否存在,如果存在,判断是否是自己线程,如果是则将锁计数-1,判断是否为0,如果为0就释放锁,并发布消息通知,通知正在等待该锁的线程,不为0则重置有效期

8.2 Redisson的锁重试如何实现?

  • 获取锁时可以传入最大等待时间,第一次获取锁失败时,会订阅并等待释放锁的信号,在有效期内锁释放后会发布通知,收到通知后会尝试再次获取锁,又获取失败后,则会根据信号量等待一段时间后再次获取锁,然后外层套while true循环
  • 在这期间会多次比较当前剩余等待时间是否已经超时,如果已经超过最大等待时间,直接返回false;
  • 同时也不会无意义的尝试获取锁,而是通过消息订阅、信号量的方式判断是否能获取锁

8.3 Redisson的锁超时怎么解决?

  • 当过期时间设置为-1时,会开启看门狗,释放锁的时候取消watch dog
  • 每隔一段时间,重置超时时间利用超时和续期机制实现逻辑上永不过期,直到业务完成或者业务异常退出
  • 默认超时时间30s,每10s续期一次

9.RocketMQ怎么用的?

  • 我是在抢购优惠券的时候使用到的,对于一些大额的优惠券,可能会在短时间内有大量的下单请求,如果这些请求全部落到数据库上,由于并发加锁的问题,会造成接口的性能大幅下降,所以在这个地方我使用了RocketMQ来解耦Redis扣减和数据库扣减。
  • 首先把库存信息缓存到Redis当中,然后直接在Lua脚本中用Redis判断库存容量够不够、当前用户有没有重复购买,如果都校验成功,就先在Redis扣减,然后生成一个订单,加入到消息队列中,此时接口可以直接返回。
  • 消费者再从消息队列中获取订单信息,加锁写到数据库中,实现了流量的削峰。
  • 经过解耦后的接口在本地简单测试性能提升约50%,响应时间500+ms降低至200+ms

9.1 如何解决顺序消费?

  • 生产者:单一生产者串行发送
  • 全局有序:可以通过将所有消息路由到同一个队列来实现
  • 局部有序:同一生产者组通过MessageQueueSelector保证消息在同一组
  • 消费者:通过MessageListenerOrderly保证对同一个队列的串行消费

当前系统其实只在生成优惠券订单的时候使用了消息队列,下游服务只有将订单保存至数据库,没有其他下游服务,所以也就不存在需要保证顺序消费的情况,因为所有订单之间都是无序的。当然如果业务扩展,以后下游有多个服务时,可以通过Hash取模的方法把同一系列的消息放到同一个消息队列中,因为相同的消息队列消费的顺序是有序的,所以就保证了顺序消费

9.2 如何解决消息丢失问题

  • 生产者:
    • 同步:确保成功发送给消息队列才执行后续代码,可以放在同一个本地事务中
    • 异步 + 重试 + 补偿:写入消息队列不阻塞,通过回调如果写入失败就重试,如果重试超过3次,就记录到数据库后续补偿
  • broker:
    • 异步刷盘宕机时内存中的消息丢失
    • 同步刷盘:写入到磁盘中再返回给生产者成功
    • 同步复制:同步复制给从节点
  • 消费者
    • 重试 + 幂等性 + 死信队列
    • 减轻单节点负担

9.2 如何解决重复消费

  • 什么时候出现重复消费:消费者消费超时,没有给broker返回ACK,broker此时会重新投递
  • 存入Redis,并把状态设置为消费中

解决重复消费是我把保存订单至数据库的服务设计成幂等的,对于重复的订单,在插入数据库时如果订单ID相同会插入失败。

9.3 如何解决消息堆积

水平扩展消费者数量,检查是否有消费错误、是否有线程卡死,增加消费者数量的同时需要增加主题的队列数。

9.4 如何保证高性能读写

传统IO需要多次用户态内核态切换,以及数据拷贝,RocketMQ使用零拷贝的计数,将内核缓冲区和Socket缓冲区共享,减少一次拷贝操作

9.3 消息队列还有哪些用处?

可以用来实现分布式事务,数据库binlog日志同步

9.4 如何保证消息被成功消费,如果消费失败呢?

其他的中间件消息队列你知道哪些?

用消息队列可以降低系统耦合性、实现任务异步、有效地进行流量削峰,是分布式和微服务系统中重要的组件之一.

  • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这几种消息队列中,RabbitMQ 或许是你的首选。
  • RocketMQ 和 Pulsar 支持强一致性,对消息一致性要求比较高的场景可以使用。
  • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。
  • Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

![image-20250311115801061](/Users/effy/Library/Application Support/typora-user-images/image-20250311115801061.png)

10. 限流是怎么实现的?

  • 首先是自定义了一个注解,主要是两个属性type是被限流的key,count表明这个key每秒最大的请求数
1
2
3
4
5
6
@Target(value = ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface RateConfigAnno {
String limitType();
double limitCount() default 5d;
}
  • 然后是写了一个切面,拦截所有被RateConfigAnno注解的方法,在方法执行前调用Google Guava的RateLimiter,获取这个key下的limiter,如果能成功获取到令牌就继续执行被拦截方法的业务逻辑,如果获取失败就直接返回限流信息,不再继续执行后续业务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j
@Aspect
@Configuration
@Order(1)
public class GuavaLimitAOP {

@Before("execution(@RateConfigAnno * *(..))")
public void limit(JoinPoint joinPoint) {
//1. 获取当前调用的方法
Method currentMethod = getCurrentMethod(joinPoint);
if (Objects.isNull(currentMethod)) {
return;
}

//2. 从方法注解定义上获取限流的类型
String limitType = currentMethod.getAnnotation(RateConfigAnno.class).limitType();
double limitCount = currentMethod.getAnnotation(RateConfigAnno.class).limitCount();
//3. 使用guava的令牌桶算法获取一个令牌,获取不到先等待
RateLimiter rateLimiter = RateLimiterHelper.getRateLimiter(limitType, limitCount);

boolean flag = rateLimiter.tryAcquire();
if (flag) {
log.info("获取令牌成功");
} else {
log.info("获取令牌失败");
HttpServletResponse resp = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
JSONObject jsonObject = new JSONObject();
jsonObject.put("success", false);
jsonObject.put("msg", "限流中");

try {
output(resp, jsonObject.toString());
} catch (Exception e) {
log.error("error, e: {}", e);
}
}
}

private Method getCurrentMethod(JoinPoint joinPoint) {
Method[] methods = joinPoint.getTarget().getClass().getMethods();
Method target = null;

for (Method method : methods) {
if (method.getName().equals(joinPoint.getSignature().getName())) {
target = method;
break;
}
}

return target;
}

private void output(HttpServletResponse response, String msg) {
response.setContentType("application/json;charset=UTF-8");
try(ServletOutputStream outputStream = response.getOutputStream()) {
outputStream.write(msg.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}

11. 库存是怎么写到Redis的?

因为新增优惠券是商家端的操作,并发一般都比较小,所以我是直接在写数据库的同时写入缓存。

然后在服务启动的时候也需要把mysql中的库存同步到redis中,是写在了@PostConstruct注解下,表明这个方法需要在Bean加载前执行

6. 点赞功能如何实现的

使用Set,因为Set类型的数据结构具有

  • 不重复,符合业务的特点,一个用户只能点赞一次
  • 高性能,Set集合内部实现了高效的数据结构(Hash表)
  • 灵活性,Set集合可以实现一对多,一个博客被多个用户点赞

当然也可以选择使用Hash(Hash占用空间比Set更小),如果想要点赞排序也可以选用Sorted Set

8. 设计秒杀系统需要关注那些?

高并发和高性能

①热点数据的处理:处理热点数据的问题的关键就在于 我们如何找到这些热点数据(或者说热 key),然后将它们存在 jvm 内存里。对于本项目来说,其实可以直接放入到redis即可,因为并发量不可能过高,但是如果是像京东那种,可能就直接把redis集群给干趴下了。

②通过消息队列来进行流量削峰。验证码和回答问题来筛选出可能存在的脚本。

高可用

如果我们想要保证系统中某一个组件的高可用,往往需要搭建集群来避免单点风险,比如说 Nginx 集
群、Kafka 集群、Redis 集群(哨兵机制)。

限流:限流是从用户访问压力的角度来考虑如何应对系统故障。限流为了对服务端的接口接受请求的频率进行限制,防止服务挂掉。可以用redis,也可以用Sentinel。

排队:限流是直接拒绝用户的请求,而排队是让用户等待一定的时间。

降级:降级是从系统功能优先级的角度应对系统故障,比如请求到一定阈值之后,我们对系统中一些非核心的功能直接关闭或者让他们的功能降低。(比如双十一期间,蚂蚁森林等功能可以进行降低)

熔断:应对当前系统依赖的外部系统或者第三方系统的崩溃。例如秒杀服务位于服务A,A还有其他服务比如用户积分的管理,如果积分管理的服务接口响应特别慢的时候,其他服务直接不再请求这个接口,从而避免其他服务被这个服务拖累。

一致性

减库存:下单减扣库存

接口幂等:分布式锁。我们通过加锁的方式限制用户在第一次请求未结束之前,无法进行第二次请求。

如何解决超卖问题

通过加锁,悲观锁或者乐观锁,乐观锁分为版本号和CAS操作

本项目使用分布式锁 + lua脚本实现原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local voucherId = ARGV[1]
local userId = ARGV[2]

--- 库存key
local stockKey = 'seckill:stock:' .. voucherId
---·订单key
local orderKey = 'seckill:order:' .. voucherId

--- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end
--- 判断用户是否重复购买
if(redis.call('sismember', orderKey, userId) == 1) then
return 2
end

--- 扣减库存,并记录用户购买信息
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)

return 0

后续优化点

2.使用线程池实现查询商品详情信息等复杂任务的并行计算,并使用 Completableruture 异步编排任务,提高系统响应速度。

用什么做并发测试?

jmeter

10. 数据库相关

1. 订单表(tb_voucher_order)

每个店铺都可以发布优惠券,当用户抢购优惠券时,会生成订单并保存到tb_voucher_order表中

1.1 为什么订单表不使用数据库自增ID?

  • id规律性太明显,容易被猜出来
  • 随着订单增长,单张表数据量有限,分表多张表自增ID会重复
Other Articles