设计一个简单的点赞功能

设计一个简单的点赞功能新增功能:点赞 现在几乎所有的媒体内容,无论是商品评价、话题讨论还是朋友圈都支持点赞,点赞功能成为了互联网项目的标配,那么我们也尝试在评价系统中加入点赞功能,实现为每一个评价点赞。 豆瓣短评中的点赞:

新增功能:点赞

现在几乎所有的媒体内容,无论是商品评价、话题讨论还是朋友圈都支持点赞,点赞功能成为了互联网项目的标配,那么我们也尝试在评价系统中加入点赞功能,实现为每一个评价点赞。

豆瓣短评中的点赞:

image-20210407234242890

要实现的点赞需求细节:

image-20210414215247131

从放弃出发

完整得实现点赞系统功能是很困难的。要支持亿级的用户数量,又要做到数据归档入库,要支持高峰期百万的秒并发写入,又要实现多客户端实时同步,要记录并维护用户的点赞关系,又要展示用户的点赞列表,这样全方位的需求会产生设计上的矛盾,就像CAP矛盾一样。

典型的比如并发量和同步性的矛盾。高并发的本质是速度,网络传输速度和程序运行速度决定了系统所能承载的容量,每个请求处理速度快才能在单位时间内处理更多的请求,只是一味得增大连接数而忽略请求响应时间,并发问题得不到根本性的解决。在我看来,应用程序内部运行速度的瓶颈在于三处,优先级由高到低是网络请求、对象创建、冗余计算,网络请求对响应速度具有决定性的影响力。但是,同步性又要求我们进行网络请求,比如同步数据到mysql或redis之中。鱼与熊掌不可兼得,并发量和同步性具有不可调和的矛盾。

还有存储容量与访问速度的矛盾。要记录用户的点赞列表,就意味着要长期维护用户的点赞关系,日积月累,用户的点赞关系在单台存储系统中装不下,需要写入分布式存储系统中,这带来了额外的复杂度与调度时延,并且需要很好地设计区分维度,不同分区之间数据不耦合。而一旦一次查询跨越了多个存储节点,就会产生级联调用,具有较大的网络时延。

要实现,先舍弃。看到一个新的需求时,我习惯于反向思考,观察这个需求不涉及到哪些功能,哪些功能可以放弃,从这个角度出发,很容易找到取巧而又简单,却能满足当前需求的设计方案。

重新列一个需求清单,上面写了不需要实现哪些功能,这样做设计决策时,就豁然开朗了。

image-20210414225218732

产品经理只会给你提供表格1,他们很少会显示说明什么不需要做。在决定放弃时,还是需要商量一下,因为这些需求往往是软性的,需求文档中没有包含不一定是不需要,也有可能是没考虑到。

如何记录用户的点赞关系

点赞关系是典型的K-V类型或是集合类型,用Redis实现是比较合适的,那么用Redis中的哪种数据类型呢?

下表列出了能想到的数据类型与它们各自的优劣。

image-20210414231656907

比较关键的特性是批量查询和内存占用,批量查询特性使得可以在一次请求中查询全部的点赞关系,内存占用使得可以用尽可能少的redis节点,甚至一台redis解决存储问题。

我选择字符串类型,因为哈希类型真的很难实现点赞数据的淘汰,除非记录点赞时间并且定期全局扫描,或者记录双份哈希键,做新旧替换,代价太高,不合适。而淘汰机制本身就是解决内存占用问题,所以字符串类型不会占用异常多的内存。

image-20210415101020806

点赞操作的原子性

点赞操作需要改写两个值,一个是用户对内容的点赞关系,另一个是内容的点赞总数,这两个能不能放在一个key中表示呢?显然是不行的。所以需要先设置用户的点赞关系,再增加点赞总数,如果点赞关系已经存在,就不能增加点赞总数。

设置点赞关系可以用setnx命令实现,仅当不存在key时才设置,并返回一个是否设置的标志,根据这个标志决定是否增加点赞总数。比如:

if setnx(key1) == 1
then 
	incr(key2)

看似每个操作都是原子性的,但是这样的逻辑如果在客户端执行,整体上仍不满足原子性,仍有可能在两个操作之间发生中断,导致点赞成功但是没有增加计数的情况发生。虽然这对于点赞系统来说不是什么大问题,极少出现的概率可以接受,但是我们完全可以做的更好。

redis的事务或脚本特性可以解决上述的问题。脚本的实现更加灵活自由,而且能减少网络请求,我们选择脚本的方式:

--点赞操作,写入并自增,如果写入失败则不自增,【原子性、幂等性】
if redis.call('SETNX',KEYS[1],1) == 1
then
    redis.call('EXPIRE',KEYS[1],864000)
    redis.call('INCR',KEYS[2])
end
return redis.call('GET',KEYS[2])
--取消点赞操作,删除并递减,如果删除失败则不递减,【原子性、幂等性】
if redis.call('DEL',KEYS[1]) == 1
then
    redis.call('DECR',KEYS[2])
end
return redis.call('GET',KEYS[2])

稳定性的基本要求之一就是数据不能无限膨胀,否则迟早出问题,任何存储方案都必须设计与之对应的销毁方案,才能保证系统的稳定长久运行。所以设置KEY1的有效期非常重要,而KEY2可能需要一直保持,由其他机制来删除它,比如销毁陈旧评价或折叠评价时,需要删除对应的KEY2.

脚本返回了点赞后的总数,这对后续数据归档是有帮助的。

封装脚本操作

既然已经决定了redis存储方式,那么就先来实现它。一步一个脚印,扎扎实实地把点赞功能完成。

首先使用Spring配置Lua脚本,它自动预加载脚本,不用麻烦在redis服务器上用script load预编译。

/** * Lua脚本 */
@Configuration
public class LuaConfiguration {
    /** * [点赞]脚本 lua_set_and_incr */
    @Bean
    public DefaultRedisScript<Integer> voteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_set_and_incr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }

    /** * [取消点赞]脚本 lua_del_and_decr */
    @Bean
    public DefaultRedisScript<Integer> noVoteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_del_and_decr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }
}
/** * 点赞箱 */
@Repository
public class VoteBox {
    private final RedisTemplate<String, Object> redisTemplate;
    private final DefaultRedisScript<Integer> voteScript;
    private final DefaultRedisScript<Integer> noVoteScript;

