🤔️测试问我:为啥阅读量计数这么简单的功能你都能写出bug?

前言

可乐他们团队最近在做一个文章社区平台,由于人手不够,后端部分也是由前端同学来实现,使用的是 nest

今天他接到了一个需求,就是在用户点开文章详情的时候,把阅读量 +1 ,这里不需要判断用户是否阅读过,无脑 +1 就行。

它心想:这么简单,这不是跟 1+1 一样么。

往期文章

仓库地址

初版实现

我们用的 orm 框架是 typeorm ,然后他看了一眼官方文档,下面是一个更新的例子。

🤔️测试问我:为啥阅读量计数这么简单的功能你都能写出bug?

文章表里有一个字段 views ,表示该文章的阅读量。那我是不是把这篇文章的 views 取出来,然后 +1 ,再塞回去就可以了呢?

啪一下,就写出了下面这样的代码:

  async addView(articleId: number) {
    const entity = await this.articleRepository.findOne({
      where: { id: articleId },
      select: ['views', 'id'],
    });
    entity.views = entity.views + 1;
    await this.articleRepository.save(entity);
  }

然后就美滋滋的提测,继续摸鱼去了。

并发bug

不出意外的话意外就要发生了,测试下午就找到了可乐,说这个实现有 bug ,具体复现是在并发压测的时候。

这里用一个 node 脚本来模拟一下并发的请求:

import axios from "axios";

const axiosInstance = axios.create({
  withCredentials: true,
  headers: {
    Cookie: "your cookie",
  },
});

for (let i = 0; i < 10; i++) {
  axiosInstance.get("http://localhost:3000/api/articles/getArticleInfo?id=2");
}

本来应该阅读量加 10 的,结果只加了 1

🤔️测试问我:为啥阅读量计数这么简单的功能你都能写出bug?

可乐当时脑瓜子嗡嗡的,这还能有 bug

具体来说,这是 mysql 处理并发的机制—— MVCC ,它有几种默认的隔离级别。默认的隔离级别是可重复读,在可重复读的隔离级别下,会出现以下这种情况:

🤔️测试问我:为啥阅读量计数这么简单的功能你都能写出bug?

也就是说,当多个客户端同时读取相同的文章实体,然后分别对其浏览次数进行增加,并尝试保存回数据库,这有可能前面的提交会被后提交的操作覆盖,导致阅读量的的更新丢失。

update…set

最简单的解决方案就是不要取出来再更新,而是使用 mysqlupdate...set 语句,它本身自带的锁可以帮我们规避掉这种问题。

await this.articleRepository.query(
  'UPDATE articles SET views = views + 1 WHERE id = ?',
  [articleId],
);

这一次再用测试脚本去跑的时候,发现是没有问题的了,加的次数是对的。

🤔️测试问我:为啥阅读量计数这么简单的功能你都能写出bug?

乐观锁

如果你非要取出来,在代码里面做一些操作,再更新。那么也可以试试乐观锁,乐观锁的实现通常会使用比较与交换 (Compare-and-Swap,CAS) 或类似的机制来确保数据的一致性。

比较与交换是一种并发算法,用于实现原子操作,它会比较内存位置的值与预期值,如果相等则进行更新操作,否则不做任何操作或者重新尝试。

在乐观锁中,当读取实体时,会同时获取一个版本号标识,然后在保存时比较这个标识是否与读取时一致,如果一致,则执行更新操作。

具体的实现是在表里增加一个 version 字段:

ALTER TABLE jueyin.articles ADD version INT DEFAULT 0 NULL;

然后把查询出来的 version 值作为更新条件之一,判断这次更新是否生效,如果不生效,则等待并重试

async addView(articleId: number) {
    const oldVersion = await this.articleRepository.findOne({
      where: { id: articleId },
      select: ['version'],
    });
    const res = await this.articleRepository.query(
      'UPDATE articles SET views = views + 1,version = version + 1 WHERE id = ? AND version = ?',
      [articleId, oldVersion.version],
    );
    // 没有更新成功
    if (res.affectedRows === 0) {
      await wait();
      await this.addView(articleId);
    }
  }

当然,这个阅读量 +1 的场景其实也没必要用到乐观锁了。这里只是通过这个场景,来举例乐观锁的使用。

悲观锁

我们一般使用 SELECT FOR UPDATE 实现悲观锁,即在数据读取的同时锁定数据,以防止其他事务修改数据,从而确保当前事务能够安全地读取和更新数据。

SELECT FOR UPDATE 获取数据并将选定行上的排他锁,别的事务无法读取和修改该行的数据。

总的来说,SELECT FOR UPDATE 适用于需要确保数据一致性和完整性的场景,特别是在并发环境中需要对共享数据进行读取和修改的情况下。通过锁定数据行,可以确保在操作过程中其他事务不会对相同的数据进行修改,从而避免并发冲突和数据不一致性。

但是,这可能会导致大量的线程阻塞,因此使用与否需要根据你自身的业务场景来进行判断。

  async addView(articleId: number) {
    return this.entityManager.transaction(async (entityManager) => {
      const query = `SELECT views FROM articles WHERE id = ? FOR UPDATE`;
      const res = await entityManager.query(query, [articleId]);
      const value = res[0].views + 1;
      await entityManager.update(
        ArticleEntity,
        { id: articleId },
        { views: value },
      );
    });
  }

消息队列

再拓展一下,首先阅读量计数是一个频繁的逻辑。尤其对一些热榜文章来说,所以这里可以用到消息队列来做流量控制。

