sql server 锁与事务拨云见日(上) - 花阴偷移 - 博客园

mikel阅读(430)

来源: sql server 锁与事务拨云见日(上) – 花阴偷移 – 博客园

一.概述

讲到SQL server锁管理时,感觉它是一个大话题,因为它不但重要而且涉及的知识点很多,重点在于要掌握高并发要先要掌握锁与事务,涉及的知识点多它包括各式各样的锁,锁的组合,锁的排斥,锁延伸出来的事务隔离级别, 锁住资源带来的阻塞,锁之间的争用造成的死锁,索引数据与锁等。这次介绍锁和事务,我想分上中下篇,上篇详细介绍锁,中篇介绍事务,下篇总结, 针对锁与事务我想把我掌握的以及参考多方面资料,整合出来尽量说详细。 最后说下,对于高级开发人员或DBA,锁与事务应该是重点关注的,它就像是数据库里的一个大boss,如完全掌握了它,数据库就会像就像庖丁解牛一样游刃有余  哈哈 。

二.锁的产生背景

在关系型数据库里锁是无处不再的。当我们在执行增删改查的SQL语句时,锁也就产生了。锁对应的就的是事务,不去显示加tran就是常说的隐式事务。当我们写个存储过程希望数据一致性时, 要么同时回滚,要么同时提交,这时我们用begin tran 来做显示事务。锁的范围就是事务。在SQL server里事务默认是提交读(Read Committed) 。
锁是对目标资源(行、页、区、表..)获取所有权的锁定,是一个逻辑概念,用来保存事务的ACID. 当多用户并发同时操作数据时,为了避免出现不一致的数据,锁定是必须的机制。 但同时如果锁的数量太多,持续时间太长,对系统的并发和性能都没有好处。

三.锁的全面认识

3.1 锁住的资源

我们知道sql server的存储数据单元包括文件组,页,区,行。锁住资源范围从低到高依次对应的是:行(RID/KEY)锁,页(PAGE)锁, 表(OBJECT)锁。可通过sp_lock查看,比如: 当我们操作一条数据时应该是行锁, 大批量操作时是页锁或表锁, 这是大批量操作会使锁的数量越多,锁就会自动升级 将大量行锁合成多个页锁或表锁,来避免资源耗尽。SQL SERVER要锁定资源时,默认是从最底级开始锁起(行) 。锁住的常见资源如下:

名称 资源

说明

数据行 RID 锁住堆中(表没有建聚集索引)的单个行。格式为File:Page:SlotID  如 1:8787:4
索引键 KEY 锁住T-tree(索引)中单个行,是一个哈值值。如:(fb00a499286b)
PAGE 锁住数据页(一页8kb,除了页头和页尾,页内容存储数据)可在sys.dm_os_buffer_descriptors找到。格式FileID :Page Number 如1:187541
范围 extent 锁住区(一组连续的8个页 64kb)FileID:N页 。如:1:78427
数据表 object 通常是锁整个表。 如:2858747171
文件 File 一般是数据库文件增加或移除时。如:1
数据库 database 锁住整个数据库,比如设置修改库为只读模式时。 database ID如:7

下图是通过sp_lock的查看的,显示了锁住的资源类型以及资源

3.2 锁的类型及锁说明

锁类型 锁说明
共享锁 (S锁) 用于不更改或不更新数据的读取操作,如 SELECT 语句。
更新锁 (U锁) 它是S与X锁的混合,更新实际操作是先查出所需的数据,为了保护这数据不会被其它事务修改,加上U锁,在真正开始更新时,转成X锁。U锁和S锁兼容, 但X锁和U锁不兼容。
独占锁(排它锁)(X锁) 用于数据修改操作,例如 INSERT、UPDATE 或 DELETE。 确保不会同时对同一资源进行多重更新
意向锁(I锁) (I)锁也不是单独的锁模式,用于建立锁的层次结构。 意向锁包含三种类型:意向共享 (IS)、意向排他 (IX) 和意向排他共享 (SIX)。意识锁是用来标识一个资源是否已经被锁定,比如一个事务尝试锁住一个表,首先会检查是否已有锁在该表的行或者页上。
架构锁(Sch-M,Sch-S) 在执行依赖于表架构操作时使用,例如:添加列或删除列 这个时候使用的架构修改锁(Sch-M),用来防止其它用户对这个表格进行操作。别一种是数据库引擎在编译和执行查询时使用架构性  (Sch-S),它不会阻止其它事务访问表格里的数据,但会阻止对表格做修改性的ddl操作和dml操作。
大容量更新 (BU) 是指数据大容量复制到表中时使用BU锁,它允许多个线程将数据并发地大容量加载到同一表,同时防止其它不进行大容量加载数据的进程访问该表。
键范围 当使用可序列化事务隔离级别时(SERIALIZABLE)保护查询读取的行的范围。 确保再次运行查询时其他事务无法插入符合可序列化事务的查询的行。下章介绍的事务时再详细说

四 锁的互斥(兼容性)

在sql server里有个表,来维护锁与锁之间的兼容性,这是SQLServer预先定义好的,没有任务参数或配置能够去修改它们。如何提高兼容性呢?那就是在设计数据库结构和处理sql语句时应该考虑,尽量保持锁粒度小,这样产生阻塞的概率就会比较小,如果一个连接经常申请页面级,表级,甚至是数据库级的锁资源,程序产生的阻塞的可能性就越大。假设:事务1要申请锁时,该资源已被事务2锁住,并且事务1要申请的锁与事务2的锁不兼容。事务1申请锁就会出现wait状态,直到事务2的锁释放才能申请到。 可通过sp_lock查看wait等待(也就是常说的阻塞)

下面是最常见的锁模式的兼容性

五. 锁与事务关系

如今系统并发现象,引起的资源急用,出现的阻塞死锁一直是技术人员比较关心的。这就涉及到了事务, 事务分五种隔离级别,每个隔离级别有一个特定的并发模式,不同的隔离级别中,事务里锁的作用域,锁持续的时间都不同,后面再详细介绍事务。这里看下客户端并发下的锁与事务的关系, 可以理解事务是对锁的封装,事务就是在并发与锁之间的中间层。如下图:

六. 锁的持续时间

下面是锁在不同事务隔离级别里,所持续占用的时间:

6.1  SELECT动作要申请的锁

我们知道select 会申请到共享锁,下面来演示下共享锁在Repeatable 重复读的级别下,共享锁保留到事件提交时才释放。

具体是1.事务A设置隔离级别为Repeatable重复读,开启事务运行且不提交事务。

2.再打开一个会话窗口,使用sys.dm_tran_locks来分析查看事务的持有锁。

--开启一个事务A, 设置可重复读, 不提交
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ 
BEGIN TRAN 
SELECT  * FROM dbo.Product WHERE SID=204144
--上面执行完后,打开另一会话查询锁状态
SELECT  k.request_session_id,k.resource_type,k.request_status,k.request_mode,k.resource_description,
 OBJECT_NAME( p.object_id) as objectName,p.index_id FROM SYS.dm_tran_locks k LEFT JOIN SYS.PARTITIONS p
ON k.resource_associated_entity_id=p.hobt_id
ORDER BY request_session_id,resource_type

先看看查询单条语句的执行计划,再看看锁住的资源

通过DMV查询,我们看到:

(1)首先是锁住DATABASE资源,是数据库级别的共享锁,以防止别人将数据库删除。

(2)锁住OBJECT表资源,在Product表上加了意向共享锁IS,以防止别人修改表的定义。

(3)锁住了二个PAGE页加了意向共享锁IS,通过上面执行计划可以看出来,查询出来的数据是通过索引查询50%,RID堆查询50%。这条数据分布在二个页上,通过where SID来查找没有完全走索引查找。

(4)通过第3点可以看出,数据1个页是对应RID行,另一页对应KEY行 二个共享锁,堆位置1:112205:25  ,KEY的哈希值(70009fe3578a) 。

总结下:通过Repeatable 重复读,直要事务不提交,共享锁一直会存在。针对想减少被别人阻塞或者阻塞别人的概率,能考虑事情有:1. 尽量减少返回的记录,返回的记录越多,需要的锁也就越多,在Repeatable隔离级别及以上,更是容易造成阻塞。2.返回的数据如果是一小部份,尽量使用索引查找,避免全表扫描。3.可以的话,根据业务设计好最合适的几个索引,避免通过多个索引找到结果。

4.2  UPDATE动作要申请的锁

对于UPDATE需要先查询,再修改。具体是查询加S锁,找到将要修改的记录后先加U锁,真正修改时升级成X锁。还是通过上面的product表来演示具体:选用Repeatable级别,运行一个update语句(先kill 掉之前的会放52)

--开启一个事务, 设置可重复读, 不提交
BEGIN TRAN 
UPDATE    dbo.Product SET model='test'
 WHERE SID IN(10905,119921,204144)

通过 dmv查看,吓一跳没想到锁住了这么多资源,纠结 那下面试着来分析下为什么锁住这么多资源:使用sys.indexes查看index_id 的0,2,4各使用了什么索引

  SELECT  * FROM sys.indexes WHERE object_id= OBJECT_id('product')

(1)这个product表并没有建聚集索引,是在堆结构上建立的非索聚索引,index_id=0 是堆, index_id=2和4 又是分别二个非索聚索引

(2)同样在DATABASE和OBJECT资源 上都加了共享锁。

(3)意向排它锁IX,锁住的Page共9页 说明数据关联了9页,其中堆上3页,ix_1非索聚索引上3页,ixUpByMemberID非索聚索引上3页。

(4) 排它锁X锁住RID堆上3行,KEY索引上6行。大家可能会觉得奇怪明明只改三行的model值,为什么会涉及到9行呢?  我来解释下这个表是建了三个非聚集索引,其中ix_1索引里有包含列model,xUpByMemberID索引里也同样有包含列model,还有model数据是在堆,当堆上数据修改后,model关联的非聚集索引也要重新维护。如下图

(5) 这里还有架构锁Sch-s ,锁住了元数据。

总结:1.一定要给表做聚集索引,除了特殊情况使用堆结构。2.要修改的数据列越多,锁的数目就会越多,这里model就涉及到了9行维护。3. 描述的页面越多,意向锁就会越多,对扫描的记录也会加锁,哪怕没有修改。所以想减少阻塞要做到:1).尽量修改少的数据集,修改量越多,需要的锁也就越多。2) 尽量减少无谓的索引,索引的数目越多,需要的锁也可能越多。3.严格避免全局扫描,修改表格记录时,尽量使用索引查询来修改。

4.3  DELETE动作要申请的锁

BEGIN TRAN 
DELETE     dbo.Product WHERE SID =10905

(1) 删除了RID堆的数据,以及关联的非聚集索引三个key的值分别是(2,5,4)

(2) 在要删除的4个page上加了意向排它锁,同样对应一个RID和三个KEY。

(3)在OBJECT资源表上加了意向排它锁。

