Nodejs学习笔记(九)--- 与Redis的交互(mranney/node_redis)入门 - porschev - 博客园

mikel阅读(685)

来源: Nodejs学习笔记(九)— 与Redis的交互(mranney/node_redis)入门 – porschev – 博客园

简介和安装

  • redis简介:
  1. 开源高性能key-value存储;采用内存中(in-memory)数据集的方式,也可以采用磁盘存储方式(前者性能高,但数据可能丢失,后者正好相反)
  2. 支持字符串(strings)、哈希(hashes)、列表(lists)、集合(sets)和 有序集合(sorted sets)等;支持对复杂数据结构的高速操作。
  3. 特性多,支持主从同步、pub/sub等
  4. 支持多种客户端(http://redis.io/clients

  注:应用场景没有提到,暂时没有太多实际体会,不瞎说,以免误导人,但是从它的简介和特性来说,起码缓存场景是不错的!

Redis下载地址: https://github.com/dmajkic/redis/downloads

node.js客户端:node_redis https://github.com/mranney/node_redis/

  • redis安装(Windows平台)

redis非常方便,直接下载解压就可以使用,因为开发环境是win7 64位,直接下载(示例下载的安装包:redis-2.4.5-win32-win64.zip)

  • redis运行

解压到后运行”64bit”文件夹下的redis-server.exe即可,但是这样运行会出现一个如下警告提示:

#Warning: no config file specified,using the default config. In order to specify a config file use ‘redis-server /path/to/redis.conf’

  提示也比较明显,没有明确的配置文件,使用的是默认配置,请使用‘redis-server /path/to/redis.conf’指定明确的配置文件

   根据提示运行redis成功(如下图)

  在redis-server.exe同级目录下可以看到一个redis.conf文件,这就是配置文件

  • node_redis安装
npm install redis
或者
npm install hiredis redis

我这里采用 npm install hiredis redis 安装

  注:两种都可用,区别在于性能,hiredis是非阻塞的,而且速度更快;如果安装了hiredis,node_redis则会默认以它为解析器,没安装就会用纯JavaScript解释器,对于学习或者开发环境,用哪个都无所谓

redis.createClient()连接到redis服务器

环境都准备好了,就开始写一代简单的代码测试用nodejs连接一下服务器

var redis = require('redis'),
    client = redis.createClient();

client.on('ready',function(err){
    console.log('ready');
});

从上图中可以看到运行结果,输出ready,表示成功!

对代码还是讲一下:

redis.createClient():返回的是一个RedisClient的对象,大家可以输出来看一下此对象的具体信息。

ready:Redis的Connection事件之一,当与redis服务器连接成功后会触发这个事件,此时表示已经准备好接收命令,当这个事件触发之前client命令会存在队列中,当一切准备就绪后按顺序调用

 

对于上面的几句代码就能连接成功redis服务器,原因是当前redis服务器在本地,如果不在本地,怎么连接呢?

复制代码
var redis = require('redis'),
    RDS_PORT = 6379,        //端口号
    RDS_HOST = '127.0.1.1',    //服务器IP
    RDS_OPTS = {},            //设置项
    client = redis.createClient(RDS_PORT,RDS_HOST,RDS_OPTS);

client.on('ready',function(res){
    console.log('ready');    
});
复制代码

也是成功!这种方式和上一种在redis.createClient()时分别传入了端口号、服务器IP和设置项

这样就可以用于连接远程的redis服务器,或者利用第三个参数进行一些配置!

redis的默认端口:6379

认证 client.auth(password, callback)

上面试过了,连接到redis服务器,可以看出我们并没有输入密码进行验证的过程就成功连接到了服务器,因为redis服务器默认不需要密码,不过这不太安全,我们肯定要设置一下密码

打开redis.conf文件,找到requirepass,取消注释,设置密码为:porschev

requirepass porschev

然后重启redis服务器;再次利用上面的代码连接到redis服务器,出现错误提示(如下图):ERR operation not permitted

那么如何连接到有密码的redis服务器呢?

简单的试了一下,有两种方法(可能有更多,没试,其实一种完全就够了,多了也没用^_^!)

方式一:通过设置redis.createClient()的第三个参数,也就是设置项来完成

 

复制代码
var redis = require('redis'),
    RDS_PORT = 6379,        //端口号
    RDS_HOST = '127.0.1.1',    //服务器IP
    RDS_PWD = 'porschev',
    RDS_OPTS = {auth_pass:RDS_PWD},            //设置项
    client = redis.createClient(RDS_PORT,RDS_HOST,RDS_OPTS);

client.on('ready',function(res){
    console.log('ready');    
});
复制代码

上图可以连接成功,通过设置连接设置项中的auth_pass来通过认证!

auth_pass:默认值为null,默认情况下客户端将不通过auth命令连接,如果设置了此项,客户端将调用auth命令连接

方式二:通过client.auth(password, callback)

复制代码
var redis = require('redis'),
    RDS_PORT = 6379,        //端口号
    RDS_HOST = '127.0.1.1',    //服务器IP
    RDS_PWD = 'porschev',
    RDS_OPTS = {},            //设置项
    client = redis.createClient(RDS_PORT,RDS_HOST,RDS_OPTS);

client.auth(RDS_PWD,function(){
    console.log('通过认证');
});

client.on('ready',function(res){
    console.log('ready');    
});
复制代码

此方法也可以成功,第一个参数为密码,第二个为回调函数!

单值set和get

复制代码
var redis = require('redis'),
    RDS_PORT = 6379,        //端口号
    RDS_HOST = '127.0.1.1',    //服务器IP
    RDS_PWD = 'porschev',    //密码    
    RDS_OPTS = {},            //设置项
    client = redis.createClient(RDS_PORT,RDS_HOST,RDS_OPTS);

client.auth(RDS_PWD,function(){
    console.log('通过认证');
});

client.on('connect',function(){
    client.set('author', 'Wilson',redis.print);
    client.get('author', redis.print);
    console.log('connect');
});

client.on('ready',function(err){
    console.log('ready');
});
复制代码

从输出结果可以看出,set一个值和获取这个值都成功!

代码讲一下:

client.set(key,value,[callback]):设置单个key和value,回调函数可选

client.get(key,[callback]):得到key得到value,回调函数可选(虽然可选,但不写回调函数获取又有什么意义呢^_^!)

connect:Redis的Connection事件之一,在不设置client.options.no_ready_check的情况下,客户端触发connect同时它会发出ready,如果设置了client.options.no_ready_check,当这个stream被连接时会触发connect,

这时候就可以自由尝试发命令

  redis.print:简便的回调函数,测试时显示返回值(从示例的输出结果中可以看出)

 

  其它补充说明:

client.options.no_ready_check:默认值为false,当连接到一台redis服务器时,服务器也许正在从磁盘中加载数据库,当正在加载阶段,redis服务器不会响应任何命令,node_redis会发送一个“准备确认”的INFO命令,

INFO命令得到响应表示此时服务器可以提供服务,这时node_redis会触发”ready”事件,如果该设置项设置为true,则不会有这种检查

  client.set([key,value],callback):与client.set(key,value,[callback]);效果一致(可以自行对上面示例源码进行修改进行测试),必须要有回调函数

多值get和set

复制代码
var redis = require('redis'),
    RDS_PORT = 6379,        //端口号
    RDS_HOST = '127.0.1.1',    //服务器IP
    RDS_PWD = 'porschev',    //密码    
    RDS_OPTS = {},            //设置项
    client = redis.createClient(RDS_PORT,RDS_HOST,RDS_OPTS);

client.auth(RDS_PWD,function(){
    console.log('通过认证');
});

client.on('connect',function(){
    client.hmset('short', {'js':'javascript','C#':'C Sharp'}, redis.print);
    client.hmset('short', 'SQL','Structured Query Language','HTML','HyperText Mark-up Language', redis.print);

    client.hgetall("short", function(err,res){
        if(err)
        {
            console.log('Error:'+ err);
            return;
        }            
        console.dir(res);
    });
});

client.on('ready',function(err){
    console.log('ready');
});
复制代码

从输出结果可以看出,set多值和获取都成功!

代码讲一下:

client.hmset(hash, obj, [callback]):赋值操作,第一个参数是hash名称;第二个参数是object对象,其中key1:value1。。,keyn:valuen形式;第三个参数是可选回调函数

client.hmset(hash, key1, val1, … keyn, valn, [callback]):与上面做用一致,第2个参数到可选回调函数之前的参数都是key1, val1, … keyn, valn形式;

client.hgetall(hash, [callback]):获取值操作,返回一个对象

 

  其它补充说明:

  console.dir()用于显示一个对象所有的属性和方法

打包执行多个命令[事务]

复制代码
var redis = require('redis'),
    RDS_PORT = 6379,        //端口号
    RDS_HOST = '127.0.1.1',    //服务器IP
    RDS_PWD = 'porschev',    //密码    
    RDS_OPTS = {},            //设置项
    client = redis.createClient(RDS_PORT,RDS_HOST,RDS_OPTS);

client.auth(RDS_PWD,function(){
    console.log('通过认证');
});

client.on('end',function(err){
    console.log('end');
});

client.on('connect',function(){
    var key = 'skills';
      client.sadd(key, 'C#','java',redis.print);
      client.sadd(key, 'nodejs');
      client.sadd(key, "MySQL");
      
      client.multi()      
      .sismember(key,'C#')
      .smembers(key)
      .exec(function (err, replies) {
            console.log("MULTI got " + replies.length + " replies");
            replies.forEach(function (reply, index) {
                console.log("Reply " + index + ": " + reply.toString());
            });
            client.quit();
    });
});
复制代码

官方有个示例,我修改一下,可能更好理解一些,下面一步步说吧!

先了解一下API再看结果

client.multi([commands]):这个标记一个事务的开始,由Multi.exec原子性的执行;github上描述是可以理解为打包,把要执行的命令存放在队列中,redis服务器会原子性的执行所有命令,node_redis接口返回一个Multi对象

Multi.exec( callback ):执行事务内所有命令;github上描述是client.multi()返回一个Multi对象,它包含了所有命令,直到Multi.exec()被调用;

Multi.exec( callback )回调函数参数err:返回null或者Array,出错则返回对应命令序列链中发生错误的错误信息,这个数组中最后一个元素是源自exec本身的一个EXECABORT类型的错误

  Multi.exec( callback )回调函数参数results:返回null或者Array,返回命令链中每个命令的返回信息

  end:redis已建立的连接被关闭时触发

client.sadd(key,value1,…valuen,[callback]):集合操作,向集合key中添加N个元素,已存在元素的将忽略;redis2.4版本前只能添加一个值

sismember(key,value,[callback]):元素value是否存在于集合key中,存在返回1,不存在返回0

smembers(key,[callback]):返回集合 key 中的所有成员,不存在的集合key也不会报错,而是当作空集返回

client.quit():与之对应的还有一个client.end()方法,相对比较暴力;client.quit方法会接收到所有响应后发送quit命令,而client.end则是直接关闭;都是触发end事件

 

再看结果应该就比较简单了,client.multi打包了sismember和smembers两个命令,执行exec方法后,回调函数得到两个回应,分别输出两个回应的结果!

其它…

redis.Debug_mode:这个在开发中可能有用,大家自行设置试一下,设置为true后,看输出

Publish / Subscribe:这个官方示例比较简单清晰,大家运行起来看一下就能理解,深入的网上还有很多用它实现的聊天、监控示例,大家看一下,如果以后觉得有必要就再做个示例分享一下

client.monitor:监控,可能以后会用到,有需要的深入研究一下,入门可以略过

 

其它redis命令还有不少,能讲到的非常有限,深入练习可以对照:http://redis.readthedocs.org/en/latest/index.html

 

参数资料:

https://github.com/mranney/node_redis/

http://redis.readthedocs.org/en/latest/index.html

http://dumbee.net/archives/114

作   者:   Porschev[钟慰]
出   处:   http://www.cnblogs.com/zhongweiv/
微   博:     http://weibo.com/porschev
欢迎任何形式的转载,但请务必注明原文详细链接

【由浅至深】redis 实现发布订阅的几种方式 - nicye - 博客园

mikel阅读(684)

来源: 【由浅至深】redis 实现发布订阅的几种方式 – nicye – 博客园

非常感谢依乐祝发表文章《.NET Core开发者的福音之玩转Redis的又一傻瓜式神器推荐》,对csredis作了一次完整的诠释。

1|0前言

提到消息队列,最熟悉无疑是 rabbitmq,它基本是业界标准的解决方案。本文详细介绍 redis 多种实现轻订阅方法,作者认为非常有趣并加以总结,希望对有需要的朋友学习 redis 功能有一定的带入作用。

2|0方法一:SUBSCRIBE + PUBLISH

//程序1:使用代码实现订阅端 var sub = RedisHelper.Subscribe((“chan1”, msg => Console.WriteLine(msg.Body))); //sub.Disponse(); //停止订阅 //程序2:使用代码实现发布端 RedisHelper.Publish(“chan1”, “111”);

优势:支持多端订阅、简单、性能高;
缺点:数据会丢失;

参考资料:http://doc.redisfans.com/pub_sub/subscribe.html

3|0方法二:BLPOP + LPUSH(争抢)

//程序1:使用代码实现订阅端 while (running) { try { var msg = RedisHelper.BLPop(5, “list1”); if (string.IsNullOrEmpty(msg) == false) { Console.WriteLine(msg); } } catch (Exception ex) { Console.WriteLine(ex.Message); } } //程序2:使用代码实现发布端 RedisHelper.LPush(“list1”, “111”);

优势:数据不会丢失、简单、性能高;
缺点:不支持多端(存在资源争抢);

总结:为了解决方法一的痛点,我们实现了本方法,并且很漂亮的制造了一个新问题(不支持多端订阅)。

3|1学习使用 BLPOP

BLPOP key [key …] timeout

BLPOP 是列表的阻塞式(blocking)弹出原语。

它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。

非阻塞行为

当 BLPOP 被调用时,如果给定 key 内至少有一个非空列表,那么弹出遇到的第一个非空列表的头元素,并和被弹出元素所属的列表的名字一起,组成结果返回给调用者。

当存在多个给定 key 时, BLPOP 按给定 key 参数排列的先后顺序,依次检查各个列表。

假设现在有 job 、 command 和 request 三个列表,其中 job 不存在, command 和 request 都持有非空列表。考虑以下命令:

BLPOP job command request 0

BLPOP 保证返回的元素来自 command ,因为它是按”查找 job -> 查找 command -> 查找 request “这样的顺序,第一个找到的非空列表。

redis> DEL job command request # 确保key都被删除 (integer) 0 redis> LPUSH command “update system…” # 为command列表增加一个值 (integer) 1 redis> LPUSH request “visit page” # 为request列表增加一个值 (integer) 1 redis> BLPOP job command request 0 # job 列表为空,被跳过,紧接着 command 列表的第一个元素被弹出。 1) “command” # 弹出元素所属的列表 2) “update system…” # 弹出元素所属的值

阻塞行为

如果所有给定 key 都不存在或包含空列表,那么 BLPOP 命令将阻塞连接,直到等待超时,或有另一个客户端对给定 key 的任意一个执行 LPUSH 或 RPUSH 命令为止。

超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长(block indefinitely) 。

redis> EXISTS job # 确保两个 key 都不存在 (integer) 0 redis> EXISTS command (integer) 0 redis> BLPOP job command 300 # 因为key一开始不存在,所以操作会被阻塞,直到另一客户端对 job 或者 command 列表进行 PUSH 操作。 1) “job” # 这里被 push 的是 job 2) “do my home work” # 被弹出的值 (26.26s) # 等待的秒数 redis> BLPOP job command 5 # 等待超时的情况 (nil) (5.66s) # 等待的秒数

更多学习资料:http://doc.redisfans.com/list/blpop.html

4|0方法三:BLPOP + LPUSH(非争抢)

本方法根据方法二演变而来,设计图如下:

如何实现三端订阅,都可收到消息,三端分别为 sub3, sub4, sub5:

1、sub3, sub4, sub5 使用【方法二】订阅 listkey:list1_sub3,list1_sub4,list1_sub5;

2、总订阅端订阅 listkey:list1,总订阅端收到消息后,执行 lpush list1_sub1 msg, lpush list1_sub2 msg, lpush list1_sub3 msg;

总订阅端订阅原始消息,随后将消息分发给其他订阅端,从而解决【方法二】不支持多端同时订阅的缺点。

最终实现的逻辑为:多端先争抢 list1 消息,抢到者再向其他端转发消息。

5|0测试代码

nuget Install-Package CSRedisCore

var rds = new CSRedis.CSRedisClient(“127.0.0.1:6379,password=,poolsize=50,ssl=false,writeBuffer=10240”); //sub1, sub2 争抢订阅(只可一端收到消息) var sub1 = rds.SubscribeList(“list1”, msg => Console.WriteLine($”sub1 -> list1 : {msg})); var sub2 = rds.SubscribeList(“list1”, msg => Console.WriteLine($”sub2 -> list1 : {msg})); //sub3, sub4, sub5 非争抢订阅(多端都可收到消息) var sub3 = rds.SubscribeListBroadcast(“list2”, “sub3”, msg => Console.WriteLine($”sub3 -> list2 : {msg})); var sub4 = rds.SubscribeListBroadcast(“list2”, “sub4”, msg => Console.WriteLine($”sub4 -> list2 : {msg})); var sub5 = rds.SubscribeListBroadcast(“list2”, “sub5”, msg => Console.WriteLine($”sub5 -> list2 : {msg})); //sub6 是redis自带的普通订阅 var sub6 = rds.Subscribe((“chan1”, msg => Console.WriteLine(msg.Body))); Console.ReadKey(); sub1.Dispose(); sub2.Dispose(); sub3.Dispose(); sub4.Dispose(); sub5.Dispose(); sub6.Dispose(); rds.Dispose(); return;

测试功能时,发布端可以使用 redis-cli 工具。

6|0结语

redis 功能何其多且相当好玩有趣 ,大家应尽可能多带着兴趣爱好去学习它。

若文中有不好的地方,请提出批评与改正方法,谢谢观赏。

本文使用到 CSRedisCore 的开源地址:https://github.com/2881099/csredis

快速实现Docker到Redis的连接_sannerlittle的博客-CSDN博客_docker redis 连接

mikel阅读(607)

来源: 快速实现Docker到Redis的连接_sannerlittle的博客-CSDN博客_docker redis 连接

OS:Xenial

$ docker pull redis 

运行上面的命令下载镜像,Docker daemon会自动输出该Redis镜像的来源信息、下载状态,下载完成之后系统也会显示最终状态信息。

镜像拉取完成之后,大家可以用下面的命令启动Redis容器,记得要带上“-d”参数:

$ docker run --name myredis-itsmine -d redis 

“-d”参数的作用是让Redis在后台运行,因为本例中采用这种后台运行的方式较为合适,所以这里我们写上了这个参数。如果不带 “-d”参数的话处理起来就要麻烦一些,这种情况下我们需要先停止终端的运行或者退出容器,然后才能通过宿主机来访问Redis。

下面我们要进行最重要的一步操作,连接Redis。由于我们并没有实际的需要连接到Redis的应用,所以这里我们用了redis-cli工具。大家可以在宿主机上安装redis-cli,不过我建议大家新建一个容器,将redis-cli运行在里面,然后用下面的命令把这两个容器连接起来,这样我们就可以看到详细的应用信息:

$docker run --rm -it --link myredis-itsmine:redis redis /bin/bash 

运行该命令之后我们就可以在bash命令行下面看到容器的提示信息了:

root@f75bacab2715:/data#
$ docker run --rm -it --link myredis:redis redis /bin/bash
$ root@af47015c4a76:/data# redis-cli -h redis -p 6379
$ redis:6379> ping
$ PONG
$ redis:6379> set "Abc" 123
$ OK
$ redis:6379> get "Abc"
$ "123"
$ redis:6379> exit
root@af47015c4a76:/data# exit
$ exit

在上面的命令中,docker run命令后面跟的“–link myredis-itsmine:redis” 参数用于创建连接,Docker收到该指令后,就会尝试将我们新建的容器连接到当前的“myredis-itsmine” 容器,同时会将新容器中的redis-cli命名为“redis”。Docker会在容器中的/etc/hosts路径下为“redis”创建一个入口,并指向“myredis-itsmine”容器的IP地址。这样我们就可以在redis-cli中直接使用“redis”作为主机名,这种做法是很方便的,我们不必再去找其他办法来“发现”Redis的IP地址,然后加以引用了。

接下来我们就可以通过set和put命令来执行Redis的存取操作了,这里我们可以用一些示例数据来做个试验。当然,在开始存取操作之前,大家还要再运行一下Redis的ping命令,验证一下Redis服务器是否已经连接上了。

Recommend: http://www.hawu.me/operation/802

DOCKER简明教程 : 通过容器连接REDIS数据库

mikel阅读(570)

本文来源:Ghostcloud翻译(http://www.ghostcloud.cn/

序言
本文重点讲解了如何通过Redis的官方镜像和Docker容器来搭建redis-cli,并将其连接到Redis镜像。首先要跟大家简单介绍一下Redis,这是一个键值存储系统,除了对场景进行缓存之外,Redis还提供了很多强大的功能,因此也目前是非常受欢迎的一个数据库。

Docker镜像仓库简介
大家可以在Docker Hub里搜索到目前所有的主流应用和服务的镜像,像Python语言、MySQL数据库等等镜像在Docker Hub里面都有。而且Docker Hub里面的镜像数量非常多,不管我们搜什么关键词,都能搜出来一大堆结果。之所以会这样,是因为Docker官方镜像仓库的宗旨就是将已知来源、满足一定质量标准的Docker镜像组织在一起提供给用户。一般情况下,我建议大家都尽量使用Docker Hub提供的官方镜像,大家可以在查询结果列表中看到当前分类下有哪些官方镜像,一般官方镜像都会在列表中置顶显示,而且会有标有“官方”字样。

从官方镜像仓库pull镜像的时候,用户名的部分可以为空,也可以设置为library,比如说拉取 Casandra镜像的时候就可以设置成从Apache Cassandra项目获取。大家也可以在自己的终端上运行下面的命令,在Docker Hub中查找Cassandra镜像:

$docker search Cassandra

通过这种方式对Docker Hub进行查询的时候,系统会返回一条消息,提示用户“你所拉取的镜像已通过验证”,看到这条信息就表示镜像的校验码通过了Docker daemon的验证,来源是可靠的。

快速实现Docker到Redis的连接
闲话少说,我们下面就进入实战教程。首先运行下面命令,从Docker Hub拉取Redis镜像:

$ docker pull redis

运行上面的命令下载镜像,Docker daemon会自动输出该Redis镜像的来源信息、下载状态,下载完成之后系统也会显示最终状态信息。

镜像拉取完成之后,大家可以用下面的命令启动Redis容器,记得要带上“-d”参数:

$ docker run –name myredis-itsmine -d redis

“-d”参数的作用是让Redis在后台运行,因为本例中采用这种后台运行的方式较为合适,所以这里我们写上了这个参数。如果不带 “-d”参数的话处理起来就要麻烦一些,这种情况下我们需要先停止终端的运行或者退出容器,然后才能通过宿主机来访问Redis。

下面我们要进行最重要的一步操作,连接Redis。由于我们并没有实际的需要连接到Redis的应用,所以这里我们用了redis-cli工具。大家可以在宿主机上安装redis-cli,不过我建议大家新建一个容器,将redis-cli运行在里面,然后用下面的命令把这两个容器连接起来,这样我们就可以看到详细的应用信息:

$docker run –rm -it –link myredis-itsmine:redis redis /bin/bash

运行该命令之后我们就可以在bash命令行下面看到容器的提示信息了:

root@f75bacab2715:/data#
$ docker run –rm -it –link myredis:redis redis /bin/bash
$ root@af47015c4a76:/data# redis-cli -h redis -p 6379
$ redis:6379> ping
$ PONG
$ redis:6379> set “Abc” 123
$ OK
$ redis:6379> get “Abc”
$ “123”
$ redis:6379> exit
root@af47015c4a76:/data# exit
$ exit

在上面的命令中,docker run命令后面跟的“–link myredis-itsmine:redis” 参数用于创建连接,Docker收到该指令后,就会尝试将我们新建的容器连接到当前的“myredis-itsmine” 容器,同时会将新容器中的redis-cli命名为“redis”。Docker会在容器中的/etc/hosts路径下为“redis”创建一个入口,并指向“myredis-itsmine”容器的IP地址。这样我们就可以在redis-cli中直接使用“redis”作为主机名,这种做法是很方便的,我们不必再去找其他办法来“发现”Redis的IP地址,然后加以引用了。

接下来我们就可以通过set和put命令来执行Redis的存取操作了,这里我们可以用一些示例数据来做个试验。当然,在开始存取操作之前,大家还要再运行一下Redis的ping命令,验证一下Redis服务器是否已经连接上了。

本文讲述了如何通过容器来实现Redis数据库的连接,看到这里,大家是否已经对容器的网络连接有个初步的概念了?新版的Docker在网络功能方面也做出了一定的改进,相信在不久的将来,所有用户都可以很方便地通过Docker容器实现应用和服务的互连。

Redis安装完后redis-cli无法使用(redis-cli: command not found)_坤哥的博客-CSDN博客

mikel阅读(620)

来源: Redis安装完后redis-cli无法使用(redis-cli: command not found)_坤哥的博客-CSDN博客

之前安装redis后客户端无法使用,即redis-cli执行后报找不到的错误。这主要是安装redis的时候没有把客户端装上,在StackOverFlow上找到了一种只安装redis cli的方法,这里跟大家分享下。

wget http://download.redis.io/redis-stable.tar.gz(下载redis-cli的压缩包)
tar xvzf redis-stable.tar.gz(解压)
cd redis-stable(进入redis-stable目录)
make(安装)
sudo cp src/redis-cli /usr/local/bin/(将redis-cli拷贝到bin下,让redis-cli指令可以在任意目录下直接使用)

按照上面的指令执行之后redis-cli就可以正常执行了,注意上面的几条指令必须都执行,make是单独的一条。
————————————————
版权声明:本文为CSDN博主「月未明」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_35981283/article/details/71631540

.NETcore使用CSRedisCore操作Redis - 心冰之海 - 博客园

mikel阅读(1292)

来源: .NETcore使用CSRedisCore操作Redis – 心冰之海 – 博客园

因为Servicestack.Redies免费每天只能调用6000次,所以找了个免费的能在.NETcore使用的插件CSRedisCore,封装了一下。

redies订阅模式的调用:RedisCoreHelper.Sub(“quote”, action);

1
2
3
4
5
6
7
8
9
10
11
12
13
public void action(string message)
{
    if (!message.IsNullOrEmpty() && !"null".Equals(message))
    {
        
//dosomething
               
    }
    else
    {
        //Thread.Sleep(200);
    }
}       

 

封装如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
internal class RedisConfigManager
{
    /// <summary>
    /// 获取配置
    /// </summary>
    /// <returns></returns>
    public static RedisConfig GetConfig()
    {
        var path = WorkPath.CurrentDirectory + "\\redis.config.json";
        Log.Info("path:"+ path);
        var json = FileManager.GetTextFromPath(path);
        if (json.IsNullOrEmpty())
            return new RedisConfig();
        var config = JsonConvert.Deserialize<RedisConfig>(json);
        return config;
    }
}

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
namespace LT.Cache
{
    public class RedisCoreHelper
    {
        static CSRedisClient redisManger = null;
        static CSRedisClient GetClient()
        {
            return redisManger;
        }
        static RedisCoreHelper()
        {
            var redisconfig = RedisConfigManager.GetConfig();
            redisManger = new CSRedisClient(redisconfig.CoreRedisServer);      //Redis的连接字符串
        }
        /// <summary>
        /// TradeManageMessage 和 TradeManageMessage:MQ队列
        /// </summary>
        /// <returns></returns>
        public static bool EnQeenTradeManageMessage(string value)
        {
            try
            {
                Log.Info("yinzhou--EnQeenTradeManageMessage:" + value);
                //从头部插入
                GetClient().LPush("TradeManageMessage", value);
                GetClient().LPush("TradeManageMessage:MQ", value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"EnQeenTradeManageMessage:key=TradeManageMessage:MQ,value={value}", e);
                return false;
            }
        }
        /// <summary>
        /// TradeManageMessage 和 TradeManageMessage:MQ队列
        /// </summary>
        /// <returns></returns>
        public static bool EnQeenTradeManageMessage<T>(T value)
        {
            try
            {
                //从头部插入
                GetClient().LPush("TradeManageMessage", value);
                GetClient().LPush("TradeManageMessage:MQ", value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"EnQeenTradeManageMessage:key=TradeManageMessage:MQ,value={value}", e);
                return false;
            }
        }
        public static bool EnQueen(string key, string value)
        {
            try
            {
                //从头部插入
                GetClient().LPush(key, value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"EnQueen:key={key},value={value}", e);
                return false;
            }
        }
        public static string DeQueen(string key)
        {
            string result = "";
            try
            {
                //从尾部取值
                result = GetClient().RPop(key);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"DeQueen:key={key}", e);
                return result;
            }
        }
    //redis订阅模式
        public  static void Sub(string key,Action<string> action)
        {
            GetClient().Subscribe((key, msg => action(msg.Body)));
        }
        public static string[] DeQueenAll(string key)
        {
            string[] result = { };
            try
            {
                long len = GetClient().LLen(key);
            
                //取出指定数量数据
                result = GetClient().LRange(key,0, len-1);
                //删除指定数据
                bool res=GetClient().LTrim(key, len, -1);
  
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"DeQueen:key={key}", e);
                return result;
            }
        }
        public static bool EnQueen<T>(string key, T value)
        {
            try
            {
                //从头部插入
                long len= GetClient().LPush(key, value);
                if(len>0)
                    return true;
                else
                    return false;
            }
            catch (Exception e)
            {
                Log.Error($"EnQueenObj:key={key},value={value}", e);
                return false;
            }
        }
        public static T DeQueen<T>(string key)
        {
            T result=default(T);
            try
            {
                //从尾部取值
                result = GetClient().RPop<T>(key);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"DeQueen:key={key}", e);
                return result;
            }
        }
        /// <summary>
        /// 设置hash值
        /// </summary>
        /// <param name="key"></param>
        /// <param name="field"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        public static bool SetHash(string key, string field,string value)
        {
            try
            {
                GetClient().HSet(key, field, value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"SetHash:key={key},value={value}", e);
                return false;
            }
        }
        /// <summary>
        /// 根据表名,键名,获取hash值
        /// </summary>
        /// <param name="key">表名</param>
        /// <param name="field">键名</param>
        /// <returns></returns>
        public static string GetHash(string key,string field)
        {
            string result = "";
            try
            {
          
                result = GetClient().HGet(key,field);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"GetHash:key={key}", e);
                return result;
            }
        }
        /// <summary>
        /// 获取指定key中所有字段
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static Dictionary<string,string> GetHashAll(string key)
        {
            try
            {
                var result = GetClient().HGetAll(key);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"GetHash:key={key}", e);
                return new Dictionary<stringstring>();
            }
        }
        /// <summary>
        /// 根据表名,键名,删除hash值
        /// </summary>
        /// <param name="key">表名</param>
        /// <param name="field">键名</param>
        /// <returns></returns>
        public static long DeleteHash(string key, string field)
        {
            long result = 0;
            try
            {
                result = GetClient().HDel(key, field);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"GetHash:key={key}", e);
                return result;
            }
        }
        //private object deleteCache( Method method, Object[] args)
        //{
        //    object flag = false;
        //    String fieldkey = parseKey(method, args);
        //    try
        //    {
        //        if (fieldkey.equals(""))
        //        {
        //            cacheClient.del(cache.key());
        //        }
        //        else
        //        {
        //            cacheClient.hdel(cache.key(), fieldkey);
        //        }
        //        flag = true;
        //    }
        //    catch (Exception e)
        //    {
        //        //System.out.println(e.getMessage());
        //        flag = false;
        //    }
        //    return flag;
        //}
        /**
         * 获取值field
         * @param key
         * @param method
         * @param args
         * @return
         * @throws Exception
         */
        //        public string parseKey(Method method, Object[] args)
        //        {
        //            string fieldKey = "";
        //            for (int i = 0; i < method.getParameters().length; i++)
        //            {
        //                Parameter p = method.getParameters()[i];
        //                FieldAnnotation f = p.getAnnotation(FieldAnnotation.class);
        //          if(f!=null) {
        //              fieldKey+=args[i].toString()+":";
        //          }else {
        //              FieldOnlyKeyAnnotation fo = p.getAnnotation(FieldOnlyKeyAnnotation.class);
        //              if(fo != null) {
        //                  fieldKey+=args[i].toString();
        //}
        //          }
        //      }
        //      return fieldKey;
        //    }
    }
}

使用 .NET CLI 构建项目脚手架 - SpringLeee - 博客园

mikel阅读(694)

来源: 使用 .NET CLI 构建项目脚手架 – SpringLeee – 博客园

前言

在微服务场景中,开发人员分配到不同的小组,系统会拆分为很多个微服务,有一点是,每个项目都需要单元测试,接口文档,WebAPI接口等,创建新项目这些都是重复的工作,而且还要保证各个项目结构的大体一致,这时就需要一个适用于企业内部的框架模板,类似于前端的脚手架,可以做到开箱即用,注重业务功能开发,提升工作效率。

简介

NET 命令行接口 (CLI) 工具是用于开发、生成、运行和发布 .NET 应用程序的跨平台工具链。

本次主要介绍的是 dotnet new 命令,可以通过这个命令创建我们的自定义模板,我们安装完.NET SDK后,本身自带了一些项目模板,可以通过 dotnet new --list 查看已经安装的模板。

接下来,我会介绍如何构建自定义的项目模板。

准备工作

首先,我们需要准备一个简单的项目模板,我们希望以后可以通过脚手架,自动为我们生成这些项目和文件,这里面可能包含了单元测试项目,WebAPI项目等。

你也可以在这里找到项目源代码,https://github.com/SpringLeee/Dy.Template

在本地创建 Dy.Template 文件夹,并在文件夹内创建 templates 文件夹(后边所有的模板文件都在这里), 这里我创建了一个解决方案,里面包含了3个项目,WebAPI,Test 和 Task,项目结构如下:

构建模板

在 templates 文件夹内,创建一个名为 “.template.config” 的文件夹(可以通过命令 mkdir .template.config 创建, 然后进入该文件夹,再创建一个名为 “template.json” 的新文件, 文件夹结构应如下所示:

然后修改配置文件如下:

{
  "$schema": "http://json.schemastore.org/template",
  "author": "SpringLee",
  "classifications": [ "Template" ],
  "name": "Dy.Template",
  "identity": "Dy.Template", 
  "shortName": "dy-template",
  "tags": {
    "language": "C#" 
  },
  "sourceName": "Template" 
}

上面是一些基本的描述信息,需要注意的是 “sourceName” 属性,它相当于一个变量,我们通过这个属性,可以创建 Dy.Order.WebAPI, Dy.User.WebAPI 这样的项目,后边我会进行详细介绍。

打包模板

基础工作已经准备完成,我们还需要把项目打包,发布到Nuget.org 或者是公司的内部 Nuget Server,这样其他人才可以下载和安装这个模板。

你可能很熟悉在.NET 中对单个项目进行打包,比如类库,可以在VS中直接对项目使用右键打包,也可以使用dotnet pack命令,不一样的是,我们需要打包的是整个项目结构,而不是单个项目。

我们在 Dy.Template 文件夹下,创建 template-pack.csproj 文件

修改内容如下:

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <PackageType>Template</PackageType>
    <PackageVersion>1.0.0</PackageVersion>
    <PackageId>Dy.Template</PackageId>
    <Title>Dy.Template</Title>
    <Authors>SpringLee</Authors>
    <Description>Dy.Template</Description>
    <PackageTags>dotnet-new;templates;Dy.Template</PackageTags>

    <TargetFramework>netstandard2.0</TargetFramework>

    <IncludeContentInPack>true</IncludeContentInPack>
    <IncludeBuildOutput>false</IncludeBuildOutput>
    <ContentTargetFolders>content</ContentTargetFolders>
    <NoWarn>$(NoWarn);NU5128</NoWarn>
  </PropertyGroup>

  <ItemGroup>
    <Content Include="templates\**\*" Exclude="templates\**\bin\**;templates\**\obj\**" />
    <Compile Remove="**\*" />
  </ItemGroup>

</Project>

我们指定了程序包的基础信息,版本ID, 描述信息,包含了 templates 文件夹下的所有文件,然后排除了 bin\ obj\ 文件夹的dll文件。

然后,运行 dotnet pack 命令进行打包, 你可以在 /bin/nuget/ 文件夹找到 生成的 nupkg 文件

在win10的应用商店(Microsoft Store)安装 Nuget Package Explore

我们把生成的 nupkg 文件 丢到 Nuget Package Explore 里查看,结构如下,包含了我们的 .config 配置文件,各个项目,解决方案。

最后,你可以把程序包推送到 nuget 服务器。

安装并使用

在终端中运行 dotnet new --install Dy.Template 命令安装,安装成功后,应该可以看到下边的输出,里边包含了我们的自定义模板

运行 dotnet new Dy.Template --name=Order,–name 指定了变量值,它会自动帮我们生成 Order 项目,这很棒!

欢迎扫码关注我们的公众号 【全球技术精选】,专注国外优秀博客的翻译和开源项目分享。

[Redis知识体系] 一文全面总结Redis知识体系 - pdai - 博客园

mikel阅读(726)

来源: [Redis知识体系] 一文全面总结Redis知识体系 – pdai – 博客园

作者:@pdai
本文为作者原创,转载请注明出处:https://www.cnblogs.com/pengdai/p/14509958.html


 


♥Redis教程 – Redis知识体系详解♥

本系列主要对Redis知识体系进行详解。@pdai

知识体系

知识体系

相关文章

首先,我们通过学习Redis的概念基础,了解它适用的场景。

  • Redis入门 – Redis概念和基础
    • Redis是一种支持key-value等多种数据结构的存储系统。可用于缓存,事件发布或订阅,高速队列等场景。支持网络,提供字符串,哈希,列表,队列,集合结构直接存取,基于内存,可持久化。

其次,这些适用场景都是基于Redis支持的数据类型的,所以我们需要学习它支持的数据类型;同时在redis优化中还需要对底层数据结构了解,所以也需要了解一些底层数据结构的设计和实现。

再者,需要学习Redis支持的核心功能,包括持久化,消息,事务,高可用;高可用方面包括,主从,哨兵等;高可拓展方面,比如 分片机制等。

  • Redis进阶 – 持久化:RDB和AOF机制详解
    • 为了防止数据丢失以及服务重启时能够恢复数据,Redis支持数据的持久化,主要分为两种方式,分别是RDB和AOF; 当然实际场景下还会使用这两种的混合模式。
  • Redis进阶 – 消息传递:发布订阅模式详解
    • Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
  • Redis进阶 – 事件:Redis事件机制详解
    • Redis 采用事件驱动机制来处理大量的网络IO。它并没有使用 libevent 或者 libev 这样的成熟开源方案,而是自己实现一个非常简洁的事件驱动库 ae_event。
  • Redis进阶 – 事务:Redis事务详解
    • Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。
  • Redis进阶 – 高可用:主从复制详解
    • 我们知道要避免单点故障,即保证高可用,便需要冗余(副本)方式提供集群服务。而Redis 提供了主从库模式,以保证数据副本的一致,主从库之间采用的是读写分离的方式。本文主要阐述Redis的主从复制。
  • Redis进阶 – 高可用:哨兵机制(Redis Sentinel)详解
    • 在上文主从复制的基础上,如果注节点出现故障该怎么办呢? 在 Redis 主从集群中,哨兵机制是实现主从库自动切换的关键机制,它有效地解决了主从复制模式下故障转移的问题。
  • Redis进阶 – 高可拓展:分片技术(Redis Cluster)详解
    • 前面两篇文章,主从复制和哨兵机制保障了高可用,就读写分离而言虽然slave节点来扩展主从的读并发能力,但是写能力和存储能力是无法进行扩展的,就只能是master节点能够承载的上限。如果面对海量数据那么必然需要构建master(主节点分片)之间的集群,同时必然需要吸收高可用(主从复制和哨兵机制)能力,即每个master分片节点还需要有slave节点,这是分布式系统中典型的纵向扩展(集群的分片技术)的体现;所以在Redis 3.0版本中对应的设计就是Redis Cluster。

最后,就是具体的实践以及实践中遇到的问题和解决方法了:在不同版本中有不同特性,所以还需要了解版本;以及性能优化,大厂实践等。

学习资料

除此之外,我还推荐你看下 极客时间 《Redis核心技术与实战》(作者:蒋德钧)的相关内容,它是我看到的为数不多的含有实战经验比较多的专栏,部分文章中图片也来源于这个系列。

更多文章请参考 [Java 全栈知识体系](https://pdai.tech)

基于redis分布式锁实现“秒杀”_ 刘劭的博客-CSDN博客

mikel阅读(533)

来源: 基于redis分布式锁实现“秒杀”_ 刘劭的博客-CSDN博客

最近在项目中遇到了类似“秒杀”的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓“秒杀”的基本思路。

业务场景
所谓秒杀,从业务角度看,是短时间内多个用户“争抢”资源,这里的资源在大部分秒杀场景里是商品;将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。

一些可能的实现
刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可以不加思索的想到下面的一些方法:
1、秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,在不考虑缓存的情况下应该是要操作数据库的。那么最简单直接的实现就是在这个方法上加上synchronized关键字,通俗的讲就是锁住整个方法;
2、锁住整个方法这个策略简单方便,但是似乎有点粗暴。可以稍微优化一下,只锁住秒杀的代码块,比如写数据库的部分;
3、既然有并发问题,那我就让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作,自然不会有并发问题。

上面所述的方法都是有效的,但是都不好。为什么?第一和第二种方法本质上是“加锁”,但是锁粒度依然比较高。什么意思?试想一下,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,从业务上讲应该是可以同时进行的,但是如果采用第一二种方法,这两个线程也会去争抢同一个锁,这其实是不必要的。第三种方法也没有解决上面说的问题。

那么如何将锁控制在更细的粒度上呢?可以考虑为每个商品设置一个互斥锁,以和商品ID相关的字符串为唯一标识,这样就可以做到只有争抢同一件商品的线程互斥,不会导致所有的线程互斥。分布式锁恰好可以帮助我们解决这个问题。

何为分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品。我们来根据这个简单的业务场景来解释一下分布式锁。
通常具有秒杀场景的业务系统都比较复杂,承载的业务量非常巨大,并发量也很高。这样的系统往往采用分布式的架构来均衡负载。那么这1000个并发就会是从不同的地方过来,商品库存就是共享的资源,也是这1000个并发争抢的资源,这个时候我们需要将并发互斥管理起来。这就是分布式锁的应用。
而key-value存储系统,如redis,因为其一些特性,是实现分布式锁的重要工具。

具体的实现
先来看看一些redis的基本命令:
SETNX key value
如果key不存在,就设置key对应字符串value。在这种情况下,该命令和SET一样。当key已经存在时,就不做任何操作。SETNX是”SET if Not eXists”。
expire KEY seconds
设置key的过期时间。如果key已过期,将会被自动删除。
del KEY
删除key
由于笔者的实现只用到这三个命令,就只介绍这三个命令,更多的命令以及redis的特性和使用,可以参考redis官网。

需要考虑的问题
1、用什么操作redis?幸亏redis已经提供了jedis客户端用于java应用程序,直接调用jedis API即可。
2、怎么实现加锁?“锁”其实是一个抽象的概念,将这个抽象概念变为具体的东西,就是一个存储在redis里的key-value对,key是于商品ID相关的字符串来唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个商品已经上锁。
3、如何释放锁?既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。
4、阻塞还是非阻塞?笔者采用了阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。
5、如何处理异常情况?比如一个线程把一个商品上了锁,但是由于各种原因,没有完成操作(在上面的业务场景里就是没有将库存-1写入数据库),自然没有释放锁,这个情况笔者加入了锁超时机制,利用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁(可以认为超时释放锁是一个异步操作,由redis完成,应用程序只需要根据系统特点设置超时时间即可)。

talk is cheap,show me the code
在代码实现层面,注解有并发的方法和参数,通过动态代理获取注解的方法和参数,在代理中加锁,执行完被代理的方法后释放锁。

几个注解定义:
cachelock是方法级的注解,用于注解会产生并发问题的方法:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
String lockedPrefix() default “”;//redis 锁key的前缀
long timeOut() default 2000;//轮询锁的时间
int expireTime() default 1000;//key在redis里存在的时间,1000S
}
1
2
3
4
5
6
7
8
lockedObject是参数级的注解,用于注解商品ID等基本类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
//不需要值
}
1
2
3
4
5
6
LockedComplexObject也是参数级的注解,用于注解自定义类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
String field() default “”;//含有成员变量的复杂对象中需要加锁的成员变量,如一个商品对象的商品ID

}
1
2
3
4
5
6
7
CacheLockInterceptor实现InvocationHandler接口,在invoke方法中获取注解的方法和参数,在执行注解的方法前加锁,执行被注解的方法后释放锁:

public class CacheLockInterceptor implements InvocationHandler{
public static int ERROR_COUNT = 0;
private Object proxied;

public CacheLockInterceptor(Object proxied) {
this.proxied = proxied;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

CacheLock cacheLock = method.getAnnotation(CacheLock.class);
//没有cacheLock注解,pass
if(null == cacheLock){
System.out.println(“no cacheLock annotation”);
return method.invoke(proxied, args);
}
//获得方法中参数的注解
Annotation[][] annotations = method.getParameterAnnotations();
//根据获取到的参数注解和参数列表获得加锁的参数
Object lockedObject = getLockedObject(annotations,args);
String objectValue = lockedObject.toString();
//新建一个锁
RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
//加锁
boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
if(!result){//取锁失败
ERROR_COUNT += 1;
throw new CacheLockException(“get lock fail”);

}
try{
//加锁成功,执行方法
return method.invoke(proxied, args);
}finally{
lock.unlock();//释放锁
}

}
/**
*
* @param annotations
* @param args
* @return
* @throws CacheLockException
*/
private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
if(null == args || args.length == 0){
throw new CacheLockException(“方法参数为空,没有被锁定的对象”);
}

if(null == annotations || annotations.length == 0){
throw new CacheLockException(“没有被注解的参数”);
}
//不支持多个参数加锁,只支持第一个注解为lockedObject或者lockedComplexObject的参数
int index = -1;//标记参数的位置指针
for(int i = 0;i < annotations.length;i++){
for(int j = 0;j < annotations[i].length;j++){
if(annotations[i][j] instanceof LockedComplexObject){//注解为LockedComplexObject
index = i;
try {
return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
} catch (NoSuchFieldException | SecurityException e) {
throw new CacheLockException(“注解对象中没有该属性” + ((LockedComplexObject)annotations[i][j]).field());
}
}

if(annotations[i][j] instanceof LockedObject){
index = i;
break;
}
}
//找到第一个后直接break,不支持多参数加锁
if(index != -1){
break;
}
}

if(index == -1){
throw new CacheLockException(“请指定被锁定参数”);
}

return args[index];
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
最关键的RedisLock类中的lock方法和unlock方法:

/**
* 加锁
* 使用方式为:
* lock();
* try{
* executeMethod();
* }finally{
* unlock();
* }
* @param timeout timeout的时间范围内轮询锁
* @param expire 设置锁超时时间
* @return 成功 or 失败
*/
public boolean lock(long timeout,int expire){
long nanoTime = System.nanoTime();
timeout *= MILLI_NANO_TIME;
try {
//在timeout的时间范围内不断轮询锁
while (System.nanoTime() – nanoTime < timeout) {
//锁不存在的话,设置锁并设置锁过期时间,即加锁
if (this.redisClient.setnx(this.key, LOCKED) == 1) {
this.redisClient.expire(key, expire);//设置锁过期时间是为了在没有释放
//锁的情况下锁过期后消失,不会造成永久阻塞
this.lock = true;
return this.lock;
}
System.out.println(“出现锁等待”);
//短暂休眠,避免可能的活锁
Thread.sleep(3, RANDOM.nextInt(30));
}
} catch (Exception e) {
throw new RuntimeException(“locking error”,e);
}
return false;
}

public void unlock() {
try {
if(this.lock){
redisClient.delKey(key);//直接删除
}
} catch (Throwable e) {

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
上述的代码是框架性的代码,现在来讲解如何使用上面的简单框架来写一个秒杀函数。
先定义一个接口,接口里定义了一个秒杀方法:

public interface SeckillInterface {
/**
*现在暂时只支持在接口方法上注解
*/
//cacheLock注解可能产生并发的方法
@CacheLock(lockedPrefix=”TEST_PREFIX”)
public void secKill(String userID,@LockedObject Long commidityID);//最简单的秒杀方法,参数是用户ID和商品ID。可能有多个线程争抢一个商品,所以商品ID加上LockedObject注解
}
1
2
3
4
5
6
7
8
上述SeckillInterface接口的实现类,即秒杀的具体实现:

public class SecKillImpl implements SeckillInterface{
static Map<Long, Long> inventory ;
static{
inventory = new HashMap<>();
inventory.put(10000001L, 10000l);
inventory.put(10000002L, 10000l);
}

@Override
public void secKill(String arg1, Long arg2) {
//最简单的秒杀,这里仅作为demo示例
reduceInventory(arg2);
}
//模拟秒杀操作,姑且认为一个秒杀就是将库存减一,实际情景要复杂的多
public Long reduceInventory(Long commodityId){
inventory.put(commodityId,inventory.get(commodityId) – 1);
return inventory.get(commodityId);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
模拟秒杀场景,1000个线程来争抢两个商品:

@Test
public void testSecKill(){
int threadCount = 1000;
int splitPoint = 500;
CountDownLatch endCount = new CountDownLatch(threadCount);
CountDownLatch beginCount = new CountDownLatch(1);
SecKillImpl testClass = new SecKillImpl();

Thread[] threads = new Thread[threadCount];
//起500个线程,秒杀第一个商品
for(int i= 0;i < splitPoint;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一个信号量上,挂起
beginCount.await();
//用动态代理的方式调用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill(“test”, commidityId1);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();

}
//再起500个线程,秒杀第二件商品
for(int i= splitPoint;i < threadCount;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一个信号量上,挂起
beginCount.await();
//用动态代理的方式调用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill(“test”, commidityId2);
//testClass.testFunc(“test”, 10000001L);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();

}

long startTime = System.currentTimeMillis();
//主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性
beginCount.countDown();

try {
//主线程等待结束信号量
endCount.await();
//观察秒杀结果是否正确
System.out.println(SecKillImpl.inventory.get(commidityId1));
System.out.println(SecKillImpl.inventory.get(commidityId2));
System.out.println(“error count” + CacheLockInterceptor.ERROR_COUNT);
System.out.println(“total cost ” + (System.currentTimeMillis() – startTime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
在正确的预想下,应该每个商品的库存都减少了500,在多次试验后,实际情况符合预想。如果不采用锁机制,会出现库存减少499,498的情况。
这里采用了动态代理的方法,利用注解和反射机制得到分布式锁ID,进行加锁和释放锁操作。当然也可以直接在方法进行这些操作,采用动态代理也是为了能够将锁操作代码集中在代理中,便于维护。
通常秒杀场景发生在web项目中,可以考虑利用spring的AOP特性将锁操作代码置于切面中,当然AOP本质上也是动态代理。

小结
这篇文章从业务场景出发,从抽象到实现阐述了如何利用redis实现分布式锁,完成简单的秒杀功能,也记录了笔者思考的过程,希望能给阅读到本篇文章的人一些启发。

源码仓库:https://github.com/lsfire/redisframework
————————————————
版权声明:本文为CSDN博主「lsfire」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u010359884/article/details/50310387

RabbitMQ系列(二)--基础组件 - Diamond-Shine - 博客园

mikel阅读(654)

来源: RabbitMQ系列(二)–基础组件 – Diamond-Shine – 博客园

声明:对于RabbitMQ的学习基于某课网相关视频和《RabbitMQ实战指南》一书,后续关于RabbitMQ的博客都是基于二者

一、什么是RabbitMQ

RabbitMQ是开源代理和队列服务器,通过普通协议在不同的应用之间共享数据,使用Erlang编写(Erlang进行数据交换的性能很好,

和原生socket一样好的延迟响应效果),基于AMQP协议

二、AMQP

AMQP高级消息队列协议:具有现代特征的二进制协议,和JMS有点像,模型如下:

AMQP核心概念

1、Server:Broker,接受client连接,实现AMQP实体服务

2、Connection:应用程序和Broker的网络连接

3、Channel:网络信道,读写都是在Channel中进行(NIO的概念),包括对MQ进行的一些操作(例如clear queue等)都是在Channel中进行,

客户端可建立多个Channel,每个Channel代表一个会话任务

4、Message:由properties(有消息优先级、延迟等特性)和Body(消息内容)组成

5、Virtual host:用于消息隔离(类似Redis 16个db这种概念),最上层的消息路由,一个包含若干Exchange和Queue,同一个里面Exchange

和Queue的名称不能存在相同的。

6、Exchange:Routing and Filter

7、Binding:把Exchange和Queue进行Binding

8、Routing key:路由规则

9、Queue:物理上存储消息

三、哪些大厂在使用RabbitMQ,为什么?

滴滴、美团、头条、去哪儿。。。。都再使用RabbitMQ

原因:

1、开源、性能优秀、能保证稳定性,提供可靠性消息投递模式confirm、返回模式return,和springAMQP完美整合、API丰富

2、集群模式丰富,表达式配置,HA模式,镜像队列模式

3、保证数据不丢失的前提下做到高可靠性、高可用性

四、RabbitMQ基础组件

1、Exchange:

如果不指定Exchange的话,RabbitMQ默认使用,(AMQP default)注意一下,需要将routing key等于queue name相同

2、name、type:

fanout(效率最好,不需要routing key,routing key如何设置都可以)、direct、topic(#一个或多个,*一个)、headers

3、Auto Delete:

当最后一个Binding到Exchange的Queue删除之后,自动删除该Exchange

4、Binding:

Exchange和Queue之间的连接关系,Exchange之间也可以Binding

5、Queue:

实际物理上存储消息的

6、Durability:

是否持久化,Durable:是,即使服务器重启,这个队列也不会消失,Transient:否

7、Exclusive:

这个queue只能由一个exchange监听restricted to this connection,使用场景:顺序消费

8、Message:

由properties(有消息优先级、延迟等特性)和Body(Payload消息内容)组成,还有content_type、content_encoding、priority

correlation_id、reply_to、expiration、message_id等属性

五、安装

本人很早之前在centos安装过了,具体步骤都忘了,大家可以百度一下,有一大堆呢,比较推荐Docker安装,很方便

需要注意:注意rabbitMQ和erlang版本的对应,而且服务器的host name不要瞎改,当初RabbitMQ一直启动报错,就是这个原因,搞了两天,真的

蛋疼,先要确定erlang安装成功,然后按照RabbitMQ

浏览器可视化工具:rabbitmq-plugins enable rabbitmq_management  用户名、密码:guest guest