关于消息队列的使用 - Ruthless - 博客园

mikel阅读(398)

关于消息队列的使用

来源: 关于消息队列的使用 – Ruthless – 博客园

一、消息队列概述
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

二、消息队列应用场景
以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景。

2.1异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

2.2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:

传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.3流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用

用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
秒杀业务根据消息队列中的请求信息,再做后续处理

2.4日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下

日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责日志数据的接收,存储和转发
日志处理应用:订阅并消费kafka队列中的日志数据

2.5消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
点对点通讯:

客户端A和客户端B使用同一队列,进行消息通讯。

聊天室通讯:

客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

三、消息中间件示例
3.1电商系统

消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。
(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
(2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

3.2日志收集系统

分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。
Zookeeper注册中心,提出负载均衡和地址查找服务
日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列
Kafka集群:接收,路由,存储,转发等消息处理
Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

四、JMS消息服务
讲消息队列就不得不提JMS 。JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

4.1消息模型
在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

4.1.1 P2P模式

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特点
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。

4.1.2 Pub/Sub模式

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点
每个消息可以有多个消费者
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
为了消费消息,订阅者必须保持运行的状态
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

4.2消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
(1)同步
订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;

(2)异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

JNDI:Java命名和目录接口,是一种标准的Java命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。
JNDI在JMS中起到查找和访问发送目标或消息来源的作用。

五、常用消息队列

一般商用的容器,比如WebLogic,JBoss,都支持JMS标准,开发上很方便。但免费的比如Tomcat,Jetty等则需要使用第三方的消息中间件。本部分内容介绍常用的消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他们的特点。

5.1 ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

ActiveMQ特性如下:
⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通过JDBC和journal提供高速的消息持久化
⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点
⒏ 支持Ajax
⒐ 支持与Axis的整合
⒑ 可以很容易得调用内嵌JMS provider,进行测试

5.2 Kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息
支持通过Kafka服务器和消费机集群来分区消息
支持Hadoop并行数据加载
Kafka相关概念
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker[5]
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。

.Net Core里面appsettings.json的使用_65号腕的博客-CSDN博客

mikel阅读(396)

来源: .Net Core里面appsettings.json的使用_65号腕的博客-CSDN博客

从.NET Core开始,项目的配置文件就换成了appsettings.json,以下整理了她的常见用法。

1. IConfiguration
IConfiguration是用来读取配置的接口,她在Microsoft.Extensions.Configuration.Abstractions包里面,命名空间为Microsoft.Extensions.Configuration。通常需要在使用的地方将她作为依赖注入,对于ASP.NET Core来说,已经默认实现了基于Json的配置,即这次要用到的appsettings.json。

2. 读取连接字符串
appsettings.json这样定义:

{
“ConnectionStrings”: {
“DbConn”: “Server=.;Database=Db;Trusted_Connection=True;”
}
}
1
2
3
4
5
调用IConfiguration的GetConnectionString方法即可

public class Startup
{
public Startup(IConfiguration configuration)
{
string connString = configuration.GetConnectionString(“DbConn”);

}
}
1
2
3
4
5
6
7
8
3. 读取单值
对于如下配置

{
“App”: {
“Name”: “Bigname65”,
“Version”: “v1.0”
}
}
1
2
3
4
5
6
如果想读取Name的值,可以有以下四种方式,各级之间用冒号连接:

string appName = configuration[“App:Name”];
appName = configuration.GetValue<string>(“App:Name”);
appName = configuration.GetSection(“App:Name”).Value;
appName = configuration.GetSection(“App”)[“Name”];
1
2
3
4
4. 读取对象
还是上面的配置文件,如果有以下C#类,应该如何直接从配置文件,读取一个实例呢?

public class AppInfo
{
public string Name { get; set; }
public string Version { get; set; }
}
1
2
3
4
5
IConfiguration有一个Bind的方法,她在Microsoft.Extensions.Configuration.Binder包里面,命名空间也是Microsoft.Extensions.Configuration,ASP.NET Core项目已经默认包含了,有以下两种写法:

AppInfo app = new AppInfo();
configuration.GetSection(“App”).Bind(app);

app = new AppInfo();
configuration.Bind(“App”, app);
1
2
3
4
5
5. IOptions
对于以上的AppInfo类以及配置,有可能需要在项目的多个地方使用到,虽然使用IConfiguration也可以,但是会产生很多重复代码。这时我们可以使用IOptions接口,她在Microsoft.Extensions.Options包里面,而且ASP.NET Core项目也已经默认包含了.

首先在Startup的ConfigureServices里面添加如下代码,这样就直接把AppInfo和配置文件关联起来了:

public void ConfigureServices(IServiceCollection services)
{
services.Configure<AppInfo>(this.Configuration.GetSection(“App”));
}
1
2
3
4
需要的时候直接使用IOptions<AppInfo>即可:

public class HomeController : Controller
{
AppInfo _app;
public HomeController(IOptions<AppInfo> appOpt)
{
_app = appOpt.Value;
}

public IActionResult Index()
{
return Content($”{_app.Name}:{_app.Version}”);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
6. 多配置文件
ASP.NET Core项目默认会包含两个文件appsettings.json和appsettings.Development.json。看名字,后者肯定是用在开发环境的。

对于同一个配置字段,如果同时存在于appsettings.json和appsettings.Development.json两个文件当中,系统会优先取后者里面的。

正因为如此,我们还可以添加更多的配置文件,将特定环境的值(如连接字符串)放在特定的配置文件里面,比如appsettings.QA.json用于QA环境,appsettings.UAT.json用于UAT环境等。

那么不同的环境如何跟配置文件关联起来的呢?

对于开发环境,Visual Studio的项目属性中可以看到环境变量的设置。

部署以后则是在Web.config里面。

通过该环境变量则可指定配套的json文件。

6.1 发布时指定配置文件
如下图,在Visual Studio中可以看到有多个配置文件,但如果我们希望发布之后只包含需要的文件应该怎么做呢?

首先,双击项目,或者用记事本打开项目文件夹下面的.csproj文件, 添加以下代码,目的是根据不同的环境变量移除其他的配置文件。

<Project Sdk=”Microsoft.NET.Sdk.Web”>
….
<ItemGroup Condition=” ‘$(EnvironmentName)’ == ‘Development'”>
<Content Remove=”appsettings.QA.json” />
<Content Remove=”appsettings.UAT.json” />
</ItemGroup>
<ItemGroup Condition=” ‘$(EnvironmentName)’ == ‘UAT'”>
<Content Remove=”appsettings.QA.json” />
<Content Remove=”appsettings.Development.json” />
</ItemGroup>
<ItemGroup Condition=” ‘$(EnvironmentName)’ == ‘QA'”>
<Content Remove=”appsettings.UAT.json” />
<Content Remove=”appsettings.Development.json” />
</ItemGroup>
</Project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
接着就是发布,发布之前会生成Profile文件,如下图,我们生成了两个Profile,分别对应QA环境和UAT环境:

 

打开其中的QA.pubxml文件,添加如下代码:

<Project>
<PropertyGroup>
….
<EnvironmentName>QA</EnvironmentName>
</PropertyGroup>
</Project>
1
2
3
4
5
6
同理UAT.pubxml则添加环境变量为UAT。

最后发布,在发布目录的web.config会自动包含对应的环境变量,同时剔除了不需要的配置文件。

7. console/winform
那么如何在控制台程序或者Winform项目中使用appsettings.json配置文件呢?

首先需要添加包:Microsoft.Extensions.Configuration.Json

然后使用以下代码即可得到IConfiguration了

IConfiguration config = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile(“appsettings.json”)
.Build();
1
2
3
4
最后附上源代码:

https://gitee.com/bigname65/csharp-practise/tree/master/UseAppsettings
————————————————
版权声明:本文为CSDN博主「65号腕」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_46295080/article/details/115219255

干掉RedisHelper,请这样用分布式缓存 - 掘金

mikel阅读(501)

来源: 干掉RedisHelper,请这样用分布式缓存 – 掘金

前言

我们在项目中使用Redis时通常是写一个单例模式的RedisHelper静态类,暴露一些常用的GetSet等操作,在需要使用地方直接RedisHelper.StringGet(xx,xx)就可以了,这样虽然简单粗暴地满足我们对Redis的所有操作需要,但是这在ASP.NET Core的项目显得不是那么优雅了。首先你的RedisHelper静态类无法使用ASP.NET Core容器,又如何优雅的通过依赖注入获取IConfiguration中的配置项呢?既然我们使用ASP.NET Core这么优秀的框架,最佳实践当然就是遵循官方建议的开发规范优雅的编写代码。

IDistributedCache

若要使用 SQL Server 分布式缓存,请添加对 Microsoft.Extensions.Caching.SQLServer 包的包引用。

若要使用 Redis 分布式缓存,请添加对 Microsoft.Extensions.Caching.StackExchangeRedis 包的包引用。

若要使用 NCache 分布式缓存,请添加对 NCache.Microsoft.Extensions.Caching.OpenSource 包的包引用。

无论选择哪种实现,应用都将使用 IDistributedCache 接口与缓存进行交互。

来看下IDistributedCache这个接口的定义

namespace Microsoft.Extensions.Caching.Distributed;

/// <summary>
/// Represents a distributed cache of serialized values.
/// </summary>
public interface IDistributedCache
{
    /// <summary>
    /// Gets a value with the given key.
    /// </summary>
    byte[]? Get(string key);

    /// <summary>
    /// Gets a value with the given key.
    /// </summary>
    Task<byte[]?> GetAsync(string key, CancellationToken token = default(CancellationToken));

    void Set(string key, byte[] value, DistributedCacheEntryOptions options);

    /// <summary>
    /// Sets the value with the given key.
    /// </summary>
    Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token = default(CancellationToken));

    /// <summary>
    /// Refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any).
    /// </summary>
    void Refresh(string key);

    /// <summary>
    /// Refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any).
    /// </summary>
    Task RefreshAsync(string key, CancellationToken token = default(CancellationToken));

    /// <summary>
    /// Removes the value with the given key.
    /// </summary>
    void Remove(string key);

    /// <summary>
    /// Removes the value with the given key.
    /// </summary>
    Task RemoveAsync(string key, CancellationToken token = default(CancellationToken));
}
复制代码

IDistributedCache 接口提供以下方法来处理分布式缓存实现中的项:

  • GetGetAsync:如果在缓存中找到,则接受字符串键并以 byte[] 数组的形式检索缓存项。
  • SetSetAsync:使用字符串键将项(作为 byte[] 数组)添加到缓存。
  • RefreshRefreshAsync:根据键刷新缓存中的项,重置其可调到期超时(如果有)。
  • RemoveRemoveAsync:根据字符串键删除缓存项。

干掉RedisHelper

官方不仅提出了如何最佳实践分布式缓存的使用,还提供了基本的实现库给我们直接用,比如我们在项目中用Redis为我们提供缓存服务:

  1. 添加引用Microsoft.Extensions.Caching.StackExchangeRedis
  2. 注册容器AddStackExchangeRedisCache,并配置参数
 builder.Services.AddStackExchangeRedisCache(options =>
     {
         options.Configuration = builder.Configuration.GetConnectionString("MyRedisConStr");
         options.InstanceName = "SampleInstance";
     });
复制代码
  1. 在需要使用Redis的地方通过构造函数注入IDistributedCache实例调用即可

这样就可以优雅的使用Redis了,更加符合Asp.Net Core的设计风格,养成通过容器注入的方式来调用我们的各种服务,而不是全局使用RedisHelper静态类,通过IOC的方式,结合面向接口开发,能方便的替换我们的实现类,统一由容器提供对象的创建,这种控制反转带来的好处只可意会不可言传,这里就不赘述了。

AddStackExchangeRedisCache到底干了什么

上面已经知道如何优雅的使用我们的Redis了,但是不看下源码就不知道底层实现,总是心里不踏实的。

源码比较好理解的,因为这个Nuget包的源码也就四个类,而上面注册容器的逻辑也比较简单
AddStackExchangeRedisCache主要干的活

// 1.启用Options以使用IOptions
services.AddOptions();
// 2.注入配置自定义配置,可以通过IOptions<T>注入到需要使用该配置的地方
services.Configure(setupAction);
// 3.注入一个单例IDistributedCache的实现类RedisCache
services.Add(ServiceDescriptor.Singleton<IDistributedCache, RedisCache>());
复制代码

所以我们在需要用Redis的地方通过构造函数注入IDistributedCache,而它对应的实现就是RedisCache,那看下它的源码。

这里就不细看所有的实现了,重点只需要知道它继承了IDistributedCache就行了,通过AddStackExchangeRedisCache传入的ConnectionString,实现IDistributedCacheGetSetRefreshRemove四个核心的方法,我相信这难不倒你,而它也就是干了这么多事情,只不过它的实现有点巧妙。 通过LUA脚本和HSET数据结构实现,HashKey是我们传入的InstanceName+key,做了一层包装。

源码中还有需要注意的就是,我们要保证Redis连接对象IConnectionMultiplexer的单例,不能重复创建多个实例,这个想必在RedisHelper中也是要保证的,而且是通过lock来实现的。

然而微软不是那么用的,玩了个花样,注意下面的_connectionLock.Wait();

private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

[MemberNotNull(nameof(_cache), nameof(_connection))]
private void Connect()
{
    CheckDisposed();
    if (_cache != null)
    {
        Debug.Assert(_connection != null);
        return;
    }

    _connectionLock.Wait();
    try
    {
        if (_cache == null)
        {
            if (_options.ConnectionMultiplexerFactory == null)
            {
                if (_options.ConfigurationOptions is not null)
                {
                    _connection = ConnectionMultiplexer.Connect(_options.ConfigurationOptions);
                }
                else
                {
                    _connection = ConnectionMultiplexer.Connect(_options.Configuration);
                }
            }
            else
            {
                _connection = _options.ConnectionMultiplexerFactory().GetAwaiter().GetResult();
            }

            PrepareConnection();
            _cache = _connection.GetDatabase();
        }
    }
    finally
    {
        _connectionLock.Release();
    }

    Debug.Assert(_connection != null);
}
复制代码

通过SemaphoreSlim限制同一时间只能有一个线程能访问_connectionLock.Wait();后面的代码。

学到装逼技巧+1

思考

IDistributedCache只有四个操作:GetSetRefreshRemove,我们表示很希望跟着官方走,但这个接口过于简单,不能满足我的其他需求咋办?
比如我们需要调用 StackExchange.Redis封装的LockTake,LockRelease来实现分布式锁的功能,那该怎么通过注入IDistributedCache调用? 我们可以理解官方上面是给我们做了示范,我们完全可以自己定义一个接口,比如:

public interface IDistributedCachePlus : IDistributedCache
{
    bool LockRelease(string key, byte[] value);

    bool LockTake(string key, byte[] value, TimeSpan expiry);
}
复制代码

继承IDistributedCache,对其接口进行增强,然后自己实现实现AddStackExchangeRedisCache的逻辑,我们不用官方给的实现,但是我们山寨官方的思路,实现任意标准的接口,满足我们业务。

services.Add(ServiceDescriptor.Singleton<IDistributedCachePlus, RedisCachePlus>());
复制代码

在需要使用缓存的地方通过构造函数注入IDistributedCachePlus

总结

官方提供的IDistributedCache标准及其实现类库,能方便的实现我们对缓存的简单的需求,通过遵循官方的建议,我们干掉了RedisHelper,优雅的实现了分布式Redis缓存的使用,你觉得这样做是不是很优雅呢?

作者:SpringHgui
链接:https://juejin.cn/post/7099476430001537060
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

Redis的五种数据类型的简单介绍和使用 - DingYu - 博客园

mikel阅读(374)

来源: Redis的五种数据类型的简单介绍和使用 – DingYu – 博客园

1.准备工作:

1.1在Linux下安装Redis 

https://www.cnblogs.com/dddyyy/p/9763098.html

  1.2启动Redis

先把root/redis的redis.conf放到 /usr/local/redis/bin/目录下

使用vi 修改redis.conf 把daemonize no 变成daemonize yes

启动 ./redis-server redis.conf

测试一下 ps -ef|grep redis

   1.3连接客户端

2.第一种类型:String Key-Value

set key value 设置一个key 值为 value 

get key 获得key值得value 

注意:redis中的Key和Value时区分大小写的,命令不区分大小写, redis是单线程 不适合存储大容量的数据

incr key      —对应的value 自增1,如果没有这个key值 自动给你创建创建 并赋值为1

decr key     —对应的value 自减1

注意:自增的value是可以转成数字的

3.第二种类型:Hash:key-filed-value

相当于1个key 对应一个map

hset key filed value 设置值

hget key filed  获取值

 

4.第三种类型:List

List 有顺序可重复

lpush list 1  2  3  4 从左添加元素

rpush list 1 2 3 4    从右添加元素

lrange list 0 -1 (从0 到-1 元素查看:也就表示查看所有)

lpop list (从左边取,删除)

rpop list  (从右边取,删除)

5.第四种类型 :Set

Set 无顺序,不能重复

sadd set1 a b c d d (向set1中添加元素) 元素不重复

smembers set1 (查询元素)

srem set1 a (删除元素)

6.第五种类型:SortedSet(zset)

有顺序,不能重复

适合做排行榜 排序需要一个分数属性

zadd zset1 9 a 8 c 10 d 1 e   (添加元素 zadd key score member )

(ZRANGE key start stop [WITHSCORES])(查看所有元素:zrange key  0  -1  withscores)

如果要查看分数,加上withscores.

zrange zset1 0 -1 (从小到大)

zrevrange zset1 0 -1 (从大到小)

zincrby zset2 score member (对元素member 增加 score)

复制代码
127.0.0.1:6379> zadd zset1 8 a 4 b 5 c 1 d
(integer) 4
127.0.0.1:6379> zrange zset1 0 -1 
1) "d"
2) "b"
3) "c"
4) "a"
127.0.0.1:6379> zadd zset1 9 a
(integer) 0
127.0.0.1:6379> zrange zset1 0 -1 
1) "d"
2) "b"
3) "c"
4) "a"
127.0.0.1:6379> zrange zset1 0 -1 withscores
1) "d"
2) "1"
3) "b"
4) "4"
5) "c"
6) "5"
7) "a"
8) "9"
127.0.0.1:6379> zrevrange zset1 0 -1
1) "a"
2) "c"
3) "b"
4) "d"
127.0.0.1:6379> zincrby zset1 1 a
"10"
127.0.0.1:6379> zrevrange zset1 0 -1 withscores
1) "a"
2) "10"
3) "c"
4) "5"
5) "b"
6) "4"
7) "d"
8) "1"
复制代码