总结:在DELETE过程中是先找到符合条件的记录,然后再删除, 可以说是先SELECT后DELETE,如果有索引第一步查询申请的锁会比较 少。 对于DELETE不但删除数据本身,还会删除所有相关的索引键,一个表上的索引越多,锁的数目就会越多,也容易阻塞。为了防步阻塞我们不能不建索引,也不能随便就建索引,而是要根据业务建查询绝对有利的索引。

4.4  INSERT动作要申请的锁

BEGIN TRAN 
INSERT into    dbo.Product VALUES('modeltest','brandtest',GETDATE(),9708,'test')

对于以上三种动作,INSERT相对简单点,只需要对要插入数据本身加上X锁,对应的页加IX锁,同步更新了关联的索引三个key。

这里新增跟删除最终显示的锁一样,但在锁申请的过程中,新增不需要先查询到数据s锁,升级u锁,再升级成X锁。

七. 锁的升级

7.1 使用profiler窗口查看实时的锁升级

以单次批操作受影响的行数超过5000条时(锁数量最大值5000),升级为表锁。在SQLServer里可以选择完全关掉锁升级,虽然可以减少阻塞,但锁内存会增加,降低性能还可能造成更多死锁。

锁升级缺点:会给其它会话带来阻塞和死锁。锁升级优点:减少锁的内存开销。

检测方法:在profiler中查看lock:escalation事件类。通过查看Type列,可查看锁升级的范围,升级成表锁(object是表锁)

如下图:

如果减少批操作量,就没有看到升级表锁, 可自行通过 escalation事件查看,下图就是减少了受影响的行数。

总结:将批操作量受影响行数减少到5000以下,减少锁的升级后,发生了更频繁的死锁,原因是多个page页的争用。后有人指出你先把并行度降下来(删除500一下的数据可以不使用并行) 在语句中设置maxdop = 1 这样应该不会死锁了。具体原因还需具体分析。

7.2 使用dmv查看锁升级

sys.dm_db_index_operational_stats返回数据库中的当前较低级别 I/O、 锁定、 闩锁,和将表或索引的每个分区的访问方法活动。

index_lock_promotion_attempt_count:数据库引擎尝试升级锁的累积次数。

index_lock_promotion_count:数据库引擎升级锁的累积次数。

复制代码
SELECT  OBJECT_NAME(ddios.[object_id], ddios.database_id) AS [object_name] ,
        i.name AS index_name ,
        ddios.index_id ,
        ddios.partition_number ,
        ddios.index_lock_promotion_attempt_count ,
        ddios.index_lock_promotion_count ,
        ( ddios.index_lock_promotion_attempt_count
          / ddios.index_lock_promotion_count ) AS percent_success
FROM    sys.dm_db_index_operational_stats(DB_ID(), NULL, NULL, NULL) ddios
        INNER JOIN sys.indexes i ON ddios.object_id = i.object_id
                                    AND ddios.index_id = i.index_id
WHERE   ddios.index_lock_promotion_count > 0
ORDER BY index_lock_promotion_count DESC;
复制代码

7.3 使用dmv查看页级锁资源争用

page_lock_wait_count:数据库引擎等待页锁的累积次数。

  page_lock_wait_in_ms:数据库引擎等待页锁的总毫秒数。

  missing_index_identified:缺失索引的表。

复制代码
SELECT  OBJECT_NAME(ddios.object_id, ddios.database_id) AS object_name ,
        i.name AS index_name ,
        ddios.index_id ,
        ddios.partition_number ,
        ddios.page_lock_wait_count ,
        ddios.page_lock_wait_in_ms ,
        CASE WHEN DDMID.database_id IS NULL THEN 'N'
             ELSE 'Y'
        END AS missing_index_identified
FROM    sys.dm_db_index_operational_stats(DB_ID(), NULL, NULL, NULL) ddios
        INNER JOIN sys.indexes i ON ddios.object_id = i.object_id
                                    AND ddios.index_id = i.index_id
        LEFT OUTER JOIN ( SELECT DISTINCT
                                    database_id ,
                                    object_id
                          FROM      sys.dm_db_missing_index_details
                        ) AS DDMID ON DDMID.database_id = ddios.database_id
                                      AND DDMID.object_id = ddios.object_id
WHERE   ddios.page_lock_wait_in_ms > 0
ORDER BY ddios.page_lock_wait_count DESC;
复制代码

八. 锁的超时

在sql server 里锁默认是不会超时的,是无限的等待。多数客户端编程允许用户连接设置一个超时限制,因此在指定时间内没有反馈,客户端就会自动撤销查询, 但数据库里锁是没有释放的。

可以通 select @@lock_timeout  查看默认值是 ” -1″, 可以修改超时时间  例如5秒超时 set  lock_timeout  5000;

下面是查看锁的等待时间, wait_time是当前会话的等待资源的持续时间(毫秒)

select  session_id, blocking_session_id,command,sql_handle,database_id,wait_type
,wait_time,wait_resource
from sys.dm_exec_requests 
where blocking_session_id>50

从wait_type入手模拟SQL Server Lock - iSun - 博客园

mikel阅读(478)

来源: 从wait_type入手模拟SQL Server Lock – iSun – 博客园

一、LCK_M_S,等待获取共享锁

开始一SQL TRAN,其中执行对某数据的UPDATE。但并不COMMIT,也不ROLLBACK。

begin tran
update [dbo].[HR_Employee] set [Description]='ZZ'

这样,便使用排它锁锁定了该[Employee]表。

 

在另一会话中,执行对该表的SELECT操作。至此,死锁产生。

select * from [dbo].[HR_Employee]

 

使用下列script查询当前锁情况。

复制代码
 1 SELECT wt.blocking_session_id                    AS BlockingSessesionId
 2         ,sp.program_name                           AS ProgramName
 3         ,COALESCE(sp.LOGINAME, sp.nt_username)     AS HostName    
 4         ,ec1.client_net_address                    AS ClientIpAddress
 5         ,db.name                                   AS DatabaseName        
 6         ,wt.wait_type                              AS WaitType                    
 7         ,ec1.connect_time                          AS BlockingStartTime
 8         ,wt.WAIT_DURATION_MS/1000                  AS WaitDuration
 9         ,ec1.session_id                            AS BlockedSessionId
10         ,h1.TEXT                                   AS BlockedSQLText
11         ,h2.TEXT                                   AS BlockingSQLText
12   FROM sys.dm_tran_locks AS tl
13   INNER JOIN sys.databases db
14     ON db.database_id = tl.resource_database_id
15   INNER JOIN sys.dm_os_waiting_tasks AS wt
16     ON tl.lock_owner_address = wt.resource_address
17   INNER JOIN sys.dm_exec_connections ec1
18     ON ec1.session_id = tl.request_session_id
19   INNER JOIN sys.dm_exec_connections ec2
20     ON ec2.session_id = wt.blocking_session_id
21   LEFT OUTER JOIN master.dbo.sysprocesses sp
22     ON SP.spid = wt.blocking_session_id
23   CROSS APPLY sys.dm_exec_sql_text(ec1.most_recent_sql_handle) AS h1
24   CROSS APPLY sys.dm_exec_sql_text(ec2.most_recent_sql_handle) AS h2
复制代码

发现该LOCK的wait_type为LCK_M_S,意味着后一会话在等待着获取对该表的共享锁已完成查询工作。

 

二、LCK_M_U,等待获取更新锁。

发起一SQL会话,在其中使用更新锁(UPDLOCK)SELECT数据,而后WAIT一定的时间。

1 begin tran
2 select * from [dbo].[HR_Employee] WITH (UPDLOCK) where [Id]=7
3 waitfor delay '00:01:00' 
4 update [dbo].[HR_Employee] set [Description]='ZZ' where [Id]=7
5 commit tran

在wait的时间内,[Id]=7的行被更新锁锁住。

 

发起另一会话,使用更新锁(UPDLOCK)完成SELECT操作。

1 select * from [dbo].[HR_Employee] WITH (UPDLOCK)

 

发现后一会话被block。wait_type为LCK_M_U,表示其在等待该表的更新锁。

 

三、LCK_M_X,等待获取排它锁

将上一小节中第二个会话的操作改为UPDATE。

update [dbo].[HR_Employee] set [Description]='ZZy' where [Id]=7

后一会话同样被block,但这次的wait_type为LCK_M_X,表明其在等待用于UPDATE DATA的排它锁。

关闭”xx程序已停止工作”提示窗口_顺其自然~的博客-CSDN博客

mikel阅读(436)

来源: 关闭”xx程序已停止工作”提示窗口_顺其自然~的博客-CSDN博客

近日在工作中,接手一个项目,程序运行起来后偶发性间隔几个小时或几天就会出现如下(图1, 图2)的”xx程序已停止工作”的提示窗口,这时需要用户手动点击”关闭程序”按钮,进程才会退出。

图1

图2

当然最好的解决办法就是找出程序中导致”程序错误”的原因,但由于对接手的项目不是很熟悉,再加上时间紧迫,难以在短时间找到问题原因,于是给此程序添加一个”守护程序”(即: 检测到进程退出后就自动重启)。

但程序崩溃时,弹出的”xx程序已停止工作”导致程序进程无法退出,“守护程序”自然也起不到相应的作用。

在网上查找了一番,终于找到两种解决方法:

第一种方法

运行注册表编辑器(【开始】-【运行】中输入regedit并回车),依次定位到HKEY_CURRENT_USER\Software\Microsoft\Windows\WindowsError Reporting,在右侧窗口中找到并双击打开DontshowUI,然后在弹出的窗口中将默认值“0”修改为“1”。

那么,当程序崩溃时,就不会再出现”xx程序已停止工作”的提示框,崩溃程序进程会自动退出。

这种修改系统注册表的方法是最方便和直接的,但会对所有程序生效,如果特别注重系统的安全性,只想让指定的程序在崩溃时不出现”xx程序已停止工作”,请参考”第二种方法”。

图3

第二种方法

查看Windows任务管理器(图4)发现,程序崩溃时之所以出现”xx程序已停止”工作,是因为触发了”Windows的错误报告”机制,在我的系统(Windows 10 64位)任务管理器进程列表中会出现一个名称为”Windows问题报告”的进程,点击此进程左侧的”下拉箭头”,会出现一个窗口列表,此窗口列表就代表了当前所有弹出”xx程序已停止工作”的窗口(图5),而窗口标题就是我们崩溃程序的进程名。

图4

图5

看到此,不知道你是否已经有了启发。

解决思路如下:

在”守护程序”中定期检测Windows系统进程列表中是否出现”WerFault.exe”进程(“Windows问题报告”的进程名), 如果出现, 则查找”WerFault.exe”进程下的窗口名称是否存在”要守护程序的进程名”, 如果存在,则表示“要守护的程序崩溃并出现已停止工作”的提示框, 那么则向“WerFault.exe”进程下的“窗口”发送 WM_Close 消息,关闭此“提示窗口”,如此, “要守护的程序进程就会完全退出”, 守护程序就可以重新启动此程序了。

