windows下Redis的安装和使用 - 刘清政 - 博客园

mikel阅读(1011)

来源: windows下Redis的安装和使用 – 刘清政 – 博客园

1、要安装Redis,首先要获取安装包。Windows的Redis安装包需要到以下GitHub链接找到。链接:https://github.com/MSOpenTech/redis。打开网站后,找到Release,点击前往下载页面。

2、在下载网页中,找到最后发行的版本(此处是3.2.100)。找到Redis-x64-3.2.100.msi和Redis-x64-3.2.100.zip,点击下载。这里说明一下,第一个是msi微软格式的安装包,第二个是压缩包。

3、双击刚下载好的msi格式的安装包(Redis-x64-3.2.100.msi)开始安装。

4、选择“同意协议”,点击下一步继续。

5、选择“添加Redis目录到环境变量PATH中”,这样方便系统自动识别Redis执行文件在哪里。

6、端口号可保持默认的6379,并选择防火墙例外,从而保证外部可以正常访问Redis服务。

7、设定最大值为100M。作为实验和学习,100M足够了。

8、点击安装后,正式的安装过程开始。稍等一会即可完成。

9、安装完毕后,需要先做一些设定工作,以便服务启动后能正常运行。使用文本编辑器,这里使用Notepad++,打开Redis服务配置文件。注意:不要找错了,通常为redis.windows-service.conf,而不是redis.windows.conf。后者是以非系统服务方式启动程序使用的配置文件。

10、找到含有requirepass字样的地方,追加一行,输入requirepass 12345。这是访问Redis时所需的密码,一般测试情况下可以不用设定密码。不过,即使是作为本地访问,也建议设定一个密码。此处以简单的12345来演示。

11、点击“开始”>右击“计算机”>选择“管理”。在左侧栏中依次找到并点击“计算机管理(本地)”>服务和应用程序>服务。再在右侧找到Redis名称的服务,查看启动情况。如未启动,则手动启动之。正常情况下,服务应该正常启动并运行了。

12、最后来测试一下Redis是否正常提供服务。进入Redis的目录,cd C:\Program Files\Redis。输入redis-cli并回车。(redis-cli是客户端程序)如图正常提示进入,并显示正确端口号,则表示服务已经启动。

 
13、使用服务前需要先通过密码验证。输入“auth 12345”并回车(12345是之前设定的密码)。返回提示OK表示验证通过。
14、实际测试一下读写。输入set mykey1 “I love you all!”并回车,用来保存一个键值。再输入get mykey1,获取刚才保存的键值。
15、注意事项
  • 1.Windows使用的这个Redis是64位版本的,32位操作系统的同学就不要折腾了。
  • 2.作为服务运行的Redis配置文件,通常为redis.windows-service.conf,而不是redis.windows.conf。小心不要选错了。

LPR | 张欢的博客

mikel阅读(1036)

来源: LPR | 张欢的博客

前言

这是一个车牌识别的实例

我的平台是vs2017和opencv3.4

opencv下载链接点这里

这是老早在学校做的一个作业。github地址

这里分解了车牌识别的步骤。提供了大体的解决思路,其实你每一步都可以尝试用其他的方法来更好地解决问题,譬如最后的识别字符你完全可以使用ocr或者训练好的神经网络来更好的完成输出。


任务:识别下图中的车牌

car

要实现车牌识别,一共分为两大步。
第一,车牌定位,就是在图片中确定出车牌的位置;
第二,字符识别,将提取出来的字符图片进行识别。

正文

一、车牌定位

1. 高斯滤波

为什么要高斯滤波呢?原因很简单,因为要平滑图像,去除噪声点。

car

2. sobel边缘提取

至于什么是sobel呢?
我先引用百度的一句话吧:

Sobel算子是像素图像边缘检测中最重要的算子之一,在技术上,它是一个离散的一阶差分算子,用来计算图像亮度函数的一阶梯度之近似值。在图像的任何一点使用此算子,将会产生该点对应的梯度矢量或是其法矢量。

那我们用sobel算子干嘛呢?我们的目标是定位车牌,因为车牌的周围轮廓分明,使用sobel可以清晰的提取边缘,效果如图所示:
car

3.二值化图像

这时我们就可以看到车牌成为了图像中亮度最大的区域,所以我们这次再来进行二值化,只保留高于一定阈值的点,效果图如下:

car

4.闭运算

图像中的这些点我们怎么利用呢?这时我们需要将这些点所在的区域连通起来,所以会用到闭运算。
什么是闭运算呢?

在数学形态学中,闭运算被定义为先膨胀后腐蚀。

先点这里了解腐蚀和膨胀

再点这里了解开运算、闭运算等

car

5.去除小区域

了解上面的之后,相信你也知道这里该怎么做啦!

car
car

6.提取轮廓

我们在上面已经找到了车牌所在区域,现在我们需要找到车牌的边界点。
我们有这一堆点集,我们可以迭代寻找出最左边且靠上的点之类的。
因为我们下一步要做仿射变换,需要三个边界点,所以我们找出如图所示的三个点就行了。

蓝色框可以点下面了解一下,后面会很有用。
绘制最小外接矩形

car

7.仿射变换

原理还是看博客吧,仿射变换能做到对图像旋转,平移和缩放。
这里根据提取到的点,再对高斯滤波后的图进行仿射变换,只保留车牌部分。

点一下,玩一年

深入版

car

二、字符识别

1.提取字符轮廓

这里先对图像二值化,再寻找最小外接矩形。
当然汉字可不是很简单,因为有很多独立的笔画,所以还需要在容器中移除小于一定面积的矩形。
这样一些点啊,就不会在我们需要识别的对象中。

car

2.识别字符

当时做这个时间因素,采用了最简单也是很有效的一种方法,对每个字符变换到标准大小,和模板点与点之间逐个计算,最后输出与之匹配度最高的字符。
car

输出结果如下:
car

写给程序员的机器学习入门 (八) - 卷积神经网络 (CNN) - 图片分类和验证码识别 - q303248153 - 博客园

mikel阅读(752)

来源: 写给程序员的机器学习入门 (八) – 卷积神经网络 (CNN) – 图片分类和验证码识别 – q303248153 – 博客园

这一篇将会介绍卷积神经网络 (CNN),CNN 模型非常适合用来进行图片相关的学习,例如图片分类和验证码识别,也可以配合其他模型实现 OCR。

使用 Python 处理图片

在具体介绍 CNN 之前,我们先来看看怎样使用 Python 处理图片。Python 处理图片最主要使用的类库是 Pillow (Python2 PIL 的 fork),使用以下命令即可安装:

pip3 install Pillow

一些简单操作的例子如下,如果你想了解更多可以参考 Pillow 的文档

# 打开图片
>>> from PIL import Image
>>> img = Image.open("1.png")

# 查看图片信息
>>> img.size
(175, 230)
>>> img.mode
'RGB'
>>> img
<PIL.PngImagePlugin.PngImageFile image mode=RGB size=175x230 at 0x10B807B50>

# 缩放图片
>>> img1 = img.resize((20, 30))
>>> img1
<PIL.Image.Image image mode=RGB size=20x30 at 0x106426FD0>

# 裁剪图片
>>> img2 = img.crop((0, 0, 16, 16))
>>> img2
<PIL.Image.Image image mode=RGB size=16x16 at 0x105E0EFD0>

# 保存图片
>>> img1.save("11.png")
>>> img2.save("12.png")

使用 pytorch 处理图片时要首先获取图片的数据,即各个像素对应的颜色值,例如大小为 175 * 230,模式是 RGB 的图片会拥有 175 * 230 * 3 的数据,3 分别代表红绿蓝的值,范围是 0 ~ 255,把图片转换为 pytorch 的 tensor 对象需要经过 numpy 中转,以下是转换的例子:

>>> import numpy
>>> import torch
>>> v = numpy.asarray(img)
>>> t = torch.tensor(v)
>>> t
tensor([[[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        ...,

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]],

        [[255, 253, 254],
         [255, 253, 254],
         [255, 253, 254],
         ...,
         [255, 253, 254],
         [255, 253, 254],
         [255, 253, 254]]], dtype=torch.uint8)
>>> t.shape
torch.Size([230, 175, 3])

可以看到 tensor 的维度是 高度 x 宽度 x 通道数 (RGB 图片为 3,黑白图片为 1),可是 pytorch 的 CNN 模型会要求维度为 通道数 x 宽度 x 高度,并且数值应该正规化到 0 ~ 1 的范围内,使用以下代码可以实现:

# 交换维度 0 (高度) 和 维度 2 (通道数)
>>> t1 = t.transpose(0, 2)
>>> t1.shape
torch.Size([3, 175, 230])

>>> t2 = t1 / 255.0
>>> t2
tensor([[[1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         ...,
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000],
         [1.0000, 1.0000, 1.0000,  ..., 1.0000, 1.0000, 1.0000]],

        [[0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         ...,
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922],
         [0.9922, 0.9922, 0.9922,  ..., 0.9922, 0.9922, 0.9922]],

        [[0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         ...,
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961],
         [0.9961, 0.9961, 0.9961,  ..., 0.9961, 0.9961, 0.9961]]])

之后就可以围绕类似上面例子中 t2 这样的 tensor 对象做文章了🥳。

卷积神经网络 (CNN)

卷积神经网络 (CNN) 会从图片的各个部分提取特征,然后再从一级特征提取二级特征,如有必要再提取三级特征 (以此类推),提取结束以后扁平化到最终特征,然后使用多层或单层线性模型来实现分类识别。提取各级特征会使用卷积层 (Convolution Layer) 和池化层 (Pooling Layer),提取特征时可以选择添加通道数量以增加各个部分的信息量,分类识别最终特征使用的线性模型又称全连接层 (Fully Connected Layer),下图是流程示例:

之前的文章介绍线性模型和递归模型的时候我使用了数学公式,但只用数学公式说明 CNN 将会非常难以理解,所以接下来我会伴随例子逐步讲解各个层具体做了怎样的运算。

卷积层 (Convolution Layer)

卷积层会对图片的各个部分做矩阵乘法操作,然后把结果作为一个新的矩阵,每个卷积层有两个主要的参数,一个是内核大小 (kernel_size),一个是处理间隔 (stride),下图是一个非常简单的计算流程例子:

如果增加处理间隔会怎样呢?下图展示了不同处理间隔的计算部分和输出结果维度的区别:

我们可以看到处理间隔决定了每次向右或者向下移动的距离,输出长度可以使用公式 (长度 - 内核大小) / 处理间隔 + 1 计算,输出宽度可以使用公式 (长度 - 内核大小) / 处理间隔 + 1 计算。

现在再来看看 pytorch 中怎样使用卷积层,创建卷积层可以使用 torch.nn.Conv2d

# 创建卷积层,入通道 = 1,出通道 = 1,内核大小 = 2,处理间隔 = 1
>>> conv2d = torch.nn.Conv2d(in_channels = 1, out_channels = 1, kernel_size = 2, stride = 1)

# 查看卷积层内部的参数,第一个是内核对应的权重矩阵,第二个是偏移值
>>> p = list(conv2d.parameters())
>>> p
[Parameter containing:
tensor([[[[-0.0650, -0.0575],
          [-0.0313, -0.3539]]]], requires_grad=True), Parameter containing:
tensor([0.1482], requires_grad=True)]

# 现在生成一个 5 x 5,单通道的图片数据,为了方便理解这里使用了 1 ~ 25,实际应该使用 0 ~ 1 之间的值
>>> x = torch.tensor(list(range(1, 26)), dtype=torch.float).reshape(1, 1, 5, 5)
>>> x
tensor([[[[ 1.,  2.,  3.,  4.,  5.],
          [ 6.,  7.,  8.,  9., 10.],
          [11., 12., 13., 14., 15.],
          [16., 17., 18., 19., 20.],
          [21., 22., 23., 24., 25.]]]])

# 使用卷积层计算输出
>>> y = conv2d(x)
>>> y
tensor([[[[ -2.6966,  -3.2043,  -3.7119,  -4.2196],
          [ -5.2349,  -5.7426,  -6.2502,  -6.7579],
          [ -7.7732,  -8.2809,  -8.7885,  -9.2962],
          [-10.3115, -10.8192, -11.3268, -11.8345]]]],
       grad_fn=<MkldnnConvolutionBackward>)

# 我们可以模拟一下处理单个部分的计算,看看和上面的输出是否一致

# 第 1 部分
>>> x[0,0,0:2,0:2]
tensor([[1., 2.],
        [6., 7.]])
>>> (p[0][0,0,:,:] * x[0,0,0:2,0:2]).sum() + p[1]
tensor([-2.6966], grad_fn=<AddBackward0>)

# 第 2 部分
>>> x[0,0,0:2,1:3]
tensor([[2., 3.],
        [7., 8.]])
>>> (p[0][0,0,:,:] * x[0,0,0:2,1:3]).sum() + p[1]
tensor([-3.2043], grad_fn=<AddBackward0>)

# 第 3 部分
>>> (p[0][0,0,:,:] * x[0,0,0:2,2:4]).sum() + p[1]
tensor([-3.7119], grad_fn=<AddBackward0>)

# 一致吧🥳

到这里你应该了解单通道的卷积层是怎样计算的,那么多通道呢?如果有多个入通道,那么卷积层的权重矩阵会相应有多份,如果有多个出通道,那么卷积层的权重矩阵数量也会乘以出通道的倍数,例如有 3 个入通道,2 个出通道时,卷积层的权重矩阵会有 6 个 (3 * 2),偏移值会有 2 个,计算规则如下:

部分输出[出通道1] = 部分输入[入通道1] * 权重矩阵[0][0] + 部分输入[入通道2] * 权重矩阵[0][1] + 部分输入[入通道3] * 权重矩阵[0][2] + 偏移值1
部分输出[出通道2] = 部分输入[入通道1] * 权重矩阵[1][0] + 部分输入[入通道2] * 权重矩阵[1][1] + 部分输入[入通道3] * 权重矩阵[1][2] + 偏移值2

从计算规则可以看出,出通道越多每个部分可提取的特征数量 (信息量) 也就越多,但计算量也会相应增大。

最后看看卷积层的数学公式 (基本和 pytorch 文档的公式相同),现在应该可以理解了吧🤢?

池化层 (Pooling Layer)

池化层的处理比较好理解,它会对每个图片每个区域进行求最大值或者求平均值等运算,如下图所示:

现在再来看看 pytorch 中怎样使用卷积层,创建求最大值的池化层可以使用 torch.nn.MaxPool2d,创建求平均值的池化层可以使用 torch.nn.AvgPool2d

# 创建池化层,内核大小 = 2,处理间隔 = 2
>>> maxPool = torch.nn.MaxPool2d(2, stride=2)