    public VoteBox(RedisTemplate<String, Object> redisTemplate, DefaultRedisScript<Integer> voteScript, DefaultRedisScript<Integer> noVoteScript) {
        this.redisTemplate = redisTemplate;
        this.voteScript = voteScript;
        this.noVoteScript = noVoteScript;
    }

    /** * 给评价投票(点赞),用户增加评价点赞记录,评价点赞次数+1.该操作是原子性、幂等性的。 * @param voterId 投票人 * @param contentId 投票目标内容id * @return 返回当前最新点赞数 */
    public Integer vote(long voterId, long contentId){
        //使用lua脚本
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(voteScript, list);
    }

    /** * 取消给评价投票(点赞),用户删除评价点赞记录,评价点赞次数-1.该操作是原子性、幂等性的。 * @param voterId 投票人 * @param contentId 投票目标内容id * @return 返回当前最新点赞数 */
    public Integer noVote(long voterId, long contentId){
        //使用lua脚本
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(noVoteScript, list);
    }
}

点赞的流程

点赞的流程可以用如下时序图表示:

image-20210415151828448

  1. 服务端接收用户的点赞请求
  2. 执行redis脚本,并返回点赞总数信息,redis保存点赞功能的暂时数据
  3. 发送普通消息到消息队列
  4. 以上两步执行成功后响应点赞完成,否则加入重试队列
  5. 重试队列异步重试请求redis或消息队列,直到成功或重试次数用尽
  6. 消息队列消费者接收消息,并将消息写入mysql

为什么加入消息队列这个角色?因为消息队列使得同步和异步可以优雅的分离。redis命令需要在当前请求中完成,用户想看到请求的执行结果,希望在其他客户端上立刻看到自己的点赞状态,这个举例可能不太恰当,点赞也可能是单向请求,用户没有那么在乎同步性,这里只是为了演示案例。而数据入库或者是其他操作不需要在当前请求生命周期内完成。

如果同步可以称之为“在线服务”,那么异步可以称之为“半在线半离线服务”,虽然不在请求的生命周期内,但是运行于在线服务器之上,占用cpu和内存,占用网络带宽,势必给线上业务造成影响。当异步模式调整时,需要连同在线业务一起发布,造成逻辑上的耦合。而消息队列让“离线服务”成为可能,消费者可以与在线服务器独立开来,独立开发独立部署,无论是物理上还是逻辑上都完全解耦。当然前提是消息对象的序列化格式一致,所以我喜欢使用字符串作为消息对象的内容,而不是对象序列化。

实现mysql的点赞入库

设计好redis的存储方案后,接下来设计mysql的存储方案。

首先是表结构:

#点赞/投票归档表
CREATE TABLE IF NOT EXISTS vote_document
(
   id INT primary key auto_increment COMMENT 'ID',
   gmt_create datetime not null default CURRENT_TIMESTAMP COMMENT '创建时间',
   voter_id INT not null COMMENT '投票人id',
   contentr_id INT not null COMMENT '投票内容id',
   voting TINYINT not null COMMENT '投票状态(0:取消投票 1:投票)',
   votes INT not null COMMENT '投下/放弃这一票后,内容在此刻的投票总数',
   create_date INT not null COMMENT '创建日期 如:20210414 用于分区分表'
);

insert into vote_document(voter_id,content_id,voting,votes,create_date)
values(1,1,1,1,'20210414');

显然,这是一个以Insert代替Update的日志表,无论是点赞、取消点赞还是重新点赞,都是追加新的记录,而不是修改原有记录。这样做有两个原因,一是Insert不用锁表,执行效率远高于Update,二是蕴含的信息更丰富,可以看到用户的完整行为,对于大数据分析是有帮助的。

Insert代替Update之后,一大难点就是数据聚合,解决方案就是每一次插入,都冗余地记录聚合状态,就像votes字段一样,分析时只需要拿相关评价的最后一条记录即可知道点赞总数,而不需全表扫描。

入库代码:

@Repository
public class VoteRepository {
    @Autowired
    private JdbcTemplate db;

    /** * 添加点赞 * @param vote 点赞对象 * @return 如果插入成功,返回true,否则返回false */
    public boolean addVote(/*valid*/ Vote vote) {
        String sql = "insert into vote_document(voter_id,content_id,voting,votes,create_date) values(?,?,?,?,?)";
        return db.update(sql, vote.getVoterId(), vote.getContentId(), vote.getVoting(), vote.getVotes(), Sunday.getDate()) > 0;
    }
}

RocketMQ

Apache RocketMQ是一种低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

消息队列核心概念:

  • Topic:消息主题,一级消息类型,生产者向其发送消息。
  • Broker:中间人/经纪人,消息队列集群的节点,负责保存和收发消息。
  • 生产者:也称为消息发布者,负责生产并发送消息至Topic。
  • 消费者:也称为消息订阅者,负责从Topic接收并消费消息。
  • Tag:消息标签,二级消息类型,表示Topic主题下的具体消息分类。
  • 消息:生产者向Topic发送并最终传送给消费者的数据和(可选)属性的组合。
  • 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

生产者发送消息到消息队列,最终发送到消费者的示意图如下:

image-20210112223820896

消息类型可以划分为

  • 普通消息。也称并发消息,没有顺序,生产消费都是并行的,拥有极高的吞吐性能
  • 事务消息。提供了保证消息一定送达到broker的机制。
  • 分区顺序消息。Topic分为多个分区,在一个分区内遵循先入先出原则。
  • 全局顺序消息。把Topic分区数设置为1,所有消息都遵循先入先出原则。
  • 定时消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定时间点进行投递
  • 延迟消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定延迟时间点进行投递

消费方式可以划分为:

  • 集群消费。任意一条消息只需要被集群内的任意一个消费者处理即可。
  • 广播消费。将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

消费者获取消息模式可以划分为:

  • Push。开启单独的线程轮询broker获取消息,回调消费者的接收方法,仿佛是broker在推消息给消费者。
  • Pull。消费者主动从消息队列拉取消息。

使用RocketMQ

我们使用某云产品的RocketMq消息队列,按照官方文档,先在云控制中心创建Group和Topic,然后引入maven依赖,创建好MqConfig连接配置对象。最后:

配置生产者(在项目A):

@Configuration
public class ProducerConfig {
    @Autowired
    private MqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer buildProducer() {
        return ONSFactory.createProducer(mqConfig.getMqPropertie());
    }
}

配置消费者(在项目B):

@Configuration
public class ConsumerClient {
    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private VoteMessageReceiver receiver;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.GROUP_CONSUMER_ID);
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "10");
        consumerBean.setProperties(properties);

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.TOPIC_ISSUE);
        subscription.setExpression(mqConfig.TAG_ISSUE);
        subscriptionTable.put(subscription, receiver);

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

创建消息接收、监听器:

/** * 投票消息接收器 */
@Component
public class VoteMessageReceiver implements MessageListener {
    private final VoteRepository voteRepository;

    public VoteMessageReceiver(VoteRepository voteRepository) {
        this.voteRepository = voteRepository;
    }

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            JSONObject object = JSONObject.parseObject(new String(message.getBody()));

            Vote vote = new Vote();
            vote.setVoterId(object.getLongValue("voterId"));
            vote.setContentId(object.getLongValue("contentId"));
            vote.setVoting(object.getIntValue("voting"));
            vote.setVotes(object.getLongValue("votes"));

            try {
                vote.validate();
                voteRepository.addVote(vote);
            } catch (IllegalArgumentException ignored) {
            }

            return Action.CommitMessage;
        }catch (Exception e) {
            e.printStackTrace();
            return Action.ReconsumeLater;
        }
    }
}

发送消息的生产者,再稍稍封装一下:

/** * 消息生产者,消息投递仓库 */
@Repository
public class MessagePoster {
    private final Producer producer;

    public MessagePoster(Producer producer) {
        this.producer = producer;
    }

    public void sendMessage(String topic, String tag, String content){
        Message message = new Message();
        message.setTopic(topic);
        message.setTag(tag);
        message.setBody(content.getBytes(StandardCharsets.UTF_8));
        producer.send(message);
    }

    public void sendMessage(String topic, String content){
        sendMessage(topic, "", content);
    }
}

发布消费者,在云控制中心测试(确保流程走通,步步为营):

image-20210415171026192

能达成一致吗

执行redis命令与发送消息这两步,能做到一致性吗,也就是常说的同时完成与同时失败?如果是同构的系统,可以利用系统本身的特性实现事务,比如同是redis操作可以使用redis事务或脚本,前面已经这么做了,如果同是数据库操作,可以使用数据库事务,其他存储系统应该也有类似的支持。

但它们是异构的系统,只能通过在客户端实现事务逻辑或者由第三方协调。常见的客户端实现方法是回滚:

try{
	redis.call(); 
    mq.call();
}catch(MqException e){	//只有mq出错时才需要回滚
    //使用反向操作回滚
    redis.rollback();
}   

但是如果回滚失败呢?如果消息发到MQ但却接收失败呢?如果依赖的服务不支持回滚呢?在苛刻的条件下实现苛刻的一致性是不可能的。

还是应该反向思考,有选择性地舍弃某些不重要的部分,才能实现我们的需求。在目前这个需求中,没有必要为了redis和MQ的同步引入第三方的事务协调,但也不能对明显的事务问题视而不见。

我总结的分布式事务解决思路导图:

image-20210415202027996

我们选择使用重试队列来解决这个问题。

设计重试队列

不局限于当前的分布式事务问题,我们设计一个较为通用的重试队列。

先设计重试队列中的基本概念:任务。一个任务由多个单元组成,可计算单元表示有返回值的方法对象,执行单元表示没有返回值的方法对象,但是会接收上一步可计算单元的返回值作为入参。任务中保持了单元的单向链表,只有当一个单元执行成功后,才会指向下一个单元继续执行,但当执行失败时,会在当前单元不断重试直到成功,已执行通过的单元不会重试。这样就保证了各个单元的稳定、有序运行,每个环节的执行具有容错性。

image-20210415210047077

基础接口,让使用者可以自己实现任务执行失败的日志记录,比如持久化磁盘或是发送到远程服务器,避免任务丢失,是保持事务一致性的兜底方案之一,设置成缺省方法使得使用者有选择地实现,不强制一定要有失败处理方案。

/** * 失败记录器 */
interface IFailRecorder {
    /** * 记录每次重试的失败情况 * @param attemptTimes 重试次数,第一次重试=0 * @param e 导致失败的异常 */
    default void recordFail(int attemptTimes, Exception e){}

    /** * 记录每次重试的失败情况 * @param attemptTimes 重试次数,第一次重试=0 */
    default void recordFail(int attemptTimes){}

    /** * 记录重试之后的最终失败 * @param e 导致失败的异常,如果没有异常,返回null */
    default void recordEnd(Exception e){}
}

定义执行的基本单元,代表需要执行一个redis操作或是发送MQ操作,接口方法可能会由调度器重复地执行,所以要求接口实现者自身保证幂等性。

/** * 可重复执行的任务 */
public interface Repeatable<V> extends IFailRecorder{
    /** * Computes a result, or throws an exception if unable to do so. * * @param repeatTimes repeat times, first repeatTimes is 0 * @return computed result * @throws Exception if unable to compute a result */
    V compute(int repeatTimes) throws Exception;

    /** * Execute with no result, and throws an exception if unable to do so. * * @param repeatTimes repeat times, first repeatTimes is 0 * @param receiveValue last step computed result * @throws Exception if unable to compute a result */
    default void execute(int repeatTimes, V receiveValue) throws Exception{}

    /** * Execute with no result, and throws an exception if unable to do so. * * @param repeatTimes repeat times, first repeatTimes is 0 * @throws Exception if unable to compute a result */
    default void execute(int repeatTimes) throws Exception{}
}

对应的派生抽象类,主要是为了引导用户实现接口。

/** * 可计算任务 * @param <V> 计算结果类型 */
public abstract class Computable<V> implements Repeatable<V>{
    @Override
    public void execute(int repeatTimes) throws Exception {
        throw new IllegalAccessException("不支持的方法");
    }

    @Override
    public void execute(int repeatTimes, V receiveValue) throws Exception {
        throw new IllegalAccessException("不支持的方法");
    }
}