其实就是用程序模拟“用户手动关闭‘已停止工作’窗口。

  1. #include <windows.h>
  2. #include <tlhelp32.h> //声明快照函数文件
  3. #include “stdio.h”
  4. #include <cstring>
  5. // 根据进程ID, 返回指定进程下”第一个”窗口的窗口句柄
  6. // 注: 此程序还不够完善, 因为指定进程下可能有多个窗口
  7. HWND GetWindowHandleByPID(DWORD dwProcessID)
  8. {
  9.     HWND h = GetTopWindow(0);
  10.     while (h)
  11.     {
  12.         DWORD pid = 0;
  13.         DWORD dwTheardId = GetWindowThreadProcessId(h, &pid);
  14.         if (dwTheardId != 0)
  15.         {
  16.             if (pid == dwProcessID /*your process id*/)
  17.             {
  18.                 // here h is the handle to the window
  19.                 return h;
  20.             }
  21.         }
  22.         h = GetNextWindow(h, GW_HWNDNEXT);
  23.     }
  24.     return NULL;
  25. }
  26. int main(int argc, char *argv[])
  27. {
  28.     PROCESSENTRY32 pe32;
  29.     //在使用这个结构之前,先设置它的大小
  30.     pe32.dwSize = sizeof(pe32);
  31.     //给系统内的所有进程拍一个快照
  32.     HANDLE hProcessSnap = ::CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
  33.     //Includes the process list in the snapshot
  34.     if (hProcessSnap == INVALID_HANDLE_VALUE)
  35.     {
  36.         printf(“CreateToolhelp32Snapshot 调用失败! n”);
  37.         return -1;
  38.     }
  39.     //遍历进程快照,轮流显示每个进程信息
  40.     BOOL bMore = ::Process32First(hProcessSnap, &pe32);
  41.     while (bMore)
  42.     {
  43.         /*printf(” 进程名称为:%s\n”, pe32.szExeFile);
  44.         printf(” 进程ID为:%u \n\n”, pe32.th32ProcessID);*/
  45.         if (_stricmp(pe32.szExeFile, “werfault.exe”) == 0)
  46.         {
  47.             printf(” 进程名称为:%s\n”, pe32.szExeFile);
  48.             printf(” 进程ID为:%u \n\n”, pe32.th32ProcessID);
  49.             HWND hwnd = GetWindowHandleByPID(pe32.th32ProcessID);
  50.             if (hwnd)
  51.             {
  52.                 char szText[256] = { 0 };
  53.                 GetWindowText(hwnd, szText, 256);
  54.                 // 自己崩溃程序的”进程名”
  55.                 if (_stricmp(szText, “myProcessName.exe”) == 0)
  56.                 {
  57.                     printf(“Text: %s\n\n”, szText);
  58.                     // 关闭”xx程序已停止”提示窗口
  59.                     SendMessage(hwnd, WM_CLOSE, NULL, NULL);
  60.                 }
  61.             }
  62.         }
  63.         //遍历下一个
  64.         bMore = ::Process32Next(hProcessSnap, &pe32);
  65.     }
  66.     //清除snapshot对象
  67.     ::CloseHandle(hProcessSnap);
  68.     return 0;
  69. }

补充

此程序还不够完善,因为对于下面方法:

HWNDGetWindowHandleByPID(DWORD dwProcessID);

其根据进程ID,返回指定进程下”第一个”窗口的窗口句柄,但一个进程下可能会有多个窗口(如图6)。

图6

但对“WerFault.exe”进程,我在测试中发现(测试系统:Windows10 64位),当有多个程序出现”已停止工作“提示窗口时(图7),每个程序会各自对应一个”WerFault.exe”进程(图8)。即: 每个WerFault.exe进程下只会出现一个“已停止工作”窗口标题。

当然其他Windows系统我没有测试是否也是这样,以后有时间再进一步完善此程序。

图7

图8

参考文章:

修改DontshowUI默认值弹出窗口关闭小秘密

https://www.baidu.com/link?url=hTh2Zy1SF_KozYKGGdD_HRLEUm4jDB5Uv3jaN3GKP9L40M84F7pHl2upszMXA1lqeRgCpNAeusYrMLhr1rq_QlZ3G6qx5Ypc0CLoCZKQ7s3&wd=&eqid=e02a9f000000a25b0000000357ce11b6

VC++ 通过进程名或进程ID获取进程句柄

http://blog.csdn.net/luxiaoyu_sdc/article/details/6534783

VC 显示当前运行的所有进程

http://www.cnblogs.com/xianyunhe/archive/2011/06/09/2076878.html

分布式锁 redis实现分布式锁,分布式id生成方案,秒杀设计方案 - 战斗小人 - 博客园

mikel阅读(465)

来源: 分布式锁 redis实现分布式锁,分布式id生成方案,秒杀设计方案 – 战斗小人 – 博客园

分布式锁

作用:不同系统上的不同进程,去抢一把锁,谁抢到了,谁才能改数据

要求:高可用性,可冲入性(拿到锁的节点挂了,得有超时过期机制)

实现方式:

基于数据库实现分布式锁;

基于缓存(Redis等)实现分布式锁;

基于Zookeeper实现分布式锁;

 

redis实现分布式锁

复制代码
# 1 分布式锁: 锁住不同机器上的不同进程
# 2 redis实现:官方提供了    https://github.com/SPSCommerce/redlock-py

#Redlock    pip install redlock-py     安装,此为官方提供版

from redlock import Redlock

dlm = Redlock([{"host": "localhost", "port": 6379, "db": 0}, ])    # 创建锁管理器
# dlm = Redlock([{"host":"localhost",'port':6379,'db':0,'password':"admin123"},])

# 获取锁,my_resource_name是锁的唯一标识符。1000代表1000毫秒数,超过这个时间,锁自动释放(防机器宕机)
my_lock = dlm.lock("my_resource_name",1000)    
# 自己的逻辑---》悲观锁---你的代码
print("xxx")

#你写的业务
# 自己的逻辑---》悲观锁---你的代码
dlm.unlock(my_lock)    # 把锁释放了

原理:https://www.cnblogs.com/liuqingzheng/p/11080501.html  # 在4、分布式锁的简单实现代码中
# 也可以参考第三方库  https://github.com/glasslion/redlock
复制代码

 

分布式id生成方案

复制代码
# 分布式id:为了保证全局唯一
    -分表,默认自增--》两个库上--》可能id号重复       uuid:有没有重复(数据量极大的情况下可能会重复,概率较低)
# 设计思路:
  low版本的出来:
    -一个库 1,3,5,7,9
    -另一个库2,4,6,8,10
  分布式id生成方案
      -UUID:不是趋势自增,性能挺高(5台机器生成,一般不会重复)
    -mysql生成:性能低      机器都去mysql中要id
    -redis生成:很快,自增 ,必须还得有台redis服务器  incrby   16位: '当前时间戳+自增'  
    -雪花算法:python版雪花算法
    # 雪花算法是64位二进制数,第1位不用;41位是时间戳,可用69年;10位代表机器id(进程号),最大1024;12位表示4096个数字。雪花算法同一毫秒内最多产生4096个id
    # python实现雪花算法代码参考:https://www.cnblogs.com/oklizz/p/11865750.html
复制代码

秒杀设计方案

复制代码
# 思路一:    # 该方案适用于客户提前充好钱了,不适合连支付宝支付方案
    -某个时间段---》卖商品---》别卖超了---》mysql悲观锁实现---》缺陷,性能低
  -100商品---》预热---》100这个数,放到redis中----》incrby--》[来一个秒杀请求-1,在redis集合中把用户id放进去](加锁,使用分布式锁或者使用pipline做),最后100这个数变成了0,---》起个异步任务---》消费集合中的id,生成订单,扣减库存,扣减账户余额,提前充钱了
  -用户真去看订单的时候---》异步任务完成了

# 思路二:
  -用户发了秒杀请求---》前端看到--》您正在排队 
  -请求来了---》放到队列里---》(djnago中间件:请求放到队列中,直接返回,告诉用户,您正在排队)

## 有些公司潜规则,秒杀超了无所谓,优惠券而已
复制代码

 

如何将 Redis 用于微服务通信的事件存储 - 中间件小哥 - 博客园

mikel阅读(467)

来源: 如何将 Redis 用于微服务通信的事件存储 – 中间件小哥 – 博客园

以我的经验,将某些应用拆分成更小的、松耦合的、可协同工作的独立逻辑业务服务会更易于构建和维护。这些服务(也被称为微服务)各自管理自己的技术栈,因此很容易独立于其他服务进行开发和部署。前人已经总结了很多关于使用这种架构设计的好处,在此我就不再赘述了。关于这种设计,有一个方面我一直在重点关注,因为如果没有它,将会导致一些有趣的挑战。虽然构建松耦合的微服务是一个非常轻量级和快速的开发过程,但是这些服务之间共享状态、事件以及数据的通信模型却不那么简单。我使用过的最简单的通信模型就是服务间直接通信,但是这种模型被 Fernando Dogio 明确地证明一旦服务规模扩大就会失效,会导致服务崩溃、重载逻辑以及负载增加等问题,从而可能引起的巨大麻烦,因此应该尽量避免使用这种模型。还有一些其他通信模型,比如通用的发布/订阅模型、复杂的 kafka 事件流模型等,但是最近我在使用 Redis 构建微服务间的通信模型。

 

拯救者 Redis!

微服务通过网络边界发布状态,为了跟踪这种状态,事件通常需要被保存在事件存储中。由于事件通常是一种异步写入操作的不可变流的记录(又被称为事务日志),因此适用于以下场景:

1. 顺序很重要(时间序列数据)

2. 丢失一个事件会导致错误状态

3. 回放状态在任何给定时间点都是已知的

4. 写操作简单且快捷

5. 读操作需要更多的时间,以至于需要缓存

6. 需要高可扩展性,服务之间都是解耦的,没有关联

使用 Redis,我始终可以轻松实现发布-订阅模式。但现在,Redis 5.0 提供了新的Streams 数据类型,我们可以以一种更加抽象的方式对日志数据结构进行建模-使之成为时间序列数据的理想用例(例如最多一次或最少一次传递语义的事务日志)。基于双主功能,轻松简单的部署以及内存中的超快速处理能力,Redis 流成为一种管理大规模微服务通信的必备工具。基本的模型被称为命令查询职责分离(CQRS),它将命令和查询分开执行,命令使用 HTTP 协议,而查询采用 RESP(Redis 序列化协议)。让我们使用一个例子来说明如何使用 Redis 作为事件存储。

OrderShop简单应用概述

