用户画像产品化——从零开始搭建实时用户画像(六) - 独孤风 - 博客园

mikel阅读(859)

来源: 用户画像产品化——从零开始搭建实时用户画像(六) – 独孤风 – 博客园

在开发好用户标签以后,如何将标签应用到实际其实是一个很重要的问题。只有做好产品的设计才能让标签发挥真正的价值,本文将介绍用户画像的产品化过程。

一、标签展示

首先是标签展示功能,这个主要供业务人员和研发人员使用,是为了更直观的看见整个的用户标签体系。

不同的标签体系会有不同的层级,那么这个页面的设计就需要我们展示成树状的结构,方便以后的扩展。

在最后一个层级,比如自然性别,可以设计一个统计页面,在进入页面后,可以展示相应的数据统计情况,

可以更直观看见标签中值得比例,也可以为业务提供好的建议,另外可以对标签的具体描述进行展示,起到一个说明的作用,还可以展示标签按天的波动情况,观察标签的变化情况。

这一部分的数据来源呢?之前也提到过,这些标签的元数据信息都存在mySQL中,方便我们查询。

所以树状图和标签描述信息需要去mySQL中获取,而比例等图表数据则是从Hbase,Hive中查询获取的,当然也有直接通过ES获取的。但是每天的标签历史波动情况,还是要通过每天跑完标签后存在mysql中作为历史记录进行展示。

二 、标签查询

这一功能可以提供给研发人员和业务人员使用。

标签查询功能其实就是对用户进行全局画像的过程,对于一个用户的全量标签信息,我们是需要对其进行展示的。

输入用户id后,可以查看该用户的属性信息、行为信息、风控属性等信息。从多方位了解一个具体的用户特征。

这些已经是标签的具体信息了,由于是对单一id的查找,从hive中获取会造成查询速度的问题,所以我们更建议从Hbase或者ES中查询获取,这样查询效率和实时性都能获得极大的提升。

三、标签管理

这一功能是提供给研发人员使用的。

对于标签,不能每一次新增一个标签都进行非常大改动,这样是非常耗费人力的,所以必须要有可以对标签进行管理的功能。

这里定义了标签的基本信息,开发方式,开发人员等等,在完成标签的开发以后,直接在此页面对标签进行录入,就可以完成标签的上线工作,让业务人员可以对标签进行使用。

新增和编辑标签的页面,可以提供下拉框或者输入框提供信息录入的功能。

之前已经提到过,这些标签的元数据信息都保存在了Mysql中,只要完成对其的新增和修改就可以了。

四、用户分群

作为用户画像最核心的功能,用户分群功能。是用户画像与业务系统建立联系的桥梁,也是用户画像的价值所在。

这项功能主要供业务人员使用。

此功能允许用户自定义的圈定一部分人员,圈定的规则就是对于标签的条件约束。

在圈定好人群以后,可以对这部分人群提供与业务系统的外呼系统,客服系统,广告系统,Push系统的交互,达到真正的精细化运营的目的。

对于标签规则的判断,需要将记录好的规则存储于Mysql中,在进行人群计算时又需要将规则解析成可计算的逻辑。不管是解析成Sql或者其他的查询语言都难度巨大,这对于研发是一个非常大的挑战。

在此功能中,还可以增加人群对比的功能,对不同人群的不同标签进行圈定,对比。这对于查询性能也是一个巨大的考验。

但是,用户分群功能作为用户画像的核心是我们必须要实现的。对于技术架构,Hbase更擅长与KV形式的查询,对于多维度查询性能较差,所以可以采取ES索引,在ES查询出Hbase的Rowkey,再去查询Hbase的方式。也有很多公司选择整体迁移到ES中完成此项工作。那么ES可以胜任这项工作吗?

下一章,我们来聊一聊如何用ES来实现用户分群,未完待续~

参考文献

《用户画像:方法论与工程化解决方案》

更多实时数据分析相关博文与科技资讯,欢迎关注 “实时流式计算” 获取用户画像相关资料 请关注 “实时流式计算” 回复 “用户画像”

你真的了解字典(Dictionary)吗? - 码农阿宇 - 博客园

mikel阅读(795)

来源: 你真的了解字典(Dictionary)吗? – 码农阿宇 – 博客园

从一道亲身经历的面试题说起

半年前,我参加我现在所在公司的面试,面试官给了一道题,说有一个Y形的链表,知道起始节点,找出交叉节点.
Y形链表
为了便于描述,我把上面的那条线路称为线路1,下面的称为线路2.

思路1#

先判断线路1的第一个节点的下级节点是否是线路2的第一个节点,如果不是,再判断是不是线路2的第二个,如果也不是,判断是不是第三个节点,一直到最后一个.
如果第一轮没找到,再按以上思路处理线路一的第二个节点,第三个,第四个… 找到为止.
时间复杂度n2,相信如果我用的是这种方法,可肯定被Pass了.

思路2#

