> 文章列表 > Workerman 原理解析概述

Workerman 原理解析概述

Workerman 原理解析概述

首先说明一下Workerman究竟是什么东西: 

        WebSocket 是 HTML5 提供的一种网络通讯协议,用于服务端与客户端实时数据传输。广泛用于浏览器与服务器的实时通讯,APP与服务器的实时通讯等场景,相比传统HTTP协议请求响应式通讯,WebSocket协议可以做到实时的双向通讯,服务端可以在任何时候向客户端推送数据(HTTP协议需要客户端发起请求后才能推送),就是传统说的长链接(指在一个连接上可以连续发送多个数据包)

workman特点:
        1.Workerman类似一个PHP版本的nginx

        2.核心也是多进程+Epoll+非阻塞IO

        3.Workerman每个进程能维持上万并发连接

         4.由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能

        5.同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。

        6.拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。

        7.主进程为了保持稳定性,只负责监控子进程,不负责接收数据也不做任何业务逻辑

        8.纯PHP开发

        9.支持高并发:WorkerMan支持Libevent事件轮询库(需要安装event扩展),如果没有安装Event扩展则使用PHP内置的Select相关系统调用

相关命令:

        启动命令:php php文件名 start

        停止命令:php php文件名 stop

        重启命令:PHP php文件名 restart

        查看状态:PHP php文件名 status

        链接状态:PHP php文件名 connections

注意点:

        心跳是必须的:一定要有(非常重要):否则连接可能由于长时间不活跃而被路由节点防火墙断开 客户端和服务端协议一定要对应才能通讯

支持协议的示例:

websocket
$websocket_worker = new Worker('websocket://0.0.0.0:2345');text
$text_worker = new Worker('text://0.0.0.0:2346');frame
$frame_worker = new Worker('frame://0.0.0.0:2347');tcp
$tcp_worker = new Worker('tcp://0.0.0.0:2348');udp
$udp_worker = new Worker('udp://0.0.0.0:2349');unix
$unix_worker = new Worker('unix:///tmp/wm.sock');

获取包:

composer config -g --unset repos.packagist

composer create-project workerman/webmanw

目录结构:├── app 应用目录 
│   ├── controller 控制器目录 
│   ├── model 模型目录 
│   ├── view 视图目录 
│   └── middleware 中间件目录 
│        └── StaticFile.php 自带静态文件中间件 
|   |—— functions.php 自定义函数 ├── config 配置目录
│   ├── app.php 应用配置 
│   ├── autoload.php 这里配置的文件会被自动加载 
│   ├── bootstrap.php 进程启动时onWorkerStart时运行的回调配置 
│   ├── container.php 容器配置
│   ├── dependence.php 容器依赖配置 
│   ├── database.php 数据库配置 
│   ├── exception.php 异常配置 
│   ├── log.php 日志配置 
│   ├── middleware.php 中间件配置 
│   ├── process.php 自定义进程配置 
│   ├── redis.php redis配置 
│   ├── route.php 路由配置 
│   ├── server.php 端口、进程数等服务器配置 
│   ├── view.php 视图配置 
│   ├── static.php 静态文件开关及静态文件中间件配置 
│   ├── translation.php 多语言配置 
│   └── session.php session配置 
├── public 静态资源目录 
├── process 自定义进程目录 
├── runtime 应用的运行时目录,需要可写权限 
├── start.php 服务启动文件 
├── vendor composer安装的第三方类库目录 
└── support 类库适配(包括第三方类库) ├── Request.php 请求类 ├── Response.php 响应类 ├── Plugin.php 插件安装卸载脚本 ├── helpers.php 助手函数 └── bootstrap.php 进程启动后初始化脚本原理(这里是精髓,请逐行阅读,这里面的几个函数一定要对照手册仔细研究):
class worker
{//连接事件回调public $onConnect =null;//消息事件回调public $onCMessage =null;//连接关闭事件回调public $onClose =null;//所有socket,包括监听的所有socketpublic $allsockets =array();//构造函数function __construct($address){//创建监听socket$this->socket = stream_socket_server($address,$errno,$errstr);//设置为非阻塞stream_set_blocking($this->socket,0);//把监听的socet放入allsockets$this->allsockets[(int)$this->socket]=$this->socket;}//运行public function run(){while (1){//这里不监听socket可写事件和数据可读事件$write = $except = null;//监听所有的socket可读事件,包括客户端socket和监听端口的socket$read = $this->allsockets;//整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个可读参数stream_select($read,$write,$except);//$read被重新赋值,遍历所有状态为可读的socketforeach ($read as $index=>$socket){//如果监听的是可读的socket,说明有新的连接if($socket ===$this->socket){//通过stream_socket_accept获取新的连接$new_connect_socket = stream_socket_accept($this->socket);if(!$new_connect_socket);continue;//如果有onConnect事件回调,则尝试触发if($this->onConnect){call_user_func($this->onConnect,$new_connect_socket);}//将新的客户端新连接的socket放到allsockets中,stream_select监听可读事件$this->allsockets[(int)$new_connect_socket] = $new_connect_socket;}else{//读数据$buffer = fread($socket,65535);//数据为空,代表连接已断开if($buffer === '' || $buffer ===false){//尝试触发onClose回调if($this->onClose){call_user_func($this->onClose,$socket);}fclose($socket);//从allsockets中删除对应的连接unset($this->allsockets[(int)$socket]);continue;}//尝试触发onMessage回调call_user_func($this->onCMessage);}}}}
}
$server = new worker("tcp://0.0.0.0:12115");$server->onConnect = function ($conn){echo 'onConnect\\n';
};$server->onCMessage = function ($conn,$msg){fwrite($conn,"HTTP/1.1 200 OK\\r\\n Connection:Keep-alive\\r\\nServer:workman\\1.1.4\\r\\nContent-length:5\\r\\n\\rhellow");
};$server->onClose = function ($conn){echo 'onCLose\\n';
};$server->run();