我创建了一个简单但是通用的电子商务应用作为例子。当创建/删除客户、库存物品或订单时,使用 RESP 将事件异步传递到 CRM 服务,以管理 OrderShop 与当前和潜在客户的互动。像许多常见应用程序的需求一样,CRM 服务可以在运行时启动和停止,而不会影响其他微服务。这需要捕获在其停机期间发送给它的所有消息以进行后续处理。
下图展示了 9 个解耦的微服务的互连性,这些微服务使用由 Redis 流构建的事件存储进行服务间通信。他们通过侦听事件存储(即 Redis 实例)中特定事件流上的任何新创建的事件来执行此操作。

图1. OrderShop 架构

 

我们的 OrderShop 应用程序的域模型由以下 5 个实体组成:

  • 顾客
  • 产品
  • 库存
  • 订单
  • 账单

通过侦听域事件并保持实体缓存为最新状态,事件存储的聚合功能仅需调用一次或在响应时调用。

 

 

图2. OrderShop 域模型

安装并运行OrderShop

按照如下步骤安装并运行 OrderShop 应用:

1. 从这里下载代码仓库:

https://github.com/Redislabs-Solution-Architects/ordershop

2. 确保已经安装了 Docker Engine和Docker Compose

3. 安装 Python3:

https://python-docs.readthedocs.io/en/latest/starting/install3/osx.html

4. 使用 docker-compose up启动应用程序

5. 使用 pip3 install -r client / requirements.txt 安装需求

6. 然后使用 python3 -m unittest client / client.py 执行客户端

7. 使用 docker-compose stop crm-service 停止 CRM 服务

8. 重新执行客户端,您会看到该应用程序正常运行,没有任何错误

 

深入了解

以下是来自 client.py 的一些简单测试用例,以及相应的 Redis 数据类型和键。

 

 

 

我选择流数据类型来保存这些事件,因为它们背后的抽象数据类型是事务日志,非常适合我们连续事件流的用例。我选择了不同的键来分配分区,并决定为每个流生成自己的条目 ID,ID 包含秒“-”微秒的时间戳(为了保持 ID 的唯一,并保留了键/分区之间事件的顺序)。我选择集合来存储 ID(UUID),并选择列表和哈希来对数据建模,因为它反映了它们的结构,并且实体缓存只是域模型的简单投影。

 

结论

Redis 提供的各种数据结构-包括集合,有序集合,哈希,列表,字符串,位数组,HyperLogLogs,地理空间索引以及现在的流-可以轻松适应任何数据模型。流包含的元素不仅是单个字符串,而且是由字段和值组成的对象。范围查询速度很快,并且流中的每个条目都有一个 ID,这是一个逻辑偏移量。流提供了针对时间序列等应用的解决方案,并可为其他应用提供流消息,例如,替换需要更高可靠性的通用发布/ 订阅应用程序,以及其他全新的应用。
您可以通过分片(聚集多个实例)来扩展 Redis 实例并提供容灾恢复的持久性选项,所以 Redis 可以作为企业级应用的选择。

【高并发】Redis如何助力高并发秒杀系统,看完这篇我彻底懂了!! - 冰河团队 - 博客园

mikel阅读(475)

来源: 【高并发】Redis如何助力高并发秒杀系统,看完这篇我彻底懂了!! – 冰河团队 – 博客园

写在前面

之前,我们在《【高并发】高并发秒杀系统架构解密,不是所有的秒杀都是秒杀!》一文中,详细讲解了高并发秒杀系统的架构设计,其中,我们介绍了可以使用Redis存储秒杀商品的库存数量。很多小伙伴看完后,觉得一头雾水,看完是看完了,那如何实现呢?今天,我们就一起来看看Redis是如何助力高并发秒杀系统的!

有关高并发秒杀系统的架构设计,小伙伴们可以关注 冰河技术 公众号,查看《【高并发】高并发秒杀系统架构解密,不是所有的秒杀都是秒杀!》一文。

秒杀业务

在电商领域,存在着典型的秒杀业务场景,那何谓秒杀场景呢。简单的来说就是一件商品的购买人数远远大于这件商品的库存,而且这件商品在很短的时间内就会被抢购一空。比如每年的618、双11大促,小米新品促销等业务场景,就是典型的秒杀业务场景。

秒杀业务最大的特点就是瞬时并发流量高,在电商系统中,库存数量往往会远远小于并发流量,比如:天猫的秒杀活动,可能库存只有几百、几千件,而瞬间涌入的抢购并发流量可能会达到几十到几百万。

所以,我们可以将秒杀系统的业务特点总结如下。

(1)限时、限量、限价

在规定的时间内进行;秒杀活动中商品的数量有限;商品的价格会远远低于原来的价格,也就是说,在秒杀活动中,商品会以远远低于原来的价格出售。

例如,秒杀活动的时间仅限于某天上午10点到10点半,商品数量只有10万件,售完为止,而且商品的价格非常低,例如:1元购等业务场景。

限时、限量和限价可以单独存在,也可以组合存在。

(2)活动预热

需要提前配置活动;活动还未开始时,用户可以查看活动的相关信息;秒杀活动开始前,对活动进行大力宣传。

(3)持续时间短

购买的人数数量庞大;商品会迅速售完。

在系统流量呈现上,就会出现一个突刺现象,此时的并发访问量是非常高的,大部分秒杀场景下,商品会在极短的时间内售完。

秒杀三阶段

通常,从秒杀开始到结束,往往会经历三个阶段:

  • 准备阶段:这个阶段也叫作系统预热阶段,此时会提前预热秒杀系统的业务数据,往往这个时候,用户会不断刷新秒杀页面,来查看秒杀活动是否已经开始。在一定程度上,通过用户不断刷新页面的操作,可以将一些数据存储到Redis中进行预热。
  • 秒杀阶段:这个阶段主要是秒杀活动的过程,会产生瞬时的高并发流量,对系统资源会造成巨大的冲击,所以,在秒杀阶段一定要做好系统防护。
  • 结算阶段: 完成秒杀后的数据处理工作,比如数据的一致性问题处理,异常情况处理,商品的回仓处理等。

Redis助力秒杀系统

我们可以在Redis中设计一个Hash数据结构,来支持商品库存的扣减操作,如下所示。

seckill:goodsStock:${goodsId}{
	totalCount:200,
	initStatus:0,
	seckillCount:0
}

在我们设计的Hash数据结构中,有三个非常主要的属性。

  • totalCount:表示参与秒杀的商品的总数量,在秒杀活动开始前,我们就需要提前将此值加载到Redis缓存中。
  • initStatus:我们把这个值设计成一个布尔值。秒杀开始前,这个值为0,表示秒杀未开始。可以通过定时任务或者后台操作,将此值修改为1,则表示秒杀开始。
  • seckillCount:表示秒杀的商品数量,在秒杀过程中,此值的上限为totalCount,当此值达到totalCount时,表示商品已经秒杀完毕。

我们可以通过下面的代码片段在秒杀预热阶段,将要参与秒杀的商品数据加载的缓存。

/**
 * @author binghe
 * @description 秒杀前构建商品缓存代码示例
 */
public class SeckillCacheBuilder{
    private static final String GOODS_CACHE = "seckill:goodsStock:"; 
    private String getCacheKey(String id) { 
        return  GOODS_CACHE.concat(id);
    } 
    public void prepare(String id, int totalCount) { 
        String key = getCacheKey(id); 
        Map<String, Integer> goods = new HashMap<>(); 
        goods.put("totalCount", totalCount); 
        goods.put("initStatus", 0); 
        goods.put("seckillCount", 0); 
        redisTemplate.opsForHash().putAll(key, goods); 
     }
}

秒杀开始的时候,我们需要在代码中首先判断缓存中的seckillCount值是否小于totalCount值,如果seckillCount值确实小于totalCount值,我们才能够对库存进行锁定。在我们的程序中,这两步其实并不是原子性的。如果在分布式环境中,我们通过多台机器同时操作Redis缓存,就会发生同步问题,进而引起“超卖”的严重后果。

在电商领域,有一个专业名词叫作“超卖”。顾名思义:“超卖”就是说卖出的商品数量比商品的库存数量多,这在电商领域是一个非常严重的问题。那么,我们如何解决“超卖”问题呢?

Lua脚本完美解决超卖问题

我们如何解决多台机器同时操作Redis出现的同步问题呢?一个比较好的方案就是使用Lua脚本。我们可以使用Lua脚本将Redis中扣减库存的操作封装成一个原子操作,这样就能够保证操作的原子性,从而解决高并发环境下的同步问题。

例如,我们可以编写如下的Lua脚本代码,来执行Redis中的库存扣减操作。

local resultFlag = "0" 
local n = tonumber(ARGV[1]) 
local key = KEYS[1] 
local goodsInfo = redis.call("HMGET",key,"totalCount","seckillCount") 
local total = tonumber(goodsInfo[1]) 
local alloc = tonumber(goodsInfo[2]) 
if not total then 
    return resultFlag 
end 
if total >= alloc + n  then 
    local ret = redis.call("HINCRBY",key,"seckillCount",n) 
    return tostring(ret) 
end 
return resultFlag

我们可以使用如下的Java代码来调用上述Lua脚本。

public int secKill(String id, int number) { 
    String key = getCacheKey(id); 
    Object seckillCount =  redisTemplate.execute(script, Arrays.asList(key), String.valueOf(number)); 
    return Integer.valueOf(seckillCount.toString()); 
}

这样,我们在执行秒杀活动时,就能够保证操作的原子性,从而有效的避免数据的同步问题,进而有效的解决了“超卖”问题。

消息队列与快递柜之间的奇妙关系 - 字母哥博客 - 博客园

mikel阅读(410)

来源: 消息队列与快递柜之间的奇妙关系 – 字母哥博客 – 博客园

提到消息队列可能一些朋友经常听别人说起一些名词,比如:服务程序解耦,处理流量削峰,通过异步处理提升用户体验,缓冲批处理提高处理性能。笔者擅于白话解说,所以我就不用专业的术语去解释专业的问题了。我一直觉得消息队列的功能和快递柜的功能非常相似,怎么个相似法呢?让我来详细给你说说。

一、白话消息队列

我们来将快递柜与消息队列做一个对比

  • 消息队列比作快递柜:有很多厂家生产快递柜,如:丰巢(apache kafka),速递易(alibaba RocketMQ),近邻宝(ActiveMQ)等等,反正常用的就这几个。快递柜负责临时保存邮件,消息队列负责临时保存消息数据。
  • 快递员比作消息生产者:快递员负责向快递柜投递邮件,生产者负责向消息队列投递消息。异曲同工之妙啊!
  • 消费者比作消息消费者: 可能是这个例子太贴切了,以至于这句怎么看都是废话。废话也还是要说,生活中的消费者取邮件,程序中的消费者取消息数据。

二、快递柜(消息队列)带来的好处

我们先回顾一下在没有快递柜的日子里是怎么度过的:某天早上突然接到快递员电话:”兄弟,有你的快递啊”。心里想真糟糕:“你早不来晚不来,我马上就要上班了,你这个时候来。好吧,我等你一会”。结果有可能快递员很不靠谱,一会说堵车,一会说马上到,等来等去你上班迟到了。这种情况让你很崩溃!