/** * 可执行任务 */
public abstract class Executable<V> implements Repeatable<V>{
    @Override
    public V compute(int repeatTimes) throws Exception {
        throw new IllegalAccessException("不支持的方法");
    }
}

重试的意义

好的重试机制可以起到削峰填谷的作用,而不好的重试机制可能火上浇油。

这不是危言耸听,仔细思考一下,程序什么情况下会失败,大致可以总结为三种情况:

  1. 参数错误导致的逻辑异常
  2. 负载过大导致的超时或熔断
  3. 不稳定的网络与人工意外事故

其中对于情况1进行重试是完全没有意义的,参数错误的问题应该通过改变参数来解决,逻辑异常应该修复逻辑bug,无脑重试只能让错误重复发生,只会浪费cpu。对于情况2的重试得小心,因为遇到流量波峰而失败,短时间内重试很可能再次遭遇失败,并且这次重试还会带来更大的流量压力,像滚雪球一样把自己搞垮,也就是火上浇油。

对于情况3的重试就非常有价值,尤其是对于具有SLA协议的第三方服务。第三方服务可能因为种种意外(比如停服更新),导致服务短暂不可用,但是却不违反SLA协议。将这种失败情况加入重试队列,确保只要第三方服务在较长的一段时间内有响应,任务就可以成功,如果第三方服务一直没有响应而导致任务最终失败,那么他往往也就破坏了SLA协议,可以申请赔偿了。

所以,设计重试策略时首先需要判断什么情况下需要重试,可以设定当出现特定的比如参数错误的异常时,就没必要重试了,直接失败即可。可以设定只要当返回参数不为空时才算成功。可以设置固定的重试间隔,让两个重试之间拉开比较长的时间。

更聪明的做法是,使用断路器模式,借助当前连接对目标服务器的请求结果,如果不符预期(异常比率大),就暂时阻塞重试队列中等待的任务,隔一段时间再试探一下。

重试队列与普通限流降级或熔断的区别:

image-20210415234437188

重试的策略

重试策略决定任务何时发起重试,重试策略接口:

/** * 重试策略,决定任务何时可以重试 */
public interface IRetryStrategy {

    /** * 现在是否应该执行重试 * @param attemptTimes 第几次重试 * @param lastTimestamp 上一次重试的时间戳 * @param itemId 当前的执行项目id * @return 允许重试,返回true,否则,返回false */
    boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId);

    /** * 通知一次失败 * @param itemId 当前的执行项目id */
    void noticeFail(int itemId);

    /** * 通知一次成功 * @param itemId 当前的执行项目id */
    void noticeSuccess(int itemId);
}

基本实现类:

/** * 指定间隔时间的重试策略 */
public class DefinedRetryStrategy implements IRetryStrategy {
    private final int[] intervals;

    public DefinedRetryStrategy(int... intervals) {
        if (intervals.length == 0) {
            this.intervals = new int[]{0};
        } else {
            this.intervals = intervals;
        }
    }

    private DefinedRetryStrategy() {
        this.intervals = new int[]{0};
    }

    /** * 现在是否应该执行重试 * * @param attemptTimes 第几次重试 * @param lastTimestamp 上一次重试的时间戳 * @param itemId 当前的执行项目id * @return 允许重试,返回true,否则,返回false */
    @Override
    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
        return System.currentTimeMillis() > lastTimestamp + getWaitSecond(attemptTimes) * 1000L;
    }

    @Override
    public void noticeFail(int itemId) {

    }

    @Override
    public void noticeSuccess(int itemId) {

    }

    /** * 根据当前重试次数,获取下一次重试等待间隔(单位:秒) */
    private int getWaitSecond(int attemptTimes) {
        if (attemptTimes < 0) {
            attemptTimes = 0;
        }

        if (attemptTimes >= intervals.length) {
            attemptTimes = intervals.length - 1;
        }

        return intervals[attemptTimes];
    }
}

使用断路器实现重试策略,断路器内部实现省略:

/** * 断路器模式实现的智能的重试策略 */
public class SmartRetryStrategy extends DefinedRetryStrategy {
    //断路器集合
    private final Map<Integer, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();

    private final Object LOCK = new Object();

    private static CircuitBreaker newCircuitBreaker() {
        return new ExceptionCircuitBreaker();
    }

    public SmartRetryStrategy(int[] intervals) {
        super(intervals);
    }

    private CircuitBreaker getCircuitBreaker(Integer itemId) {
        if (!circuitBreakers.containsKey(itemId)) {
            synchronized (LOCK) {
                if (!circuitBreakers.containsKey(itemId)) {
                    circuitBreakers.put(itemId, newCircuitBreaker());
                }
            }
        }

        return circuitBreakers.get(itemId);
    }

    /** * 现在是否应该执行重试 * * @param attemptTimes 第几次重试 * @param lastTimestamp 上一次重试的时间戳 * @param itemId 当前的执行项目id * @return 允许重试,返回true,否则,返回false */
    @Override
    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {
        //如果基本条件不满足,则不能重试
        if (!super.shouldTryAtNow(attemptTimes, lastTimestamp, itemId)) {
            return false;
        }

        //断路器是否允许请求通过
        return canPass(itemId);
    }

    /** * 通知一次失败 * * @param itemId 当前的执行项目id */
    @Override
    public void noticeFail(int itemId) {
        getCircuitBreaker(itemId).onFail();
    }

    /** * 通知一次成功 * * @param itemId 当前的执行项目id */
    @Override
    public void noticeSuccess(int itemId) {
        getCircuitBreaker(itemId).onSuccess();
    }

    /** * 是否允许通过 */
    public boolean canPass(int itemId){
        return getCircuitBreaker(itemId).canPass();
    }
}

可重试任务

根据上面的结构图,定义可重试任务接口:

/** * 重试任务 */
public interface IRetryTask<V> {
    /** * 执行一次重试 * @return 如果执行成功,返回true,否则返回false */
    boolean tryOnce();

    /** * 是否应该关闭任务 * @return 如果达到最大重试次数,返回true,表示可以关闭 */
    boolean shouldClose();

    /** * 现在是否应该执行重试 * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false */
    boolean shouldTryAtNow();

    /** * 获取执行结果 */
    V getResult();
}

然后设计抽象类:

/** * 重试任务. * 非线程安全 */
public abstract class AbstractRetryTask<V> implements IRetryTask<V> {
    //重试等待间隔
    protected final IRetryStrategy retryStrategy;

    //当前重试次数
    protected int curAttemptTimes = -1;

    //最大重试次数
    private final int maxAttemptTimes;

    //上一次重试的时间戳
    protected long lastTimestamp = 0;

    public AbstractRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes) {
        this.retryStrategy = retryStrategy;
        this.maxAttemptTimes = maxAttemptTimes;
    }

    /** * 执行一次重试 * * @return 如果执行成功,返回true,否则返回false */
    @Override
    public boolean tryOnce() {
        if (isFinished()) {
            return true;
        }

        setNextCycle();

        //执行重试
        doTry();

        //重试任务执行异常或者返回null,将视为执行失败
        return isFinished();
    }

    /** * 是否结束 */
    protected abstract boolean isFinished();

    /** * 执行回调 */
    protected abstract void doTry();

    /** * 是否应该关闭任务 * * @return 如果达到最大重试次数,返回true,表示可以关闭 */
    @Override
    public boolean shouldClose() {
        return curAttemptTimes >= maxAttemptTimes;
    }

    //设置下一执行周期
    private void setNextCycle() {
        curAttemptTimes++;
        lastTimestamp = System.currentTimeMillis();
    }
}

以及实现类:

/** * 多段重试任务. 任务链路执行失败时,下一次重试从失败的点继续执行。 */
@Slf4j
public class SegmentRetryTask<V> extends AbstractRetryTask<V> {
    //分段执行方法
    private final List<Repeatable<V>> segments;

    //当前执行片段,上一次执行中断的片段
    private int currentSegment = 0;

    //上一次的执行结果值
    private V result;

    public SegmentRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes, List<Repeatable<V>> segments) {
        super(retryStrategy == null ? new DefinedRetryStrategy(0) : retryStrategy, maxAttemptTimes);
        this.segments = segments;
    }

    /** * 执行回调 */
    @Override
    protected void doTry() {
        try {
            for (; currentSegment < segments.size(); currentSegment++) {
                //如果当前断路器打开,不尝试执行
                if (retryStrategy instanceof SmartRetryStrategy){
                    if (!((SmartRetryStrategy)retryStrategy).canPass(currentSegment)) {
                        segments.get(currentSegment).recordFail(curAttemptTimes, new CircuitBreakingException());
                        return;
                    }
                }

                //如果抛异常,分段计数器不增加,下次从这个地方执行
                Repeatable<V> repeatable = segments.get(currentSegment);
                if (!execute(repeatable)) return;
            }
        } catch (Exception e) {
            retryStrategy.noticeFail(currentSegment);
            if (currentSegment < segments.size()) {
                if (shouldClose()) {
                    segments.get(currentSegment).recordEnd(e);
                } else {
                    segments.get(currentSegment).recordFail(curAttemptTimes, e);
                }
            }
        }
    }

    private boolean execute(Repeatable<V> repeatable) throws Exception {
        if (repeatable instanceof Computable) {
            result = repeatable.compute(curAttemptTimes);
            if (result == null) {
                repeatable.recordFail(curAttemptTimes);
                retryStrategy.noticeFail(currentSegment);
                return false;
            }
            retryStrategy.noticeSuccess(currentSegment);
        }

        if (repeatable instanceof Executable) {
            if (result == null) {
                repeatable.execute(curAttemptTimes);
            } else {
                repeatable.execute(curAttemptTimes, result);
            }
            retryStrategy.noticeSuccess(currentSegment);
        }
        return true;
    }

    @Override
    protected boolean isFinished() {
        return currentSegment >= segments.size();
    }

    /** * 现在是否应该执行重试 * * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false */
    @Override
    public boolean shouldTryAtNow() {
        return retryStrategy.shouldTryAtNow(curAttemptTimes, lastTimestamp, currentSegment);
    }

    /** * 获取执行结果 */
    @Override
    public V getResult() {
        return result;
    }
}

一个单元测试,当然单元测试有很多,不能全贴出来,这里只展示有代表性的:

class SegmentRetryTaskTest {
    private final List<String> messages = new ArrayList<>();

    @Test
    void doTry() {
        List<Repeatable<String>> list = new ArrayList<>();
        list.add(new Computable<>(){
            @Override
            public String compute(int repeatTimes) throws Exception {
                if (repeatTimes < 2)
                    throw new Exception();
                if (repeatTimes < 4)
                    return null;
                messages.add("result:good");
                return "good";
            }

            @Override
            public void recordFail(int attemptTimes, Exception e) {
                messages.add("fail:" + attemptTimes);
            }

            @Override
            public void recordFail(int attemptTimes) {
                messages.add("fail:" + attemptTimes);
            }

            @Override
            public void recordEnd(Exception e) {
                messages.add("end");
            }
        });

        list.add(new Executable<>() {
            @Override
            public void execute(int repeatTimes, String receiveValue) throws Exception {
                messages.add("receive:" + receiveValue);
                throw new Exception("exc");
            }

            @Override
            public void recordEnd(Exception e) {
                messages.add("end:" + e.getMessage());
            }
        });

        IRetryTask retryTask = new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 5, list);

        //重试未开始
        assertFalse(retryTask.shouldClose());

        //重试直到成功
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.shouldClose());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertFalse(retryTask.tryOnce());
        assertTrue(retryTask.shouldClose());

        assertTrue(messages.contains("result:good"));
        assertTrue(messages.contains("fail:1"));
        assertTrue(messages.contains("fail:2"));
        assertTrue(messages.contains("fail:3"));
        assertFalse(messages.contains("end"));
        assertTrue(messages.contains("receive:good"));
        assertTrue(messages.contains("end:exc"));
    }
}

重试队列的运作

image-20210416101646494

