浅谈SQL SERVER中事务的ACID - CareySon - 博客园

mikel阅读(598)

来源: 浅谈SQL SERVER中事务的ACID – CareySon – 博客园

简介

 

ACID,是指在可靠数据库管理系统(DBMS)中,事务(transaction)所应该具有的四个特性:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability).这是可靠数据库所应具备的几个特性.下面针对这几个特性进行逐个讲解.

理解原子性(Atomicity)

原子性意味着数据库中的事务执行是作为原子。即不可再分,整个语句要么执行,要么不执行。

SQL SERVER中,每一个单独的语句都可以看作是默认包含在一个事务之中:

1

所以,每一个语句本身具有原子性,要么全部执行,这么全部不执行,不会有中间状态:

2

上面说了,每一条T-SQL语句都可以看作是默认被包裹在一个事务之中的,SQL Server对于每一条单独的语句都实现了原子性,但这种原子粒度是非常小的,如果用户想要自己定义原子的大小,则需要包含在事务中来构成用户自定义的原子粒度:

3

对于用户来说,要用事务实现的自定义原子性往往是和业务相关的,比如银行转账,从A账户减去100,在B账户增加100,如果这两个语句不能保证原子性的话,比如从A账户减去100后,服务器断电,而在B账户中却没有增加100.虽然这种情况会让银行很开心,但作为开发人员的你可不希望这种结果.而默认事务中,即使出错了也不会整个事务进行回滚。而是失败的语句抛出异常,而正确的语句成功执行。这样会破坏原子性。所以SQL SERVER给予了一些选项来保证事务的原子性.

SQL SERVER提供了两大类方式来保证自定义事务的原子性:

  1.通过SET XACT_ABORT ON来设置事务必须符合原子性

       利用设置XACT_ABORT选项设置为ON,来设置所有事务都作为一个原子处理.下面例子利用两个语句插入到数据库,可以看到开启SET XACT_ABORT ON选项后,事务具有了原子性:

   4

 

 2.按照用户设置进行回滚(ROLLBACK)

       这种方式具有更高的灵活性,开发人员可以自定义在什么情况进行ROLLBACK,利用TRY CATCH语句和@@ERROR进行判断都属于这种方式.

5

 

理解一致性(Consistency)

一致性,即在事务开始之前和事务结束以后,数据库的完整性约束没有被破坏。

一致性分为两个层面

1.数据库机制层面

      数据库层面的一致性是,在一个事务执行之前和之后,数据会符合你设置的约束(唯一约束,外键约束,Check约束等)和触发器设置.这一点是由SQL SERVER进行保证的.

 

2.业务层面

      对于业务层面来说,一致性是保持业务的一致性.这个业务一致性需要由开发人员进行保证.很多业务方面的一致性可以通过转移到数据库机制层面进行保证.比如,产品只有两个型号,则可以转移到使用CHECK约束使某一列必须只能存这两个型号.

 

理解隔离性(Isolation)

隔离性。事务的执行是互不干扰的,一个事务不可能看到其他事务运行时,中间某一时刻的数据。

在Windows中,如果多个进程对同一个文件进行修改是不允许的,Windows通过这种方式来保证不同进程的隔离性:

Lock

而SQL Server中,通过SQL SERVER对数据库文件进行管理,从而可以让多个进程可以同时访问数据库:

6

SQL Server利用加锁和阻塞来保证事务之间不同等级的隔离性.

一般情况下,完全的隔离性是不现实的,完全的隔离性要求数据库同一时间只执行一条事务,这样的性能可想而知.想要理解SQL Server中对于隔离性的保障,首先要了解事务之间是如何干扰的.

事务之间的互相影响的情况分为几种,分别为:脏读(Dirty Read),不可重复读,幻读

 

脏读

    脏读意味着一个事务读取了另一个事务未提交的数据,而这个数据是有可能回滚的:

7

下面来看一个例子:

两个事务,事务A插入一条数据,但未提交,事务B在此期间进行了读取,读取到了事务A未提交的数据,造成脏读

8

 

不可重复读(Unrepeatable Read)

不可重复读意味着,在数据库访问中,一个事务范围内两个相同的查询却返回了不同数据。这是由于查询时系统中其他事务修改的提交而引起的。

9

下面来看一个不可重复读的例子:

事务B中对某个查询执行两次,当第一次执行完时,事务A对其数据进行了修改。事务B中再次查询时,数据发生了改变:

10

11

 

幻读(phantom read)

幻读,是指当事务不是独立执行时发生的一种现象,例如第一个事务对一个表中的数据进行了修改,这种修改涉及到表中的全部数据行。同时,第二个事务也修改这个表中的数据,这种修改是向表中插入一行新数据。那么,以后就会发生操作第一个事务的用户发现表中还有没有修改的数据行,就好象发生了幻觉一样.

13

下面来看一个例子:

事务B更新表中所有的数据,在此期间事务A插入了一条数据,事务B再次查询后,发现居然还有没有修改的数据,产生幻读:

12

 

理解SQL SERVER中的隔离级别

     为了避免上述几种事务之间的影响,SQL Server通过设置不同的隔离等级来进行不同程度的避免。因为高的隔离等级意味着更多的锁,从而牺牲性能.所以这个选项开放给了用户根据具体的需求进行设置。不过默认的隔离等级Read Commited符合了99%的实际需求.

SQL Server隔离事务之间的影响是通过锁来实现的,这个概念比较繁杂,所以本文不会详细对这个概念进行讲解.通过阻塞来阻止上述效果

SQL Server提供了5种选项来避免不同级别的事务之间的影响

隔离等级由低到高分别为

Read Uncommited(最高的性能,但可能出现脏读,不可重复读,幻读)

Read commited(可能出现不可重复读,幻读)

Repeatable Read(可能出现幻读)

Serializable(最低的性能,Range锁会导致并发下降)

SNOPSHOT(这个是通过在tempDB中创建一个额外的副本来避免脏读,不可重复读,会给tempDB造成额外负担,因为不是标准ANSI SQL标准,不详细讨论)

 

总之,不同的隔离级别是通过加不同的锁,造成阻塞来实现的,来看一个例子:

SQL SERVER通过阻塞来阻止脏读,所以保持独立性会以付出性能作为代价:

14

 

理解持久性(Durability)

     持久性,意味着在事务完成以后,该事务所对数据库所作的更改便持久的保存在数据库之中,并不会被回滚。

即使出现了任何事故比如断电等,事务一旦提交,则持久化保存在数据库中.

SQL SERVER通过write-ahead transaction log来保证持久性。write-ahead transaction log的意思是,事务中对数据库的改变在写入到数据库之前,首先写入到事务日志中。而事务日志是按照顺序排号的(LSN)。当数据库崩溃或者服务器断点时,重启动SQL SERVER,SQL SERVER首先会检查日志顺序号,将本应对数据库做更改而未做的部分持久化到数据库,从而保证了持久性.

 

总结

本文简单讲述了ACID的概念和ACID在SQL SERVER中的实现.ACID只是一个理念,并不是某项具体的技术.对于健壮数据库来说,保证ACID是可靠数据库的前提.

一分钟了解负载均衡的一切 - Arlar - 博客园

mikel阅读(781)

来源: 一分钟了解负载均衡的一切 – Arlar – 博客园

什么是负载均衡

负载均衡(Load Balance)是分布式系统架构设计中必须考虑的因素之一,它通常是指,将请求/数据【均匀】分摊到多个操作单元上执行,负载均衡的关键在于【均匀】。

常见的负载均衡方案

常见互联网分布式架构如上,分为客户端层、反向代理nginx层、站点层、服务层、数据层。可以看到,每一个下游都有多个上游调用,只需要做到,每一个上游都均匀访问每一个下游,就能实现“将请求/数据【均匀】分摊到多个操作单元上执行”。

【客户端层->反向代理层】的负载均衡

【客户端层】到【反向代理层】的负载均衡,是通过“DNS轮询”实现的:DNS-server对于一个域名配置了多个解析ip,每次DNS解析请求来访问DNS-server,会轮询返回这些ip,保证每个ip的解析概率是相同的。这些ip就是nginx的外网ip,以做到每台nginx的请求分配也是均衡的。

【反向代理层->站点层】的负载均衡

【反向代理层】到【站点层】的负载均衡,是通过“nginx”实现的。通过修改nginx.conf,可以实现多种负载均衡策略:

1)请求轮询:和DNS轮询类似,请求依次路由到各个web-server

2)最少连接路由:哪个web-server的连接少,路由到哪个web-server

3)ip哈希:按照访问用户的ip哈希值来路由web-server,只要用户的ip分布是均匀的,请求理论上也是均匀的,ip哈希均衡方法可以做到,同一个用户的请求固定落到同一台web-server上,此策略适合有状态服务,例如session(58沈剑备注:可以这么做,但强烈不建议这么做,站点层无状态是分布式架构设计的基本原则之一,session最好放到数据层存储)

4)…

【站点层->服务层】的负载均衡


【站点层】到【服务层】的负载均衡,是通过“服务连接池”实现的。

上游连接池会建立与下游服务多个连接,每次请求会“随机”选取连接来访问下游服务。

上一篇文章《RPC-client实现细节》中有详细的负载均衡、故障转移、超时处理的细节描述,欢迎点击link查阅,此处不再展开。

【数据层】的负载均衡

在数据量很大的情况下,由于数据层(db,cache)涉及数据的水平切分,所以数据层的负载均衡更为复杂一些,它分为“数据的均衡”,与“请求的均衡”。

数据的均衡是指:水平切分后的每个服务(db,cache),数据量是差不多的。

请求的均衡是指:水平切分后的每个服务(db,cache),请求量是差不多的。

业内常见的水平切分方式有这么几种:

一、按照range水平切分

每一个数据服务,存储一定范围的数据,上图为例:

user0服务,存储uid范围1-1kw

user1服务,存储uid范围1kw-2kw

这个方案的好处是:

(1)规则简单,service只需判断一下uid范围就能路由到对应的存储服务

(2)数据均衡性较好

(3)比较容易扩展,可以随时加一个uid[2kw,3kw]的数据服务

不足是:

(1)请求的负载不一定均衡,一般来说,新注册的用户会比老用户更活跃,大range的服务请求压力会更大

二、按照id哈希水平切分

每一个数据服务,存储某个key值hash后的部分数据,上图为例:

user0服务,存储偶数uid数据

user1服务,存储奇数uid数据

这个方案的好处是:

(1)规则简单,service只需对uid进行hash能路由到对应的存储服务

(2)数据均衡性较好

(3)请求均匀性较好

不足是:

(1)不容易扩展,扩展一个数据服务,hash方法改变时候,可能需要进行数据迁移

总结

负载均衡(Load Balance)是分布式系统架构设计中必须考虑的因素之一,它通常是指,将请求/数据【均匀】分摊到多个操作单元上执行,负载均衡的关键在于【均匀】。

(1)【客户端层】到【反向代理层】的负载均衡,是通过“DNS轮询”实现的

(2)【反向代理层】到【站点层】的负载均衡,是通过“nginx”实现的

(3)【站点层】到【服务层】的负载均衡,是通过“服务连接池”实现的

(4)【数据层】的负载均衡,要考虑“数据的均衡”与“请求的均衡”两个点,常见的方式有“按照范围水平切分”与“hash水平切分”

C# 线程并发锁 - Arlar - 博客园

mikel阅读(849)

来源: C# 线程并发锁 – Arlar – 博客园

本文目录:

  • 线程的简单使用
  • 并发和异步的区别
  • 并发控制 – 锁
  • 线程的信号机制
  • 线程池中的线程
  • 案例:支持并发的异步日志组件

线程的简单使用

常见的并发和异步大多是基于线程来实现的,所以本文先讲线程的简单使用方法。

使用线程,我们需要引用System.Threading命名空间。创建一个线程最简单的方法就是在 new 一个 Thread,并传递一个ThreadStart委托(无参数)或ParameterizedThreadStart委托(带参数),如下:

复制代码
复制代码
class Program {
    static void Main(string[] args) {

        // 使用无参数委托ThreadStart
        Thread t = new Thread(Go);
        t.Start();

        // 使用带参数委托ParameterizedThreadStart
        Thread t2 = new Thread(GoWithParam);
        t2.Start("Message from main.");

        t2.Join();// 等待线程t2完成。

        Console.WriteLine("Thread t2 has ended!");
        Console.ReadKey();
    }

    static void Go() {
        Console.WriteLine("Go!");
    }

    static void GoWithParam(object msg) {
        Console.WriteLine("Go With Param! Message: " + msg);
        Thread.Sleep(1000);// 模拟耗时操作
    }
}
复制代码
复制代码

运行结果:

线程的用法,我们只需要了解这么多。下面我们再来通过一段代码来讲讲并发和异步。

并发和异步的区别

关于并发和异步,我们先来写一段代码,模拟多个线程同时写1000条日志:

复制代码
复制代码
class Program {
    static void Main(string[] args) {

        Thread t1 = new Thread(Working);
        t1.Name = "Thread1";
        Thread t2 = new Thread(Working);
        t2.Name = "Thread2";
        Thread t3 = new Thread(Working);
        t3.Name = "Thread3";

        // 依次启动3个线程。
        t1.Start();
        t2.Start();
        t3.Start();

        Console.ReadKey();
    }

    // 每个线程都同时在工作
    static void Working() {
        // 模拟1000次写日志操作
        for (int i = 0; i < 1000; i++) {
            //  异步写文件
            Logger.Write(Thread.CurrentThread.Name + " writes a log: " + i + ", on " + DateTime.Now.ToString() + ".\n");
        }// 做一些其它的事件
        for (int i = 0; i < 1000; i++) { }
    }
}
复制代码
复制代码

代码很简单,相信大家都能看得懂。Logger 大家可以把它看做是一个写日志的组件,先不关心它的具体实现,只要知道它是一个提供了写日志功能的组件就行。

那么,这段代码跟并发和异步有什么关系呢?

我们先用一张图来描述这段代码:

观察上图,3个线程同时调用Logger写日志,对于Logger来说,3个线程同时交给了它任务,这种情况就是并发。对于其中一个线程来说,它在工作过程中,在某个时间请求Logger帮它写日志,同时又继续在自己的其它工作,这种情况就是异步