首先,我遍历线路2的所有节点,把节点的索引作为key,下级节点索引作为value存入字典中.
然后,遍历线路1中节点,判断字典中是否包含该节点的下级节点索引的key,即dic.ContainsKey((node.next) ,如果包含,那么该下级节点就是交叉节点了.
时间复杂度是n.
那么问题来了,面试官问我了,为什么时间复杂度n呢?你有没有研究过字典的ContainsKey这个方法呢?难道它不是通过遍历内部元素来判断Key是否存在的呢?如果是的话,那时间复杂度还是n2才是呀?
我当时支支吾吾,确实不明白字典的工作原理,厚着面皮说 “不是的,它是通过哈希表直接拿出来的,不用遍历”,面试官这边是敷衍过去了,但在我心里却留下了一个谜,已经入职半年多了,欠下的技术债是时候还了.

带着问题来阅读

在看这篇文章前,不知道您使用字典的时候是否有过这样的疑问.

  1. 字典为什么能无限地Add呢?
  2. 从字典中取Item速度非常快,为什么呢?
  3. 初始化字典可以指定字典容量,这是否多余呢?
  4. 字典的桶buckets 长度为素数,为什么呢?

不管您以前有没有在心里问过自己这些问题,也不管您是否已经有了自己得答案,都让我们带着这几个问题接着往下走.

从哈希函数说起

什么是哈希函数?
哈希函数又称散列函数,是一种从任何一种数据中创建小的数字“指纹”的方法。
下面,我们看看JDK中Sting.GetHashCode()方法.

Copy
public int hashCode() {
        int h = hash;
 //hash default value : 0 
        if (h == 0 && value.length > 0) {
 //value : char storage
            char val[] = value;

            for (int i = 0; i < value.length; i++) {
                h = 31 * h + val[i];
            }
            hash = h;
        }
        return h;
    }

可以看到,无论多长的字符串,最终都会返回一个int值,当哈希函数确定的情况下,任何一个字符串的哈希值都是唯一且确定的.
当然,这里只是找了一种最简单的字符数哈希值求法,理论上只要能把一个对象转换成唯一且确定值的函数,我们都可以把它称之为哈希函数.
这是哈希函数的示意图.
哈希函数示意图
所以,一个对象的哈希值是确定且唯一的!.

字典

如何把哈希值和在集合中我们要的数据的地址关联起来呢?解开这个疑惑前我来看看一个这样不怎么恰当的例子:

有一天,我不小心干了什么坏事,警察叔叔没有逮到我本人,但是他知道是一个叫阿宇的干的,他要找我肯定先去我家,他怎么知道我家的地址呢?他不可能在全中国的家庭一个个去遍历,敲门,问阿宇是你们家的熊孩子吗?

正常应该是通过我的名字,找到我的身份证号码,然后我的身份证上登记着我的家庭地址(我们假设一个名字只能找到一张身份证).

阿宇—–> 身份证(身份证号码,家庭住址)——>我家

我们就可以把由阿宇找到身份证号码的过程,理解为哈希函数,身份证存储着我的号码的同时,也存储着我家的地址,身份证这个角色在字典中就是 bucket,它起一个桥梁作用,当有人要找阿宇家在哪时,直接问它,准备错的,字典中,bucket存储着数据的内存地址(索引),我们要知道key对应的数据的内存地址,问buckets要就对了.

key—>bucket的过程 ~= 阿宇—–>身份证 的过程.
Alt text

警察叔叔通过家庭住址找到了我家之后,我家除了住我,还住着我爸,我妈,他敲门的时候,是我爸开门,于是问我爸爸,阿宇在哪,我爸不知道,我爸便问我妈,儿子在哪?我妈告诉警察叔叔,我在书房呢.很好,警察叔叔就这样把我给逮住了.

字典也是这样,因为key的哈希值范围很大的,我们不可能声明一个这么大的数组作为buckets,这样就太浪费了,我们做法时HashCode%BucketSize作为bucket的索引.

假设Bucket的长度3,那么当key1的HashCode为2时,它数据地址就问buckets2要,当key2的HashCode为5时,它的数据地址也是问buckets2要的.

这就导致同一个bucket可能有多个key对应,即下图中的Johon Smith和Sandra Dee,但是bucket只能记录一个内存地址(索引),也就是警察叔叔通过家庭地址找到我家时,正常来说,只有一个人过来开门,那么,如何找到也在这个家里的我的呢?我爸记录这我妈在厨房,我妈记录着我在书房,就这样,我就被揪出来了,我爸,我妈,我 就是字典中的一个entry.

Alt text
如果有一天,我妈妈老来得子又生了一个小宝宝,怎么办呢?很简单,我妈记录小宝宝的位置,那么我的只能巴结小宝宝,让小宝宝来记录我的位置了.
Alt text
Alt text

既然大的原理明白了,是不是要看看源码,来研究研究代码中字典怎么实现的呢?

DictionaryMini

上次在苏州参加苏州微软技术俱乐部成立大会时,有幸参加了蒋金楠 老师讲的Asp .net core框架解密,蒋老师有句话让我印象很深刻,”学好一门技术的最好的方法,就是模仿它的样子,自己造一个出来”于是他弄了个Asp .net core mini,所以我效仿蒋老师,弄了个DictionaryMini

其源代码我放在了Github仓库,有兴趣的可以看看:https://github.com/liuzhenyulive/DictionaryMini

我觉得字典这几个方面值得了解一下:

  1. 数据存储的最小单元的数据结构
  2. 字典的初始化
  3. 添加新元素
  4. 字典的扩容
  5. 移除元素

字典中还有其他功能,但我相信,只要弄明白的这几个方面的工作原理,我们也就恰中肯綮,他么问题也就迎刃而解了.

数据存储的最小单元(Entry)的数据结构#

Copy
   private struct Entry
        {
            public int HashCode;
            public int Next;
            public TKey Key;
            public TValue Value;
        }

一个Entry包括该key的HashCode,以及下个Entry的索引Next,该键值对的Key以及数据Vaule.

字典初始化#

Copy
        private void Initialize(int capacity)
        {
            int size = HashHelpersMini.GetPrime(capacity);
            _buckets = new int[size];
            for (int i = 0; i < _buckets.Length; i++)
            {
                _buckets[i] = -1;
            }

            _entries = new Entry[size];

            _freeList = -1;
        }

字典初始化时,首先要创建int数组,分别作为buckets和entries,其中buckets的index是key的哈希值%size,它的value是数据在entries中的index,我们要取的数据就存在entries中.当某一个bucket没有指向任何entry时,它的value为-1.
另外,很有意思得一点,buckets的数组长度是多少呢?这个我研究了挺久,发现取的是大于capacity的最小质数.

添加新元素#

Copy
  private void Insert(TKey key, TValue value, bool add)
        {
            if (key == null)
            {
                throw new ArgumentNullException();
            }
            //如果buckets为空,则重新初始化字典.
            if (_buckets == null) Initialize(0);
            //获取传入key的 哈希值
            var hashCode = _comparer.GetHashCode(key);
            //把hashCode%size的值作为目标Bucket的Index.
            var targetBucket = hashCode % _buckets.Length;
            //遍历判断传入的key对应的值是否已经添加字典中
            for (int i = _buckets[targetBucket]; i > 0; i = _entries[i].Next)
            {
                if (_entries[i].HashCode == hashCode && _comparer.Equals(_entries[i].Key, key))
                {
                    //当add为true时,直接抛出异常,告诉给定的值已存在在字典中.
                    if (add)
                    {
                        throw new Exception("给定的关键字已存在!");
                    }
                    //当add为false时,重新赋值并退出.
                    _entries[i].Value = value;
                    return;
                }
            }
            //表示本次存储数据的数据在Entries中的索引
            int index;
            //当有数据被Remove时,freeCount会加1
            if (_freeCount > 0)
            {
                //freeList为上一个移除数据的Entries的索引,这样能尽量地让连续的Entries都利用起来.
                index = _freeList;
                _freeList = _entries[index].Next;
                _freeCount--;
            }
            else
            {
                //当已使用的Entry的数据等于Entries的长度时,说明字典里的数据已经存满了,需要对字典进行扩容,Resize.
                if (_count == _entries.Length)
                {
                    Resize();
                    targetBucket = hashCode % _buckets.Length;
                }
                //默认取未使用的第一个
                index = _count;
                _count++;
            }
            //对Entries进行赋值
            _entries[index].HashCode = hashCode;
            _entries[index].Next = _buckets[targetBucket];
            _entries[index].Key = key;
            _entries[index].Value = value;
            //用buckets来登记数据在Entries中的索引.
            _buckets[targetBucket] = index;
        }

字典的扩容#

Copy
private void Resize()
        {
            //获取大于当前size的最小质数
            Resize(HashHelpersMini.GetPrime(_count), false);
        }
 private void Resize(int newSize, bool foreNewHashCodes)
        {
            var newBuckets = new int[newSize];
            //把所有buckets设置-1
            for (int i = 0; i < newBuckets.Length; i++) newBuckets[i] = -1;
            var newEntries = new Entry[newSize];
            //把旧的的Enties中的数据拷贝到新的Entires数组中.
            Array.Copy(_entries, 0, newEntries, 0, _count);
            if (foreNewHashCodes)
            {
                for (int i = 0; i < _count; i++)
                {
                    if (newEntries[i].HashCode != -1)
                    {
                        newEntries[i].HashCode = _comparer.GetHashCode(newEntries[i].Key);
                    }
                }
            }
            //重新对新的bucket赋值.
            for (int i = 0; i < _count; i++)
            {
                if (newEntries[i].HashCode > 0)
                {
                    int bucket = newEntries[i].HashCode % newSize;
                    newEntries[i].Next = newBuckets[bucket];
                    newBuckets[bucket] = i;
                }
            }

            _buckets = newBuckets;
            _entries = newEntries;
        }

移除元素#

Copy
        //通过key移除指定的item
        public bool Remove(TKey key)
        {
            if (key == null)
                throw new Exception();

            if (_buckets != null)
            {
                //获取该key的HashCode
                int hashCode = _comparer.GetHashCode(key);
                //获取bucket的索引
                int bucket = hashCode % _buckets.Length;
                int last = -1;
                for (int i = _buckets[bucket]; i >= 0; last = i, i = _entries[i].Next)
                {
                    if (_entries[i].HashCode == hashCode && _comparer.Equals(_entries[i].Key, key))
                    {
                        if (last < 0)
                        {
                            _buckets[bucket] = _entries[i].Next;
                        }
                        else
                        {
                            _entries[last].Next = _entries[i].Next;
                        }
                        //把要移除的元素置空.
                        _entries[i].HashCode = -1;
                        _entries[i].Next = _freeList;
                        _entries[i].Key = default(TKey);
                        _entries[i].Value = default(TValue);
                        //把该释放的索引记录在freeList中
                        _freeList = i;
                        //把空Entry的数量加1
                        _freeCount++;
                        return true;
                    }
                }
            }

            return false;
        }

我对.Net中的Dictionary的源码进行了精简,做了一个DictionaryMini,有兴趣的可以到我的github查看相关代码.
https://github.com/liuzhenyulive/DictionaryMini

答疑时间

字典为什么能无限地Add呢#

向Dictionary中添加元素时,会有一步进行判断字典是否满了,如果满了,会用Resize对字典进行自动地扩容,所以字典不会向数组那样有固定的容量.

为什么从字典中取数据这么快#

Key–>HashCode–>HashCode%Size–>Bucket Index–>Bucket–>Entry Index–>Value
整个过程都没有通过遍历来查找数据,一步到下一步的目的性时非常明确的,所以取数据的过程非常快.

初始化字典可以指定字典容量,这是否多余呢#

前面说过,当向字典中插入数据时,如果字典已满,会自动地给字典Resize扩容.
扩容的标准时会把大于当前前容量的最小质数作为当前字典的容量,比如,当我们的字典最终存储的元素为15个时,会有这样的一个过程.
new Dictionary()——————->size:3
字典添加低3个元素—->Resize—>size:7
字典添加低7个元素—->Resize—>size:11
字典添加低11个元素—>Resize—>size:23

可以看到一共进行了三次次Resize,如果我们预先知道最终字典要存储15个元素,那么我们可以用new Dictionary(15)来创建一个字典.

new Dictionary(15)———->size:23

这样就不需要进行Resize了,可以想象,每次Resize都是消耗一定的时间资源的,需要把OldEnties Copy to NewEntries 所以我们在创建字典时,如果知道字典的中要存储的字典的元素个数,在创建字典时,就传入capacity,免去了中间的Resize进行扩容.

Tips:
即使指定字典容量capacity,后期如果添加的元素超过这个数量,字典也是会自动扩容的.

为什么字典的桶buckets 长度为素数#

我们假设有这样的一系列keys,他们的分布范围时K={ 0, 1,…, 100 },又假设某一个buckets的长度m=12,因为3是12的一个因子,当key时3的倍数时,那么targetBucket也将会是3的倍数.

Copy
        Keys {0,12,24,36,...}
        TargetBucket将会是0.
        Keys {3,15,27,39,...}
        TargetBucket将会是3.
        Keys {6,18,30,42,...}
        TargetBucket将会是6.
        Keys {9,21,33,45,...}
        TargetBucket将会是9.

如果Key的值是均匀分布的(K中的每一个Key中出现的可能性相同),那么Buckets的Length就没有那么重要了,但是如果Key不是均匀分布呢?
想象一下,如果Key在3的倍数时出现的可能性特别大,其他的基本不出现,TargetBucket那些不是3的倍数的索引就基本不会存储什么数据了,这样就可能有2/3的Bucket空着,数据大量第聚集在0,3,6,9中.
这种情况其实时很常见的。 例如,又一种场景,您根据对象存储在内存中的位置来跟踪对象,如果你的计算机的字节大小是4,而且你的Buckets的长度也为4,那么所有的内存地址都会时4的倍数,也就是说key都是4的倍数,它的HashCode也将会时4的倍数,导致所有的数据都会存储在TargetBucket=0(Key%4=0)的bucket中,而剩下的3/4的Buckets都是空的. 这样数据分布就非常不均匀了.
K中的每一个key如果与Buckets的长度m有公因子,那么该数据就会存储在这个公因子的倍数为索引的bucket中.为了让数据尽可能地均匀地分布在Buckets中,我们要尽量减少m和K中的key的有公因子出现的可能性.那么,把Bucket的长度设为质数就是最佳选择了,因为质数的因子时最少的.这就是为什么每次利用Resize给字典扩容时会取大于当前size的最小质数的原因.
确实,这一块可能有点难以理解,我花了好几天才研究明白,如果小伙伴们没有看懂建议看看这里.
https://cs.stackexchange.com/questions/11029/why-is-it-best-to-use-a-prime-number-as-a-mod-in-a-hashing-function/64191#64191

最后,感谢大家耐着性子把这篇文章看完,欢迎fork DictionaryMini进行进一步的研究,谢谢大家的支持.
https://github.com/liuzhenyulive/DictionaryMini

ConcurrentDictionary并发字典知多少? - 码农阿宇 - 博客园

mikel阅读(778)

来源: ConcurrentDictionary并发字典知多少? – 码农阿宇 – 博客园

背景

在上一篇文章你真的了解字典吗?一文中我介绍了Hash Function和字典的工作的基本原理.
有网友在文章底部评论,说我的Remove和Add方法没有考虑线程安全问题.
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
查阅相关资料后,发现字典.net中Dictionary本身时不支持线程安全的,如果要想使用支持线程安全的字典,那么我们就要使用ConcurrentDictionary了.
在研究ConcurrentDictionary的源码后,我觉得在ConcurrentDictionary的线程安全的解决思路很有意思,其对线程安全的处理对对我们项目中的其他高并发场景也有一定的参考价值,在这里再次分享我的一些学习心得和体会,希望对大家有所帮助.

Concurrent

ConcurrentDictionary是Dictionary的线程安全版本,位于System.Collections.Concurrent的命名空间下,该命名空间下除了有ConcurrentDictionary,还有以下Class都是我们常用的那些类库的线程安全版本.

BlockingCollection:为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和限制功能。

ConcurrentBag:表示对象的线程安全的无序集合.

ConcurrentQueue:表示线程安全的先进先出 (FIFO) 集合。

如果读过我上一篇文章你真的了解字典吗?的小伙伴,对这个ConcurrentDictionary的工作原理应该也不难理解,它是简简单单地在读写方法加个lock吗?

工作原理

Dictionary#

如下图所示,在字典中,数组entries用来存储数据,buckets作为桥梁,每次通过hash function获取了key的哈希值后,对这个哈希值进行取余,即hashResult%bucketsLength=bucketIndex,余数作为buckets的index,而buckets的value就是这个key对应的entry所在entries中的索引,所以最终我们就可以通过这个索引在entries中拿到我们想要的数据,整个过程不需要对所有数据进行遍历,的时间复杂度为1.

Alt text

ConcurrentDictionary#

ConcurrentDictionary的数据存储类似,只是buckets有个更多的职责,它除了有dictionary中的buckets的桥梁的作用外,负责了数据存储.

Alt text

key的哈希值与buckets的length取余后hashResult%bucketsLength=bucketIndex,余数作为buckets的索引就能找到我们要的数据所存储的块,当出现两个key指向同一个块时,即上图中的John Smith和Sandra Dee他同时指向152怎么办呢?存储节点Node具有Next属性执行下个Node,上图中,node 152的Next为154,即我们从152开始找Sandra Dee,发现不是我们想要的,再到154找,即可取到所需数据.

由于官方原版的源码较为复杂,理解起来有所难度,我对官方源码做了一些精简,下文将围绕这个精简版的ConcurrentDictionary展开叙述.
https://github.com/liuzhenyulive/DictionaryMini

数据结构#

Node#

ConcurrentDictionary中的每个数据存储在一个Node中,它除了存储value信息,还存储key信息,以及key对应的hashcode

Copy
private class Node
        {
            internal TKey m_key;   //数据的key
            internal TValue m_value;  //数据值
            internal volatile Node m_next;  //当前Node的下级节点
            internal int m_hashcode;  //key的hashcode

            //构造函数
            internal Node(TKey key, TValue value, int hashcode, Node next)
            {
                m_key = key;
                m_value = value;
                m_next = next;
                m_hashcode = hashcode;
            }
        }

Table#

而整个ConcurrentDictionary的数据存储在这样的一个Table中,其中m_buckets的Index负责映射key,m_locks是线程锁,下文中会有详细介绍,m_countPerLock存储每个lock锁负责的node数量.

Copy

 private class Tables
        {
            internal readonly Node[] m_buckets;   //上文中提到的buckets
            internal readonly object[] m_locks;   //线程锁
            internal volatile int[] m_countPerLock;  //索格锁所管理的数据数量
            internal readonly IEqualityComparer<TKey> m_comparer;  //当前key对应的type的比较器

            //构造函数
            internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer)
            {
                m_buckets = buckets;
                m_locks = locks;
                m_countPerLock = countPerlock;
                m_comparer = comparer;
            }
        }