突然有一天,小区里突然出现了一个叫做快递柜的东西,这东西好啊。”兄弟,有你快递啊”,心想谁是你兄弟:”啊,你放快递柜里面吧,我晚上下班回来取”。快递员愉快的把快递放入快递柜,开始打下一个电话,一早上10个邮件。如果每个都送上门快递员最少要半小时。现在好了,9个放快递柜,1个用户要求送上门,10分钟就搞定了。快递员觉得这个东东真的很好!

快递员高兴了,消费者用户其实也很满意,有的购物狂一天有可能收10来个邮件。没有快递柜的时候,快递员来一个电话就去取一次(等一次)快递。有了快递柜,下班的时候就一起全都取了。上面的例子,体现了消息队列(快递柜)的几个优越性,请读者仔细品评:

  • 异步解耦:有了快递柜,消费者不用等待快递员,用户体验增强。消费者与生产者(快递员)之间解耦,不会因为对方的操作行为,影响自己独立处事的程序。用户不用疲于等待与接收事件阻塞耗时。
  • 流量削峰:我们假定一种极端的情况,你通过各个渠道买了1000本书,突然某一小时集中的给你打电话。你肯定不具备一个小时收1000个邮件的能力,所以你让快递员将邮件放入快递柜。所以你就可以按照自己的处理能力,按照自己的时间安排去取邮件。同样我们的消费者程序在面临多用户、高并发的请求情况下,将数据放入消息队列保存可以将流量数据削峰,按照程序能够处理的能力和资源进行数据消费。
  • 缓冲批处理:生产者批量投递,爽!消费者一次性取多个邮件,爽!

三、引入快递柜带来的缺点

说了这么多的优点,那么快递柜有没有缺点呢?当然有

  1. 引入复杂度。毫无疑问,快递柜(消息队列)这东西是多出来的,在原来的收取过程中是不存在的。所以需要地方放它,还需要防火、防盗防潮,需要去维护它。消息队列中间件也是一样的,你需要服务器区安装它,还要对它进行维护。
  2. 会导致暂时的数据不一致。 如果没有快递柜,你收到了邮件件就是真的收到了。但是使用快递柜之后,你收到了”邮件放入快递柜的消息”,但是与你真的取到邮件这中间会有一定的延时。当然你最终还是会取到邮件,选择”消息队列”快递柜,就是要忍受暂时不一致,接受”最终一致性”。
  3. 当然极端情况下,快递柜坏了,你要不可避免地接受”邮件可能会丢失”的事实,对于安保系数高的小区这几乎不会发生。

redis 分布式锁的 5个坑,真是又大又深 - YoungDeng - 博客园

mikel阅读(405)

来源: redis 分布式锁的 5个坑,真是又大又深 – YoungDeng – 博客园

引言

最近项目上线的频率颇高,连着几天加班熬夜,身体有点吃不消精神也有些萎靡,无奈业务方催的紧,工期就在眼前只能硬着头皮上了。脑子浑浑噩噩的时候,写的就不能叫代码,可以直接叫做Bug。我就熬夜写了一个bug被骂惨了。

由于是做商城业务,要频繁的对商品库存进行扣减,应用是集群部署,为避免并发造成库存超买超卖等问题,采用 redis 分布式锁加以控制。本以为给扣库存的代码加上锁lock.tryLock就万事大吉了

    /**
     * @author xiaofu
     * @description 扣减库存
     * @date 2020/4/21 12:10
     */
   public String stockLock() {
        RLock lock = redissonClient.getLock("stockLock");
        try {
            /**
             * 获取锁
             */
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
                /**
                 * 查询库存数
                 */
                Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount"));
                /**
                 * 扣减库存
                 */
                if (stock > 0) {
                    stock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stockCount", stock.toString());
                    LOGGER.info("库存扣减成功,剩余库存数量:{}", stock);
                } else {
                    LOGGER.info("库存不足~");
                }
            } else {
                LOGGER.info("未获取到锁业务结束..");
            }
        } catch (Exception e) {
            LOGGER.info("处理异常", e);
        } finally {
            lock.unlock();
        }
        return "ok";
  }

结果业务代码执行完以后我忘了释放锁lock.unlock(),导致redis线程池被打满,redis服务大面积故障,造成库存数据扣减混乱,被领导一顿臭骂,这个月绩效~ 哎·~。

随着 使用redis 锁的时间越长,我发现 redis 锁的坑远比想象中要多。就算在面试题当中redis分布式锁的出镜率也比较高,比如:“用锁遇到过哪些问题?” ,“又是如何解决的?” 基本都是一套连招问出来的。

今天就分享一下我用redis 分布式锁的踩坑日记,以及一些解决方案,和大家一起共勉。

一、锁未被释放

这种情况是一种低级错误,就是我上边犯的错,由于当前线程 获取到redis 锁,处理完业务后未及时释放锁,导致其它线程会一直尝试获取锁阻塞,例如:用Jedis客户端会报如下的错误信息

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

redis线程池已经没有空闲线程来处理客户端命令。

解决的方法也很简单,只要我们细心一点,拿到锁的线程处理完业务及时释放锁,如果是重入锁未拿到锁后,线程可以释放当前连接并且sleep一段时间。

  public void lock() {
      while (true) {
          boolean flag = this.getLock(key);
          if (flag) {
                TODO .........
          } else {
                // 释放当前redis连接
                redis.close();
                // 休眠1000毫秒
                sleep(1000);
          }
        }
    }

二、B的锁被A给释放了

我们知道Redis实现锁的原理在于 SETNX命令。当 key不存在时将 key的值设为 value ,返回值为 1;若给定的 key 已经存在,则 SETNX不做任何动作,返回值为 0 。

SETNX key value

我们来设想一下这个场景:AB两个线程来尝试给key myLock加锁,A线程先拿到锁(假如锁3秒后过期),B线程就在等待尝试获取锁,到这一点毛病没有。

那如果此时业务逻辑比较耗时,执行时间已经超过redis锁过期时间,这时A线程的锁自动释放(删除key),B线程检测到myLock这个key不存在,执行 SETNX命令也拿到了锁。

但是,此时A线程执行完业务逻辑之后,还是会去释放锁(删除key),这就导致B线程的锁被A线程给释放了。

为避免上边的情况,一般我们在每个线程加锁时要带上自己独有的value值来标识,只释放指定valuekey,否则就会出现释放锁混乱的场景。

三、数据库事务超时

emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:

   @Transaction
   public void lock() {
   
        while (true) {
            boolean flag = this.getLock(key);
            if (flag) {
                insert();
            }
        }
    }

给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。

比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。

一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。

一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。

    @Autowired
    DataSourceTransactionManager dataSourceTransactionManager;

    @Transaction
    public void lock() {
        //手动开启事务
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            while (true) {
                boolean flag = this.getLock(key);
                if (flag) {
                    insert();
                    //手动提交事务
                    dataSourceTransactionManager.commit(transactionStatus);
                }
            }
        } catch (Exception e) {
            //手动回滚事务
            dataSourceTransactionManager.rollback(transactionStatus);
        }
    }

四、锁过期了,业务还没执行完

这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。

同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,redis锁的过期时间再弄长点不就解决了吗?

那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。

要是redis锁的过期时间能够自动续期就好了。

为了解决这个问题我们使用redis客户端redissonredisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。

redisson对分布式锁做了很好封装,只需调用API即可。

  RLock lock = redissonClient.getLock("stockLock");

redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。

举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。

通过分析下边redisson的源码实现可以发现,不管是加锁解锁续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性

 
@Slf4j
@Service
public class RedisDistributionLockPlus {
 
    /**
     * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
     */
    private static final long DEFAULT_LOCK_TIMEOUT = 30;
 
    private static final long TIME_SECONDS_FIVE = 5 ;
 
    /**
     * 每个key的过期时间 {@link LockContent}
     */
    private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
 
    /**
     * redis执行成功的返回
     */
    private static final Long EXEC_SUCCESS = 1L;
 
    /**
     * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
     */
    private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
            "if redis.call('exists', KEYS[1]) == 0 then " +
               "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
               "for k, v in pairs(t) do " +
                 "if v == 'OK' then return tonumber(ARGV[2]) end " +
               "end " +
            "return 0 end";
 
    /**
     * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
     */
    private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "local ctime = tonumber(ARGV[2]) " +
            "local biz_timeout = tonumber(ARGV[3]) " +
            "if ctime > 0 then  " +
               "if redis.call('exists', KEYS[2]) == 1 then " +
                   "local avg_time = redis.call('get', KEYS[2]) " +
                   "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                   "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
                   "else redis.call('del', KEYS[2]) end " +
               "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
            "end " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";
    /**
     * 续约lua脚本
     */
    private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
 
 
    private final StringRedisTemplate redisTemplate;
 
