Redis 应用篇

从应用角度介绍Redis常见应用面试题。

缓存

缓存穿透

(1)概念

客户端请求的数据在缓存和数据库中都不存在。如果持续发起这个请求,那么缓存会失效,好像不存在。

(2)解决方法

  • 缓存空对象
  • 布隆过滤器

缓存空对象是在数据库未命中时,缓存一个null并设置一个短有效时间。

布隆过滤器则是在缓存前的一道关卡,查询数据是否存在,具体实现未知。

缓存雪崩

(1)概念

缓存雪崩就是同一时间大量key失效或者直接Redis宕机。那么这时候针对不同的key的大量请求都会要求重建缓存,数据库压力大。

(2)解决方法

  • 为不同的key设置不同的过期时间(随机TTL)
  • Redis集群

缓存击穿

(1)概念

缓存击穿也称之为热点key问题,一旦热点key失效,那么大量的请求都会打到数据库(在缓存重建时间内)。

(2)解决方法

  • 互斥锁:为缓存重建的过程加锁,只有一个线程能拿到锁进行缓存重建,没拿到锁的线程休眠一段时间,再次查缓存,直到缓存重建完成
  • 逻辑过期:不设置热点key的过期时间,由代码判断key的value中的expireTime字段是否过期。如果过期,由互斥锁保证开启一个新线程重建缓存,未获得锁的其他线程直接返回旧数据。

缓存与数据库的一致性

读操作:缓存命中直接返回;缓存未命中,则查询数据库,然后更新缓存并设置有效时间。

写操作:写数据库,然后选择主动更新缓存抑或删除缓存。

四种同步策略(数据库更新时主动更新缓存)

  • 先更新缓存再更新数据库:第二步失败缓存库是脏数据
  • 先更新数据库再更新缓存:第二步失败缓存库是旧数据
  • 先删除缓存再更新数据库:第二步失败缓存库是空数据
  • 先更新数据库、再删除缓存(推荐):第二步失败缓存库是旧数据

更新缓存or删除缓存

更新缓存的优点是每次数据变化都能及时更新缓存,但缺点是操作消耗大,频繁更新缓存会影响服务器性能。

删除缓存优点是操作简单,惰性重建,将重建代价移动到下一次访问。

更新数据库与删除缓存的时序

先删缓存再更新数据库:线程1删除缓存后,线程2查询缓存不存在,然后重新写入缓存,此时缓存为旧数据。线程1再更新数据库为新数据。那么就出现了缓存旧数据、数据库新数据的不一致性。

先更新数据库在删除缓存:恰好缓存数据失效,线程1查缓存失效,查数据库旧数据;线程2更新数据库;线程1再更新缓存。这种不一致性出现概率低。

最优同步策略:先更新数据库、再删除缓存

所以我们得到结论:先更新数据库、再删除缓存是影响更小的方案。如果第二步出现失败的情况,则可以采用重试机制解决问题。

同步删除方案: 先更新数据库、再删除缓存。适用于不强制要求数据一致性的情景

流程:先更新数据库、再删除缓存。

问题:

  • 并发时脏数据:在查询数据库到写缓存期间其他线程执行了一次更新删除,导致缓存的数据是旧数据
  • 缓存删除失败:删除失败导致缓存库还是旧数据

同步删除+可靠消息方案

同步删除+可靠消息删除: 解决缓存删除失败问题,利用可靠消息多次重试删除缓存操作。

流程:先更新数据库、再删除缓存,如果删除失败就发可靠MQ不断重试删除缓存,直到删除成功或重试5次。

问题:消息队列中消息消费有时延,数据不一致时间较长(适用于不强制要求数据一致性的情景);MQ多次重试失败,导致长期脏数据

延迟双删:更高一致性

流程:先删除缓存再更新数据库,然后在从数据库库更新后再删一次缓存。

为什么要延时呢?为了分布式系统下主从同步。