ConcurrentDictionary会在构造函数中创建Table,这里我对原有的构造函数进行了简化,通过默认值进行创建,其中DefaultConcurrencyLevel默认并发级别为当前计算机处理器的线程数.

Copy
        //构造函数
        public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true,
            EqualityComparer<TKey>.Default)
        {
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="concurrencyLevel">并发等级,默认为CPU的线程数</param>
        /// <param name="capacity">默认容量,31,超过31后会自动扩容</param>
        /// <param name="growLockArray">时否动态扩充锁的数量</param>
        /// <param name="comparer">key的比较器</param>
        internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
        {
            if (concurrencyLevel < 1)
            {
                throw new Exception("concurrencyLevel 必须为正数");
            }

            if (capacity < 0)
            {
                throw new Exception("capacity 不能为负数.");
            }

            if (capacity < concurrencyLevel)
            {
                capacity = concurrencyLevel;
            }

            object[] locks = new object[concurrencyLevel];
            for (int i = 0; i < locks.Length; i++)
            {
                locks[i] = new object();
            }

            int[] countPerLock = new int[locks.Length];
            Node[] buckets = new Node[capacity];
            m_tables = new Tables(buckets, locks, countPerLock, comparer);

            m_growLockArray = growLockArray;
            m_budget = buckets.Length / locks.Length;
        }

方法#

ConcurrentDictionary中较为基础重点的方法分别位Add,Get,Remove,Grow Table方法,其他方法基本上是建立在这四个方法的基础上进行的扩充.

Add#

向Table中添加元素有以下亮点值得我们关注.

  • 开始操作前会声明一个tables变量来存储操作开始前的m_tables,在正式开始操作后(进入lock)的时候,会检查tables在准备工作阶段是否别的线程改变,如果改变了,则重新开始准备工作并从新开始.
  • 通过GetBucketAndLockNo方法获取bucket索引以及lock索引,其内部就是取余操作.
Copy
 private void GetBucketAndLockNo(
            int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
        {
            //0x7FFFFFFF 是long int的最大值 与它按位与数据小于等于这个最大值
            bucketNo = (hashcode & 0x7fffffff) % bucketCount;
            lockNo = bucketNo % lockCount;
        }
  • 对数据进行操作前会从m_locks取出第lockNo个对象最为lock,操作完成后释放该lock.多个lock一定程度上减少了阻塞的可能性.
  • 在对数据进行更新时,如果该Value的Type为允许原子性写入的,则直接更新该Value,否则创建一个新的node进行覆盖.
Copy
        /// <summary>
        /// Determines whether type TValue can be written atomically
        /// </summary>
        private static bool IsValueWriteAtomic()
        {
            Type valueType = typeof(TValue);

            //
            // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without
            // the risk of tearing.
            //
            // See http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf
            //
            if (valueType.IsClass)
            {
                return true;
            }
            switch (Type.GetTypeCode(valueType))
            {
                case TypeCode.Boolean:
                case TypeCode.Byte:
                case TypeCode.Char:
                case TypeCode.Int16:
                case TypeCode.Int32:
                case TypeCode.SByte:
                case TypeCode.Single:
                case TypeCode.UInt16:
                case TypeCode.UInt32:
                    return true;

                case TypeCode.Int64:
                case TypeCode.Double:
                case TypeCode.UInt64:
                    return IntPtr.Size == 8;

                default:
                    return false;
            }
        }

该方法依据CLI规范进行编写,简单来说,32位的计算机,对32字节以下的数据类型写入时可以一次写入的而不需要移动内存指针,64位计算机对64位以下的数据可一次性写入,不需要移动内存指针.保证了写入的安全.
详见12.6.6 http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf

Copy

 private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
        {
            while (true)
            {
                int bucketNo, lockNo;
                int hashcode;

                //https://www.cnblogs.com/blurhkh/p/10357576.html
                //需要了解一下值传递和引用传递
                Tables tables = m_tables;
                IEqualityComparer<TKey> comparer = tables.m_comparer;
                hashcode = comparer.GetHashCode(key);

                GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);

                bool resizeDesired = false;
                bool lockTaken = false;

                try
                {
                    if (acquireLock)
                        Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);

                    //如果表刚刚调整了大小,我们可能没有持有正确的锁,必须重试。
                    //当然这种情况很少见
                    if (tables != m_tables)
                        continue;

                    Node prev = null;
                    for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)
                    {
                        if (comparer.Equals(node.m_key, key))
                        {
                            //key在字典里找到了。如果允许更新,则更新该key的值。
                            //我们需要为更新创建一个node,以支持不能以原子方式写入的TValue类型,因为free-lock 读取可能同时发生。
                            if (updateIfExists)
                            {
                                if (s_isValueWriteAtomic)
                                {
                                    node.m_value = value;
                                }
                                else
                                {
                                    Node newNode = new Node(node.m_key, value, hashcode, node.m_next);
                                    if (prev == null)
                                    {
                                        tables.m_buckets[bucketNo] = newNode;
                                    }
                                    else
                                    {
                                        prev.m_next = newNode;
                                    }
                                }

                                resultingValue = value;
                            }
                            else
                            {
                                resultingValue = node.m_value;
                            }

                            return false;
                        }

                        prev = node;
                    }

                    //key没有在bucket中找到,则插入该数据
                    Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));
                    //当m_countPerLock超过Int Max时会抛出OverflowException
                    checked
                    {
                        tables.m_countPerLock[lockNo]++;
                    }

                    //
                    // 如果m_countPerLock[lockNo] > m_budget,则需要调整buckets的大小。
                    // GrowTable也可能会增加m_budget,但不会调整bucket table的大小。.
                    // 如果发现bucket table利用率很低,也会发生这种情况。
                    //
                    if (tables.m_countPerLock[lockNo] > m_budget)
                    {
                        resizeDesired = true;
                    }
                }
                finally
                {
                    if (lockTaken)
                        Monitor.Exit(tables.m_locks[lockNo]);
                }

                if (resizeDesired)
                {
                    GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);
                }

                resultingValue = value;
                return true;
            }
        }