# 生成一个 6 x 6,单通道的图片数据
>>> x = torch.tensor(range(1, 37), dtype=float).reshape(1, 1, 6, 6)
>>> x
tensor([[[[ 1.,  2.,  3.,  4.,  5.,  6.],
          [ 7.,  8.,  9., 10., 11., 12.],
          [13., 14., 15., 16., 17., 18.],
          [19., 20., 21., 22., 23., 24.],
          [25., 26., 27., 28., 29., 30.],
          [31., 32., 33., 34., 35., 36.]]]], dtype=torch.float64)

# 使用池化层计算输出
>>> maxPool(x)
tensor([[[[ 8., 10., 12.],
          [20., 22., 24.],
          [32., 34., 36.]]]], dtype=torch.float64)

# 很好理解吧🥳

# 创建和使用求平均值的池化层也很简单
>>> avgPool = torch.nn.AvgPool2d(2, stride=2)
>>> avgPool(x)
tensor([[[[ 4.5000,  6.5000,  8.5000],
          [16.5000, 18.5000, 20.5000],
          [28.5000, 30.5000, 32.5000]]]], dtype=torch.float64)

全连接层 (Fully Connected Layer)

全连接层实际上就是多层或单层线性模型,但把特征传到全连接层之前还需要进行扁平化 (Flatten),例子如下所示:

# 模拟创建一个批次数量为 2,通道数为 3,长宽各为 2 的特征
>>> x = torch.rand((2, 3, 2, 2))
>>> x
tensor([[[[0.6395, 0.6240],
          [0.4194, 0.6054]],

         [[0.4798, 0.4690],
          [0.2647, 0.6087]],

         [[0.5727, 0.7567],
          [0.8287, 0.1382]]],


        [[[0.7903, 0.8635],
          [0.0053, 0.6417]],

         [[0.7093, 0.7740],
          [0.3115, 0.7587]],

         [[0.5875, 0.8268],
          [0.2923, 0.6016]]]])

# 对它进行扁平化,维度会变为 批次数量, 通道数*长*宽
>>> x_flatten = x.view(x.shape[0], -1)
>>> x_flatten
tensor([[0.6395, 0.6240, 0.4194, 0.6054, 0.4798, 0.4690, 0.2647, 0.6087, 0.5727,
         0.7567, 0.8287, 0.1382],
        [0.7903, 0.8635, 0.0053, 0.6417, 0.7093, 0.7740, 0.3115, 0.7587, 0.5875,
         0.8268, 0.2923, 0.6016]])

# 之后再传给线性模型即可
>>> linear = torch.nn.Linear(in_features=12, out_features=2)
>>> linear(x_flatten)
tensor([[-0.3067, -0.5534],
        [-0.1876, -0.6523]], grad_fn=<AddmmBackward>)

填充处理

在看前面提到的卷积层操作的时候,你可能会发现如果处理间隔 (stride) 小于内核大小 (kernel_size),那么图片边缘的像素参与运算的次数会比图片中间的像素要少,也就是说图片边缘对运算结果的影响会更小,如果图片边缘的信息同样比较重要,那么就会影响预测输出的精度。为了解决这个问题发明的就是填充处理,填充处理简单的来说就是在卷积层初期前给图片的周边添加 0,如果填充量等于 1,那么长宽会各增加 2,如下图所示:

在 pytorch 中添加填充处理可以在创建 Conv2d 的时候指定 padding 参数:

# 创建卷积层,入通道 = 1,出通道 = 1,内核大小 = 2,处理间隔 = 1, 填充量 = 1
>>> conv2d = torch.nn.Conv2d(in_channels = 1, out_channels = 1, kernel_size = 2, stride = 1, padding = 1)



使用 CNN 实现图片分类 (LeNet)

接下来我们试试使用 CNN 实现图片分类,也就是给出一张图片让程序识别里面的是什么东西,使用的数据集是 cifar-10,这是一个很经典的数据集,包含了 60000 张 32×32 的小图片,图片有十个分类 (飞机,汽车,鸟,猫,鹿,狗,青蛙,马,船,货车),官方下载地址在这里

需要注意的是,官方下载地址只包含二进制数据,通常很多文章或者教程都会让我们使用 torchvision.datasets.CIFAR10 等现成的加载器来加载这个数据集,但我不推荐使用这种方法,因为如果我们需要训练实际业务上的数据,那么肯定不会有现成的加载器可以用,还是得一张张图片的加载和转换。所以这里我使用了 cifar-10 的原始图片库,然后演示怎样从代码加载图片和标签,然后转换到训练使用的 tensor 对象。

以下的代码使用了 LeNet 模型,这是 30 年前就已经被提出的模型,结构和本文第一个图片介绍的一样。此外还有一些需要注意的地方:

  • cifar-10 官方默认划分了 50000 张图片作为训练集,10000 张图片作为验证集;而我的代码划分了 48000 张图片作为训练集,6000 张图片作为验证集,6000 张图片作为测试集,所以正确率等数据会和其他文章或者论文不一致
  • 训练时的损失计算器使用了 CrossEntropyLoss, 这个计算器的特征是要求预测输出是 onehot,实际输出是索引值 (只有一个分类是正确输出),例如图片分类为  时,预测输出应该为 [0, 0, 1, 0, 0, 0, 0, 0, 0, 0] 实际输出应该为 2
  • 转换各个分类的数值到概率使用了 Softmax 函数, 这个函数必须放在模型之外,如果放在模型内部会导致训练效果变差,因为 CrossEntropyLoss 损失计算器会尽量让正确输出的数值更高,错误输出的数值更低,而不是分别接近 1 和 0,使用 softmax 会干扰损失的计算
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目标的图片大小,全部图片都会先缩放到这个大小
IMAGE_SIZE = (32, 32)
# 分析目标的图片所在的文件夹
IMAGE_DIR = "./cifar"
# 包含所有图片标签的文本文件
IMAGE_LABELS_PATH = "./cifar/labels.txt"

class MyModel(nn.Module):
    """图片分类 (LeNet)"""
    def __init__(self, num_labels):
        super().__init__()
        # 卷积层和池化层
        self.cnn_model = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=5), # 维度: B,3,32,32 => B,6,28,28
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2), # 维度: B,6,14,14
            nn.Conv2d(6, 16, kernel_size=5), # 维度: B,16,10,10
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2) # 维度: B,16,5,5
        )
        # 全连接层
        self.fc_model = nn.Sequential(
            nn.Linear(16 * 5 * 5, 120), # 维度: B,120
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(120, 60), # 维度: B,60
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(60, num_labels), # 维度: B,num_labels
        )

    def forward(self, x):
        # 应用卷积层和池化层
        cnn_features = self.cnn_model(x)
        # 扁平化输出的特征
        cnn_features_flatten = cnn_features.view(cnn_features.shape[0], -1)
        # 应用全连接层
        y = self.fc_model(cnn_features_flatten)
        return y

def save_tensor(tensor, path):
    """保存 tensor 对象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """从文件读取 tensor 对象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """转换图片对象到 tensor 对象"""
    in_img = img.resize(IMAGE_SIZE)
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 转换维度 H,W,C 到 C,W,H
    t = t / 255.0 # 正规化数值使得范围在 0 ~ 1
    return t

def load_image_labels():
    """读取图片分类列表"""
    return list(filter(None, open(IMAGE_LABELS_PATH).read().split()))

def prepare_save_batch(batch, tensor_in, tensor_out):
    """准备训练 - 保存单个批次的数据"""
    # 切分训练集 (80%),验证集 (10%) 和测试集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盘
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """准备训练"""
    # 数据集转换到 tensor 以后会保存在 data 文件夹下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 准备图片分类到序号的索引
    labels_to_index = { label: index for index, label in enumerate(load_image_labels()) }

    # 查找所有图片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 分类名称在文件名中,例如
            # 2598_cat.png => cat
            label = filename.split(".")[0].split("_")[1]
            label_index = labels_to_index.get(label)
            if label_index is None:
                continue
            image_paths.append((path, label_index))

    # 打乱图片顺序
    random.shuffle(image_paths)

    # 分批读取和保存图片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, label_index in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                t = image_to_tensor(img)
                image_tensors.append(t)
            image_labels.append(label_index)
        tensor_in = torch.stack(image_tensors) # 维度: B,C,W,H
        tensor_out = torch.tensor(image_labels) # 维度: B
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """开始训练"""
    # 创建模型实例
    num_labels = len(load_image_labels())
    model = MyModel(num_labels)

    # 创建损失计算器
    # 计算单分类输出最好使用 CrossEntropyLoss, 多分类输出最好使用 BCELoss
    # 使用 CrossEntropyLoss 时实际输出应该为标签索引值,不需要转换为 onehot
    loss_function = torch.nn.CrossEntropyLoss()

    # 创建参数调整器
    optimizer = torch.optim.Adam(model.parameters())

    # 记录训练集和验证集的正确率变化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 记录最高的验证集正确率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 读取批次的工具函数
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 计算正确率的工具函数
    def calc_accuracy(actual, predicted):
        # 把最大的值当作正确分类,然后比对有多少个分类相等
        predicted_labels = predicted.argmax(dim=1)
        acc = (actual == predicted_labels).sum().item() / actual.shape[0]
        return acc

    # 划分输入和输出的工具函数
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 开始训练过程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根据训练集训练并修改参数
        # 切换模型到训练模式,将会启用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助于泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 划分输入和输出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 计算预测值
                predicted = model(batch_x)
                # 计算损失
                loss = loss_function(predicted, batch_y)
                # 从损失自动微分求导函数值
                loss.backward()
                # 使用参数调整器调整参数
                optimizer.step()
                # 清空导函数值
                optimizer.zero_grad()
                # 记录这一个批次的正确率,torch.no_grad 代表临时禁用自动微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 输出批次正确率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 检查验证集
        # 切换模型到验证模式,将会禁用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 记录最高的验证集正确率与当时的模型状态,判断是否在 20 次训练后仍然没有刷新记录
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次训练后仍然没有刷新记录,结束训练
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用达到最高正确率时的模型状态
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 检查测试集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 显示训练集和验证集的正确率变化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用训练好的模型"""
    # 创建模型实例,加载训练好的状态,然后切换到验证模式
    labels = load_image_labels()
    num_labels = len(labels)
    model = MyModel(num_labels)
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 询问图片路径,并显示可能的分类一览
    while True:
        try:
            # 构建输入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 维度 C,W,H => 1,C,W,H
            # 预测输出
            tensor_out = model(tensor_in)
            # 转换到各个分类对应的概率
            tensor_out = nn.functional.softmax(tensor_out, dim=1)
            # 显示按概率排序后的分类一览
            rates = (t.item() for t in tensor_out[0])
            label_with_rates = list(zip(labels, rates))
            label_with_rates.sort(key=lambda p:-p[1])
            for label, rate in label_with_rates[:5]:
                rate = rate * 100
                print(f"{label}: {rate:0.2f}%")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函数"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 给随机数生成器分配一个初始值,使得每次运行都可以生成相同的随机数
    # 这是为了让过程可重现,你也可以选择不这样做
    random.seed(0)
    torch.random.manual_seed(0)

    # 根据命令行参数选择操作
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

准备训练使用的数据和开始训练需要分别执行以下命令:

python3 example.py prepare
python3 example.py train

最终输出结果如下,可以看到训练集正确率达到了 71%,验证集和测试集正确率达到了 61%,这个正确率代表可以精准说出图片所属的分类,也称 top 1 正确率;此外计算正确分类在概率排前三的分类之中的比率称为 top 3 正确率,如果是电商上传图片以后给出三个可能的商品分类让商家选择,那么计算 top 3 正确率就有意义了。

training accuracy: 0.7162083333333331
validating accuracy: 0.6134999999999998
stop training because highest validating accuracy not updated in 20 epoches
highest validating accuracy: 0.6183333333333333 from epoch 40
testing accuracy: 0.6168333333333332

训练集与验证集正确率变化如下图所示:

实际使用模型的例子如下,输出代表预测图片有 79.23% 的概率是飞机,你也可以试试在互联网上随便找一张图片让这个模型识别:

$ python3 example.py eval
Image path: ./cifar/test/2257_airplane.png
airplane: 79.23%
deer: 6.06%
automobile: 4.04%
cat: 2.89%
frog: 2.11%

使用 CNN 实现图片分类 (ResNet)

上述的模型 top 1 正确率只达到了 61%, 毕竟是 30 年前的老模型了🧔,这里我再介绍一个相对比较新的模型,ResNet 是在 2015 年中提出的模型,论文地址在这里,特征是会把输入和输出结合在一块,例如原来计算 y = f(x) 会变为 y = f(x) + x,从而抵消层数变多带来的梯度消失问题 (参考我之前写的训练过程中常用的技巧)。

下图是 ResNet-18 模型的结构,内部可以分为 4 组,每个组都包括 2 个基础块和 4 个卷积层,并且每个基础块会把输入和输出结合在一起,层数合计一共有 16,加上最开始转换输入的层和全连接层一共有 18 层,所以称为 ResNet-18,除此之外还有 ResNet-34,ResNet-50 等等变种,如果有兴趣可以参考本节末尾给出的 torchvision 的实现代码。

从图中可以看到,从第二组开始会把长宽变为一半,同时通道数增加一倍,然后维持通道数和长宽不变,所有组结束后使用一个 AvgPool2d 来让长宽强制变为 1x1,最后交给全连接层。计算卷积层输出长宽的公式是 (长度 - 内核大小 + 填充量*2) / 处理间隔 + 1,让长宽变为一半会使用内核大小 3,填充量 1,处理间隔 2 ,例如长度为 32 可以计算得出 (32 - 3 + 2) / 2 + 1 == 16;而维持长宽的则会使用内核大小 3,填充量 1,处理间隔 1,例如长度为 32 可以计算得出 (32 - 3 + 2) / 1 + 1 == 32

以下是使用 ResNet-18 进行训练的代码:

import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目标的图片大小,全部图片都会先缩放到这个大小
IMAGE_SIZE = (32, 32)
# 分析目标的图片所在的文件夹
IMAGE_DIR = "./cifar"
# 包含所有图片标签的文本文件
IMAGE_LABELS_PATH = "./cifar/labels.txt"

class BasicBlock(nn.Module):
    """ResNet 使用的基础块"""
    expansion = 1 # 定义这个块的实际出通道是 channels_out 的几倍,这里的实现固定是一倍
    def __init__(self, channels_in, channels_out, stride):
        super().__init__()
        # 生成 3x3 的卷积层
        # 处理间隔 stride = 1 时,输出的长宽会等于输入的长宽,例如 (32-3+2)//1+1 == 32
        # 处理间隔 stride = 2 时,输出的长宽会等于输入的长宽的一半,例如 (32-3+2)//2+1 == 16
        # 此外 resnet 的 3x3 卷积层不使用偏移值 bias
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 再定义一个让输出和输入维度相同的 3x3 卷积层
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 让原始输入和输出相加的时候,需要维度一致,如果维度不一致则需要整合
        self.identity = nn.Sequential()
        if stride != 1 or channels_in != channels_out * self.expansion:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels_out * self.expansion))

    def forward(self, x):
        # x => conv1 => relu => conv2 => + => relu
        # |                              ^
        # |==============================|
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        tmp = self.conv2(tmp)
        tmp += self.identity(x)
        y = nn.functional.relu(tmp)
        return y

class MyModel(nn.Module):
    """图片分类 (ResNet-18)"""
    def __init__(self, num_labels, block_type = BasicBlock):
        super().__init__()
        # 记录上一层的出通道数量
        self.previous_channels_out = 64
        # 把 3 通道转换到 64 通道,长宽不变
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(self.previous_channels_out))
        # ResNet 使用的各个层
        self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
        self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
        # 把最后一层的长宽转换为 1x1 的池化层,Adaptive 表示会自动检测原有长宽
        # 例如 B,512,4,4 的矩阵会转换为 B,512,1,1,每个通道的单个值会是原有 16 个值的平均
        self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
        # 全连接层,只使用单层线性模型
        self.fc_model = nn.Linear(512 * block_type.expansion, num_labels)

    def _make_layer(self, block_type, channels_out, num_blocks, stride):
        blocks = []
        # 添加第一个块
        blocks.append(block_type(self.previous_channels_out, channels_out, stride))
        self.previous_channels_out = channels_out * block_type.expansion
        # 添加剩余的块,剩余的块固定处理间隔为 1,不会改变长宽
        for _ in range(num_blocks-1):
            blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
            self.previous_channels_out *= block_type.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        # 转换出通道到 64
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        # 应用 ResNet 的各个层
        tmp = self.layer1(tmp)
        tmp = self.layer2(tmp)
        tmp = self.layer3(tmp)
        tmp = self.layer4(tmp)
        # 转换长宽到 1x1
        tmp = self.avgPool(tmp)
        # 扁平化,维度会变为 B,512
        tmp = tmp.view(tmp.shape[0], -1)
        # 应用全连接层
        y = self.fc_model(tmp)
        return y

def save_tensor(tensor, path):
    """保存 tensor 对象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """从文件读取 tensor 对象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """转换图片对象到 tensor 对象"""
    in_img = img.resize(IMAGE_SIZE)
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 转换维度 H,W,C 到 C,W,H
    t = t / 255.0 # 正规化数值使得范围在 0 ~ 1
    return t

def load_image_labels():
    """读取图片分类列表"""
    return list(filter(None, open(IMAGE_LABELS_PATH).read().split()))

def prepare_save_batch(batch, tensor_in, tensor_out):
    """准备训练 - 保存单个批次的数据"""
    # 切分训练集 (80%),验证集 (10%) 和测试集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盘
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """准备训练"""
    # 数据集转换到 tensor 以后会保存在 data 文件夹下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 准备图片分类到序号的索引
    labels_to_index = { label: index for index, label in enumerate(load_image_labels()) }

    # 查找所有图片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 分类名称在文件名中,例如
            # 2598_cat.png => cat
            label = filename.split(".")[0].split("_")[1]
            label_index = labels_to_index.get(label)
            if label_index is None:
                continue
            image_paths.append((path, label_index))

    # 打乱图片顺序
    random.shuffle(image_paths)

    # 分批读取和保存图片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, label_index in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                t = image_to_tensor(img)
                image_tensors.append(t)
            image_labels.append(label_index)
        tensor_in = torch.stack(image_tensors) # 维度: B,C,W,H
        tensor_out = torch.tensor(image_labels) # 维度: B
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """开始训练"""
    # 创建模型实例
    num_labels = len(load_image_labels())
    model = MyModel(num_labels)

    # 创建损失计算器
    # 计算单分类输出最好使用 CrossEntropyLoss, 多分类输出最好使用 BCELoss
    # 使用 CrossEntropyLoss 时实际输出应该为标签索引值,不需要转换为 onehot
    loss_function = torch.nn.CrossEntropyLoss()

    # 创建参数调整器
    optimizer = torch.optim.Adam(model.parameters())

    # 记录训练集和验证集的正确率变化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 记录最高的验证集正确率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 读取批次的工具函数
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 计算正确率的工具函数
    def calc_accuracy(actual, predicted):
        # 把最大的值当作正确分类,然后比对有多少个分类相等
        predicted_labels = predicted.argmax(dim=1)
        acc = (actual == predicted_labels).sum().item() / actual.shape[0]
        return acc

    # 划分输入和输出的工具函数
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 开始训练过程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根据训练集训练并修改参数
        # 切换模型到训练模式,将会启用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助于泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 划分输入和输出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 计算预测值
                predicted = model(batch_x)
                # 计算损失
                loss = loss_function(predicted, batch_y)
                # 从损失自动微分求导函数值
                loss.backward()
                # 使用参数调整器调整参数
                optimizer.step()
                # 清空导函数值
                optimizer.zero_grad()
                # 记录这一个批次的正确率,torch.no_grad 代表临时禁用自动微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 输出批次正确率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 检查验证集
        # 切换模型到验证模式,将会禁用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 记录最高的验证集正确率与当时的模型状态,判断是否在 20 次训练后仍然没有刷新记录
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次训练后仍然没有刷新记录,结束训练
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用达到最高正确率时的模型状态
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 检查测试集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 显示训练集和验证集的正确率变化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用训练好的模型"""
    # 创建模型实例,加载训练好的状态,然后切换到验证模式
    labels = load_image_labels()
    num_labels = len(labels)
    model = MyModel(num_labels)
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 询问图片路径,并显示可能的分类一览
    while True:
        try:
            # 构建输入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 维度 C,W,H => 1,C,W,H
            # 预测输出
            tensor_out = model(tensor_in)
            # 转换到各个分类对应的概率
            tensor_out = nn.functional.softmax(tensor_out, dim=1)
            # 显示按概率排序后的分类一览
            rates = (t.item() for t in tensor_out[0])
            label_with_rates = list(zip(labels, rates))
            label_with_rates.sort(key=lambda p:-p[1])
            for label, rate in label_with_rates[:5]:
                rate = rate * 100
                print(f"{label}: {rate:0.2f}%")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函数"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 给随机数生成器分配一个初始值,使得每次运行都可以生成相同的随机数
    # 这是为了让过程可重现,你也可以选择不这样做
    random.seed(0)
    torch.random.manual_seed(0)

    # 根据命令行参数选择操作
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

最终输出结果如下,可以看到训练集正确率达到了 99%,验证集正确率达到了 85%,测试集正确率达到了 84%,比起上面的 LeNet 模型改进了很多吧🤗。

training accuracy: 0.9972708333333337
validating accuracy: 0.8373333333333337
stop training because highest validating accuracy not updated in 20 epoches
highest validating accuracy: 0.8521666666666667 from epoch 38
testing accuracy: 0.8464999999999996

随便在网上找的猫狗图片:

输出结果如下,不错吧:

Image path: BlogArchive/ml-08/cat.jpg
cat: 100.00%
dog: 0.00%
frog: 0.00%
deer: 0.00%
horse: 0.00%

Image path: BlogArchive/ml-08/dog.jpg
dog: 100.00%
bird: 0.00%
deer: 0.00%
frog: 0.00%
horse: 0.00%

pytorch 有专门用于处理视觉信息的 torchvision,其中包含了 ResNet 的实现,也就是说其实我们不用自己去写🤒,如果你有兴趣可以参考里面的实现代码,再试试 ResNet-50 等层数更多的模型是否可以带来更好的效果。

AI 鉴黄

相信很多人都看过 AI 鉴黄的新闻🥴🤭🥺,如果你想自己实现一个,可以从 nsfw_data_scraper 下载图片资源然后使用上面介绍的方法训练,识别起来会比 cifar 简单很多。因为实际只需要两个标签(1 黄色图片,0 正常图片),所以也可以使用单个值代表结果,然后用 sigmoid 代替 softmax。此外你也可以在 github 上搜索 nsfw 找到现成的模型。

使用 CNN 实现验证码识别 (ResNet-18)

最后再给出一个实用的例子。很多网站为了防机器人操作会使用验证码机制,传统的验证码会显示一张包含数字字母的图片,然后让用户填写里面的内容再对比是否正确,来判断用户是普通人还是机器人,这样的验证码可以用本篇介绍的 CNN 模型识别出来😈。

首先我们来选一个生成验证码的类库,github 上搜索 captcha C# 里面难度相对比较高的是 Hei.Captcha,这篇就使用 CNN 模型识别这个类库生成的验证码。(我的 zkweb 里面也有生成验证码的模块,但难度比较低所以就不用了)

以下步骤和代码会生成十万张用于训练和测试使用的验证码图片:

mkdir generate-captcha
cd generate-captcha
dotnet new console
dotnet add package Hei.Captcha
mkdir output
mkdir fonts
cd fonts
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/Candara.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/STCAIYUN.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/impact.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/monbaiti.ttf?raw=true
cd ..
# 添加程序代码
dotnet run -c Release
using System;
using System.IO;
using Hei.Captcha;

namespace generate_captcha
{
    class Program
    {
        static void Main(string[] args)
        {
            var helper = new SecurityCodeHelper();
            var iterations = 100000;
            for (var x = 0; x < iterations; ++x)
            {
                var code = helper.GetRandomEnDigitalText(4);
                var bytes = helper.GetEnDigitalCodeByte(code);
                File.WriteAllBytes($"output/{x:D5}-{code}.png", bytes);
                if (x % 100 == 0)
                    Console.WriteLine($"{x}/{iterations}");
            }
        }
    }
}

以下是生成的验证码图片例子,变形旋转干扰线动态背景色该有的都有😠:

接下来我们想想应该用什么数据结构来表达验证码。在图片识别的例子中有十个分类,我们用了 onehot 编码,即使用长度为 10 的 tensor 对象来表示结果,正确的分类为 1,不正确的分类为 0。换成验证码以后,可以用长度为 36 的 tensor 对象来表示 1 位验证码 (26 个英文数字 + 10 个字母,假设验证码不分大小写),如果有多位则可以 36 * 位数的 tensor 对象来表达多位验证码。以下函数可以把验证码转换为对应的 tensor 对象:

# 字母数字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 验证码位数
DIGITS = 4
# 标签数量,字母数字混合*位数
NUM_LABELS = len(ALPHA_NUMS)*DIGITS

def code_to_tensor(code):
    """转换验证码到 tensor 对象,使用 onehot 编码"""
    t = torch.zeros((NUM_LABELS,))
    code = code.lower() # 验证码不分大小写
    for index, c in enumerate(code):
        p = ALPHA_NUMS_MAP
        t[index*len(ALPHA_NUMS)+p] = 1
    return t

转换例子如下:

>>> code_to_tensor("abcd")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
>>> code_to_tensor("a123")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.])

反过来也一样,我们可以把 tensor 的长度按 36 分为多组,然后求每一组最大的值所在的索引,再根据该索引找到对应的字母或者数字,就可以把 tensor 对象转换回验证码:

def tensor_to_code(tensor):
    """转换 tensor 对象到验证码"""
    tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
    indices = tensor.max(dim=1).indices
    code = "".join(ALPHA_NUMS[index] for index in indices)
    return code

接下来就可以用前面介绍过的 ResNet-18 模型进行训练了😎,相比前面的图片分类,这份代码有以下几点不同:

  • 因为是多分类,损失计算器应该使用 BCELoss 代替 CrossEntropyLoss
  • BCELoss 要求模型输出值范围在 0 ~ 1 之间,所以需要在模型内部添加控制函数 (CrossEntropyLoss 这么做会影响训练效果,但 BCELoss 不会)
  • 因为每一组都只有一个值是正确的,用 softmax 效果会比 sigmoid 要好 (普通的多分类问题会使用 sigmoid)
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目标的图片大小,全部图片都会先缩放到这个大小
# 验证码原图是 120x50
IMAGE_SIZE = (56, 24)
# 分析目标的图片所在的文件夹
IMAGE_DIR = "./generate-captcha/output/"
# 字母数字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 验证码位数
DIGITS = 4
# 标签数量,字母数字混合*位数
NUM_LABELS = len(ALPHA_NUMS)*DIGITS

class BasicBlock(nn.Module):
    """ResNet 使用的基础块"""
    expansion = 1 # 定义这个块的实际出通道是 channels_out 的几倍,这里的实现固定是一倍
    def __init__(self, channels_in, channels_out, stride):
        super().__init__()
        # 生成 3x3 的卷积层
        # 处理间隔 stride = 1 时,输出的长宽会等于输入的长宽,例如 (32-3+2)//1+1 == 32
        # 处理间隔 stride = 2 时,输出的长宽会等于输入的长宽的一半,例如 (32-3+2)//2+1 == 16
        # 此外 resnet 的 3x3 卷积层不使用偏移值 bias
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 再定义一个让输出和输入维度相同的 3x3 卷积层
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 让原始输入和输出相加的时候,需要维度一致,如果维度不一致则需要整合
        self.identity = nn.Sequential()
        if stride != 1 or channels_in != channels_out * self.expansion:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels_out * self.expansion))

    def forward(self, x):
        # x => conv1 => relu => conv2 => + => relu
        # |                              ^
        # |==============================|
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        tmp = self.conv2(tmp)
        tmp += self.identity(x)
        y = nn.functional.relu(tmp)
        return y

class MyModel(nn.Module):
    """识别验证码 (ResNet-18)"""
    def __init__(self, block_type = BasicBlock):
        super().__init__()
        # 记录上一层的出通道数量
        self.previous_channels_out = 64
        # 把 3 通道转换到 64 通道,长宽不变
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(self.previous_channels_out))
        # ResNet 使用的各个层
        self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
        self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
        # 把最后一层的长宽转换为 1x1 的池化层,Adaptive 表示会自动检测原有长宽
        # 例如 B,512,4,4 的矩阵会转换为 B,512,1,1,每个通道的单个值会是原有 16 个值的平均
        self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
        # 全连接层,只使用单层线性模型
        self.fc_model = nn.Linear(512 * block_type.expansion, NUM_LABELS)
        # 控制输出在 0 ~ 1 之间,BCELoss 需要
        # 因为每组只应该有一个值为真,使用 softmax 效果会比 sigmoid 好
        self.softmax = nn.Softmax(dim=2)

    def _make_layer(self, block_type, channels_out, num_blocks, stride):
        blocks = []
        # 添加第一个块
        blocks.append(block_type(self.previous_channels_out, channels_out, stride))
        self.previous_channels_out = channels_out * block_type.expansion
        # 添加剩余的块,剩余的块固定处理间隔为 1,不会改变长宽
        for _ in range(num_blocks-1):
            blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
            self.previous_channels_out *= block_type.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        # 转换出通道到 64
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        # 应用 ResNet 的各个层
        tmp = self.layer1(tmp)
        tmp = self.layer2(tmp)
        tmp = self.layer3(tmp)
        tmp = self.layer4(tmp)
        # 转换长宽到 1x1
        tmp = self.avgPool(tmp)
        # 扁平化,维度会变为 B,512
        tmp = tmp.view(tmp.shape[0], -1)
        # 应用全连接层
        tmp = self.fc_model(tmp)
        # 划分每个字符对应的组,之后维度为 batch_size, digits, alpha_nums
        tmp = tmp.reshape(tmp.shape[0], DIGITS, len(ALPHA_NUMS))
        # 应用 softmax 到每一组
        tmp = self.softmax(tmp)
        # 重新扁平化,之后维度为 batch_size, num_labels
        y = tmp.reshape(tmp.shape[0], NUM_LABELS)
        return y