7.Key命令

expire key second  (设置key的过期时间)

ttl key (查看剩余时间)(-2 表示不存在,-1 表示已被持久化,正数表示剩余的时间)

persist key (清除过期时间,也即是持久化 持久化成功体提示 1 不成功0)。

del key: 删除key

select 0 表示:选择0号数据库。默认是0号数据库

c#使用csredis操作redis - 小y - 博客园

mikel阅读(579)

来源: c#使用csredis操作redis – 小y – 博客园

现在流行的redis连接客户端有StackExchange.Redis和ServiceStack.Redis,为什么选择csredis而不是这两个?

  • .net 最有名望的 ServiceStack.Redis 早已沦为商业用途,在 .NETCore 中使用只能充值;
  • 后来居上的 StackExchange.Redis 虽然能用,但线上各种 Timeout 错误把人坑到没脾气,两年多两年多两年多都不解决,最近发布的 2.0 版本不知道是否彻底解决了底层。
  • csredis支持.net40/.net45/.netstandard2.0,基本上满足了常见运行平台,而上面两个基本已经放弃.net40了。
  • csredis所有方法名与redis-cli保持一持,很容易上手!!!

环境:

csredis 源码地址: https://github.com/2881099/csredis

 

以windows服务安装Redis方法:

下载Redis服务安装包:https://github.com/tporadowski/redis/releases