(经读者反馈,为不“误导”读者(尽管我个人不觉得是误导。之前我的定义和解释不全 面,没有从操作系统和CPU层次去区分这两个概念。我的文章不喜欢搬教科书,只是想用通俗易读的白话让大家理解),为了知识的专业性和严谨,现已把我理解 的对并发和异步的定义删除,感谢园友们的热心讨论)。

 

接下来,我们继续讲几个很有用的有关线程和并发的知识 – 锁、信号机制和线程池。

并发控制 – 锁

CLR 会为每个线程分配自己的内存堆空间,以使他们的本地变量保持分离互不干扰。

线程之间也可以共享通用的数据,比如同一对象的某个属性或全局静态变量。但线程间共享数据是存在安全问题的。举个例子,下面的主线程和新线程共享了变量done,done用来标识某件事已经做过了(告诉其它线程不要再重复做了):

复制代码
复制代码
class Program {
    static bool done;
    static void Main(string[] args) {

        new Thread(Go).Start(); // 在新的线程上调用Go
        Go(); // 在主线程上调用Go

        Console.ReadKey();
    }

    static void Go() {
        if (!done) {
            Thread.Sleep(500); // 模拟耗时操作
            Console.WriteLine("Done"); 
            done = true;
        }
    }
}
复制代码
复制代码

输出结果:

输出了两个“Done”,事件被做了两次。由于没有控制好并发,这就出现了线程的安全问题,无法保证数据的状态。

要解决这个问题,就需要用到锁(Lock,也叫排它锁或互斥锁)。使用lock语句,可以保证共享数据只能同时被一个线程访问。lock的数据对象 要求是不能null的引用类型的对象,所以lock的对象需保证不能为空。为此需要创建一个不为空的对象来使用锁,修改一下上面的代码如下:

复制代码
复制代码
class Program {

    static bool done;
    static object locker = new object(); // !!

    static void Main(string[] args) {

        new Thread(Go).Start(); // 在新的线程上调用Go
        Go(); // 在主线程上调用Go

        Console.ReadKey();
    }

    static void Go() {
        lock (locker) {
            if (!done) {
                Thread.Sleep(500); // Doing something.
                Console.WriteLine("Done");
                done = true;
            }
        }
    }
}
复制代码
复制代码

再看结果:

使用锁,我们解决了问题。但使用锁也会有另外一个线程安全问题,那就是“死锁”,死锁的概率很小,但也要避免。保证“上锁”这个操作在一个线程上执行是避免死锁的方法之一,这种方法在下文案例中会用到。

这里我们就不去深入研究“死锁”了,感兴趣的朋友可以去查询相关资料。

线程的信号机制

有时候你需要一个线程在接收到某个信号时,才开始执行,否则处于等待状态,这是一种基于信号的事件机制。.NET框架提供一个 ManualResetEvent类来处理这类事件,它的 WaiOne 实例方法可使当前线程一直处于等待状态,直到接收到某个信号。它的Set方法用于打开发送信号。下面是一个信号机制的使用示例:

复制代码
复制代码
static void Main(string[] args) {

    var signal = new ManualResetEvent(false);

    new Thread(() => {
        Console.WriteLine("Waiting for signal...");
        signal.WaitOne();
        signal.Dispose();
        Console.WriteLine("Got signal!");
    }).Start();
    Thread.Sleep(2000);

    signal.Set();// 打开“信号”

    Console.ReadKey();
}
复制代码
复制代码

运行结果:

当执行Set方法后,信号保持打开状态,可通过Reset方法将其关闭,若不再需要,通过Dispose将其释放。如果预期的等待时间很短,可以用 ManualResetEventSlim代替ManualResetEvent,前者在等待时间较短时性能更好。信号机制非常有用,后面的日志案例会用 到它。

线程池中的线程

线程池中的线程是由CLR来管理的。在下面两种条件下,线程池能起到最好的效用:

  • 任务运行的时候比较短(<250ms),这样CLR可以充分调配现有的空闲线程来处理该任务;
  • 大量时间处于等待(或阻塞)的任务不去支配线程池的线程。

要使用线程中的线程,主要有下面两种方式:

// 方式1:Task.Run,.NET Framework 4.5 才有
Task.Run (() => Console.WriteLine ("Hello from the thread pool"));

// 方式2:ThreadPool.QueueUserWorkItem
ThreadPool.QueueUserWorkItem (t => Console.WriteLine ("Hello from the thread pool"));

线程池使得线程可以充分有效地被使用,减少了任务启动的延迟。但是不是所有的情况都适合使用线程池中的线程,比如下面要讲的日志案例 – 异步写文件。

这里讲线程池,是为了让大家大致了解什么时候用线程池中的线程,什么时候不用。即,耗时长或有阻塞情况的不用线程池中的线程。

创建不走线程池中的线程,可以直接通过new Thread来创建,也可以通过下面的代码来创建:

Task task = Task.Factory.StartNew (() => ...,TaskCreationOptions.LongRunning);// 注意必须带TaskCreationOptions.LongRunning参数

这里用到了Task,大家不用关心它,后续博文会详细讲。

关于线程的知识很多,这里不再深入了,因为这些已经足够让我们应付Web开发了。

案例:支持并发的异步日志组件

上文的“并发和异步的区别”的代码中我们用到了一个Logger类,现在我们就来做一个这样的Logger。

基于上面的知识,我们可以实现应用程序的并发写日志日志功能。在应用程序中,写日志是常见的功能,简单分析一下该功能的需求:

  1. 在后台异步执行,和其它线程互不影响。
    根据上文线程池的两个最优使用条件,由写日志线程会长时间处于阻塞(或运行等待)状态,所以它不适合使用线程池。即不能使用Task.Run,而最好使用new Thread。

  2. 支持并发,即多个任务(分布在不同线程上)可同时调用写日志功能,但需保证线程安全。
    支持并发,必然要用到锁,但要完全保证线程安全,那就要想办法避免“死锁”。只要我们把“上锁”的操作始终由同一个线程来做即可避免“死锁”问题,但这样的话,并发请求的任务只能放在队列中由该线程依次执行(因为是后台执行,无需即时响应用户,所以可以这么做)。

  3. 单个实例,单个线程。
    任何地方调用写日志功能都调用的是同一个Logger实例(显然不能每次写日志都新建一个实例),即需使用单例模式。不管有多少任务调用写日志功能,都必须始终使用同一个线程来处理这些写日志操作,以保证不占用过多的线程资源和避免新建线程带来的延迟。

运用上面的知识,我们来写一个这样的类。简单理一下思路:

  1. 需要一个用来存放写日志任务的队列。
  2. 需要有一个信号机制来标识是否有新的任务要执行。
  3. 当有新的写日志任务时,将该任务加入到队列中,并发出信号。
  4. 用一个方法来处理队列中的任务,当接收新任务信号时,就依次调用队列中的任务。

开发一个功能前需要有个简单的思路,保证心里面有底。具体开发的时候会发现问题,然后再去补充扩展和完善等。刚开始很难想得太周全,先有个简单的思路,然后代码写起来!

下面是这样一个Logger类初步实现:

复制代码
复制代码
public class Logger {

    // 用于存放写日志任务的队列
    private Queue<Action> _queue;

    // 用于写日志的线程
    private Thread _loggingThread;

    // 用于通知是否有新日志要写的“信号器”
    private ManualResetEvent _hasNew;

    // 构造函数,初始化。
    private Logger() {
        _queue = new Queue<Action>();
        _hasNew = new ManualResetEvent(false);

        _loggingThread = new Thread(Process);
        _loggingThread.IsBackground = true;
        _loggingThread.Start();
    }

    // 使用单例模式,保持一个Logger对象
    private static readonly Logger _logger = new Logger();
    private static Logger GetInstance() {
        /* 不安全代码
        lock (locker) {
            if (_logger == null) {
                _logger = new Logger();
            }
        }*/
        return _logger;
    }

    // 处理队列中的任务
    private void Process() {
        while (true) {
            // 等待接收信号,阻塞线程。
            _hasNew.WaitOne();

            // 接收到信号后,重置“信号器”,信号关闭。
            _hasNew.Reset(); 

            // 由于队列中的任务可能在极速地增加,这里等待是为了一次能处理更多的任务,减少对队列的频繁“进出”操作。
            Thread.Sleep(100);

            // 开始执行队列中的任务。
            // 由于执行过程中还可能会有新的任务,所以不能直接对原来的 _queue 进行操作,
            // 先将_queue中的任务复制一份后将其清空,然后对这份拷贝进行操作。

            Queue<Action> queueCopy;
            lock (_queue) {
                queueCopy = new Queue<Action>(_queue);
                _queue.Clear();
            }

            foreach (var action in queueCopy) {
                action();
            }
        }
    }

    private void WriteLog(string content) {
        lock (_queue) { // todo: 这里存在线程安全问题,可能会发生阻塞。
            // 将任务加到队列
            _queue.Enqueue(() => File.AppendAllText("log.txt", content));
        }

        // 打开“信号”
        _hasNew.Set();
    }

    // 公开一个Write方法供外部调用
    public static void Write(string content) {
        // WriteLog 方法只是向队列中添加任务,执行时间极短,所以使用Task.Run。
        Task.Run(() => GetInstance().WriteLog(content));
    }
}
复制代码
复制代码

类写好了,用上文“并发和异步的区别”中的代码测试一下这个Logger类,在我的电脑上运行的一次结果:

共3000条日志,结果没有问题。

上面的Logger类注释写得很详细,我就不再解析了。

通过这个示例,目的是让大家掌握线程和并发在开发中的基本应用和要注意的问题。

遗憾的是这个Logger类并不完美,而且存在线程安全问题(代码中用红色字体标出),虽然实际环境概率很小。可能上面代码多次运行都很难看到有异常发生(我多次运行未发生异常),但同时再添加几个线程可能就会有问题了。

那么,如何解决这个线程安全问题呢?

 

引用地址:http://www.cnblogs.com/kesimin/p/5085460.html

【高并发】高并发分布式锁架构解密,不是所有的锁都是分布式锁!! - 冰河团队 - 博客园

mikel阅读(873)

来源: 【高并发】高并发分布式锁架构解密,不是所有的锁都是分布式锁!! – 冰河团队 – 博客园

写在前面

最近,很多小伙伴留言说,在学习高并发编程时,不太明白分布式锁是用来解决什么问题的,还有不少小伙伴甚至连分布式锁是什么都不太明白。明明在生产环境上使用了自己开发的分布式锁,为什么还会出现问题呢?同样的程序,加上分布式锁后,性能差了几个数量级!这又是为什么呢?今天,我们就来说说如何在高并发环境下实现分布式锁,不是所有的锁都是高并发的。

万字长文,带你深入解密高并发环境下的分布式锁架构,不是所有的锁都是分布式锁!!!

究竟什么样的锁才能更好的支持高并发场景呢?今天,我们就一起解密高并发环境下典型的分布式锁架构,结合【高并发】专题下的其他文章,学以致用。

锁用来解决什么问题呢?

在我们编写的应用程序或者高并发程序中,不知道大家有没有想过一个问题,就是我们为什么需要引入锁?锁为我们解决了什么问题呢?

在很多业务场景下,我们编写的应用程序中会存在很多的 资源竞争 的问题。而我们在高并发程序中,引入锁,就是为了解决这些资源竞争的问题。

电商超卖问题

这里,我们可以列举一个简单的业务场景。比如,在电子商务(商城)的业务场景中,提交订单购买商品时,首先需要查询相应商品的库存是否足够,只有在商品库存数量足够的前提下,才能让用户成功的下单。下单时,我们需要在库存数量中减去用户下单的商品数量,并将库存操作的结果数据更新到数据库中。整个流程我们可以简化成下图所示。

很多小伙伴也留言说,让我给出代码,这样能够更好的学习和掌握相关的知识。好吧,这里,我也给出相应的代码片段吧。我们可以使用下面的代码片段来表示用户的下单操作,我这里将商品的库存信息保存在了Redis中。

@RequestMapping("/submitOrder")
public String submitOrder(){
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("库存扣减成功,当前库存为:{}", stock);
    }else{
        logger.debug("库存不足,扣减库存失败");
        throw new OrderException("库存不足,扣减库存失败");
    }
    return "success";
}

注意:上述代码片段比较简单,只是为了方便大家理解,真正项目中的代码就不能这么写了。

上述的代码看似是没啥问题的,但是我们不能只从代码表面上来观察代码的执行顺序。这是因为在JVM中代码的执行顺序未必是按照我们书写代码的顺序执行的。即使在JVM中代码是按照我们书写的顺序执行,那我们对外提供的接口一旦暴露出去,就会有成千上万的客户端来访问我们的接口。所以说,我们暴露出去的接口是会被并发访问的。

试问,上面的代码在高并发环境下是线程安全的吗?答案肯定不是线程安全的,因为上述扣减库存的操作会出现并行执行的情况。

我们可以使用Apache JMeter来对上述接口进行测试,这里,我使用Apache JMeter对上述接口进行测试。

在Jmeter中,我将线程的并发度设置为3,接下来的配置如下所示。

以HTTP GET请求的方式来并发访问提交订单的接口。此时,运行JMeter来访问接口,命令行会打印出下面的日志信息。

库存扣减成功,当前库存为:49
库存扣减成功,当前库存为:49
库存扣减成功,当前库存为:49

这里,我们明明请求了3次,也就是说,提交了3笔订单,为什么扣减后的库存都是一样的呢?这种现象在电商领域有一个专业的名词叫做 “超卖” 。

如果一个大型的高并发电商系统,比如淘宝、天猫、京东等,出现了超卖现象,那损失就无法估量了!架构设计和开发电商系统的人员估计就要通通下岗了。所以,作为技术人员,我们一定要严谨的对待技术,严格做好系统的每一个技术环节。

JVM中提供的锁

JVM中提供的synchronized和Lock锁,相信大家并不陌生了,很多小伙伴都会使用这些锁,也能使用这些锁来实现一些简单的线程互斥功能。那么,作为立志要成为架构师的你,是否了解过JVM锁的底层原理呢?