线程安全的重试队列。
* (Spring-retry 和 guava-retrying都不完全适合这个场景,决定自己开发一个简单的重试机制)
* 重试队列会尽最大努力让任务多次执行并成功,使用时需要考虑以下几点。
* 1.重试队列存储在内存之中,暂未同步到磁盘,要求使用者可以承受丢失的风险。
* 2.重试不保证一定会成功,它将在重试一定的次数后结束,如果最终失败,将记录失败结果。
* 3.为了不让频繁的重试让系统的负载过大,建议设置恰当的重试间隔,以起到削峰填谷的作用。
* 4.当超过重试队列允许容纳的数量时,将抛出异常。
* 5.重试任务将在独立的线程中执行,不会阻塞当前线程
* 6.重试任务执行异常或者返回null,将视为执行失败。暂不支持拦截自定义异常。
* 7.由于网络问题,远程过程执行成功未必代表会返回成功,重试任务需要实现幂等性。
* 8."队列"仅指按先进先出的顺序扫描任务,任务移除队列操作取决于其何时完成或结束
*

实现重试队列

/** * 线程安全的重试队列。 * @author sunday * @version 0.0.1 */
public final class RetryQueue {
    //重试任务队列(全局唯一)
    private final static Deque<IRetryTask> retryTaskList = new ConcurrentLinkedDeque<>();

    //重试任务工厂
    private final IRetryTaskFactory retryTaskFactory;

    public RetryQueue(IRetryTaskFactory retryTaskFactory) {
        this.retryTaskFactory = retryTaskFactory;
    }

    static {
        Thread daemon = new Thread(RetryQueue::scan);
        daemon.setDaemon(true);
        daemon.setName(RetryConstants.RETRY_THREAD_NAME);
        daemon.start();
    }

    //扫描重试队列,执行重试并移除任务(如果成功),周期性执行
    private static void scan() {
        while (true) {
            //先执行,再删除
            retryTaskList.removeIf(task -> retry(task) || task.shouldClose());

            // wait some times
            try {
                TimeUnit.MILLISECONDS.sleep(RetryConstants.SCAN_INTERVAL);
            } catch (Throwable ignored) {
            }
        }
    }

    //执行重试
    private static boolean retry(/*not null*/IRetryTask task) {
        if (task.shouldTryAtNow()) {
            return task.tryOnce();
        }
        return false;
    }

    /** * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。 * * @param segments 分段执行任务 * @param <V> 结果返回类型 * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。 * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常 */
    public final <V> V submit(List<Repeatable<V>> segments) throws RetryRefuseException {
        if (segments == null || segments.size() == 0) {
            return null;
        }

        IRetryTask<V> task = retryTaskFactory.createRetryTask(segments);

        //在当前线程执行
        if(!task.tryOnce()){
            //失败后加入队列
            ensureCapacity();
            retryTaskList.push(task);
        }

        //只要当前已经有执行结果,就返回,即便是加入了重试队列
        return task.getResult();
    }

    /** * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。 * * @param repeatable 执行任务 * @param <V> 结果返回类型 * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。 * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常 */
    public final <V> V submit(Repeatable<V> repeatable) throws RetryRefuseException {
        return submit(List.of(repeatable));
    }

    //确保容量
    private void ensureCapacity() throws RetryRefuseException {
        //非线程安全,高并发下可能短暂冲破最大容量,不过问题不大
        if (retryTaskList.size() >= RetryConstants.MAX_QUEUE_SIZE) {
            throw RetryRefuseException.getInstance();
        }
    }

    /** * 队列是否为空 * * @return 如果当前无正在执行的任务,返回true */
    public boolean isEmpty() {
        return retryTaskList.isEmpty();
    }
}

单元测试:

class RetryQueueTest {
    private final static int NUM = 100000;
    private List<String> messages1 = Collections.synchronizedList(new ArrayList<>());
    private List<String> messages2 = Collections.synchronizedList(new ArrayList<>());

    IRetryTaskFactory taskFactory = new IRetryTaskFactory() {
        @Override
        public <V> IRetryTask createRetryTask(List<Repeatable<V>> segments) {
            return new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 10, segments);
        }
    };

    RetryQueue retryQueue = new RetryQueue(taskFactory);

    @Test
    void submit() {
        List<Repeatable<String>> list = new ArrayList<>();
        list.add(new Executable<>() {
            @Override
            public void execute(int repeatTimes) throws Exception {
                if (repeatTimes < 4)
                    throw new Exception();
                messages1.add("good");
            }
        });

        //模拟高并发提交
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        Semaphore semaphore = new Semaphore(0);
        for (int i = 0; i < NUM; i++) {
            executorService.submit(() -> {
                try {
                    retryQueue.submit(list);
                } catch (RetryRefuseException e) {
                    fail();
                }
                semaphore.release();
            });
        }

        executorService.shutdown();

        //等待执行完成
        try {
            semaphore.acquire(NUM);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //等待执行完成
        while (!retryQueue.isEmpty()) Thread.yield();
        assertEquals(NUM, messages1.size());
        for (String s : messages1) {
            assertEquals(s, "good");
        }
    }
}

久等的点赞实现代码

好了,轮子已经造完了,可以开始写点赞服务的代码了:

/** * 投票服务 */
@Service
@Slf4j
public class VoteService {
    private final VoteBox voteBox;
    private final MessagePoster mq;
    private final RetryQueue retryQueue = new RetryQueue(new SegmentRetryTaskFactory());

    public VoteService(VoteBox voteBox, MessagePoster mq) {
        this.voteBox = voteBox;
        this.mq = mq;
    }