数据工作的大致流程:

  1. 主节点删除 redis 库数据;
  2. 主节点修改 mysql 库数据;
  3. 当前业务处理 等待一段时间,等 redis 和 mysql 主从节点数据同步;
  4. 从节点 redis 主库删除数据;
  5. 其它服务节点读取 redis 从库数据,发现没有数据,从 mysql 从库读取数据,并写入 redis 主库。

问题:时间无法控制,不能保证在数据库从库更新后删除缓存。如果在从库更新前删除,用户再在更新前查从库又把脏数据写在缓存里了。

异步监听数据库+可靠消息删除

流程:

  1. 更新数据库后不做操作;
  2. Canal等组件监听binlog发现有更新时就发可靠MQ删除缓存;
  3. 如果删除缓存失败,就基于手动ack、retry等机制,让消息在有限次数之内不断重试。

优点:

  • 异步删除,性能更高;
  • 可靠消息重试机制,多次删除保证删除成功。

问题:要求canal等binlog抓取组件高可用,如果canal故障,会导致长期脏数据。

限流

常见限流算法:

  • 计数器算法
  • 滑动窗口算法
  • 漏桶算法
  • 令牌桶算法

计数器算法

比如说我要维持1秒内最多200个请求访问,那么我就设置一个计数器表示1秒内请求访问的数量,每隔一秒固定清空计数器。可以自定义窗口时间。

1
2
3
4
5
6
7
8
9
10
boolean fixedWindowsTryAcquire() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
if (currentTime - lastRequestTime > windowUnit) { //检查是否在时间窗口内
counter = 0; // 计数器清0
lastRequestTime = currentTime; //开启新的时间窗口
}
if (counter >= threshold) return flase;
counter++; //计数器加1
return true;
}

坏处:临界请求问题。比如0.9s有200个请求,1.1s有200个请求,那么实际在0.5到1.5s之间处理了400个请求。

滑动窗口算法

滑动窗口的思想是将一个窗口细分为若干个小窗口,每个小窗口独立负责一个时间段的请求限流,随着时间的前进,最旧的窗口丢弃,新的空窗口生成。

通过细化窗口的方式,能够将临界请求也一起细化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private int SUB_CYCLE = 10; // 单位时间划分的小周期(单位时间是1分钟,10s一个小格子窗口,一共6个格子)
private int thresholdPerMin = 100; // 每分钟限流请求数
private final TreeMap<Long, Integer> counters = new TreeMap<>(); // 计数器, key为当前窗口的开始时间值秒,value为当前窗口的计数
boolean slidingWindowsTryAcquire() {
//获取当前时间在哪个小周期窗口
long currentWindowTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) / SUB_CYCLE * SUB_CYCLE;
//当前窗口总请求数
int currentWindowNum = countCurrentWindow(currentWindowTime);

if (currentWindowNum >= thresholdPerMin) return false;
counters.get(currentWindowTime)++;
return true;
}

/**
* 统计当前窗口的请求数
*/
private int countCurrentWindow(long currentWindowTime) {
//计算窗口开始位置
long startTime = currentWindowTime - SUB_CYCLE* (60/SUB_CYCLE-1);
int count = 0;

//遍历存储的计数器
Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, Integer> entry = iterator.next();
// 删除无效过期的子窗口计数器
if (entry.getKey() < startTime) {
iterator.remove();
} else {
count =count + entry.getValue();
}
}
return count;
}

漏桶算法

漏桶算法采取一个容器存储到来的请求,以生产者消费者的思想处理请求。客户端是生产者,实际流量的产生者;服务端是消费者,负责消费容器中的请求。容器的大小是固定的,超出容器大小的请求将会被丢弃。

这样就能保证以不超过固定消费速率的方式处理请求。缺点是:消费速率固定,无法调整。如果大量请求到来,我们希望提高系统的处理效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private long rate; // 每秒处理数(出水率)
private long currentWater; //当前剩余水量
private long refreshTime; // 最后刷新时间
private long capacity; // 桶容量