JVM锁原理

说到JVM锁的原理,我们就不得不限说说Java中的对象头了。

Java中的对象头

每个Java对象都有对象头。如果是⾮数组类型,则⽤2个字宽来存储对象头,如果是数组,则会⽤3个字宽来存储对象头。在32位处理器中,⼀个字宽是32位;在64位虚拟机中,⼀个字宽是64位。

对象头的内容如下表 。

长度 内容 说明
32/64bit Mark Word 存储对象的hashCode或锁信息等
32/64bit Class Metadata Access 存储到对象类型数据的指针
32/64bit Array length 数组的长度(如果是数组)

Mark Work的格式如下所示。

锁状态 29bit或61bit 1bit是否是偏向锁? 2bit锁标志位
无锁 0 01
偏向锁 线程ID 1 01
轻量级锁 指向栈中锁记录的指针 此时这一位不用于标识偏向锁 00
重量级锁 指向互斥量(重量级锁)的指针 此时这一位不用于标识偏向锁 10
GC标记 此时这一位不用于标识偏向锁 11

可以看到,当对象状态为偏向锁时, Mark Word 存储的是偏向的线程ID;当状态为轻量级锁时, Mark Word 存储的是指向线程栈中 Lock Record 的指针;当状态为重量级锁时, Mark Word 为指向堆中的monitor对象的指针 。

有关Java对象头的知识,参考《深入浅出Java多线程》。

JVM锁原理

简单点来说,JVM中锁的原理如下。

在Java对象的对象头上,有一个锁的标记,比如,第一个线程执行程序时,检查Java对象头中的锁标记,发现Java对象头中的锁标记为未加锁状态,于是为Java对象进行了加锁操作,将对象头中的锁标记设置为锁定状态。第二个线程执行同样的程序时,也会检查Java对象头中的锁标记,此时会发现Java对象头中的锁标记的状态为锁定状态。于是,第二个线程会进入相应的阻塞队列中进行等待。

这里有一个关键点就是Java对象头中的锁标记如何实现。

JVM锁的短板

JVM中提供的synchronized和Lock锁都是JVM级别的,大家都知道,当运行一个Java程序时,会启动一个JVM进程来运行我们的应用程序。synchronized和Lock在JVM级别有效,也就是说,synchronized和Lock在同一Java进程内有效。如果我们开发的应用程序是分布式的,那么只是使用synchronized和Lock来解决分布式场景下的高并发问题,就会显得有点力不从心了。

synchronized和Lock支持JVM同一进程内部的线程互斥

synchronized和Lock在JVM级别能够保证高并发程序的互斥,我们可以使用下图来表示。

但是,当我们将应用程序部署成分布式架构,或者将应用程序在不同的JVM进程中运行时,synchronized和Lock就不能保证分布式架构和多JVM进程下应用程序的互斥性了。

synchronized和Lock不能实现多JVM进程之间的线程互斥

分布式架构和多JVM进程的本质都是将应用程序部署在不同的JVM实例中,也就是说,其本质还是多JVM进程。

分布式锁

我们在实现分布式锁时,可以参照JVM锁实现的思想,JVM锁在为对象加锁时,通过改变Java对象的对象头中的锁的标志位来实现,也就是说,所有的线程都会访问这个Java对象的对象头中的锁标志位。

我们同样以这种思想来实现分布式锁,当我们将应用程序进行拆分并部署成分布式架构时,所有应用程序中的线程访问共享变量时,都到同一个地方去检查当前程序的临界区是否进行了加锁操作,而是否进行了加锁操作,我们在统一的地方使用相应的状态来进行标记。

可以看到,在分布式锁的实现思想上,与JVM锁相差不大。而在实现分布式锁中,保存加锁状态的服务可以使用MySQL、Redis和Zookeeper实现。

但是,在互联网高并发环境中, 使用Redis实现分布式锁的方案是使用的最多的。 接下来,我们就使用Redis来深入解密分布式锁的架构设计

Redis如何实现分布式锁

Redis命令

在Redis中,有一个不常使用的命令如下所示。

SETNX key value

这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。

只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。

这个命令的返回值如下。

  • 命令在设置成功时返回1。
  • 命令在设置失败时返回0。

所以,我们在分布式高并发环境下,可以使用Redis的SETNX命令来实现分布式锁。假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行SETNX命令设置加锁状态后继续向下执行。

引入分布式锁

了解了如何使用Redis中的命令实现分布式锁后,我们就可以对下单接口进行改造了,加入分布式锁,如下所示。