    public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        ScheduleTask task = new ScheduleTask(this, lockContentMap);
        // 启动定时任务
        ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
    }
 
    /**
     * 加锁
     * 取到锁加锁,取不到锁一直等待知道获得锁
     *
     * @param lockKey
     * @param requestId 全局唯一
     * @param expire   锁过期时间, 单位秒
     * @return
     */
    public boolean lock(String lockKey, String requestId, long expire) {
        log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
        for (; ; ) {
            // 判断是否已经有线程持有锁,减少redis的压力
            LockContent lockContentOld = lockContentMap.get(lockKey);
            boolean unLocked = null == lockContentOld;
            // 如果没有被锁,就获取锁
            if (unLocked) {
                long startTime = System.currentTimeMillis();
                // 计算超时时间
                long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
                String lockKeyRenew = lockKey + "_renew";
 
                RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
                List<String> keys = new ArrayList<>();
                keys.add(lockKey);
                keys.add(lockKeyRenew);
                Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
                if (null != lockExpire && lockExpire > 0) {
                    // 将锁放入map
                    LockContent lockContent = new LockContent();
                    lockContent.setStartTime(startTime);
                    lockContent.setLockExpire(lockExpire);
                    lockContent.setExpireTime(startTime + lockExpire * 1000);
                    lockContent.setRequestId(requestId);
                    lockContent.setThread(Thread.currentThread());
                    lockContent.setBizExpire(bizExpire);
                    lockContent.setLockCount(1);
                    lockContentMap.put(lockKey, lockContent);
                    log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
                    return true;
                }
            }
            // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
            if (Thread.currentThread() == lockContentOld.getThread()
                      && requestId.equals(lockContentOld.getRequestId())){
                // 计数 +1
                lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
                return true;
            }
 
            // 如果被锁或获取锁失败,则等待100毫秒
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // 这里用lombok 有问题
                log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
                return false;
            }
        }
    }
 
 
    /**
     * 解锁
     *
     * @param lockKey
     * @param lockValue
     */
    public boolean unlock(String lockKey, String lockValue) {
        String lockKeyRenew = lockKey + "_renew";
        LockContent lockContent = lockContentMap.get(lockKey);
 
        long consumeTime;
        if (null == lockContent) {
            consumeTime = 0L;
        } else if (lockValue.equals(lockContent.getRequestId())) {
            int lockCount = lockContent.getLockCount();
            // 每次释放锁, 计数 -1,减到0时删除redis上的key
            if (--lockCount > 0) {
                lockContent.setLockCount(lockCount);
                return false;
            }
            consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
        } else {
            log.info("释放锁失败,不是自己的锁。");
            return false;
        }
 
        // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
        lockContentMap.remove(lockKey);
 
        RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        keys.add(lockKeyRenew);
 
        Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
                Long.toString(lockContent.getBizExpire()));
        return EXEC_SUCCESS.equals(result);
 
    }
 
    /**
     * 续约
     *
     * @param lockKey
     * @param lockContent
     * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
     */
    public boolean renew(String lockKey, LockContent lockContent) {
 
        // 检测执行业务线程的状态
        Thread.State state = lockContent.getThread().getState();
        if (Thread.State.TERMINATED == state) {
            log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
            return false;
        }
 
        String requestId = lockContent.getRequestId();
        long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
 
        RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
 
        Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
        log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
        return EXEC_SUCCESS.equals(result);
    }
 
 
    static class ScheduleExecutor {
 
        public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
            long delay = unit.toMillis(initialDelay);
            long period_ = unit.toMillis(period);
            // 定时执行
            new Timer("Lock-Renew-Task").schedule(task, delay, period_);
        }
    }
 
    static class ScheduleTask extends TimerTask {
 
        private final RedisDistributionLockPlus redisDistributionLock;
        private final Map<String, LockContent> lockContentMap;
 
        public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
            this.redisDistributionLock = redisDistributionLock;
            this.lockContentMap = lockContentMap;
        }
 
        @Override
        public void run() {
            if (lockContentMap.isEmpty()) {
                return;
            }
            Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
            for (Map.Entry<String, LockContent> entry : entries) {
                String lockKey = entry.getKey();
                LockContent lockContent = entry.getValue();
                long expireTime = lockContent.getExpireTime();
                // 减少线程池中任务数量
                if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                    //线程池异步续约
                    ThreadPool.submit(() -> {
                        boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                        if (renew) {
                            long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                            lockContent.setExpireTime(expireTimeNew);
                        } else {
                            // 续约失败,说明已经执行完 OR redis 出现问题
                            lockContentMap.remove(lockKey);
                        }
                    });
                }
            }
        }
    }
}

五、redis主从复制的坑

redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。

redis cluster集群环境下,假如现在A客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。

如果此时redis master节点宕机,为保证集群可用性,会进行主备切换slave变为了redis masterB客户端在新的master节点上加锁成功,而A客户端也以为自己还是成功加了锁的。

此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。

至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。

总结

上面就是我在使用Redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。

秒杀系统优化方案(下)吐血整理 - 开拖拉机的蜡笔小新 - 博客园

mikel阅读(407)

来源: 秒杀系统优化方案(下)吐血整理 – 开拖拉机的蜡笔小新 – 博客园

3. 深入优化设计

3.1   初始方案问题分析

在前面针对数据库的优化中,由于数据库行级锁存在竞争造成大量的串行阻塞,我们使用了存储过程(或者触发器)等技术绑定操作,整个事务在MySQL端完成,把整个热点执行放在一个过程当中一次性完成,可以屏蔽掉网络延迟时间,减少行级锁持有时间,提高事务并发访问速度。

可是问题时并发的流量实际上都是直接穿透让MYSQL自己去抗,比如说库存是否卖完以及用户是否重复秒杀都完全是靠查询数据库去判断,造成数据库不必要的负担非常大,然而这些都可以放在缓存做一个标记在服务层进行拦截,对于中小规模的并发还可以,但是真正的超高并发,显然这个还不完善。

3.2    优化的方向和思路

方向:将请求尽量拦截在系统上游

传统秒杀系统之所以挂,请求都压倒了后端数据层,数据读写锁冲突严重,并发高响应慢,几乎所有请求都超时,流量虽大,下单成功的有效流量甚小【一趟火车其实只有2000张票,200w个人来买,基本没有人能买成功,请求有效率为0】

思路:限流和削峰

限流:屏蔽掉无用的流量,允许少部分流量流向后端。

削峰:瞬时大流量峰值容易压垮系统,解决这个问题是重中之重。常用的消峰方法有异步处理、缓存和消息中间件等技术。

 

异步处理:秒杀系统是一个高并发系统,采用异步处理模式可以极大地提高系统并发量,其实异步处理就是削峰的一种实现方式。

缓存:秒杀系统本身是一个典型的读多写少的应用场景【一趟火车其实只有2000张票,200w个人来买,最多2000个人下单成功,其他人都是查询库存,写比例只有0.1%,读比例占99.9%】,非常适合使用缓存。

消息队列:消息队列可以削峰,将拦截大量并发请求,这也是一个异步处理过程,后台业务根据自己的处理能力,从消息队列中主动的拉取请求消息进行业务处理。

3.3   前端优化

3.3.1   静态资源缓存

1. 页面静态化

对商品详情和订单详情进行页面静态化处理,页面是存在html,动态数据是通过接口从服务端获取,实现前后端分离,静态页面无需连接数据库打开速度较动态页面会有明显提高。

2.页面缓存

通过CDN缓存静态资源,来抗峰值。不使用CDN的话也可以通过在手动渲染得到的html页面缓存到redis。

3.3.2   限流手段

1. 使用数学公式验证码

描述:点击秒杀前,先让用户输入数学公式验证码,验证正确才能进行秒杀。

好处:

1)防止恶意的机器人和爬虫

2)分散用户的请求

实现:

1)前端通过把商品id作为参数调用服务端创建验证码接口

2)服务端根据前端传过来的商品id和用户id生成验证码,并将商品id+用户id作为key,生成的验证码作为value存入redis,同时将生成的验证码输入图片写入imageIO让前端展示。

3)将用户输入的验证码与根据商品id+用户id从redis查询到的验证码对比,相同就返回验证成功,进入秒杀;不同或从redis查询的验证码为空都返回验证失败,刷新验证码重试

 

2. 禁止重复提交

用户提交之后按钮置灰,禁止重复提交

3.4    中间代理层

可利用负载均衡(例如反响代理Nginx等)使用多个服务器并发处理请求,减小服务器压力。

3.5     后端优化

3.5.1   控制层(网关层)

限制同一UserID访问频率:尽量拦截浏览器请求,但针对某些恶意攻击或其它插件,在服务端控制层需要针对同一个访问uid,限制访问频率。

1.    利用缓存

设置缓存有效时间,在缓存中计数,如果在缓存的有效时间内请求的次数超了的话,就返回请求访问太频繁。

2.    利用RateLimiter

