Workerman 原理解析概述
首先说明一下Workerman究竟是什么东西:
WebSocket 是 HTML5 提供的一种网络通讯协议,用于服务端与客户端实时数据传输。广泛用于浏览器与服务器的实时通讯,APP与服务器的实时通讯等场景,相比传统HTTP协议请求响应式通讯,WebSocket协议可以做到实时的双向通讯,服务端可以在任何时候向客户端推送数据(HTTP协议需要客户端发起请求后才能推送),就是传统说的长链接(指在一个连接上可以连续发送多个数据包)
workman特点:
1.Workerman类似一个PHP版本的nginx
2.核心也是多进程+Epoll+非阻塞IO
3.Workerman每个进程能维持上万并发连接
4.由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能
5.同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。
6.拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。
7.主进程为了保持稳定性,只负责监控子进程,不负责接收数据也不做任何业务逻辑
8.纯PHP开发
9.支持高并发:WorkerMan支持Libevent事件轮询库(需要安装event扩展),如果没有安装Event扩展则使用PHP内置的Select相关系统调用
相关命令:
启动命令:php php文件名 start
停止命令:php php文件名 stop
重启命令:PHP php文件名 restart
查看状态:PHP php文件名 status
链接状态:PHP php文件名 connections
注意点:
心跳是必须的:一定要有(非常重要):否则连接可能由于长时间不活跃而被路由节点防火墙断开 客户端和服务端协议一定要对应才能通讯
支持协议的示例:
websocket $websocket_worker = new Worker('websocket://0.0.0.0:2345');text $text_worker = new Worker('text://0.0.0.0:2346');frame $frame_worker = new Worker('frame://0.0.0.0:2347');tcp $tcp_worker = new Worker('tcp://0.0.0.0:2348');udp $udp_worker = new Worker('udp://0.0.0.0:2349');unix $unix_worker = new Worker('unix:///tmp/wm.sock');
获取包:
composer config -g --unset repos.packagist
composer create-project workerman/webmanw
目录结构:├── app 应用目录 │ ├── controller 控制器目录 │ ├── model 模型目录 │ ├── view 视图目录 │ └── middleware 中间件目录 │ └── StaticFile.php 自带静态文件中间件 | |—— functions.php 自定义函数 ├── config 配置目录 │ ├── app.php 应用配置 │ ├── autoload.php 这里配置的文件会被自动加载 │ ├── bootstrap.php 进程启动时onWorkerStart时运行的回调配置 │ ├── container.php 容器配置 │ ├── dependence.php 容器依赖配置 │ ├── database.php 数据库配置 │ ├── exception.php 异常配置 │ ├── log.php 日志配置 │ ├── middleware.php 中间件配置 │ ├── process.php 自定义进程配置 │ ├── redis.php redis配置 │ ├── route.php 路由配置 │ ├── server.php 端口、进程数等服务器配置 │ ├── view.php 视图配置 │ ├── static.php 静态文件开关及静态文件中间件配置 │ ├── translation.php 多语言配置 │ └── session.php session配置 ├── public 静态资源目录 ├── process 自定义进程目录 ├── runtime 应用的运行时目录,需要可写权限 ├── start.php 服务启动文件 ├── vendor composer安装的第三方类库目录 └── support 类库适配(包括第三方类库) ├── Request.php 请求类 ├── Response.php 响应类 ├── Plugin.php 插件安装卸载脚本 ├── helpers.php 助手函数 └── bootstrap.php 进程启动后初始化脚本原理(这里是精髓,请逐行阅读,这里面的几个函数一定要对照手册仔细研究): class worker {//连接事件回调public $onConnect =null;//消息事件回调public $onCMessage =null;//连接关闭事件回调public $onClose =null;//所有socket,包括监听的所有socketpublic $allsockets =array();//构造函数function __construct($address){//创建监听socket$this->socket = stream_socket_server($address,$errno,$errstr);//设置为非阻塞stream_set_blocking($this->socket,0);//把监听的socet放入allsockets$this->allsockets[(int)$this->socket]=$this->socket;}//运行public function run(){while (1){//这里不监听socket可写事件和数据可读事件$write = $except = null;//监听所有的socket可读事件,包括客户端socket和监听端口的socket$read = $this->allsockets;//整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个可读参数stream_select($read,$write,$except);//$read被重新赋值,遍历所有状态为可读的socketforeach ($read as $index=>$socket){//如果监听的是可读的socket,说明有新的连接if($socket ===$this->socket){//通过stream_socket_accept获取新的连接$new_connect_socket = stream_socket_accept($this->socket);if(!$new_connect_socket);continue;//如果有onConnect事件回调,则尝试触发if($this->onConnect){call_user_func($this->onConnect,$new_connect_socket);}//将新的客户端新连接的socket放到allsockets中,stream_select监听可读事件$this->allsockets[(int)$new_connect_socket] = $new_connect_socket;}else{//读数据$buffer = fread($socket,65535);//数据为空,代表连接已断开if($buffer === '' || $buffer ===false){//尝试触发onClose回调if($this->onClose){call_user_func($this->onClose,$socket);}fclose($socket);//从allsockets中删除对应的连接unset($this->allsockets[(int)$socket]);continue;}//尝试触发onMessage回调call_user_func($this->onCMessage);}}}} } $server = new worker("tcp://0.0.0.0:12115");$server->onConnect = function ($conn){echo 'onConnect\\n'; };$server->onCMessage = function ($conn,$msg){fwrite($conn,"HTTP/1.1 200 OK\\r\\n Connection:Keep-alive\\r\\nServer:workman\\1.1.4\\r\\nContent-length:5\\r\\n\\rhellow"); };$server->onClose = function ($conn){echo 'onCLose\\n'; };$server->run();
例子:
发送端:
<!doctype html>
<html lang="zh-cn">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>WebSocket大屏</title>
<script src="jquery.min.js"></script>
</head>
<body>
<input type="text" id="content">
<input type="button" value="发送" οnclick="send()">
<script>
function connect() {
ws = new WebSocket('ws://127.0.0.1:6161');
ws.onmessage = function (e) {
console.log(e.data);
};
ws.timer = setInterval(function () {
ws.send('ping');
}, 50000);
ws.onclose = function () {
clearTimeout(ws.timer);
setTimeout(connect, 1000);
};
}
// 通过WebSocket连接将数据发送给服务端
function send() {
ws.send($('#content').val());
$('#content').val('');
}
connect();
</script>
</body>
</html>
接收端:
<!doctype html>
<html lang="zh-cn">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<script src="jquery.min.js"></script>
<title>WebSocket大屏</title>
</head>
<body>
<ul id="content">
</ul>
</body>
<script>
function connect() {
// 与服务端建立WebSocket连接
//(为了方便测试这里ip使用的是127.0.0.1,正式环境请使用外网ip)
ws = new WebSocket('ws://127.0.0.1:6161');
// 连接建立后发送daping,表明自己是电脑浏览器
ws.onopen = function() {
ws.send('daping');
};
// 收到服务端推送的数据后,将数据显示在浏览器里(心跳数据pong除外)
ws.onmessage = function (e) {
if (e.data !== 'pong') {
$($('#content')).append('<li>'+e.data+'</li>');
}
};
// 没隔50秒发送一个心跳数据 ping 给服务器,保持连接
ws.timer = setInterval(function () {
ws.send('ping');
}, 50000);
// 当连接关闭时清除定时器,并设置1秒后重连
ws.onclose = function () {
clearTimeout(ws.timer);
setTimeout(connect, 1000);
};
}
// 执行连接
connect();
</script>
</html>
服务文件:
<?php
require 'autoload.php';
use Workerman\\Worker;
use Workerman\\Connection\\TcpConnection;
// 使用websocket协议监听6161端口
$worker = new Worker('websocket://0.0.0.0:6161');
// 当浏览器(包括用户手机浏览器和电脑浏览器)发来消息时的处理逻辑
$worker->onMessage = function(TcpConnection $connection, $data) {
// 这个静态变量用来存储电脑浏览器的websocket连接,方便推送使用
static $daping_connection = null;
switch ($data) {
// 发送 daping 字符串的是电脑浏览器,将其连接保存到静态变量中
case 'daping':
$daping_connection = $connection;
break;
// ping 是心跳数据,用来维持连接,只返回 pong 字符串,无需做其它处理
case 'ping':
$connection->send('pong');
break;
// 用户手机浏览器发来的祝福语
default:
// 直接使用电脑浏览器的连接将祝福语推送给电脑
if ($daping_connection) {
$daping_connection->send($data);
}
}
};
Worker::runAll();