Get#

从Table中获取元素的的流程与前文介绍ConcurrentDictionary工作原理时一致,但有以下亮点值得关注.

  • 读取bucket[i]在Volatile.Read()方法中进行,该方法会自动对读取出来的数据加锁,避免在读取的过程中,数据被其他线程remove了.
  • Volatile读取指定字段时,在读取的内存中插入一个内存屏障,阻止处理器重新排序内存操作,如果在代码中此方法之后出现读取或写入,则处理器无法在此方法之前移动它。
Copy

 public bool TryGetValue(TKey key, out TValue value)
        {
            if (key == null) throw new ArgumentNullException("key");

            // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize.
            Tables tables = m_tables;
            IEqualityComparer<TKey> comparer = tables.m_comparer;
            GetBucketAndLockNo(comparer.GetHashCode(key), out var bucketNo, out _, tables.m_buckets.Length, tables.m_locks.Length);

            // We can get away w/out a lock here.
            // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i].
            Node n = Volatile.Read(ref tables.m_buckets[bucketNo]);

            while (n != null)
            {
                if (comparer.Equals(n.m_key, key))
                {
                    value = n.m_value;
                    return true;
                }
                n = n.m_next;
            }

            value = default(TValue);
            return false;
        }

Remove#

Remove方法实现其实也并不复杂,类似我们链表操作中移除某个Node.移除节点的同时,还要对前后节点进行链接,相信一块小伙伴们肯定很好理解.

Copy
 private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
        {
            while (true)
            {
                Tables tables = m_tables;

                IEqualityComparer<TKey> comparer = tables.m_comparer;

                int bucketNo, lockNo;

                GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);

                lock (tables.m_locks[lockNo])
                {
                    if (tables != m_tables)
                        continue;

                    Node prev = null;
                    for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next)
                    {
                        if (comparer.Equals(curr.m_key, key))
                        {
                            if (matchValue)
                            {
                                bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value);
                                if (!valuesMatch)
                                {
                                    value = default(TValue);
                                    return false;
                                }
                            }
                            if (prev == null)
                                Volatile.Write(ref tables.m_buckets[bucketNo], curr.m_next);
                            else
                            {
                                prev.m_next = curr.m_next;
                            }

                            value = curr.m_value;
                            tables.m_countPerLock[lockNo]--;
                            return true;
                        }

                        prev = curr;
                    }
                }

                value = default(TValue);
                return false;
            }
        }

Grow table#

当table中任何一个m_countPerLock的数量超过了设定的阈值后,会触发此操作对Table进行扩容.

Copy
private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys,
            int rehashCount)
        {
            int locksAcquired = 0;
            try
            {
                //首先锁住第一个lock进行resize操作.
                AcquireLocks(0, 1, ref locksAcquired);

                if (regenerateHashKeys && rehashCount == m_keyRehashCount)
                {
                    tables = m_tables;
                }
                else
                {
                    if (tables != m_tables)
                        return;

                    long approxCount = 0;
                    for (int i = 0; i < tables.m_countPerLock.Length; i++)
                    {
                        approxCount += tables.m_countPerLock[i];
                    }

                    //如果bucket数组太空,则将预算加倍,而不是调整表的大小
                    if (approxCount < tables.m_buckets.Length / 4)
                    {
                        m_budget = 2 * m_budget;
                        if (m_budget < 0)
                        {
                            m_budget = int.MaxValue;
                        }

                        return;
                    }
                }

                int newLength = 0;
                bool maximizeTableSize = false;
                try
                {
                    checked
                    {
                        newLength = tables.m_buckets.Length * 2 + 1;
                        while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
                        {
                            newLength += 2;
                        }
                    }
                }
                catch (OverflowException)
                {
                    maximizeTableSize = true;
                }

                if (maximizeTableSize)
                {
                    newLength = int.MaxValue;

                    m_budget = int.MaxValue;
                }

                AcquireLocks(1, tables.m_locks.Length, ref locksAcquired);

                object[] newLocks = tables.m_locks;

                //Add more locks
                if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER)
                {
                    newLocks = new object[tables.m_locks.Length * 2];
                    Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length);

                    for (int i = tables.m_locks.Length; i < newLocks.Length; i++)
                    {
                        newLocks[i] = new object();
                    }
                }

                Node[] newBuckets = new Node[newLength];
                int[] newCountPerLock = new int[newLocks.Length];

                for (int i = 0; i < tables.m_buckets.Length; i++)
                {
                    Node current = tables.m_buckets[i];
                    while (current != null)
                    {
                        Node next = current.m_next;
                        int newBucketNo, newLockNo;
                        int nodeHashCode = current.m_hashcode;

                        if (regenerateHashKeys)
                        {
                            //Recompute the hash from the key
                            nodeHashCode = newComparer.GetHashCode(current.m_key);
                        }

                        GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length,
                            newLocks.Length);

                        newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode,
                            newBuckets[newBucketNo]);
                        checked
                        {
                            newCountPerLock[newLockNo]++;
                        }

                        current = next;
                    }
                }

                if (regenerateHashKeys)
                {
                    unchecked
                    {
                        m_keyRehashCount++;
                    }
                }

                m_budget = Math.Max(1, newBuckets.Length / newLocks.Length);

                m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer);
            }
            finally
            {
                ReleaseLocks(0, locksAcquired);
            }
        }

学习感悟

  • lock[]:在以往的线程安全上,我们对数据的保护往往是对数据的修改写入等地方加上lock,这个lock经常上整个上下文中唯一的,这样的设计下就可能会出现多个线程,写入的根本不是一块数据,却要等待前一个线程写入完成下一个线程才能继续操作.在ConcurrentDictionary中,通过哈希算法,从数组lock[]中找出key的准确lock,如果不同的key,使用的不是同一个lock,那么这多个线程的写入时互不影响的.
  • 写入要考虑线程安全,读取呢?不可否认,在大部分场景下,读取不必去考虑线程安全,但是在我们这样的链式读取中,需要自上而下地查找,是不是有种可能在查找个过程中,链路被修改了呢?所以ConcurrentDictionary中使用Volatile.Read来读取出数据,该方法从指定字段读取对象引用,在需要它的系统上,插入一个内存屏障,阻止处理器重新排序内存操作,如果在代码中此方法之后出现读取或写入,则处理器无法在此方法之前移动它。
  • 在ConcurrentDictionary的更新方法中,对数据进行更新时,会判断该数据是否可以原子写入,如果时可以原子写入的,那么就直接更新数据,如果不是,那么会创建一个新的node覆盖原有node,起初看到这里时候,我百思不得其解,不知道这么操作的目的,后面在jeo duffy的博客中Thread-safety, torn reads, and the like中找到了答案,这样操作时为了防止torn reads(撕裂读取),什么叫撕裂读取呢?通俗地说,就是有的数据类型写入时,要分多次写入,写一次,移动一次指针,那么就有可能写了一半,这个结果被另外一个线程读取走了.比如说我把 刘振宇三个字改成周杰伦的过程中,我先把刘改成周了,正在我准备去把振改成杰的时候,另外一个线程过来读取结果了,读到的数据是周振宇,这显然是不对的.所以对这种,更安全的做法是先把周杰伦三个字写好在一张纸条上,然后直接替换掉刘振宇.更多信息在CLI规范12.6.6有详细介绍.
  • checkedunckecked关键字.非常量的运算(non-constant)运算在编译阶段和运行时下不会做溢出检查,如下这样的代码时不会抛出异常的,算错了也不会报错。
Copy
int ten = 10;
int i2 = 2147483647 + ten;

但是我们知道,int的最大值是2147483647,如果我们将上面这样的代码嵌套在checked就会做溢出检查了.

Copy
checked
{
int ten = 10;
int i2 = 2147483647 + ten;
}

相反,对于常量,编译时是会做溢出检查的,下面这样的代码在编译时就会报错的,如果我们使用unckeck标签进行标记,则在编译阶段不会做移除检查.

Copy
int a = int.MaxValue * 2;