下载完成后直接点击.exe下一步下一步OK。安装完后我们会在windows服务中找到Redis Service服务。注意启动服务后在进行相关测试。

1.在.net项目中引入CSRedisCore

包安装命令:

Install-Package CSRedisCore -Version 3.6.5

 

2.使用:

复制代码
//初始化RedisHelper对象
var csredis = new CSRedis.CSRedisClient("127.0.0.1:6379, defaultDatabase = 0, poolsize = 500, ssl = false, writeBuffer = 10240");
RedisHelper.Initialization(csredis);


//-------------字符串(string)----------------
// 添加字符串键-值对
csredis.Set("hello", "1");
csredis.Set("world", "2");
csredis.Set("hello", "3");

// 根据键获取对应的值
csredis.Get("hello");

// 移除元素
csredis.Del("world");

/*    数值操作    */
csredis.Set("num-key", "24");

// value += 5
csredis.IncrBy("num-key",5); 
// output -> 29

// value -= 10
csredis.IncrBy("num-key", -10); 
// output -> 19

/*    字节串操作    */
csredis.Set("string-key", "hello ");

// 在指定key的value末尾追加字符串
csredis.Append("string-key", "world"); 
// output -> "hello world"

// 获取从指定范围所有字符构成的子串(start:3,end:7)
csredis.GetRange("string-key",3,7)  
// output ->  "lo wo"
    
// 用新字符串从指定位置覆写原value(index:4)
csredis.SetRange("string-key", 4, "aa"); 
// output -> "hellaaword"





//-----------------列表(list)----------------
// 从右端推入元素
csredis.RPush("my-list", "item1", "item2", "item3", "item4"); 
// 从右端弹出元素
csredis.RPop("my-list");
// 从左端推入元素
csredis.LPush("my-list","LeftPushItem");
// 从左端弹出元素
csredis.LPop("my-list");

// 遍历链表元素(start:0,end:-1即可返回所有元素)
foreach (var item in csredis.LRange("my-list", 0, -1))
{
    Console.WriteLine(item);
}
// 按索引值获取元素(当索引值大于链表长度,返回空值,不会报错)
Console.WriteLine($"{csredis.LIndex("my-list", 1)}"); 

// 修剪指定范围内的元素(start:4,end:10)
csredis.LTrim("my-list", 4, 10);

// 将my-list最后一个元素弹出并压入another-list的头部
csredis.RPopLPush("my-list", "another-list");





//------------------集合(set)----------------
// 实际上只插入了两个元素("item1","item2")
csredis.SAdd("my-set", "item1", "item1", "item2"); 

// 集合的遍历
foreach (var member in csredis.SMembers("my-set"))
{
    Console.WriteLine($"集合成员:{member.ToString()}");
}

// 判断元素是否存在
string member = "item1";
Console.WriteLine($"{member}是否存在:{csredis.SIsMember("my-set", member)}"); 
// output -> True

// 移除元素
csredis.SRem("my-set", member);
Console.WriteLine($"{member}是否存在:{csredis.SIsMember("my-set", member)}"); 
// output ->  False

// 随机移除一个元素
csredis.SPop("my-set");

csredis.SAdd("set-a", "item1", "item2", "item3","item4","item5");
csredis.SAdd("set-b", "item2", "item5", "item6", "item7");

// 差集
csredis.SDiff("set-a", "set-b"); 
// output -> "item1", "item3","item4"

// 交集
csredis.SInter("set-a", "set-b"); 
// output -> "item2","item5"

// 并集
csredis.SUnion("set-a", "set-b");
// output -> "item1","item2","item3","item4","item5","item6","item7"







//------------------散列(hashmap)----------------
// 向散列添加元素
csredis.HSet("ArticleID:10001", "Title", "了解简单的Redis数据结构");
csredis.HSet("ArticleID:10001", "Author", "xscape");
csredis.HSet("ArticleID:10001", "PublishTime", "2019-01-01");

// 根据Key获取散列中的元素
csredis.HGet("ArticleID:10001", "Title");

// 获取散列中的所有元素
foreach (var item in csredis.HGetAll("ArticleID:10001"))
{
    Console.WriteLine(item.Value);
}

//HMGet和HMSet是他们的多参数版本,一次可以处理多个键值对
var keys = new string[] { "Title","Author","publishTime"};
csredis.HMGet("ID:10001", keys);

//和处理字符串一样,我们也可以对散列中的值进行自增、自减操作,原理同字符串是一样的
csredis.HSet("ArticleID:10001", "votes", "257");
csredis.HIncrBy("ID:10001", "votes", 40);
// output -> 297




//------------------有序集合----------------
// 向有序集合添加元素
csredis.ZAdd("Quiz", (79, "Math"));
csredis.ZAdd("Quiz", (98, "English"));
csredis.ZAdd("Quiz", (87, "Algorithm"));
csredis.ZAdd("Quiz", (84, "Database"));
csredis.ZAdd("Quiz", (59, "Operation System"));

//返回集合中的元素数量
csredis.ZCard("Quiz");

// 获取集合中指定范围(90~100)的元素集合
csredis.ZRangeByScore("Quiz",90,100);