我们可以把一些频繁触发的任务交给消息队列去处理,然后再通过控制消息队列的消费频率或消费者个数,让一些任务可以较为平滑的处理而不是一下子全部处理,这样可以平衡系统的负载。

而且,也可以指定消息队列的消费者数量,比如你可以指定只有一个消费者,来规避上述的并发问题。

我们这里用到了 Bull ,它是一个基于 redis 的消息队列。它提供了简单而灵活的 API ,可用于实现各种队列和后台任务处理的需求。

首先安装一下需要的包:

npm i bull @nestjs/bull

然后可以在 app.module.ts 中进行配置:

@Module({
  imports: [
    ScheduleModule.forRoot(),
    BullModule.forRoot({
    redis: {
      host,
      port,
      db: 2,
      password: configService.get<string>('REDIS_PASSWORD', 'password'),
    },
  })
  ],
})
export class AppModule { }

以阅读量计数为例,当需要对文章的阅读量计数时,可以往队列里面扔一个任务,然后队列根据配置的逻辑,把任务取出来执行。

article.service 中,主要组装好参数,然后调用 queue-provider.service ,并加上重试逻辑

  async addView(articleId: number) {
    const addQueue = async (retryTime = 3) => {
      if (retryTime === 0) {
        return;
      }
      try {
        const jobId = uuidv4();
        await this.queueProviderService.pushAddView({
          jobId,
          articleId,
        });
      } catch (error) {
        addQueue(retryTime - 1);
      }
    };

    await addQueue();
  }

这个 jobId 主要是用来保证消息只会被消费一次,至于怎么做,在流程的最后会介绍。

然后来实现一个 queue-provider.service ,它主要是操作队列,往队列里面丢东西。

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { QUEUE } from '../utils/constant';

@Injectable()
export class QueueProviderService {
  constructor(@InjectQueue(QUEUE) private readonly queue: Queue) {}

  async pushAddView(data: { jobId: string; articleId: number }) {
    await this.queue.add('addView', data, {
      attempts: 5, // 任务失败后的重试次数
      backoff: {
        type: 'exponential', // 两次重试之间的时间间隔将以指数形式增长
        delay: 5000, // 重试时间间隔,单位:毫秒
      },
    });
  }
}

这样,消息就被发送到了队列中,然后我们还要实现一个消息的消费者,来消费这条消息。

import { Process, Processor } from '@nestjs/bull';
import { QUEUE } from '../utils/constant';
import { ArticleService } from '../modules/article/article.service';

@Processor(QUEUE)
export class QueueConsumerService {
  constructor(private articleService: ArticleService) {}
  @Process('addView')
  async handleAddView(job: any) {
    // 处理任务
    console.log('Processing job:', job.data);
    const { jobId, articleId } = job.data;
    await this.articleService.handleAddView({ jobId, articleId });
  }
}

这里实现一个 queue-consumer.service ,用来处理消息的消费,最后还是调用了 article.service 中的方法,这里 article.servicehandleAddView 方法就是用来真正阅读量计数的。

  async handleAddView(data: { jobId: string; articleId: number }) {
    const { jobId, articleId } = data;
    await this.entityManager.transaction(async (transactionalEntityManager) => {
      try {
        await transactionalEntityManager.save(JobRecordEntity, {
          jobId,
        });
      } catch (error) {
        if (error?.sqlMessage?.includes('Duplicate entry')) {
          return;
        } else {
          throw error;
        }
      }
      await this.articleRepository.query(
        'UPDATE articles SET views = views + 1,version = version + 1 WHERE id = ?',
        [articleId],
      );
    });
  }

好的,这里就需要提到一开始说的 jobId ,这个 jobId 可以认为是这条消息的唯一标识,我们是用 uuid 这个库生成的。一个 jobId 对应的消息只应该被消费一次,这里被消费一次的定义是,一条消息只能是阅读量 +1

在生产者一侧,我需要保证消息一定发成功,所以我们会加重试逻辑。但是这并不意味着消费者只收到了一条消息。所以这里需要保证消息的幂等性。

可以使用一个唯一索引来做这件事情,这里新建一张 job_records 表,并把 job_id 作为唯一索引。

-- jueyin.job_records definition
CREATE TABLE `job_records` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `job_records_job_id_IDX` (`job_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=40 DEFAULT CHARSET=utf8mb4;

然后在真正消费消息的时候会开启一个事务,如果报了数据重复的数据,表示这个 jobId 已经存在,这条消息已经被消费过了,此时可以直接 return ,提交事务。

否则就正常走事务的逻辑,成功就提交,失败就回滚。

感悟

或许很多前端都有成为全栈的梦想,所以自发去学习一些后端知识。但无论是前端工程师,还是后端工程师,他们都是软件工程师。软件工程师的必备技能应该是网络、数据库、操作系统。

我见过工作几年的后端只会用框架里的分页组件,让他写一个分页 sql 写不出来;我也见过在分布式系统下用了内存缓存,导致的一些奇葩问题。。

不是说学了一些上层的 node 框架怎么用,会写写 CURD 就是全栈(虽然真正工作中大多数都是这个),我们还是要不断精进自身的能力,才能一步步成为更资深的开发。

诸君,共勉~

最后

以上就是本文的全部内容,如果你觉得有意思的话,点点关注点点赞吧~

原文链接:https://juejin.cn/post/7358704806779764777 作者:可乐鸡翅kele

(0)
上一篇 2024年4月18日 上午11:11
下一篇 2024年4月18日 下午4:00

相关推荐

发表回复

登录后才能评论