来源: Swoole整合ThinkPHP3.2系列教程二_一个不靠谱的程序员-CSDN博客
swoole和ThinkPHP的整合
先上一份在我们系统内部的swoole整合的架构预览
├─Application 应用目录
│ │
│ ├─Cli Cli模块
│ │ ├─Controller 制器类
│ │ │ ├─StartController TP框架加载时默认加载的控制器
│ │ │ ├─SwooleController 我们的业务逻辑写在这里面
│ │ └─ …
├─Swoole
│ │
│ ├─log swoole运行日志
│ ├─Server.php swoole的服务代码
│ ├─swoole.php 用于cli模式下启动和软重启swoole服务
1
2
3
4
5
6
7
8
9
10
11
12
最早测试的时候不是这样搭建的,而是把Server.php里的关于服务的东西放在了TP控制器里,在cli模式下调用
php index.php(入口文件) Swoole/start (控制器/方法)
1
这种模式是把swoole套在了TP里运行,也是可行的,但是总觉得启动个swoole服务为毛还要告诉TP一声?
我们想要的一种模式是业务服务器独立运行,swoole服务作为守护进程常驻内存,当浏览器需要运行比较耗时的操作时,需要跟swoole服务进程建立长连接,当耗时的任务执行完毕时会通知浏览器已经完毕。
整个过程下来,swoole服务和业务服务不应该耦合在一起的。最完美的状态是swoole独立运行(使用swoole框架重新写耗时操作的业务逻辑代码),独立连接数据库。当浏览器执行这些任务时,就不再找业务服务器了,而是直接跟swoole服务打交道。
可是我们没有那么多的时间和精力,我们只能在swoole服务的进程里调用TP框架的东西,来执行我们在TP里写的代码。最后参考了网上的一些方案,选择了这样与TP结合。
代码逻辑
/Swoole/Server.php
关于swoole服务的一些配置,比如监听地址,端口号(默认为9501),和一些基础配置
一些回调函数,里面的代码注释很完整,可以直接看代码
<?php
// +———————————————————————-
// 2017-8-23 09:03:40
// 此次修改为只作为websocket的服务端
// +———————————————————————-
class Server{
protected $swoole;
// 监听所有地址
protected $host = ‘0.0.0.0’;
// 监听 9501 端口
protected $port = 9501;
// 配置项
protected $option = [
//设置启动的worker进程数
‘worker_num’ => 2,
//task进程的数量
‘task_worker_num’ => 4,
//指定task任务使用消息队列模式,3表示完全争抢模式,task进程会争抢队列,无法使用定向投递
‘task_ipc_mode’ => 3,
//task进程的最大任务数
‘task_max_request’ => 1000,
// 守护进程化
‘daemonize’ => false,
// 监听队列的长度
‘backlog’ => 128,
//绑定uid时用
‘dispatch_mode’ => 5,
//设置日志路径
‘log_file’ => SWOOLE_LOG_PATH,
];
protected function init(){
//异步非阻塞多进程的websocket
$this->swoole = new swoole_websocket_server($this->host, $this->port);
$eventList = [‘Open’, ‘Message’, ‘Close’, ‘HandShake’ , ‘Task’, ‘Finish’ , ‘WorkerStart’ , ‘Receive’];
// 设置参数
if (!empty($this->option)) {
$this->swoole->set($this->option);
}
// 设置回调
foreach ($eventList as $event) {
if (method_exists($this, ‘on’ . $event)) {
$this->swoole->on($event, [$this, ‘on’ . $event]);
}
}
}
public function start(){
$this->init();
$this->swoole->start();
}
public function getHost(){
return $this->host;
}
public function getPort(){
return $this->port;
}
/**
* [onOpen 建立连接时的回调函数]
* @method onOpen
* @param swoole_server $serv [description]
* @param swoole_http_request $req [description]
* @return [type] [description]
*/
public function onOpen(swoole_server $serv, swoole_http_request $req){
//将连接绑定到uid上面
if(!empty($req->get)&&$req->get[‘uid’]){
$serv->bind($req->fd , $req->get[‘uid’]);
}
}
/**
* [onMessage 接收到socket客户端发送数据的回调函数]
* @method onMessage
* @param swoole_server $serv [description]
* @param swoole_websocket_frame $frame [description]
* @return [type] [description]
*/
public function onMessage(swoole_server $serv, swoole_websocket_frame $frame){
//收到数据时处理数据
//根据收到的cmd名字去调用指定的方法
$receive = json_decode($frame->data,true);
//为了避免数据处理量过大阻塞当前进程,导致服务响应变慢,我们把耗时的操作扔到TaskWorker进程池中执行
//当要执行的方法存在并且已经在swoole_log表里备案过的可以丢到task进程池
$swooleLog = D(‘SwooleLog’);
$swooleController = A(‘Swoole’);
//$receive[‘args’][‘id’]是业务服务器那边数据库SwooleLog表里的id
if (method_exists($swooleController, $receive[‘cmd’]) && $receive[‘args’][‘id’]) {
$task_id = $serv->task($receive);
//记录task_id信息
//…其他你想要做的精细化处理
}else{
if($receive[‘cmd’] === ‘reload’){
//利用Swoole提供的柔性终止/重启的机制
$rs = $serv -> reload();
$serv->push($frame->fd , $rs);
}elseif($receive[‘args’][‘id’]){
$returnData = $this->_returnStr(‘submit_error’);
$serv->push($frame->fd , $returnData);
}elseif(method_exists($this, $receive[‘cmd’])){
$this->{$receive[‘cmd’]}($serv , $frame);
}
}
}
/**
* [onTask 在task_worker进程池内被调用时的回调函数]
* @method onTask
* @param swoole_server $serv [description]
* @param int $task_id [description]
* @param int $src_worker_id [description]
* @param mixed $data [description]
* @return [type] [description]
*/
public function onTask(swoole_server $serv, $task_id, $src_worker_id, $data){
//记录任务开始执行的时间
//…自己发挥 我们项目的业务逻辑部分已经删除
//
try{
$swooleController = A(‘Swoole’);
$rs = $swooleController->{$data[‘cmd’]}($data[‘args’]);
return json_encode([‘id’=>$data[‘args’][‘id’] , ‘status’ => true , ‘other’ => $rs]);
}catch(\Exception $e){
return json_encode([‘id’=>$data[‘args’][‘id’] , ‘status’ => false , ‘other’ => $e->getMessage()]);
}
}
/**
* [onFinish 任务执行完毕时的回调函数]
* @method onFinish
* @param swoole_server $serv [description]
* @param int $task_id [description]
* @param string $data [description]
* @return [type] [description]
*/
public function onFinish(swoole_server $serv, $task_id, string $data){
$rs = json_decode($data , true);
$swooleLog = D(‘SwooleLog’);
//…
//通知用户该任务已经执行完毕了 可以来查看数据了
//检查客户端链接是否存在 如果存在的话发送消息
$returnData = $this->_returnStr(‘task_excute_success’);
foreach($serv->connections as $fd){
$conn = $serv->connection_info($fd);
//根据uid判断当前连接是活跃的
//这里是业务服务器那边能取到的用户信息,这里已经删除,保护我们的项目
$serv->push($fd , $returnData);
}
}
/**
* [onWorkerStart 为了应用能热加载 把框架的东西放到worker进程启动]
* @method onWorkerStart
* @param swoole_server $server [description]
* @param [type] $worker_id [description]
* @return [type] [description]
*/
public function onWorkerStart(swoole_server $server, $worker_id){
// 定义应用目录
define(‘APP_PATH’, ‘../Application/’);
// 定义应用模式
define(‘APP_Mode’, ‘cli’);
define(‘BIND_MODULE’,’Cli’);
// 引入ThinkPHP入口文件
require ‘../ThinkPHP/ThinkPHP.php’;
}
/**
* [_returnStr 返回给客户端的数据]
* @method _returnStr
* @param [type] $type [description]
* @return [type] [description]
*/
private function _returnStr($type){
switch ($type) {
case ‘submit_success’:
$returnData = [
‘code’ => 202,
‘info’ => ‘任务已经提交,请等待服务器计算数据!’
];
break;
case ‘submit_error’:
$returnData = [
‘code’ => 404,
‘info’ => ‘任务提交失败,请联系管理员进行处理!’
];
break;
case ‘task_excute_success’:
$returnData = [
‘code’ => 200,
‘info’ => ‘任务执行完毕!’
];
break;
}
return json_encode($returnData);
}
/**
* [clients 获取所有的客户端连接信息,返回我们的uid]
* @method clients
* @return [type] [description]
*/
public function clients(swoole_server $serv , swoole_websocket_frame $frame){
//…
$serv->push($frame->fd , json_encode($serv->connections));
}
public function __call($method, $args){
call_user_func_array([$this->swoole, $method], $args);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/Swoole/swoole.php
是一个面向cli编程的代码,主要提供了start和reload两个命令
#!/bin/env php
<?php
/**
* 定义项目根目录&swoole-task pid
*/
define(‘SWOOLE_PATH’, __DIR__);
define(‘SWOOLE_LOG_PATH’, SWOOLE_PATH . DIRECTORY_SEPARATOR . ‘log’ . DIRECTORY_SEPARATOR . ‘Swoole’ . ‘.log’);
/**
* 加载 swoole server
*/
include SWOOLE_PATH . DIRECTORY_SEPARATOR . ‘Server.php’;
//提示帮助信息
if ($argc != 2 || in_array($argv[1], array(‘–help’, ‘-help’, ‘-h’, ‘-?’))) {
echo <<<HELP
用法:php swoole.php 选项 … 可选的命令[start|reload|list]
–help|-help|-h|-? 显示本帮助说明
选项说明
start, 启动swoole服务[默认监测9501端口]
reload, 柔性重启所有workder进程
list, 查看当前所有连接的客户端数
HELP;
exit;
}
//执行命令行选项
switch($argv[1]){
case ‘start’:
start();
break;
case ‘reload’:
reload();
break;
case ‘list’:
clients();
break;
default:
exit(“输入命令有误 : {$argv[1]}, 请查看帮助文档\n”);
break;
}
//启动swoole服务
function start(){
$server = new Server();
$server->start();
}
//柔性重启swoole服务
function reload(){
echo “正在柔性重启swoole的worker进程…” . PHP_EOL;
try {
$server = new Server();
$port = $server->getPort();
$host = ‘127.0.0.1’;
$cli = new swoole_http_client($host, $port);
$cli->set([‘websocket_mask’ => true]);
$cli->on(‘message’, function ($_cli, $frame) {
if($frame->data){
exit(‘swoole重启成功!’.PHP_EOL);
}
});
$cli->upgrade(‘/’, function ($cli) {
$cli->push(json_encode([‘cmd’ => ‘reload’]));
});
} catch (Exception $e) {
exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
}
}
function clients(){
try {
$server = new Server();
$port = $server->getPort();
$host = ‘127.0.0.1’;
$cli = new swoole_http_client($host, $port);
$cli->set([‘websocket_mask’ => true]);
$cli->on(‘message’, function ($_cli, $frame) {
$users = json_decode($frame->data , true);
if(empty($users)){
echo ‘当前没有客户端连接’.PHP_EOL;
}else{
foreach($users as $user){
echo ‘—‘.$user.PHP_EOL;
}
}
exit();
});
$cli->upgrade(‘/’, function ($cli) {
$cli->push(json_encode([‘cmd’ => ‘clients’]));
});
} catch (Exception $e) {
exit($e->getMessage() . PHP_EOL . $e->getTraceAsString());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
存在的问题
0.以上的代码是从项目里摘出来的,大体思路还在,业务部分代码删除了。不保证正常运行,但是问题不大,如果报错了聪明的你一定一看就知道是什么错误。
1.由于我们将TP框架的东西放在workstart回调函数里启动,这里只要启动worker进程和task进程,都会加载一次TP框架的东西到内存里,对内存的占用问题还需要仔细研究。
2.目前还不支持那些不支持HTML5 WebSocket的浏览器 我会尽快想办法解决。用flash模拟socket请求的方法没有试成功,近期重新再搞一波。
————————————————
版权声明:本文为CSDN博主「一个不靠谱的程序员」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u013705066/article/details/77679233