RateLimiter是guava提供的基于令牌桶算法的限流实现类,通过调整生成token的速率来限制用户频繁访问秒杀页面,从而达到防止超大流量冲垮系统。(令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

3.5.2   服务层

当用户量非常大的时候,拦截流量后的请求访问量还是非常大,此时仍需进一步优化。

1.    业务分离:将秒杀业务系统和其他业务分离,单独放在高配服务器上,可以集中资源对访问请求抗压。——应用的拆分

2.    采用消息队列缓存请求:将大流量请求写到消息队列缓存,利用服务器根据自己的处理能力主动到消息缓存队列中抓取任务处理请求,数据库层订阅消息减库存,减库存成功的请求返回秒杀成功,失败的返回秒杀结束。

3.    利用缓存应对读请求:对于读多写少业务,大部分请求是查询请求,所以可以读写分离,利用缓存分担数据库压力。

4.    利用缓存应对写请求:缓存也是可以应对写请求的,可把数据库中的库存数据转移到Redis缓存中,所有减库存操作都在Redis中进行,然后再通过后台进程把Redis中的用户秒杀请求同步到数据库中。

可以将缓存和消息中间件 组合起来,缓存系统负责接收记录用户请求,消息中间件负责将缓存中的请求同步到数据库。

 

方案:本地标记 + redis预处理 + RabbitMQ异步下单 + 客户端轮询

描述:通过三级缓冲保护,1、本地标记 2、redis预处理 3、RabbitMQ异步下单,最后才会访问数据库,这样做是为了最大力度减少对数据库的访问。

实现:

  1. 在秒杀阶段使用本地标记对用户秒杀过的商品做标记,若被标记过直接返回重复秒杀,未被标记才查询redis,通过本地标记来减少对redis的访问
  2. 抢购开始前,将商品和库存数据同步到redis中,所有的抢购操作都在redis中进行处理,通过Redis预减少库存减少数据库访问
  3. 为了保护系统不受高流量的冲击而导致系统崩溃的问题,使用RabbitMQ用异步队列处理下单,实际做了一层缓冲保护,做了一个窗口模型,窗口模型会实时的刷新用户秒杀的状态。
  4. client端用js轮询一个接口,用来获取处理状态

3.5.3  数据库层

数据库层是最脆弱的一层,一般在应用设计时在上游就需要把请求拦截掉,数据库层只承担“能力范围内”的访问请求。所以,上面通过在服务层引入队列和缓存,让最底层的数据库高枕无忧。但依然可以进行如下方向的优化:

对于秒杀系统,直接访问数据库的话,存在一个【事务竞争优化】问题,可使用存储过程(或者触发器)等技术绑定操作,整个事务在MySQL端完成,把整个热点执行放在一个过程当中一次性完成,可以屏蔽掉网络延迟时间,减少行级锁持有时间,提高事务并发访问速度。

 

3.7  优化秒杀流程

  1. 秒杀活动开始之前有个活动倒计时,时间到了则会放开秒杀的权限,并生成一个验证码展示在前面页面,并把验证结果存在redis中,这里利用redis有过期时间的特性,也给验证码的缓存加了个过期时间。这里的redis缓存用的是redis的string类型。
  2. 在秒杀之前先要填一个验证码verifyCode,点击秒杀按钮时,先发送ajax请求到后台获取真实的秒杀地址path,这里秒杀地址是隐藏的,目的是防止有人恶意刷秒杀接口。所谓隐藏地址,其实是在请求地址中加一段随机字符串,这段字符串是变化的,因此秒杀请求地址是动态的;
  3. 先说下如何获取真实的秒杀地址,后台先访问redis,验证一下这个验证码有没有过期以及这个verifyCode是不是正确,验证码验证通过后,先删除这个验证码缓存,然后生成真实地址;
  4. 真实地址随机字符串由uuid以及md5加密生成,并且保存在redis中,并且设置了有效期;
  5. 从浏览器端向秒杀地址发起请求,带上path参数去后台调用真正的秒杀接口,下面是秒杀接口的逻辑
  6. 访问redis,验证path有没有过期,以及是不是正确。这里验证path以及上面的校验验证码,都是用userId对应生成的一个key值去取redis中的数据;
  7. path验证通过后,先访问内存标识,看秒杀的这个商品有没有卖完,减少对redis的不必要访问。每一种参与秒杀活动的商品都在内存里用HashMap设置了一个标识,标识某个商品id商品是否卖完了。这里的是否卖完的内存标识设置以及每种参与秒杀商品的库存存入redis是在系统启动时做的;
  8. 如果内存标识中这个商品没有卖完,则要看这个用户在这次活动中是否重复秒杀,因为我们的秒杀规则是一个用户id对于某个商品id的商品只能秒杀一件。如何判断该用户有没有秒杀过这件商品呢,秒杀记录也保存在redis缓存中
  9. 如果判断秒杀过则返回提示,如果没有秒杀过,继续;
  10. 上面说过系统加载时redis中保存了各商品对应的库存,这里用到redis的原子操作的方法decr,将对应商品的库存减1,此时数据库时的库存还没有减,因此是预减库存
  11. desc方法返回该商品此时的库存,如果小于0,说明商品已经卖完了,此次秒杀无效,并且设置该商品的内存标识为true,表示已卖完
  12. 正确地预减库存后,然后就要真正操作数据库了,数据库一般是性能瓶颈,比较耗时,因此决定用异步方式处理。对于每一条秒杀请求存入消息队列RabbitMQ中,消息体中要包含哪个用户秒杀哪个商品的信息,这里是封装了一个消息体类,这样一个秒杀请求就进入了消息队列,一个秒杀请求还没有完成,真正的秒杀请求的完成得要持久化到数据库,生成订单,减了数据库的库存才能算数,这时在客户端显示的一般是排队中,比如以前在抢购小米手机时,我就看到这样的展示,过一会再刷新页面就显示没抢到;
  13. 消息队列处理秒杀请求。先从消息体中解析出用户id和商品id,查数据库看这个商品是否卖完了查数据库看该用户对于这个商品是否有过秒杀记录数据库减库存,数据库生成订单,这两项持久化地写数据库操作放在同一个事务中,要么都执行成功,要么都失败。并把秒杀记录对象,包括秒杀单号、订单号、用户id、商品id,存入redis如果数据库减库存失败,表明商品卖完了,则要在redis中设置该商品已卖完的标识消息队列处理秒杀请求。先从消息体中解析出用户id和商品id,查数据库看这个商品是否卖完了查数据库看该用户对于这个商品是否有过秒杀记录
  14. 数据库减库存,数据库生成订单,这两项持久化地写数据库操作放在同一个事务中,要么都执行成功,要么都失败。并把秒杀记录对象,包括秒杀单号、订单号、用户id、商品id,存入redis如果数据库减库存失败,表明商品卖完了,则要在redis中设置该商品已卖完的标识
  15. ajax发起秒杀请求,秒杀请求的处理逻辑最后也只是把这条请求放入消息队列,并不能返回是否秒杀成功的结果。因此,当秒杀请求正确响应后,即请求放入消息队列后,需要另外一个请求去轮询秒杀结果,秒杀成功的标志是生成秒杀订单,并把秒杀订单对象放入redis中。所以轮询秒杀结果,只用去轮询redis中是否有对应于该用户的该商品的秒杀订单对象,如果有,则表明秒杀成功,并在前台给出提示。

上面的秒杀流程对应的流程图如下:
步骤1到12,主体是redis预减库存,生成消息队列:

 

步骤13到14是处理消息队列:

步骤15,是客户端请求秒杀结果:

 

4. 问题解析

1.      如何解决库存的超卖问题?

卖超原因:

(1)一个用户同时发出了多个请求,如果库存足够,没加限制,用户就可以下多个订单。(2)减库存的sql上没有加库存数量的判断,并发的时候也会导致把库存减成负数。

解决办法:

(1):在后端的秒杀表中,对user_id和goods_id加唯一索引,确保一个用户对一个商品绝对不会生成两个订单。

(2):我们的减库存的sql上应该加上库存数量的判断

数据库自身是有行级锁的,每次减库存的时候判断count>0,它实际上是串行的执行update的,因此绝对不会卖超!。

UPDATE seckill

        SET number = number-1

        WHERE seckill_id=#{seckillId}

        AND start_time <#{killTime}

        AND end_time >= #{killTime}

        AND number > 0;

2.    如何解决少卖问题—Redis预减成功而DB扣库存失败?

前面的方案中会出现一个少卖的问题。Redis在预减库存的时候,在初始化的时候就放置库存的大小,redis的原子减操作保证了多少库存就会减多少,也就会在消息队列中放多少。

现在考虑两种情况:

1)数据库那边出现非库存原因比如网络等造成减库存失败,而这时redis已经减了。

2)万一一个用户发出多个请求,而且这些请求恰巧比别的请求更早到达服务器,如果库存足够,redis就会减多次,redis提前进入卖空状态,并拒绝。不过这两种情况出现的概率都是非常低的。

两种情况都会出现少卖的问题,实际上也是缓存和数据库出现不一致的问题

但是我们不是非得解决不一致的问题,本身使用缓存就难以保证强一致性:

在redis中设置库存比真实库存多一些就行。

3.   秒杀过程中怎么保证redis缓存和数据库的一致性?

在其他一般读大于写的场景,一般处理的原则是:缓存只做失效,不做更新。

采用Cache-Aside pattern:

失效:应用程序先从cache取数据,没有得到,则从数据库中取数据,成功后,放到缓存中。

更新:先把数据存到数据库中,成功后,再让缓存失效。

4.  Redis中的库存如何与DB中的库存保持一致?

Redis中的数量不是库存,它的作用仅仅时候只是为了阻挡多余的请求透传到db,起到一个保护DB的作用。因为秒杀商品的数量是有限的,比如只有10个,让1万个请求去访问DB是没有意义的,因为最多只有10个请求会下单成功,剩余的9990个请求都是无效的,是可以不用去访问db而直接失败的。

因此,这是一个伪问题,我们是不需要保持一致的。

5.   为什么要隐藏秒杀接口?

html是可以被右键->查看源代码,如果秒杀地址写死在源文件中,是很容易就被恶意用户拿到的,就可以被机器人利用来刷接口,这对于其他用户来说是不公平的,我们也不希望看到这种情况。所以我们可以控制让用户在没有到秒杀时间的时候不能获取到秒杀地址,只返回秒杀的开始时间。

当到秒杀时间的时候才返回秒杀地址即seckill_id以及根据seckill_id和salt加密的MD5,前端再次拿着seckill_id和MD5才能执行秒杀。假如用户在秒杀开始前猜测到秒杀地址seckill_id去请求秒杀,也是不会成功的,因为它拿不到需要验证的MD5。这里的MD5相当于是用户进行秒杀的凭证。

6.   一个秒杀系统,500用户同时登陆访问服务器A,服务器B如何快速利用登录名(假设是电话号码或者邮箱)做其他查询?

主从复制,读写分离

秒杀系统优化方案(上)吐血整理 - 开拖拉机的蜡笔小新 - 博客园

mikel阅读(454)

来源: 秒杀系统优化方案(上)吐血整理 – 开拖拉机的蜡笔小新 – 博客园

前一段时间好好研究了秒杀的问题,我把里面的问题好好总结了,可以说是比较全面的了,真的是吐血整理了。

由于我先是在word中整理的,格式都整理得比较好,放到博客上格式挺难调,暂时按word的格式来吧,有时间了在好好排版下。

主要需要解决的问题有两个:

  1. 高并发对数据库产生的压力
  2. 竞争状态下如何解决库存的正确减少(超卖问题)

优化的思路:

1) 尽量将请求拦截在系统上游

2)读多写少经量多使用缓存
3) redis缓存 +RabbitMQ+ mySQL 批量入库

1.   初始秒杀设计

1.1 业务分析

秒杀系统业务流程如下:

由图可以发现,整个系统其实是针对库存做的系统。用户成功秒杀商品,对于我们系统的操作就是:1.减库存。2.记录用户的购买明细。下面看看我们用户对库存的业务分析:

记录用户的秒杀成功信息,我们需要记录:1.谁购买成功了。2.购买成功的时间/有效期。这些数据组成了用户的秒杀成功信息,也就是用户的购买行为。

为什么我们的系统需要事务?

1.若是用户成功秒杀商品我们记录了其购买明细却没有减库存。导致商品的超卖

2.减了库存却没有记录用户的购买明细。导致商品的少卖。对于上述两个故障,若是没有事务的支持,损失最大的无疑是我们的用户和商家。在MySQL中,它内置的事务机制,可以准确的帮我们完成减库存和记录用户购买明细的过程。

1.2  难点分析

当用户A秒杀id为10的商品时,此时MySQL需要进行的操作是:

1.开启事务。2.更新商品的库存信息。3.添加用户的购买明细,包括用户秒杀的商品id以及唯一标识用户身份的信息如电话号码等。4.提交事务。

若此时有另一个用户B也在秒杀这件id为10的商品,他就需要等待,等待到用户A成功秒杀到这件商品,然后MySQL成功的提交了事务他才能拿到这个id为10的商品的锁从而进行秒杀,而同一时间是不可能只有用户B在等待,肯定是有很多很多的用户都在等待竞争行级锁。秒杀的难点就在这里,如何高效的处理这些竞争?如何高效的完成事务?

1.3 功能实现

我们只是实现秒杀的一些功能:1.秒杀接口的暴露。2.执行秒杀的操作。3.相关查询,比如说列表查询,详情页查询。我们实现这三个功能即可。

1.4 数据库设计

Seckill秒杀表单

Success_seckill购买明细表

在购买明细表中seckill_id和user_phone是联合主键,当重复秒杀的时候,加入ignore防止报错,只是会返回0,表示重复秒杀。

INSERT ignore INTO success_killed(seckill_id,user_phone,state)
VALUES (#{seckillId},#{userPhone},0)

在购买明细表中seckill_id和user_phone是联合主键,当重复秒杀的时候,加入ignore防止报错,只是会返回0,表示重复秒杀。

INSERT ignore INTO success_killed(seckill_id,user_phone,state)
VALUES (#{seckillId},#{userPhone},0)

1.5 DAO层设计

秒杀表的DAO:减库存(id,nowtime)、由id查询商品、由偏移量查询商品

购买明细表的DAO:插入购买明细、根据商品id查询明细SucceesKill对象(携带Seckill对象)—mybatis的复合查询

减库存和增加明细的sql

复制代码
<update id="reduceNumber">
        UPDATE seckill
        SET number = number-1
        WHERE seckill_id=#{seckillId}
        AND start_time <![CDATA[ <= ]]> #{killTime}
        AND end_time >= #{killTime}
        AND number > 0;
</update>
<insert id="insertSuccessKilled">
        <!--当出现主键冲突时(即重复秒杀时),会报错;不想让程序报错,加入ignore-->
        INSERT ignore INTO success_killed(seckill_id,user_phone,state)
        VALUES (#{seckillId},#{userPhone},0)
 </insert>
复制代码

 

1.6 Service层设计

暴露秒杀地址(接口)DTO

复制代码
public class Exposer {
    //是否开启秒杀
    private boolean exposed;
    //加密措施
    private String md5;
    private long seckillId;
    //系统当前时间(毫秒)
    private long now;
    //秒杀的开启时间
    private long start;
    //秒杀的结束时间
    private long end;}
复制代码

 

封装执行秒杀后的结果:是否秒杀成功

复制代码
public class SeckillExecution {
    private long seckillId;
    //秒杀执行结果的状态
    private int state;
    //状态的明文标识
    private String stateInfo;
    //当秒杀成功时,需要传递秒杀成功的对象回去
    private SuccessKilled successKilled;}
复制代码

 

秒杀过程

接口暴露:

复制代码
public Exposer exportSeckillUrl(long seckillId) {
        //缓存优化
        Seckill seckill = getById(seckillId);
        //若是秒杀未开启
        Date startTime = seckill.getStartTime();
        Date endTime = seckill.getEndTime();
        //系统当前时间
        Date nowTime = new Date();
        if (startTime.getTime() > nowTime.getTime() || endTime.getTime() < nowTime.getTime()) {
            return new Exposer(false, seckillId, nowTime.getTime(), startTime.getTime(), endTime.getTime());
        }
        //秒杀开启,返回秒杀商品的id、用给接口加密的md5
        String md5 = getMD5(seckillId);
        return new Exposer(true, md5, seckillId);
}
复制代码

如果当前时间还没有到秒杀时间或者已经超过秒杀时间,秒杀处于关闭状态,那么返回秒杀的开始时间和结束时间;如果当前时间处在秒杀时间内,返回暴露地址(秒杀商品的id、用给接口加密的md5)

为什么要进行MD5加密?

我们用MD5加密的方式对秒杀地址(seckill_id)进行加密,暴露给前端用户。当用户执行秒杀的时候传递seckill_id和MD5,程序拿着seckill_id根据设置的盐值计算MD5,如果与传递的md5不一致,则表示地址被篡改了。

 

为什么要进行秒杀接口暴露的控制或者说进行秒杀接口的隐藏?

现实中有的用户回通过浏览器插件提前知道秒杀接口,填入参数和地址来实现自动秒杀,这对于其他用户来说是不公平的,我们也不希望看到这种情况。所以我们可以控制让用户在没有到秒杀时间的时候不能获取到秒杀地址,只返回秒杀的开始时间。当到秒杀时间的时候才

返回秒杀地址即seckill_id以及根据seckill_id和salt加密的MD5,前端再次拿着seckill_id和MD5才能执行秒杀。假如用户在秒杀开始前猜测到秒杀地址seckill_id去请求秒杀,也是不会成功的,因为它拿不到需要验证的MD5。这里的MD5相当于是用户进行秒杀的凭证。

 

执行秒杀:

复制代码
 //秒杀是否成功,成功: 增加明细,减库存;失败:抛出异常,事务回滚
    @Transactional
    public SeckillExecution executeSeckill(long seckillId, long userPhone, String md5)  throws SeckillException, RepeatKillException, SeckillCloseException {
        if (md5 == null || !md5.equals(getMD5(seckillId))) {
            //秒杀数据被重写了
            throw new SeckillException("seckill data rewrite");
        }
        //执行秒杀逻辑:增加购买明细+减库存
        Date nowTime = new Date();
        try {
            //先增加明细,然后再执行减库存的操作
            int insertCount = successKilledDao.insertSuccessKilled(seckillId, userPhone);
            //看是否该明细被重复插入,即用户是否重复秒杀
            if (insertCount <= 0) {
                throw new RepeatKillException("seckill repeated");
            } else {
                //减库存,热点商品竞争
                int updateCount = seckillDao.reduceNumber(seckillId, nowTime);
                if (updateCount <= 0) {
                    //没有更新库存记录,说明秒杀结束或者是已经卖完 rollback
                    throw new SeckillCloseException("seckill is closed");
                } else {
                    //秒杀成功,得到成功插入的明细记录,并返回成功秒杀的信息 commit
                    SuccessKilled successKilled = successKilledDao.queryByIdWithSeckill(seckillId, userPhone);
                    return new SeckillExecution(seckillId, SeckillStatEnum.SUCCESS, successKilled);
                }
            }
        } catch (SeckillCloseException e1) {
            throw e1;
        } catch (RepeatKillException e2) {
            throw e2;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            //所以编译期异常转化为运行期异常
            throw new SeckillException("seckill inner error :" + e.getMessage());
        }
    }
复制代码

首先检查用户是否已经登录,查看cookie中是否有phone的信息,如果没有,返回没有注册的错误信息。

接着执行秒杀,首先验证md5,看地址是否被篡改。先增加明细(为什么要先增加明细见后面优化的过程),看是否该明细被重复插入,即用户是否重复秒杀,如果是,抛异常。然后减库存,因为sql在减库存的时候判断了当前时间和秒杀时间是否对应,如果数据库update返回0没有更新库存记录,说明秒杀结束;或者是库存已经没有主动抛出错误rollback。(前面在获取秒杀地址的时候已经挡住了秒杀关闭的请求(没到时间或者时间已过),然后从获取到秒杀地址到执行秒杀还可能会在这段时间秒杀结束)

最后秒杀成功,得到购买明细信息,接着commit。

注意事务在这里的处理:

Spring事务异常回滚,捕获异常不抛出就不会回滚

1.7 Web层设计

交互流程

2.   初始优化设计

红色部分代表可能高并发的点,绿色表示没有影响

2.1 详情页缓存

通过CDN缓存静态资源,来抗峰值。

动静态数据分离

详情页静态资源是部署在CDN节点中,也就是说访问静态资源或者详情页是不用访问我们的系统的。

限流小技巧:用户提交之后按钮置灰,禁止重复提交 

为什么要单独ajax请求获取服务器的时间?

为了保持时间一致,因为详情页放在CDN上和系统存放的位置是分离的。

2.2 秒杀接口地址缓存

无法使用CDN是因为,CDN适合的请求的资源是不易变化的。

秒杀接口是变化的,可以使用redis服务端缓存可以用集群抗住非常大的并发。1秒钟可以承受10万qps。多个Redis组成集群,可以到100w个qps

一致性:当秒杀的对象改变的时候修改我们的数据库同时修改缓存。

原本查询秒杀商品时是通过主键直接去数据库查询的,选择将数据缓存在Redis,在查询秒杀商品时先去Redis缓存中查询,以此降低数据库的压力。如果在缓存中查询不到数据再去数据库中查询,再将查询到的数据放入Redis缓存中,这样下次就可以直接去缓存中直接查询到。

这里有一个继续优化的点:在redis中存放对象是将对象序列化成byte字节。

通过Jedis储存对象的方式有大概三种

  1. 本项目采用的方式:将对象序列化成byte字节,最终存byte字节;
  2. 对象转hashmap,也就是你想表达的hash的形式,最终存map;
  3. 对象转json,最终存json,其实也就是字符串

其实如果你是平常的项目,并发不高,三个选择都可以,这种情况下以hash的形式更加灵活,可以对象的单个属性,但是问题来了,在秒杀的场景下,三者的效率差别很大。

10w数据

时间

内存占用

存json

10s

14M

存byte

6s

6M

存jsonMap

10s

20M

存byteMap

4s

4M

取json

7s

取byte

4s

取jsonmap

7s

取bytemap

4s

bytemap最快啊,为啥不用啊,因为项目用了超级高性能的自定义序列化工具protostuff。

2.3 秒杀操作优化

Mysql真的低效吗?

在mysql端一条update压力测试约4wQPS,即使是现在最好的秒杀产品应该也达不到这个数字。

然而实际上远没有这么高的QPS,那么时间消耗在哪呢?

串行化操作,大量的堵塞

2.3.1 瓶颈分析

客户端执行update,当我们的sql通过网络发送到mysql的时候,这本身就有网络延迟在里面,并且还有GC的时间,GC又分为新生代GC和老年代GC,新生代会暂停所有的事务代码,也就是我们的java代码,一般在几十毫秒

也即是说如果由java客户端去控制这些事务的话,update减库存,网络延迟,update数据操作结果返回,然后执行GC;然后执行insert,发生网络延迟,等待insert执行结果返回,也可能出现GC,最后commit或者rollback。当这些执行完了之后,第二个等待行锁的线程才有可能拿到这个数据行的锁,再去执行update减库存。

不是我们的mysql慢,也不是java慢,可能存在我们的java客户端执行这些sql,然后等待这些sql的结果,再去做判断再去执行这些sql,这一长串的事务在java客户端执行,但是java客户端和数据库之间会有网络延迟,或者是GC这些时间也要加载事务的执行周期里面,而同一行的事务是串行化的。

那么我们的QPS分析就是所有的sql执行时间+网络延迟时间+可能的GC,这就是当前执行一行数据的时间。

优化的方向

2.3.2 简单优化

将原本先update(减库存)再进行insert(插入购买明细)的步骤改成:先insert再update。

为什么要先insertupdate

首先是在更新操作的时候给行加锁,插入并不会加锁,如果更新操作在前,那么就需要执行完更新和插入以后事务提交或回滚才释放锁。而如果插入在前,更新在后,那么只有在更新时才会加行锁,之后在更新完以后事务提交或回滚释放锁。

在这里,插入是可以并行的,而更新由于会加行级锁是串行的

也就是说是更新在前加锁和释放锁之间两次的网络延迟和GC,如果插入在前则加锁和释放锁之间只有一次的网络延迟和GC,也就是减少的持有锁的时间。

这里先insert并不是忽略了库存不足的情况,而是因为insert和update是在同一个事务里,光是insert并不一定会提交,只有在update成功才会提交,所以并不会造成过量插入秒杀成功记录。

2.3.3 深度优化

客户端逻辑事务SQLMYSQL端执行,完全屏蔽网络延迟和GCMYSQL只需告诉最终结果。

1. 阿里巴巴做了一个mysql源码层的修改方案,当执行完update之后,它会自动做回滚,回滚的条件影响的记录数是1,就会commit;如果是0就会rollback,不由java客户端来控制commit或者rollback,不给java客户端和mysql之间通信的网络延迟,本质上减低了网络延迟或者GC的干扰,但是这个成本高,要修改mysql源码,只有大公司能做。

2.我们可以将执行秒杀操作时的insert和update放到MySQL服务端的存储过程里,而Java客户端直接调用这个存储过程,这样就可以避免网络延迟和可能发生的GC影响。另外,由于我们使用了存储过程,也就使用不到Spring的事务管理了,因为在存储过程里我们会直接启用一个事务。

2.3.4 优化总结

 

预知后事如何,请看下篇分解:秒杀系统优化方案(下)吐血整理