/**
* 为了演示方便,我这里就简单定义了一个常量作为商品的id
* 实际工作中,这个商品id是前端进行下单操作传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value可以为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("库存扣减成功,当前库存为:{}", stock);
    }else{
        logger.debug("库存不足,扣减库存失败");
        throw new OrderException("库存不足,扣减库存失败");
    }
    //业务执行完成,删除PRODUCT_ID key
    stringRedisTemplate.delete(PRODUCT_ID);
    return "success";
}

那么,在上述代码中,我们加入了分布式锁的操作,那上述代码是否能够在高并发场景下保证业务的原子性呢?答案是可以保证业务的原子性。但是,在实际场景中,上面实现分布式锁的代码是不可用的!!

假设当线程A首先执行stringRedisTemplate.opsForValue()的setIfAbsent()方法返回true,继续向下执行,正在执行业务代码时,抛出了异常,线程A直接退出了JVM。此时,stringRedisTemplate.delete(PRODUCT_ID);代码还没来得及执行,之后所有的线程进入提交订单的方法时,调用stringRedisTemplate.opsForValue()的setIfAbsent()方法都会返回false。导致后续的所有下单操作都会失败。这就是分布式场景下的死锁问题。

所以,上述代码中实现分布式锁的方式在实际场景下是不可取的!!

引入try-finally代码块

说到这,相信小伙伴们都能够想到,使用try-finall代码块啊,接下来,我们为下单接口的方法加上try-finally代码块。

/**
* 为了演示方便,我这里就简单定义了一个常量作为商品的id
* 实际工作中,这个商品id是前端进行下单操作传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value可以为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("库存扣减成功,当前库存为:{}", stock);
        }else{
            logger.debug("库存不足,扣减库存失败");
            throw new OrderException("库存不足,扣减库存失败");
        }
    }finally{
         //业务执行完成,删除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

那么,上述代码是否真正解决了死锁的问题呢?我们在写代码时,不能只盯着代码本身,觉得上述代码没啥问题了。实际上,生产环境是非常复杂的。如果线程在成功加锁之后,执行业务代码时,还没来得及执行删除锁标志的代码,此时,服务器宕机了,程序并没有优雅的退出JVM。也会使得后续的线程进入提交订单的方法时,因无法成功的设置锁标志位而下单失败。所以说,上述的代码仍然存在问题。

引入Redis超时机制

在Redis中可以设置缓存的自动过期时间,我们可以将其引入到分布式锁的实现中,如下代码所示。

/**
* 为了演示方便,我这里就简单定义了一个常量作为商品的id
* 实际工作中,这个商品id是前端进行下单操作传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value可以为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    try{
        stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("库存扣减成功,当前库存为:{}", stock);
        }else{
            logger.debug("库存不足,扣减库存失败");
            throw new OrderException("库存不足,扣减库存失败");
        }
    }finally{
         //业务执行完成,删除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代码中,我们加入了如下一行代码来为Redis中的锁标志设置过期时间。

stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

此时,我们设置的过期时间为30秒。

那么问题来了,这样是否就真正的解决了问题呢?上述程序就真的没有坑了吗?答案是还是有坑的!!

“坑位”分析

我们在下单操作的方法中为分布式锁引入了超时机制,此时的代码还是无法真正避免死锁的问题,那“坑位”到底在哪里呢?试想,当程序执行完stringRedisTemplate.opsForValue().setIfAbsent()方法后,正要执行stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS)代码时,服务器宕机了,你还别说,生产坏境的情况非常复杂,就是这么巧,服务器就宕机了。此时,后续请求进入提交订单的方法时,都会因为无法成功设置锁标志而导致后续下单流程无法正常执行。

既然我们找到了上述代码的“坑位”,那我们如何将这个”坑“填上?如何解决这个问题呢?别急,Redis已经提供了这样的功能。我们可以在向Redis中保存数据的时候,可以同时指定数据的超时时间。所以,我们可以将代码改造成如下所示。

/**
* 为了演示方便,我这里就简单定义了一个常量作为商品的id
* 实际工作中,这个商品id是前端进行下单操作传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value可以为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe", 30, TimeUnit.SECONDS);
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("库存扣减成功,当前库存为:{}", stock);
        }else{
            logger.debug("库存不足,扣减库存失败");
            throw new OrderException("库存不足,扣减库存失败");
        }
    }finally{
         //业务执行完成,删除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代码中,我们在向Redis中设置锁标志位的时候就设置了超时时间。此时,只要向Redis中成功设置了数据,则即使我们的业务系统宕机,Redis中的数据过期后,也会自动删除。后续的线程进入提交订单的方法后,就会成功的设置锁标志位,并向下执行正常的下单流程。

到此,上述的代码基本上在功能角度解决了程序的死锁问题,那么,上述程序真的就完美了吗?哈哈,很多小伙伴肯定会说不完美!确实,上面的代码还不是完美的,那大家知道哪里不完美吗?接下来,我们继续分析。

在开发集成角度分析代码

在我们开发公共的系统组件时,比如我们这里说的分布式锁,我们肯定会抽取一些公共的类来完成相应的功能来供系统使用。

这里,假设我们定义了一个RedisLock接口,如下所示。

public interface RedisLock{
    //加锁操作
    boolean tryLock(String key, long timeout, TimeUnit unit);
    //解锁操作
    void releaseLock(String key);
}

接下来,使用RedisLockImpl类实现RedisLock接口,提供具体的加锁和解锁实现,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        return stringRedisTemplate.opsForValue().setIfAbsent(key, "binghe", timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        stringRedisTemplate.delete(key);
    }
}

在开发集成的角度来说,当一个线程从上到下执行时,首先对程序进行加锁操作,然后执行业务代码,执行完成后,再进行释放锁的操作。理论上,加锁和释放锁时,操作的Redis Key都是一样的。但是,如果其他开发人员在编写代码时,并没有调用tryLock()方法,而是直接调用了releaseLock()方法,并且他调用releaseLock()方法传递的key与你调用tryLock()方法传递的key是一样的。那此时就会出现问题了,他在编写代码时,硬生生的将你加的锁释放了!!!

所以,上述代码是不安全的,别人能够随随便便的将你加的锁删除,这就是锁的误删操作,这是非常危险的,所以,上述的程序存在很严重的问题!!

那如何实现只有加锁的线程才能进行相应的解锁操作呢? 继续向下看。

如何实现加锁和解锁的归一化?

什么是加锁和解锁的归一化呢?简单点来说,就是一个线程执行了加锁操作后,后续必须由这个线程执行解锁操作,加锁和解锁操作由同一个线程来完成。

为了解决只有加锁的线程才能进行相应的解锁操作的问题,那么,我们就需要将加锁和解锁操作绑定到同一个线程中,那么,如何将加锁操作和解锁操作绑定到同一个线程呢?其实很简单,相信很多小伙伴都想到了—— 使用ThreadLocal实现 。没错,使用ThreadLocal类确实能够解决这个问题。

此时,我们将RedisLockImpl类的代码修改成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

上述代码的主要逻辑为:在对程序执行尝试加锁操作时,首先生成一个uuid,将生成的uuid绑定到当前线程,并将传递的key参数操作Redis中的key,生成的uuid作为Redis中的Value,保存到Redis中,同时设置超时时间。当执行解锁操作时,首先,判断当前线程中绑定的uuid是否和Redis中存储的uuid相等,只有二者相等时,才会执行删除锁标志位的操作。这就避免了一个线程对程序进行了加锁操作后,其他线程对这个锁进行了解锁操作的问题。

继续分析

我们将加锁和解锁的方法改成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    private String lockUUID;
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        lockUUID = uuid;
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
        if(lockUUID.equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

相信很多小伙伴都会看出上述代码存在什么问题了!! 没错,那就是 线程安全的问题。

所以,这里,我们需要使用ThreadLocal来解决线程安全问题。

可重入性分析

在上面的代码中,当一个线程成功设置了锁标志位后,其他的线程再设置锁标志位时,就会返回失败。还有一种场景就是在提交订单的接口方法中,调用了服务A,服务A调用了服务B,而服务B的方法中存在对同一个商品的加锁和解锁操作。

所以,服务B成功设置锁标志位后,提交订单的接口方法继续执行时,也不能成功设置锁标志位了。也就是说,目前实现的分布式锁没有可重入性。

这里,就存在可重入性的问题了。我们希望设计的分布式锁 具有可重入性 ,那什么是可重入性呢?简单点来说,就是同一个线程,能够多次获取同一把锁,并且能够按照顺序进行解决操作。

其实,在JDK 1.5之后提供的锁很多都支持可重入性,比如synchronized和Lock。

如何实现可重入性呢?

映射到我们加锁和解锁方法时,我们如何支持同一个线程能够多次获取到锁(设置锁标志位)呢?可以这样简单的设计:如果当前线程没有绑定uuid,则生成uuid绑定到当前线程,并且在Redis中设置锁标志位。如果当前线程已经绑定了uuid,则直接返回true,证明当前线程之前已经设置了锁标志位,也就是说已经获取到了锁,直接返回true。

结合以上分析,我们将提交订单的接口方法代码改造成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

这样写看似没有啥问题,但是大家细想一下,这样写就真的OK了吗?

可重入性的问题分析

既然上面分布式锁的可重入性是存在问题的,那我们就来分析下问题的根源在哪里!

假设我们提交订单的方法中,首先使用RedisLock接口对代码块添加了分布式锁,在加锁后的代码中调用了服务A,而服务A中也存在调用RedisLock接口的加锁和解锁操作。而多次调用RedisLock接口的加锁操作时,只要之前的锁没有失效,则会直接返回true,表示成功获取锁。也就是说,无论调用加锁操作多少次,最终只会成功加锁一次。而执行完服务A中的逻辑后,在服务A中调用RedisLock接口的解锁方法,此时,会将当前线程所有的加锁操作获得的锁全部释放掉。

我们可以使用下图来简单的表示这个过程。

那么问题来了,如何解决可重入性的问题呢?

解决可重入性问题

相信很多小伙伴都能够想出使用计数器的方式来解决上面可重入性的问题,没错,就是使用计数器来解决。 整体流程如下所示。

那么,体现在程序代码上是什么样子呢?我们来修改RedisLockImpl类的代码,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

至此,我们基本上解决了分布式锁的可重入性问题。

说到这里,我还要问大家一句,上面的解决问题的方案真的没问题了吗?

阻塞与非阻塞锁

在提交订单的方法中,当获取Redis分布式锁失败时,我们直接返回了failure来表示当前请求下单的操作失败了。试想,在高并发环境下,一旦某个请求获得了分布式锁,那么,在这个请求释放锁之前,其他的请求调用下单方法时,都会返回下单失败的信息。在真实场景中,这是非常不友好的。我们可以将后续的请求进行阻塞,直到当前请求释放锁后,再唤醒阻塞的请求获得分布式锁来执行方法。

所以,我们设计的分布式锁需要支持 阻塞和非阻塞 的特性。

那么,如何实现阻塞呢?我们可以使用自旋来实现,继续修改RedisLockImpl的代码如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果获取锁失败,则自旋获取锁,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

在分布式锁的设计中,阻塞锁和非阻塞锁 是非常重要的概念,大家一定要记住这个知识点。

锁失效问题

尽管我们实现了分布式锁的阻塞特性,但是还有一个问题是我们不得不考虑的。那就是 锁失效 的问题。

当程序执行业务的时间超过了锁的过期时间会发生什么呢? 想必很多小伙伴都能够想到,那就是前面的请求没执行完,锁过期失效了,后面的请求获取到分布式锁,继续向下执行了,程序无法做到真正的互斥,无法保证业务的原子性了。

那如何解决这个问题呢?答案就是:我们必须保证在业务代码执行完毕后,才能释放分布式锁。 方案是有了,那如何实现呢?

说白了,我们需要在业务代码中,时不时的执行下面的代码来保证在业务代码没执行完时,分布式锁不会因超时而被释放。

springRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

这里,我们需要定义一个定时策略来执行上面的代码,需要注意的是:我们不能等到30秒后再执行上述代码,因为30秒时,锁已经失效了。例如,我们可以每10秒执行一次上面的代码。

有些小伙伴说,直接在RedisLockImpl类中添加一个while(true)循环来解决这个问题,那我们就这样修改下RedisLockImpl类的代码,看看有没有啥问题。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果获取锁失败,则自旋获取锁,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //定义更新锁的过期时间
            while(true){
                Integer count = threadLocalInteger.get();
                //当前锁已经被释放,则退出循环
                if(count == 0 || count <= 0){
                    break;
                }
                springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
                try{
                    //每隔10秒执行一次
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

相信小伙伴们看了代码就会发现哪里有问题了:更新锁过期时间的代码肯定不能这么去写。因为这么写会 导致当前线程在更新锁超时时间的while(true)循环中一直阻塞而无法返回结果。 所以,我们不能将当前线程阻塞,需要异步执行定时任务来更新锁的过期时间。

此时,我们继续修改RedisLockImpl类的代码,将定时更新锁超时的代码放到一个单独的线程中执行,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果获取锁失败,则自旋获取锁,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //启动新线程来执行定时任务,更新锁过期时间
           new Thread(new UpdateLockTimeoutTask(uuid, stringRedisTemplate, key)).start();
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
        String uuid = stringRedisTemplate.opsForValue().get(key);
        if(threadLocal.get().equals(uuid)){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key); 
                //获取更新锁超时时间的线程并中断
                long threadId = stringRedisTemplate.opsForValue().get(uuid);
                Thread updateLockTimeoutThread = ThreadUtils.getThreadByThreadId(threadId);
                if(updateLockTimeoutThread != null){
                     //中断更新锁超时时间的线程
                    updateLockTimeoutThread.interrupt();
                    stringRedisTemplate.delete(uuid);   
                }
            }
        }
    }
}

创建UpdateLockTimeoutTask类来执行更新锁超时的时间。

public class UpdateLockTimeoutTask implements Runnable{
    //uuid
    private long uuid;
    private StringRedisTemplate stringRedisTemplate;
    private String key;
    public UpdateLockTimeoutTask(long uuid, StringRedisTemplate stringRedisTemplate, String key){
        this.uuid = uuid;
        this.stringRedisTemplate = stringRedisTemplate;
        this.key = key;
    }
    @Override
    public void run(){
        //以uuid为key,当前线程id为value保存到Redis中
        stringRedisTemplate.opsForValue().set(uuid, Thread.currentThread().getId());
         //定义更新锁的过期时间
        while(true){
            springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
            try{
                //每隔10秒执行一次
                Thread.sleep(10000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

接下来,我们定义一个ThreadUtils工具类,这个工具类中有一个根据线程id获取线程的方法getThreadByThreadId(long threadId)。

public class ThreadUtils{
    //根据线程id获取线程句柄
    public static Thread getThreadByThreadId(long threadId){
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        while(group != null){
            Thread[] threads = new Thread[(int)(group.activeCount() * 1.2)];
            int count = group.enumerate(threads, true);
            for(int i = 0; i < count; i++){
                if(threadId == threads[i].getId()){
                    return threads[i];
                }
            }
        }
    }
}

上述解决分布式锁失效的问题在分布式锁领域有一个专业的术语叫做 “异步续命” 。需要注意的是:当业务代码执行完毕后,我们需要停止更新锁超时时间的线程。所以,这里,我对程序的改动是比较大的,首先,将更新锁超时的时间任务重新定义为一个UpdateLockTimeoutTask类,并将uuid和StringRedisTemplate注入到任务类中,在执行定时更新锁超时时间时,首先将当前线程保存到Redis中,其中Key为传递进来的uuid。

在首先获取分布式锁后,重新启动线程,并将uuid和StringRedisTemplate传递到任务类中执行任务。当业务代码执行完毕后,调用releaseLock()方法释放锁时,我们会通过uuid从Redis中获取更新锁超时时间的线程id,并通过线程id获取到更新锁超时时间的线程,调用线程的interrupt()方法来中断线程。

此时,当分布式锁释放后,更新锁超时的线程就会由于线程中断而退出了。

实现分布式锁的基本要求

结合上述的案例,我们可以得出实现分布式锁的基本要求:

  • 支持互斥性
  • 支持锁超时
  • 支持阻塞和非阻塞特性
  • 支持可重入性
  • 支持高可用

通用分布式解决方案

互联网行业,分布式锁是一个绕不开的话题,同时,也有很多通用的分布式锁解决方案,其中,用的比较多的一种方案就是使用开源的Redisson框架来解决分布式锁问题。

有关Redisson分布式锁的使用方案大家可以参考《【高并发】你知道吗?大家都在使用Redisson实现分布式锁了!!

既然Redisson框架已经很牛逼了,我们直接使用Redisson框架是否能够100%的保证分布式锁不出问题呢?答案是无法100%的保证。因为在分布式领域没有哪一家公司或者架构师能够保证100%的不出问题,就连阿里这样的大公司、阿里的首席架构师这样的技术大牛也不敢保证100%的不出问题。

在分布式领域,无法做到100%无故障,我们追求的是几个9的目标,例如99.999%无故障。

CAP理论

在分布式领域,有一个非常重要的理论叫做CAP理论。

  • C:Consistency(一致性)
  • A:Availability(可用性)
  • P:Partition tolerance(分区容错性)

在分布式领域中,是必须要保证分区容错性的,也就是必须要保证“P”,所以,我们只能保证CP或者AP。

这里,我们可以使用Redis和Zookeeper来进行简单的对比,我们可以使用Redis实现AP架构的分布式锁,使用Zookeeper实现CP架构的分布式锁。

  • 基于Redis的AP架构的分布式锁模型

在基于Redis实现的AP架构的分布式锁模型中,向Redis节点1写入数据后,会立即返回结果,之后在Redis中会以异步的方式来同步数据。

  • 基于Zookeeper的CP架构的分布式锁模型

在基于Zookeeper实现的CP架构的分布式模型中,向节点1写入数据后,会等待数据的同步结果,当数据在大多数Zookeeper节点间同步成功后,才会返回结果数据。

当我们使用基于Redis的AP架构实现分布式锁时,需要注意一个问题,这个问题可以使用下图来表示。

也就是Redis主从节点之间的数据同步失败,假设线程向Master节点写入了数据,而Redis中Master节点向Slave节点同步数据失败了。此时,另一个线程读取的Slave节点中的数据,发现没有添加分布式锁,此时就会出现问题了!!!

所以,在设计分布式锁方案时,也需要注意Redis节点之间的数据同步问题。

红锁的实现

在Redisson框架中,实现了红锁的机制,Redisson的RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。当红锁中超过半数的RLock加锁成功后,才会认为加锁是成功的,这就提高了分布式锁的高可用。

我们可以使用Redisson框架来实现红锁。

public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
	try {
		// 同时加锁:lock1 lock2 lock3, 红锁在大部分节点上加锁成功就算成功。
		lock.lock();
		// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

其实,在实际场景中,红锁是很少使用的。这是因为使用了红锁后会影响高并发环境下的性能,使得程序的体验更差。所以,在实际场景中,我们一般都是要保证Redis集群的可靠性。同时,使用红锁后,当加锁成功的RLock个数不超过总数的一半时,会返回加锁失败,即使在业务层面任务加锁成功了,但是红锁也会返回加锁失败的结果。另外,使用红锁时,需要提供多套Redis的主从部署架构,同时,这多套Redis主从架构中的Master节点必须都是独立的,相互之间没有任何数据交互。

高并发“黑科技”与致胜奇招

假设,我们就是使用Redis来实现分布式锁,假设Redis的读写并发量在5万左右。我们的商城业务需要支持的并发量在100万左右。如果这100万的并发全部打入Redis中,Redis很可能就会挂掉,那么,我们如何解决这个问题呢?接下来,我们就一起来探讨这个问题。

在高并发的商城系统中,如果采用Redis缓存数据,则Redis缓存的并发处理能力是关键,因为很多的前缀操作都需要访问Redis。而异步削峰只是基本的操作,关键还是要保证Redis的并发处理能力。

解决这个问题的关键思想就是:分而治之,将商品库存分开放。

暗度陈仓

我们在Redis中存储商品的库存数量时,可以将商品的库存进行“分割”存储来提升Redis的读写并发量。

例如,原来的商品的id为10001,库存为1000件,在Redis中的存储为(10001, 1000),我们将原有的库存分割为5份,则每份的库存为200件,此时,我们在Redia中存储的信息为(10001_0, 200),(10001_1, 200),(10001_2, 200),(10001_3, 200),(10001_4, 200)。

此时,我们将库存进行分割后,每个分割后的库存使用商品id加上一个数字标识来存储,这样,在对存储商品库存的每个Key进行Hash运算时,得出的Hash结果是不同的,这就说明,存储商品库存的Key有很大概率不在Redis的同一个槽位中,这就能够提升Redis处理请求的性能和并发量。

分割库存后,我们还需要在Redis中存储一份商品id和分割库存后的Key的映射关系,此时映射关系的Key为商品的id,也就是10001,Value为分割库存后存储库存信息的Key,也就是10001_0,10001_1,10001_2,10001_3,10001_4。在Redis中我们可以使用List来存储这些值。

在真正处理库存信息时,我们可以先从Redis中查询出商品对应的分割库存后的所有Key,同时使用AtomicLong来记录当前的请求数量,使用请求数量对从Redia中查询出的商品对应的分割库存后的所有Key的长度进行求模运算,得出的结果为0,1,2,3,4。再在前面拼接上商品id就可以得出真正的库存缓存的Key。此时,就可以根据这个Key直接到Redis中获取相应的库存信息。

同时,我们可以将分隔的不同的库存数据分别存储到不同的Redis服务器中,进一步提升Redis的并发量。

移花接木

在高并发业务场景中,我们可以直接使用Lua脚本库(OpenResty)从负载均衡层直接访问缓存。

这里,我们思考一个场景:如果在高并发业务场景中,商品被瞬间抢购一空。此时,用户再发起请求时,如果系统由负载均衡层请求应用层的各个服务,再由应用层的各个服务访问缓存和数据库,其实,本质上已经没有任何意义了,因为商品已经卖完了,再通过系统的应用层进行层层校验已经没有太多意义了!!而应用层的并发访问量是以百为单位的,这又在一定程度上会降低系统的并发度。

为了解决这个问题,此时,我们可以在系统的负载均衡层取出用户发送请求时携带的用户id,商品id和活动id等信息,直接通过Lua脚本等技术来访问缓存中的库存信息。如果商品的库存小于或者等于0,则直接返回用户商品已售完的提示信息,而不用再经过应用层的层层校验了。

写在最后

如果觉得文章对你有点帮助,请微信搜索并关注「 冰河技术 」微信公众号,跟冰河学习高并发编程技术。

最后,附上并发编程需要掌握的核心技能知识图,祝大家在学习并发编程时,少走弯路。

C# 数据库并发的解决方案(通用版、EF版) - 天才卧龙 - 博客园

mikel阅读(609)

来源: C# 数据库并发的解决方案(通用版、EF版) – 天才卧龙 – 博客园

还是那句老话:十年河东,十年河西,莫欺骚年穷!~_~ 打错个字,应该是莫欺少年穷!

学历代表你的过去,能力代表你的现在,学习代表你的将来。

学无止境,精益求精。

ASP.NET诞生以来,微软提供了不少控制并发的方法,在了解这些控制并发的方法前,我们先来简单介绍下并发!

并发:同一时间或者同一时刻多个访问者同时访问某一更新操作时,会产生并发!

针对并发的处理,又分为悲观并发处理和乐观并发处理

所谓悲观/乐观并发处理,可以这样理解:

悲观者认为:在程序的运行过程中,并发很容易发生滴,因此,悲观者提出了他们的处理模式:在我执行一个方法时,不允许其他访问者介入这个方法。(悲观者经常认为某件坏事会发生在自己身上)

乐观者认为:在程序的运行过程中,并发是很少发生滴,因此,乐观者提出了他们的处理模式:在我执行一个方法时,允许其他访问者介入这个方法。(乐观者经常认为某件坏事不会发生在自己身上)

那么在C#语言中,那些属于悲观者呢?

C#中诸如:LOCK、Monitor、Interlocked 等锁定数据的方式,属于悲观并发处理范畴!数据一旦被锁定,其他访问者均无权访问。有兴趣的可以参考:锁、C#中Monitor和Lock以及区别

但是,悲观者处理并发的模式有一个通病,那就是可能会造成非常低下的执行效率。

在此:举个简单例子:

售票系统,小明去买票,要买北京到上海的D110次列车,如果采用悲观者处理并发的模式,那么售票员会将D110次列车的票锁定,然后再作出票操作。但是,在D110次列车车票被锁定期间,售票员去了趟厕所,或者喝了杯咖啡,其他窗口售票员是不能进行售票滴!如果采用这种处理方式的话,中国14亿人口都不用出行了,原因是买不到票 ~_~

因此:在处理数据库并发时,悲观锁还是要谨慎使用!具体还要看数据库并发量大不大,如果比较大,建议使用乐观者处理模式,如果比较小,可以适当采用悲观者处理模式!

OK。说了这么多,也就是做个铺垫,本节内容标题叫数据库并发的解决方案,我们最终还得返璞归真,从数据库并发的解决说起!

那么问题来了?

数据库并发的处理方式有哪些呢?

其实数据库的并发处理也是分为乐观锁和悲观锁,只不过是基于数据库层面而言的!关于数据库层面的并发处理大家可参考我的博客:乐观锁悲观锁应用

悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作。[1]

乐观锁:假设不会发生并发冲突,只在提交操作时检查是否违反数据完整性。[1] 乐观锁不能解决脏读的问题。

最常用的处理多用户并发访问的方法是加锁。当一个用户锁住数据库中的某个对象时,其他用户就不能再访问该对象。加锁对并发访问的影响体现在锁的粒度上。比如,放在一个表上的锁限制对整个表的并发访问;放在数据页上的锁限制了对整个数据页的访问;放在行上的锁只限制对该行的并发访问。可见行锁粒度最小,并发访问最好,页锁粒度最大,并发访问性能就会越低。

悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作。[1] 悲观锁假定其他用户企图访问或者改变你正在访问、更改的对象的概率是很高的,因此在悲观锁的环境中,在你开始改变此对象之前就将该对象锁住,并且直到你提交了所作的更改之后才释放锁。悲观的缺陷是不论是页锁还是行锁,加锁的时间可能会很长,这样可能会长时间的锁定一个对象,限制其他用户的访问,也就是说悲观锁的并发访问性不好。

乐观锁:假设不会发生并发冲突,只在提交操作时检查是否违反数据完整性。[1] 乐观锁不能解决脏读的问题。 乐观锁则认为其他用户企图改变你正在更改的对象的概率是很小的,因此乐观锁直到你准备提交所作的更改时才将对象锁住,当你读取以及改变该对象时并不加锁。可见乐观锁加锁的时间要比悲观锁短,乐观锁可以用较大的锁粒度获得较好的并发访问性能。但是如果第二个用户恰好在第一个用户提交更改之前读取了该对象,那么当他完成了自己的更改进行提交时,数据库就会发现该对象已经变化了,这样,第二个用户不得不重新读取该对象并作出更改。这说明在乐观锁环境中,会增加并发用户读取对象的次数。

本篇的主旨是讲解基于C#的数据库并发解决方案(通用版、EF版),因此我们要从C#方面入手,最好是结合一个小项目

项目已为大家准备好了,如下:

首先我们需要创建一个小型数据库:

复制代码
create database  BingFaTest
go
use BingFaTest
go 
create table Product--商品表
(
ProductId int identity(1,1) primary key,--商品ID 主键
ProductName nvarchar(50),--商品名称
ProductPrice money,--单价
ProductUnit nvarchar(10) default('元/斤'),
AddTime datetime default(getdate())--添加时间

)


create table Inventory--库存表
(
InventoryId int identity(1,1) primary key,
ProductId int FOREIGN KEY REFERENCES Product(ProductId), --外键
ProductCount int,--库存数量
VersionNum TimeStamp not null,
InventoryTime datetime default(getdate()),--时间
)

create table InventoryLog
(
Id int identity(1,1) primary key,
Title nvarchar(50),
)


--测试数据:
insert into Product values('苹果',1,'元/斤',GETDATE())


insert into Inventory(ProductId,ProductCount,InventoryTime) values(1,100,GETDATE())
复制代码

创建的数据库很简单,三张表:商品表,库存表,日志表

有了数据库,我们就创建C#项目,本项目采用C# DataBaseFirst 模式,结构如下:

项目很简单,采用EF DataBaseFirst 模式很好构建。

项目构建好了,下面我们模拟并发的发生?

主要代码如下(减少库存、插入日志):

复制代码
#region 未做并发处理
        /// <summary>
        /// 模仿一个减少库存操作  不加并发控制
        /// </summary>
        public void SubMitOrder_3()
        {
            int productId = 1;

            using (BingFaTestEntities context = new BingFaTestEntities())
            {
                var InventoryLogDbSet = context.InventoryLog;
                var InventoryDbSet = context.Inventory;//库存表

                using (var Transaction = context.Database.BeginTransaction())
                {
                    //减少库存操作
                    var Inventory_Mol = InventoryDbSet.Where(A => A.ProductId == productId).FirstOrDefault();//库存对象
                    Inventory_Mol.ProductCount = Inventory_Mol.ProductCount - 1;
                    int A4 = context.SaveChanges();
                    //插入日志
                    InventoryLog LogModel = new InventoryLog()
                    {
                        Title = "插入一条数据,用于计算是否发生并发",

                    };
                    InventoryLogDbSet.Add(LogModel);
                    context.SaveChanges();
                    //1.5  模拟耗时
                    Thread.Sleep(500); //消耗半秒钟
                    Transaction.Commit();
                }

            }
        }
        #endregion
复制代码

此时我们 int productId=1 处加上断点,并运行程序(打开四个浏览器同时执行),如下:

由上图可知,四个访问者同时访问这个未采用并发控制的方法,得到的结果如下:

结果显示:日志生成四条数据,而库存量缺只减少1个。这个结果显然是不正确的,原因是因为发生了并发,其本质原因是脏读,误读,不可重读造成的。

那么,问题既然发生了,我们就想办法法解决,办法有两种,分别为:悲观锁方法、乐观锁方法。

悲观者方法:

悲观者方法(加了uodlock锁,锁定了更新操作,也就是说,一旦被锁定,其他访问者不允许访问此操作)类似这种方法,可以通过存储过程实现,在此不作解释了

乐观者方法(通用版/存储过程实现):

在上述数据库脚本中,有字段叫做:VersionNum,类型为:TimeStamp。

字段 VersionNum 大家可以理解为版本号,版本号的作用是一旦有访问者修改数据,版本号的值就会相应发生改变。当然,版本号的同步更改是和数据库相关的,在SQLServer中会随着数据的修改同步更新版本号,但是在MySQL里就不会随着数据的修改而更改。因此,如果你采用的是MYSQL数据库,就需要写一个触发器,如下:

OK,了解了类型为Timestamp的字段,下面我们结合上述的小型数据库创建一个处理并发的存储过程,如下

复制代码
create proc LockProc --乐观锁控制并发
(
@ProductId int, 
@IsSuccess bit=0 output
)
as
declare @count as int
declare @flag as TimeStamp
declare @rowcount As int 
begin tran
select @count=ProductCount,@flag=VersionNum from Inventory where ProductId=@ProductId
 
update Inventory set ProductCount=@count-1 where VersionNum=@flag and ProductId=@ProductId
insert into InventoryLog values('插入一条数据,用于计算是否发生并发')
set @rowcount=@@ROWCOUNT
if @rowcount>0
set @IsSuccess=1
else
set @IsSuccess=0
commit tran
复制代码

这个存储过程很简单,执行两个操作:减少库存和插入一条数据。有一个输入参数:productId ,一个输出参数,IsSuccess。如果发生并发,IsSuccess的值为False,如果执行成功,IsSuccess值为True。

在这里,向大家说明一点:程序采用悲观锁,是串行的,采用乐观锁,是并行的。

也就是说:采用悲观锁,一次仅执行一个访问者的请求,待前一个访问者访问完成并释放锁时,下一个访问者会依次进入锁定的程序并执行,直到所有访问者执行结束。因此,悲观锁严格按照次序执行的模式能保证所有访问者执行成功。

采用乐观锁时,访问者是并行执行的,大家同时访问一个方法,只不过同一时刻只会有一个访问者操作成功,其他访问者执行失败。那么,针对这些执行失败的访问者怎么处理呢?直接返回失败信息是不合理的,用户体验不好,因此,需要定制一个规则,让执行失败的访问者重新执行之前的请求即可。

时间有限,就不多写了…因为并发的控制是在数据库端存储过程,所以,C#代码也很简单。如下:

复制代码
#region 通用并发处理模式 存储过程实现
        /// <summary>
        /// 存储过程实现
        /// </summary>
        public void SubMitOrder_2()
        {
            int productId = 1;
            bool bol = LockForPorcduce(productId);
            //1.5  模拟耗时
            Thread.Sleep(500); //消耗半秒钟
            int retry = 10;
            while (!bol && retry > 0)
            {
                retry--;
                LockForPorcduce(productId);
            }
        }


        private bool LockForPorcduce(int ProductId)
        {
            using (BingFaTestEntities context = new BingFaTestEntities())
            {
                SqlParameter[] parameters = {
                    new SqlParameter("@ProductId", SqlDbType.Int),
                    new SqlParameter("@IsSuccess", SqlDbType.Bit)
                    };
                parameters[0].Value = ProductId;
                parameters[1].Direction = ParameterDirection.Output;
                var data = context.Database.ExecuteSqlCommand("exec LockProc @ProductId,@IsSuccess output", parameters);
                string n2 = parameters[1].Value.ToString();
                if (n2 == "True")
                {
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }
        #endregion
复制代码

在此,需要说明如下:

当IsSuccess的值为False时,应该重复执行该方法,我定的规则是重复请求十次,这样就很好的解决了直接反馈给用户失败的消息。提高了用户体验。

下面着重说下EF框架如何避免数据库并发,在讲解之前,先允许我引用下别人博客中的几段话:

在软件开发过程中,并发控制是确保及时纠正由并发操作导致的错误的一种机制。从 ADO.NET 到 LINQ to SQL 再到如今的 ADO.NET Entity Framework,.NET 都为并发控制提供好良好的支持方案。

相对于数据库中的并发处理方式,Entity Framework 中的并发处理方式实现了不少的简化。

在System.Data.Metadata.Edm 命名空间中,存在ConcurencyMode 枚举,用于指定概念模型中的属性的并发选项。
ConcurencyMode 有两个成员:

成员名称 说明
        None 在写入时从不验证此属性。 这是默认的并发模式。
        Fixed 在写入时始终验证此属性。

当模型属性为默认值 None 时,系统不会对此模型属性进行检测,当同一个时间对此属性进行修改时,系统会以数据合并方式处理输入的属性值。
当模型属性为Fixed 时,系统会对此模型属性进行检测,当同一个时间对属性进行修改时,系统就会激发OptimisticConcurrencyException 异常。

开发人员可以为对象的每个属性定义不同的 ConcurencyMode 选项,选项可以在*.Edmx找看到:

 

Edmx文件用记事本打开如下:

<?xml version="1.0" encoding="utf-8"?><edmx:Edmx Version="3.0" xmlns:edmx="http://schemas.microsoft.com/ado/2009/11/edmx">  <!-- EF Runtime content -->  <edmx:Runtime>    <!-- SSDL content -->    <edmx:StorageModels>      <Schema Namespace="BingFaTestModel.Store" Alias="Self" Provider="System.Data.SqlClient" ProviderManifestToken="2008" xmlns:store="http://schemas.microsoft.com/ado/2007/12/edm/EntityStoreSchemaGenerator" xmlns="http://schemas.microsoft.com/ado/2009/11/edm/ssdl">        <EntityContainer Name="BingFaTestModelStoreContainer">          <EntitySet Name="Inventory" EntityType="BingFaTestModel.Store.Inventory" store:Type="Tables" Schema="dbo" />          <EntitySet Name="InventoryLog" EntityType="BingFaTestModel.Store.InventoryLog" store:Type="Tables" Schema="dbo" />          <EntitySet Name="Product" EntityType="BingFaTestModel.Store.Product" store:Type="Tables" Schema="dbo" />          <AssociationSet Name="FK__Inventory__Produ__145C0A3F" Association="BingFaTestModel.Store.FK__Inventory__Produ__145C0A3F">            <End Role="Product" EntitySet="Product" />            <End Role="Inventory" EntitySet="Inventory" />          </AssociationSet>        </EntityContainer>        <EntityType Name="Inventory">          <Key>            <PropertyRef Name="InventoryId" />          </Key>          <Property Name="InventoryId" Type="int" Nullable="false" StoreGeneratedPattern="Identity" />          <Property Name="ProductId" Type="int" />          <Property Name="ProductCount" Type="int" />          <Property Name="VersionNum" Type="timestamp" Nullable="false" StoreGeneratedPattern="Computed" />          <Property Name="InventoryTime" Type="datetime" />        </EntityType>        <EntityType Name="InventoryLog">          <Key>            <PropertyRef Name="Id" />          </Key>          <Property Name="Id" Type="int" Nullable="false" StoreGeneratedPattern="Identity" />          <Property Name="Title" Type="nvarchar" MaxLength="50" />        </EntityType>        <EntityType Name="Product">          <Key>            <PropertyRef Name="ProductId" />          </Key>          <Property Name="ProductId" Type="int" Nullable="false" StoreGeneratedPattern="Identity" />          <Property Name="ProductName" Type="nvarchar" MaxLength="50" />          <Property Name="ProductPrice" Type="money" />          <Property Name="ProductUnit" Type="nvarchar" MaxLength="10" />          <Property Name="AddTime" Type="datetime" />        </EntityType>        <Association Name="FK__Inventory__Produ__145C0A3F">          <End Role="Product" Type="BingFaTestModel.Store.Product" Multiplicity="0..1" />          <End Role="Inventory" Type="BingFaTestModel.Store.Inventory" Multiplicity="*" />          <ReferentialConstraint>            <Principal Role="Product">              <PropertyRef Name="ProductId" />            </Principal>            <Dependent Role="Inventory">              <PropertyRef Name="ProductId" />            </Dependent>          </ReferentialConstraint>        </Association>      </Schema>    </edmx:StorageModels>    <!-- CSDL content -->    <edmx:ConceptualModels>      <Schema Namespace="BingFaTestModel" Alias="Self" p1:UseStrongSpatialTypes="false" xmlns:annotation="http://schemas.microsoft.com/ado/2009/02/edm/annotation" xmlns:p1="http://schemas.microsoft.com/ado/2009/02/edm/annotation" xmlns="http://schemas.microsoft.com/ado/2009/11/edm">        <EntityContainer Name="BingFaTestEntities" p1:LazyLoadingEnabled="true">          <EntitySet Name="Inventory" EntityType="BingFaTestModel.Inventory" />          <EntitySet Name="InventoryLog" EntityType="BingFaTestModel.InventoryLog" />          <EntitySet Name="Product" EntityType="BingFaTestModel.Product" />          <AssociationSet Name="FK__Inventory__Produ__145C0A3F" Association="BingFaTestModel.FK__Inventory__Produ__145C0A3F">            <End Role="Product" EntitySet="Product" />            <End Role="Inventory" EntitySet="Inventory" />          </AssociationSet>        </EntityContainer>        <EntityType Name="Inventory">          <Key>            <PropertyRef Name="InventoryId" />          </Key>          <Property Name="InventoryId" Type="Int32" Nullable="false" p1:StoreGeneratedPattern="Identity" />          <Property Name="ProductId" Type="Int32" />          <Property Name="ProductCount" Type="Int32" />          <Property Name="VersionNum" Type="Binary" Nullable="false" MaxLength="8" FixedLength="true" p1:StoreGeneratedPattern="Computed" ConcurrencyMode="None" />          <Property Name="InventoryTime" Type="DateTime" Precision="3" />          <NavigationProperty Name="Product" Relationship="BingFaTestModel.FK__Inventory__Produ__145C0A3F" FromRole="Inventory" ToRole="Product" />        </EntityType>        <EntityType Name="InventoryLog">          <Key>            <PropertyRef Name="Id" />          </Key>          <Property Name="Id" Type="Int32" Nullable="false" p1:StoreGeneratedPattern="Identity" />          <Property Name="Title" Type="String" MaxLength="50" Unicode="true" FixedLength="false" />        </EntityType>        <EntityType Name="Product">          <Key>            <PropertyRef Name="ProductId" />          </Key>          <Property Name="ProductId" Type="Int32" Nullable="false" p1:StoreGeneratedPattern="Identity" />          <Property Name="ProductName" Type="String" MaxLength="50" Unicode="true" FixedLength="false" />          <Property Name="ProductPrice" Type="Decimal" Precision="19" Scale="4" />          <Property Name="ProductUnit" Type="String" MaxLength="10" Unicode="true" FixedLength="false" />          <Property Name="AddTime" Type="DateTime" Precision="3" />          <NavigationProperty Name="Inventory" Relationship="BingFaTestModel.FK__Inventory__Produ__145C0A3F" FromRole="Product" ToRole="Inventory" />        </EntityType>        <Association Name="FK__Inventory__Produ__145C0A3F">          <End Role="Product" Type="BingFaTestModel.Product" Multiplicity="0..1" />          <End Role="Inventory" Type="BingFaTestModel.Inventory" Multiplicity="*" />          <ReferentialConstraint>            <Principal Role="Product">              <PropertyRef Name="ProductId" />            </Principal>            <Dependent Role="Inventory">              <PropertyRef Name="ProductId" />            </Dependent>          </ReferentialConstraint>        </Association>      </Schema>    </edmx:ConceptualModels>    <!-- C-S mapping content -->    <edmx:Mappings>      <Mapping Space="C-S" xmlns="http://schemas.microsoft.com/ado/2009/11/mapping/cs">        <EntityContainerMapping StorageEntityContainer="BingFaTestModelStoreContainer" CdmEntityContainer="BingFaTestEntities">          <EntitySetMapping Name="Inventory">            <EntityTypeMapping TypeName="BingFaTestModel.Inventory">              <MappingFragment StoreEntitySet="Inventory">                <ScalarProperty Name="InventoryId" ColumnName="InventoryId" />                <ScalarProperty Name="ProductId" ColumnName="ProductId" />                <ScalarProperty Name="ProductCount" ColumnName="ProductCount" />                <ScalarProperty Name="VersionNum" ColumnName="VersionNum" />                <ScalarProperty Name="InventoryTime" ColumnName="InventoryTime" />              </MappingFragment>            </EntityTypeMapping>          </EntitySetMapping>          <EntitySetMapping Name="InventoryLog">            <EntityTypeMapping TypeName="BingFaTestModel.InventoryLog">              <MappingFragment StoreEntitySet="InventoryLog">                <ScalarProperty Name="Id" ColumnName="Id" />                <ScalarProperty Name="Title" ColumnName="Title" />              </MappingFragment>            </EntityTypeMapping>          </EntitySetMapping>          <EntitySetMapping Name="Product">            <EntityTypeMapping TypeName="BingFaTestModel.Product">              <MappingFragment StoreEntitySet="Product">                <ScalarProperty Name="ProductId" ColumnName="ProductId" />                <ScalarProperty Name="ProductName" ColumnName="ProductName" />                <ScalarProperty Name="ProductPrice" ColumnName="ProductPrice" />                <ScalarProperty Name="ProductUnit" ColumnName="ProductUnit" />                <ScalarProperty Name="AddTime" ColumnName="AddTime" />              </MappingFragment>            </EntityTypeMapping>          </EntitySetMapping>        </EntityContainerMapping>      </Mapping>    </edmx:Mappings>  </edmx:Runtime>  <!-- EF Designer content (DO NOT EDIT MANUALLY BELOW HERE) -->  <Designer xmlns="http://schemas.microsoft.com/ado/2009/11/edmx">    <Connection>      <DesignerInfoPropertySet>        <DesignerProperty Name="MetadataArtifactProcessing" Value="EmbedInOutputAssembly" />      </DesignerInfoPropertySet>    </Connection>    <Options>      <DesignerInfoPropertySet>        <DesignerProperty Name="ValidateOnBuild" Value="true" />        <DesignerProperty Name="EnablePluralization" Value="False" />        <DesignerProperty Name="IncludeForeignKeysInModel" Value="True" />        <DesignerProperty Name="CodeGenerationStrategy" Value="无" />      </DesignerInfoPropertySet>    </Options>    <!-- Diagram content (shape and connector positions) -->    <Diagrams></Diagrams>  </Designer></edmx:Edmx>

其实,在EF DataBaseFirst中,我们只需设置下类型为 TimeStamp 版本号的属性即可,如下:

设置好了版本号属性后,你就可以进行并发测试了,当系统发生并发时,程序会抛出异常,而我们要做的就是要捕获这个异常,而后就是按照自己的规则,重复执行请求的方法,直至返回成功为止。

那么如何捕获并发异常呢?

在C#代码中需要使用异常类:DbUpdateConcurrencyException 来捕获,EF中具体用法如下:

复制代码
public class SaveChangesForBF : BingFaTestEntities
    {
        public override int SaveChanges()
        {
            try
            {
                return base.SaveChanges();
            }
            catch (DbUpdateConcurrencyException ex)//(OptimisticConcurrencyException)
            {
                //并发保存错误
                return -1;
            }
        }
    }
复制代码

设置好属性后,EF会帮我们自动检测并发并抛出异常,我们用上述方法捕获异常后,就可以执行我们重复执行的规则了,具体代码如下:

复制代码
#region EF专属并发处理模式
        /// <summary>
        /// 存储过程实现
        /// </summary>
        public void SubMitOrder()
        {
            int C = LockForEF();
            //1.5  模拟耗时
            Thread.Sleep(500); //消耗半秒钟
            int retry = 10;
            while (C<0 && retry > 0)
            {
                retry--;
                C= LockForEF();
            }
        }
        /// <summary>
        /// 模仿一个减少库存操作  EF专属并发处理模式
        /// </summary>
        public int LockForEF()
        {
            int productId = 1;
            int C = 0;
            using (SaveChangesForBF context = new SaveChangesForBF())
            {
                var InventoryLogDbSet = context.InventoryLog;
                var InventoryDbSet = context.Inventory;//库存表

                using (var Transaction = context.Database.BeginTransaction())
                {
                    //减少库存操作
                    var Inventory_Mol = InventoryDbSet.Where(A => A.ProductId == productId).FirstOrDefault();//库存对象
                    Inventory_Mol.ProductCount = Inventory_Mol.ProductCount - 1;
                    C = context.SaveChanges();
                    //插入日志
                    InventoryLog LogModel = new InventoryLog()
                    {
                        Title = "插入一条数据,用于计算是否发生并发",
                        
                    };
                    InventoryLogDbSet.Add(LogModel);
                    context.SaveChanges();
                    //1.5  模拟耗时
                    Thread.Sleep(500); //消耗半秒钟
                    Transaction.Commit();
                }

            }
            return C;
        }
        #endregion
复制代码

至此,C#并发处理就讲解完了,是不是很简单呢?

项目源码地址:http://download.csdn.net/download/wolongbb/9977216

@陈卧龙的博客

详解ASP.Net Core 中如何借助CSRedis实现一个安全高效的分布式锁_脚本语言之家jiaoben.net

mikel阅读(564)

来源: 详解ASP.Net Core 中如何借助CSRedis实现一个安全高效的分布式锁_脚本语言之家jiaoben.net

引言:最近回头看了看开发的.Net Core 2.1项目的复盘总结,其中在多处用到Redis实现的分布式锁,虽然在OnResultExecuting方法中做了防止死锁的处理,但在某些场景下还是会发生死锁的问题,下面我只展示部分代码:

问题:

(1)这里setnx设置的值“1”,我想问,你最后del的这个值一定是你自己创建的吗?

(2)图中标注的步骤1和步骤2不是原子操作,会有死锁的概率吗?

大家可以思考一下先,下面让我们带着这两个问题往下看,下面介绍一下使用Redis实现分布式锁常用的几个命令。

一、使用Redis实现分布式锁常见的几个命令

► Setnx

命令:SETNX key value
说明:将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不做任何动作。SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
时间复杂度:O(1)
返回值:设置成功,返回1 ; 设置失败,返回 0

► Getset

命令:GETSET key value
说明:将给定 key 的值设为 value ,并返回 key 的旧值(old value)。当 key 存在但不是字符串类型时,返回一个错误。
时间复杂度:O(1)
返回值:返回给定 key 的旧值; 当 key 没有旧值时,也即是, key 不存在时,返回 nil 。

 Expire

命令:EXPIRE key seconds
说明:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。
时间复杂度:O(1)
返回值:设置成功返回 1 ;当 key 不存在或者不能为 key 设置生存时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的生存时间),返回 0 。

► Del

命令:DEL key [key …]
说明:删除给定的一个或多个 key 。不存在的 key 会被忽略。
时间复杂度:O(N); N 为被删除的 key 的数量。
删除单个字符串类型的 key ,时间复杂度为O(1)。
删除单个列表、集合、有序集合或哈希表类型的 key ,时间复杂度为O(M), M 为以上数据结构内的元素数量。
返回值:被删除 key 的数量。

好了,命令熟悉之后,下面我们就开始一步一步实现分布式锁。

二、使用Redis实现分布式锁版本一:与时间戳的结合

对于上面的setnx设置的默认值1,我们采用时间戳来防止问题一,下面先让我们来看下想当然写法流程图。

流程图:

C#代码实现:

static void Main(string[] args)
    {
      var lockTimeout = 5000;//单位是毫秒
      var currentTime = DateTime.Now.ToUnixTime(true);
      if (SetNx("lockkey", currentTime+ lockTimeout,lockTimeout))
      {
        //TODO:一些业务逻辑代码
        //.....
        //.....
        //最后释放锁
        Remove("lockkey");
      }
      else
      {
        Console.WriteLine("没有获得分布式锁");
      }
      Console.ReadKey();
    }

    public static bool SetNx(string key,long time ,double expireMS)
    {
      if (redisClient.SetNx(key, time))
      {
        if (expireMS > 0)
          redisClient.Expire(key, TimeSpan.FromMilliseconds(expireMS));
        return true;
      }
      return false;
    }

    public static bool Remove(string key)
    {
      return redisClient.Del(key) > 0;
    }

上面的代码中value的值我们使用时间戳,不是一个固定的值了,至少能保证你删除的key确实是你自己的,所以,建议大家在设value的值时,不要设置一个固定的值,最好是随机的。但是这样写虽然解决了问题一,但是这种写法还是存在一定的风险,虽然Redis是单线程的并且setnx、expire是原子操作,但是先setnx再expire就不是原子操作了!!!我们要考虑多线程环境和容器部署时多实例环境等等,那这样的写法就会出现问题。

比如:现在有A、B两台服务器在跑这个应用,当A台应用跑到:setnx成功但是还没有设置过期时间的时候,突然重启服务,这个时候在分布式环境中就会发生死锁的问题,因为你没有设置过期时间。

下面我们通过调试来展示死锁的场景:

A应用:在执行到setnx成功但是在执行expire之前宕机了,此时的Redis已经有数据了,但是没有过期时间

B应用:运行正常

但是B应用就会一直获取不到锁,导致死锁。

所以上面在获取锁的逻辑还是有问题的,为了解决这个问题,我们采用下面的方式来处理。

三、使用Redis实现分布式锁版本二:双重防死锁

流程图:

C#代码实现:

public static void RedisLockV2()
    {
      var lockTimeout = 5000;//单位是毫秒
      var currentTime = DateTime.Now.ToUnixTime(true);

      if (SetNxV2("lockkey",DateTime.Now.ToUnixTime(true)+lockTimeout))
      {
        //设置过期时间
        redisClient.Expire("lockkey", TimeSpan.FromMilliseconds(5000));
        //TODO:一些业务逻辑代码
        
        Console.WriteLine("处理业务ing");
        Thread.Sleep(100000);

        Console.WriteLine("处理业务ed");
        //最后释放锁
        Remove("lockkey");
      }
      else
      {
        //未获取到锁,继续判断,判断时间戳看看是否可以重置并获取锁
        var lockValue = redisClient.Get("lockkey");
        var time = DateTime.Now.ToUnixTime(true);

        if (!string.IsNullOrEmpty(lockValue) && time> lockValue.ToInt64())
        {
          //再次用当前时间戳getset
          //返回固定key的旧值,旧值判断是否可以获取锁
          var getsetResult = redisClient.GetSet("lockkey", time);
          if (getsetResult == null || (getsetResult != null && getsetResult == lockValue))
          {
            Console.WriteLine("获取到Redis锁了");
            //真正获取到锁
            redisClient.Expire("lockkey", TimeSpan.FromMilliseconds(5000));
            //TODO:一些业务逻辑代码
            //.....
            //.....
            Console.WriteLine("处理业务");
            //最后释放锁
            Remove("lockkey");
          }
          else
          {
            Console.WriteLine("没有获取到锁");
          }

        }
        else
        {
          Console.WriteLine("没有获取到锁");
        }
      }
      
    }

现在,Redis中的情况如下:

我们运行上面的代码,结果如下:

副本.exe中添加一行代码。来模拟这种场景:有A、B两台服务器在跑这个应用,当A台应用跑到:setnx成功但是还没有设置过期时间的时候,突然重启服务,这个时候在分布式环境中就会发生死锁的问题,因为你没有设置过期时间

我们先执行Lottery.ThriftRpc – 副本.exe,等Redis里面有值了,并且这个key是没有过期时间,再关闭掉该程序:

然后,再执行Lottery.ThriftRpc.exe

看,我们是不是解决了该问题,至于过期时间设置为多少要结合你的具体业务处理时间来计算出一个合理的值,好了,聊到这里关于Redis的分布式锁就讲完了,希望对你有帮助,谢谢。

四、总结:

上面的示例中Redis的组件用的是CSRedisCore,这里只是自己的一点体会,如果你有更好的办法,可以在评论区讨论,关于Redis的理论讲解有太多的文章了,大家可以参考,关于Redis的文章我只总结工作中遇到的一些问题,关于文章中的源码,我就不提供了,太简单了。后面我会不定期分享一些Redis的问题,希望大家多多支持。

以上所述是小编给大家介绍的ASP.NET Core 中如何借助CSRedis实现一个安全高效的分布式锁详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本大全网站的支持!

详解C# Tuple VS ValueTuple(元组类 VS 值元组) - 永远薰薰 - 博客园

mikel阅读(452)

来源: 详解C# Tuple VS ValueTuple(元组类 VS 值元组) – 永远薰薰 – 博客园

C# 7.0已经出来一段时间了,大家都知道新特性里面有个对元组的优化:ValueTuple。这里利用详尽的例子详解Tuple VS ValueTuple(元组类VS值元组),10分钟让你更了解ValueTuple的好处和用法。

如果您对Tuple足够了解,可以直接跳过章节”回顾Tuple”,直达章节”ValueTuple详解”,查看值元组的炫丽用法。

 

回顾Tuple

Tuple是C# 4.0时出的新特性,.Net Framework 4.0以上版本可用。

元组是一种数据结构,具有特定数量和元素序列。比如设计一个三元组数据结构用于存储学生信息,一共包含三个元素,第一个是名字,第二个是年龄,第三个是身高。

元组的具体使用如下:

1.    如何创建元组

默认情况.Net Framework元组仅支持1到7个元组元素,如果有8个元素或者更多,需要使用Tuple的嵌套和Rest属性去实现。另外Tuple类提供创造元组对象的静态方法。

  • 利用构造函数创建元组:
var testTuple6 = new Tuple<int, int, int, int, int, int>(1, 2, 3, 4, 5, 6);
Console.WriteLine($"Item 1: {testTuple6.Item1}, Item 6: {testTuple6.Item6}");

var testTuple10 = new Tuple<int, int, int, int, int, int, int, Tuple<int, int, int>>(1, 2, 3, 4, 5, 6, 7, new Tuple<int, int, int>(8, 9, 10));
Console.WriteLine($"Item 1: {testTuple10.Item1}, Item 10: {testTuple10.Rest.Item3}");
  • 利用Tuple静态方法构建元组,最多支持八个元素:
var testTuple6 = Tuple.Create<int, int, int, int, int, int>(1, 2, 3, 4, 5, 6);
Console.WriteLine($"Item 1: {testTuple6.Item1}, Item 6: {testTuple6.Item6}");

var testTuple8 = Tuple.Create<int, int, int, int, int, int, int, int>(1, 2, 3, 4, 5, 6, 7, 8);
Console.WriteLine($"Item 1: {testTuple8.Item1}, Item 8: {testTuple8.Rest.Item1}");

Note:这里构建出来的Tuple类型其实是Tuple<int, int, int, int, int, int, int, Tuple<int>>,因此testTuple8.Rest取到的数据类型是Tuple<int>,因此要想获取准确值需要取Item1属性。

2.    表示一组数据

如下创建一个元组表示一个学生的三个信息:名字、年龄和身高,而不用单独额外创建一个类。

var studentInfo = Tuple.Create<string, int, uint>("Bob", 28, 175);
Console.WriteLine($"Student Information: Name [{studentInfo.Item1}], Age [{studentInfo.Item2}], Height [{studentInfo.Item3}]");

3.    从方法返回多个值

当一个函数需要返回多个值的时候,一般情况下可以使用out参数,这里可以用元组代替out实现返回多个值。

复制代码
static Tuple<string, int, uint> GetStudentInfo(string name)
{
    return new Tuple<string, int, uint>("Bob", 28, 175);
}
static void RunTest()
{
    var studentInfo = GetStudentInfo("Bob");
    Console.WriteLine($"Student Information: Name [{studentInfo.Item1}], Age [{studentInfo.Item2}], Height [{studentInfo.Item3}]");
}
复制代码

4.    用于单参数方法的多值传递

当函数参数仅是一个Object类型时,可以使用元组实现传递多个参数值。

复制代码
static void WriteStudentInfo(Object student)
{
    var studentInfo = student as Tuple<string, int, uint>;
    Console.WriteLine($"Student Information: Name [{studentInfo.Item1}], Age [{studentInfo.Item2}], Height [{studentInfo.Item3}]");
}
static void RunTest()
{
    var t = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(WriteStudentInfo));
    t.Start(new Tuple<string, int, uint>("Bob", 28, 175));
    while (t.IsAlive)
    {
        System.Threading.Thread.Sleep(50);
    }
}
复制代码

 

尽管元组有上述方便使用的方法,但是它也有明显的不足:

  • 访问元素的时候只能通过ItemX去访问,使用前需要明确元素顺序,属性名字没有实际意义,不方便记忆;
  • 最多有八个元素,要想更多只能通过最后一个元素进行嵌套扩展;
  • Tuple是一个引用类型,不像其它的简单类型一样是值类型,它在堆上分配空间,在CPU密集操作时可能有太多的创建和分配工作。

因此在C# 7.0中引入了一个新的ValueTuple类型,详见下面章节。

ValueTuple详解

ValueTuple是C# 7.0的新特性之一,.Net Framework 4.7以上版本可用。

值元组也是一种数据结构,用于表示特定数量和元素序列,但是是和元组类不一样的,主要区别如下:

  • 值元组是结构,是值类型,不是类,而元组(Tuple)是类,引用类型;
  • 值元组元素是可变的,不是只读的,也就是说可以改变值元组中的元素值;
  • 值元组的数据成员是字段不是属性。

值元组的具体使用如下:

1.    如何创建值元组

和元组类一样,.Net Framework值元组也只支持1到7个元组元素,如果有8个元素或者更多,需要使用值元组的嵌套和Rest属性去实现。另外ValueTuple类可以提供创造值元组对象的静态方法。

  • 利用构造函数创建元组:
var testTuple6 = new ValueTuple<int, int, int, int, int, int>(1, 2, 3, 4, 5, 6);
Console.WriteLine($"Item 1: {testTuple6.Item1}, Item 6: {testTuple6.Item6}"); 

var testTuple10 = new ValueTuple<int, int, int, int, int, int, int, ValueTuple<int, int, int>>(1, 2, 3, 4, 5, 6, 7, new ValueTuple <int, int, int>(8, 9, 10));
Console.WriteLine($"Item 1: {testTuple10.Item1}, Item 10: {testTuple10.Rest.Item3}");
  • 利用Tuple静态方法构建元组,最多支持八个元素:
var testTuple6 = ValueTuple.Create<int, int, int, int, int, int>(1, 2, 3, 4, 5, 6);
Console.WriteLine($"Item 1: {testTuple6.Item1}, Item 6: {testTuple6.Item6}"); 

var testTuple8 = ValueTuple.Create<int, int, int, int, int, int, int, int>(1, 2, 3, 4, 5, 6, 7, 8);
Console.WriteLine($"Item 1: {testTuple8.Item1}, Item 8: {testTuple8.Rest.Item1}");

注意这里构建出来的Tuple类型其实是Tuple<int, int, int, int, int, int, int, Tuple<int>>,因此testTuple8.Rest取到的数据类型是Tuple<int>,因此要想获取准确值需要取Item1属性。

优化区别:当构造出超过7个元素以上的值元组后,可以使用接下来的ItemX进行访问嵌套元组中的值,对于上面的例子,要访问第十个元素,既可以通过testTuple10.Rest.Item3访问,也可以通过testTuple10.Item10来访问。

var testTuple10 = new ValueTuple<int, int, int, int, int, int, int, ValueTuple<int, int, int>>(1, 2, 3, 4, 5, 6, 7, new ValueTuple<int, int, int>(8, 9, 10));
Console.WriteLine($"Item 10: {testTuple10.Rest.Item3}, Item 10: {testTuple10.Item10}");

2.    表示一组数据

如下创建一个值元组表示一个学生的三个信息:名字、年龄和身高,而不用单独额外创建一个类。

var studentInfo = ValueTuple.Create<string, int, uint>("Bob", 28, 175);
Console.WriteLine($"Student Information: Name [{studentInfo.Item1}], Age [{studentInfo.Item2}], Height [{studentInfo.Item3}]");

3.    从方法返回多个值

值元组也可以在函数定义中代替out参数返回多个值。

复制代码
static ValueTuple<string, int, uint> GetStudentInfo(string name)
{
    return new ValueTuple <string, int, uint>("Bob", 28, 175);
}
static void RunTest()
{
    var studentInfo = GetStudentInfo("Bob");
    Console.WriteLine($"Student Information: Name [{studentInfo.Item1}], Age [{studentInfo.Item2}], Height [{studentInfo.Item3}]");
}
复制代码

优化区别:返回值可以不明显指定ValueTuple,使用新语法(,,)代替,如(string, int, uint):

复制代码
static (string, int, uint) GetStudentInfo1(string name)
{
    return ("Bob", 28, 175);
}
static void RunTest1()
{
    var studentInfo = GetStudentInfo1("Bob");
    Console.WriteLine($"Student Information: Name [{studentInfo.Item1}], Age [{studentInfo.Item2}], Height [{studentInfo.Item3}]");
}
复制代码

调试查看studentInfo的类型就是ValueType三元组。

优化区别:返回值可以指定元素名字,方便理解记忆赋值和访问:

复制代码
static (string name, int age, uint height) GetStudentInfo1(string name)
{
    return ("Bob", 28, 175);
}
static void RunTest1()
{
    var studentInfo = GetStudentInfo1("Bob");
    Console.WriteLine($"Student Information: Name [{studentInfo.name}], Age [{studentInfo.age}], Height [{studentInfo.height}]");
}
复制代码

方便记忆赋值:

方便访问:

4.    用于单参数方法的多值传递

当函数参数仅是一个Object类型时,可以使用值元组实现传递多个值。

复制代码
static void WriteStudentInfo(Object student)
{
    var studentInfo = (ValueTuple<string, int, uint>)student;
    Console.WriteLine($"Student Information: Name [{studentInfo.Item1}], Age [{studentInfo.Item2}], Height [{studentInfo.Item3}]");
}
static void RunTest()
{
    var t = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(WriteStudentInfo));
    t.Start(new ValueTuple<string, int, uint>("Bob", 28, 175));
    while (t.IsAlive)
    {
        System.Threading.Thread.Sleep(50);
    }
}
复制代码

5.    解构ValueTuple

可以通过var (x, y)或者(var x, var y)来解析值元组元素构造局部变量,同时可以使用符号”_”来忽略不需要的元素。

复制代码
static (string name, int age, uint height) GetStudentInfo1(string name)
{
    return ("Bob", 28, 175);
}

static void RunTest1()
{
    var (name, age, height) = GetStudentInfo1("Bob");
    Console.WriteLine($"Student Information: Name [{name}], Age [{age}], Height [{height}]");

    (var name1, var age1, var height1) = GetStudentInfo1("Bob");
    Console.WriteLine($"Student Information: Name [{name1}], Age [{age1}], Height [{height1}]");

    var (_, age2, _) = GetStudentInfo1("Bob");
    Console.WriteLine($"Student Information: Age [{age2}]");
}
复制代码

 

由上所述,ValueTuple使C#变得更简单易用。较Tuple相比主要好处如下:

  • ValueTuple支持函数返回值新语法”(,,)”,使代码更简单;
  • 能够给元素命名,方便使用和记忆,这里需要注意虽然命名了,但是实际上value tuple没有定义这样名字的属性或者字段,真正的名字仍然是ItemX,所有的元素名字都只是设计和编译时用的,不是运行时用的(因此注意对该类型的序列化和反序列化操作);
  • 可以使用解构方法更方便地使用部分或全部元组的元素;
  • 值元组是值类型,使用起来比引用类型的元组效率高,并且值元组是有比较方法的,可以用于比较是否相等,详见:https://msdn.microsoft.com/en-us/library/system.valuetuple

 

[原创文章,转载请注明出处,仅供学习研究之用,如有错误请留言,如觉得不错请推荐,谢谢支持]

[原文:http://www.cnblogs.com/lavender000/p/6916157.html,来自永远薰薰]

Redis的事件机制 - lijihong0723 - 博客园

mikel阅读(586)

来源: Redis的事件机制 – lijihong0723 – 博客园

Redis程序的运行过程是一个处理事件的过程,也称Redis是一个事件驱动的服务。Redis中的事件分两类:文件事件(File Event)、时间事件(Time Event)。文件事件处理文件的读写操作,特别是与客户端通信的Socket文件描述符的读写操作;时间事件主要用于处理一些定时处理的任务。

本文首先介绍Redis的运行过程,阐明Redis程序是一个事件驱动的程序;接着介绍事件机制实现中涉及的数据结构以及事件的注册;最后介绍了处理客户端中涉及到的套接字文件读写事件。

一、Redis的运行过程

Redis的运行过程是一个事件处理的过程,可以通过下图反映出来:

​ 图1 Redis的事件处理过程

从上图可以看出:Redis服务器的运行过程就是循环等待并处理事件的过程。通过时间事件将运行事件分成一个个的时间分片,如图1的右半部分所示。如果在指定的时间分片中,有文件事件发生,如:读文件描述符可读、写文件描述符可写,则调用相应的处理函数进行文件的读写处理。文件事件处理完成之后,处理期望发生时间在当前时间之前或正好是当前时刻的时间事件。然后再进入下一次循环迭代处理。

如果在指定的事件间隔中,没有文件事件发生,则不需要处理,直接进行时间事件的处理,如下图所示。

​ 图2 Redis的事件处理过程(无文件事件发生)

二、事件数据结构

2.1 文件事件数据结构

Redis用如下结构体来记录一个文件事件:

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

通过mask来描述发生了什么事件:

  • AE_READABLE:文件描述符可读;
  • AE_WRITABLE:文件描述符可写;
  • AE_BARRIER:文件描述符阻塞

rfileProc和wfileProc分别为读事件和写事件发生时的回调函数,其函数签名如下:

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
2.2 事件事件数据结构

Redis用如下结构体来记录一个时间事件:

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

when_sec和when_ms指定时间事件发生的时间,timeProc为时间事件发生时的处理函数,签名如下:

typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);

prev和next表明时间事件构成了一个双向链表。

3.3 事件循环

Redis用如下结构体来记录系统中注册的事件及其状态:

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

这一结构体中,最主要的就是文件事件指针events和时间事件头指针timeEventHead。文件事件指针event指向一个固定大小(可配置)数组,通过文件描述符作为下标,可以获取文件对应的事件对象。

三、事件的注册过程

事件驱动的程序实际上就是在事件发生时,调用相应的处理函数(即:回调函数)进行逻辑处理。因此关于事件,程序需要知道:①事件的发生;② 回调函数。事件的注册过程就是告诉程序这两。下面我们分别从文件事件、时间事件的注册过程进行阐述。

3.1 文件事件的注册过程

对于文件事件:

  • 事件的发生:应用程序需要知道哪些文件描述符发生了哪些事件。感知文件描述符上有事件发生是由操作系统的职责,应用程序需要告诉操作系统,它关心哪些文件描述符的哪些事件,这样通过相应的系统API就会返回发生了事件的文件描述符。
  • 回调函数:应用程序知道了文件描述符发生了事件之后,需要调用相应回调函数进行处理,因而需要在事件发生之前将相应的回调函数准备好。

这就是文件事件的注册过程,函数的实现如下:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

这段代码逻辑非常清晰:首先根据文件描述符获得文件事件对象,接着在操作系统中添加自己关心的文件描述符(addApiAddEvent),最后将回调函数记录到文件事件对象中。因此,一个线程就可以同时监听多个文件事件,这就是IO多路复用。操作系统提供多种IO多路复用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支持所有这些模型,用户可以根据需要进行选择。不同的模型,向操作系统添加文件描述符方式也不同,Redis将这部分逻辑封装在aeApiAddEvent中,下面代码是EPOLL模型的实现:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

这段代码就是对操作系统调用epoll_ctl()的封装,EPOLLIN对应的是读(输入)事件,EPOLLOUT对应的是写(输出)事件。

3.2 时间事件的注册过程

对于时间事件:

  • 事件的发生:当前时刻正好是事件期望发生的时刻,或者是晚于事件期望发生的时刻,所以需要让程序知道事件期望发生的时刻;
  • 回调函数:此时调用回调函数进行处理,所以需要让程序知道事件的回调函数。

对应的事件事件注册函数如下:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

这段代码逻辑也是非常简单:首先创建时间事件对象,接着设置事件,设置回调函数,最后将事件事件对象插入到时间事件链表中。设置时间事件期望发生的时间比较简单:

static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    if (when_ms >= 1000) {
        when_sec ++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}

static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval tv;

    gettimeofday(&tv, NULL);
    *seconds = tv.tv_sec;
    *milliseconds = tv.tv_usec/1000;
}

当前时间加上期望的时间间隔,作为事件期望发生的时刻。

四、套接字文件事件

Redis为客户端提供存储数据和获取数据的缓存服务,监听并处理来自请求,将结果返回给客户端,这一过程将会发生以下文件事件:

与上图相对应,对于一个请求,Redis会注册三个文件事件:

4.1 TCP连接建立事件

服务器初始化时,在服务器套接字上注册TCP连接建立的事件。

void initServer(void) {
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
}

回调函数为acceptTcpHandler,该函数最重要的职责是创建客户端结构。

4.2 客户端套接字读事件

创建客户端:在客户端套接字上注册客户端套接字可读事件。

if (aeCreateFileEvent(server.el,fd,AE_READABLE,
                      readQueryFromClient, c) == AE_ERR)
{
    close(fd);
    zfree(c);
    return NULL;
}

回调函数为readQueryFromClient,顾名思义,此函数将从客户端套接字中读取数据。

4.3 向客户端返回数据

Redis完成请求后,Redis并非处理完一个请求后就注册一个写文件事件,然后事件回调函数中往客户端写回结果。根据图1,检测到文件事件发生后,Redis对这些文件事件进行处理,即:调用rReadProc或writeProc回调函数。处理完成后,对于需要向客户端写回的数据,先缓存到内存中:

typedef struct client {
    // ...其他字段
    
    list *reply;            /* List of reply objects to send to the client. */
    
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
};