// 获取集合所有元素并升序排序
csredis.ZRangeWithScores("Quiz", 0, -1);

// 移除集合中的元素
csredis.ZRem("Quiz", "Math");

//Key的过期
redis.Set("MyKey", "hello,world");
Console.WriteLine(redis.Get("MyKey")); 
// output -> "hello,world"

redis.Expire("MyKey", 5); // key在5秒后过期,也可以使用ExpireAt方法让它在指定时间自动过期

Thread.Sleep(6000); // 线程暂停6秒
Console.WriteLine(redis.Get("MyKey"));
// output -> ""
复制代码

https://www.cnblogs.com/xscape/p/10208638.html

3.高级玩法:发布订阅

复制代码
//普通订阅
rds.Subscribe(
  ("chan1", msg => Console.WriteLine(msg.Body)),
  ("chan2", msg => Console.WriteLine(msg.Body)));

//模式订阅(通配符)
rds.PSubscribe(new[] { "test*", "*test001", "test*002" }, msg => {
  Console.WriteLine($"PSUB   {msg.MessageId}:{msg.Body}    {msg.Pattern}: chan:{msg.Channel}");
});
//模式订阅已经解决的难题:
//1、分区的节点匹配规则,导致通配符最大可能匹配全部节点,所以全部节点都要订阅
//2、本组 "test*", "*test001", "test*002" 订阅全部节点时,需要解决同一条消息不可执行多次

//发布
rds.Publish("chan1", "123123123");
//无论是分区或普通模式,rds.Publish 都可以正常通信
复制代码

4.高级玩法:缓存壳

复制代码
//不加缓存的时候,要从数据库查询
var t1 = Test.Select.WhereId(1).ToOne();

//一般的缓存代码,如不封装还挺繁琐的
var cacheValue = rds.Get("test1");
if (!string.IsNullOrEmpty(cacheValue)) {
    try {
        return JsonConvert.DeserializeObject(cacheValue);
    } catch {
        //出错时删除key
        rds.Remove("test1");
        throw;
    }
}
var t1 = Test.Select.WhereId(1).ToOne();
rds.Set("test1", JsonConvert.SerializeObject(t1), 10); //缓存10秒

//使用缓存壳效果同上,以下示例使用 string 和 hash 缓存数据
var t1 = rds.CacheShell("test1", 10, () => Test.Select.WhereId(1).ToOne());
var t2 = rds.CacheShell("test", "1", 10, () => Test.Select.WhereId(1).ToOne());
var t3 = rds.CacheShell("test", new [] { "1", "2" }, 10, notCacheFields => new [] {
  ("1", Test.Select.WhereId(1).ToOne()),
  ("2", Test.Select.WhereId(2).ToOne())
});
复制代码

5.高级玩法:管道

使用管道模式,打包多条命令一起执行,从而提高性能。

var ret1 = rds.StartPipe().Set("a", "1").Get("a").EndPipe();
var ret2 = rds.StartPipe(p => p.Set("a", "1").Get("a"));

var ret3 = rds.StartPipe().Get("b").Get("a").Get("a").EndPipe();
//与 rds.MGet("b", "a", "a") 性能相比,经测试差之毫厘

6.高级玩法:多数据库

var connectionString = "127.0.0.1:6379,password=123,poolsize=10,ssl=false,writeBuffer=10240,prefix=key前辍";
var redis = new CSRedisClient[14]; //定义成单例
for (var a = 0; a< redis.Length; a++) redis[a] = new CSRedisClient(connectionString + "; defualtDatabase=" + a);

//访问数据库1的数据
redis[1].Get("test1");

 

7.性能比拼

 

 

Redis的五种数据类型的简单介绍和使用 https://www.cnblogs.com/dddyyy/p/9803828.html

.net 5.0 - 使用CSRedisCore操作redis - gygtech - 博客园

mikel阅读(562)

来源: .net 5.0 – 使用CSRedisCore操作redis – gygtech – 博客园

 为什么选择CSRedisCore

ServiceStack.Redis 是商业版,免费版有限制;

StackExchange.Redis 是免费版,但是内核在 .NETCore 运行有问题经常 Timeout,暂无法解决;

CSRedis于2016年开始支持.NETCore一直迭代至今,实现了低门槛、高性能,和分区高级玩法的.NETCore redis-cli SDK;

在v3.0版本更新中,CSRedis中的所有方法名称进行了调整,使其和redis-cli保持一致,如果你熟悉redis-cli的命令的话,CSRedis可以直接上手,这样学习成本就降低很多。

 如何集成:引用和配置
  •  引用包
1
CSRedisCore
  •  appsettings.json
1
2
3
4
5
6
7
8
9
10
{
  //Redis服务配置
  "Redis": {
    "Default": {
      "Connection": "192.168.1.101:6379",
      "InstanceName": "local",
      "DefaultDB": 0
    }
  }
}
 如何集成:redis 控制台 引用方式
  •  初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
using CSRedis;
namespace RedisCommon
{
    public class RedisInit
    {
        public static void RedisInitialization()
        {
            string connection = Appsettings.Instance.GetByKey("Redis:Default:Connection");
            string defaultDB = Appsettings.Instance.GetByKey("Redis:Default:DefaultDB");
            var csRedis = new CSRedisClient($"{connection},defaultDatabase={defaultDB},prefix=test");
            RedisHelper.Initialization(csRedis);
        }
    }
}

 

1
2
3
4
5
static void Main(string[] args)
{
    //初始化Redis
    RedisInit.RedisInitialization();   
}
 如何集成:redis webapi 引用方式
  •  StartUp类配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/// <summary>
/// 初始化Redis缓存
/// </summary>
private void InitRedis()
{
    //redis缓存
    var section = Configuration.GetSection("Redis:Default");
    //连接字符串
    string _connectionString = section.GetSection("Connection").Value;
    //默认数据库
    int _defaultDB = int.Parse(section.GetSection("DefaultDB").Value ?? "0");
    var csredis = new CSRedis.CSRedisClient($"{_connectionString},defaultDatabase={_defaultDB},idleTimeout=3000,poolsize=5,prefix=GYG-API:KEY_");
    RedisHelper.Initialization(csredis);
}

 链接字符串详解

127.0.0.1:6379,password=YourPassword,defaultDatabase=0,prefix=hr_

Parameter   Default Explain  说明 
 password  <Empty>  Redis server password  Redis服务器密码
 defaultDatabase  0  Redis server database  Redis服务器数据库
 asyncPipeline  false  The asynchronous method automatically uses pipeline, and the 10W concurrent time is 450ms (welcome to feedback)  异步方式自动使用管道,10W并发时间450ms(欢迎反馈)
 poolsize  50  Connection pool size  连接池大小
 idleTimeout  20000  idle time of elements in the connection pool(MS),suitable for connecting to remote redis server  连接池中元素的空闲时间(MS),适合连接到远程redis服务器
 connectTimeout  5000  Connection timeout(MS)  连接超时(毫秒)
 syncTimeout  10000  Send / receive timeout(MS)  发送/接收超时(毫秒)
 preheat  5  Preheat connections, receive values such as preheat = 5 preheat 5 connections  预热连接,接收值,例如Preheat=5 Preheat 5 connections
 autoDispose  true  Follow system exit event to release automatically  跟随系统退出事件自动释放
 ssl  false  Enable encrypted transmission  启用加密传输
 testcluster  true  是否尝试集群模式,阿里云、腾讯云集群需要设置此选项为false
 tryit  0  Execution error, retry  attempts  执行错误,重试次数
 name  <Empty>  Connection name, use client list command to view  连接名称,使用client list命令查看
 prefix  <Empty>  key前缀,所有方法都会附带此前缀,csredis.Set(prefix + “key”, 111);
 0、通用指令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//查找所有分区节点中符合给定模式(pattern)的 key
string[] keyAll = RedisHelper.Keys("*");
            
//以秒为单位,返回给定 key 的剩余生存时间
long ttl1 = RedisHelper.Ttl("keyString1");
//用于在 key 存在时删除 key
long del1 = RedisHelper.Del("keyString1");
//检查给定 key 是否存在
bool isExists1 = RedisHelper.Exists("keyString1");
//为给定 key 设置过期时间
bool isExpire1 = RedisHelper.Expire("keyString1", 100);
//为给定 key 设置过期时间
RedisHelper.ExpireAt("keyString1", new DateTime(2021, 6, 11, 16, 0, 0));
 1、string(字符串)
  • 简单操作
1
2
3
4
5
6
7
8
9
10
11
// 设置指定 key 值,默认不过期
bool set_string1 = RedisHelper.Set("keyString_String1", "测试值1");
// 设置指定 key 值,并设置过期时间(单位:秒)
bool set_string2 = RedisHelper.Set("keyString_String2", "测试值2", 1);
// 获取指定 key 的值,不存在的 key,值返回null
string get_string1 = RedisHelper.Get("keyString_String1");
// 获取指定 key 的值,不存在的 key,或者指定的 key 不是int型,则返回int类型的默认值0
int get_int1 = RedisHelper.Get<int>("keyString_String1");
  • 对整数类型进行自增,自减操作
1
2
3
4
5
6
7
bool set_int1 = RedisHelper.Set("keyString_Num1", "23");
// 将 key 所储存的值加上指定的增量值(increment)
long incrBy1 = RedisHelper.IncrBy("keyString_Num1", 2);// #25
// 将 key 所储存的值加上指定的增量值(increment),负数就是减量值
long incrBy2 = RedisHelper.IncrBy("keyString_Num1", -1);// #24
  • 在指定 key 的 value 末尾追加字符串
1
2
3
4
bool set_append1 = RedisHelper.Set("keyString_Append1", "qaz", 30);
// 将指定的 value 追加到该 key 原来值(value)的末尾
long append1 = RedisHelper.Append("keyString_Append1", "wsx");// #6 结果:key 中字符串的长度
 2、hash(哈希)
  • #HSet、HGet、HDel方法 [只能处理一个键值对]
1
2
3
4
5
6
7
8
9
10
11
12
13
// 将哈希表 key 中的字段 field 的值设为 value
bool set_hash_user1_uname = RedisHelper.HSet("User:10001", "uname", "gongyg"); // 冒号的作用相当于创建一个文件夹
bool set_hash_user1_upwd = RedisHelper.HSet("User:10001", "upassword", "123456");
bool set_hash_user1_uid = RedisHelper.HSet("User:10001", "uid", "12");
// 获取存储在哈希表中指定字段的值
string uName = RedisHelper.HGet("User:10001", "uname");
// 获取存储在哈希表中指定字段的值,并指定类型
int uId = RedisHelper.HGet<int>("User:10001", "uid");
// 删除一个或多个哈希表字段,不能删除key
long hDel1 = RedisHelper.HDel("User:10001", "uname");
  • #HGetAll、HKeys、HVals
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 获取在哈希表中指定 key 的所有字段和值
Dictionary<string, string> user10001 = RedisHelper.HGetAll("User:10001");
foreach (var item in user10001)
{
    string key = item.Key;
    string value = item.Value;
}
// 获取所有哈希表中的字段 [虽然使用HGetAll可以取出所有的value,但是有时候散列包含的值可能非常大,容易造成服务器的堵塞,为了避免这种情况,我们可以使用HKeys取到散列的所有键(HVals可以取出所有值),然后再使用HGet方法一个一个地取出键对应的值。]
string[] fields = RedisHelper.HKeys("User:10001");
foreach (string item in fields)
{
    string val = RedisHelper.HGet("User:10001", item);
}
// 获取哈希表中所有的值
string[] vals = RedisHelper.HVals("User:10001");
  • #HMSet、HMGet [HGet和HSet方法执行一次只能处理一个键值对,而HMGet和HMSet是他们的多参数版本,一次可以处理多个键值对。]
1
2
3
4
5
6
7
8
//var keyValues = dic.Select(a => new [] { a.Key, a.Value.ToString() }).SelectMany(a => a).ToArray();
string[] user2 = new string[] { "uname", "gmd", "upwd", "123" };
// 同时将多个field-value(域-值)对设置到哈希表 key 中
bool set_hash_user2 = RedisHelper.HMSet("User:10002", user2);
string[] user_get2 = new string[] { "uname", "upwd", "sj" };
// 获取存储在哈希表中多个字段的值
string[] user_val2 = RedisHelper.HMGet("User:10002", user_get2); // #gmd,123,
  • #对散列中的值进行自增、自减操作
1
2
3
4
bool set_hash_user1_usex = RedisHelper.HSet("User:10003", "uage", "23");
// 为哈希表 key 中的指定字段和整数值加上增量(increment)自增(正数),自减(负数)
long hIncrBy = RedisHelper.HIncrBy("User:10001", "uage", 2);
 3、list(列表)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 将一个或多个值插入到列表头部
string[] lpush1 = new string[] { "003", "004" };
long len1 = RedisHelper.LPush("list", "000");
long len2 = RedisHelper.LPush("list", "001", "002");
long len3 = RedisHelper.LPush("list", lpush1);
// 在列表中添加一个或多个值 [列表尾部]
long len4 = RedisHelper.RPush("list", "010");
// 移除并获取列表的第一个元素
string val1 = RedisHelper.LPop("list");
// 移除并获取列表的最后一个元素
string val2 = RedisHelper.RPop("list");
// 获取列表指定范围内的元素[key, start, stop]
string[] lrang1 = RedisHelper.LRange("list", 0, 2); // #左侧开始,获取前3个元素
string[] lrang2 = RedisHelper.LRange("list", 0, -1); // #左侧开始,获取全部元素
// 将 list 最后一个元素弹出并压入 list_another 的头部 [只有一个元素的改变,源列表会少一个元素,目标列表多出一个元素]
RedisHelper.RPopLPush("list", "list_another");
RedisHelper.Expire("list_another", 30);
 4、set(无序集合)
  • #对集合中的成员进行操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 向集合添加一个或多个成员 [返回添加成功个数]
long sadd1 = RedisHelper.SAdd("my_set", "qaz");
long sadd2 = RedisHelper.SAdd("my_set", "tgb", "yhn");
string[] set1 = new string[] { "wsx", "edc" , "rfv" };
long sadd3 = RedisHelper.SAdd("my_set", set1);
// 判断 member 元素是否是集合 key 的成员
bool isMember = RedisHelper.SIsMember("my_set", "qaz");
// 返回集合中的所有成员
string[] members = RedisHelper.SMembers("my_set");
// 返回集合中的一个随机成员
string member1 = RedisHelper.SRandMember("my_set");
// 移除集合中一个或多个成员
long sRem = RedisHelper.SRem("my_set", "qaz");
// 移除并返回集合中一个随机成员
string member2 = RedisHelper.SPop("my_set");
  • #对两个集合进行交、并、差操作
1
2
3
4
5
6
7
8
9
10
11
12
13
RedisHelper.SAdd("set-a", "item1", "item2", "item3", "item4", "item5");
RedisHelper.SAdd("set-b", "item2", "item5", "item6", "item7");
// 差集
RedisHelper.SDiff("set-a", "set-b"); // "item1", "item3","item4"
// 交集
RedisHelper.SInter("set-a", "set-b"); // "item2","item5"
// 并集
RedisHelper.SUnion("set-a", "set-b"); // "item1","item2","item3","item4","item5","item6","item7"
//#另外还可以用SDiffStore,SInterStore,SUnionStore将操作后的结果存储在新的集合中。
 5、zset(sorted set:有序集合)
  • 有序集合可以看作是可排序的散列,不过有序集合的val成为score分值,集合内的元素就是基于score进行排序的,score以双精度浮点数的格式存储。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 向有序集合添加一个或多个成员,或者更新已存在成员的分数
RedisHelper.ZAdd("sorted_set", (1, "beijing"));
RedisHelper.ZAdd("sorted_set", (2, "shanghai"), (3, "shenzhen"));
(decimal, object)[] set1 = new (decimal, object)[] { (4, "guangzhou"), (5, "tianjing"), (6, "chengdu") };
RedisHelper.ZAdd("sorted_set", set1);
// 有序集合中对指定成员的分数加上增量 increment
decimal incr = RedisHelper.ZIncrBy("sorted_set", "beijing", -2);
// 通过索引区间返回有序集合成指定区域内的成员,分数从低到高 [key, start, stop]
string[] zRange1 = RedisHelper.ZRange("sorted_set", 0, 2);
string[] zRange2 = RedisHelper.ZRange("sorted_set", 0, -1); // #stop=-1返回全部
// 返回有序集合中指定区域内的成员,通过索引,分数从高到底 [key, start, stop]
string[] zRevRange1 = RedisHelper.ZRevRange("sorted_set", 0 , 2);
string[] zRevRange2 = RedisHelper.ZRevRange("sorted_set", 0, -1); // #stop=-1返回全部
// 移除有序集合中一个或多个成员
RedisHelper.ZRem("sorted_set", "shenzhen");
// 获取有序集合的成员数量
long number = RedisHelper.ZCard("sorted_set");
// 通过分数返回有序集合指定区间内的成员
string[] ZRangByScore1 = RedisHelper.ZRangeByScore("sorted_set", 2, 4);
// 通过索引区间返回有序集合成指定区间内的成员和分数
(string member, decimal score)[] sets = RedisHelper.ZRangeWithScores("Quiz", 0, -1);
 6、Geo(经纬度)