那么问题来了,我们当然知道checked很有用,那么uncheck呢?如果我们只是需要那么一个数而已,至于溢出不溢出的关系不大,比如说生成一个对象的HashCode,比如说根据一个算法计算出一个相对随机数,这都是不需要准确结果的,ConcurrentDictionary中对于m_keyRehashCount++这个运算就使用了unchecked,就是因为m_keyRehashCount是用来生成哈希值的,我们并不关心它有没有溢出.

  • volatile关键字,表示一个字段可能是由在同一时间执行多个线程进行修改。出于性能原因,编译器\运行时系统甚至硬件可以重新排列对存储器位置的读取和写入。声明的字段volatile不受这些优化的约束。添加volatile修饰符可确保所有线程都能按照执行顺序由任何其他线程执行的易失性写入,易失性写入是一件疯狂的事情的事情:普通玩家慎用.

本博客所涉及的代码都保存在github中,Take it easy to enjoy it!
https://github.com/liuzhenyulive/DictionaryMini/blob/master/DictionaryMini/DictionaryMini/ConcurrentDictionaryMini.cs

VS2017git 提交提示错误 Git failed with a fatal error. - 不见不散com - 博客园

mikel阅读(657)

来源: VS2017git 提交提示错误 Git failed with a fatal error. – 不见不散com – 博客园

具体错误信息:Git failed with a fatal error.
error: open(“ConsoleApp1/.vs/ConsoleApp1/v15/Server/SQLite3/db.lock”): Permission denied

fatal: Unable to process path ConsoleApp1/.vs/ConsoleApp1/v15/Server/SQLite3/db.lock

因为git上传要忽略vs文件, Git因致命错误而失败。权限被拒绝 无法处理的路径。

解决方法下:

1. 打开团队资源管理器,点击设置

Commit1

2. 点击 储存库设置

Commit2

3. 添加忽略文件

Commit3

完成.

Commit4

Newbe.Claptrap - 一套以 “事件溯源” 和 “Actor 模式” 作为基本理论的服务端开发框架 | newbe

mikel阅读(570)

来源: Newbe.Claptrap – 一套以 “事件溯源” 和 “Actor 模式” 作为基本理论的服务端开发框架 | newbe

Newbe.Claptrap – 一套以 “事件溯源” 和 “Actor 模式” 作为基本理论的服务端开发框架

本文是关于 Newbe.Claptrap 项目主体内容的介绍,读者可以通过这篇文章,大体了解项目内容。

Newbe.Claptrap

轮子源于需求

随着互联网应用的蓬勃发展,相关的技术理论和实现手段也在被不断创造出来。诸如 “云原生架构”、“微服务架构”、“DevOps” 等一系列关键词越来越多的出现在工程师的视野之中。总结来看,这些新理论和新技术的出现,都是为了解决互联网应用中出现的一些技术痛点:

更高的容量扩展性要求。在商业成功的基础前提下,互联网应用的用户数量、系统压力和硬件设备数量等方面都会随着时间的推移出现明显的增长。这就对应用本省的容量可扩展性提出了要求。这种容量可扩展性通常被描述为 “应用需要支持水平扩展”。

更高的系统稳定性要求。应用程序能够不间断运行,确保商业活动的持续进展,这是任何与这个应用系统相关的人员都希望见到的。但是要做到这点,通常来说是十分困难的。而现今的互联网应用在面对诸多同类竞争者的情况下,如果在这方面做得不够健全,那么很可能会失去一部分用户的青睐。

更高的功能扩展性要求。“拥抱变化”,当人们提到 “敏捷项目管理” 相关的内容时,都会涉及到的一个词语。这个词语充分体现了当今的互联网应用若要成功,在功能性上做到出彩做到成功是多么的重要。也从一个侧面体现了当前互联网环境下产品需求的多变。而作为系统工程师,在应用建立之初就应该考虑这点。

更高的开发易用度要求。这里所属的开发易用度是指,在应用系统自身在进行开发时的难易程度。要做到越易于开发,在应用自身的代码结构,可测试性,可部署性上都需要作出相应的努力。

更高的性能要求。这里提到的性能要求,是特指在系统容量增加时的性能要求。避免系统的单点性能问题,让应用系统具备可水平扩展的特性。通常来说,在性能出现问题时,若可以通过增加物理设备来解决问题,通常来说是最为简单的办法。而在不同的系统容量之下,系统性能的优化方案通常是不同的。因此结合应用场景进行技术方案的选型一直都是系统工程师所需要考虑的问题。

本项目,就是基于以上这些系统功能特性要求所总结出来的一套开发框架。这其中包含了相关的理论基石、开发类库和技术规约。

世界上本也不存在 “银弹”。一套框架解决不了所有问题。 —— 不愿意透露姓名的月落

从需求出发

在讲解分布式系统时,常常会用到 “账号转账” 这个简单的业务场景来配合描述。这里阐述一下这个业务场景。

假设我们需要建设一个具备账号体系的业务系统。每个账号都有余额。现在需要执行一次转账操作,将账号 A 的余额中的 300 划转给账号 B。另外,基于上节的基本要求,我们在实现这个场景时,需要考虑以下这些内容:

  • 需要应对系统容量的激增。应用初期可能只有 1000 个初始用户。由于应用推广效果良好以及机器人账号的涌入,用户数量实现了在一个月内实现了三个数量级的攀升,也就是增长到了百万级别。
  • 需要考虑系统的稳定性和可恢复性。尽可能减少系统整体的平均故障时间,即使出现系统故障也应该是尽可能易于恢复的。也就是,要避免出现单点故障。
  • 需要考虑业务的可扩展性。后续可能需要增加一些业务逻辑:按照账户等级限制日转账额、转账成功后进行短信通知、转账支持一定额度的免密转账、特定的账号实现 “T+1” 到账。
  • 需要考虑代码的可测试性。系统的业务代码和系统代码能够良好的分离,能够通过单元测试的手段初步验证业务代码和系统代码的正确性和性能。

轮子的理论

本节将介绍一些和本框架紧密结合的理论内容,便于读者在后续的过程中理解本框架的工作过程。

Actor 模式

Actor 模式是一种并发编程模型。通过这种编程模型的应用可以很好的解决一些系统的并发问题。这里所提到的并发问题是指计算机对同一数据进行逻辑处理时,可能由于存在多个同时发起的请求可能导致数据出现不正确的问题。这个问题在进行多线程编程时一定会遇到的问题。举个简单的例子,假如在不加同步锁的情况下,使用 100 个线程并发对内存中的一个 int 变量执行 ++ 操作。那么最终这个变量的结果往往小于 100。此处 Actor 模式是如何避免此问题的。