发送给客户端的数据会存放到两个地方:

  • reply指针存放待发送的对象;
  • buf中存放待返回的数据,bufpos指示数据中的最后一个字节所在位置。

这里遵循一个原则:只要能存放在buf中,就尽量存入buf字节数组中,如果buf存不下了,才存放在reply对象数组中。

写回发生在进入下一次等待文件事件之前,见图1中【等待前处理】,会调用以下函数来处理客户端数据写回逻辑:

int writeToClient(int fd, client *c, int handler_installed) {
    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;
            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
        }
    }
}

上述函数只截取了数据发送部分,首先发送buf中的数据,然后发送reply中的数据。

有读者可能会疑惑:write()系统调用是阻塞式的接口,上述做法会不会在write()调用的地方有等待,从而导致性能低下?这里就要介绍Redis是怎么处理这个问题的。

首先,我们发现创建客户端的代码:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        anetNonBlock(NULL,fd);
    }
}

可以看到设置fd是非阻塞(NonBlock),这就保证了在套接字fd上的read()和write()系统调用不是阻塞的。

其次,和文件事件的处理操作一样,往客户端写数据的操作也是批量的,函数如下:

int handleClientsWithPendingWrites(void) {
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        /* Try to write buffers to the client socket. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
}

可以看到,首先对每个客户端调用刚才介绍的writeToClient()函数进行写数据,如果还有数据没写完,那么注册写事件,当套接字文件描述符写就绪时,调用sendReplyToClient()进行剩余数据的写操作:

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    writeToClient(fd,privdata,1);
}

仔细想一下就明白了:处理完得到结果后,这时套接字的写缓冲区一般是空的,因此write()函数调用成功,所以就不需要注册写文件事件了。如果写缓冲区满了,还有数据没写完,此时再注册写文件事件。并且在数据写完后,将写事件删除:

int writeToClient(int fd, client *c, int handler_installed) {
    if (!clientHasPendingReplies(c)) {
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}

注意到,在sendReplyToClient()函数实现中,第三个参数正好是1。

mstsc远程报:这可能是由于CredSSP 加密Oracle修正的两种完美解决方法 - 依乐祝 - 博客园

mikel阅读(478)

gpedit.msc

来源: mstsc远程报:这可能是由于CredSSP 加密Oracle修正的两种完美解决方法 – 依乐祝 – 博客园

win10很完美,用的也很舒服!当然人无完人,也总有不尽如人意的时候。比如说我们经常用的远程mstsc,就出现了一个坑,既然出现坑了,我们就得把坑解决掉吧!下面就记录一下这个坑的解决方法。

mstsc错误

本文地址:https://www.cnblogs.com/yilezhu/p/9327358.html
作者:yilezhu

查看微软CredSSP更新日志

查看win10系统升级日志,果然找到了原因,是因为CVE-2018-0886 的 CredSSP 2018 年 5 月 8 日更新默认设置从“易受攻击”更改为“缓解”的更新。相关的 Microsoft 知识库编号已在 CVE-2018-0886 中列出。默认情况下,安装此更新后,修补的客户端无法与未修补的服务器进行通信。 使用本文中描述的互操作性矩阵和组策略设置来启用“允许的”配置。
具体的内容请自行阅读:https://support.microsoft.com/zh-cn/help/4093492/credssp-updates-for-cve-2018-0886-march-13-2018

微软官方给出的解决“这可能是由于CredSSP 加密Oracle修正”的方法

  1. 组策略解决方法及步骤
    win+R 打开运行运行,然后输入“gpedit.msc”打开 如下图所示,在左侧窗口依次找到策略路径:“计算机配置”->“管理模板”->“系统”->“凭据分配” 然后右侧窗口设置名称中找到: 加密 Oracle 修正

然后双击“加密 Oracle 修正”打开如下的窗口,对着设置即可。

最后,微软建议对加密 Oracle 修正的任何更改都需要重启。所以这里最好还是重启一下电脑。不过博主本人试了,不需要重启远程连接也正常了!

  1. 注册表解决方法及步骤

这里建议先使用上面组策略的方式进行解决,如果解决不了,再使用注册表方式,因为注册表方式没有组策略这种图形化界面的简单方便。
同时,微软也给出了警告: 如果使用注册表编辑器或其他方法修改注册表不当,可能会出现严重问题。 这些问题可能需要您重新安装操作系统。 Microsoft 不能保证可解决这些问题。 请自行承担修改注册表的风险。

win+R 打开运行菜单,然后输入“regedit” 按Enter键即可打开如下的注册表,按照图示,找到此路径“
HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Policies\System\CredSSP\Parameters”

然后在右侧窗口双击“AllowEncryptionOracle” 并把值设置成“2” (跟上面组策略设置效果一样)然后确定。如下图所示

0表示强制更新的客户端
1表示缓解
2表示易受攻击 (跟上面组策略设置效果一样,有木有)

总结

今天主要是介绍下win10 mstsc远程遇到的坑“这可能是由于CredSSP 加密Oracle修正”的两种解决方法。从微软官方更新日志入手,然后引出组策略以及注册表的解决方法。图文也更容易让新手朋友也能按照步骤进行解决!最后还是提醒下,新手朋友最好通过组策略的方式进行解决,因为微软也给出了警告: 如果使用注册表编辑器或其他方法修改注册表不当,可能会出现严重问题。 这些问题可能需要您重新安装操作系统。 Microsoft 不能保证可解决这些问题。 请自行承担修改注册表的风险。

redis+redisClient使用心得 - anker_loving - 博客园

mikel阅读(662)

来源: redis+redisClient使用心得 – anker_loving – 博客园

最近在玩redisClient,顺便把noSQL中的神器redis的一些知识也连接起来,现分享如下,与君共进!

1.redisClient是一款非常方便的GUI,没什么可说的,直接上图:

 

2.redis安装

<1>在linux服务器安装rzsz—–>yum -y install lrzsz

<2>rz上传 && tar zxvf filename

<3>make && cd src && mkdir -p /usr/local/redis && cp redis-cli redis-server /usr/local/redis && cd .. && cp redis-conf /usr/local/redis

<4>start service:

配置redis-conf,将daemonize设置为yes

./redis-server redis-conf  携带配置启动

ps -A | grep redis查看daemonize

3.redis连接与配置

<1>使用redis-cli连接,其中p代表接口,a代表密码,h代表远程主机地址,quit/exit退出redis客户端:

<2>使用redis-conf文件进行配置,可以配置端口,ip限制,密码,主从,数据库个数等参数

3.redis的使用

<1>keys * ,get,set等不作赘述;

<2>flushdb删除当前db中所有key,flushall删除所有db中所有key,del key1 key2…,exists key;

<3>List(链表,消息队列):lpush,rpush + key + value;lpop,rpop+key;lrange key start end 返回list区间内的元素,下标从0开始,用于获取链表内容;

<4>Set(集合,String数组,其内元素不能重复):sadd/srem key member添加/移除成员;smove p1 p2 member将p1中member移动到p2;sinter/sunion/sdiff key1 key2…返回key交集/并集/差集;

<5>Sort Set(排序):是List和Set的集合,以权值对的形式保存,zadd+ name+ 权+值,通常用于保存前几个最高回复量,特点是:元素有排序功能,不存在多个相同元素

<6>持久化:redis会把数据以文件形式存储在硬盘上一份,在服务器重启时会自动把硬盘数据恢复到内存里面,数据保存到硬盘的过程就称为”持久化”

 

<7>主从模式

4.与其他语言结合:

<1>与php结合: