本地CPU+6G内存部署类ChatGPT模型(Vicuna 小羊驼) - 知乎

mikel阅读(458)

来源: 本地CPU+6G内存部署类ChatGPT模型(Vicuna 小羊驼) – 知乎

1.配置介绍

笔记本硬件配置为R7 4800U + 32g内存,但是实际上8g或者16g就够了

操作系统为Deepin 20,其他操作系统应该也不影响

2.环境配置

conda create -n llm python=3.9
conda activate llm
conda install pytorch==1.13.1 cpuonly -c pytorch
pip install llama-cpp-python

llama-cpp-python是llama.cpp的Python接口,具体介绍看下面链接

3. 模型下载

这里我们要下载4-bit量化的Vicuna模型,有7B和13B两个版本,每个版本里面还细分了几个版本,主要是速度和准确率之间的平衡,根据情况自己下载

4.模型执行

from llama_cpp import Llama

# 路径替换成你自己的模型路径
llm = Llama(model_path="./models/ggml-vicuna-13b-1.1-q4_2.bin")


while True:
    # Name the planets in the solar system?
    prompt = input("prompt: ")
    # 相关控制参数自己调节
    output = llm(f"Q:{prompt} A:", max_tokens=128, stop=[roles[0]], echo=False)
    print("Answer: ", output["choices"][0]["text"])

5.测试示例

(1) Implement a Python function to find the longest common subsequence of two input strings using dynamic programming.

Here is an example implementation of a Python function that finds the longest common subsequence of two input strings using dynamic programming:
```python
def lcs(s1, s2):
    n = len(s1)
    m = len(s2)

    # Initialize dp table with not available values
    dp = [[None] * (m+1) for _ in range(n+1)]

    # Fill dp table with starting value of 0
    for i in range(1, n+1):
        dp[i][0] = 0

    # Fill dp table with starting value of 0
    for j in range(1, m+1):
        dp[0][j] = 0

    # Iterate over all possible subsequences of s1 and s2
    for i in range(1, n+1):
        for j in range(1, m+1):
            if s1[i-1] == s2[j-1]:
                # If current characters match, increment dp[i][j] by 1
                dp[i][j] = max(dp[i][j], dp[i-1][j-1] + 1)
            else:
                # If current characters do not match, use the larger of the two options
                dp[i][j] = max(dp[i][j], max(dp[i-1][j], dp[i][j-1]))

    # Return the longest common subsequence
    return dp[n][m]
```
This function takes two input strings `s1` and `s2`, and returns the longest common subsequence of the two strings.

(2) 锅包肉怎么做

锅包肉是一道非常有名的广东菜,制作方法简单,口感鲜美。以下是一份锅包肉的做法:

所需材料:

* 猪里脊肉 500克
* 白蒜 3个
* 大葱 1根
* 洋葱 1根
* 红椒 1个
* 花椒 1个
* 生抽 适量
* 食用油 适量
* 盐、胡椒粉、蒜泥 适量

步骤:
1. 将猪里脊肉切成小块,用刀刻上竖直线,保持原有形状。
2. 將白蒜、大葱和洋葱切成顺序,再将红椒和花椒切成小段。
3. 在碗中加入生抽,用中火加热,放入肉片后盖上适量的食用油。
4. 当肉片发达香气时,将白蒜、大葱和洋葱一起加入碗中,用中火燒烂,直到肉片受烧后取出。

6. 总结

7B大小的Vicuna模型4-bit量化后,效果感觉不太好,尤其是中文,不如ChatGLM-6B

13B的的Vicuna模型4-bit量化后,效果还可以,英文可能达到了宣称的那样,中文感觉还略差于ChatGLM-6B一点

thinkphp:html中的if condition 嵌套写法、条件判断的各种情况(eq、neq、gt、lt、or、and)_

mikel阅读(264)

来源: thinkphp:html中的if condition 嵌套写法、条件判断的各种情况(eq、neq、gt、lt、or、and)_<if condition=-CSDN博客

伪代码 术语符号
大于 gt
小于 lt
等于 eq
不等于 neq
或者 or
并且 and
变量start_time代表的含义是活动的开始时间,变量end_time代表的含义是活动的结束时间。两个变量都是时间戳的格式。下面就将显示状态一列的数据,各种情况下的判断条件列举出来。

等于 eq

<td>
<if condition=”$vo[‘start_time’] eq $vo[‘end_time’] “>进行中
<else>已结束</else>
</if>
</td>

image.png

不等于 neq

<td>
<if condition=”$vo[‘start_time’] neq $vo[‘end_time’] “>进行中
<else>已结束</else>
</if>
</td>

image.png

大于 gt

<td>
<if condition=”$vo[‘start_time’] gt 0 “>进行中
<else>已结束</else>
</if>
</td>

image.png

小于 lt

<td>
<if condition=”$vo[‘start_time’] lt 0 “>进行中
<else>已结束</else>
</if>
</td>

image.png

或者 or

<td>
<if condition=”($vo[‘start_time’] – $time gt 0) OR ($time neq 666) “>
<else>已结束</else>
</if>
</td>

image.png

并且 and

<td>
<if condition=”($vo[‘start_time’] gt 0) AND ($time eq 666) “>进行中
<else>已结束</else>
</if>
</td>

image.png

各种if和else的嵌套
超过一个if else的写法

<div align=”center”>
<if condition=”$company_data.approval eq 0 “>
<!– 审核中 –>
<img id=”imgChange” src=”__TMPL__/public/assets/images/step2.png” alt=””>
<elseif condition=”$company_data.approval eq 1 “>
<!– 审核通过 –>
<img id=”imgChange” src=”__TMPL__/public/assets/images/step3.png” alt=””>
<elseif condition=”($company_data.approval eq 2) OR ($company_data.approval eq 99) “>
<!– 审核被拒或者未认证 –>
<img id=”imgChange” src=”__TMPL__/public/assets/images/step1.png” alt=””>
<else />
<!– 其余情况 –>
<img id=”imgChange” src=”__TMPL__/public/assets/images/step1.png” alt=””>
</if>
</div>
一个if else的写法

<if condition=”$company_data.approval eq 0 “>
<div align=”center” id=”step2ID” class=”step2_class” style=””>
<label class=”step2Class” style=”color:Red;font-size:30px;” id=”step2Label”>人工审核中,请等待1-2天
</label>
</div>
<else>
<div align=”center” id=”step2ID” class=”step2_class” style=”display:none;”>
<label class=”step2Class” style=”color:Red;font-size:30px;” id=”step2Label”>人工审核中,请等待1-2天
</label>
</div>
</else>

</if>

————————————————
版权声明:本文为CSDN博主「sdwflqzfx」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/sdwflqzfx/article/details/104200448

安装 Puppeteer 时跳过 Chromium 下载 | 自由行

mikel阅读(460)

来源: 安装 Puppeteer 时跳过 Chromium 下载 | 自由行

Puppeteer 包含的 Chromium 因为体积过大,我们在升级 Puppeteer 时,希望可以跳过 Chromium 重新安装,本文介绍这种方法。

下载Chromium

默认情况下,下载 puppeteer 的同时,执行

$ npm i puppeteer

会自动下载 Chromium,在命令行里会看到下面的日志:

Downloading Chromium r672088 – 108 Mb [========== ] 49% 25.7s
Chromium downloaded to /你的路径

看到这些,这说明 Chromium 已经下载完成。

另外,如果本地已经有了 Chrome/Chromium 或者 准备用远程的 Chrome/Chromium,可以只安装 Puppeteer的核心功能。运行下面的命令:

$ npm i puppeteer-core

这样安装的 puppeteer 将不包含 Chromium

二者关系

Puppeteer是在Chromium上层的脚本,以 CDP 协议控制 Chromium 的行为。二者关系大致如下图。

跳过下载Chromium

由于 Chromium 体积过大(>100M),有时候本地已经安装了Chromium,在后续升级 Puppeteer 时,不需要重新下载Chromium,这时候需要跳过 Chromium 的下载。

PUPPETEER_SKIP_CHROMIUM_DOWNLOAD

跳过的方法是:

$ PUPPETEER_SKIP_CHROMIUM_DOWNLOAD=true npm i puppeteer

其实,只要有 PUPPETEER_SKIP_CHROMIUM_DOWNLOAD 这个环境变量存在(无论其值是不是 true ),都不会下载 Chromium. 跳过的时候,会提示下面的文字

**INFO** Skipping Chromium download. "PUPPETEER_SKIP_CHROMIUM_DOWNLOAD" environment variable was found.

注意

如果第一次安装 Puppeteer 的时候,使用了 PUPPETEER_SKIP_CHROMIUM_DOWNLOAD 环境变量,那么安装的 Puppeteer 中不会包含 Chromium. 这是即使删除 PUPPETEER_SKIP_CHROMIUM_DOWNLOAD 变量(unset),再次执行 npm i puppeteer 命令也不会重新下载 Chromium了,需要删除 puppeteer 重新安装。

五分钟看完,彻底理解协变逆变 - BruceNeter - 博客园

mikel阅读(396)

来源: 五分钟看完,彻底理解协变逆变 – BruceNeter – 博客园

其实这是C#的老知识点了,但是今天发现同事对这个竟然还一知半解,就和他们讲解了下,顺便也回顾了下,同事我也把我对这个的全部理解,融化成几分钟的讲解,保证大家5分钟内全部理解,看不懂来打我。

 

协变、逆变 解决的问题

泛型类型转换

比如Person类是Student的父类,我们平时可以直接:

Person A = new Student();

这是所谓的隐式转换,相信百分之999.99%的人都知道。
然后随着大家写代码越来越多,就会遇到这样的场景。

//我有一个集合
//我手上有一批学生
IEnumerable<Student> students = new List<Student>();
//我要他们先做人
IEnumerable<Person> peoples = students;

第一次看到这种代码,其实哪怕你一点不知道协变,逆变,你也觉得这是一段正常不过的代码,因为每个学生都是人,都可以直接转成 这个类型,那我一批学生不就是一批人吗。是的,你这样想绝对没错,不然微软怎么会能让你这样写没问题还编译通过呢?
但是如果我自己写一个:

//定义一个工作的泛型接口
public interface IWork<T> 
{
            
}

实现类
public class Work<T> : IWork<T> 
{
            
}

//直接报错
IWork<Person> work = new Work<Student>();;

现实给了我们当头一棒,这时候,我们应该找到 IEnumerable,选中然后狠狠的F12去看一下,为什么官方的就可以。

我们发现官方在泛型前面多了一个out关键字。破案了~
现在我们在我们的代码中也加入out关键字

public interface IWork<out T> 
{
            
}

public class Work<T> : IWork<T> 
{
            
}
IWork<Person> work = new Work<Student>();

OK~代码正常运行。

原则核心

这里开始我们挑战五分钟速通,如果按照正常博客上来先讲概念,别说五分钟了,可能大家也就迷迷糊糊地看完了,所以我们直接整活。

核心依据

正如数学的发展是从1+1=2作为开始,我们也需要一些真理来支撑我们讲下去。那么我们的核心依据就是:
里氏替换——C#里,子类转父类可以直接隐式转换
就这么短,就完事了?对,记住就行!!!

Out/In 输入输出?

讲到这里,我们继续忽悠,out是啥?来个翻译!不就是输出吗?in是啥,不就是输入吗?那么带入一下,Out不就是返回值吗,In不就是入参吗。那不就是方法的特征么。(先假设,再假设)

In:那么根据核心依据,子类转父类可以直接转,入参如果限定是Person类型,那么你给我限定为Student或者任意的Person类型的派生类,我都是可以接受的,因为都是安全的,可以直接转换过来的。

这种从基类转向派生类的兼容,就是所谓的逆变。
说白了,我让你给我一个人,你说不行,我给你找个学生,那肯定是满足需求的。

Out:Out代表的是返回值,根据核心依据,我返回的是Student类型,你说不行,你给我返回Person类型,那我不是笑开花了,我连Student都能返回,你让我返回父类,那我不是直接转就过去了,总归是类型安全的。

这种从派生类转向基类的兼容,就是所谓的协变。
说白了,我可以造个学生,结果你说给个人就行, 那不是so easy。

In示意图

Out示意图

证明

好了,我们说了这么多,至少证明下In/Out是代表的入参和返回值吧?直接show you code:
当Out作为返回值时的泛型没有问题,但是入参就报错了

当In作为入参时的泛型没有问题,但是返回值就报错了

好了,这还需要再解释吗?最后我们总结下,逆变和协变就是让方法有了泛型类型上的转换能力,强化了方法的多态能力。

问题点

1.属性为啥可以用逆变协变?
属性不就是get/set方法。
2.为什么接口和委托可以用逆变协变,类不行?
拜托你找一下共同点,接口和委托的共同点,都是行为,也就是方法为核心。接口里不能有字段。这也印证了我说的逆变协变最终是为方法服务的。
之所以类不行,我大概理解是方法和实例是分开的,本身不和实例存储在一起,也不是每个实例一份,如果逆变和协变可以服务类,那么会出现同样的类型,但是每个实例内部的同一个字段的类型都不一样,这对于存储和类型安全都是问题。
3.逆变和协变有啥用?
当你…设计问题,我就有遇到,有时候用上能更加优雅或者灵活的写代码吧,看你吧,少年。

Kafka【入门】就这一篇! - 知乎

mikel阅读(374)

来源: Kafka【入门】就这一篇! – 知乎

一、Kafka 简介


Kafka 创建背景

Kafka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。

活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。

近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持。

Kafka 简介

Kafka 是一种分布式的,基于发布 / 订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
  • 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

Kafka 基础概念

概念一:生产者与消费者

对于 Kafka 来说客户端有两种基本类型:生产者(Producer)消费者(Consumer)。除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,它们只不过是在上层做了封装。

这很容易理解,生产者(也称为发布者)创建消息,而消费者(也称为订阅者)负责消费or读取消息。

概念二:主题(Topic)与分区(Partition)

在 Kafka 中,消息以主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。

我们使用一个生活中的例子来说明:现在 A 城市生产的某商品需要运输到 B 城市,走的是公路,那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。所以我们现在引入分区(Partition)的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展。

概念三:Broker 和集群(Cluster)

一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。(现在动不动就百万量级..我特地去查了一把,好像确实集群的情况下吞吐量挺高的..摁..)

若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:

Kafka 的一个关键性质是日志保留(retention),我们可以配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时,老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略,这样对于不同应用可以实现个性化。

概念四:多集群

随着业务发展,我们往往需要多集群,通常处于下面几个原因:

  • 基于数据的隔离;
  • 基于安全的隔离;
  • 多数据中心(容灾)

当构建多个数据中心时,往往需要实现消息互通。举个例子,假如用户修改了个人资料,那么后续的请求无论被哪个数据中心处理,这个更新需要反映出来。又或者,多个数据中心的数据需要汇总到一个总控中心来做数据分析。

上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本质上来说,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列连接起来而已。它从一个集群中消费消息,然后往另一个集群生产消息。

二、Kafka 的设计与实现


上面我们知道了 Kafka 中的一些基本概念,但作为一个成熟的「消息队列」中间件,其中有许多有意思的设计值得我们思考,下面我们简单列举一些。

讨论一:Kafka 存储在文件系统上

是的,您首先应该知道 Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为 “磁盘是缓慢的”,所以对这样的设计持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。

现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读会提前将一个比较大的磁盘快读入内存。后写会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。

上述的 Topic 其实是逻辑上的概念,面相消费者和生产者,物理上存储的其实是 Partition,每一个 Partition 最终对应一个目录,里面存储所有的消息和索引文件。默认情况下,每一个 Topic 在创建时如果不指定 Partition 数量时只会创建 1 个 Partition。比如,我创建了一个 Topic 名字为 test ,没有指定 Partition 的数量,那么会默认创建一个 test-0 的文件夹,这里的命名规则是:<topic_name>-<partition_id>

任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操作让 Kafka 的效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。

每一条消息被发送到 Broker 中,会根据 Partition 规则选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,所有消息可以均匀分布到不同的 Partition中。

讨论二:Kafka 中的底层存储设计

假设我们现在 Kafka 集群只有一个 Broker,我们创建 2 个 Topic 名称分别为:「topic1」和「topic2」,Partition 数量分别为 1、2,那么我们的根目录下就会创建如下三个文件夹:

    | --topic1-0
    | --topic2-0
    | --topic2-1

在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition,每个 Partition 都为一个目录,而每一个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 “.index” 和 “.log” 分表表示 Segment 索引文件和数据文件。

现在假设我们设置每个 Segment 大小为 500 MB,并启动生产者向 topic1 中写入大量数据,topic1-0 文件夹中就会产生类似如下的一些文件:

    | --topic1-0
        | --00000000000000000000.index
        | --00000000000000000000.log
        | --00000000000000368769.index
        | --00000000000000368769.log
        | --00000000000000737337.index
        | --00000000000000737337.log
        | --00000000000001105814.index
        | --00000000000001105814.log
    | --topic2-0
    | --topic2-1

Segment 是 Kafka 文件存储的最小单位。Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用0填充。如 00000000000000368769.index 和 00000000000000368769.log。

以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:

其中以索引文件中元数据 <3, 497> 为例,依次在数据文件中表示第 3 个 message(在全局 Partition 表示第 368769 + 3 = 368772 个 message)以及该消息的物理偏移地址为 497。

注意该 index 文件并不是从0开始,也不是每次递增1的,这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,使得能够把 index 映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗。

因为其文件名为上一个 Segment 最后一条消息的 offset ,所以当需要查找一个指定 offset 的 message 时,通过在所有 segment 的文件名中进行二分查找就能找到它归属的 segment ,再在其 index 文件中找到其对应到文件上的物理位置,就能拿出该 message 。

由于消息在 Partition 的 Segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 Segment 文件),这种顺序磁盘 IO 存储设计师 Kafka 高性能很重要的原因。

Kafka 是如何准确的知道 message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 message 都包含了以下三个属性:

  • offset:表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 message,可以简单的认为是一个 id;
  • MessageSize:表示 message 内容 data 的大小;
  • data:message 的具体内容

讨论三:生产者设计概要

当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?

举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过 Kafka 返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。

再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。

不同的业务需要使用不同的写入方式和配置。具体的方式我们在这里不做讨论,现在先看下生产者写消息的基本流程:

流程如下:

  1. 首先,我们需要创建一个ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)。
  2. 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)。
  3. 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
  4. 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的Kafka broker。
  5. 当broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象,否则返回异常。
  6. 生产者接收到结果后,对于异常可能会进行重试。

讨论四:消费者设计概要

消费者与消费组

假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。

Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:

如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所示:

如果增加到4个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:

但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:

总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

在这个场景中,消费组G1和消费组G2都能收到T1主题的全量消息,在逻辑意义上来说它们属于不同的应用。

最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

消费组与分区重平衡

可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。

消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的 broker 来保持在消费组内存活。这个 broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。

如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

Partition 与消费模型

上面提到,Kafka 中一个 topic 中的消息是被打散分配在多个 Partition(分区) 中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?

答案是:没有办法。Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。

下一个问题是:Partition 中的消息可以被(不同的 Consumer Group)多次消费,那 Partition中被消费的消息是何时删除的? Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?

无论消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 都可以消费,2 天后,消息自动被删除。
Partition 会为每个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。 如下图:

为什么 Kafka 是 pull 模型

消费者应该向 Broker 要数据(pull)还是 Broker 向消费者推送数据(push)?作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system,比如 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适。pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

讨论五:Kafka 如何保证可靠性

当我们讨论可靠性的时候,我们总会提到保证*这个词语。可靠性保证是基础,我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。

Kafka 中的可靠性保证有如下四点:

  • 对于一个分区来说,它的消息是有序的。如果一个生产者向一个分区先写入消息A,然后写入消息B,那么消费者会先读取消息A再读取消息B。
  • 当消息写入所有in-sync状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不一定刷新到磁盘。生产者可以等待不同时机的确认,比如等待分区主副本写入即返回,后者等待所有in-sync状态副本写入才返回。
  • 一旦消息已提交,那么只要有一个副本存活,数据不会丢失。
  • 消费者只能读取到已提交的消息。

使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际。

想了解更多戳这里:dengshenyu.com/%E5%88%8

三、动手搭一个 Kafka


通过上面的描述,我们已经大致了解到了「Kafka」是何方神圣了,现在我们开始尝试自己动手本地搭一个来实际体验一把。

第一步:下载 Kafka

这里以 Mac OS 为例,在安装了 Homebrew 的情况下执行下列代码:

brew install kafka

由于 Kafka 依赖了 Zookeeper,所以在下载的时候会自动下载。

第二步:启动服务

我们在启动之前首先需要修改 Kafka 的监听地址和端口为 localhost:9092

vi /usr/local/etc/kafka/server.properties

然后修改成下图的样子:

依次启动 Zookeeper 和 Kafka:

brew services start zookeeper
brew services start kafka

然后执行下列语句来创建一个名字为 “test” 的 Topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

我们可以通过下列的命令查看我们的 Topic 列表:

kafka-topics --list --zookeeper localhost:2181

第三步:发送消息

然后我们新建一个控制台,运行下列命令创建一个消费者关注刚才创建的 Topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

用控制台往刚才创建的 Topic 中添加消息,并观察刚才创建的消费者窗口:

kafka-console-producer --broker-list localhost:9092 --topic test

能通过消费者窗口观察到正确的消息:

参考资料


  1. infoq.cn/article/kafka- – Kafka 设计解析(一):Kafka 背景及架构介绍
  2. dengshenyu.com/%E5%88%8 – Kafka系列(一)初识Kafka
  3. lotabout.me/2018/kafka- – Kafka 入门介绍
  4. zhihu.com/question/2892 – Kafka 中的 Topic 为什么要进行分区? – 知乎
  5. blog.joway.io/posts/kaf – Kafka 的设计与实践思考
  6. dengshenyu.com/%E5%88%8 – Kafka系列(六)可靠的数据传输

MySQL update使用select的结果 - 简书

mikel阅读(541)

来源: MySQL update使用select的结果 – 简书

1. MySQL UPDATE JOIN语法

在MySQL中,可以在 UPDATE语句 中使用JOIN子句执行跨表更新。MySQL UPDATE JOIN的语法如下:

UPDATE T1
[INNER JOIN | LEFT JOIN] T2 ON T1.C1 = T2. C1
SET T1.C2 = T2.C2, 
    T2.C3 = expr
WHERE condition

更详细地看看MySQL UPDATE JOIN语法:

  • 首先,在UPDATE子句之后,指定主表(T1)和希望主表连接表(T2)。
  • 第二,指定一种要使用的连接,即INNER JOINLEFT JOIN和连接条件。JOIN子句必须出现在UPDATE子句之后。
  • 第三,要为要更新的T1和/或T2表中的列分配新值。
  • 第四,WHERE子句中的条件用于指定要更新的行。

2. 示例

首先,我们将在这些例子中使用一个新的示例数据库(empdb)。示例数据库包含2个表:

  • employees表将存储在员工编号,姓名,工作表现和工资的数据。
  • merits表存储员工绩效和绩效百分比。
    以下语句在 empdb 示例数据库中创建表并导入数据:

CREATE DATABASE IF NOT EXISTS empdb;

USE empdb;
-- create tables
CREATE TABLE merits (
    performance INT(11) NOT NULL,
    percentage FLOAT NOT NULL,
    PRIMARY KEY (performance)
);

CREATE TABLE employees (
    emp_id INT(11) NOT NULL AUTO_INCREMENT,
    emp_name VARCHAR(255) NOT NULL,
    performance INT(11) DEFAULT NULL,
    salary FLOAT DEFAULT NULL,
    PRIMARY KEY (emp_id),
    CONSTRAINT fk_performance FOREIGN KEY (performance)
        REFERENCES merits (performance)
);

-- insert data for merits table
INSERT INTO merits(performance,percentage)
VALUES(1,0),
      (2,0.01),
      (3,0.03),
      (4,0.05),
      (5,0.08);

-- insert data for employees table
INSERT INTO employees(emp_name,performance,salary)      
VALUES('Mary Doe', 1, 50000),
      ('Cindy Minsu', 3, 65000),
      ('Sue Greenspan', 4, 75000),
      ('Grace Dell', 5, 125000),
      ('Nancy Johnson', 3, 85000),
      ('John Doe', 2, 45000),
      ('Lily Bush', 3, 55000);

2.1 使用INNER JOIN子句的MySQL UPDATE JOIN示例

假设想根据员工的工作表现来调整员工的工资。
因此,优点百分比存储在 merits 表中,必须使用 UPDATE INNER JOIN 语句根据存储在 merits 表中的百分比来调整 employees 表中员工的工资。
employeesmerits 表之间以是 performance 字段相关联的。 请参阅以下查询:

UPDATE employees
        INNER JOIN
    merits ON employees.performance = merits.performance 
SET 
    salary = salary + salary * percentage;

上面查询语句的工作原理是什么?
我们仅在 UPDATE 子句之后指定 employees 表,因为我们只想更新 employees 表中的数据。
对于 employees 表中的每一行,查询根据 merits 表中 performance 列中的值来检查 employees 表中的 performance 列中的值。 如果找到一个匹配,它将获得 merits 表中的百分比,并更新 employees 表中的 salary列。

mysql> select * from employees; -- 更新之前的数据
+--------+---------------+-------------+--------+
| emp_id | emp_name      | performance | salary |
+--------+---------------+-------------+--------+
|      1 | Mary Doe      |           1 |  50000 |
|      2 | Cindy Minsu   |           3 |  65000 |
|      3 | Sue Greenspan |           4 |  75000 |
|      4 | Grace Dell    |           5 | 125000 |
|      5 | Nancy Johnson |           3 |  85000 |
|      6 | John Doe      |           2 |  45000 |
|      7 | Lily Bush     |           3 |  55000 |
+--------+---------------+-------------+--------+
7 rows in set

mysql> UPDATE employees
        INNER JOIN
    merits ON employees.performance = merits.performance 
SET 
    salary = salary + salary * percentage; -- 执行连接更新
Query OK, 6 rows affected
Rows matched: 7  Changed: 6  Warnings: 0

mysql> select * from employees; -- 更新之后的数据
+--------+---------------+-------------+--------+
| emp_id | emp_name      | performance | salary |
+--------+---------------+-------------+--------+
|      1 | Mary Doe      |           1 |  50000 |
|      2 | Cindy Minsu   |           3 |  66950 |
|      3 | Sue Greenspan |           4 |  78750 |
|      4 | Grace Dell    |           5 | 135000 |
|      5 | Nancy Johnson |           3 |  87550 |
|      6 | John Doe      |           2 |  45450 |
|      7 | Lily Bush     |           3 |  56650 |
+--------+---------------+-------------+--------+
7 rows in set

因为省略了 UPDATE 语句中的 WHERE 子句,所以 employees表中的所有记录都被更新。如果需要 performance 等级大于1的员工才更新薪资,那么 sql 可以这样写:

UPDATE employees
        INNER JOIN
    merits ON employees.performance = merits.performance 
SET 
    salary = salary + salary * percentage
WHERE employees.performance > 1;

使用LEFT JOIN的MySQL UPDATE JOIN示例

假设公司又雇用了两名新员工:

INSERT INTO employees(emp_name,performance,salary)
VALUES('Jack William',NULL,43000),
      ('Ricky Bond',NULL,52000);

因为这些员工是新员工,所以他们的绩效(performance)数据不可用或为NULL。现在
employees 表中的数据,如下所示:

mysql> SELECT * FROM employees;
+--------+---------------+-------------+--------+
| emp_id | emp_name      | performance | salary |
+--------+---------------+-------------+--------+
|      1 | Mary Doe      |           1 |  50000 |
|      2 | Cindy Minsu   |           3 |  66950 |
|      3 | Sue Greenspan |           4 |  78750 |
|      4 | Grace Dell    |           5 | 135000 |
|      5 | Nancy Johnson |           3 |  87550 |
|      6 | John Doe      |           2 |  45450 |
|      7 | Lily Bush     |           3 |  56650 |
|      8 | Jack William  | NULL        |  43000 |
|      9 | Ricky Bond    | NULL        |  52000 |
+--------+---------------+-------------+--------+
9 rows in set

要计算新员工的工资,不能使用 UPDATE INNER JOIN 语句(为什么不能,可参考sql之left join、right join、inner join的区别),因为它们的绩效数据在 merits表中不可用。这就是为什么要使用 UPDATE LEFT JOIN 来实现了。
UPDATE LEFT JOIN 语句在另一个表中没有相应行时,就会更新表中的一行。
例如,可以使用以下语句将新雇员的工资增加1.5%:

UPDATE employees
        LEFT JOIN
    merits ON employees.performance = merits.performance 
SET 
    salary = salary + salary * 0.015
WHERE
    merits.percentage IS NULL;

执行结果如下:

mysql> UPDATE employees
        LEFT JOIN
    merits ON employees.performance = merits.performance 
SET 
    salary = salary + salary * 0.015
WHERE
    merits.percentage IS NULL;
Query OK, 2 rows affected
Rows matched: 2  Changed: 2  Warnings: 0

mysql> select * from employees;
+--------+---------------+-------------+--------+
| emp_id | emp_name      | performance | salary |
+--------+---------------+-------------+--------+
|      1 | Mary Doe      |           1 |  50000 |
|      2 | Cindy Minsu   |           3 |  66950 |
|      3 | Sue Greenspan |           4 |  78750 |
|      4 | Grace Dell    |           5 | 135000 |
|      5 | Nancy Johnson |           3 |  87550 |
|      6 | John Doe      |           2 |  45450 |
|      7 | Lily Bush     |           3 |  56650 |
|      8 | Jack William  | NULL        |  43645 |
|      9 | Ricky Bond    | NULL        |  52780 |
+--------+---------------+-------------+--------+
9 rows in set

扩展阅读

sql之left join、right join、inner join的区别

作者:sprainkle
链接:https://www.jianshu.com/p/60b3f987c477
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

如何解决“未在本地计算机上注册“Microsoft.ACE.OLEDB.12.0”提供程序”问题 - 小小邪 - 博客园

mikel阅读(464)

https://www.microsoft.com/zh-cn/download/details.aspx?id=13255

来源: 如何解决“未在本地计算机上注册“Microsoft.ACE.OLEDB.12.0”提供程序”问题 – 小小邪 – 博客园

最近在做Excel文件导入时候,出现”未在本地计算机上注册“Microsoft.ACE.OLEDB.12.0”提供程序” 问题

产生原因:这个问题一般是在导入Excel文件的时候报的错,原因是缺少了相对应的Microsoft Access Database Engine组件。

解决方法:安装AccessDatabaseEngine插件

1)访问下载路径(https://www.microsoft.com/zh-cn/download/details.aspx?id=13255),选择语言,点击下载

2)选择合适程序下载(32位或64位)

3)点击AccessDatabaseEngine.exe 进行安装

4)安装完成后,重启IIS或重新打开SQL Server管理工具导入(我是用程序调用的,需要重启IIS)

参考网址:https://www.bianchengquan.com/article/336648.html

redis 的五种数据类型有哪些呢?

mikel阅读(676)

来源: redis 的五种数据类型有哪些呢?

作者:Linux远航者
链接:https://www.zhihu.com/question/483702040/answer/3041484727
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

1. 内容大纲

Redis 提供了丰富的数据类型,常见的有五种:String(字符串),Hash(哈希),List(列表),Set(集合)、Zset(有序集合)

随着 Redis 版本的更新,后面又支持了四种数据类型: BitMap(2.2 版新增)、HyperLogLog(2.8 版新增)、GEO(3.2 版新增)、Stream(5.0 版新增)

每种数据对象都各自的应用场景,你能说出它们各自的应用场景吗?这次我们就来学习 Redis 数据类型的使用以及应用场景

2. String

2.1 介绍

String 是最基本的 key-value 结构,key 是唯一标识,value 是具体的值,value其实不仅是字符串, 也可以是数字(整数或浮点数),value 最多可以容纳的数据长度是 512M。

2.2 内部实现

String 类型的底层的数据结构实现主要是 int 和 SDS(简单动态字符串)。

SDS 和我们认识的 C 字符串不太一样,之所以没有使用 C 语言的字符串表示,因为 SDS 相比于 C 的原生字符串:

  • SDS 不仅可以保存文本数据,还可以保存二进制数据。因为 SDS 使用 len 属性的值而不是空字符来判断字符串是否结束,并且 SDS 的所有 API 都会以处理二进制的方式来处理 SDS 存放在 buf[] 数组里的数据。所以 SDS 不光能存放文本数据,而且能保存图片、音频、视频、压缩文件这样的二进制数据。
  • SDS 获取字符串长度的时间复杂度是 O(1)。因为 C 语言的字符串并不记录自身长度,所以获取长度的复杂度为 O(n);而 SDS 结构里用 len 属性记录了字符串长度,所以复杂度为 O(1)。
  • Redis 的 SDS API 是安全的,拼接字符串不会造成缓冲区溢出。因为 SDS 在拼接字符串之前会检查 SDS 空间是否满足要求,如果空间不够会自动扩容,所以不会导致缓冲区溢出的问题。

字符串对象的内部编码(encoding)有 3 种 :int、raw和 embstr

如果一个字符串对象保存的是整数值,并且这个整数值可以用long类型来表示,那么字符串对象会将整数值保存在字符串对象结构的ptr属性里面(将void*转换成 long),并将字符串对象的编码设置为int。

如果字符串对象保存的是一个字符串,并且这个字符申的长度小于等于 32 字节(redis 2.+版本),那么字符串对象将使用一个简单动态字符串(SDS)来保存这个字符串,并将对象的编码设置为embstr, embstr编码是专门用于保存短字符串的一种优化编码方式:

如果字符串对象保存的是一个字符串,并且这个字符串的长度大于 32 字节(redis 2.+版本),那么字符串对象将使用一个简单动态字符串(SDS)来保存这个字符串,并将对象的编码设置为raw:

注意,embstr 编码和 raw 编码的边界在 redis 不同版本中是不一样的:

  • redis 2.+ 是 32 字节
  • redis 3.0-4.0 是 39 字节
  • redis 5.0 是 44 字节

可以看到embstr和raw编码都会使用SDS来保存值,但不同之处在于embstr会通过一次内存分配函数来分配一块连续的内存空间来保存redisObject和SDS,而raw编码会通过调用两次内存分配函数来分别分配两块空间来保存redisObject和SDS。Redis这样做会有很多好处:

  • embstr编码将创建字符串对象所需的内存分配次数从 raw 编码的两次降低为一次;
  • 释放 embstr编码的字符串对象同样只需要调用一次内存释放函数;
  • 因为embstr编码的字符串对象的所有数据都保存在一块连续的内存里面可以更好的利用 CPU 缓存提升性能。

但是 embstr 也有缺点的:

  • 如果字符串的长度增加需要重新分配内存时,整个redisObject和sds都需要重新分配空间,所以embstr编码的字符串对象实际上是只读的,redis没有为embstr编码的字符串对象编写任何相应的修改程序。当我们对embstr编码的字符串对象执行任何修改命令(例如append)时,程序会先将对象的编码从embstr转换成raw,然后再执行修改命令。

2.3 常用指令

普通字符串的基本操作:

# 设置 key-value 类型的值
> SET name lin
OK
# 根据 key 获得对应的 value
> GET name
"lin"
# 判断某个 key 是否存在
> EXISTS name
(integer) 1
# 返回 key 所储存的字符串值的长度
> STRLEN name
(integer) 3
# 删除某个 key 对应的值
> DEL name
(integer) 1

批量设置 :

# 批量设置 key-value 类型的值
> MSET key1 value1 key2 value2 
OK
# 批量获取多个 key 对应的 value
> MGET key1 key2 
1) "value1"
2) "value2"

计数器(字符串的内容为整数的时候可以使用):

# 设置 key-value 类型的值
> SET number 0
OK
# 将 key 中储存的数字值增一
> INCR number
(integer) 1
# 将key中存储的数字值加 10
> INCRBY number 10
(integer) 11
# 将 key 中储存的数字值减一
> DECR number
(integer) 10
# 将key中存储的数字值键 10
> DECRBY number 10
(integer) 0

过期(默认为永不过期):

# 设置 key 在 60 秒后过期(该方法是针对已经存在的key设置过期时间)
> EXPIRE name  60 
(integer) 1
# 查看数据还有多久过期
> TTL name 
(integer) 51

#设置 key-value 类型的值,并设置该key的过期时间为 60 秒
> SET key  value EX 60
OK
> SETEX key  60 value
OK

不存在就插入:

# 不存在就插入(not exists)
>SETNX key value
(integer) 1

2.4 应用场景

2.4.1 缓存对象

使用 String 来缓存对象有两种方式:

  • 直接缓存整个对象的 JSON,命令例子: SET user:1 ‘{“name”:”xiaolin”, “age”:18}’。
  • 采用将 key 进行分离为 user:ID:属性,采用 MSET 存储,用 MGET 获取各属性值,命令例子: MSET user:1:name xiaolin user:1:age 18 user:2:name xiaomei user:2:age 20。

2.4.2 常规计数

因为 Redis 处理命令是单线程,所以执行命令的过程是原子的。因此 String 数据类型适合计数场景,比如计算访问次数、点赞、转发、库存数量等等。

比如计算文章的阅读量:

# 初始化文章的阅读量
> SET aritcle:readcount:1001 0
OK
#阅读量+1
> INCR aritcle:readcount:1001
(integer) 1
#阅读量+1
> INCR aritcle:readcount:1001
(integer) 2
#阅读量+1
> INCR aritcle:readcount:1001
(integer) 3
# 获取对应文章的阅读量
> GET aritcle:readcount:1001
"3"

2.4.3 分布式锁

SET 命令有个 NX 参数可以实现「key不存在才插入」,可以用它来实现分布式锁:

  • 如果 key 不存在,则显示插入成功,可以用来表示加锁成功;
  • 如果 key 存在,则会显示插入失败,可以用来表示加锁失败。

一般而言,还会对分布式锁加上过期时间,分布式锁的命令如下:

SET lock_key unique_value NX PX 10000
  • lock_key 就是 key 键;
  • unique_value 是客户端生成的唯一的标识;
  • NX 代表只在 lock_key 不存在时,才对 lock_key 进行设置操作;
  • PX 10000 表示设置 lock_key 的过期时间为 10s,这是为了避免客户端发生异常而无法释放锁。

而解锁的过程就是将 lock_key 键删除,但不能乱删,要保证执行操作的客户端就是加锁的客户端。所以,解锁的时候,我们要先判断锁的 unique_value 是否为加锁客户端,是的话,才将 lock_key 键删除。

可以看到,解锁是有两个操作,这时就需要 Lua 脚本来保证解锁的原子性,因为 Redis 在执行 Lua 脚本时,可以以原子性的方式执行,保证了锁释放操作的原子性。

// 释放锁时,先比较 unique_value 是否相等,避免锁的误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

这样一来,就通过使用 SET 命令和 Lua 脚本在 Redis 单节点上完成了分布式锁的加锁和解锁。

2.4.4 共享 Session 信息

通常我们在开发后台管理系统时,会使用 Session 来保存用户的会话(登录)状态,这些 Session 信息会被保存在服务器端,但这只适用于单系统应用,如果是分布式系统此模式将不再适用。

例如用户一的 Session 信息被存储在服务器一,但第二次访问时用户一被分配到服务器二,这个时候服务器并没有用户一的 Session 信息,就会出现需要重复登录的问题,问题在于分布式系统每次会把请求随机分配到不同的服务器。

分布式系统单独存储 Session 流程图:

因此,我们需要借助 Redis 对这些 Session 信息进行统一的存储和管理,这样无论请求发送到那台服务器,服务器都会去同一个 Redis 获取相关的 Session 信息,这样就解决了分布式系统下 Session 存储的问题。

分布式系统使用同一个 Redis 存储 Session 流程图:

3. List

3.1 介绍

List 列表是简单的字符串列表,按照插入顺序排序,可以从头部或尾部向 List 列表添加元素。

列表的最大长度为 2^32 – 1,也即每个列表支持超过 40 亿个元素。

3.2 内部实现

List 类型的底层数据结构是由双向链表或压缩列表实现的:

  • 如果列表的元素个数小于 512 个(默认值,可由 list-max-ziplist-entries 配置),列表每个元素的值都小于 64 字节(默认值,可由 list-max-ziplist-value 配置),Redis 会使用压缩列表作为 List 类型的底层数据结构;
  • 如果列表的元素不满足上面的条件,Redis 会使用双向链表作为 List 类型的底层数据结构;

但是在 Redis 3.2 版本之后,List 数据类型底层数据结构就只由 quicklist 实现了,替代了双向链表和压缩列表

3.3 常用命令

# 将一个或多个值value插入到key列表的表头(最左边),最后的值在最前面
LPUSH key value [value ...] 
# 将一个或多个值value插入到key列表的表尾(最右边)
RPUSH key value [value ...]
# 移除并返回key列表的头元素
LPOP key     
# 移除并返回key列表的尾元素
RPOP key 

# 返回列表key中指定区间内的元素,区间以偏移量start和stop指定,从0开始
LRANGE key start stop

# 从key列表表头弹出一个元素,没有就阻塞timeout秒,如果timeout=0则一直阻塞
BLPOP key [key ...] timeout
# 从key列表表尾弹出一个元素,没有就阻塞timeout秒,如果timeout=0则一直阻塞
BRPOP key [key ...] timeout

3.4 应用场景

3.4.1 消息队列

消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性

Redis 的 List 和 Stream 两种数据类型,就可以满足消息队列的这三个需求。我们先来了解下基于 List 的消息队列实现方法,后面在介绍 Stream 数据类型时候,在详细说说 Stream。

1、如何满足消息保序需求?

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

List 可以使用 LPUSH + RPOP (或者反过来,RPUSH+LPOP)命令实现消息队列。

  • 生产者使用 LPUSH key value[value…] 将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。
  • 消费者使用 RPOP key 依次读取队列的消息,先进先出。

不过,在消费者读取数据时,有一个潜在的性能风险点。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个while(1)循环)。如果有新消息写入,RPOP命令就会返回结果,否则,RPOP命令返回空值,再继续循环。

所以,即使没有新消息写入List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。

为了解决这个问题,Redis提供了 BRPOP 命令。BRPOP命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用RPOP命令相比,这种方式能节省CPU开销。

2、如何处理重复的消息?

消费者要实现重复消息的判断,需要 2 个方面的要求:

  • 每个消息都有一个全局的 ID。
  • 消费者要记录已经处理过的消息的 ID。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

但是 List 并不会为每个消息生成 ID 号,所以我们需要自行为每个消息生成一个全局唯一ID,生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。

例如,我们执行以下命令,就把一条全局 ID 为 111000102、库存量为 99 的消息插入了消息队列:

> LPUSH mq "111000102:stock:99"
(integer) 1

3、如何保证消息可靠性?

当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存

这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

好了,到这里可以知道基于 List 类型的消息队列,满足消息队列的三大需求(消息保序、处理重复的消息和保证消息可靠性)。

  • 消息保序:使用 LPUSH + RPOP;
  • 阻塞读取:使用 BRPOP;
  • 重复消息处理:生产者自行实现全局唯一 ID;
  • 消息的可靠性:使用 BRPOPLPUSH

List 作为消息队列有什么缺陷?

List 不支持多个消费者消费同一条消息,因为一旦消费者拉取一条消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费。

要实现一条消息可以被多个消费者消费,那么就要将多个消费者组成一个消费组,使得多个消费者可以消费同一条消息,但是 List 类型并不支持消费组的实现

这就要说起 Redis 从 5.0 版本开始提供的 Stream 数据类型了,Stream 同样能够满足消息队列的三大需求,而且它还支持「消费组」形式的消息读取。

4. Hash

4.1 介绍

Hash 是一个键值对(key – value)集合,其中 value 的形式如: value=[{field1,value1},…{fieldN,valueN}]。Hash 特别适合用于存储对象。

Hash 与 String 对象的区别如下图所示:

4.2 内部实现

Hash 类型的底层数据结构是由压缩列表或哈希表实现的:

  • 如果哈希类型元素个数小于 512 个(默认值,可由 hash-max-ziplist-entries 配置),所有值小于 64 字节(默认值,可由 hash-max-ziplist-value 配置)的话,Redis 会使用压缩列表作为 Hash 类型的底层数据结构;
  • 如果哈希类型元素不满足上面条件,Redis 会使用哈希表作为 Hash 类型的 底层数据结构。

在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了

4.3 常用命令

# 存储一个哈希表key的键值
HSET key field value   
# 获取哈希表key对应的field键值
HGET key field

# 在一个哈希表key中存储多个键值对
HMSET key field value [field value...] 
# 批量获取哈希表key中多个field键值
HMGET key field [field ...]       
# 删除哈希表key中的field键值
HDEL key field [field ...]    

# 返回哈希表key中field的数量
HLEN key       
# 返回哈希表key中所有的键值
HGETALL key 

# 为哈希表key中field键的值加上增量n
HINCRBY key field n                         

4.4. 应用场景

4.4.1 缓存对象

Hash 类型的 (key,field, value) 的结构与对象的(对象id, 属性, 值)的结构相似,也可以用来存储对象。

我们以用户信息为例,它在关系型数据库中的结构是这样的:

我们可以使用如下命令,将用户对象的信息存储到 Hash 类型:

# 存储一个哈希表uid:1的键值
> HMSET uid:1 name Tom age 15
2
# 存储一个哈希表uid:2的键值
> HMSET uid:2 name Jerry age 13
2
# 获取哈希表用户id为1中所有的键值
> HGETALL uid:1
1) "name"
2) "Tom"
3) "age"
4) "15"

Redis Hash 存储其结构如下图:

在介绍 String 类型的应用场景时有所介绍,String + Json也是存储对象的一种方式,那么存储对象时,到底用 String + json 还是用 Hash 呢?

【文章福利】小编推荐自己的Linux C++技术交流群:【1106675687】整理了一些个人觉得比较好的学习书籍、视频资料共享在群文件里面,有需要的可以自行添加哦!!!前100名进群领取,额外赠送大厂面试题。

资料领取直通车:大厂面试题锦集+视频教程

Linux服务器学习网站C/C++Linux服务器开发/后台架构师

一般对象用 String + Json 存储,对象中某些频繁变化的属性可以考虑抽出来用 Hash 类型存储。

4.4.2 购物车

以用户 id 为 key,商品 id 为 field,商品数量为 value,恰好构成了购物车的3个要素,如下图所示。

涉及的命令如下:

  • 添加商品:HSET cart:{用户id} {商品id} 1
  • 添加数量:HINCRBY cart:{用户id} {商品id} 1
  • 商品总数:HLEN cart:{用户id}
  • 删除商品:HDEL cart:{用户id} {商品id}
  • 获取购物车所有商品:HGETALL cart:{用户id}

当前仅仅是将商品ID存储到了Redis 中,在回显商品具体信息的时候,还需要拿着商品 id 查询一次数据库,获取完整的商品的信息。

5. Set

5.1 介绍

Set 类型是一个无序并唯一的键值集合,它的存储顺序不会按照插入的先后顺序进行存储。

一个集合最多可以存储 2^32-1 个元素。概念和数学中个的集合基本类似,可以交集,并集,差集等等,所以 Set 类型除了支持集合内的增删改查,同时还支持多个集合取交集、并集、差集。

Set 类型和 List 类型的区别如下:

  • List 可以存储重复元素,Set 只能存储非重复元素;
  • List 是按照元素的先后顺序存储元素的,而 Set 则是无序方式存储元素的。

5.2 内部实现

Set 类型的底层数据结构是由哈希表或整数集合实现的:

  • 如果集合中的元素都是整数且元素个数小于 512 (默认值,set-maxintset-entries配置)个,Redis 会使用整数集合作为 Set 类型的底层数据结构;
  • 如果集合中的元素不满足上面条件,则 Redis 使用哈希表作为 Set 类型的底层数据结构。

5.3 常用命令

Set常用操作:

# 往集合key中存入元素,元素存在则忽略,若key不存在则新建
SADD key member [member ...]
# 从集合key中删除元素
SREM key member [member ...] 
# 获取集合key中所有元素
SMEMBERS key
# 获取集合key中的元素个数
SCARD key

# 判断member元素是否存在于集合key中
SISMEMBER key member

# 从集合key中随机选出count个元素,元素不从key中删除
SRANDMEMBER key [count]
# 从集合key中随机选出count个元素,元素从key中删除
SPOP key [count]

Set运算操作:

# 交集运算
SINTER key [key ...]
# 将交集结果存入新集合destination中
SINTERSTORE destination key [key ...]

# 并集运算
SUNION key [key ...]
# 将并集结果存入新集合destination中
SUNIONSTORE destination key [key ...]

# 差集运算
SDIFF key [key ...]
# 将差集结果存入新集合destination中
SDIFFSTORE destination key [key ...]

5.4 应用场景

集合的主要几个特性,无序、不可重复、支持并交差等操作。

因此 Set 类型比较适合用来数据去重和保障数据的唯一性,还可以用来统计多个集合的交集、错集和并集等,当我们存储的数据是无序并且需要去重的情况下,比较适合使用集合类型进行存储。

但是要提醒你一下,这里有一个潜在的风险。Set 的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计算,会导致 Redis 实例阻塞

在主从集群中,为了避免主库因为 Set 做聚合计算(交集、差集、并集)时导致主库被阻塞,我们可以选择一个从库完成聚合统计,或者把数据返回给客户端,由客户端来完成聚合统计。

5.4.1 点赞

Set 类型可以保证一个用户只能点一个赞,这里举例子一个场景,key 是文章id,value 是用户id。

uid:1 、uid:2、uid:3 三个用户分别对 article:1 文章点赞了。

# uid:1 用户对文章 article:1 点赞
> SADD article:1 uid:1
(integer) 1
# uid:2 用户对文章 article:1 点赞
> SADD article:1 uid:2
(integer) 1
# uid:3 用户对文章 article:1 点赞
> SADD article:1 uid:3
(integer) 1

uid:1 取消了对 article:1 文章点赞。

> SREM article:1 uid:1
(integer) 1

获取 article:1 文章所有点赞用户 :

> SMEMBERS article:1
1) "uid:3"
2) "uid:2"

获取 article:1 文章的点赞用户数量:

> SCARD article:1
(integer) 2

判断用户 uid:1 是否对文章 article:1 点赞了:

> SISMEMBER article:1 uid:1
(integer) 0  # 返回0说明没点赞,返回1则说明点赞了

5.4.2 共同关注

Set 类型支持交集运算,所以可以用来计算共同关注的好友、公众号等。

key 可以是用户id,value 则是已关注的公众号的id。

uid:1 用户关注公众号 id 为 5、6、7、8、9,uid:2 用户关注公众号 id 为 7、8、9、10、11。

# uid:1 用户关注公众号 id 为 5、6、7、8、9
> SADD uid:1 5 6 7 8 9
(integer) 5
# uid:2  用户关注公众号 id 为 7、8、9、10、11
> SADD uid:2 7 8 9 10 11
(integer) 5

uid:1 和 uid:2 共同关注的公众号:

# 获取共同关注
> SINTER uid:1 uid:2
1) "7"
2) "8"
3) "9"

给 uid:2 推荐 uid:1 关注的公众号:

> SDIFF uid:1 uid:2
1) "5"
2) "6"

验证某个公众号是否同时被 uid:1 或 uid:2 关注:

> SISMEMBER uid:1 5
(integer) 1 # 返回0,说明关注了
> SISMEMBER uid:2 5
(integer) 0 # 返回0,说明没关注

5.4.3 抽奖活动

存储某活动中中奖的用户名 ,Set 类型因为有去重功能,可以保证同一个用户不会中奖两次。

key为抽奖活动名,value为员工名称,把所有员工名称放入抽奖箱 :

>SADD lucky Tom Jerry John Sean Marry Lindy Sary Mark
(integer) 5

如果允许重复中奖,可以使用 SRANDMEMBER 命令。

# 抽取 1 个一等奖:
> SRANDMEMBER lucky 1
1) "Tom"
# 抽取 2 个二等奖:
> SRANDMEMBER lucky 2
1) "Mark"
2) "Jerry"
# 抽取 3 个三等奖:
> SRANDMEMBER lucky 3
1) "Sary"
2) "Tom"
3) "Jerry"

如果不允许重复中奖,可以使用 SPOP 命令。

# 抽取一等奖1个
> SPOP lucky 1
1) "Sary"
# 抽取二等奖2个
> SPOP lucky 2
1) "Jerry"
2) "Mark"
# 抽取三等奖3个
> SPOP lucky 3
1) "John"
2) "Sean"
3) "Lindy"

6. Zset

6.1 介绍

Zset 类型(有序集合类型)相比于 Set 类型多了一个排序属性 score(分值),对于有序集合 ZSet 来说,每个存储元素相当于有两个值组成的,一个是有序结合的元素值,一个是排序值。

有序集合保留了集合不能有重复成员的特性(分值可以重复),但不同的是,有序集合中的元素可以排序。

6.2 内部实现

Zset 类型的底层数据结构是由压缩列表或跳表实现的:

  • 如果有序集合的元素个数小于 128 个,并且每个元素的值小于 64 字节时,Redis 会使用压缩列表作为 Zset 类型的底层数据结构;
  • 如果有序集合的元素不满足上面的条件,Redis 会使用跳表作为 Zset 类型的底层数据结构;

在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了。

6.3 常用命令

Zset 常用操作:

# 往有序集合key中加入带分值元素
ZADD key score member [[score member]...]   
# 往有序集合key中删除元素
ZREM key member [member...]                 
# 返回有序集合key中元素member的分值
ZSCORE key member
# 返回有序集合key中元素个数
ZCARD key 

# 为有序集合key中元素member的分值加上increment
ZINCRBY key increment member 

# 正序获取有序集合key从start下标到stop下标的元素
ZRANGE key start stop [WITHSCORES]
# 倒序获取有序集合key从start下标到stop下标的元素
ZREVRANGE key start stop [WITHSCORES]

# 返回有序集合中指定分数区间内的成员,分数由低到高排序。
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

# 返回指定成员区间内的成员,按字典正序排列, 分数必须相同。
ZRANGEBYLEX key min max [LIMIT offset count]
# 返回指定成员区间内的成员,按字典倒序排列, 分数必须相同
ZREVRANGEBYLEX key max min [LIMIT offset count]

Zset 运算操作(相比于 Set 类型,ZSet 类型没有支持差集运算):

# 并集计算(相同元素分值相加),numberkeys一共多少个key,WEIGHTS每个key对应的分值乘积
ZUNIONSTORE destkey numberkeys key [key...] 
# 交集计算(相同元素分值相加),numberkeys一共多少个key,WEIGHTS每个key对应的分值乘积
ZINTERSTORE destkey numberkeys key [key...]

6.4 应用场景

Zset 类型(Sorted Set,有序集合) 可以根据元素的权重来排序,我们可以自己来决定每个元素的权重值。比如说,我们可以根据元素插入 Sorted Set 的时间确定权重值,先插入的元素权重小,后插入的元素权重大。

在面对需要展示最新列表、排行榜等场景时,如果数据更新频繁或者需要分页显示,可以优先考虑使用 Sorted Set。

6.4.1 排行榜

有序集合比较典型的使用场景就是排行榜。例如学生成绩的排名榜、游戏积分排行榜、视频播放排名、电商系统中商品的销量排名等。

我们以博文点赞排名为例,小林发表了五篇博文,分别获得赞为 200、40、100、50、150。

# arcticle:1 文章获得了200个赞
> ZADD user:xiaolin:ranking 200 arcticle:1
(integer) 1
# arcticle:2 文章获得了40个赞
> ZADD user:xiaolin:ranking 40 arcticle:2
(integer) 1
# arcticle:3 文章获得了100个赞
> ZADD user:xiaolin:ranking 100 arcticle:3
(integer) 1
# arcticle:4 文章获得了50个赞
> ZADD user:xiaolin:ranking 50 arcticle:4
(integer) 1
# arcticle:5 文章获得了150个赞
> ZADD user:xiaolin:ranking 150 arcticle:5
(integer) 1

文章 arcticle:4 新增一个赞,可以使用 ZINCRBY 命令(为有序集合key中元素member的分值加上increment):

> ZINCRBY user:xiaolin:ranking 1 arcticle:4
"51"

查看某篇文章的赞数,可以使用 ZSCORE 命令(返回有序集合key中元素个数):

> ZSCORE user:xiaolin:ranking arcticle:4
"50"

获取小林文章赞数最多的 3 篇文章,可以使用 ZREVRANGE 命令(倒序获取有序集合 key 从start下标到stop下标的元素):

# WITHSCORES 表示把 score 也显示出来
> ZREVRANGE user:xiaolin:ranking 0 2 WITHSCORES
1) "arcticle:1"
2) "200"
3) "arcticle:5"
4) "150"
5) "arcticle:3"
6) "100"

获取小林 100 赞到 200 赞的文章,可以使用 ZRANGEBYSCORE 命令(返回有序集合中指定分数区间内的成员,分数由低到高排序):

> ZRANGEBYSCORE user:xiaolin:ranking 100 200 WITHSCORES
1) "arcticle:3"
2) "100"
3) "arcticle:5"
4) "150"
5) "arcticle:1"
6) "200"

6.4.2 电话、姓名排序

使用有序集合的 ZRANGEBYLEX 或 ZREVRANGEBYLEX 可以帮助我们实现电话号码或姓名的排序,我们以 ZRANGEBYLEX (返回指定成员区间内的成员,按 key 正序排列,分数必须相同)为例。

注意:不要在分数不一致的 SortSet 集合中去使用 ZRANGEBYLEX和 ZREVRANGEBYLEX 指令,因为获取的结果会不准确。

1、电话排序

我们可以将电话号码存储到 SortSet 中,然后根据需要来获取号段:

> ZADD phone 0 13100111100 0 13110114300 0 13132110901 
(integer) 3
> ZADD phone 0 13200111100 0 13210414300 0 13252110901 
(integer) 3
> ZADD phone 0 13300111100 0 13310414300 0 13352110901 
(integer) 3

获取所有号码:

> ZRANGEBYLEX phone - +
1) "13100111100"
2) "13110114300"
3) "13132110901"
4) "13200111100"
5) "13210414300"
6) "13252110901"
7) "13300111100"
8) "13310414300"
9) "13352110901"

获取 132 号段的号码:

> ZRANGEBYLEX phone [132 (133
1) "13200111100"
2) "13210414300"
3) "13252110901"

获取132、133号段的号码:

> ZRANGEBYLEX phone [132 (134
1) "13200111100"
2) "13210414300"
3) "13252110901"
4) "13300111100"
5) "13310414300"
6) "13352110901"

2、姓名排序

> zadd names 0 Toumas 0 Jake 0 Bluetuo 0 Gaodeng 0 Aimini 0 Aidehua 
(integer) 6

获取所有人的名字:

> ZRANGEBYLEX names - +
1) "Aidehua"
2) "Aimini"
3) "Bluetuo"
4) "Gaodeng"
5) "Jake"
6) "Toumas"

获取名字中大写字母A开头的所有人:

> ZRANGEBYLEX names [A (B
1) "Aidehua"
2) "Aimini"

获取名字中大写字母 C 到 Z 的所有人:

> ZRANGEBYLEX names [C [Z
1) "Gaodeng"
2) "Jake"
3) "Toumas"

7. BitMap

7.1 介绍

Bitmap,即位图,是一串连续的二进制数组(0和1),可以通过偏移量(offset)定位元素。BitMap通过最小的单位bit来进行0|1的设置,表示某个元素的值或者状态,时间复杂度为O(1)。

由于 bit 是计算机中最小的单位,使用它进行储存将非常节省空间,特别适合一些数据量大且使用二值统计的场景

7.2 内部实现

Bitmap 本身是用 String 类型作为底层数据结构实现的一种统计二值状态的数据类型。

String 类型是会保存为二进制的字节数组,所以,Redis 就把字节数组的每个 bit 位利用起来,用来表示一个元素的二值状态,你可以把 Bitmap 看作是一个 bit 数组。

7.3 常用命令

bitmap 基本操作:

# 设置值,其中value只能是 0 和 1
SETBIT key offset value

# 获取值
GETBIT key offset

# 获取指定范围内值为 1 的个数
# start 和 end 以字节为单位
BITCOUNT key start end

bitmap 运算操作:

# BitMap间的运算
# operations 位移操作符,枚举值
  AND 与运算 &
  OR 或运算 |
  XOR 异或 ^
  NOT 取反 ~
# result 计算的结果,会存储在该key中
# key1 … keyn 参与运算的key,可以有多个,空格分割,not运算只能一个key
# 当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0。返回值是保存到 destkey 的字符串的长度(以字节byte为单位),和输入 key 中最长的字符串长度相等。
BITOP [operations] [result] [key1] [keyn…]

# 返回指定key中第一次出现指定value(0/1)的位置
BITPOS [key] [value]

7.4 应用场景

Bitmap 类型非常适合二值状态统计的场景,这里的二值状态就是指集合元素的取值就只有 0 和 1 两种,在记录海量数据时,Bitmap 能够有效地节省内存空间。

7.4.1 签到统计

在签到打卡的场景中,我们只用记录签到(1)或未签到(0),所以它就是非常典型的二值状态。

签到统计时,每个用户一天的签到用 1 个 bit 位就能表示,一个月(假设是 31 天)的签到情况用 31 个 bit 位就可以,而一年的签到也只需要用 365 个 bit 位,根本不用太复杂的集合类型。

假设我们要统计 ID 100 的用户在 2022 年 6 月份的签到情况,就可以按照下面的步骤进行操作。

第一步,执行下面的命令,记录该用户 6 月 3 号已签到。

SETBIT uid:sign:100:202206 2 1

第二步,检查该用户 6 月 3 日是否签到。

GETBIT uid:sign:100:202206 2

第三步,统计该用户在 6 月份的签到次数。

BITCOUNT uid:sign:100:202206

这样,我们就知道该用户在 6 月份的签到情况了。

如何统计这个月首次打卡时间呢?

Redis 提供了 BITPOS key bitValue [start] [end]指令,返回数据表示 Bitmap 中第一个值为 bitValue 的 offset 位置。

在默认情况下, 命令将检测整个位图, 用户可以通过可选的 start 参数和 end 参数指定要检测的范围。所以我们可以通过执行这条命令来获取 userID = 100 在 2022 年 6 月份首次打卡日期:

BITPOS uid:sign:100:202206 1

需要注意的是,因为 offset 从 0 开始的,所以我们需要将返回的 value + 1 。

7.4.2 判断用户登陆态

Bitmap 提供了 GETBIT、SETBIT 操作,通过一个偏移值 offset 对 bit 数组的 offset 位置的 bit 位进行读写操作,需要注意的是 offset 从 0 开始。

只需要一个 key = login_status 表示存储用户登陆状态集合数据, 将用户 ID 作为 offset,在线就设置为 1,下线设置 0。通过 GETBIT判断对应的用户是否在线。 50000 万 用户只需要 6 MB 的空间。

假如我们要判断 ID = 10086 的用户的登陆情况:

第一步,执行以下指令,表示用户已登录。

SETBIT login_status 10086 1

第二步,检查该用户是否登陆,返回值 1 表示已登录。

GETBIT login_status 10086

第三步,登出,将 offset 对应的 value 设置成 0。

SETBIT login_status 10086 0

7.4.3 连续签到用户总数

如何统计出这连续 7 天连续打卡用户总数呢?

我们把每天的日期作为 Bitmap 的 key,userId 作为 offset,若是打卡则将 offset 位置的 bit 设置成 1。

key 对应的集合的每个 bit 位的数据则是一个用户在该日期的打卡记录。

一共有 7 个这样的 Bitmap,如果我们能对这 7 个 Bitmap 的对应的 bit 位做 『与』运算。同样的 UserID offset 都是一样的,当一个 userID 在 7 个 Bitmap 对应对应的 offset 位置的 bit = 1 就说明该用户 7 天连续打卡。

结果保存到一个新 Bitmap 中,我们再通过 BITCOUNT 统计 bit = 1 的个数便得到了连续打卡 7 天的用户总数了。

Redis 提供了 BITOP operation destkey key [key …]这个指令用于对一个或者多个 key 的 Bitmap 进行位元操作。

  • operation 可以是 and、OR、NOT、XOR。当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0 。空的 key 也被看作是包含 0 的字符串序列。

假设要统计 3 天连续打卡的用户数,则是将三个 bitmap 进行 AND 操作,并将结果保存到 destmap 中,接着对 destmap 执行 BITCOUNT 统计,如下命令:

# 与操作
BITOP AND destmap bitmap:01 bitmap:02 bitmap:03
# 统计 bit 位 =  1 的个数
BITCOUNT destmap

即使一天产生一个亿的数据,Bitmap 占用的内存也不大,大约占 12 MB 的内存(10^8/8/1024/1024),7 天的 Bitmap 的内存开销约为 84 MB。同时我们最好给 Bitmap 设置过期时间,让 Redis 删除过期的打卡数据,节省内存。

8. HyperLogLog

8.1 介绍

Redis HyperLogLog 是 Redis 2.8.9 版本新增的数据类型,是一种用于「统计基数」的数据集合类型,基数统计就是指统计一个集合中不重复的元素个数。但要注意,HyperLogLog 是统计规则是基于概率完成的,不是非常准确,标准误算率是 0.81%。

所以,简单来说 HyperLogLog 提供不精确的去重计数

HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的内存空间总是固定的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。

这什么概念?举个例子给大家对比一下。

用 Java 语言来说,一般 long 类型占用 8 字节,而 1 字节有 8 位,即:1 byte = 8 bit,即 long 数据类型最大可以表示的数是:2^63-1。对应上面的2^64个数,假设此时有2^63-1这么多个数,从 0 ~ 2^63-1,按照long以及1k = 1024 字节的规则来计算内存总数,就是:((2^63-1) * 8/1024)K,这是很庞大的一个数,存储空间远远超过12K,而 HyperLogLog 却可以用 12K 就能统计完。

8.2 内部实现

HyperLogLog 的实现涉及到很多数学问题,太费脑子了,我也没有搞懂,如果你想了解一下,课下可以看看这个:HyperLogLog (opens new window)。

8.3 常见命令

HyperLogLog 命令很少,就三个。

# 添加指定元素到 HyperLogLog 中
PFADD key element [element ...]

# 返回给定 HyperLogLog 的基数估算值。
PFCOUNT key [key ...]

# 将多个 HyperLogLog 合并为一个 HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]

8.4 应用场景

8.4.1 百万级网页 UV 计数

Redis HyperLogLog 优势在于只需要花费 12 KB 内存,就可以计算接近 2^64 个元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。

所以,非常适合统计百万级以上的网页 UV 的场景。

在统计 UV 时,你可以用 PFADD 命令(用于向 HyperLogLog 中添加新元素)把访问页面的每个用户都添加到 HyperLogLog 中。

PFADD page1:uv user1 user2 user3 user4 user5

接下来,就可以用 PFCOUNT 命令直接获得 page1 的 UV 值了,这个命令的作用就是返回 HyperLogLog 的统计结果。

PFCOUNT page1:uv

不过,有一点需要你注意一下,HyperLogLog 的统计规则是基于概率完成的,所以它给出的统计结果是有一定误差的,标准误算率是 0.81%。

这也就意味着,你使用 HyperLogLog 统计的 UV 是 100 万,但实际的 UV 可能是 101 万。虽然误差率不算大,但是,如果你需要精确统计结果的话,最好还是继续用 Set 或 Hash 类型。

9. GEO

Redis GEO 是 Redis 3.2 版本新增的数据类型,主要用于存储地理位置信息,并对存储的信息进行操作。

在日常生活中,我们越来越依赖搜索“附近的餐馆”、在打车软件上叫车,这些都离不开基于位置信息服务(Location-Based Service,LBS)的应用。LBS 应用访问的数据是和人或物关联的一组经纬度信息,而且要能查询相邻的经纬度范围,GEO 就非常适合应用在 LBS 服务的场景中。

9.1 内部实现

GEO 本身并没有设计新的底层数据结构,而是直接使用了 Sorted Set 集合类型。

GEO 类型使用 GeoHash 编码方法实现了经纬度到 Sorted Set 中元素权重分数的转换,这其中的两个关键机制就是「对二维地图做区间划分」和「对区间进行编码」。一组经纬度落在某个区间后,就用区间的编码值来表示,并把编码值作为 Sorted Set 元素的权重分数。

这样一来,我们就可以把经纬度保存到 Sorted Set 中,利用 Sorted Set 提供的“按权重进行有序范围查找”的特性,实现 LBS 服务中频繁使用的“搜索附近”的需求。

9.2 常用命令

# 存储指定的地理空间位置,可以将一个或多个经度(longitude)、纬度(latitude)、位置名称(member)添加到指定的 key 中。
GEOADD key longitude latitude member [longitude latitude member ...]

# 从给定的 key 里返回所有指定名称(member)的位置(经度和纬度),不存在的返回 nil。
GEOPOS key member [member ...]

# 返回两个给定位置之间的距离。
GEODIST key member1 member2 [m|km|ft|mi]

# 根据用户给定的经纬度坐标来获取指定范围内的地理位置集合。
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]

9.3 应用场景

9.3.1 滴滴叫车

这里以滴滴叫车的场景为例,介绍下具体如何使用 GEO 命令:GEOADD 和 GEORADIUS 这两个命令。

假设车辆 ID 是 33,经纬度位置是(116.034579,39.030452),我们可以用一个 GEO 集合保存所有车辆的经纬度,集合 key 是 cars:locations。

执行下面的这个命令,就可以把 ID 号为 33 的车辆的当前经纬度位置存入 GEO 集合中:

GEOADD cars:locations 116.034579 39.030452 33

当用户想要寻找自己附近的网约车时,LBS 应用就可以使用 GEORADIUS 命令。

例如,LBS 应用执行下面的命令时,Redis 会根据输入的用户的经纬度信息(116.054579,39.030452 ),查找以这个经纬度为中心的 5 公里内的车辆信息,并返回给 LBS 应用。

GEORADIUS cars:locations 116.054579 39.030452 5 km ASC COUNT 10

10. Stream

10.1 介绍

Redis Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。

在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:

  • 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
  • List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。

基于以上问题,Redis 5.0 便推出了 Stream 类型也是此版本最重要的功能,用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。

10.2 常见命令

Stream 消息队列操作命令:

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XLEN :查询消息长度;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XDEL : 根据消息 ID 删除消息;
  • DEL :删除整个 Stream;
  • XRANGE :读取区间消息
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息;XACK 命令用于向消息队列确认消息处理已完成;

10.3 应用场景

10.3.1 消息队列

生产者通过 XADD 命令插入一条消息:

# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID
# 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 xiaolin
> XADD mymq * name xiaolin
"1654254953808-0"

插入成功后会返回全局唯一的 ID:”1654254953808-0″。消息的全局唯一 ID 由两部分组成:

  • 第一部分“1654254953808”是数据插入时,以毫秒为单位计算的当前服务器时间;
  • 第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。例如,“1654254953808-0”就表示在“1654254953808”毫秒内的第 1 条消息。

消费者通过 XREAD 命令从消息队列中读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条信息开始读取,不是查询输入ID的消息)。

# 从 ID 号为 1654254953807-0 的消息开始,读取后续的所有消息(示例中一共 1 条)。
> XREAD STREAMS mymq 1654254953807-0
1) 1) "mymq"
   2) 1) 1) "1654254953808-0"
         2) 1) "name"
            2) "xiaolin"

如果想要实现阻塞读(当没有数据时,阻塞住),可以调用 XRAED 时设定 BLOCK 配置项,实现类似于 BRPOP 的阻塞读取操作。

比如,下面这命令,设置了 BLOCK 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。

# 命令最后的“$”符号表示读取最新的消息
> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.00s)

Stream 的基础方法,使用 xadd 存入消息和 xread 循环阻塞读取消息的方式可以实现简易版的消息队列,交互流程如下图所示:

前面介绍的这些操作 List 也支持的,接下来看看 Stream 特有的功能。

Stream 可以以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

创建两个消费组,这两个消费组消费的消息队列是 mymq,都指定从第一条消息开始读取:

# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。
> XGROUP CREATE mymq group1 0-0
OK
# 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。
> XGROUP CREATE mymq group2 0-0
OK

消费组 group1 内的消费者 consumer1 从 mymq 消息队列中读取所有消息的命令如下:

# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1654254953808-0"
         2) 1) "name"
            2) "xiaolin"

消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:

> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)

但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)

比如说,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1654254953808-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:

> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1654254953808-0"
         2) 1) "name"
            2) "xiaolin"

因为我创建两组的消费组都是从第一条消息开始读取,所以可以看到第二组的消费者依然可以消费 id 为 1654254953808-0 的这一条消息。因此,不同的消费组的消费者可以消费同一条消息。

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1654254953808-0"
         2) 1) "name"
            2) "xiaolin"
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1654256265584-0"
         2) 1) "name"
            2) "xiaolincoding"
# 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1654256271337-0"
         2) 1) "name"
            2) "Tom"

基于 Stream 实现的消息队列,如何保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息?

Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示:

如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息

例如,我们来查看一下 group2 中各个消费者已读取、但尚未确认的消息个数,命令如下:

127.0.0.1:6379> XPENDING mymq group2
1) (integer) 3
2) "1654254953808-0"  # 表示 group2 中所有消费者读取的消息最小 ID
3) "1654256271337-0"  # 表示 group2 中所有消费者读取的消息最大 ID
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

如果想查看某个消费者具体读取了哪些数据,可以执行下面的命令:

# 查看 group2 里 consumer2 已从 mymq 消息队列中读取了哪些消息
> XPENDING mymq group2 - + 10 consumer2
1) 1) "1654256265584-0"
   2) "consumer2"
   3) (integer) 410700
   4) (integer) 1

可以看到,consumer2 已读取的消息的 ID 是 1654256265584-0。

一旦消息 1654256265584-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除

> XACK mymq group2 1654256265584-0
(integer) 1

当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

> XPENDING mymq group2 - + 10 consumer2
(empty array)

好了,基于 Stream 实现的消息队列就说到这里了,小结一下:

  • 消息保序:XADD/XREAD
  • 阻塞读取:XREAD block
  • 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
  • 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
  • 支持消费组形式消费数据

Redis 基于 Stream 消息队列与专业的消息队列有哪些差距?

一个专业的消息队列,必须要做到两大块:

  • 消息不丢。
  • 消息可堆积。

1、Redis Stream 消息会丢失吗?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。

Redis Stream 消息队列能不能保证三个环节都不丢失数据?

  • Redis 生产者会不会丢消息?生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。 从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
  • Redis 消费者会不会丢消息?不会,因为 Stream ( MQ 中间件)会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,但是未被确认的消息。消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认 XACK 命令,也能保证消息的不丢失。
  • Redis 消息中间件会不会丢消息?,Redis 在以下 2 个场景下,都会导致数据丢失:AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能主从复制也是异步的,主从切换时,也存在丢失数据的可能 (opens new window)。

可以看到,Redis 在队列中间件环节无法保证消息不丢。像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

2、Redis Stream 消息可堆积吗?

Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以 Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

当指定队列最大长度时,队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

但 Kafka、RabbitMQ 专业的消息队列它们的数据都是存储在磁盘上,当消息积压时,无非就是多占用一些磁盘空间。

因此,把 Redis 当作队列来使用时,会面临的 2 个问题:

  • Redis 本身可能会丢数据;
  • 面对消息挤压,内存资源会紧张;

所以,能不能将 Redis 作为消息队列来使用,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

补充:Redis 发布/订阅机制为什么不可以作为消息队列?

发布订阅机制存在以下缺点,都是跟丢失数据有关:

  1. 发布/订阅机制没有基于任何数据类型实现,所以不具备「数据持久化」的能力,也就是发布/订阅机制的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,发布/订阅机制的数据也会全部丢失。
  2. 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。
  3. 当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开,这个参数是在配置文件中设置的,默认值是 client-output-buffer-limit pubsub 32mb 8mb 60。

所以,发布/订阅机制只适合即时通讯的场景,比如构建哨兵集群 (opens new window)的场景采用了发布/订阅机制。

11. 总结

Redis 常见的五种数据类型:String(字符串),Hash(哈希),List(列表),Set(集合)及 Zset(sorted set:有序集合)

这五种数据类型都由多种数据结构实现的,主要是出于时间和空间的考虑,当数据量小的时候使用更简单的数据结构,有利于节省内存,提高性能。

这五种数据类型与底层数据结构对应关系图如下,左边是 Redis 3.0版本的,也就是《Redis 设计与实现》这本书讲解的版本,现在看还是有点过时了,右边是现在 Github 最新的 Redis 代码的。

可以看到,Redis 数据类型的底层数据结构随着版本的更新也有所不同,比如:

  • 在 Redis 3.0 版本中 List 对象的底层数据结构由「双向链表」或「压缩表列表」实现,但是在 3.2 版本之后,List 数据类型底层数据结构是由 quicklist 实现的;
  • 在最新的 Redis 代码中,压缩列表数据结构已经废弃了,交由 listpack 数据结构来实现了。

Redis 五种数据类型的应用场景:

  • String 类型的应用场景:缓存对象、常规计数、分布式锁、共享session信息等。
  • List 类型的应用场景:消息队列(有两个问题:1. 生产者需要自行实现全局唯一 ID;2. 不能以消费组形式消费数据)等。
  • Hash 类型:缓存对象、购物车等。
  • Set 类型:聚合计算(并集、交集、差集)场景,比如点赞、共同关注、抽奖活动等。
  • Zset 类型:排序场景,比如排行榜、电话和姓名排序等。

Redis 后续版本又支持四种数据类型,它们的应用场景如下:

  • BitMap(2.2 版新增):二值状态统计的场景,比如签到、判断用户登陆状态、连续签到用户总数等;
  • HyperLogLog(2.8 版新增):海量数据基数统计的场景,比如百万级网页 UV 计数等;
  • GEO(3.2 版新增):存储地理位置信息的场景,比如滴滴叫车;
  • Stream(5.0 版新增):消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息ID,支持以消费组形式消费数据。

针对 Redis 是否适合做消息队列,关键看你的业务场景:

  • 如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
  • 如果你的业务有海量消息,消息积压的概率比较大,并且不能接受数据丢失,那么还是用专业的消息队列中间件吧。

打造自己的视频会议系统 GGMeeting(附送源码,PC+手机,支持Linux、国产OS、银河麒麟、统信UOS) - C#开源即时通讯GGTalk - 博客园

mikel阅读(716)

来源: 打造自己的视频会议系统 GGMeeting(附送源码,PC+手机,支持Linux、国产OS、银河麒麟、统信UOS) – C#开源即时通讯GGTalk – 博客园

自从在博客园发布开源即时通信系统GG(QQ高仿版)以来,结识了很多做IM的朋友,然后我和我的伙伴们也接到了很多与IM相关的项目。相比在发布GG之前难以接到项目的状况相比,现在简直太幸福了,虽然做项目很辛苦,但毕竟有钱赚,那辛苦也值了。

饮水思源,这里要感谢博客园提供了这么好的一个平台,让我们能展现自己的实力,提升我们的知名度,然后才能接到了更多项目。所以,我强烈建议那些希望接项目、接私单的朋友,都来博客园写博客吧,写出自己的知名度后,真是好处多多!

言归正传,前段时间做了个在线教育培训的项目,与视频会议比较类似,所以了,我打算像GGTalk开源即时通讯系统一样,搞一个开源视频会议系统并把实现的原理和源码都分享出来,让有兴趣的朋友可以参考下。继承GG的名称,我把这个开源视频会议系统命名为GGMeeting,目前版本为1.0,后续功能会不断增强。

目前,GGMeeting 服务端支持 Windows、Linux,客户端支持 Windows、Android、iOS、Linux、国产OS(银河麒麟、统信UOS)。

一般而言,视频会议的主要核心功能是:多人语音聊天、多人视频聊天、公共电子白板、会议房间管理。本文我们将介绍视频会议系统的主要功能及其实现原理,后面有空在介绍详细每个功能的详细实现细节。

想要直接下载体验的朋友请点击:“源码下载中心”         

GGMeeting 支持两种视窗布局风格:大视窗自动显示主持人或当前发言人的视频;或 1×1,2×2,3×3 分屏显示。

下图是布局风格1:大视窗自动显示主持人或当前发言人的视频。

      

下图是布局风格2:1×1,2×2,3×3 分屏显示。

以下是手机版截图:

一.语音通话

1.基础模型

在视频会议中,网络语音通话通常多对多的的,但就模型层面来说,我们讨论一个方向的通道就可以了。一方说话,另一方则听到声音。看似简单而迅捷,但是其背后的流程却是相当复杂的。我们将其经过的各个主要环节简化成下图所示的概念模型:

这是一个最基础的模型,由五个重要的环节构成:采集、编码、传送、解码、播放。

语音采集指的是从麦克风采集音频数据,即声音样本转换成数字信号。其涉及到几个重要的参数:采样频率、采样位数、声道数。

假设我们将采集到的音频帧不经过编码,而直接发送,那么我们可以计算其所需要的带宽要求,仍以上例:320*100 =32KBytes/s,如果换算为bits/s,则为256kb/s。这是个很大的带宽占用。而通过网络流量监控工具,我们可以发现采用类似QQ等IM软件进行语音通话时,流量为3-5KB/s,这比原始流量小了一个数量级。而这主要得益于音频编码技术。 所以,在实际的语音通话应用中,编码这个环节是不可缺少的。目前有很多常用的语音编码技术,像G.729、iLBC、AAC、SPEEX等等。

当一个音频帧完成编码后,即可通过网络发送给通话的对方。对于语音对话这样Realtime应用,低延迟和平稳是非常重要的,这就要求我们的网络传送非常顺畅。

当对方接收到编码帧后,会对其进行解码,以恢复成为可供声卡直接播放的数据。

完成解码后,即可将得到的音频帧提交给声卡进行播放。

2.高级功能

如果仅仅依靠上述的技术就能实现一个效果良好的应用于广域网上的语音对话系统,那就太easy了。正是由于很多现实的因素为上述的概念模型引入了众多挑战,使得网络语音系统的实现不是那么简单,其涉及到很多专业技术。一个“效果良好”的语音对话系统应该达到如下几点:低延迟,背景噪音小,声音流畅、没有卡、停顿的感觉,没有回音。

对于低延迟,只有在低延迟的情况下,才能让通话的双方有很强的Realtime的感觉。当然,这个主要取决于网络的速度和通话双方的物理位置的距离,就单纯软件的角度,优化的可能性很小。

(1)回音消除

现在大家几乎都已经都习惯了在语音聊天时,直接用PC或笔记本的声音外放功能。当使用外放功能时,扬声器播放的声音会被麦克风再次采集,传回给对方,这样对方就听到了自己的回音。

回音消除的原理简单地来说就是,回音消除模块依据刚播放的音频帧,在采集的音频帧中做一些类似抵消的运算,从而将回声从采集帧中清除掉。这个过程是相当复杂的,因为它还与你聊天时所处的房间的大小、以及你在房间中的位置有关,因为这些信息决定了声波反射的时长。 智能的回音消除模块,能动态调整内部参数,以最佳适应当前的环境。

(2)噪声抑制

噪声抑制又称为降噪处理,是根据语音数据的特点,将属于背景噪音的部分识别出来,并从音频帧中过滤掉。有很多编码器都内置了该功能。

(3)抖动缓冲区

抖动缓冲区(JitterBuffer)用于解决网络抖动的问题。所谓网络抖动,就是网络延迟一会大一会小,在这种情况下,即使发送方是定时发送数据包的(比如每100ms发送一个包),而接收方的接收就无法同样定时了,有时一个周期内一个包都接收不到,有时一个周期内接收到好几个包。如此,导致接收方听到的声音就是一卡一卡的。

JitterBuffer工作于解码器之后,语音播放之前的环节。即语音解码完成后,将解码帧放入JitterBuffer,声卡的播放回调到来时,从JitterBuffer中取出最老的一帧进行播放。

JitterBuffer的缓冲深度取决于网络抖动的程度,网络抖动越大,缓冲深度越大,播放音频的延迟就越大。所以,JitterBuffer是利用了较高的延迟来换取声音的流畅播放的,因为相比声音一卡一卡来说,稍大一点的延迟但更流畅的效果,其主观体验要更好。

当然,JitterBuffer的缓冲深度不是一直不变的,而是根据网络抖动程度的变化而动态调整的。当网络恢复到非常平稳通畅时,缓冲深度会非常小,这样因为JitterBuffer而增加的播放延迟就可以忽略不计了。

(4)静音检测

在语音对话中,要是当一方没有说话时,就不会产生流量就好了。静音检测就是用于这个目的的。静音检测通常也集成在编码模块中。静音检测算法结合前面的噪声抑制算法,可以识别出当前是否有语音输入,如果没有语音输入,就可以编码输出一个特殊的的编码帧(比如长度为0)。特别是在多人视频会议中,通常只有一个人在发言,这种情况下,利用静音检测技术而节省带宽还是非常可观的。

(5)混音

在视频会议中,多人同时发言时,我们需要同时播放来自于多个人的语音数据,而声卡播放的缓冲区只有一个,所以,需要将多路语音混合成一路,这就是混音算法要做的事情。

二.视频通话

1.基础模型

视频通话的概念模型与语音完全一致:

摄像头采集指的是从捕捉摄像头采集到的每一帧视频图像。在windows系统上,通常使用VFW技术或DirectShow技术来实现。采集视频的两个关键参数是帧频(fps)和分辨率。

一般而言,一个摄像头可以支持多种不同的采集分辨率和采集帧频,而不同的摄像头支持的分辨率的集合不一样。比如现在有很多高清摄像头可以支持30fps的1920*1080的图像采集。

编码用于压缩视频图像,同时也决定了图像的清晰度。视频编码常用的技术是H.263、H.264、MPEG-4、XVID等。

当对方接收到编码的视频帧后,会对其进行解码,以恢复成一帧图像,然后在UI的界面上绘制出来。

2.高级功能

相比于语音,视频的相关处理要简单一些。

(1)动态调整视频的清晰度

在Internet上,网络速度是实时动态变化的,所以,在视频会议中,为了优先保证语音的通话质量,需要实时调整视频的相关参数,其最主要的就是调整编码的清晰度,因为清晰度越高,对带宽要求越高,反之亦然。

比如,当检测网络繁忙时,就自动降低编码的清晰度,以降低对带宽的占用。

(2)自动丢弃视频帧

同样网络繁忙时,还有一个方法,就是发送方是主动丢弃要发送的视频帧,这样在接收方看来,就是帧频fps降低了。

三.电子白板

在视频会议中,电子白板的功能是很重要的。通常会议的主持人会在白板上画图进行讲解,然后其它的人能同步观看和操作电子白板的内容。

通常的电子白板都支持如下功能:线段、箭头线、双箭头线,水平肘型连接符、垂直肘型连接符,矩形、三角形、椭圆(圆),文本,自由曲线,插入图片,激光笔。

在实现上,电子白板主要是使用GDI+技术。

对于电子白板的同步,其原理是这样的:比如,当操作者在白板上绘制一个图像时,这个操作会被封装成一个Command对象(命令模式),然后,通过网络广播发送给会议中的其它人。当其他人接收到这个Command对象时,就将其转换成一个白板操作来执行,这样各个白板的内容就自动同步了。

四.会议房间管理

对于那些动态创建视频会议室,在用完之后就动态将其销毁的通常的视频会议应用场景来说,使用动态组来表示会议房间,是非常恰当的。

所谓“动态组”,就是在服务器内存中动态创建的组,不需要序列化存储到比如数据库或磁盘中,需要的时候就创建一个,然后加入多个成员进行组内沟通,当不再使用的时候,就直接从内存中销毁了。

基于Socket技术,我们可以在服务端实现DynamicGroupManager类来对动态组进行管理。

虽然,动态组仅仅存在于内存之中,但是,在项目需要时,我们仍然可以将其某些重要的信息持久化到数据库中存储。然后,在服务器重启时,可以从DB中加载重要的房间信息。

五.GGMeeting开源视频会议系统下载

请转到 GGTalk与GGMeeting 源码统一下载中心 进行下载。

广告:如果您有类似视频会议系统、在线培训系统、IM系统需要定制开发的,可以联系我们哦:)  QQ:2027224508

部署说明:

(1)将GGMeeting.Server部署到服务器上,并运行起来。

(2)修改Client配置文件GGMeeting.exe.config中的ServerIP的值。

(3)运行第一个Client实例,以随机帐号进入测试房间。

(4)在别的机器上继续运行Client,以随机帐号进入测试房间,大家即可在测试房间中进行视频会议。

注意:语音视频数据都是实时采集、实时播放的数据,所以测试时,服务器的带宽要求最好是独享带宽,共享带宽一般无法满足实时语音视频的要求。

________________________________________________________________________

欢迎和我探讨关于 GG 开源即时通讯系统和 GGMeeting开源视频会议系统 的一切,我的QQ:2027224508,多多交流!

大家有什么问题和建议,可以留言,也可以发送email到我邮箱:2027224508@qq.com。

如果你觉得还不错,请粉我,顺便再顶一下啊

源码下载中心 - C#开源即时通讯GGTalk - 博客园

mikel阅读(628)

来源: 源码下载中心 – C#开源即时通讯GGTalk – 博客园

GGTalk(简称GG)是可在广域网部署运行的QQ高仿版 ,2013.08.07发布V1.0版本,至今最新是8.0版本,关于GG更详细的介绍,可以查看 可在广域网部署运行的QQ高仿版 — GGTalk总览。

GGMeeting是可在广域网部署运行的视频会议系统Demo,2015.05.11发布V1.0版本,至今最新是3.2版本,关于GGMeeting更详细的介绍,可以查看 打造自己的视频会议系统 GGMeeting

ScreenMonitor 是屏幕监控系统,支持在PC和安卓手机上监控PC桌面、安卓屏幕。

1.GGTalk 即时通信系统源码

最新版本:V8.0  (详细介绍) 。 GGTalk 源码剖析系列:数据库设计、 服务端全局缓存虚拟数据库

最后更新:2023.04.14

源码下载:

(1)Windows 服务端、客户端 + Linux  服务端、客户端(支持国产OS):GGTalk_V8.0.zip(提取码: 1234)

(2)Android 端:GGTalk_V8.0_Android.rar (提取码: 1234)  备用下载地址:GGTalk_V8.0_Android.rar

(3)iOS 端:GGTalk_V8.0_iOS.zip (提取码: 1234)

(4)可直接部署运行:GGTalk_V8.0_Deploy.zip (提取码: 1234)

部署说明:

下面我们说明一下如何部署GGTalk_V8.0_Deploy.rar压缩包中的可直接运行的各个端。

(1)服务端默认配置是使用内存虚拟数据库,不需要真实数据库,这样方便测试。

(2)Windows 服务端:将GGTalk.Server文件夹拷贝到服务器上,运行 GGTalk.Server.exe。

(3)Linux 服务端:将GGTalk.Server.Linux文件夹拷贝到CentOS服务器上,进入该文件夹,打开终端,执行命令:dotnet GGTalk.Server.NetCore.dll。

注:GGTalk Windows服务端或Linux服务端只要启动一个就可以的,所有类型客户端都可以连到这个启动的服务端。

(4)Windows 客户端:修改GGTalk目录下的客户端配置文件GGTalk.exe.config中ServerIP配置项的值为服务器的IP,双击GGTalk.exe运行客户端,注册帐号登录试用。。

(5)Linux  客户端:将GGTalk.Linux文件夹拷贝到国产Linux系统(Ubuntu也可以)上,进入该文件夹,打开终端,执行命令:dotnet GGTalk.Linux.dll。

注:如果Linux电脑的CPU不是x86/x64架构的,则需要使用对应架构的libe_SQLite3.so、libSkiaSharp.so来替换运行目录下的同名文件。可联系我获取其它架构的so库。

(6)手机端:将对应的apk或ipa包在手机上安装,启动后,在登录界面设置服务器的IP地址,即可登录。

(7)内存虚拟数据库内置了测试帐号: 10000,10001,10002,10003,10004,一直到10009;密码都是 1。

         如果需要使用真实的物理数据库,则需按下列步骤进行:

(1)在SQLServer  2008+ 中新建数据库 GGTalk,然后在该库中执行 SQLServer.SQL 文件中的脚本以创建所需表。

(如果要使用MySQL数据库,则使用MySQL.sql脚本)

(2)打开服务端的配置文件GGTalk.Server.exe.config

(1)修改 UseVirtualDB 配置项的值为false。

(2)修改 DBType 为 SQLServer 或 MySQL。

(3)修改 DBIP 配置项的值为数据库的IP地址。

(4)修改 DBPwd 配置项的值为数据库管理员sa的密码。

后续其它的步骤就与虚拟数据库是一样的了。

 

2.GGMeeting 视频会议系统源码

最新版本:V3.2  (详细介绍

最后更新:2023.04.25(GGMeeting项目已更名为OVCS)

OVCS 服务端支持 Windows、Linux,客户端支持 Windows、Android、iOS、Linux、国产OS(银河麒麟、统信UOS)。

1. OVCS(服务端+PC端):开发环境为 VS 2022 。其中 PC 端源码包含 WPF 和 WinForm 版,二者选一即可。(详细介绍一文中的截图为WPF版的效果)

   VS 解决方案中的项目分别是:

(1)OVCS.Server : OVCS 的服务端。

(2)OVCS.ClientWpf : OVCS 的 Windows 客户端(基于WPF)。

(3)OVCS.Client.WinForm : OVCS 的 Windows 客户端(基于WinForm)。

(4)OVCS.ClientLinux : OVCS 的 Linux 客户端(基于 .Net Core 3.1)。

         注: Linux客户端内置的是x86/x64非托管so库,若需要其它架构的so,请联系我们免费获取。

2. OVCS Android端  :开发环境为 Android Studio 4.0+ 。

3. OVCS iOS端    :开发环境为 XCode 11+ 。

4. OVCS Web端    JavaScript 。 支持在Windows、Linux、国产OS上的浏览器中运行,需要安装OMCS Web插件:Win版本 、 Linux版本 。 

       备注:PC端增加了视频会议录制功能,可以将会议过程录制成mp4文件。(PC客户端启动后,点击主界面上方工具栏的“录制”按钮。)

 

3.ScreenMonitor 屏幕/桌面监控系统源码

最后更新:2023.01.13

项目源码下载: 服务端+PC端+安卓端

部署版本下载: 服务端+PC端+安卓端

支持:PC监控PC桌面,PC监控安卓屏幕,安卓监控PC桌面,安卓监控安卓屏幕。详细介绍请参见:如何实现监控手机屏幕?  

 

————————————————————————————————————————————

联系方式:

QQ:2027224508 

邮箱:2027224508@qq.com

 

如果大家有类似视频会议系统、在线培训系统、IM系统需要定制开发的,可以联系我们哦:)

 

虽然就如何将GG发展为一个有商业价值的产品,我还没有很清晰明确的思路,但是从GG发布以来,通过GG认识了一些朋友,也接了一些小单子,赚了一点小钱。有了一点甜头,目前和2、3个好朋友一起做做小项目也是不错的。

大家有什么问题和建议,都可以联系我,留言、加QQ、发邮件都可以。

欢迎大家与我探讨关于GG的一切!