def save_tensor(tensor, path):
    """保存 tensor 对象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """从文件读取 tensor 对象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """转换图片对象到 tensor 对象"""
    in_img = img.resize(IMAGE_SIZE)
    in_img = in_img.convert("RGB") # 转换图片模式到 RGB
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 转换维度 H,W,C 到 C,W,H
    t = t / 255.0 # 正规化数值使得范围在 0 ~ 1
    return t

def code_to_tensor(code):
    """转换验证码到 tensor 对象,使用 onehot 编码"""
    t = torch.zeros((NUM_LABELS,))
    code = code.lower() # 验证码不分大小写
    for index, c in enumerate(code):
        p = ALPHA_NUMS_MAP
        t[index*len(ALPHA_NUMS)+p] = 1
    return t

def tensor_to_code(tensor):
    """转换 tensor 对象到验证码"""
    tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
    indices = tensor.max(dim=1).indices
    code = "".join(ALPHA_NUMS[index] for index in indices)
    return code

def prepare_save_batch(batch, tensor_in, tensor_out):
    """准备训练 - 保存单个批次的数据"""
    # 切分训练集 (80%),验证集 (10%) 和测试集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盘
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """准备训练"""
    # 数据集转换到 tensor 以后会保存在 data 文件夹下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 查找所有图片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 验证码在文件名中,例如
            # 00000-R865.png => R865
            code = filename.split(".")[0].split("-")[1]
            image_paths.append((path, code))

    # 打乱图片顺序
    random.shuffle(image_paths)

    # 分批读取和保存图片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, code in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                image_tensors.append(image_to_tensor(img))
            image_labels.append(code_to_tensor(code))
        tensor_in = torch.stack(image_tensors) # 维度: B,C,W,H
        tensor_out = torch.stack(image_labels) # 维度: B,N
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """开始训练"""
    # 创建模型实例
    model = MyModel()

    # 创建损失计算器
    # 计算多分类输出最好使用 BCELoss
    loss_function = torch.nn.BCELoss()

    # 创建参数调整器
    optimizer = torch.optim.Adam(model.parameters())

    # 记录训练集和验证集的正确率变化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 记录最高的验证集正确率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 读取批次的工具函数
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 计算正确率的工具函数
    def calc_accuracy(actual, predicted):
        # 把每一位的最大值当作正确字符,然后比对有多少个字符相等
        actual_indices = actual.reshape(actual.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
        predicted_indices = predicted.reshape(predicted.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
        matched = (actual_indices - predicted_indices).abs().sum(dim=1) == 0
        acc = matched.sum().item() / actual.shape[0]
        return acc
 
    # 划分输入和输出的工具函数
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size, num_labels
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 开始训练过程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根据训练集训练并修改参数
        # 切换模型到训练模式,将会启用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助于泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 划分输入和输出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 计算预测值
                predicted = model(batch_x)
                # 计算损失
                loss = loss_function(predicted, batch_y)
                # 从损失自动微分求导函数值
                loss.backward()
                # 使用参数调整器调整参数
                optimizer.step()
                # 清空导函数值
                optimizer.zero_grad()
                # 记录这一个批次的正确率,torch.no_grad 代表临时禁用自动微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 输出批次正确率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 检查验证集
        # 切换模型到验证模式,将会禁用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 记录最高的验证集正确率与当时的模型状态,判断是否在 20 次训练后仍然没有刷新记录
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次训练后仍然没有刷新记录,结束训练
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用达到最高正确率时的模型状态
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 检查测试集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 显示训练集和验证集的正确率变化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用训练好的模型"""
    # 创建模型实例,加载训练好的状态,然后切换到验证模式
    model = MyModel()
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 询问图片路径,并显示可能的分类一览
    while True:
        try:
            # 构建输入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 维度 C,W,H => 1,C,W,H
            # 预测输出
            tensor_out = model(tensor_in)
            # 转换到验证码
            code = tensor_to_code(tensor_out[0])
            print(f"code: {code}")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函数"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 给随机数生成器分配一个初始值,使得每次运行都可以生成相同的随机数
    # 这是为了让过程可重现,你也可以选择不这样做
    random.seed(0)
    torch.random.manual_seed(0)

    # 根据命令行参数选择操作
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

因为训练需要大量时间而我机器只有 CPU 可以用,所以这次我就只训练到 epoch 23 🤢,训练结果如下。可以看到训练集正确率达到了 98%,验证集正确率达到了 91%,已经是实用的级别了。

epoch: 23, batch: 98: batch accuracy: 0.99125
epoch: 23, batch: 99: batch accuracy: 0.9862500000000001
training accuracy: 0.9849874999999997
validating accuracy: 0.9103000000000003
highest validating accuracy updated

使用训练好的模型识别验证码,你可以对比上面的图片看看是不是识别对了 (第二张的 P 看起来很像 D 🤒):

$ python3 example.py eval
Image path: BlogArchive/ml-08/captcha-1.png
code: 8ca6

Image path: BlogArchive/ml-08/captcha-2.png
code: tp8s

Image path: BlogArchive/ml-08/captcha-3.png
code: k225

注意这里介绍出来的模型只能识别这一种验证码,其他不同种类的验证码需要分别训练和生成模型,做打码平台的话会先识别验证码种类再使用该种类对应的模型识别验证码内容。如果你的目标只是单种验证码,那么用这篇文章介绍的方法应该可以帮你节省调打码平台的钱 🤠。如果你机器有好显卡,也可以试试用更高级的模型提升正确率。

此外,有很多人问我现在流行的滑动验证码如何破解,其实破解这种验证码只需要做简单的图片分析,例如这里这里都没有使用机器学习。但滑动验证码一般会配合浏览器指纹和鼠标轨迹采集一起使用,后台会根据大量数据分析用户是普通人还是机器人,所以破解几次很简单,但一直破解下去则会有很大几率被检测出来。

写在最后

这个系列中预定要写的内容已经全部写出来了,接下来要写什么还不确定,有时间可能会重新维护那些放了半年以上的项目,也可能会想办法搞好饭店的生意,最近生意实在不好啊🤒。

SQL Server死锁总结 - Silent Void - 博客园

mikel阅读(530)

来源: SQL Server死锁总结 – Silent Void – 博客园

  1.  死锁原理

根据操作系统中的定义:死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

死锁的四个必要条件:
互斥条件(Mutual exclusion):资源不能被共享,只能由一个进程使用。
请求与保持条件(Hold and wait):已经得到资源的进程可以再次申请新的资源。
非剥夺条件(No pre-emption):已经分配的资源不能从相应的进程中被强制地剥夺。
循环等待条件(Circular wait):系统中若干进程组成环路,该环路中每个进程都在等待相邻进程正占用的资源。

对应到SQL Server中,当在两个或多个任务中,如果每个任务锁定了其他任务试图锁定的资源,此时会造成这些任务永久阻塞,从而出现死锁;这些资源可能是:单行(RID,堆中的单行)、索引中的键(KEY,行锁)、页(PAG,8KB)、区结构(EXT,连续的8页)、堆或B树(HOBT) 、表(TAB,包括数据和索引)、文件(File,数据库文件)、应用程序专用资源(APP)、元数据(METADATA)、分配单元(Allocation_Unit)、整个数据库(DB)。一个死锁示例如下图所示:

说明:T1、T2表示两个任务;R1和R2表示两个资源;由资源指向任务的箭头(如R1->T1,R2->T2)表示该资源被改任务所持有;由任务指向资源的箭头(如T1->S2,T2->S1)表示该任务正在请求对应目标资源;
其满足上面死锁的四个必要条件:
(1).互斥:资源S1和S2不能被共享,同一时间只能由一个任务使用;
(2).请求与保持条件:T1持有S1的同时,请求S2;T2持有S2的同时请求S1;
(3).非剥夺条件:T1无法从T2上剥夺S2,T2也无法从T1上剥夺S1;
(4).循环等待条件:上图中的箭头构成环路,存在循环等待。

 

  1. 死锁排查

(1). 使用SQL Server的系统存储过程sp_who和sp_lock,可以查看当前数据库中的锁情况;进而根据objectID(@objID)(SQL Server 2005)/ object_name(@objID)(Sql Server 2000)可以查看哪个资源被锁,用dbcc ld(@blk),可以查看最后一条发生给SQL Server的Sql语句;

CREATE Table #Who(spid int,
ecid int,
status nvarchar(50),
loginname nvarchar(50),
hostname nvarchar(50),
blk int,
dbname nvarchar(50),
cmd nvarchar(50),
request_ID int);

CREATE Table #Lock(spid int,
dpid int,
objid int,
indld int,
[Type] nvarchar(20),
Resource nvarchar(50),
Mode nvarchar(10),
Status nvarchar(10)
);

INSERT INTO #Who
EXEC sp_who active  –看哪个引起的阻塞,blk
INSERT INTO #Lock
EXEC sp_lock  –看锁住了那个资源id,objid

DECLARE @DBName nvarchar(20);
SET @DBName=’NameOfDataBase’

SELECT #Who.* FROM #Who WHERE dbname=@DBName
SELECT #Lock.* FROM #Lock
JOIN #Who
ON #Who.spid=#Lock.spid
AND dbname=@DBName;

–最后发送到SQL Server的语句
DECLARE crsr Cursor FOR
SELECT blk FROM #Who WHERE dbname=@DBName AND blk<>0;
DECLARE @blk int;
open crsr;
FETCH NEXT FROM crsr INTO @blk;
WHILE (@@FETCH_STATUS = 0)
BEGIN;
dbcc inputbuffer(@blk);
FETCH NEXT FROM crsr INTO @blk;
END;
close crsr;
DEALLOCATE crsr;

–锁定的资源
SELECT #Who.spid,hostname,objid,[type],mode,object_name(objid) as objName FROM #Lock
JOIN #Who
ON #Who.spid=#Lock.spid
AND dbname=@DBName
WHERE objid<>0;

DROP Table #Who;
DROP Table #Lock;

(2). 使用 SQL Server Profiler 分析死锁: 将 Deadlock graph 事件类添加到跟踪。此事件类使用死锁涉及到的进程和对象的 XML 数据填充跟踪中的 TextData 数据列。SQL Server 事件探查器 可以将 XML 文档提取到死锁 XML (.xdl) 文件中,以后可在 SQL Server Management Studio 中查看该文件。

 

  1. 避免死锁

上面1中列出了死锁的四个必要条件,我们只要想办法破其中的任意一个或多个条件,就可以避免死锁发生,一般有以下几种方法(FROM Sql Server 2005联机丛书):
(1).按同一顺序访问对象。(注:避免出现循环)
(2).避免事务中的用户交互。(注:减少持有资源的时间,较少锁竞争)
(3).保持事务简短并处于一个批处理中。(注:同(2),减少持有资源的时间)
(4).使用较低的隔离级别。(注:使用较低的隔离级别(例如已提交读)比使用较高的隔离级别(例如可序列化)持有共享锁的时间更短,减少锁竞争)
(5).使用基于行版本控制的隔离级别:2005中支持快照事务隔离和指定READ_COMMITTED隔离级别的事务使用行版本控制,可以将读与写操作之间发生的死锁几率降至最低:
SET ALLOW_SNAPSHOT_ISOLATION ON –事务可以指定 SNAPSHOT 事务隔离级别;
SET READ_COMMITTED_SNAPSHOT ON  –指定 READ_COMMITTED 隔离级别的事务将使用行版本控制而不是锁定。默认情况下(没有开启此选项,没有加with nolock提示),SELECT语句会对请求的资源加S锁(共享锁);而开启了此选项后,SELECT不会对请求的资源加S锁。
注意:设置 READ_COMMITTED_SNAPSHOT 选项时,数据库中只允许存在执行 ALTER DATABASE 命令的连接。在 ALTER DATABASE 完成之前,数据库中决不能有其他打开的连接。数据库不必一定要处于单用户模式中。
(6).使用绑定连接。(注:绑定会话有利于在同一台服务器上的多个会话之间协调操作。绑定会话允许一个或多个会话共享相同的事务和锁(但每个回话保留其自己的事务隔离级别),并可以使用同一数据,而不会有锁冲突。可以从同一个应用程序内的多个会话中创建绑定会话,也可以从包含不同会话的多个应用程序中创建绑定会话。在一个会话中开启事务(begin tran)后,调用exec sp_getbindtoken @Token out;来取得Token,然后传入另一个会话并执行EXEC sp_bindsession @Token来进行绑定(最后的示例中演示了绑定连接)。

 

  1. 死锁处理方法:

(1). 根据2中提供的sql,查看那个spid处于wait状态,然后用kill spid来干掉(即破坏死锁的第四个必要条件:循环等待);当然这只是一种临时解决方案,我们总不能在遇到死锁就在用户的生产环境上排查死锁、Kill sp,我们应该考虑如何去避免死锁。

(2). 使用SET LOCK_TIMEOUT timeout_period(单位为毫秒)来设定锁请求超时。默认情况下,数据库没有超时期限(timeout_period值为-1,可以用SELECT @@LOCK_TIMEOUT来查看该值,即无限期等待)。当请求锁超过timeout_period时,将返回错误。timeout_period值为0时表示根本不等待,一遇到锁就返回消息。设置锁请求超时,破环了死锁的第二个必要条件(请求与保持条件)。

服务器: 消息 1222,级别 16,状态 50,行 1
已超过了锁请求超时时段。

(3). SQL Server内部有一个锁监视器线程执行死锁检查,锁监视器对特定线程启动死锁搜索时,会标识线程正在等待的资源;然后查找特定资源的所有者,并递归地继续执行对那些线程的死锁搜索,直到找到一个构成死锁条件的循环。检测到死锁后,数据库引擎 选择运行回滚开销最小的事务的会话作为死锁牺牲品,返回1205 错误,回滚死锁牺牲品的事务并释放该事务持有的所有锁,使其他线程的事务可以请求资源并继续运行。

 

  1. 两个死锁示例及解决方法

5.1 SQL死锁

(1). 测试用的基础数据:

CREATE TABLE Lock1(C1 int default(0));
CREATE TABLE Lock2(C1 int default(0));
INSERT INTO Lock1 VALUES(1);
INSERT INTO Lock2 VALUES(1);

(2). 开两个查询窗口,分别执行下面两段sql

–Query 1
Begin Tran
Update Lock1 Set C1=C1+1;
WaitFor Delay ’00:01:00′;
SELECT * FROM Lock2
Rollback Tran;

 

–Query 2
Begin Tran
Update Lock2 Set C1=C1+1;
WaitFor Delay ’00:01:00′;
SELECT * FROM Lock1
Rollback Tran;

 

上面的SQL中有一句WaitFor Delay ’00:01:00’,用于等待1分钟,以方便查看锁的情况。

(3). 查看锁情况

在执行上面的WaitFor语句期间,执行第二节中提供的语句来查看锁信息:

Query1中,持有Lock1中第一行(表中只有一行数据)的行排他锁(RID:X),并持有该行所在页的意向更新锁(PAG:IX)、该表的意向更新锁(TAB:IX);Query2中,持有Lock2中第一行(表中只有一行数据)的行排他锁(RID:X),并持有该行所在页的意向更新锁(PAG:IX)、该表的意向更新锁(TAB:IX);

执行完Waitfor,Query1查询Lock2,请求在资源上加S锁,但该行已经被Query2加上了X锁;Query2查询Lock1,请求在资源上加S锁,但该行已经被Query1加上了X锁;于是两个查询持有资源并互不相让,构成死锁。

(4). 解决办法

a). SQL Server自动选择一条SQL作死锁牺牲品:运行完上面的两个查询后,我们会发现有一条SQL能正常执行完毕,而另一个SQL则报如下错误:

服务器: 消息 1205,级别 13,状态 50,行 1
事务(进程 ID  xx)与另一个进程已被死锁在  lock 资源上,且该事务已被选作死锁牺牲品。请重新运行该事务。

这就是上面第四节中介绍的锁监视器干活了。

b). 按同一顺序访问对象:颠倒任意一条SQL中的Update与SELECT语句的顺序。例如修改第二条SQL成如下:

–Query2
Begin Tran
SELECT * FROM Lock1–在Lock1上申请S锁
WaitFor Delay ’00:01:00′;
Update Lock2 Set C1=C1+1;–Lock2:RID:X
Rollback Tran;

当然这样修改也是有代价的,这会导致第一条SQL执行完毕之前,第二条SQL一直处于阻塞状态。单独执行Query1或Query2需要约1分钟,但如果开始执行Query1时,马上同时执行Query2,则Query2需要2分钟才能执行完;这种按顺序请求资源从一定程度上降低了并发性。

c). SELECT语句加With(NoLock)提示:默认情况下SELECT语句会对查询到的资源加S锁(共享锁),S锁与X锁(排他锁)不兼容;但加上With(NoLock)后,SELECT不对查询到的资源加锁(或者加Sch-S锁,Sch-S锁可以与任何锁兼容);从而可以是这两条SQL可以并发地访问同一资源。当然,此方法适合解决读与写并发死锁的情况,但加With(NoLock)可能会导致脏读。

SELECT * FROM Lock2 WITH(NOLock)
SELECT * FROM Lock1 WITH(NOLock)

d). 使用较低的隔离级别。SQL Server 2000支持四种事务处理隔离级别(TIL),分别为:READ UNCOMMITTED、READ COMMITTED、REPEATABLE READ、SERIALIZABLE;SQL Server 2005中增加了SNAPSHOT TIL。默认情况下,SQL Server使用READ COMMITTED TIL,我们可以在上面的两条SQL前都加上一句SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED,来降低TIL以避免死锁;事实上,运行在READ UNCOMMITTED TIL的事务,其中的SELECT语句不对结果资源加锁或加Sch-S锁,而不会加S锁;但还有一点需要注意的是:READ UNCOMMITTED TIL允许脏读,虽然加上了降低TIL的语句后,上面两条SQL在执行过程中不会报错,但执行结果是一个返回1,一个返回2,即读到了脏数据,也许这并不是我们所期望的。

e). 在SQL前加SET LOCK_TIMEOUT timeout_period,当请求锁超过设定的timeout_period时间后,就会终止当前SQL的执行,牺牲自己,成全别人。

f). 使用基于行版本控制的隔离级别(SQL Server 2005支持):开启下面的选项后,SELECT不会对请求的资源加S锁,不加锁或者加Sch-S锁,从而将读与写操作之间发生的死锁几率降至最低;而且不会发生脏读。

SET ALLOW_SNAPSHOT_ISOLATION ON
SET READ_COMMITTED_SNAPSHOT ON

       g). 使用绑定连接(使用方法见下一个示例。)

 

5.2 程序死锁(SQL阻塞)

看一个例子:一个典型的数据库操作事务死锁分析,按照我自己的理解,我觉得这应该算是C#程序中出现死锁,而不是数据库中的死锁;下面的代码模拟了该文中对数据库的操作过程:

//略去的无关的code
SqlConnection conn = new SqlConnection(connectionString);
conn.Open();
SqlTransaction tran = conn.BeginTransaction();
string sql1 = “Update Lock1 SET C1=C1+1”;
string sql2 = “SELECT * FROM Lock1”;
ExecuteNonQuery(tran, sql1); //使用事务:事务中Lock了Table
ExecuteNonQuery(null, sql2); //新开一个connection来读取Table

public static void ExecuteNonQuery(SqlTransaction tran, string sql)
{
SqlCommand cmd = new SqlCommand(sql);
if (tran != null)
{
cmd.Connection = tran.Connection;
cmd.Transaction = tran;
cmd.ExecuteNonQuery();
}
else
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
conn.Open();
cmd.Connection = conn;
cmd.ExecuteNonQuery();
}
}
}

执行到ExecuteNonQuery(null, sql2)时抛出SQL执行超时的异常,下图从数据库的角度来看该问题:

 

代码从上往下执行,会话1持有了表Lock1的X锁,且事务没有结束,回话1就一直持有X锁不释放;而会话2执行select操作,请求在表Lock1上加S锁,但S锁与X锁是不兼容的,所以回话2的被阻塞等待,不在等待中,就在等待中获得资源,就在等待中超时。。。从中我们可以看到,里面并没有出现死锁,而只是SELECT操作被阻塞了。也正因为不是数据库死锁,所以SQL Server的锁监视器无法检测到死锁。

我们再从C#程序的角度来看该问题:

 

C#程序持有了表Lock1上的X锁,同时开了另一个SqlConnection还想在该表上请求一把S锁,图中已经构成了环路;太贪心了,结果自己把自己给锁死了。。。

虽然这不是一个数据库死锁,但却是因为数据库资源而导致的死锁,上例中提到的解决死锁的方法在这里也基本适用,主要是避免读操作被阻塞,解决方法如下:

a). 把SELECT放在Update语句前:SELECT不在事务中,且执行完毕会释放S锁;
b). 把SELECT也放加入到事务中:ExecuteNonQuery(tran, sql2);
c). SELECT加With(NOLock)提示:可能产生脏读;
d). 降低事务隔离级别:SELECT语句前加SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;同上,可能产生脏读;
e). 使用基于行版本控制的隔离级别(同上例)。
g). 使用绑定连接:取得事务所在会话的token,然后传入新开的connection中;执行EXEC sp_bindsession @Token后绑定了连接,最后执行exec sp_bindsession null;来取消绑定;最后需要注意的四点是:
(1). 使用了绑定连接的多个connection共享同一个事务和相同的锁,但各自保留自己的事务隔离级别;
(2). 如果在sql3字符串的“exec sp_bindsession null”换成“commit tran”或者“rollback tran”,则会提交整个事务,最后一行C#代码tran.Commit()就可以不用执行了(执行会报错,因为事务已经结束了-,-)。
(3). 开启事务(begin tran)后,才可以调用exec sp_getbindtoken @Token out来取得Token;如果不想再新开的connection中结束掉原有的事务,则在这个connection close之前,必须执行“exec sp_bindsession null”来取消绑定连接,或者在新开的connectoin close之前先结束掉事务(commit/tran)。
(4). (Sql server 2005 联机丛书)后续版本的 Microsoft SQL Server 将删除该功能。请避免在新的开发工作中使用该功能,并着手修改当前还在使用该功能的应用程序。 请改用多个活动结果集 (MARS) 或分布式事务。

tran = connection.BeginTransaction();
string sql1 = “Update Lock1 SET C1=C1+1″;
ExecuteNonQuery(tran, sql1); //使用事务:事务中Lock了测试表Lock1
string sql2 = @”DECLARE @Token varchar(255);
exec sp_getbindtoken @Token out;
SELECT @Token;”;
string token = ExecuteScalar(tran, sql2).ToString();
string sql3 = “EXEC sp_bindsession @Token;Update Lock1 SET C1=C1+1;exec sp_bindsession null;”;
SqlParameter parameter = new SqlParameter(“@Token”, SqlDbType.VarChar);
parameter.Value = token;
ExecuteNonQuery(null, sql3, parameter); //新开一个connection来操作测试表Lock1
tran.Commit();

 

 

附:锁兼容性(FROM SQL Server 2005 联机丛书)

锁兼容性控制多个事务能否同时获取同一资源上的锁。如果资源已被另一事务锁定,则仅当请求锁的模式与现有锁的模式相兼容时,才会授予新的锁请求。如果请求锁的模式与现有锁的模式不兼容,则请求新锁的事务将等待释放现有锁或等待锁超时间隔过期。

happyhippy作者:Silent Void 
出处:http://happyhippy.cnblogs.com/
转载须保留此声明,并注明在文章起始位置给出原文链接。

如何使用ABP进行软件开发之基础概览 - 溪源More - 博客园

mikel阅读(519)

来源: 如何使用ABP进行软件开发之基础概览 – 溪源More – 博客园

ABP框架简述

1)简介

在.NET众多的技术框架中,ABP框架(本系列中指aspnetboilerplate项目)以其独特的魅力吸引了一群优秀开发者广泛的使用。

在该框架的赋能之下,开发者可根据需求通过官方网站https://aspnetboilerplate.com/Templates】选择下载例如Vue/AngluarJS/MVC等不同类型的模板项目,轻松加入ABP开发者的队伍中,尽享基于ABP开发带来的乐趣。

ABP开发框架也提供了丰富的文档,能够为开发者带来许多便捷。目前ABP的文档网站为:

官方文档:https://aspnetboilerplate.com/Pages/Documents

文档库不可谓不全,加上国内众多的ABP开发者参与的活跃的技术圈子,使得学习成本只是在第一个项目中比较高,后期将会越来越平滑。

2)现状

当然,目前ABP的框架开发者和社区已经把更多的精力投入到了ABP.VNEXT开发框架,这个新框架以其DDD+微服务+模块化的理念获得了大量拥趸,使ABP框架的开发优先级已经开始逐渐降低。

但这是因为ABP框架的功能已经成熟稳定,且ABP是一种增量式的架构设计,开发者在熟练掌握这种框架后,可以根据自己的需要进行方便的扩展,使其成为小项目架构选型中一种不错的备选方案。

当然,也存在一些弊端。例如由于ABP被称为.NET众多开发框架中面向领域驱动设计的最佳实践,而囿于领域驱动设计本身不低的门槛,使得学习的过程变得看起来非常陡峭;

除此之外,ABP也广泛使用了目前ASP.NET/ASP.NET Core框架的大量比较新的特性,对于不少无法由于各种原因无法享受.NET技术飞速发展红利的传统开发者来说,无形中也提高了技术门槛。

3)综述

在这个系列中,本文计划分成三篇来介绍ABP框架,第一篇介绍ABP的基础概览,介绍基础知识,第二篇介绍ABP的模式实践,第三篇,试图介绍如何从更传统的三层甚至是单层+SQL的单层架构,如何迁移到ABP框架。

(毕竟。。.NET遗留应用实在是太多了,拯救或不拯救?)

代码结构结构

基本文件夹简述

当我们通过ABP模板项目的官方网站下载一个项目后,我们所获得的代码包的结构如下图所示,其中:

  • vue为使用iview框架构建的管理系统基本模板,该脚手架使用了yarn作为包管理器,并集成了vuex/axios等常用框架,并提供了用户,租户,权限三个基本功能的示例代码,开发者只需发挥聪明才智就能快速的通过该框架入手前端项目。
  • (当然,该项目广泛使用了typescript+面向对象的设计,似乎前端开发者。。普遍不擅长面向对象开发?)
  • aspnet-core则是一个完整的ASP.NETcore项目的快速开发脚手架。该脚手架集成了docker打包于一体,并包含基本的单元测试示例,使用了identity作为权限控制单元,使用swagger作为接口文档管理工具,集成了efcore、jwt等常用组件,对于开发者来说,基本上算是开箱即用了。

前端vue项目

打开vue文件夹之后,该项目的基本目录如下图所示。(src文件夹)

lib文件夹

定义了与abp+vue脚手架项目的基础组件和常见类库,封装了一系列基本方法。例如权限控制,数据请求,菜单操作,SignalR等基础组件的用法。

router文件夹

定义了vue项目的路由规则,其中index.ts文件是项目的入口,router.ts文件定义了vue文件的路由规则。

store文件夹

由于本项目使用了vuex框架,所以我们可以来看看对于store文件夹的介绍。

在vuex框架中:

每一个 Vuex 应用的核心就是 store(仓库)。“store”基本上就是一个容器,它包含着你的应用中大部分的状态 (state)。
Vuex 和单纯的全局对象有以下两点不同:
Vuex 的状态存储是响应式的。当 Vue 组件从 store 中读取状态的时候,若 store 中的状态发生变化,那么相应的组件也会相应地得到高效更新。
你不能直接改变 store 中的状态。改变 store 中的状态的唯一途径就是显式地提交 (commit) mutation。这样使得我们可以方便地跟踪每一个状态的变化,从而让我们能够实现一些工具帮助我们更好地了解我们的应用。

即vuex框架中,将原来的请求链路,抽象化为状态的变化,通过维护状态,使得数据的管理更加便捷,也易于扩展。

views文件夹

定义了登录、首页、用户、角色、租户的基本页面,并提供了新增、查看、编辑、删除的代码示例。

综上,该项目是一个结构清晰,逻辑缜密的前端框架,可以作为常见管理系统的脚手架。

后端项目

简介

后端项目是一个遵循了领域驱动设计的分层,同时又符合Robert Martin在《代码整洁之道》提出的【整洁架构】。

领域驱动设计简介

在领域驱动设计的分层设计中,共有四个功能分层,分别是:

表示层(Presentation Layer):为用户提供接口,使用应用层实现用户交互。

应用层(Application Layer):介于用户层和领域层之间,协调用户对象,完成对应的任务。

领域层(Domain Layer):包含业务对象和规则,是应用程序的心脏。

基础设施层(Infrastructure Layer):提供高层级的通用技术功能,主要使用第三方库完成。

在后文中,基于abp对领域驱动设计的功能分层将进行多次、详细叙述,本小节不再赘述。

整洁架构简介

整洁架构是由Bob大叔提出的一种架构模型,来源于《整洁架构》这本书,顾名思义,其目的并不是为了介绍这一种优秀的架构本身,而是介绍如何设计一种整洁的架构,使得代码结构易于维护。

(整洁架构就是这样一个洋葱,所以也有人称它为“洋葱”架构)

  1. 依赖规则(Dependency Rule)

用一组同心圆来表示软件的不同领域。一般来说,越深入代表你的软件层次越高。外圆是战术是实现机制(mechanisms),内圆的是核心原则(policy)。

Policy means the application logic.

Mechanism means the domain primitives.

使此体系架构能够工作的关键是依赖规则。这条规则规定软件模块只能向内依赖,而里面的部分对外面的模块一无所知,也就是内部不依赖外部,而外部依赖内部。同样,在外面圈中使用的数据格式不应被内圈中使用,特别是如果这些数据格式是由外面一圈的框架生成的。我们不希望任何外圆的东西会影响内圈层

  1. 实体 (Entities)

实体封装的是整个企业范围内的业务核心原则(policy),一个实体能是一个带有方法的对象,或者是一系列数据结构和函数,只要这个实体能够被不同的应用程序使用即可。

如果你没有编写企业软件,只是编写简单的应用程序,这些实体就是应用的业务对象,它们封装着最普通的高级别业务规则,你不能希望这些实体对象被一个页面的分页导航功能改变,也不能被安全机制改变,操作实现层面的任何改变不能影响实体层,只有业务需求改变了才可以改变实体

  1. 用例 (Use case)

在这个层的软件包含只和应用相关的业务规则,它封装和实现系统的所有用例,这些用例会混合各种来自实体的各种数据流程,并且指导这些实体使用企业规则来完成用例的功能目标。

我们并不期望改变这层会影响实体层. 我们也不期望这层被更外部如数据库 UI或普通框架影响,而这也正是我们分离出这一层来的原因所在。

然而,应用层面的操作改变将会影响到这个用例层,如果需求中用例发生改变,这个层的代码就会随之发生改变。所以可以看到,这一层是和应用本身紧密相关的

  1. 接口适配器 (Interface Adapters)

这一层的软件基本都是一些适配器,主要用于将用例和实体中的数据转换为外部系统如数据库或Web使用的数据,在这个层次,可以包含一些GUI的MVC架构,表现视图 控制器都属于这个层,模型Model是从控制器传递到用例或从用例传递到视图的数据结构。

通常在这个层数据被转换,从用例和实体使用的数据格式转换到持久层框架使用的数据,主要是为了存储到数据库中,这个圈层的代码是一点和数据库没有任何关系,如果数据库是一个SQL数据库, 这个层限制使用SQL语句以及任何和数据库打交道的事情。

  1. 框架和驱动器

最外面一圈通常是由一些框架和工具组成,如数据库Database, Web框架等. 通常你不必在这个层不必写太多代码,而是写些胶水性质的代码与内层进行粘结通讯。

这个层是细节所在,Web技术是细节,数据库是细节,我们将这些实现细节放在外面以免它们对我们的业务规则造成影响伤害

ABP的分层实现

在ABP项目中,层次划分如下。

1. 应用层(Application项目)

在领域驱动设计的分层式架构中,应用层作为应用系统的北向网关,对外提供业务外观的功能。在Abp模板项目中,Application项目也是编写主要用例代码的位置,开发者们在此定义与界面有关的数据行为,实现面向接口的开发实践。

应用服务层包含应用服务,数据传输单元,工作单元等对象。

  • Application Service

为面向用户界面层实现业务逻辑代码。例如需要为某些界面对象组装模型,通常会定义ApplicationService,并通过DTO对象,实现与界面表现层的数据交换。

  • Data Transfer Object (DTO)

最常见的数据结构为DTO(数据传输对象),这是来源于马丁弗勒在《企业架构应用模式》中提到的名词,其主要作用为:

是一种设计模式之间传输数据的软件应用系统。 数据传输目标往往是数据访问对象从数据库中检索数据。

在ABP的设计中,有两种不同类型的DTO,分别是用于新增、修改、删除的Input DTO,和用于查询的Output DTO。

  • Unit of Work:

工作单元。工作单元与事务类似,封装了一系列原子级的数据库操作。

2. 核心层(Core项目)

核心层包含领域实体、值对象、聚合根,以及领域上下文实现。

  • Entity(实体):

实体有别于传统意义上大家所理解的与数据库字段一一匹配的实体模型,在领域驱动设计中,虽然实体同样可能持久化到数据库,但实体包含属性和行为两种不同的抽象。

例如,如果有一个实体为User,其中有一个属性为Phone,数据为086-132xxxxxxxx,我们有时需要判断该手机号码的国际代号,可能会添加一个新的判定 GetNationCode(),可以通过从Phone字段中取出086来实现,这就是一种通俗意义上的行为。

  • Value Object(值对象):

值对象无需持久化到数据库,往往是从其他实体或聚合中“剥离”出来的与某些聚合具备逻辑相关性或语义相关性的对象,有时值对象甚至只有个别属性。

例如,上述实体,包含Phone字段,我们可以将整个Phone“剥离”为一个Telephone对象,该对象可包含PhoneNumber和NationCode字段。

复制代码
public class User
{
     public Telephone Phone{public get;private set;}
}
public class Telephone
{
    public string  PhoneNumber {get;set;}
     public string NationCode  {get;set;}
}
  • Aggregate & Aggregate Root(聚合,聚合根):

聚合是业务的最小工作单元,有时,一个实体就是一个小聚合,而为聚合对外提供访问机制的对象,就是聚合根。

在领域驱动设计中,识别聚合也是一件非常重要的工作,有一组系统的方法论可以为我们提供参考。

当然,事实上识别领域对象,包括且不限定于识别聚合、值对象、实体识别该对象的行为或(方法)本身是一件需要经验完成的工作,有时需要UML建模方法的广泛参与。

有时,我们会习惯于通过属性赋值完成梭代码的过程,从而造成领域行为流失在业务逻辑层的问题,那么或许可以采取这样的方法:

1、对象的创建,使用构造函数赋值,或工厂方法创建。

2、将所有对于属性的访问级别都设置为

复制代码
public string Phone{public get;private set;}

然后再通过一个绑定手机号码的方法,来给这个对象设置手机号码。

复制代码
public string BindPhone(string phone)
{
}

将所有一切涉及到对Phone的操作,都只能通过规定的方法来赋值,这样可以实现我们开发过程中,无意识的通过属性赋值,可能导致的“领域行为”丢失的现象发生。
这种方式可以使得对对象某些属性的操作,只能通过唯一的入口完成,符合单一职责原则的合理运用,如果要扩展方法,可以使用开闭原则来解决。

但是,采用这种方式,得尽量避免出现:SetPhone(string phone) 这样的方法出现,毕竟这样的方法,其实和直接的属性赋值,没有任何区别。

  • Repository(仓储)

仓储封装了一系列对象数据库操作的方法,完成对象从数据库到对象的转换过程。在领域驱动设计中,一个仓储往往会负责一个聚合对象从数据库到创建的全过程。

  • Domain Service(领域服务)

领域服务就是“实干家”,那些不适合在领域对象中出现,又不属于对象数据库操作的方法,又与领域对象息息相关的方法,都可以放到领域服务中实现。

  • Specification(规格定义)

规范模式是一种特殊的软件设计模式,通过使用布尔逻辑将业务规则链接在一起,可以重新组合业务规则。

实际上,它主要用于为实体或其他业务对象定义可重用的过滤器。

3. 其他基础设施(EntityFrameworkCore,Web.Core,Web.Host项目)

EntityFrameworkCore负责定义数据库上下文和对EFCore操作的一系列规则、例如种子数据的初始化等。

Web.Core:定义了应用程序的外观和接口。虽然从表面上看,Web.Core定义了作为Web访问入口的控制器方法和登录验证的逻辑,看起来像是用户表现层的东西,但是仔细想想,这些东西,何尝不是一种基础设施?

Web.Host:定义WEB应用程序的入口。

总结

本文简述了ABP框架的前后端项目的分层结构,通过了解这些结构,将有助于我们在后续的实战中更快入手,为应用开发插上翅膀。

如何使用ABP进行软件开发(2) 领域驱动设计和三层架构的对比 - 溪源More - 博客园

mikel阅读(875)

来源: 如何使用ABP进行软件开发(2) 领域驱动设计和三层架构的对比 – 溪源More – 博客园

简述

上一篇简述了ABP框架中的一些基础理论,包括ABP前后端项目的分层结构,以及后端项目中涉及到的知识点,例如DTO,应用服务层,整洁架构,领域对象(如实体,聚合,值对象)等。

笔者也曾经提到,ABP依赖于领域驱动设计这门方法论,由于其门槛较高,给使用者带来了不少理解上的难度。尤其是三层架构对.NET开发者影响太深,有时很难对领域驱动设计产生直观的理解。

在本文中,打算从传统的简单三层架构谈起,介绍一个实际场景下的三层业务逻辑实现,然后再与领域驱动设计中的对应实现形成对比,以便让开发者形成直观具体的印象。

回顾三层架构

对于.NET开发者来说,三层架构相比都不陌生,这种架构,将代码层次划分为用户界面层,业务逻辑层、数据访问层三个逻辑层次,实现了代码的关注度分离,且因其易于理解,已经成为众多.NET开发者的”条件反射”。

三层架构简介

三层架构就是为了符合“高内聚,低耦合”思想,把各个功能模块划分为表示层(UI)、业务逻辑层(BLL)和数据访问层(DAL)三层架构,各层之间采用接口相互访问,并通过对象模型的实体类(Model)作为数据传递的载体,不同的对象模型的实体类一般对应于数据库的不同表,实体类的属性与数据库表的字段名一致。

三层架构区分层次的目的是为了 “高内聚,低耦合”。开发人员分工更明确,将精力更专注于应用系统核心业务逻辑的分析、设计和开发,加快项目的进度,提高了开发效率,有利于项目的更新和维护工作。

三层架构的分层逻辑

UI层:用户界面层,实现与UI交互有关的逻辑。用于输入用户数据,输出和呈现数据。在基于WebAPI的现代Web框架中,往往会使用MVC架构,将界面的数据行为,拆分成“模型-视图-控制器”,实现了针对对UI层上关注度的进一步分离,

业务逻辑层: 业务逻辑层是用户界面层和数据访问层之间的转换层,负责完成对数据的业务组装,界面数据处理,将数据层的对象输出(转换)给用户界面层。

数据访问层:实现数据的存储(持久化)操作,包括集成存储过程,集成SQL语句,或集成现代ORM组件的形式,实现实现数据的存储。

三层架构的应用

遇到项目,先从实体关系建模开始,使用PowerDesign或其他数据库设计软件分析业务与业务之间的关系,是一对多,还是一对一,还是多对多,绘制实体关系图。

在进行软件开发时,根据数据需求,定制想要的数据接口,从而实现以数据为核心的业务功能开发。

于是,在业务层次上,这种三层架构,进一步可以表示为如下分层结构:

在三层架构中,实体是业务的核心,所有的业务代码,都是围绕实体展开,而左侧三个功能层,其主要目的都是为了实现对实体的“增删改查”操作。

以下代码简述了一个订单对象提交的全过程。(模型和代码仅供参考,不能直接运行)

复制代码
/// <summary>
/// UI控制器
/// </summary>
public class OrderController
{
  private OrderBll OrderBll = new OrderBll();
  /// <summary>
  /// 新增订单
  /// </summary>
  /// <param name="userId"></param>
  /// <param name="productId"></param>
  /// <param name="count"></param>
  public void AddOrder(int userId, int productId, int count)
  {
      OrderBll.AddOrder(userId, productId, count);
  }
}

/// <summary>
/// 业务逻辑层
/// </summary>
public class OrderBll
{
  private UserInfoDal userInfoDal = new UserInfoDal();
  private ProductInfoDal productInfoDal = new ProductInfoDal();
  private OrderDal orderDal = new OrderDal();
  /// <summary>
  /// 新增订单
  /// </summary>
  /// <param name="userId"></param>
  /// <param name="productId"></param>
  /// <param name="count"></param>
  public void CreateOrder(int userId, int productId, int count)
  {
      UserInfo userInfo = userInfoDal.Get(userId);
      ProductInfo productInfo = productInfoDal.Get(productId);
      //新订单
      Order order = new Order();
      order.Address = userInfo.Address;
      order.UserId = userId;
      order.TotalPrice = productInfo.Price * count;
      order.ProductId = productId;
      orderDal.Insert(order);
  }
}
/// <summary>
/// 数据访问层
/// </summary>
public class OrderDal
{
    /// <summary>
    /// 插入数据
    /// </summary>
    /// <param name="order"></param>
    public void Insert(Order order)
    { 
    }
}

这种基于实体驱动建模的三层架构,变成了以数据为核心的“表模块模式”。
参见《企业应用架构模式》第87页中关于表模块的介绍:

表模块以一个类对应数据库中的一个表来组织领域逻辑,而且使用单一的类实体来包含将对数据进行的各种操作程序。
通常,表模块会与面向表的后端数据结构一起使用。以列表形式排列的数据通常是某个SQL调用的结果,它们被至于一个记录集中,用于某一个SQL表。表模块提供了一个明确的基于方法的接口对数据进行操作。
要进行一些实际的操作,一般需要多个表模块的行为。
表模块中的“表”一词,暗示你数据库中的每一个表对应一个表模块。虽然大多数情况下都是如此,但也并非绝对。对于通用的视图或其他查询,建立一个表模块也是有用的。事实上,表模块的结构并非真的取决于数据库表的结构,更多的是由应用程序能识别的虚拟表所标识,例如视图或查询。

在《Microsoft.NET企业级架构设计》一书中,作者认为“多数.NET开发者在成长的过程中都受到了表模块模式的影响”。而相比之下,多数Java开发者则“深陷事务脚本的泥足”。

三层架构的优缺点

优点:

软件分层架构的目的是为了分离关注点,三层架构也同样如此,简简单单的三层代码+ER图,就能设计出一个良好结构的软件系统。

这种模式,建立了以数据库表为核心的开发模式,使得开发者能够很便捷的对业务进行分析,进而驱动软件功能的快速开发。

在应对简单业务变迁过程中,由于能够快速完成代码的堆积,也使得开发者只需关注数据库表的拼凑,就能快速的完成代码开发,为开发项目带来了不少便利。

除了简单业务普遍采用三层,事实上许多复杂项目也会同样采用,大概是由于三层架构的思想已经深入人心,许多资深开发者都形成的只要有表就能完成项目的开发的思维定势。

缺点:

还是使用上述示例代码,我们假设需求发生了变化,要求减少订单的数量或增加订单,我们会怎么做?也许,我们很容易就写出了下面的代码:

(当然,实际项目中,如果订单已经提交,很少会直接对订单数量进行修改的,往往会重新发起新订单,但为了演示方便,我们先设定有这么一个奇怪的需求吧。)

复制代码
/// <summary>
/// 减少订单数量
/// </summary>
/// <param name="orderId"></param>
/// <param name="minusCount"></param>
public void MinusOrder(int orderId, int minusCount)
{
    Order order = orderDal.Get(orderId);
    order.Count -= minusCount;
    order.TotalPrice -= order.Price * minusCount;
    orderDal.Update(order);
}
/// <summary>
/// 增加订单数量
/// </summary>
/// <param name="orderId"></param>
/// <param name="minusCount"></param>
public void AddOrder(int orderId, int addCount)
{
    Order order = orderDal.Get(orderId);
    order.Count += addCount;
    order.TotalPrice += order.Price * addCount;
    orderDal.Update(order);
}

这个代码写起来非常快,因为只是新增了两小段代码逻辑,而从减少订单,到新增订单,只是加法和减法的区别,自然而然就更快了。
但是,速度快,一定是优点么?如果需求继续持续不断的累积呢。

例如,我们要修改订单收货人,收货地址,修改订单价格,是不是我们这种代码逻辑会越来越多,而且不同的业务逻辑互相搅合,使得后期的维护变得越来越困难?

所以,笔者认为,三层架构的缺点,就是前期开发速度太快,由于缺乏设计思想和设计模式的参与,太容易导致异味、垃圾代码、重复代码等问题产生。

所有这些问题,最终都被归类于“技术债”的范畴。

详见维基百科。
技术债:指开发人员为了加速软件开发,在应该采用最佳方案时进行了妥协,改用了短期内能加速软件开发的方案,从而在未来给自己带来的额外开发负担。
这种技术上的选择,就像一笔债务一样,虽然眼前看起来可以得到好处,但必须在未来偿还。
软件工程师必须付出额外的时间和精力持续修复之前的妥协所造成的问题及副作用,或是进行重构,把架构改善为最佳实现方式。

回顾领域驱动设计

领域驱动设计简介

领域驱动设计思想来源于埃里克埃文斯在2002年前后出版的技术书籍《领域驱动设计·软件系统复杂性核心应对之道》,在这本书中,作者介绍了领域驱动设计相关的核心模式,例如:统一语言,模型驱动设计,领域实体,聚合,值对象,仓储,限界上下文等模式。

随着微服务的不断兴起,领域驱动设计也越来越受到互联网人的广泛追捧,在许多不同的行业应用实践过程中,已经逐渐扮演了非常基础的作用。无论是微服务架构下的服务粒度拆分,或者甚至是中台应用,以及传统的单体应用,都可以利用领域驱动设计思想下提供的模式,为应用程序的开发插上想象的翅膀。

领域驱动设计的分层逻辑

在上一篇博客中,我们也介绍了领域驱动设计思想分层逻辑结构,共划分为如下四个层次:

  • 用户界面层(或者表示层):负责向用户显示信息和解释用户指令。这里的用户,既可以是使用用户界面的人,也可以是另外一个计算机系统。
  • 应用层:定义软件要完成的任务,并且只会表达领域概念的对象来解决问题。这一层实际上负责的是系统与应用层进行交互的必要渠道。
  • 领域层:负责表达业务概念、业务状态信息以及业务规则。尽管技术细节由基础设施层实现,但业务情况状态的反映则需要有领域层进行控制。领域层是业务软件的核心。
  • 基础设施层:为上面各层提供通用的技术能力:为应用层传递消息,为领域层提供持久化机制,为用户界面层绘制屏幕组件等等。基础设施层还能够通过架构框架来支持4个层次间的交互模式。

领域驱动设计的应用步骤

1)形成统一语言

统一语言是围绕产品展开的一系列流程,方案,术语和名词解释及匹配的注释。在领域驱动设计为每个应用设计成体系的【统一语言】是核心要点。

统一语言的形成是团队成员协同参与,围绕不同的需求,达成一致性理解的过程。

形成统一语言有时需要领域专家的参与,但有时可能难以达到这个条件,用需求代言人也同样能够满足这个条件。

2)使用UML建模和画图

  1. 建模的必要性

在我们工作过程中模型无处不在,不管是在纸上绘制的简单模型,或者使用专业软件绘制的各种模型,都是模型。领域驱动设计本身,依然依赖于模型驱动设计。

学会建模对于广大开发者来说,都是一项基本技能,当然也是众多最弱技能中的一种,因为广泛依赖于实体关系建模的思维模式,使得开发者已经很难形成有效的模型设计思想,代码也越来越趋于【过程化】。

有时开发者甚至连实体关系建模这个步骤都会省略,直接使用Code First或甚至数据库开始建表,这样看起来速度非常快,但是太容易翻车了。

在团队协作项目中,没有良好的模型,仅凭高级开发者或有经验开发者的“”一面之词”进行设计,几乎很难完成一个复杂项目。

而uml统一建模语言也是这样的良好工具。

  1. 使用哪些模型

笔者曾经有幸请教国内.NET技术圈拥有多年DDD实践经验的阿里技术专家,汤雪华老师,他指出:

采用实体关系建模很容易看出对象与对象的关系,但仅此而已。数据并非对象,数据也无法看出行为,如果要依托实体关系建模来构建系统,往往需要开发者发挥自己的主观抽象思维,根据客户提供的资料或可用的原型,自行思考代码的逻辑实现。
但显然,具备优秀逻辑思维能力和设计思想的开发者凤毛麟角,仅凭ER图,代码写出来往往很糟糕。

他认为,采用领域驱动设计,产品架构图,系统架构图,领域模型图,类图,关键业务场景的交互时序图,这些是必不可少的。

  • 产品架构图:列出产品功能,表现出产品模块间的相关性。

图来自http://www.woshipm.com/pmd/1065960.html

  • 系统架构图:从技术层面列出系统模块组成关系。

原图来自互联网

  • 领域模型关系图:反映出各领域模型间的相关性,限界上下文,聚合,和聚合根。

来自https://102.alibaba.com/detail?id=174

  1. 如何建模?

如果说代码语言是为了与其他开发者进行沟通交流,那我们建立的各种软件设计模型将极大的方便不同领域的人员进行交流。建模也可以称之为语言的一部分。利用uml建立类图,是一种可以比较易于接受的方式。我们可以采用以下手段来建立领域模型。

1)建立一个与实现绑定的模型。初版的模型也许很简陋,但是它可以成为一个基础,然后在后期逐渐完善。

2)建立一种基于模型的通用语言或表达形式和机制。通过通用语言让参与项目的所有人理解模型。

3)开发一个蕴含丰富知识的模型。模型不是单纯的数据结构,它更是各类知识的聚合体。

4)提炼模型,模型应该能在项目过程中动态改变,发现新的概念就加进来,过时的概念就适时移除,避免臃肿。

5)头脑风暴和实验。模型在于实践和应用,它需要项目参与者共同的努力,而头脑风暴是发挥集体智慧的良好方式。对模型进行实验或者进行场景的模拟,有利于让模型更符合需求。

当然,对于领域专家而言,不同类型的模型也许无法理解,例如类图可能过于复杂,可以使用画图的形式,通过解释性的图形,甚至纸面上的图,更能直观的表现出领域的逻辑层次。

这张来自TW分享的一张图,就是一个基于.NET MVC的产品设计UML设计图。

建模也并非这篇博客所能讲清楚的,包括笔者自己,也只是偶尔设计过用例图,时序图和类图,可能需要在后期系统的学习一下。

3)代码实现

回到最开始的那个三层架构下的代码示例,如果采用领域驱动设计,大概如下图所示:

回到开始那个示例代码,如果采用DDD的代码实现,大概是这样的:

复制代码
/// <summary>
/// 应用服务层
/// </summary>
public class OrderAppService
{
    private OrderRepository _orderRepository;
    private UserInfoRepository _userInfoRepository;
    private ProductInfoRepository _productInfoRepository;
    public OrderAppService(OrderRepository orderRepository, UserInfoRepository userInfoRepository, ProductInfoRepository productInfoRepository)
    {
        _orderRepository = orderRepository;
        _userInfoRepository = userInfoRepository;
        _productInfoRepository = productInfoRepository; 
    }
  /// <summary>
  /// 新增订单
        /// </summary>
        /// <param name="userId"></param>
        /// <param name="productId"></param>
        /// <param name="count"></param>
        public void CreateOrder(int userId, int productId, int count)
        {
              UserInfo userInfo = _userInfoRepository.Get(userId);
            ProductInfo productInfo = _productInfoRepository.Get(productId);
            if (userInfo != null && productInfo != null)
            {
                //新订单
                Order order = Order.CreateOrder(productId, userInfo.Address, userId, productInfo.Price, count);
                _orderRepository.Insert(order);
            }
        }
        /// <summary>
        /// 减少订单数量
        /// </summary>
        /// <param name="orderId"></param>
        /// <param name="minusCount"></param>
        public void MinusOrder(int orderId, int minusCount)
        {
            Order order = _orderRepository.Get(orderId);
            order.Minus(minusCount);
            _orderRepository.Update(order);
        }
        /// <summary>
        /// 增加订单数量
        /// </summary>
        /// <param name="orderId"></param>
        /// <param name="minusCount"></param>
        public void AddOrder(int orderId, int addCount)
        {
            Order order = _orderRepository.Get(orderId);
            order.Add(addCount);
            _orderRepository.Update(order);
        }
}
    /// <summary>
    ///订单对象 
    /// </summary>
    public class Order
    {
        /// <summary>
        /// 主键
        /// </summary>
        public int Id { get; protected set; }
        /// <summary>
        /// 地址
        /// </summary>
        public string Address { get; protected set; }
        /// <summary>
        /// 用户id
        /// </summary>
        public int UserId { get; protected set; }
        /// <summary>
        /// 产品id
        /// </summary>
        public int ProductId { get; protected set; }
        /// <summary>
        /// 数量
        /// </summary>
        public int Count { get; protected set; }

        /// <summary>
        /// 单价
        /// </summary>
        public double Price { get; protected set; }
        /// <summary>
        /// 总价
        /// </summary>
        public double TotalPrice { get; protected set; }
        /// <summary>
        /// 创建订单
        /// </summary>
        /// <param name="productId"></param>
        /// <param name="address"></param>
        /// <param name="userId"></param>
        /// <param name="price"></param>
        /// <param name="count"></param>
        /// <returns></returns>
        public static Order CreateOrder(int productId, string address, int userId, double price, int count)
        {
            return new Order()
            {
                Address = address,
                UserId = userId,
                TotalPrice = price * count,
                ProductId = productId,
            };
        }
        /// <summary>
        /// 新增
        /// </summary>
        /// <param name="count"></param>
        public void Add(int count)
        {
        }
        /// <summary>
        /// 减少
        /// </summary>
        /// <param name="count"></param>
        public void Minus(int count)
        {
        }

    }

这段代码,最主要的变化是如下几点:

  1. 引入领域模型,在三层架构的示例代码中,我们建立了如下模型:

这个模型是当我们Entity Framework脚本时生成的实体模型,在业内通常称其为“贫血模型”。对人类来说,红细胞负责把氧气输送到组织细胞,然后新陈代谢,产生ATP,产生动力。

而“贫血模型”这个术语恰如其份的表现出这类模型虽然还能有效的工作,但是需要由其他对象来驱动其完成动作的含义。

领域模型与贫血模型相比,更关注对象的行为,而关注行为的目的是创建带有公共接口并与在现实世界观察到的实体相似的对象,使得依照统一语言的名字和规则进行建模变得更加容易。

  1. 将原来的Order对象抽象化建模为一个DDD实体。DDD实体是一个包含数据(属性)和行为(方法)的POCO对象。

在《Microsoft .NET企业级应用架构实战》书第9.2.2中指出了领域实体的特点:

定义明确的身份标识。
通过公共和非公共方法表示行为。
通过只读属性暴露状态。
限制基元类型的使用,使用值对象代替。
工厂方法优于多个构造函数。

  1. 私有set或protected set:

在示例代码中将Order中的所有属性设置为

复制代码
public double TotalPrice { get; protected set; }

这样的目的是为了避免对该属性的随意更改,使得开发者在对属性进行操作过程中,多了一个环节,即需要谨慎思考这样的代码修改,从行为角度来分析是否符合业务需要。
在设计时实体时,开放set可能会带来严重的副作用,例如影响实体的状态。

在张逸老师的《领域驱动设计实战,战术篇第15课》中,作者指出:

对象之间若要默契配合,形成良好的协作关系,就需要通过行为进行协作,而不是让参与协作的对象成为数据的提供者。
《ThoughtWorks 软件开发沉思录》中的“对象健身操”提出了优秀软件设计的九条规则,其中最后一条提出:不使用任何 Getter/Setter/Property。
作者 Jeff Bay 认为:“如果可以从对象之外随便询问实例变量的值,那么行为与数据就不可能被封装到一处。在严格的封装边界背后,真正的动机是迫使程序员在完成编码之后,一定有为这段代码的行为找到一个适合的位置,确保它在对象模型中的唯一性。”

当然,在实际开发过程中,有时并不一定把get方法也设置为protected,毕竟有时候还需要有所妥协。

  1. 将创建方法从业务逻辑层,移动到了领域对象Order中的静态工厂方法。
复制代码
/// <summary>
/// 创建订单
/// </summary>
/// <param name="productId"></param>
/// <param name="address"></param>
/// <param name="userId"></param>
/// <param name="price"></param>
/// <param name="count"></param>
/// <returns></returns>
public static Order CreateOrder(int productId, string address, int userId, double price, int count)
{        
    return new Order()
    {
        Address = address,
        UserId = userId,
        TotalPrice = price * count,
        ProductId = productId,
    };
}

创建过程应该是一个非常严谨的过程,而原来在业务逻辑层中初始化对象的方法,随意性比较高,很容易就出现开发者在创建过程中将无关属性赋值的现象。
但如果把创建过程改成使用构造方法,又可能会造成可读性问题,而使用工厂方法,并创建一个受保护的构造方法则不会造成这个担忧。

  1. 将订单新增内容和减少内容从业务逻辑层移动到了领域对象上,并封装为方法。采用迪米卡法则,只暴露最小的参数,每次只对最该赋值的属性进行操作,也容易约束开发者的操作。
复制代码
/// <summary>
/// 新增
/// </summary>
/// <param name="count"></param>
public void Add(int count)
{
}

/// <summary>
/// 减少
/// </summary>
/// <param name="count"></param>
public void Minus(int count)
{

}

大概修改过程是最容易造成领域知识丢失的地方,而通过封装为方法,使得这个过程得以以受控的形式进行,有助于让其他开发者通过暴露的方法。
但这样做要确保所使用的命名规范符合统一语言,否则会重蹈贫血模型的覆辙。当然,在领域设计中,经常会纠结于哪些行为应该放在领域对象中,可以参考这样的规则:

  • 如果方法只处理实体的成员,它可能属于这个实体。
  • 如果方法访问相同聚合的其他实体或值对象,它可能属于聚合根。
  • 如何方法里的代码需要查询或更新持久层,或者需要用到实体(或聚合)边界以外的引用,它属于领域服务方法。

对比分析

二者的对比

笔者整理了一个简单的图表来表现二者的对比关系。显然,三层架构并非毫无优势,领域驱动设计也并非银弹。

三层架构 领域驱动设计
业务识别方法 结合瀑布模型,通过需求分析,形成数据字典,指导数据库设计。 团队协作形成统一语言,并从统一语言中提取术语,指导类、流程,变量,行为定义等。
业务参与者 具备IT知识的开发人员,业务人员只能提供需求,往往不能参与设计过程。 由需求提供者或客户、开发者、测试、产品经理等组成的跨职能团队全力参与。
建模方法 实体关系建模为主,有时可以用UML 以UML方法为主,画图为辅
业务代码分层 业务代码理论上应该在业务逻辑层,但有时游离在控制器、业务逻辑层或数据访问层,甚至受依赖的其他业务逻辑中 业务代码在领域层,有时在领域对象上,有时在领域服务中。
修改代码的难易程度 随时随地想改就改 需要遵循一定的设计原则或步骤、流程
可维护性 项目简单时,易于维护;复杂时,难于维护。 掌握方法时,维护难度比较平滑。
数据持久化 在数据访问层中完成,有时可以适当复用;也有开发者将数据访问层提取出仓储的模板方法进行复用。 一般在仓储层中实现,且仓储一般是基础设施,意味着除特定场景外,基础设施不会依赖于领域而二外定制行为。
多业务逻辑的整合 一般在业务逻辑层中实现 一般在应用服务层实现。
可测试性 比较难以加入测试代码 易于加入测试代码;也可以根据UML使用TDD来进行开发。

该如何取舍?

下图这种流传已久,同样来自马丁弗勒老爷子《企业架构应用模式》。

表现了随着软件复杂度的逐渐提升,数据驱动设计和领域驱动设计模式两种不同类型的设计模式的开发效率(时间)对比曲线。

  • 数据驱动设计建立了一个比较平滑的发展轨迹,但是随着拐点的到来,将变得越来越为难以维护,最终造出一个难以维护的“意大利面”。

  • 领域驱动设计,前期的起点确实比数据驱动设计要高很多,而且甚至在刚刚使用一段时间后,由于业务复杂度的提升,会迎来一个拐点。这个拐点有点像“邓宁·克鲁格效应”中遇到的“绝望之谷”,让开发者和管理层感觉有点力不从心,不少企业最终又拆掉了他们的领域驱动设计搭建的软件;
  • 使用领域驱动设计,随着复杂度的逐渐推移,软件开发人员的信心越来越足,代码自然也能够不断演进,平滑发展,。

  • 从长期来看,领域驱动建模将给复杂系统带来更加高效的维护效能。

总结

本文介绍了三层架构和领域驱动设计两种不同的设计思想中如何实现业务逻辑代码的过程,并对针对代码的维护性问题进行了分析。由于时间仓促,部分观点、设计图、代码可能还不够成熟,还请大家批评指正。

下一篇将介绍ABP框架开发中的具体实践步骤。

使用python爬虫,批量爬取抖音app视频(requests+Fiddler+appium) - 北平吴彦祖 - 博客园

mikel阅读(1112)

来源: 使用python爬虫,批量爬取抖音app视频(requests+Fiddler+appium) – 北平吴彦祖 – 博客园

抖音很火,楼主使用python随机爬取抖音视频,并且无水印下载,人家都说天下没有爬不到的数据,so,楼主决定试试水,纯属技术爱好,分享给大家。。

1.楼主首先使用Fiddler4来抓取手机抖音app这个包,具体配置的操作,网上有很多教程供大家参考。

上面得出抖音的视频的url,这些url均能在网页中打开,楼主数了数,这些url的前缀有些不同,一共有这4种类型:

v1-dy.ixigua.com

v3-dy.ixigua.com

v6-dy.ixigua.com

v9-dy.ixigua.com

楼主查看这四种类型得知,v6-dy.ixigua.com 这个前缀后面的参数其中有一个是Expires(中文含义过期的意思)

Expires=1536737310,这个是时间戳,标记的是过期的时间如下图所示,过了15:28分30秒,则表示url不能使用,楼主算了一下,url有效期是一个小时。

看到这些url,楼主不能手动一个一个粘贴,so楼主需要在+Fiddler4中(在Fiddler4使用script代码网上有大量详细教程)使用如下代码,自动保存到一个txt文档中。

 

 

复制代码
        //保存到本地添加开始
        
        //这是抖音的地址||"v1-dy.ixigua.com"||"v3-dy.ixigua.com"||"v6-dy.ixigua.com"||"v9-dy.ixigua.com"||
        if (oSession.fullUrl.Contains("v1-dy.ixigua.com")||
            oSession.fullUrl.Contains("v3-dy.ixigua.com")||
            oSession.fullUrl.Contains("v6-dy.ixigua.com")||
            oSession.fullUrl.Contains("v9-dy.ixigua.com")){
            
            var fso;
            var file;
            fso = new ActiveXObject("Scripting.FileSystemObject");
            //文件保存路径,可自定义
            file = fso.OpenTextFile("H:\\Request.txt",8 ,true);
            //file.writeLine("Request-url:" + oSession.url);
            file.writeLine("http://"+oSession.url)
            //file.writeLine("Request-host:" + oSession.host);
            //file.writeLine("Request-header:" + "\n" + oSession.oRequest.headers);
            //file.writeLine("Request-body:" + oSession.GetRequestBodyAsString());
            //file.writeLine("\n");
            file.close();
        }        
        //保存到本地添加结束
复制代码

 

把上边的代码插入到如下图所示的地方即可。

2.上面的url是楼主手动点击一个个刷新抖音app出现的,so楼主使用appium来自动刷新抖音app,自动获得url,自动保存到txt文档中。

首先需要在appium中得到抖音这个app包的一些用的信息,如下图所示

楼主使用的是红米手机,至于appium怎么安装配置,大家可参考网上相关教程,appium客户端连接上手机(需要数据线连接)后,在控制台打印出log日志文件,在日志文件中找到这四个参数即可,然后保存

到appium客户端中即可,就能在appium客户端中操作抖音app。

{
“platformName”: “Android”,
“deviceName”: “Redmi Note5”,
“appPackage”: “com.ss.Android.ugc.aweme”,
“appActivity”: “.main.MainActivity”
}

appPackage这一项com.ss.Android.ugc.aweme 则表示抖音短视频。

楼主使用如下代码来实现无限刷新抖音app,前提是需要手机连着数据线连在电脑上并且开启appium客户端的服务和打开Fiddler4抓包(配置好环境手机)。

复制代码
from appium import webdriver
from time import sleep

#以下代码可以操控手机app(抖音)
class Action():
    def __init__(self):
        #初始化配置,设置Desired Capabilities参数
        self.desired_caps = {
            "platformName": "Android",
            "deviceName": "Mi_Note_3",
            "appPackage": "com.ss.android.ugc.aweme",
            "appActivity": ".main.MainActivity"
        }
        #指定Appium Server
        self.server = 'http://localhost:4723/wd/hub'
        #新建一个Session
        self.driver = webdriver.Remote(self.server, self.desired_caps)
        #设置滑动初始坐标和滑动距离
      self.start_x = 500
        self.start_y = 1500
        self.distance = 1300

    def comments(self):
        sleep(3)
        #app开启之后点击一次屏幕,确保页面的展示
        self.driver.tap([(500, 1200)], 500)
    def scroll(self):
        #无限滑动
        while True:
            #模拟滑动
            self.driver.swipe(self.start_x, self.start_y, self.start_x,
            self.start_y - self.distance)
            #设置延时等待
            sleep(5)
    def main(self):
        self.comments()
        self.scroll()

if __name__ == '__main__':
    action = Action()
    action.main()
复制代码

楼主运行次代码就能在Fiddler4中得到无限量的url。
3.楼主拿到url后,会发现有些url会重复,so楼主加入了去重的功能,为了好看楼主也加入了进度条花里花哨的功能,运行代码最终会下载下来。

复制代码
# _*_ coding: utf-8 _*_
import requests
import sys
headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36" }

#去重方法
def distinct_data():
    #读取txt中文档的url列表
    datalist_blank=[]
    pathtxt='H:/Request.txt'
    with open(pathtxt) as f:
        f_data_list=f.readlines()#d得到的是一个list类型
        for a in f_data_list:
            datalist_blank.append(a.strip())#去掉\n strip去掉头尾默认空格或换行符
    data_dict={}
    for data in datalist_blank:
        #url中以/为切分,在以m为切分   ##把m后面的值放进字典key的位置,利用字典特性去重
        if int(data.split('/').index('m'))==4 :#此处为v6开头的url
            #print(data,44,data.split('/')[5])
            data_key1=data.split("/")[5]
            data_dict[data_key1]=data
        elif int(data.split('/').index('m'))==6: #此处为v1或者v3或者v9开头的url
            data_key2=data.split("/")[7]
            data_dict[data_key2] =data
    #print(len(data_dict),data_dict)
    data_new=[]
    for x,y in data_dict.items():
        data_new.append(y)
    return data_new

def responsedouyin():
    data_url=distinct_data()
    #使用request获取视频url的内容
    #stream=True作用是推迟下载响应体直到访问Response.content属性
    #将视频写入文件夹
    num = 1
    for url in data_url:
        res = requests.get(url,stream=True,headers=headers)
        #res = requests.get(url=url, stream=True, headers=headers)
        #定义视频存放的路径
        pathinfo = 'H:/douyin-video/%d.mp4' % num  #%d 用于整数输出   %s用于字符串输出
        #实现下载进度条显示,这一步需要得到总视频大小
        total_size = int(res.headers['Content-Length'])
        #设置流的起始值为0
        temp_size = 0
        if res.status_code == 200:
            with open(pathinfo, 'wb') as file:
                #file.write(res.content)
                #print(pathinfo + '下载完成啦啦啦啦啦')
                num += 1
                #当流下载时,下面是优先推荐的获取内容方式,iter_content()函数就是得到文件的内容,指定chunk_size=1024,大小可以自己设置哟,设置的意思就是下载一点流写一点流到磁盘中
                for chunk in res.iter_content(chunk_size=1024):
                    if chunk:
                        temp_size += len(chunk)
                        file.write(chunk)
                        file.flush() #刷新缓存
                #############下载进度条部分start###############
                        done = int(50 * temp_size / total_size)
                        #print('百分比:',done)
                        sys.stdout.write("\r[%s%s] %d % %" % ('█' * done, ' ' * (50 - done), 100 * temp_size / total_size)+" 下载信息:"+pathinfo + "下载完成啦啦啦啦啦")
                        sys.stdout.flush()#刷新缓存
                #############下载进度条部分end###############
                print('\n')#每一条打印在屏幕上换行输出


if __name__ == '__main__':
    responsedouyin()
复制代码

运行代码,效果图如下

视频最终保存到文件夹中

github地址:https://github.com/Stevenguaishushu/douyin

Rowlock、UPDLOCK - 风无心客 - 博客园

mikel阅读(931)

来源: Rowlock、UPDLOCK – 风无心客 – 博客园

ROWLOCK 使用行级锁,而不使用粒度更粗的页级锁和表级锁。
UPDLOCK 读取表时使用更新锁,而不使用共享锁,并将锁一直保留到语句或事务的结束。UPDLOCK 的优点是允许您读取数据(不阻塞其它事务)并在以后更新数据,同时确保自从上次读取数据后数据没有被更改。
XLOCK 使用排它锁并一直保持到由语句处理的所有数据上的事务结束时。可以使用 PAGLOCK 或 TABLOCK 指定该锁,这种情况下排它锁适用于适当级别的粒度。
ROWLOCK
使用行级锁,而不使用粒度更粗的页级锁和表级锁。
UPDLOCK
读取表时使用更新锁,而不使用共享锁,并将锁一直保留到语句或事务的结束。UPDLOCK   的优点是允许您读取数据(不阻塞其它事务)并在以后更新数据,同时确保自从上次读取数据后数据没有被更改。
在update语句加行级更新锁,实际上更新语句本身默认更新锁,下面两个语句是一样的
update User with (updlock,rowlock)  set f_money = f_money + 100 where f_name = ‘张三’
 update User   with (rowlock)     set f_money = f_money + 100   where f_name = ‘张三’

SQL Server 中 ROWLOCK 行级锁 - 风无心客 - 博客园

mikel阅读(551)

来源: SQL Server 中 ROWLOCK 行级锁 – 风无心客 – 博客园

一、ROWLOCK的使用

1、ROWLOCK行级锁确保,在用户取得被更新的行,到该行进行更新,这段时间内不被其它用户所修改。因而行级锁即可保证数据的一致性,又能提高数据操作的并发性。

2、ROWLOCK告诉SQL Server只使用行级锁,ROWLOCK语法可以使用在SELECT,UPDATE和DELETE语句中。

3、例如select语句中

 

A 连接中执行

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ

begin tran

select * from tablename with (rowlock,UpdLock) where id=3

waitfor delay ’00:00:05′

commit tran

B连接中如果执行

update tablename set colname=’10’ where id=3 –则要等待5秒

update tablename set colname=’10’ where id <>3 –可立即执行

 

二、SQL Server中使用ROWLOCK需要注意的地方

 

1、如果你错误地使用在过多行上,数据库并不会聪明到自动将行级锁升级到页面锁,服务器也会因为行级锁的开销而消耗大量的内存和CPU,直至无法响应。

2、select 语句中,RowLock在不使用组合的情况下是没有意义的,With(RowLock,UpdLock) 这样的组合才成立,查询出来的数据使用RowLock来锁定,当数据被Update的时候,锁将被释放