English | 中文
协程版 Workerman. 同步编码,异步执行
使用PHP原生的
yield
,yield from
实现,协程这块功能没有依赖其他扩展.
至少PHP v7.0 (yield from 语法自 PHP 7.0 起才可用)
支持POSIX
规范的兼容性系统(Linux, OSX, BSD)
POSIX
和 PCNTL
这两个模块需要在PHP编译期间就启用。
为了获得更好的性能,推荐安装事件驱动扩展,如:Event,ev,libevent.
composer require paulxu-cn/co-workerman:0.0.*
<?php
// file: ./simpleTimerServer.php
define('DS', DIRECTORY_SEPARATOR);
require_once __DIR__ . DS . 'vendor' . DS . 'autoload.php';
use CoWorkerman\CoWorker;
use CoWorkerman\Lib\CoTimer;
use CoWorkerman\Connection\CoTcpConnection;
$worker = new CoWorker('tcp://0.0.0.0:8686');

/**
 * Connection handler written as a generator: each `yield from` delegates to
 * another coroutine and execution continues here once that coroutine returns.
 */
$worker->onConnect = function (CoTcpConnection $connection) {
    $localIp = $connection->getLocalIp();
    echo "New Connection, {$localIp}" . PHP_EOL;

    // Yield to the worker; per the original example, control comes back after 1 second.
    yield from CoTimer::sleepAsync(1000);

    // Send the greeting and keep whatever the send coroutine returns.
    $sendResult = yield from $connection->sendAsync('hello');
    CoWorker::safeEcho('get re: [' . $sendResult . ']' . PHP_EOL);
};

// Run the worker.
CoWorker::runAll();
- start the server
## start the CoWorkerman Server
$ php ./simpleTimerServer.php start
## telnet it
$ telnet 127.0.0.1 8686
> hi
hello
<?php
// file: ~/examples/example5/coCartServer.php
define('DS', DIRECTORY_SEPARATOR);
require_once __DIR__ . DS . 'vendor' . DS . 'autoload.php';
use CoWorkerman\CoWorker;
use CoWorkerman\Coroutine\Promise;
use CoWorkerman\Connection\CoTcpClient;
use CoWorkerman\Connection\CoTcpConnection;
$worker = new CoWorker('tcp://0.0.0.0:8686');

/**
 * Connection handler: starts the inventory and product checks for random
 * product ids, waits for both results, and sends the combined boolean verdict
 * back over the client connection.
 */
$worker->onConnect = function (CoTcpConnection $connection) {
    echo "New Connection, {$connection->getLocalIp()} \n";

    // Both functions are generator functions, so these calls only create
    // generators; nothing runs until they are awaited below.
    $re = checkInventoryAsync($connection, rand(10, 20), true);
    $re2 = checkProductAsync($connection, rand(10, 20), true);

    // Option 1: await the two checks one after the other ------------------------+
    // $re = yield from Promise::wait($re, 'onConnect');
    // $re2 = yield from Promise::wait($re2, 'onConnect');
    // Option 2: await both checks at the same time ------------------------------+
    list($re, $re2) = yield from Promise::all(array($re, $re2), 'onConnect');
    // End of options -------------------------------------------------------------+

    // Always define $check: a result without the 're' key counts as a failed
    // check instead of leaving the variable undefined when it is sent below.
    $check = isset($re['re'], $re2['re']) && $re['re'] && $re2['re'];

    // NOTE(review): the other example awaits sendAsync() with `yield from`;
    // confirm whether this call also needs to be awaited.
    $connection->sendAsync($check);
};
/**
 * Unified entry point for the RPC checks, written as a generator coroutine.
 *
 * Builds the address tcp://$host:$port, waits for the connection attempt to
 * settle, and then issues a JSON request of the form
 * {"method": $method, "data": $data}.
 *
 * @param mixed  $connection Passed through to CoTcpClient::connectAsync().
 * @param string $host       Host of the remote service.
 * @param int    $port       Port of the remote service.
 * @param string $method     Value stored under the "method" key of the request.
 * @param mixed  $data       Value stored under the "data" key of the request.
 * @param bool   $noBlocking Accepted for interface compatibility; not read by this function.
 * @return \Generator Its return value is null when the awaited connect result
 *                    is falsy, otherwise whatever CoTcpClient::sendAsync() returns.
 */