boolean leakybucketLimitTryAcquire() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
long outWater = (currentTime - refreshTime) / 1000 * rate; //流出的水量 =(当前时间-上次刷新时间)* 出水率
long currentWater = Math.max(0, currentWater - outWater); // 当前水量 = 之前的桶内水量-流出的水量
refreshTime = currentTime; // 刷新时间

// 当前剩余水量还是小于桶的容量,则请求放行
if (currentWater < capacity) {
currentWater++;
return true;
}

// 当前剩余水量大于等于桶的容量,限流
return false;
}

令牌桶算法

令牌桶算法改良了漏桶算法,使得消费的速率能够调整。具体实现为:漏桶不再存储请求,而是存储令牌。每个请求要先获得令牌才能被处理。

此时桶的生产者和消费者倒置:由服务端以可调整的速率生成令牌,由客户端请求消费令牌。如果请求拿不到令牌,就直接放弃处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private long putTokenRate; //每秒处理数(放入令牌数量)
private long refreshTime; //最后刷新时间
private long capacity; //令牌桶容量
private long currentToken = 0L; // 令牌桶容量

boolean tokenBucketTryAcquire() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
long generateToken = (currentTime - refreshTime) / 1000 * putTokenRate; //生成的令牌 =(当前时间-上次刷新时间)* 放入令牌的速率
currentToken = Math.min(capacity, generateToken + currentToken); // 当前令牌数量 = 之前的桶内令牌数量+放入的令牌数量
refreshTime = currentTime; // 刷新时间

//桶里面还有令牌,请求正常处理
if (currentToken > 0) {
currentToken--; //令牌数量-1
return true;
}

return false;
}

Redis实现限流

滑动窗口法

主要是利用Redis的ZSet数据结构来实现,每个请求的value都要保持互不相同,score为当前时间戳。利用ZSet的range,查询当前时间窗口内的请求数量,进而达到限流作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
public Response limitFlow(){
        Long currentTime = new Date().getTime();
        System.out.println(currentTime);
        if(redisTemplate.hasKey("limit")) {
            Integer count = redisTemplate.opsForZSet().rangeByScore("limit", currentTime -  intervalTime, currentTime).size();        // intervalTime是限流的时间 
            System.out.println(count);
            if (count != null && count > 5) {
                return Response.ok("每分钟最多只能访问5次");
            }
        }
        redisTemplate.opsForZSet().add("limit",UUID.randomUUID().toString(),currentTime);
        return Response.ok("访问成功");
    }

通过上述代码可以做到滑动窗口的效果,并且能保证每N秒内至多M个请求,缺点就是zset的数据结构会越来越大。

令牌桶方法

Redis实现令牌桶,实际就是让Redis充当桶的角色。类似消息队列的意味,利用Redis的list实现令牌的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获得令牌
public Response limitFlow2(Long id){
        Object result = redisTemplate.opsForList().leftPop("limit_list");
        if(result == null){
            return Response.ok("当前令牌桶中无令牌");
        }
        return Response.ok(articleDescription2);
    }
// 10S的速率往令牌桶中添加UUID,只为保证唯一性
    @Scheduled(fixedDelay = 10_000,initialDelay = 0)
    public void setIntervalTimeTask(){
        redisTemplate.opsForList().rightPush("limit_list",UUID.randomUUID().toString());
    }

依靠Java的定时任务,定时往List中rightPush令牌,当然令牌也需要唯一性。

全局ID生成

全局id生成器,是在分布式系统下用来生成全局唯一ID的工具,