首先,为了便于理解,读者在此处可以将 Actor 认为是一个对象。在面向对象的语言(Java、C# 等)当中,可以认为 Actor 就是通过 new 关键词创建出来的对象。不过这个对象有一些特别的特性:

拥有属于自身的状态。对象都可以拥有自身的属性,这是面向对象语言基本都具备的功能。在 Actor 模式中,这些属性都被统称为 Actor的状态(State)。Actor 的状态由 Actor 自身进行维护。

这就强调了两点:

第一、Actor 的状态只能由自身进行改变,若要从外部改变 Actor 的状态,只能通过调用 Actor 才能改变。

更新Actor状态

第二、Actor 的状态只在 Actor 内部进行维护,不与当前 Actor 之外的任何对象共享。这里说的不共享也是强调其不能通过外部某个属性的改变而导致 Actor 内部状态的变化。这点主要是为了区别于一些具备 “对象引用” 语言特性的编程语言而言的。例如:在 C# 的 class 的 public 属性,假如是引用类型,那么在外部获得这个 class 之后是可以改变 class 中的属性的。但是这在 Actor 模式当中是不被允许的。

共享Actor状态

不过从 Actor 内部读取数据到外部,这仍然是允许的。

读取Actor状态

单线程。Actor 通常同一时间只能接受一个调用。这里所述的线程不完全是指计算机中的线程,是为了凸显 “Actor 同一时间只能处理一个请求的特性” 而使用的词语。假如当前 Actor 正在接受一个调用,那么剩余的调用都会阻塞,直到调用结束,下一个请求才允许被进入。这其实类似于一个同步锁的机制。通过这种机制就避免了对 Actor 内部状态进行修改时,存在并发问题的可能。具体一点说明:如果使用 100 个线程对一个 Actor 进行并发调用,让 Actor 对状态中的一个 int 变量进行 ++ 操作。最终这个状态的数值一定是 100。

并发调用Actor

不过单线程也不是绝对的,在不存在并发问题的请求情况下,允许并发处理。例如读取 Actor 中的状态,这通常不会有并发问题,那么此时就允许进行并发操作。

并发读取Actor

读到 Actor 单线程特性时,通常读者会考虑到这是否会导致 Actor 本身处理过慢而产生性能问题呢?关于这点,希望读者继续持有这个问题往后阅读,寻找答案。

事件溯源模式

事件溯源模式是一种软件设计思路。这种设计思路通常与传统的采用增删查改(CRUD)为主的系统设计思路相区别。CRUD 应用通常存在一些局限性:

  1. 通常来说 CRUD 应用会采用直接操作数据存储的做法。这样的实现方式可能会由于对数据库优化不足而导致性能瓶颈,并且这种做法会较难实现应用伸缩。
  2. 在特定的领域通常存在一些数据需要注意对并发问题进行处理,以防止数据更新的错误。这通常需要引入 “锁”、“事务” 等相关的技术来避免此类问题。但这样又有可能引发性能上的损失。
  3. 除非增加额外的审计手段,否则通常来说数据的变更历史是不可追踪的。因为数据存储中通常保存的是数据最终的状态。

与 CRUD 做法对比,事件溯源则从设计上避免了上述描述的局限性。接下来围绕上文中提到的 “转账” 业务场景简述事件溯源的基础工作方式。

采用 CRUD 的方法实现 “转账”。

采用CRUD的方法实现“转账”

采用事件溯源的方式实现 “转账”。

采用事件溯源的方法实现“转账”

如上图所示,通过事件溯源模式将转账业务涉及的余额变动采用事件的方式进行存储。同样也实现了业务本身,而这样却带来了一些好处:

  • 通过事件,可以还原出账号任何阶段的余额,这就一定程度实现了对账号余额的跟踪。
  • 由于两个账号的事件是独立处理的。因此,两个账号的处理速度不会相互影响。例如,账号 B 的转入可能由于需要额外的处理,稍有延迟,但账号 A 仍然可以的转出。
  • 可以通过订阅事件来做一些业务的异步处理。例如:更新数据库中的统计数据,发送短信通知等其他的一些异步操作。

当然引入事件溯源模式之后也就引入了事件溯源相关的一些技术问题。例如:事件所消耗的存储可能较为巨大;不得不应用最终一致性;事件具备不可变性,重构时可能较为困难等。相关的这些问题在一些文章中会有较为细致的说明。读者可以阅读后续的延伸阅读内容,进而进行了解与评估。

业务复杂度是不会因为系统设计变化而减少的,它只是从一个地方转移到了另外的地方。—— 总说自己菜的月落

让轮子转起来

基于读者已经大体理解了上节理论的基础上,本节将结合上述描述的 “转账” 业务场景,介绍本框架的工作原理。首先读者需要了解一下本框架的两个名词。

Claptrap

Claptrap

Claptrap 是本框架定义的一种特殊 Actor。除了上文中提到 Actor 两种特性之外,Claptrap 还被定义为具有以下特性:

状态由事件进行控制。Actor 的状态在 Actor 内部进行维护。Claptrap 同样也是如此,不过改变 Claptrap 的状态除了在 Actor 之外,还限定其只能通过事件进行改变。这就将事件溯源模式与 Actor 模式进行了结合。通过事件溯源模式保证了 Actor 状态的正确性和可追溯性。这些改变 Claptrap 状态的事件是由 Claptrap 自身产生的。事件产生的原因可以是外部的调用也可以是 Claptrap 内部的类触发器机制产生的。

Minion

Minion

Minion 是本框架定义的一种特殊 Actor。是在 Claptrap 基础上做出的调整。其具备以下特性:

从对应的 Claptrap 读取事件。与 Claptrap 相同,Minion 的状态也由事件进行控制。不同的是,Minion 就像其字面意思一样,总是从对应的 Claptrap 处获取事件,从而改变自身的状态。因此,其可以异步的处理 Claptrap 产生事件之后的后续操作。

业务实现

接下来有了前面的基础介绍,现在介绍一下本框架如何实现上文中的 “转账” 场景。首先可以通过下图来了解一下主要的流程:

Claptrap & Minion

如上图所示,整个流程便是本框架实现业务场景的大体过程。另外,还有一些需要指出的是:

  • 图中 Client 与 Claptrap 的调用等待只有第一阶段的时候存在,也就是说,这使得 Client 可以更快的得到响应,不必等待整个流程结束。
  • Claptrap A 在处理完自身请求,并将事件发送给 Minion A 之后就可以重新接受请求,这样提高了 Claptrap A 的吞吐量。
  • Minion 不仅仅只能处理 Claptrap 之间的调用代理。在 Minion 当中还可以根据业务需求进行:发送短信,更新数据库统计数据等其他操作。
  • Minion 也可以具备自己的状态,将部分数据维持在自身的状态中以便外部可以从自身进行查询,而不需要从对应的 Claptrap 中进行查询。例如:统计该账号最近 24 小时的转账变动,以便快速查询。

业务容量

前文提到本框架需要建设的是一个可以水平扩展的系统架构,只有如此才能应对业务容量的持续增长。在这点上,本框架现阶段采用的是微软开源的 Orleans 和 Service Fabric 搭配,实现应用程序和物理设备的放缩。当然,涉及数据存储部分时势必也涉及到数据库集群等一系列问题。这些属于技术应用的细节,而非框架理论设计的内容。因此,此处只表明本框架可以基于以上的开源架构进行容量放缩。应用过程中的实际问题,读者可以在后续的项目内容中寻求解答。

轮厂现状

当前项目正处于设立初期,主体的体系结构设计与编码仍在进行中,读者可以通过项目地址了解项目的最新进展:

Github,主要项目源码管理:https://github.com/newbe36524/Newbe.Claptrap

Gitee,码云仓库地址:https://gitee.com/yks/Newbe.Claptrap

点击链接 QQ 交流【Newbe.Claptrap】:https://jq.qq.com/?_wv=1027&k=5uJGXf5

延伸阅读

以下这些内容都对本框架产生了深远的影响。读者可以通过阅读以下这些内容,增加对本框架的理解。

C# 中的sealed修饰符学习 - 新西兰程序员 - 博客园

mikel阅读(590)

来源: C# 中的sealed修饰符学习 – 新西兰程序员 – 博客园

转载原地址 http://developer.51cto.com/art/200908/147327.htm

C#语言还是比较常见的东西,这里我们主要介绍C# sealed修饰符,包括介绍两个修饰符在含义上互相排斥用于方法和属性等方面。

C# sealed修饰符是干什么的?

C# sealed修饰符表示密封用于类时,表示该类不能再被继承,不能和 abstract 同时使用,因为这两个修饰符在含义上互相排斥用于方法和属性时,表示该方法或属性不能再被重写,必须和 override 关键字一起使用,因为使用 C# sealed修饰符的方法或属性肯定是基类中相应的虚成员通常用于实现第三方类库时不想被客户端继承,或用于没有必要再继承的类以防止滥用继承造成层次结构体系混乱恰当的利用 C# sealed修饰符也可以提高一定的运行效率,因为不用考虑继承类会重写该成员。

示例:

复制代码
using System;  
2.using System.Collections.Generic;  
3.using System.Text;  
4.   
5.namespace Example06  
6.{  
7.class Program  
8.{  
9.class A  
10.{  
11.public virtual void F()  
12.{  
13.Console.WriteLine("A.F");  
14.}  
15.public virtual void G()  
16.{  
17.Console.WriteLine("A.G");  
18.}  
19.}  
20.class B : A  
21.{  
22.public sealed override void F()  
23.{  
24.Console.WriteLine("B.F");  
25.}  
26.public override void G()  
27.{  
28.Console.WriteLine("B.G");  
29.}  
30.}  
31.class C : B  
32.{  
33.public override void G()  
34.{  
35.Console.WriteLine("C.G");  
36.}  
37.}  
38.static void Main(string[] args)  
39.{  
40.new A().F();  
41.new A().G();  
42.new B().F();  
43.new B().G();  
44.new C().F();  
45.new C().G();  
46.   
47.Console.ReadLine();  
48.}  
49.}  
50.}
复制代码

转载原地址 http://www.cnblogs.com/iamdaiyuan/archive/2007/02/06/642442.html
sealed的中文意思是密封,故名思义,就是由它修饰的类或方法将不能被继承或是重写。

        sealed关键字的作用:

在类声明中使用sealed可防止其它类继承此类;在方法声明中使用sealed修饰符可防止扩充类重写此方法。

sealed修饰符主要用于防止非有意的派生,但是它还能促使某些运行时优化。具体说来,由于密封类永远不会有任何派生类,所以对密封类的实例的虚拟函数成员的调用可以转换为非虚拟调用来处理。

         密封类:

密封类在声明中使用sealed 修饰符,这样就可以防止该类被其它类继承。如果试图将一个密封类作为其它类的基类,C#将提示出错。理所当然,密封类不能同时又是抽象类,因为抽象总是希望被继承的。
在哪些场合下使用密封类呢?实际上,密封类中不可能有派生类。如果密封类实例中存在虚成员函数,该成员函数可以转化为非虚的,函数修饰符virtual 不再生效。
让我们看下面的例子:

复制代码
1 abstract class AbstractClass
2 {
3       public abstract void Method( ) ;
4 }

5 sealed class SealedClass: AbstractClass
6 {
7        public override void Method( )
8         { //... }
9 }
复制代码

如果我们尝试写下面的代码

class OtherClass: SealedClass
{ 
}

C#会指出这个错误,告诉你SealedClass 是一个密封类,不能试图从SealedClass  中派生任何类。

            密封方法:

C#还提出了密封方法(sealed method) 的概念,以防止在方法所在类的派生类中对该方法的重载。对方法可以使用sealed 修饰符,这时我们称该方法是一个密封方法。
不是类的每个成员方法都可以作为密封方法密封方法,要作为密封方法必须对基类的虚方法进行重载,提供具体的实现方法。所以,在方法的声明中,sealed 修饰符总是和override 修饰符同时使用。请看下面的例子代码:

复制代码
 1 using System ;

 2 class A
 3 {
 4     public virtual void F( )
 5     { 
            Console.WriteLine("A.F") ; 
        }
 6     
        public virtual void G( )
 7     {     
            Console.WriteLine("A.G") ; 
        }
 8 }

 9 class B: A
10 {
11     sealed override public void F( )
12     { 
            Console.WriteLine("B.F") ; 
         }
13 
        override public void G( )
14     {
             Console.WriteLine("B.G") ; }
15     }
16 
    class C: B
17 {
18     override public void G( )
19     { 
            Console.WriteLine("C.G") ; 
        }
20 }
复制代码

类B 对基类A 中的两个虚方法均进行了重载,其中F 方法使用了sealed 修饰符,成为一个密封方法。G 方法不是密封方法,所以在B 的派生类C 中,可以重载方法G,但不能重载方法F。

C# 表达式树(Expression) - muzizongheng - 博客园

mikel阅读(1300)

来源: C# 表达式树(Expression) – muzizongheng – 博客园

C#中有Expression,即表达式。

通过Expression可以动态构造代码,并编译执行。
比如:
1.
创建参数表达式 :ParameterExpression numParam = Expression.Parameter(typeof(int), “num”);、
创建常量表达式:ConstantExpression five = Expression.Constant(5, typeof(int));
创建比较表达式:BinaryExpression numLessThanFive = Expression.LessThan(numParam, five);
创建标号:LabelTarget label = Expression.Label(typeof(int));
创建循环或者分支表达式:
  Expression.Loop(
    // Adding a conditional block into the loop.
           Expression.IfThenElse(
    // Condition: value > 1
               Expression.GreaterThan(value, Expression.Constant(1)),
    // If true: result *= value --
               Expression.MultiplyAssign(result,
                   Expression.PostDecrementAssign(value)),
    // If false, exit the loop and go to the label.
               Expression.Break(label, result)
           ),
    // Label to jump to.
       label
    )
2.
创建表达式树:Expression<Func<int, bool>> exprTree = num => num < 5;
3.
分解表达式:ParameterExpression param = (ParameterExpression)exprTree.Parameters[0];
4.
编译表达式:Func<int, bool> result = expr.Compile();


https://muzizongheng.blog.csdn.net/

[C#] C# 知识回顾 - 表达式树 Expression Trees - 反骨仔 - 博客园

mikel阅读(687)

来源: [C#] C# 知识回顾 – 表达式树 Expression Trees – 反骨仔 – 博客园

目录

 

简介

表达式树以树形数据结构表示代码,其中每一个节点都是一种表达式,比如方法调用和 x < y 这样的二元运算等。

  你可以对表达式树中的代码进行编辑和运算。这样能够动态修改可执行代码、在不同数据库中执行 LINQ 查询以及创建动态查询。

  表达式树还能用于动态语言运行时 (DLR) 以提供动态语言和 .NET Framework 之间的互操作性。

 

一、Lambda 表达式创建表达式树

若 lambda 表达式被分配给 Expression<TDelegate> 类型的变量,则编译器可以发射代码以创建表示该 lambda 表达式的表达式树。

C# 编译器只能从表达式 lambda (或单行 lambda)生成表达式树。

下列代码示例使用关键字 Expression创建表示 lambda 表达式:

1             Expression<Action<int>> actionExpression = n => Console.WriteLine(n);
2             Expression<Func<int, bool>> funcExpression1 = (n) => n < 0;
3             Expression<Func<int, int, bool>> funcExpression2 = (n, m) => n - m == 0;

 

二、API 创建表达式树

通过 API 创建表达式树需要使用 Expression 类

下列代码示例展示如何通过 API 创建表示 lambda 表达式:num => num == 0

复制代码
1             //通过 Expression 类创建表达式树
2             //  lambda:num => num == 0
3             ParameterExpression pExpression = Expression.Parameter(typeof(int));    //参数:num
4             ConstantExpression cExpression = Expression.Constant(0);    //常量:0
5             BinaryExpression bExpression = Expression.MakeBinary(ExpressionType.Equal, pExpression, cExpression);   //表达式:num == 0
6             Expression<Func<int, bool>> lambda = Expression.Lambda<Func<int, bool>>(bExpression, pExpression);  //lambda 表达式:num => num == 0
复制代码

代码使用 Expression 类的静态方法进行创建。

 

三、解析表达式树

下列代码示例展示如何分解表示 lambda 表达式 num => num == 0 的表达式树。

复制代码
1             Expression<Func<int, bool>> funcExpression = num => num == 0;
2 
3             //开始解析
4             ParameterExpression pExpression = funcExpression.Parameters[0]; //lambda 表达式参数
5             BinaryExpression body = (BinaryExpression)funcExpression.Body;  //lambda 表达式主体:num == 0
6 
7             Console.WriteLine($"解析:{pExpression.Name} => {body.Left} {body.NodeType} {body.Right}");
复制代码

 

 

 

四、表达式树永久性

  表达式树应具有永久性(类似字符串)。这意味着如果你想修改某个表达式树,则必须复制该表达式树然后替换其中的节点来创建一个新的表达式树。  你可以使用表达式树访问者遍历现有表达式树。第七节介绍了如何修改表达式树。

五、编译表达式树

Expression<TDelegate> 类型提供了 Compile 方法以将表达式树表示的代码编译成可执行委托。

复制代码
1             //创建表达式树
2             Expression<Func<string, int>> funcExpression = msg => msg.Length;
3             //表达式树编译成委托
4             var lambda = funcExpression.Compile();
5             //调用委托
6             Console.WriteLine(lambda("Hello, World!"));
7 
8             //语法简化
9             Console.WriteLine(funcExpression.Compile()("Hello, World!"));
复制代码

 

 

 

 

六、执行表达式树

执行表达式树可能会返回一个值,也可能仅执行一个操作(例如调用方法)。

  只能执行表示 lambda 表达式的表达式树。表示 lambda 表达式的表达式树属于 LambdaExpression 或 Expression<TDelegate> 类型。若要执行这些表达式树,需要调用 Compile 方法来创建一个可执行委托,然后调用该委托。
复制代码
 1             const int n = 1;
 2             const int m = 2;
 3 
 4             //待执行的表达式树
 5             BinaryExpression bExpression = Expression.Add(Expression.Constant(n), Expression.Constant(m));
 6             //创建 lambda 表达式
 7             Expression<Func<int>> funcExpression = Expression.Lambda<Func<int>>(bExpression);
 8             //编译 lambda 表达式
 9             Func<int> func = funcExpression.Compile();
10 
11             //执行 lambda 表达式
12             Console.WriteLine($"{n} + {m} = {func()}");
复制代码

 

七、修改表达式树

该类继承 ExpressionVisitor 类,通过 Visit 方法间接调用 VisitBinary 方法将 != 替换成 ==。基类方法构造类似于传入的表达式树的节点,但这些节点将其子目录树替换为访问器递归生成的表达式树。

复制代码
 1     internal class Program
 2     {
 3         private static void Main(string[] args)
 4         {
 5             Expression<Func<int, bool>> funcExpression = num => num == 0;
 6             Console.WriteLine($"Source: {funcExpression}");
 7 
 8             var visitor = new NotEqualExpressionVisitor();
 9             var expression = visitor.Visit(funcExpression);
10 
11             Console.WriteLine($"Modify: {expression}");
12 
13             Console.Read();
14         }
15 
16         /// <summary>
17         /// 不等表达式树访问器
18         /// </summary>
19         public class NotEqualExpressionVisitor : ExpressionVisitor
20         {
21             public Expression Visit(BinaryExpression node)
22             {
23                 return VisitBinary(node);
24             }
25 
26             protected override Expression VisitBinary(BinaryExpression node)
27             {
28                 return node.NodeType == ExpressionType.Equal
29                     ? Expression.MakeBinary(ExpressionType.NotEqual, node.Left, node.Right) //重新弄个表达式:用 != 代替 ==
30                     : base.VisitBinary(node);
31             }
32         }
33     }
复制代码

 

 

 

八、调试

8.1 参数表达式

1             ParameterExpression pExpression1 = Expression.Parameter(typeof(string));
2             ParameterExpression pExpression2 = Expression.Parameter(typeof(string), "msg");

图8-1

图8-2

DebugView 可知,如果参数没有名称,则会为其分配一个自动生成的名称。

 

1             const int num1 = 250;
2             const float num2 = 250;
3 
4             ConstantExpression cExpression1 = Expression.Constant(num1);
5             ConstantExpression cExpression2 = Expression.Constant(num2);

图8-3

图8-4

DebugView 可知,float 比 int 多了个后缀 F。

 

1             Expression lambda1 = Expression.Lambda<Func<int>>(Expression.Constant(250));
2             Expression lambda2 = Expression.Lambda<Func<int>>(Expression.Constant(250), "CustomName", null);

图8-5

图8-6

观察 DebugView ,如果 lambda 表达式没有名称,则会为其分配一个自动生成的名称。

 

 


【原文链接】http://www.cnblogs.com/liqingwen/p/5868688.html

ORM开发之解析lambda实现group查询(附测试例子) - hubro - 博客园

mikel阅读(885)

来源: ORM开发之解析lambda实现group查询(附测试例子) – hubro – 博客园

目的:以编程方式实现group查询,在开发ORM时,需要达到这样的效果

先看一个简单的group语句

select BarCode,ProductName,COUNT(BarCode) as total from ProductData  group by BarCode,ProductName
order by COUNT(BarCode) desc

结果

1
2
3
4
BarCode                        ProductName                    total
------------------------------ ------------------------------ -----------
1212122                        product2                       4
21312313                       product3                       2

group语法分解为

  • 查询哪些字段 BarCode,ProductName,COUNT(BarCode) as total
  • 按哪些字段进行分组 group by BarCode,ProductName
  • 按什么排序  order by COUNT(BarCode) desc

linq to SQL 表示为

1
2
3
4
from in  ProductData
group p.BarCode by p.ProductName into g
select  new
{
1
g.BarCode,<br>g.ProductName,<br>total=g.Count()
1
}

linq to SQL很容易表达,用lambda该如何表达呢

跟普通SQL查询不同,查询字段和排序可用聚合函数count,sum,抛开这,用lambda,同样的查询可表示为

这里用匿名对象来作选择器

设定 query=new LambdaQuery<Product>() 以下方法可能不实际存在,只作演示

1
2
3
4
5
query.Select(b=>new{b.BarCode,b.ProductName})
.GroupBy(b=>new{b.BarCode,b.ProductName})
.OrderBy(b=>b.BarCode,true);

query.Select(b=>new{b.BarCode,b.ProductName}) 能表示 select BarCode,ProductName

但匿名对象可没Count(b.ProductName)这样的语法,所以没法生成select count(BarCode)这样的语法

没有直接的方法,但是有间接的方法,扩展方法 扩展方法真是个好东西,解决了很多问题
定义一个扩展方法,名称定义为大写,为避免冲突
1
2
3
4
public static int COUNT(this object origin)
        {
            return 0;
        }

所有object对象将会有COUNT()方法

将上面语法进行改进为

1
query.Select(b=>new{b.BarCode,b.ProductName,total=b.BarCode.COUNT()})

以这样形式进行表示,这样在语法上是编译通过的,并且lambda支持这样的解析

完整表示改为

1
2
3
4
5
query.Select(b=>new{b.BarCode,b.ProductName,total=b.BarCode.COUNT()})
.GroupBy(b=>new{b.BarCode,b.ProductName})
.OrderBy(b=>b.BarCode.COUNT(),true);

这样,完整的group表示语法就完成了,致少在逻辑上,是能实现SQL的group语法了,剩下就需要进行解析了

定义参数

1
2
3
public List<string> queryFields = new List<string>();
        public List<string> queryOrderBy = new List<string>();
        public List<string> groupFields = new List<string>();

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//此方法解析方法调用表达式的属性名和方法名
string GetPropertyMethod(Expression item,out string methodName)
        { //转换为方法表达式
            var method = item as MethodCallExpression;
            MemberExpression memberExpression;            //获取访问属性表达式
            if (method.Arguments[0] is UnaryExpression)
            {
                memberExpression = (method.Arguments[0] as UnaryExpression).Operand as MemberExpression;
            }
            else
            {
                memberExpression = method.Arguments[0] as MemberExpression;
            }
            methodName = method.Method.Name;//调用的方法名
            return memberExpression.Member.Name;//返回访问的属性名
        }

解析Select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public LambdaQuery<T> Select<TResult>(Expression<Func<T, TResult>> resultSelector)
        {
            string queryFullName = "";
            var newExpression = resultSelector.Body as NewExpression;//转换为匿名对象表达式
            int i = 0;
            foreach (var item in newExpression.Arguments)//遍历所有参数
            {
                var memberName = newExpression.Members[i].Name;//获取构造的属性名
                if (item is MethodCallExpression)//如果是方法
                {
                    string methodName;
                    string propertyName = GetPropertyMethod(item, out methodName);//获取方法名和属性名
                    queryFullName = string.Format("{0}({1}) as {2}", methodName, propertyName, memberName);
                }
                else//直接属性
                {
                    var memberExpression = item as MemberExpression;//转换为属性访问表达式
                    queryFullName = memberExpression.Member.Name;//返回属性名
                }
                queryFields.Add(queryFullName);
                i += 1;
            }
            
            return this;
        }

 

解析OrderBy,过程和上面差不多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public LambdaQuery<T> OrderBy<TKey>(Expression<Func<T, TKey>> expression, bool desc = true)
        {
            string orderBy="";
            string name;
            if (expression.Body is MethodCallExpression)//如果是方法
            {
                string methodName;
                string propertyName = GetPropertyMethod(expression.Body, out methodName);
                name = string.Format("{1}({0})", propertyName, methodName);
                orderBy = string.Format(" {0} {1}", name, desc ? "desc" "asc");
            }
            else
            {
                MemberExpression mExp = (MemberExpression)expression.Body;
                if (!string.IsNullOrEmpty(orderBy))
                {
                    orderBy += ",";
                }
                name = mExp.Member.Name;
                orderBy = string.Format(" {0} {1}", name, desc ? "desc" "asc");
            }
            queryOrderBy.Add(orderBy);
            return this;
        }

解析GroupBy

1
2
3
4
5
6
7
8
9
public LambdaQuery<T> GroupBy<TResult>(Expression<Func<T, TResult>> resultSelector)
        {
            foreach (var item in (resultSelector.Body as NewExpression).Arguments)
            {
                var memberExpression = item as MemberExpression;//转换为属性访问表达式
                groupFields.Add(memberExpression.Member.Name);
            }
            return this;
        }

输出

1
2
3
4
string fileds=string.Join(",",query.queryFields);
            string groupFields = string.Join(",", query.groupFields);
            string queryOrderBy = string.Join(",", query.queryOrderBy);
            Console.Write(string.Format("select {0} from Product group by {1} order by {2}", fileds, groupFields, queryOrderBy));

结果截图

上面只实现了匿名对象简单的解析,ORM查询复杂的的是二元运算解析,如:

query.Where(b=>b.Id>10&&b.Name=”hubro”);

这样就需要解析表达式树了,情况比较复杂,回头整理一下

实现表达式树解析后就能实现having语法了,linq实现的group用lambda也能完整实现了

 

此示例只是实现lambda到SQL语句之间的转换,实际应用需要考虑参数化,结果集映射,路还很长,有兴趣的欢迎关注CRL框架

TO:管理员 现在达到放在首页的要求了吧???

例子下载地址:http://files.cnblogs.com/files/hubro/LambdaQueryTest.rar

缓存服务新思路,创建动态查询的缓存 - hubro - 博客园

mikel阅读(695)

来源: 缓存服务新思路,创建动态查询的缓存 – hubro – 博客园

本方案实现了客户端对服务端缓存数据任意查找,和使用本地缓存效果一样

先看看常用缓存的形式

本地缓存

缓存在当前应用程序内存中,通常以静态变量存储,它可以是任何对象,一个值,一个集合都行

因为是在当前程序中,能很好得到控制,创建,访问都很好办,特别是集合,通过集合查询语法或自已写的算法很好过滤,取出想要的结果

然而这些数据需要多程序共用,那么需要把它集中放在一个地方,供多应用程序调用

分布式缓存

分布式缓存就是为了解决二级缓存不能解决的问题,把数据放在独立的服务器上,提供访问接口,供不同客户端程序调用

一般分为两部份,服务端,客户端接口,通过统一的客户端接口,从服务端获取数据

因为达成了统一访问,没有业务关联,因此,缓存获取方式为KEY值的形式,通过KEY值读写数据,很像一个字典,KEY为唯一,值为object

在获取到这个object后,转换为指定的本地业务对象

然而带来的问题,所有值都为object,在获取到后才能转换为本地业务对象,如果存入的是一个对象集合,要获取其中某个条件的一项

那得每次把整个集合object获取到本地再换为集合再进行查找,这样意味着多了不必要的数据传输,因此一般不会这么做

问题来了,从分布式缓存服务器中,没法按业务要求对这样的数据进行查询,通常这样的需要,替代做法是搜索引擎形式,随之带来的一系列问题…

或还有更简单的方法,对指定业务数据查询写个查询接口,这样也能满足需求,但是每种查询需求都需要写个接口,很是麻烦

能不能直接把查询条件传给缓存服务端,让服务端用条件过滤数据?

理论是可以的,只需要服务端能识别解析就能,要达成这样的条件,服务端必须能识别业务对象

这样意味着,服务端的缓存对象是由业务创建的,需要对服务端进行开发

要实现这样的系统,关键点是在查询条件传输,在本地对集合查询,一般使用Linq扩展方法,使用lambda生成筛选委托

如果能把lambda表达式传过去,服务端再解析成筛选委托就能实现目的了

实现命令解析

由于lambda表达式很复杂,不能直接序列化,是没法直接传输的,需要解析成能传输的对象,那么,要做的过程表示为

客户端:lambda查询=>解析lambda=>转换为命令对象=>序列化发送命令

服务端:接收命令反序列化=>转换为lambda=>创建查询

lambda解析参考 解析lambda实现完整查询

lambda动态创建参考 http://www.cnblogs.com/libbybyron/articles/4134494.html

服务端按业务数据缓存

服务端使用同样的业务模型创建二级缓存,在接收到命令转换为lambda查询后,很轻松就能实现内存查询了

于是整体结构设计为

数据传输层

为了增加查询效率,传输层用长TCP连接,使用连接池的概念,启动后保持指定的连接数,有请求时找其中一个连接与服务器通讯,如果都满了,再创建新的连接

同时自动释放长时间不用的连接,通过这样处理,比HTTP接口,SERVICE/WCF连接效率高很多

分布式方案

因为缓存是服务器主动创建的,做成分布式,需要客户端知道服务端有哪些缓存,因此需要做一个缓存类型和服务器映射

查询缓存时,根据映射去指定的服务器查询

通过这样设计的缓存系统,大大增加了开发效率和使用便捷性,并与业务衔接紧密

整体涉及到的东西比较多也比较复杂,就不在这里写代码实现了,需要框架支持,这里只放上开发完成的测试DEMO

测试DEMO下载:点击下载

调用示例:http://119.10.29.11:8080/page/Cache2.aspx