1
2
3
4
5
6
//1. 添加地点经纬度 [存储到 sorted set 中]
RedisHelper.GeoAdd("myLocation", Convert.ToDecimal(116.20), Convert.ToDecimal(39.56), "北京");
RedisHelper.GeoAdd("myLocation", Convert.ToDecimal(120.51), Convert.ToDecimal(30.40), "上海");
//2. 求两点之间的距离
var d1 = RedisHelper.GeoDist("myLocation", "北京", "上海", GeoUnit.km);
 7、事务
1
2
3
4
5
6
7
8
// 开启事务
var pipe = RedisHelper.StartPipe();
//中间对redis进行操作
pipe.Set("pipe1", "wsx");
// 提交
pipe.EndPipe();

 

Redis ZADD 命令

mikel阅读(460)

来源: Redis ZADD 命令

Redis ZADD 命令用于将一个或多个 member 元素及其 score 值加入到有序集 key 当中。

如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。

如果有序集合 key 不存在,则创建一个空的有序集并执行 ZADD 操作。

当 key 存在但不是有序集类型时,返回一个错误。

score 值可以是整数值或双精度浮点数,score 可为正也可以为负。

对有序集的更多介绍请参见 sorted set 。

注意: 在 Redis 2.4 版本以前, ZADD 每次只能添加一个元素。

语法

redis ZADD 命令基本语法如下:

redis 127.0.0.1:6379> ZADD key [NX|XX] [CH] [INCR] score member [score member …]

添加单个元素

redis> ZADD page_rank 10 google.com
(integer) 1

添加多个元素

redis> ZADD page_rank 9 baidu.com 8 redis.com.cn
(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "redis.com.cn"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

添加已存在元素,且 score 值不变

redis> ZADD page_rank 10 google.com
(integer) 0

redis> ZRANGE page_rank 0 -1 WITHSCORES  # 没有改变
1) "redis.com.cn"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

添加已存在元素,但是改变 score 值

redis> ZADD page_rank 6 redis.com.cn
(integer) 0

redis> ZRANGE page_rank 0 -1 WITHSCORES  # redis.com.cn 元素的 score 值被改变
1) "redis.com.cn"
2) "6"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

ZADD 参数

ZADD 支持参数,参数位于 key 名字和第一个 score 参数之间:

  • XX: 仅更新存在的成员,不添加新成员。
  • NX: 不更新存在的成员。只添加新成员。
  • LT: 更新新的分值比当前分值小的成员,不存在则新增。
  • GT: 更新新的分值比当前分值大的成员,不存在则新增。
  • CH: 返回变更成员的数量。变更的成员是指 新增成员 和 score值更新的成员,命令指明的和之前score值相同的成员不计在内。 注意: 在通常情况下,ZADD返回值只计算新添加成员的数量。
  • INCRZADD 使用该参数与 ZINCRBY 功能一样。一次只能操作一个score-element对。

注意: GTLT 和 NX 三者互斥不能同时使用。

scores 有效值范围

Redis 有序集合的分数使用双精度64位浮点数表示。在Redis所支持的平台上,称为IEEE 754 floating point number,它能包括的整数范围是-(2^53) 到 +(2^53)。或者说是-9007199254740992 到 9007199254740992。更大的整数在内部用指数形式表示,所以,如果为分数设置一个非常大的整数,你得到的是一个近似的十进制数。

Sorted sets 101

有序集合按照分数以递增的方式进行排序。相同的成员(member)只存在一次,有序集合不允许存在重复的成员。 分数可以通过ZADD命令进行更新或者也可以通过ZINCRBY命令递增来修改之前的值,相应的他们的排序位置也会随着分数变化而改变。

获取一个成员当前的分数可以使用 ZSCORE 命令,也可以用它来验证成员是否存在。

更多关于有序集合的信息请参考 sorted sets.

相同分数的成员

有序集合里面的成员是不能重复的都是唯一的,但是,不同成员间有可能有相同的分数。当多个成员有相同的分数时,他们将是按字典排序(ordered lexicographically)(仍由分数作为第一排序条件,然后,相同分数的成员按照字典序排序)。

字典顺序排序用的是二进制,它比较的是字符串的字节数组。

如果用户将所有元素设置相同分数(例如0),有序集合里面的所有元素将按照字典顺序进行排序,范围查询元素可以使用 ZRANGEBYLEX 命令(注:范围查询分数可以使用 ZRANGEBYSCORE 命令)。

返回值

整数:

  • 被成功添加的新成员的数量,不包括那些被更新分数的、已经存在的成员。

如果使用 INCR 选项,则返回 多行字符串:

  • 以字符串形式表示的 member 的 score 值(双精度浮点数)。(双精度浮点数) , 执行失败返回 nil (当使用 XX 或 NX 选项)。

历史

  • >= 2.4: 支持一次增加或更新多个成员。
  • >= 3.0.2: 增加 XXNXCH 和 INCR 选项。
  • >=6.2: 增加 GT 和 LT 选项。

例子

redis> ZADD myzset 1 “one”

(integer) 1

redis> ZADD myzset 1 “uno”

(integer) 1

redis> ZADD myzset 2 “two” 3 “three”

(integer) 2

redis> ZRANGE myzset 0 -1 WITHSCORES

1) "one"
2) "1"
3) "uno"
4) "1"
5) "two"
6) "2"
7) "three"
8) "3"
redis> 

MSMQ 的持久化_走错路的程序员的博客-CSDN博客

mikel阅读(374)

来源: MSMQ 的持久化_走错路的程序员的博客-CSDN博客

MSMQ的消息默认是放在内存里面的. 重启服务或者断电的时候消息就没了. 对于重要的消息来讲这样肯定是不行的.

百度了好久也没发现如何持久化. 后来实在不行上Google 才找到对应的答案.
废话少说. 直接上答案.

标准答案就一句话 msg1.Recoverable = true;

System.Messaging.Message msg1 = new System.Messaging.Message(person);
msg1.Recoverable = true;//为true 的时候进行持久化.每个信息的Recoverable 为true 那么持久保存
mq.Send(msg1, transaction);

//下面的是全部的测试代码.. 可以不看..
//主测试方法

public class TestMQ
{

static string path = “.\\private$\\TestMQ”;
// 接收消息的函数
public static Object[] Receive()
{
MessageQueue mq;
if (MessageQueue.Exists(path))
{
mq = new MessageQueue(path);
}
else
{
mq = MessageQueue.Create(path);
}

// 定义消息队列中所有的消息种类(种类之间的顺序可以互换)
Type[] msgTypes = new Type[] { typeof(Person), typeof(Order), typeof(String) };

// 规定消息以XML格式编码
mq.Formatter = new XmlMessageFormatter(msgTypes);

// 定义事务
MessageQueueTransaction transaction = new MessageQueueTransaction();

try
{
// 如果消息队列采用了事务,则开始事务
if (mq.Transactional)
transaction.Begin();

System.Messaging.Message msg1 = mq.Receive(transaction);
Person person = (Person)msg1.Body;

System.Messaging.Message msg2 = mq.Receive(transaction);
Order order = (Order)msg2.Body;

System.Messaging.Message msg3 = mq.Receive(transaction);
string description = (String)msg3.Body;

// 如果消息队列采用了事务,则停止事务
if (mq.Transactional)
transaction.Commit();

return new Object[] { person, order, description };
}
catch (Exception ex)
{
// 如果消息队列采用了事务并且出现了异常,则终止事务
if (mq.Transactional)
transaction.Abort();

return null;
}
}

// 发送消息的函数
public static bool Send(Person person, Order order, string description)
{
MessageQueue mq;
if (MessageQueue.Exists(path))
{
mq = new MessageQueue(path);
}
else
{
mq = MessageQueue.Create(path);
}
mq.DefaultPropertiesToSend.Recoverable = true; //为true 的时候进行持久化. 经测试好像没用.
//mq.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.NegativeReceive; 消息是否需要反馈.
// 规定消息以XML格式编码
mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Person), typeof(Order), typeof(String) });