设计思想:时间戳以秒为单位,留下1秒的时间让业务并发。这1秒的时间最多2的32次方个id。我不同的业务使用不同的key自增,业务再怎么大,一秒2的32次方个id也是够用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Component
public class RedisIdWorker {
//开始时间戳
private static final long BEGIN_TIMESTAMP = 1674086400L;

//序列号位数
private static final int COUNT_BITS = 32;

private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime time = LocalDateTime.now();
long nowSecond = time.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//2.生成序列号,redis自增长,redis单个key自增长有上限,2的64次方
//2.1获取当前日期,精确到天
String date = time.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3.拼接并返回,不能使用字符串方式拼接
return timestamp << COUNT_BITS | count;//先向左移32位,那么低32位全为0,跟序列号进行或操作

}

/**
* 生成开始时间戳
* @param args
*/
public static void main(String[] args) {
LocalDateTime time = LocalDateTime.of(2023, 1, 19, 0, 0, 0);
long second = time.toEpochSecond(ZoneOffset.UTC);
System.out.println(second);
}
}

分布式锁

简单的分布式锁

利用setnx实现简单的分布式锁,锁名称由业务决定,并完成锁的初始化(初始化需要传入锁名称)tryLock返回加锁是否成功的判断。锁需要设置超时过期时间,避免业务阻塞造成锁无法释放。

获取锁

1
SET lock thread1 NX EX 10

释放锁

1
DEL key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String lock_name; //业务名称
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name){
this.stringRedisTemplate = stringRedisTemplate;
lock_name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
// 1、确定key和value
String key = "lock:" + lock_name;
String value = Thread.currentThread().getId() + "";
// 2、执行setnx
Boolean succ = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(succ);
}

@Override
public void unlock() {
String key = "lock:" + lock_name;
stringRedisTemplate.delete(key);
}
}

加锁失败,由业务自己决定是否重试。

改进分布式锁

锁二次删除问题

考虑这么一个场景:线程获得锁后阻塞,超过过期时间,锁删除。第二个线程获得锁,但一个线程恢复后删除了第二个线程持有的锁。最后导致第三个线程又获得了锁。

改进方法:利用锁的value来判断当前持有锁的线程。每个机器上线程都要有一个全局ID。释放锁时需要判断锁是否是自己的。

加锁:指定锁的value 为线程的全局ID。

释放锁:释放锁一共有三个步骤,先获取锁的value,然后判断value是否为当前线程ID,如果是才能删除锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String lock_name; //业务名称
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name){
this.stringRedisTemplate = stringRedisTemplate;
lock_name = name;
}
private final String KEY_PREFIX = "lock:";
private final String VALUE_PREFIX = UUID.randomUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
// 1、确定key和value
String key = KEY_PREFIX + lock_name;
String value = VALUE_PREFIX + Thread.currentThread().getId();
// 2、执行setnx
Boolean succ = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(succ);
}

@Override
public void unlock() {
String key = KEY_PREFIX + lock_name;
String value = VALUE_PREFIX + Thread.currentThread().getId();
String lock_value = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.equals(value, lock_value)) // 线程只能释放自己持有的锁!
stringRedisTemplate.delete(key);
}
}

释放锁原子性问题

释放锁的过程可能被阻塞(JVM的垃圾回收机制导致的短暂阻塞),因此要采用LUA脚本保证原子性。

(1)编写lua脚本

1
2
3
4
5
6
7
8
9
10
-- 获取key和value
local key = KEYS[1]
local value = ARGV[1]

-- 判断get(key)和value是否一致
local lock_value = redis.call('GET', key)
if lock_value == value then
return redis.call('DEL', key)
end
return 0

(2)加载lua脚本并执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String lock_name; //业务名称
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name){
this.stringRedisTemplate = stringRedisTemplate;
lock_name = name;
}
/**
* 加载Lua脚本
*/
private static final DefaultRedisScript<Long> LOCK_SCRIPT;

static {
LOCK_SCRIPT = new DefaultRedisScript<>();
LOCK_SCRIPT.setLocation(new ClassPathResource("lock.lua"));
LOCK_SCRIPT.setResultType(Long.class);
}