function checkClientAsync($connection, $host, $port, $method, $data, $noBlocking = true)
{
    $address = "tcp://$host:$port";
    CoWorker::safeEcho('try to connect to ' . $address . PHP_EOL);

    $client = new CoTcpClient($address);

    // Suspend until the connection attempt has produced a result.
    $connected = yield from Promise::wait($client->connectAsync($connection), __FUNCTION__);
    if (!$connected) {
        return null;
    }

    CoWorker::safeEcho("\nconnected to server, re: [{$connected}]\n" . PHP_EOL);

    $payload = json_encode(["method" => $method, "data" => $data]);

    return $client->sendAsync($payload);
}
/**
 * Coroutine that sends an "inventory" request for one product to the service
 * at 127.0.0.1:8081 by delegating to checkClientAsync().
 *
 * @param mixed $connection Forwarded unchanged to checkClientAsync().
 * @param mixed $productId  Sent as the "productId" field of the request data.
 * @param bool  $noBlocking Forwarded unchanged to checkClientAsync().
 * @return \Generator Its return value is the return value of checkClientAsync().
 */
function checkInventoryAsync($connection, $productId, $noBlocking = true)
{
    $payload = ['productId' => $productId];

    return yield from checkClientAsync($connection, "127.0.0.1", 8081, 'inventory', $payload, $noBlocking);
}
/**
 * Coroutine that sends a "product" request for one product to the service
 * at 127.0.0.1:8082 by delegating to checkClientAsync().
 *
 * @param mixed $connection Forwarded unchanged to checkClientAsync().
 * @param mixed $productId  Sent as the "productId" field of the request data.
 * @param bool  $noBlocking Forwarded unchanged to checkClientAsync().
 * @return \Generator Its return value is the return value of checkClientAsync().
 */
function checkProductAsync($connection, $productId, $noBlocking = true)
{
    $payload = ['productId' => $productId];

    return yield from checkClientAsync($connection, "127.0.0.1", 8082, 'product', $payload, $noBlocking);
}
// Run the worker.
CoWorker::runAll();
- 启动依赖的三个服务
## 启动一个处理耗时2s的库存服务
$ php ./examples/example5/otherServerFork.php 8081 inventory 1
## 启动一个处理耗时4s的产品服务
$ php ./examples/example5/otherServerFork.php 8082 product 2
## 监听8083端口,处理一个请求 耗时6s的 promo 服务
$ php ./examples/example5/otherServerFork.php 8083 promo 3
- 运行服务端,与客户端
## 启动一个非阻塞购物车服务
$ php ./coCartServer.php start
## 客户端请求
$ php ./examples/example5/userClientFork.php
<?php
// file: ~/examples/example3/coWorkerMySQLtest1.php
require_once __DIR__ . '/../../vendor/autoload.php';
use CoWorkerman\CoWorker;
use CoWorkerman\MySQL\Connection;
$worker = new CoWorker('tcp://0.0.0.0:6161');

/**
 * Message handler (generator): creates a MySQL connection, awaits it, runs
 * `show databases`, and prints the JSON-encoded result.
 */
$worker->onMessage = function($connection, $data) {
    // The connection object is stored in the global $mysql on every message.
    global $mysql;

    $settings = array(
        'host' => '127.0.0.1', // do not write "localhost" here
        'dbname' => 'mysql',
        'user' => 'root',
        'password' => '123456',
        'port' => '3306'
    );
    $mysql = new Connection($settings);

    // Await the connection coroutine before issuing the query.
    yield from $mysql->connection();

    $databases = yield from $mysql->query('show databases');
    CoWorker::safeEcho(json_encode($databases) . PHP_EOL);
};

CoWorker::runAll();
- 启动服务
## start the MySQL client server
$ php ./examples/example3/coWorkerMySQLtest1.php start
- 访问
telnet 127.0.0.1 6161
目前还在测试开发阶段,不要用在生产环境中.
它克隆自workerman