// 定义事务
MessageQueueTransaction transaction = new MessageQueueTransaction();
System.Messaging.Message msg1 = new System.Messaging.Message(person);
System.Messaging.Message msg2 = new System.Messaging.Message(order);
System.Messaging.Message msg3 = new System.Messaging.Message(description);

try
{
// 如果消息队列采用了事务,则开始事务
if (mq.Transactional)
transaction.Begin();

msg1.Recoverable = true;//为true 的时候进行持久化.
mq.Send(msg1, transaction);

msg2.Recoverable = true;//为true 的时候进行持久化.
mq.Send(msg2, transaction);

msg3.Recoverable = false;//为true 的时候进行持久化.
mq.Send(msg3, transaction);

// 如果消息队列采用了事务,则停止事务
if (mq.Transactional) {
transaction.Commit();
}
return true;
}
catch (Exception ex)
{
// 如果消息队列采用了事务并且出现了异常,则终止事务
if (mq.Transactional)
transaction.Abort();

return false;
}
finally
{

mq.Close();
msg1 = null;
msg2 = null;
msg3 = null;
}
}

}

下面是两个model类

// 购物清单
public class Order
{
public Order()
{
Price = 0.0;
Number = 0;
Time = DateTime.Now;
}

public Order(Double price, UInt64 number, DateTime time)
{
Price = price;
Number = number;
Time = time;
}

// 物品单价
Double Price;

// 物品数量
UInt64 Number;

// 下单时间
DateTime Time;
}

// 人员类
public class Person
{
public Person()
{
m_Name = “”;
m_Age = 0;
}

public Person(string name, UInt16 age)
{
m_Name = name;
m_Age = age;
}

// 姓名
public string Name
{
get { return m_Name; }
set { m_Name = value; }
}

// 年龄
public UInt16 Age
{
get { return m_Age; }
set { m_Age = value; }
}

private string m_Name;
private UInt16 m_Age;
}

//发送按钮
private void 发送按钮_Click(object sender, EventArgs e)
{

for (int i = 0; i < 100000; i++)
{
Person person = new Person(“Jackie”, 30);
Order order = new Order(110.0, 10, DateTime.Now);
string description = “This is a new order.”;
TestMQ.Send(person, order, description);
System.GC.Collect();
}

}

private void 接收按钮_Click(object sender, EventArgs e)
{
// 接收消息。返回值为 null 时表示接收失败
Object[] message = TestMQ.Receive();

if (message != null)
{
// 当消息接收成功时,将消息依次取出并存入下面的对象中
Person person = (Person)message[0];
Order prder = (Order)message[1];
string description = (string)message[2];
if (description == “This is a new order.”)
{
MessageBox.Show(“消息接收成功 !”);
}
else
{
MessageBox.Show(“消息接收失败 !”);
}

}
else
{
MessageBox.Show(“消息接收失败 !”);
}
}
————————————————
版权声明:本文为CSDN博主「走错路的程序员」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/phker/article/details/71124256

Redis实现消息队列 - 简书

mikel阅读(378)

来源: Redis实现消息队列 – 简书

Redis实现轻量级的消息队列与消息中间件相比,没有高级特性也没有ACK保证,无法做到数据不重不漏,如果业务简单而且对消息的可靠性不是那么严格可以尝试使用。

Redis实现消息队列

列表类型

队列

Redis中列表List类型是按照插入顺序排序的字符串链表,和数据结构中的普通链表一样,可以在头部left和尾部right添加新的元素。插入时如果键不存在Redis将为该键创建一个新的链表。如果链表中所有元素均被删除,那么该键也会被删除。

Redis List

Redis的列表List可以包含的最大元素数量为4294967295,从元素插入和删除的效率来看,如果是在链表的两头插入或删除元素将是非常高效的操作。即使链表中已经存储了数百万条记录,该操作也能在常量时间内完成。然后需要说明的是,如果元素插入或删除操作是作用于链表中间,那将是非常低效的。

Redis中对列表List的操作命令中,L表示从左侧头部开始插入和弹出,R表示从右侧尾部开始插入和弹出。

Redis提供了两种方式来做消息队列,一种是生产消费模式,另一种是发布订阅模式。

生产消费模式

生产消费模式会让一个或多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的。如果队列中没有消息,消费者会继续监听。

PUSH/POP

Redis数据结构的列表List提供了pushpup命令,遵循着先入先出FIFO的原则。使用push/pop方式的优点在于消息可以持久化,缺点是一条消息只能被一个消费者接收,消费者完全靠手速来获取,是一种比较简陋的消息队列。

Redis的队列list是有序的且可以重复的,作为消息队列使用时可使用rpush/lpush操作入队,使用lpop/rpop操作出队。当发布消息是执行lpush命令,将消息从列表左侧加入队列。消息接收方执行rpop命令从列表右侧弹出消息。

如果队列空了怎么办呢?

如果队列空了,消费者会陷入pop死循环,即使没有数据也不会停止。空轮询不但消耗消费者的CPU资源还会影响Redis的性能。傻瓜式的做法是让消费者的线程按照一定的时间间隔不停的循环和监控队列,虽然可行但显然会造成不必要的资源浪费,而且循环周期也很难确定。

对于消费者而言存在一个问题,需要不停的调用rpop查看列表中是否有待处理的消息。每调用一次都会发起一次连接,势必造成不必要的资源浪费。如果使用休眠的方式让消费者线程间隔一段时间再消费,但这样做也有两个问题:

  • 如果生产者速度大于消费者消费的速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
  • 如果休眠时间过长,就无法处理一些时效性的消息。如果休眠时间过短也会在连接上造成比较大的开销。

LPOP返回一个元素给客户端时会从List中将该元素移除,这意味着该元素只存在于客户端的上下文中。如果客户端在处理这个返回元素的过程中崩溃了,这个元素就会永远的丢失掉。

LPUSH/BRPOP

LPUSH BRPOP

使用brpopblpop实现阻塞读取

由于需要一直调用rpop/lpop才可以实现不停的监听且消费消息,为解决这个问题,Redis提供了阻塞命令brpop/blpop。使用brpop会阻塞队列,而且每次只会弹出一个消息,如果没有消息则会阻塞。

Redis列表List支持带阻塞的命令,生产者从列表左侧lpush加入消息到队列,消费者使用brpop命令从列表右侧弹出消息并设置超时时间,如果列表中没有消息则一直阻塞直到超时。这样做的目的在于减小Redis的压力。

对于Redis来说提供了blpop/brpop阻塞读,阻塞读在队列没有数据时会立即进入休眠状态,一旦数据到来则立即被唤醒,消息的延迟几乎为零。需要注意的是如果线程一直阻塞在那里,连接就会被服务器主动断开来减少资源占用,这时blpop/brpop会抛出异常,所以编写消费段时需要注意异常的处理。

BRPOP key [key ...] timeout

BLPOP/BRPOP 列表阻塞式弹出

当给定列表内没有任何元素可供弹出时,连接将被BRPOP命令阻塞,直到等待超时或发现可弹出元素为止。当给定多个key参数时,按参数key的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。另外,BRPOP除了弹出元素的位置和BLPOP不同之处,其他表现一致。

列表的阻塞式弹出特点是如果列表中没有任务时,连接将会被阻塞。连接的阻塞存在一个超时时间,当超时时间设置为0时刻无限等待直到弹出消息。

Redis的PUSH/POP机制,利用Redis的列表list数据结构,生产者lpush消息,消费者brpop消息并设定超时时间以减少Redis压力。这种方案相对于发布订阅模式的好处是数据可靠性提高了,只有在Redis宕机且数据没有持久化的情况下会丢失数据。可以根据业务通过AOF和缩短持久化间隔来保证较高的可靠性,也可以通过多个客户端来提高消息速度。但相对于专业的消息队列中间件,发布订阅模式的状态过于简单(没有状态),而且没有ACK机制,消息取出后消费失败依赖于客户端记录日志或重新push到队列中。

Redis中实现生产者和消费者模型,可使用LPUSHRPOP来实现该功能。不过当列表为空时消费者就需要轮询来获取消息,这样会增加Redis的访问压力和消费者的CPU时间,另外很多访问也是无用的。为此Redis提供了阻塞式访问BRPOPBLPOP命令,消费者可以在获取数据时指定如果数据不存在阻塞的时间,如果在时限内获得数据则立即返回,如果超时还没有数据则返回NULL,可使用0表示一直阻塞。同时Redis会为所有阻塞的消费者以先后顺序排序。