private final String KEY_PREFIX = "lock:";
private final String VALUE_PREFIX = UUID.randomUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
// 1、确定key和value
String key = KEY_PREFIX + lock_name;
String value = VALUE_PREFIX + Thread.currentThread().getId();
// 2、执行setnx
Boolean succ = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(succ);
}

@Override
public void unlock() {
String key = KEY_PREFIX + lock_name;
String value = VALUE_PREFIX + Thread.currentThread().getId();
// 调用LUA脚本释放锁,保证原子性
stringRedisTemplate.execute(LOCK_SCRIPT, Collections.singletonList(key), value);
}
}

至此,一个简单的不可重入的无重试机制的分布式锁就完成了。

后续改进:

  • 可重入性(需要记录线程获锁的次数,当次数为0时释放锁)
  • 重试机制
  • 续约机制
  • 主从一致性问题

可重入分布式锁实现

可重入锁需要维护引用计数,简单的string类型无法满足需求,需要换用hash类型。

加锁:两次判断,第一次判断锁是否存在;如果不存在创建锁;如果存在,增加引用计数。

释放锁:两次判断,引用计数减1;如果引用计数为0,删除锁。

由于加锁和释放锁都是多步操作,因此均采用LUA脚本。

(1)获取锁的lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
local key = KEYS[1]
local threadId = ARGV[1]
local releaseTime = ARGV[2]
-- 1、查询key是否存在
-- 1.1 不存在直接获取锁,并设置过期时间
if (redis.call('exists', key) == 0) then
redis.call('hset', key, threadId, '1')
redis.call('expire', key, releaseTime)
return 1
end

-- 2、如果key存在,再比较field是否是当前线程(锁是否是自己的)
-- 2.1 如果是当前线程,引用计数加1并重设有效期返回ture
if (redis.call('hexists', key, threadId) == 1) then
redis.call('hincrby', key, threadId, '1')
redis.call('expire', key, releaseTime)
return 1
end
-- return 1 获取锁成功,0失败
return 0

(2)释放锁的lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
local key = KEYS[1]
local threadId = ARGV[1]
local releaseTime = ARGV[2]

-- 1、判断要释放的锁是否存在
if (redis.call('hexists', key, threadId) == 0) then
return nil
end
-- 2、锁存在,引用计数减1
local count = redis.call('HINCRBY', key, threadId, -1);

-- 进一步判断是否需要释放锁
if (count > 0) then
-- 重入次数大于0,说明不能释放锁,且刷新锁的有效期
redis.call('EXPIRE', key, releaseTime);
return nil;
else
-- 重入次数等于0,说明可以释放锁
redis.call('DEL', key);
return nil;
end

(3)可重入锁类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.StrUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String lock_name; //业务名称
private long timeOutSec;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name){
this.stringRedisTemplate = stringRedisTemplate;
lock_name = name;
}
/**
* 加载Lua脚本
*/
private static final DefaultRedisScript<Long> LOCK_SCRIPT;
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
LOCK_SCRIPT = new DefaultRedisScript<>();
LOCK_SCRIPT.setLocation(new ClassPathResource("get_lock.lua"));
LOCK_SCRIPT.setResultType(Long.class);
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("release_lock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

private final String KEY_PREFIX = "lock:";
private final String VALUE_PREFIX = UUID.randomUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
this.timeOutSec = timeoutSec;
// 1、确定key和value
String key = KEY_PREFIX + lock_name;
String value = VALUE_PREFIX + Thread.currentThread().getId();
// 2、执行setnx
// Boolean succ = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);
Long result = stringRedisTemplate.execute(LOCK_SCRIPT,
Collections.singletonList(key), value, Long.toString(timeoutSec));
// System.out.println(result);
return result!=null && result.equals(1L);
}

