来源: 第二节:抢单流程优化1(小白写法→lock写法→服务器缓存+队列(含lock)→Redis缓存+原子性+队列【干掉lock】) – Yaopengfei – 博客园
一. 小白写法
1.设计思路
纯DB操作
DB查库存→判断库存→(DB扣减库存+DB创建订单)
2.分析
A.响应非常慢,导致大量请求拿不到结果而报错
B.存在超卖现象
C.扣减库存错误
3.压测结果
前提:原库存为10000,这里统计2s内可处理的并发数,以90%百分位为例,要求错误率为0。
代码分享:
/// <summary> /// 原始版本-纯DB操作 /// </summary> /// <param name="userId">用户编号</param> /// <param name="arcId">商品编号</param> /// <param name="totalPrice">订单总额</param> /// <returns></returns> public string POrder1(string userId, string arcId, string totalPrice) { try { //1. 查询库存 var sArctile = _baseService.Entities<T_SeckillArticle>().Where(u => u.articleId == arcId).FirstOrDefault(); if (sArctile.articleStockNum - 1 > 0) { //2. 扣减库存 sArctile.articleStockNum--; //3. 进行下单 T_Order tOrder = new T_Order(); tOrder.id = Guid.NewGuid().ToString("N"); tOrder.userId = userId; tOrder.orderNum = Guid.NewGuid().ToString("N"); tOrder.articleId = arcId; tOrder.orderTotalPrice = Convert.ToDecimal(totalPrice); tOrder.addTime = DateTime.Now; tOrder.orderStatus = 0; _baseService.Add<T_Order>(tOrder); _baseService.SaveChange(); return "下单成功"; } else { //卖完了 return "卖完了"; } } catch (Exception ex) { throw new Exception(ex.Message); } }
测试结果:
(1). 100并发,需要1788ms,订单数量插入正确,但库存扣减错误。
(2). 200并发,需要4453ms,订单数量插入正确,但库存扣减错误。
二. lock写法
1.设计思路
纯DB操作的基础上Lock锁
Lock { DB查库存→判断库存→(DB扣减库存+DB创建订单) }
2.分析
A. 解决超卖现象
B. 响应依旧非常慢,导致大量请求拿到结果而报错
3.压测结果
前提:原库存为10000,这里统计2s内可处理的并发数,以90%百分位为例,要求错误率为0。
代码分享:
/// <summary> /// 02-纯DB操作+Lock锁 /// </summary> /// <param name="userId">用户编号</param> /// <param name="arcId">商品编号</param> /// <param name="totalPrice">订单总额</param> /// <returns></returns> public string POrder2(string userId, string arcId, string totalPrice) { try { lock (_lock) { //1. 查询库存 var sArctile = _baseService.Entities<T_SeckillArticle>().Where(u => u.articleId == arcId).FirstOrDefault(); if (sArctile.articleStockNum - 1 > 0) { //2. 扣减库存 sArctile.articleStockNum--; //3. 进行下单 T_Order tOrder = new T_Order(); tOrder.id = Guid.NewGuid().ToString("N"); tOrder.userId = userId; tOrder.orderNum = Guid.NewGuid().ToString("N"); tOrder.articleId = arcId; tOrder.orderTotalPrice = Convert.ToDecimal(totalPrice); tOrder.addTime = DateTime.Now; tOrder.orderStatus = 0; _baseService.Add<T_Order>(tOrder); _baseService.SaveChange(); return "下单成功"; } else { //卖完了 return "卖完了"; } } } catch (Exception ex) { throw new Exception(ex.Message); } }
(1). 30并发,需要2132ms,订单数量插入正确,库存扣减正确。
(2). 100并发,需要9186ms,订单数量插入正确,库存扣减正确。
三. 服务器缓存+队列
1.设计思路
生产者和消费者模式→流量削峰(异步的模式平滑处理请求)
A. Lock{ 事先同步DB库存到缓存→缓存查库存→判断库存→订单相关信息服务端队列中 }
B. 消费者从队列中取数据批量提交信息,依次进行(DB扣减库存+DB创建订单)
2.分析
A. 接口中彻底干掉了DB操作, 并发数提升非常大
B. 服务宕机,原队列中的下单信息全部丢失
C. 但是生产者和消费者必须在一个项目及一个进程内
3.压测结果
前提:原库存为10000,这里统计2s内可处理的并发数,以90%百分位为例,要求错误率为0。
代码分享:
初始化库存到内存缓存中
/// <summary> /// 后台任务-初始化库存到缓存中 /// </summary> public class CacheBackService : BackgroundService { private IMemoryCache _cache; private StackExchange.Redis.IDatabase _redisDb; private IConfiguration _Configuration; public CacheBackService(IMemoryCache cache,RedisHelp redisHelp, IConfiguration Configuration) { _cache = cache; _redisDb = redisHelp.GetDatabase(); _Configuration = Configuration; } protected async override Task ExecuteAsync(CancellationToken stoppingToken) { // EFCore的上下文默认注入的请求内单例的,而CacheBackService要注册成全局单例的 // 由于二者的生命周期不同,所以不能相互注入调用,这里手动new一个EF上下文 var optionsBuilder = new DbContextOptionsBuilder<ESHOPContext>(); optionsBuilder.UseSqlServer(_Configuration.GetConnectionString("EFStr")); ESHOPContext context = new ESHOPContext(optionsBuilder.Options); IBaseService _baseService = new BaseService(context); //初始化库存信息,连临时写在这个位置,充当服务器启动的时候初始化 var data = await _baseService.Entities<T_SeckillArticle>().Where(u => u.id == "300001").FirstOrDefaultAsync(); //服务器缓存 _cache.Set<int>($"{data.articleId}-sCount", data.articleStockNum); } }
队列定义和下单接口
/// <summary> /// 基于内存的队列 /// </summary> public static class MyQueue { private static ConcurrentQueue<string> _queue = new ConcurrentQueue<string>(); public static ConcurrentQueue<string> GetQueue() { return _queue; } } /// <summary> /// 03-服务端缓存+队列版本+Lock /// </summary> /// <param name="userId">用户编号</param> /// <param name="arcId">商品编号</param> /// <param name="totalPrice">订单总额</param> /// <returns></returns> public string POrder3(string userId, string arcId, string totalPrice) { try { lock (_lock) { //1. 查询库存 int count = _cache.Get<int>($"{arcId}-sCount"); if (count - 1 >= 0) { //2. 扣减库存 count = count - 1; _cache.Set<int>($"{arcId}-sCount", count); //3. 将下单信息存到消息队列中 var orderNum = Guid.NewGuid().ToString("N"); MyQueue.GetQueue().Enqueue($"{userId}-{arcId}-{totalPrice}-{orderNum}"); //4. 把部分订单信息返回给前端 return $"下单成功,订单信息为:userId={userId},arcId={arcId},orderNum={orderNum}"; } else { //卖完了 return "卖完了"; } } } catch (Exception ex) { throw new Exception(ex.Message); } }
基于内存的消费者
/// <summary> /// 后台任务--基于内存队列的消费者(已经测试) /// </summary> public class CustomerService : BackgroundService { private IConfiguration _Configuration; public CustomerService(IConfiguration Configuration) { _Configuration = Configuration; } protected async override Task ExecuteAsync(CancellationToken stoppingToken) { // EFCore的上下文默认注入的请求内单例的,而CacheBackService要注册成全局单例的 // 由于二者的生命周期不同,所以不能相互注入调用,这里手动new一个EF上下文 var optionsBuilder = new DbContextOptionsBuilder<ESHOPContext>(); optionsBuilder.UseSqlServer(_Configuration.GetConnectionString("EFStr")); ESHOPContext context = new ESHOPContext(optionsBuilder.Options); IBaseService _baseService = new BaseService(context); Console.WriteLine("下面开始执行消费业务"); while (true) { try { string data = ""; MyQueue.GetQueue().TryDequeue(out data); if (!string.IsNullOrEmpty(data)) { List<string> tempData = data.Split('-').ToList(); //1.扣减库存---禁止状态追踪 var sArctile = context.Set<T_SeckillArticle>().AsNoTracking().Where(u => u.id == "300001").FirstOrDefault(); sArctile.articleStockNum = sArctile.articleStockNum - 1; context.Update(sArctile); //2. 插入订单信息 T_Order tOrder = new T_Order(); tOrder.id = Guid.NewGuid().ToString("N"); tOrder.userId = tempData[0]; tOrder.orderNum = tempData[3]; tOrder.articleId = tempData[1]; tOrder.orderTotalPrice = Convert.ToDecimal(tempData[2]); tOrder.addTime = DateTime.Now; tOrder.orderStatus = 0; context.Add<T_Order>(tOrder); int count = await context.SaveChangesAsync(); //释放一下 context.Entry<T_SeckillArticle>(sArctile).State = EntityState.Detached; Console.WriteLine($"执行成功,条数为:{count},当前库存为:{ sArctile.articleStockNum}"); } else { Console.WriteLine("暂时没有订单信息,休息一下"); await Task.Delay(TimeSpan.FromSeconds(1)); } } catch (Exception ex) { Console.WriteLine($"执行失败:{ex.Message}"); } } } }
(1). 1000并发,需要600ms,订单数量插入正确,库存扣减正确。
(2). 2000并发,需要1500ms,订单数量插入正确,库存扣减正确。
四. Redis缓存+原子性+队列【干掉lock】
1.设计思路
生产者和消费者模式→流量削峰(异步的模式平滑处理请求)
思路同上,缓存和队列改成基于Redis的。
2. 分析
A. 引入Redis缓存和消息队列代替基于内存的缓存和队列,数据可以持久化解决了丢失问题。
B. Redis是单线程的,利用api自身的原子性,从而可以干掉lock锁。
C. 引入进程外的缓存Redis,从而可以把生产者和消费者解耦分离,可以作为两个单独的服务运行。
3. 压测结果
前提:原库存为10万,这里统计2s内可处理的并发数,以90%百分位为例,要求错误率为0。
代码分享:
初始化库存到redis缓存中
/// <summary> /// 后台任务-初始化库存到缓存中 /// </summary> public class CacheBackService : BackgroundService { private IMemoryCache _cache; private StackExchange.Redis.IDatabase _redisDb; private IConfiguration _Configuration; public CacheBackService(IMemoryCache cache,RedisHelp redisHelp, IConfiguration Configuration) { _cache = cache; _redisDb = redisHelp.GetDatabase(); _Configuration = Configuration; } protected async override Task ExecuteAsync(CancellationToken stoppingToken) { // EFCore的上下文默认注入的请求内单例的,而CacheBackService要注册成全局单例的 // 由于二者的生命周期不同,所以不能相互注入调用,这里手动new一个EF上下文 var optionsBuilder = new DbContextOptionsBuilder<ESHOPContext>(); optionsBuilder.UseSqlServer(_Configuration.GetConnectionString("EFStr")); ESHOPContext context = new ESHOPContext(optionsBuilder.Options); IBaseService _baseService = new BaseService(context); //初始化库存信息,连临时写在这个位置,充当服务器启动的时候初始化 var data = await _baseService.Entities<T_SeckillArticle>().Where(u => u.id == "300001").FirstOrDefaultAsync(); //Redis缓存 _redisDb.StringSet($"{data.articleId}-sCount", data.articleStockNum); } }
下单接口
/// <summary> /// 04-Redis缓存+队列 /// </summary> /// <param name="userId">用户编号</param> /// <param name="arcId">商品编号</param> /// <param name="totalPrice">订单总额</param> /// <returns></returns> public string POrder4(string userId, string arcId, string totalPrice) { try { //1. 直接自减1 int iCount = (int)_redisDb.StringDecrement($"{arcId}-sCount", 1); if (iCount >= 0) { //2. 将下单信息存到消息队列中 var orderNum = Guid.NewGuid().ToString("N"); _redisDb.ListLeftPush(arcId, $"{userId}-{arcId}-{totalPrice}-{orderNum}"); //3. 把部分订单信息返回给前端 return $"下单成功,订单信息为:userId={userId},arcId={arcId},orderNum={orderNum}"; } else { //卖完了 return "卖完了"; } } catch (Exception ex) { throw new Exception(ex.Message); } }
基于redis队列的消费者
{ Console.WriteLine("下面开始执行消费业务"); using (ESHOPContext db = new ESHOPContext()) { RedisHelp redisHelp = new RedisHelp("localhost:6379"); var redisDB = redisHelp.GetDatabase(); while (true) { try { var data = (string)redisDB.ListRightPop("200001"); if (!string.IsNullOrEmpty(data)) { List<string> tempData = data.Split('-').ToList(); { //1.扣减库存 --去掉状态追踪 var sArctile = db.Set<T_SeckillArticle>().AsNoTracking().Where(u => u.id == "300001").FirstOrDefault(); sArctile.articleStockNum = sArctile.articleStockNum - 1; db.Update(sArctile); //2. 插入订单信息 T_Order tOrder = new T_Order(); tOrder.id = Guid.NewGuid().ToString("N"); tOrder.userId = tempData[0]; tOrder.orderNum = tempData[3]; tOrder.articleId = tempData[1]; tOrder.orderTotalPrice = Convert.ToDecimal(tempData[2]); tOrder.addTime = DateTime.Now; tOrder.orderStatus = 0; db.Add<T_Order>(tOrder); int count = db.SaveChanges(); //释放一下--否则报错 db.Entry<T_SeckillArticle>(sArctile).State = EntityState.Detached; Console.WriteLine($"执行成功,条数为:{count},当前库存为:{ sArctile.articleStockNum}"); } } else { Console.WriteLine("暂时没有订单信息,休息一下"); Thread.Sleep(1000); } } catch (Exception ex) { Console.WriteLine($"执行失败-{ex.Message}"); } } } }
(1). 1000并发,需要600ms,订单数量插入正确,库存扣减正确。
(2). 2000并发,需要1560ms,订单数量插入正确,库存扣减正确。