例子

发送端:

<!doctype html>
<html lang="zh-cn">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <title>WebSocket大屏</title>
    <script src="jquery.min.js"></script>
</head>
<body>

<input type="text" id="content">
<input type="button" value="发送" οnclick="send()">
<script>

function connect() {
    ws = new WebSocket('ws://127.0.0.1:6161');
    ws.onmessage = function (e) {
        console.log(e.data);
    };
    ws.timer = setInterval(function () {
        ws.send('ping');
    }, 50000);
    ws.onclose = function () {
        clearTimeout(ws.timer);
        setTimeout(connect, 1000);
    };
}

//  通过WebSocket连接将数据发送给服务端
function send() {
    ws.send($('#content').val());
    $('#content').val('');
}

connect();
</script>
</body>
</html>

接收端:
 

<!doctype html>
<html lang="zh-cn">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <script src="jquery.min.js"></script>
    <title>WebSocket大屏</title>
</head>
<body>
    <ul id="content">

    </ul>
</body>
<script>
    function connect() {
        // 与服务端建立WebSocket连接
        //(为了方便测试这里ip使用的是127.0.0.1,正式环境请使用外网ip)
        ws = new WebSocket('ws://127.0.0.1:6161');
        // 连接建立后发送daping,表明自己是电脑浏览器
        ws.onopen = function() {
            ws.send('daping');
        };
        //  收到服务端推送的数据后,将数据显示在浏览器里(心跳数据pong除外)
        ws.onmessage = function (e) {
            if (e.data !== 'pong') {
                $($('#content')).append('<li>'+e.data+'</li>');
            }
        };
        // 没隔50秒发送一个心跳数据 ping 给服务器,保持连接
        ws.timer = setInterval(function () {
            ws.send('ping');
        }, 50000);
        //  当连接关闭时清除定时器,并设置1秒后重连
        ws.onclose = function () {
            clearTimeout(ws.timer);
            setTimeout(connect, 1000);
        };
    }
    // 执行连接
    connect();
</script>
</html>

服务文件:
 

<?php

require 'autoload.php';
use Workerman\\Worker;
use Workerman\\Connection\\TcpConnection;

// 使用websocket协议监听6161端口
$worker = new Worker('websocket://0.0.0.0:6161');

//  当浏览器(包括用户手机浏览器和电脑浏览器)发来消息时的处理逻辑
$worker->onMessage = function(TcpConnection $connection, $data) {
    // 这个静态变量用来存储电脑浏览器的websocket连接,方便推送使用
    static $daping_connection = null;
    switch ($data) {
        // 发送 daping 字符串的是电脑浏览器,将其连接保存到静态变量中
        case 'daping':
            $daping_connection = $connection;
            break;
        // ping 是心跳数据,用来维持连接,只返回 pong 字符串,无需做其它处理
        case 'ping':
            $connection->send('pong');
            break;
        // 用户手机浏览器发来的祝福语
        default:
            // 直接使用电脑浏览器的连接将祝福语推送给电脑
            if ($daping_connection) {
                $daping_connection->send($data);
            }
    }
};
Worker::runAll();