@Override
public void unlock() {
String key = KEY_PREFIX + lock_name;
String value = VALUE_PREFIX + Thread.currentThread().getId();
// String lock_value = stringRedisTemplate.opsForValue().get(key);
// if(StrUtil.equals(value, lock_value)) // 线程只能释放自己持有的锁!
// stringRedisTemplate.delete(key);
// 调用LUA脚本释放锁,保证原子性
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(key), value, Long.toString(this.timeOutSec));
}
}

滚动推送

TimeLine推送模型

最典型的就是朋友圈,按照内容产生的时间顺序推送给关注用户。

实现模式:推模式、拉模式、推拉结合

推模式就是当前用户发布一条内容后,马上将这条内容推送到每个粉丝的收件箱。适合粉丝数少的一般人。

拉模式则相反,用户发布的内容先暂存到发件箱,当粉丝用户查看内容时,再从发件箱拉取内容到收件箱消费。适合粉丝众多的大V。

推拉模式结合适合用于大V的活跃粉丝和不活跃粉丝。活跃粉丝用推模式,不活跃粉丝用拉模式。

Redis实现推模式

推模式包括推送加收取。推送:将内容id推送的粉丝的收件箱;拉取:从个人收件箱拉取推文id。

可以利用set维护用户关注表,查找出所有粉丝ID。收件箱利用ZSet,每个推文加上时间戳。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
save(blog);
// 1、查找当前用户的粉丝
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
// 2、将blog的id推送到粉丝的收件箱
follows.forEach(
follow -> {
stringRedisTemplate.opsForZSet().add(FEED_KEY+follow.getUserId(), blog.getId().toString(), System.currentTimeMillis());
}
);
return Result.ok(blog.getId());
}

滚动分页查询:回想起朋友圈,可以一直往下滚动,浏览旧时间的朋友圈内容。如果想消费最新内容,往上滑刷新,就得到了最新内容。

实现:对推文进行时间戳进行降序排序,记录每次读的最后一条推文的时间戳位置。下次读从上次记录到的位置开始。如果是刷新的情况,则从头开始读(新的推文一定在头部,因为时间戳较大)。

这在Redis的ZSet对应的命令如下:

1
ZREVRANGEBYSCORE key Max Min LIMIT offset count

ZSet+Reverse+RangeByScore,按照分数降序排序。Max则是要读取的最大分数,Min是要读取的最低分数。offset就是偏移量,count就是读取的数据条数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public Result queryBlogOfFollow(Long lastId, Integer offset) {
Long userId = UserHolder.getUser().getId();
//1、从Redis的收件箱中查询博客id
// ZREVRANGEBYSCORE key Max Min LIMIT offset count
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(FEED_KEY + userId, 0, lastId, offset, 2);
if(typedTuples==null||typedTuples.isEmpty()){
return Result.ok();
}
//2、解析获得的数据
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int ofset = 1;
for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
String idStr = typedTuple.getValue();
ids.add(Long.valueOf(idStr));
long time = typedTuple.getScore().longValue();
if(time == minTime){
ofset+=1;
}else{
minTime = time;
ofset=1;
}
}
//3、从数据库中查询博客的具体信息
String idStrs = StrUtil.join(",", ids);
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStrs + ")")
.list();
blogs.forEach(blog->{
queryUserByBlog(blog);
isBlogLiked(blog);
});
// 4、封装并返回
ScrollResult scrollResult = new ScrollResult();
scrollResult.setList(blogs);
scrollResult.setOffset(ofset);
scrollResult.setMinTime(minTime);
return Result.ok(scrollResult);
}

其中第一次查询时,lastId传当前时间戳;此后第二次查询,lastId为上次查询的最小时间戳,

偏移量是为了记住同一时间发布的推文数量,假如上次查询的最小时间戳一共有5条推文,那么下次查询必须从第6次推文读起。

作者

Desirer

发布于

2024-02-19

更新于

2024-11-15

许可协议