    /** * 给评价投票(点赞) * * @param voterId 投票人 * @param contentId 投票目标内容id * @param voting 是否进行点赞(true:点赞 false:取消点赞) * @return 当前内容点赞后的总数,如果点赞失败,抛出异常 * @throws VoteException 投票异常 */
    public int vote(long voterId, long contentId, boolean voting) throws VoteException {
        /* * 第零种情况:用户请求没有发送到服务器,用户可以适时重试。 * 第一种情况:执行1失败,最终点赞失败,记录日志,加入重试队列池,用户也可以适时重试。 * 第二种情况:执行1成功,但返回时网络异常,最终点赞失败,记录日志,加入重试队列池,用户也可能适时重试,该方法是幂等的。 * 第三种情况:执行1成功,但并未增加点赞总数,因为这次是重复提交。仍然执行之后的逻辑,该方法是幂等的。 * 第四种情况:执行1成功,但执行2失败,记录日志,把发送mq加入重试队列池,返回成功。 * 第五种情况:执行方法成功,但返回过程网络异常,用户未收到响应,用户稍后可以查询出点赞结果,用户也可以适时重试 */

        List<Repeatable<Integer>> list = new ArrayList<>();

        //1.先在redis中投票
        list.add(new Computable<>() {
            @Override
            public Integer compute(int repeatTimes) {
                return voting ? voteBox.vote(voterId, contentId) : voteBox.noVote(voterId, contentId);
            }

            @Override
            public void recordFail(int attemptTimes, Exception e) {
                //只记录第一次错误
                if (attemptTimes == 0)
                    log.warn("function VoteService.vote.redis make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }

            @Override
            public void recordEnd(Exception e) {
                //放弃重试.当然,日志会记录下来,或者通过其他机制将失败记录到中央存储库中,最终还是可以恢复。
                log.warn("function VoteService.vote.redis quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }
        });

        //2.再通知mq
        list.add(new Executable<>() {
            @Override
            public void execute(int repeatTimes, Integer receiveValue) {
                JSONObject object = new JSONObject();
                object.put("voterId", voterId);
                object.put("contentId", contentId);
                object.put("voting", voting ? 1 : 0);
                object.put("votes", receiveValue);
                mq.sendMessage(SystemConstants.VOTE_TOPIC, object.toString());
            }

            @Override
            public void recordFail(int attemptTimes, Exception e) {
                if (attemptTimes == 0)
                    log.warn("function VoteService.vote.mq make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }

            @Override
            public void recordEnd(Exception e) {
                log.trace("function VoteService.vote.mq quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
            }
        });

        Integer value = null;
        try {
            //系统可能因为mq或者redis自身的过载等问题导致点赞失败,我们想珍惜用户的一次点赞,所以选择为他重试。
            value = retryQueue.submit(list);
        } catch (RetryRefuseException e) {
            log.error("function VoteService.vote.refuse make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);
        }

        if (value == null){
            //当前无法获得投票总数,意味着点赞操作失败,虽然我们会稍后重试,但仍将这个信息告知用户,他们可以进行更理智的选择。
            throw new VoteException("投票失败,请稍后再试");
        }

        return value;
    }

    private static class SegmentRetryTaskFactory implements IRetryTaskFactory {
        private final static IRetryStrategy waitStrategy = new SmartRetryStrategy(new int[]{10,100,100,1000,10000});

        @Override
        public <V> IRetryTask<V> createRetryTask(List<Repeatable<V>> segments) {
            return new SegmentRetryTask<>(waitStrategy, 5, segments);
        }
    }
}

补充说明:

  1. 封装工厂对象的目的是为了简化构造方法参数,并且复用不变对象,如重试策略。
  2. 只要重试队列执行有返回结果,哪怕只是部分成功,仍可以算作接口响应成功,剩余部分加入重试队列。
  3. 如果重试队列执行全部失败,没有返回结果,则抛出异常,毕竟此刻确实失败了,用户有权知道。
  4. 只有熔断器闭合时,才会执行任务,否则将会一直等待,可以设置恰当的中止策略来完善这个机制。
  5. 重试队列这个轮子在其他很多场景也都有用武之地,依照我的理解,它大致算是“仓库层”。

但就点赞实现来说,没有必要使用重试,实际上,mq是多节点高可用的,一般不会出现问题,并且,mq自带了重试功能。mq的重试机制是,在一次请求中,如果失败了,立刻向另外的broker发起请求,是一种负载均衡融合高可用的设计。在不要求刚性事务的情景下,可以认为mq是可靠的。

给评价添加点赞

评价列表的数据是相对静态的,不含用户个性化信息,可以很容易地缓存供所有人访问,但是一旦加上用户对每个评价的点赞关系,或是实时变化的点赞数量信息,就变得难以缓存了。我们选择动静分离,静态的数据按照原先的缓存策略不变,动态的数据专门从redis服务中获取,然后再追加到静态数据上。

服务层、控制层,就是数据的聚合层、任务的委派层。

而至于数据聚合,有三种模式:

image-20210416110925640

我们选择第三种方式,这次设计点赞功能,只是作为评价系统的一部分。

在RemarkService中添加如下代码:

/** * 给评价列表添加点赞信息,在现有列表数据上修改 * @param remarks 评价列表 * @param consumerId 用户id * @return 修改后的评价列表 */
public JSONArray appendVoteInfo(JSONArray remarks, Integer consumerId){
    if (remarks == null || remarks.size() == 0) {
        return remarks;
    }

    //获取评价id列表
    List<Object> idList = new ArrayList<>();
    for (int i = 0; i < remarks.size(); i++) {
        idList.add(remarks.getJSONObject(i).getString("id"));
    }

    //获取并添加点赞总数
    List<String> voteKeys = new ArrayList<>();
    for (Object s : idList) {
        voteKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, s));
    }
    List<Object> voteValues = redisRepository.readAll(voteKeys);
    for (int i = 0; i < remarks.size(); i++) {
        remarks.getJSONObject(i).put("votes", voteValues.get(i) == null ? 0 : voteValues.get(i));
    }

    //未传用户id,查询时不附带个人点赞数据
    if (consumerId == null) {
        return remarks;
    }

    //获取并添加个人点赞状态
    List<String> votesKeys = new ArrayList<>();
    for (Object s : idList) {
        votesKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, consumerId, s));
    }
    List<Object> votingValues = redisRepository.readAll(votesKeys);
    for (int i = 0; i < remarks.size(); i++) {
        remarks.getJSONObject(i).put("voting", votingValues.get(i) == null ? 0 : 1);
    }

    return remarks;
}

//更新商品的评价缓存
private void updateRemarkCache(String itemId){
    //吞掉异常,让更新评价方法不影响原操作的执行结果
    try {
        redisRepository.refreshKeys(RedisKeyConstants.REMARK_PREFIX + itemId);
    } catch (Exception e) {
        log.warn("function RemarkService.updateRemarkCache make exception:{} by:{}", e.getMessage(), itemId);
    }
}

修改查询评价列表接口,聚合内容:

/** * 查询商品关联的评价,一次查询固定的条目 * @param itemId 商品id * @param curIndex 当前查询坐标 */
@GetMapping("/remark")
public APIBody listRemarks(String itemId, int curIndex, Integer consumerId){
    Assert.isTrue(!StringUtils.isEmpty(itemId), "商品id不能为空");
    Assert.isTrue(curIndex > 0, "查询坐标异常");

    JSONArray list = remarkService.listRemarks(itemId, curIndex, SystemConstants.REMARK_MAX_LIST_LENGTH);

    //原列表是从redis或db中读取的静态数据,而点赞数据每时每刻都在变化,分开获取这两个部分。
    return APIBody.buildSuccess(remarkService.appendVoteInfo(list, consumerId));
}

优化点:评价的点赞总数信息是固定的,是用户无关的,可以与评价内容结合在一起缓存在内存中,而用户的点赞信息只能每次请求都去redis查询。

推荐优质评价

完整的评价系统应该能够输出一个优质评价内容的推荐列表,作为用户查看商品评价时的默认展示。

何为”优质内容“呢?我的理解是具有话题性、高热度、内容丰富的评价内容,其中”点赞总数“是衡量高热度的重要指标之一。当前,我们就以点赞数量为唯一指标,算出优质内容并提供查询接口。未来引入其他指标时,也可能会继续沿用这种设计思路。

评价表中有votes字段,可以据此排序生成前n条数据:

select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?

需要注意的是,votes字段并不随着用户点赞而更新它,因为频繁的更新是低效的。可以通过定期汇总的方式来更新votes字段,点赞表保存着评价的最新点赞总数,所以可以每隔1天或1小时,筛选这期间内对应内容的最近一条点赞,就可以更新votes了。

不管基础数据是在何种数据库何种表中,不管是通过什么方式,我都将这一步骤称为”回源“,回源是缓存未命中时的一种行为概念。

在加载推荐评价时,回源算法为

public List<Remark> listRecommendRemarks(/*not null*/ String itemId, int expectCount){
    if (expectCount <= 0)
        return new ArrayList<>();

    Assert.isTrue(expectCount <= MAX_LIST_SIZE, "不允许一次性查询过多内容");

    String sql = "select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?";
    return db.query(sql, (resultSet, i) -> {
        Remark remark = new Remark();
        remark.setId(resultSet.getLong(1));
        remark.setConsumerId(resultSet.getLong(2));
        remark.setOrderId(resultSet.getString(3));
        remark.setItemId(itemId);
        remark.setScore(resultSet.getShort(4));
        remark.setHeader(resultSet.getString(5));
        remark.setContent(resultSet.getString(6));
        remark.setImages(resultSet.getString(7));
        remark.setUsername(resultSet.getString(8));
        remark.setUserface(resultSet.getString(9));
        remark.setCreateTime(resultSet.getString(10));
        return remark;
    }, itemId, expectCount);
}

接下来所要做的,只要将这部分内容保存到缓存,然后输出就可以了。

原子性地替换列表

推荐评价是一个列表,我选择使用Redis的LIST数据类型,可以方便地进行范围查询,参考上篇文章的评价列表。

但是Redis并未直接提供替换列表的操作,只有DEL、LRPUSH、RENAME等命令组合在一起可以才能实现,但客户端的组合操作是非原子性的,不用多说,又要使用脚本了:

--删除并创建列表
--params 1 2
--KEYS 列表键名 代理键
--ARGV 列表

redis.call('DEL', KEYS[1])
for i= 1, #ARGV do
    redis.call('RPUSH', KEYS[1], ARGV[i])
end

--延长代理锁的过期时间
redis.call('SET', KEYS[2], 1)
redis.call('EXPIRE',KEYS[2], 3600)

查询推荐评价的主要代码如下:

@Cacheable(value = "recommend")
public JSONArray listRecommendRemarks(/*not null*/ String itemId, int start, int stop) {
    try {
        if (remarkRedis.shouldUpdateRecommend(itemId)) {
            //加锁成功,需要加载数据库中的评价内容到redis
            remarkQueue.push(itemId, () -> reloadRecommendRemarks(itemId));
        }

        return appendVoteInfo(remarkRedis.readRecommendRange(itemId, start, stop));
    } catch (Exception e) {
        log.error("function RemarkService.listRecommendRemarks make exception:{} by:{},{},{}", e.getMessage(), itemId, start, stop);
        return SystemConstants.EMPTY_ARRAY;
    }
}

其中,仍使用代理键的模式,使Redis存储主要业务数据的列表永不过期,避免缓存击穿以及频繁的分布式阻塞加锁。

一些重要的redis操作代码:

//保存推荐内容并重置过期时间
public void saveRecommendData(String itemId, /*not null*/ List<Remark> list) {
    String[] argv = new String[list.size()];
    for (int i = 0; i < list.size(); i++) {
        argv[i] = JSONObject.fromObject(list.get(i)).toString();
    }
    redisTemplate.execute(resetListScript,
            List.of(RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId,
                    RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId), argv);
}

//读取推荐内容
public JSONArray readRecommendRange(String itemId, int start, int stop) {
    String key = RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId;
    return range(start, stop, key);
}

//是否应该更新推荐
public boolean shouldUpdateRecommend(String itemId) {
    Boolean flag = redisTemplate.opsForValue().setIfAbsent(RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId);
    return flag == null || !flag;
}

冷启动与空数据

冷启动是指服务的第一次上线或者redis在零缓存下重新启动时,这时,任何的缓存都未加载,或者之前加载过,现在因为意外已经不存在了。这时,代理锁会过期,SETNX命令成功,加锁成功的线程会同步数据库数据到redis,这样业务数据KEY就不再为空了。如果同步过程出现失败,锁会在2秒后自动过期,新的线程会继续接任这项未完成的使命。如果业务数据加载完成,那么就随即延迟代理锁的寿命为1小时,这样1小时之后才会触发同步。整个流程是异步的,用户请求的线程只会读取业务数据KEY,有则返回,无则为空。也就是说,接口只在冷启动的几秒内是返回为空的,这是可以接受的,因为冷启动只在新业务上线或者redis内存无法恢复这些极为特殊的时间点才会出现。

空数据是指数据库的内容是原本就是空的。根据上面的设计思路,可以得出结论,如果数据库内容为空,那么业务数据KEY是空的,也就是nil,不存储占位符,因为代理KEY已经起到占位符的作用了。这一点来看,一个简简单单的代理KEY,可以起到防止缓存击穿、防止同步阻塞、占位符等作用。

后续

可能会更新一些抽奖、秒杀活动的实现方法。

今天的文章设计一个简单的点赞功能分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/23428.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注