使用Redis的列表来实现一个任务队列,开启两个程序,一个作为生产者使用LPUSH写队列,一个作为消费者使用RPOP读队列。由于消费者并不知道什么时候会有消息过来,所以消费者需要一直循环读取数据。两者的消息可以使用JSON进行封装协议传输。由于消费者在没有读到数据的情况下,会一直循环读取,对系统来说十分耗费资源,此时可利用Redis提供的阻塞读取命令BRPOP进行改进。使用BRPOP改进后,消费者不会一直循环读取,而是一直阻塞等到有消息过来时才读取。

发布订阅模式

发布订阅模式是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。

PUB/SUB

Redis自带pub/sub机制即发布订阅模式,此模式中生产者producer和消费者consumer之间的关系是一对多的,也就是一条消息会被多个消费者所消费,当只有一个消费者时可视为一对一的消息队列。

发布订阅机制模型

首先发布者将消息发布到频道,客户端订阅频道后就能获得频道的消息。

发布订阅模式命令

  • psubscribe 订阅一个或多个符合给定模式的频道
  • publish 将消息发布到指定的频道
  • pubsub查看订阅与发布系统状态
  • pubsub channels pattern 列出当前的活跃频道
  • pubsub numsub channel-1 channel-n 获取给定频道的订阅者数量
  • pubsub numpat 获取订阅模式的数量
  • punsubscribe 指示客户端退订所有给定模式
  • subscribe 订阅给定的一个或多个频道的消息
  • unsubscribe 指示客户端退订给定的频道

实现

使用PHP+Redis实现消息队列

操作流程

  1. PHP接收请求和数据
  2. PHP将数据写入Redis队列(入队)
  3. Shell定时调用PHP读取队列数据并写入数据库(出队)

入队inqueue.php

$result = $redis->rpush("queue", json_encode($data));
if($result){
  echo "inqueue success";
}

出队 outqueue.php

#! /usr/bin/php
<?php
$result = $redis->lpop("queue");
if($result){
  $data = json_decode($result, true);
}

定时任务:process.sh

# 每分钟调用一次定时脚本
* * * * * /scripts/process.sh

定时脚本:process.sh

#! /bin/bash
#filename : process.sh
php /scripts/outqueue.php

出队采用死循环方式,感谢 行走平凡 的指正。

$ vim outqueue.php

while(true){
  $result = $redis->lpop("queue");
  if($result){
    $data = json_decode($result, true);
  }
}

作者:JunChow520
链接:https://www.jianshu.com/p/02a923fea175
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

C#实现消息队列_Shuai_Sir的博客-CSDN博客_c# 消息队列

mikel阅读(446)

来源: C#实现消息队列_Shuai_Sir的博客-CSDN博客_c# 消息队列

C#实现消息队列
1.认识消息队列
消息队列 (MSMQ Microsoft Message Queuing)是MS提供的服务,也就是Windows操作系统的功能,并不是.Net提供的。

2.什么是消息队列
消息队列一般简称为 MQ (Messges Queue),是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,是在消息的传输过程中保存消息的容器。消息队列本质上是一个队列,而队列中存放的是一个个消息。

队列是一个数据结构,具有先进先出的特点。而消息队列就是将消息放到队列里,用队列做存储消息的介质。消息的发送放称为生产者,消息的接收方称为消费者。

消息队列由 Broker(消息服务器,核心部分)、Producer(消息生产者)、Consumer(消息消费者)、Topic(主题)、Queue(队列)和Message(消息体)组成。

3. 消息队列特点
消息队列有三个作用,分别是削峰、解耦和异步。

流量削峰:主要用于在高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

假设系统只能处理1000个请求,但这时突然来了3000个请求,如果不加以限制就会造成系统瘫痪。使用消息队列做缓冲,将多余的请求存放在消息队列中,等系统根据自己处理请求的能力去消息队列去。

应用解耦:主要用于当一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理时,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

假设某个服务 A 需要调用服务 B,但是服务 B 突然出现问题,这样会导致服务 A 也会出现问题。如果使用消息队列,当服务 A 执行完成之后,发送一条消息到队列中,服务 B 读取到这条消息,那么它立刻开始进行业务的执行。

异步通信:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。

假设有一个业务,要先执行服务 A ,然后服务 A 去调用服务 B ,当服务 B 完成之后,服务 A 调用服务 C,这个业务需要一步步走下去。当使用了消息队列之后,服务 A 完成之后,可以同时执行服务 B 和 服务 C ,这样就减低业务的响应时间,提高用户体验。

消息队列简单例子
一下是WIN10的操作步骤:
1.打开运行,输入”OptionalFeatures”,钩上Microsoft Message Queue(MSMQ)服务器。

2. 消息队列分为以下几种,每种队列的路径表示形式如下:

公用队列 MachineName\QueueName

专用队列 MachineName\Private$\QueueName

日记队列 MachineName\QueueName\Journal$

计算机日记队列 MachineName\Journal$

计算机死信队列 MachineName\Deadletter$

计算机事务性死信队列 MachineName\XactDeadletter$

这里的MachineName可以用 “.”代替,代表当前计算机

3.打开“此电脑”–>左上角点击“计算机”–>点击“管理”

注意:如果没看到“消息队列”这个选项,那就是这东西在这电脑上还没使用过,没关系,一会编程的时候创建就会显示出来了(我印象中一开始我也没有这个选项,写了代码后就有了)

知道这东西在哪里后,开始进入正题。C#中使用消息队列需要添加新的引用System.Messaging

4.添加引用后在代码里添加命名空间就可以开始了

这里我们新建一个控制台程序:

存入数据:

static void Main()
{
//存
string msgPath = “.\\Private$\\MyMsg”;
//指定路径一个名称为“MyMsg”的专用队列的路径字符串
string studentName = “hello word”;
// 要写入消息消息队列的信息
SendMsg(msgPath, studentName);
Console.WriteLine(“发送成功” + studentName);
Console.ReadLine();
}
/// 存
/// </summary>
/// <param name=”mQPath”></param>
/// <param name=”studentName”></param>
public static void SendMsg (string mQPath,string studentName)
{
//先判断这个消息队列是否存在,如果存在则直接实例化对象,如果不存在则创建该消息队列
MessageQueue mq = (MessageQueue.Exists(mQPath)) ? (new MessageQueue(mQPath)) : (MessageQueue.Create(mQPath));
mq.Send(studentName);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
读取数据:

static void Main()
{
//读取数据
string msgPath = “.\\Private$\\MyMsg”;
ReceiveMsg(msgPath);//异步读取
// ReceiveMsgT(msgPath); 同步读取
Console.ReadLine();

}
///异步读取
private static void ReceiveMsg(string msgPath) {
if (MessageQueue.Exists(msgPath))
{
MessageQueue mq=new MessageQueue(msgPath);
mq.ReceiveCompleted += new ReceiveCompletedEventHandler(ReceiveMethon);
mq.BeginReceive(MessageQueue.InfiniteTimeout);
Console.WriteLine(“异步已接收数据”);
}

}
static readonly XmlMessageFormatter f=new XmlMessageFormatter(new Type[] { typeof(string)}); //格式
private static void ReceiveMethon(object sender,ReceiveCompletedEventArgs e) {
Message m=e.Message;
m.Formatter = f;//转换格式
Console.WriteLine(m.Body.ToString());
}
//同步读取
private static void ReceiveMsgT(string msgPath) {
if (MessageQueue.Exists(msgPath))
{
MessageQueue mq= new MessageQueue(msgPath);
Message m = mq.Receive();
m.Formatter= f;//格式转换
Console.WriteLine(m.Body.ToString());
Console.WriteLine(“同步已接受数据”);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
这里简单说一下两种方式读取的区别,同步读取数据的话,会堵塞当前线程,直到读到数据为止才继续运行之后的程序,异步读取的话就避免了这个问题。如果读取的数据量小、速度快,为了编写代码方便可选择同步读取,如果读取的数据量大、速度慢,则建议使用异步读取。

消息队列有点像我们常见的数据库,能进行数据的存取,但是与数据库其中一点不同的就是消息队里中的消息被读取后就会被销毁,不需要我们手动删除。
————————————————
版权声明:本文为CSDN博主「Shuai_Sir」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Shuai_Sir/article/details/127899893