作者 pengjch

dev: swoole rpc

## 简介
AukeySwrpc是一个基于swoole开发的高性能rpc包,AukeySwrpc提供了注册发现,链路追踪,中间件等等功能,可以很容易集成到第三方框架,如laravel,yii等等。
## 功能
- 支持多进程模式或协程模式
- 支持同步,异步调用
- 支持自定义中间件
- 支持服务端按类提供对外服务
- 支持链路追踪
- 支持注册服务发现
- 支持客户端负载均衡,包含随机,权重两种模式
- 支持自定义日志,包协议,序列化方式等等
## 安装
```bash
php composer.phar require wuzhc/swprc ~1.0 -vvv
```
## 快速体验
假设我们User和School两个模块,为了获得用户学校名称,我们需要在User模块发起rpc请求调用School模块的服务。
### School模块
```php
<?php
use AukeySwrpc\LogicService;
class SchoolService extends LogicService
{
public function getUserSchool($userID) {
$name = $userID == 123 ? '火星' : '水星';
return $name.'学校';
}
}
```
School模块将作为rpc服务端,对外提供服务,启动如下:
```php
<?php
namespace AukeySwrpcTests;
use AukeySwrpc\Server;
$basePath = dirname(dirname(__FILE__));
require_once $basePath . "/vendor/autoload.php";
$options = [
'enable_coroutine' => true,
'pid_file' => __DIR__ . '/AukeySwrpc.pid',
];
$server = new Server('School_Module', '127.0.0.1', 9501, 1, $options);
$server->addService(\AukeySwrpcTests\services\SchoolService::class);
$server->start();
```
将SchoolService添加到server,server会自动检索类中所有可用的public方法。
![1615562843265](https://segmentfault.com/img/bVcPwbR)
### User模块
User模块作为客户端,调用School模块服务如下
```php
<?php
namespace AukeySwrpcTests;
use AukeySwrpc\Request;
use AukeySwrpc\LogicService;
use AukeySwrpc\Client;
class UserService extends LogicService
{
public function getUserSchoolName()
{
$userID = 123;
$module = 'School_Module'; //请求目标模块名称,需要和服务端定义的一致
$client = Client::create($module, '127.0.0.1', 9501);
return $client->send(Request::create('\AukeySwrpcTests\services\SchoolService_getUserSchool', [$userID]));
}
}
//调用
echo UserService::factory()->getUserSchoolName();
```
注意:
- Request.method 为服务类命名 + 下划线 + 方法名,例如上面的`\AukeySwrpcTests\services\SchoolService_getUserSchool`,如果服务类有命名空间,记得一定要带上命名空间
## 多进程和协程模式
多进程或协程模式需要和swoole配置一致,具体参考swoole配置
### 多进程模式
创建10进程来处理请求
```php
$options = [
'worker_num' => 10
'pid_file' => __DIR__ . '/AukeySwrpc.pid',
];
$server = new Server('School_Module', '127.0.0.1', 9501, 1, $options);
```
### 协程模式
目前AukeySwrpc协程模式是运行在单进程的
```php
$options = [
'enable_coroutine' => true,
'pid_file' => __DIR__ . '/AukeySwrpc.pid',
];
$server = new Server('School_Module', '127.0.0.1', 9501, 1, $options);
```
## 同步调用和异步调用
在客户端发起同步调用,客户端会一直等待服务端返回结果
```php
$client = \AukeySwrpc\Client::create($module, '127.0.0.1', 9501);
return $client->send(SyncRequest::create('SchoolService_getUserSchool', [$userID]));
```
在客户端发起异步调用,客户端会立马得到响应结果,请求将被swoole的task进程处理
```php
$client = \AukeySwrpc\Client::create($module, '127.0.0.1', 9501);
return $client->send(AsyncRequest::create('SchoolService_getUserSchool', [$userID]));
```
## 自定义中间件
中间件允许程序可以对请求进行前置操作和后置操作,底层使用了责任链设计模式,所以为了执行下一个中间件,必须返回`$next($request)`,如果想提前返回,则返回结果必须是`AukeySwrpc\Response`类型
```php
//中间件除了用匿名函数定义,还可以用实现AukeySwrpc\Middlewares\MiddlewareInterface接口的类
$middleware = function (\AukeySwrpc\Request $request, Closure $next) {
$start = microtime(true); //前置操作,记录请求开始时间
$result = $next($request);
echo '耗时:'.(microtime(true) - $start).PHP_EOL; //后置操作,记录请求结束时间,从而计算请求耗时
return $result; //继续下个中间件的处理
};
$server = new Server('School_Module', '127.0.0.1', 9501, 1, $options);
$server->addService(SchoolService::class);
$server->addMiddleware($middleware); //添加中间件
$server->start();
```
如果要提前中止中间件,可以提前在匿名函数或类方法中返回\AukeySwrpc\Response对象,如下
```php
$middleware = function (\AukeySwrpc\Request $request, Closure $next) {
if (empty($request->getParams())) {
return \AukeySwrpc\Response::error('参数不能为空'); //提前返回,必须是Response类型
}
return $next($request);
};
```
## 服务端按类提供对外服务
从上面的例子中,我们把SchoolService整个类添加的server中,这样server就能对外提供SchoolService类所有public方法的功能。
```php
$server = new Server('School_Module', '127.0.0.1', 9501, 1, $options);
$server->addService(SchoolService::class); //提供SchoolService所有public方法功能
$server->addService(AreaService::class); //提供AreaService所有public方法功能
$server->start();
```
客户端使用参考上面的快速体验
## 注册服务发现
如果服务端启动的时候有设置注册中心,则启动成功会自动向注册中心注册服务端地址。目前AukeySwrpc提供了`Consul`作为注册中心,使用如下
```php
$server = new Server('School_Module', '127.0.0.1', 9501, 1, $options);
$server->addRegister(new Consul());
$server->addService(SchoolService::class);
$server->start();
```
如上,使用Consul作为服务的注册中心,通过`http://127.0.0.1:8500`可以查看注册信息,如果想用etcd等其他注册中心,只要实现`AukeySwrpc\Middlewares\RegisterInterface`接口即可,然后在通过`$server->addRegister()`添加到server
![1615562878292](https://segmentfault.com/img/bVcPwbR)
![1615562927956](https://segmentfault.com/img/bVcPwb4)
![1615562975815](https://segmentfault.com/img/bVcPwb5)
## 客户端负载均衡
如果服务端启动多个节点,例如School模块启动3个节点,并且注册到了注册中心,那么我们可以从注册中心获取所有服务端节点信息,然后做一些策略处理。
```php
$register = new Consul();
$client = \AukeySwrpc\Client::createBalancer('School_Module', $register, \AukeySwrpc\Client::STRATEGY_WEIGHT);
$result = $client->send(Request::create('SchoolService_getUserSchool', [$userID]);
```
目前AukeySwrpc提供两种简单策略模式,`\AukeySwrpc\Client::STRATEGY_WEIGHT权重模式`,`\AukeySwrpc\Client::STRATEGY_RANDOM`随机模式
## 链路追踪
当我们的服务非常多并且需要互相调用时候,如果其中某个调用失败,会导致我们得不到我们想要的结果,而要调试出是哪个环节出了问题也比较麻烦,可能你需要登录每台机器看下有没有错误日志,或者看返回的错误信息是哪个服务提供的。链路追踪记录了整个调用链过程,如果某个环节出错,我们可以快速从调用链得到调用中断地方。
```php
class UserService extends LogicService
{
public function getUserSchoolName()
{
$userID = 123;
$module = 'School_Module'; //请求目标模块名称,需要和服务端定义的一致
$client = Client::create($module, '127.0.0.1', 9501);
return $client->send(Request::create('SchoolService_getUserSchool', [$userID], $this->getTracerContext(__FUNCTION__))); //getTracerContext()用于提供追踪上下文
}
}
$users = UserService::factory()
->setModule('User_Module') //当前模块,用于调用链的起点
->setTracerUrl('http://127.0.0.1:9411/api/v2/spans') //zipkin链路追踪地址
->getUserSchoolName();
```
![1615563070157](https://segmentfault.com/img/bVcPwb6)
如图,User_Module调用Class_Module,Class_Module又去调用School_Module
![1615563170709](https://segmentfault.com/img/bVcPwb8)
每个调用还记录响应结果
## 自定义日志处理器
默认使用`Monolog/Logger`作为日志处理器,日志信息会输出到控制台。可根据自己需求覆盖默认处理器,只要日志类
实现`Psr\Log\LoggerInterface`即可
```php
use AukeySwrpc\Server;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
$logger = new Logger('AukeySwrpc');
$logger->pushHandler(new StreamHandler(fopen('xxxx.log','w+'), Logger::DEBUG));
$server = new Server('127.0.0.1', 9501, ['enable_coroutine'=>true]);
$server->addService(UserService::class);
$server->addLogger($logger); //覆盖默认日志处理器
$server->start();
```
## 序列化方式
默认使用固定头+包体来解决**tcp粘包**问题,默认配置为`'package_length_type' => 'N'`,`'package_body_offset' => 4`
默认序列化数据会使用`serialize()`,如果swoole版本在4.5以上的自动使用swoole_substr_unserialize(),可以实现的类来覆盖默认配置,只要实现`src/Packer/PackerInterface`即可,注意服务端和客户端需要使用一样的协议,否则解析不了。
```php
use AukeySwrpc\Server;
$packer = new \AukeySwrpc\Packer\SerializeLengthPacker();
$server = new Server('127.0.0.1', 9501, ['enable_coroutine'=>true]);
$server->addService(UserService::class);
$server->addPacker($packer); //覆盖默认值
```
## 安全证书配置
参考:<https://wiki.swoole.com/#/server/setting?id=ssl_cert_file>
### 服务端
```php
$options = [
'ssl_cert_file' => __DIR__.'/config/ssl.crt',
'ssl_key_file' => __DIR__.'/config/ssl.key',
'pid_file' => __DIR__ . '/AukeySwrpc.pid',
];
$server = new Server('School_Module', '127.0.0.1', 9501, $options, SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$server->addService(SchoolService::class);
$server->start();
```
注意:
- 文件必须为 `PEM` 格式,不支持 `DER` 格式,可使用 `openssl` 工具进行转换
## 测试
使用phpuint实现的简单测试案例,配置文件`phpunit.xml`,根据你的服务器配置ip地址
```bash
php phpunit.phar tests --debug
```
![1615602809212](https://segmentfault.com/img/bVcPwcb)
### phpunit 测试报告
```
Client (AukeySwrpcTests\Client)
[x] Client connect
[x] Client sync request
[x] Client async request
Packer (AukeySwrpcTests\Packer)
[x] Serialize length pack
[x] Serialize lenght unpack
[x] Serialize eof pack
[x] Serialize eof unpack
Server (AukeySwrpcTests\Server)
[x] Server register to consul
[x] Server unregister from consul
[x] Server add service
[x] Server add middleware
```
... ...
{
"name": "aukey/aukeySwrpc",
"description": "基于swoole的rpc库",
"license": "MIT",
"authors": [
{
"name": "pengjianchao",
"email": "pengjianchao@aukeys.com"
}
],
"require": {
"php": ">=7.3",
"psr/log": "^1.0",
"monolog/monolog": "^2.2",
"sensiolabs/consul-php-sdk": "~4.0",
"openzipkin/zipkin": "~2.0"
},
"require-dev": {
"phpunit/phpunit": "~9.5"
},
"autoload": {
"psr-4": {
"AukeySwrpc\\": "src/",
"AukeySwrpcTests\\": "tests/"
},
"classmap": [
"src/"
]
}
}
... ...
此 diff 太大无法显示。
<?xml version="1.0"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" bootstrap="vendor/autoload.php" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<coverage/>
<php>
<env name="CONSUL_HOST" value="127.0.0.1"/>
<env name="CONSUL_PORT" value="8500"/>
<env name="RPC_SERVER_HOST" value="127.0.0.1"/>
<env name="RPC_SERVER_PORT" value="19501"/>
</php>
<logging>
<testdoxText outputFile="./AukeySwrpc_test_report.log"/>
</logging>
</phpunit>
... ...
<?php
namespace AukeySwrpc;
use Illuminate\Support\ServiceProvider;
class AukeyAukeySwrpcProvider extends ServiceProvider
{
public function boot()
{
$this->loadMigrationsFrom(__DIR__ . '/database/migrations');
if ($this->app->runningInConsole()) {
$this->commands([
\AukeyDataCenter\Commands\CreateBaseCode::class,
\AukeyDataCenter\Commands\SheinReturnOrderCom::class,
\AukeyDataCenter\Commands\SheinProductCom::class,
\AukeyDataCenter\Commands\SheinBrandCom::class,
]);
}
}
public function register()
{
$provides = [
'AukeyDataCenter\Http\RouteServiceProvider',
];
foreach ($provides as $provider) {
$this->app->register($provider);
}
}
}
... ...
<?php
namespace AukeySwrpc;
use Swoole\Client as SwClient;
use AukeySwrpc\Exceptions\RpcException;
use AukeySwrpc\Packer\PackerInterface;
use AukeySwrpc\Packer\SerializeLengthPacker;
use AukeySwrpc\Register\RegisterInterface;
use AukeySwrpc\Register\Service;
use AukeySwrpc\Request\Request;
/**
* Class Client
*
* @package AukeySwrpc
* @author pengjch 202439 11:36:25
*/
class Client
{
protected $services = [];
protected $connects = [];
const STRATEGY_RANDOM = 1;
const STRATEGY_WEIGHT = 2;
protected $mode;
protected $timeout = 3;
protected array $options;
protected string $module;
protected int $strategy;
protected ?RegisterInterface $register = null;
protected ?PackerInterface $packer = null;
protected array $defaultOptions
= [
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 81920, //协议最大长度
];
/**
* Client constructor.
*
* @param string $module
* @param array $services
* @param int $mode
* @param int $timeout
* @param array $options
*/
public function __construct(string $module, array $services, $mode = SWOOLE_SOCK_TCP, $timeout = 3, $options = [])
{
$this->module = $module;
$this->services = $services;
$this->mode = $mode;
$this->timeout = $timeout;
if (empty($options)) {
$options = $this->defaultOptions;
}
$this->options = $options;
}
/**
* @param string $module
* @param string $host
* @param int $port
* @param int $mode
* @param array $options
* @return Client
* @author pengjch 2024313 18:31:17
*/
public static function create(
string $module,
string $host,
int $port,
$mode = SWOOLE_SOCK_TCP,
$timeout = 3,
$options = []
): Client {
$service = Service::build($host, $port, 1);
return new static($module, [$service], $mode, $timeout, $options);
}
/**
* @param string $module
* @param RegisterInterface $register
* @param int $strategy
* @param int $mode
* @param int $timeout
* @param array $options
* @return Client
* @author pengjch 2024313 18:31:22
*/
public static function createBalancer(
string $module,
RegisterInterface $register,
$strategy = self::STRATEGY_RANDOM,
$mode = SWOOLE_SOCK_TCP,
$timeout = 3,
$options = []
): Client {
$client = new static($module, [], $mode, $timeout, $options);
$client->strategy = $strategy;
$client->addRegister($register);
return $client;
}
/**
* @param RegisterInterface $register
* @return $this
* @author pengjch 2024313 18:27:20
*/
public function addRegister(RegisterInterface $register): Client
{
$this->register = $register;
$this->services = $this->register->getServices($this->module);
return $this;
}
/**
* @param PackerInterface $packer
* @return $this
* @author pengjch 2024313 18:27:24
*/
public function addPacker(PackerInterface $packer): Client
{
$this->packer = $packer;
return $this;
}
/**
* @return SwClient
* @throws RpcException
* @author pengjch 2024313 18:23:37
*/
public function connect(): SwClient
{
$n = count($this->services);
if ($n == 0) {
throw new RpcException('No services available');
}
/** @var Service $service */
if ($n == 1) { //单个服务节点
$service = $this->services[0];
$key = $service->getHost() . '_' . $service->getPort();
} else { //多个服务节点
$key = $this->getConnectKey();
}
if (isset($this->connects[$key]) && $this->connects[$key]->isConnected()) {
return $this->connects[$key];
}
$client = new SwClient($this->mode ?: SWOOLE_SOCK_TCP);
if (!$client->connect($service->getHost(), $service->getPort(), $this->timeout ?? 3)) {
throw new RpcException("connect failed. Error: {$client->errCode}");
}
$client->set($this->options);
$this->connects[$key] = $client;
return $this->connects[$key];
}
/**
* 发送请求
*
* @param Request $request
* @return mixed
* @throws RpcException
* @author pengjch 202439 13:35:25
*/
public function send(Request $request)
{
/** @var \Swoole\Client $conn */
$conn = $this->connect();
if (!$this->packer) {
$this->packer = new SerializeLengthPacker([
'package_length_type' => $options['package_length_type'] ?? 'N',
'package_body_offset' => $options['package_body_offset'] ?? 4,
]);
}
$request->setModule($this->module);
$conn->send($this->packer->pack($request));
/** @var Response $response */
$response = @unserialize($conn->recv());
if (!($response instanceof Response)) {
throw new RpcException('The server return type is not a AukeySwrpc\Response');
}
if ($response->code == Response::RES_ERROR) {
throw new RpcException($response->msg);
}
return $response->data['result'] ?? null;
}
/**
* @return string
* @author pengjch 2024313 18:20:38
*/
public function getConnectKey(): string
{
/** @var Service $service */
if ($this->strategy == self::STRATEGY_RANDOM) {
$service = array_rand($this->services);
return $service->getHost() . '_' . $service->getPort();
} else {
/** @var Service $service */
foreach ($this->services as $service) {
$totalWeight += $service->getWeight();
$sort[] = $service->getWeight();
$serviceArr[] = $service->toArray();
}
array_multisort($serviceArr, SORT_DESC, $sort);
$start = 0;
$rand = rand(1, $totalWeight);
foreach ($serviceArr as $service) {
if ($start + $service['weight'] >= $rand) {
return $service['host'] . '_' . $service['port'];
}
$start = $start + $service['weight'];
}
}
}
/**
* 关闭客户端连接
*
* @return mixed
* @author pengjch 2024310 9:16:46
*/
public function close()
{
foreach ($this->connects as $connect) {
$connect->close(true);
}
}
/**
* 刷新节点服务信息
* 客户端使用长连接的情况下,需要起一个定时器来定时更新节点服务信息
*
* @author pengjch 2024313 18:24:23
*/
public function refreshServices()
{
if ($this->register) {
$this->services = $this->register->getServices($this->module);
$this->connects = [];
}
}
}
... ...
<?php
namespace AukeySwrpc;
trait Event
{
public function OnWorkerStart(\Swoole\Server $server, int $workerId)
{
}
public function onStart(\Swoole\Server $server)
{
}
public function onShutdown(\Swoole\Server $server)
{
}
}
... ...
<?php
namespace AukeySwrpc\Exceptions;
use Exception;
/**
* Class RequestException
*
* @package AukeySwrpc\Exceptions
* @author pengjch 202439 11:38:5
*/
class RequestException extends Exception
{
}
... ...
<?php
namespace AukeySwrpc\Exceptions;
use Exception;
/**
* Class RpcException
*
* @package AukeySwrpc\Exceptions
* @author pengjch 202439 11:38:5
*/
class RpcException extends Exception
{
}
... ...
<?php
namespace AukeySwrpc\Exceptions;
use Exception;
/**
* Class TypeException
*
* @package AukeySwrpc\Exceptions
* @author pengjch 202439 11:38:5
*/
class TypeException extends Exception
{
}
... ...
<?php
namespace AukeySwrpc;
use AukeySwrpc\Tracer\TracerContext;
use Zipkin\Endpoint;
use Zipkin\Reporters\Http;
use Zipkin\Samplers\BinarySampler;
use Zipkin\TracingBuilder;
/**
* Class LogicService
*
* @package AukeySwrpc
* @author pengjch 2024311 11:10:18
*/
class LogicService
{
protected $params;
protected $module;
protected $tracerUrl;
protected $tracerContext;
protected $clients = [];
/**
* @return static
* @author pengjch 2024311 11:4:5
*/
public static function factory()
{
return new static();
}
/**
* 初始化链路追踪器
*
* @param $func
* @author pengjch 2024311 11:3:36
*/
public function initTracer($func)
{
$reporterUrl = $this->tracerUrl ?: 'http://127.0.0.1:9411/api/v2/spans';
$endpoint = Endpoint::create($this->module);
$reporter = new Http(['endpoint_url' => $reporterUrl]);
$sampler = BinarySampler::createAsAlwaysSample();
$tracing = TracingBuilder::create()
->havingLocalEndpoint($endpoint)
->havingSampler($sampler)
->havingReporter($reporter)
->build();
$tracer = $tracing->getTracer();
$span = $tracer->newTrace();
$span->setName($func);
$span->start();
$span->finish();
$tracer->flush();
$ctx = $span->getContext();
if ($this->tracerContext) {
$this->tracerContext->setTraceID($ctx->getTraceId());
$this->tracerContext->setParentID($ctx->getSpanId());
$this->tracerContext->setReporterUrl($reporterUrl);
} else {
$this->tracerContext = TracerContext::create($ctx->getTraceId(), $ctx->getSpanId(), $reporterUrl);
}
}
/**
* @param $context
* @return $this
* @author pengjch 2024311 11:18:43
*/
public function setTracerContext($context)
{
$this->tracerContext = $context;
return $this;
}
/**
* @param $func
* @return null
* @author pengjch 2024311 11:4:1
*/
public function getTracerContext($func)
{
if (empty($this->tracerUrl)) {
return null;
}
if (empty($this->tracerContext)) {
$this->initTracer($func);
}
return $this->tracerContext;
}
/**
* @param array $params
* @return $this
* @author pengjch 2024311 11:15:47
*/
public function setParams(array $params)
{
$this->params = $params;
return $this;
}
/**
* @param string $url
* @return static $this
* @author pengjch 2024311 11:15:35
*/
public function setTracerUrl(string $url)
{
$this->tracerUrl = $url;
return $this;
}
/**
* @param string $name
* @return $this
* @author pengjch 2024311 11:59:4
*/
public function setModule(string $name)
{
$this->module = $name;
return $this;
}
}
... ...
<?php
namespace AukeySwrpc\Middlewares;
use Closure;
use AukeySwrpc\Request\Request;
use AukeySwrpc\Response;
/**
* Interface MiddlewareInterface
*
* @package AukeySwrpc\Middlewares
* @author pengjch 202439 11:37:39
*/
interface MiddlewareInterface
{
function handle(Request $request, Closure $next): Response;
}
... ...
<?php
namespace AukeySwrpc\Middlewares;
use Closure;
use AukeySwrpc\Request\Request;
use AukeySwrpc\Response;
use Zipkin\Endpoint;
use Zipkin\Propagation\TraceContext;
use Zipkin\Reporters\Http;
use Zipkin\Samplers\BinarySampler;
use Zipkin\TracingBuilder;
/**
* 链路追踪中间件
* Class TraceMiddleware
*
* @package AukeySwrpc\Middlewares
* @author pengjch 2024310 16:41:6
*/
class TraceMiddleware implements MiddlewareInterface
{
function handle(Request $request, Closure $next): Response
{
$context = $request->getTraceContext();
if (!$context) {
return $next($request);
}
$traceContext = TraceContext::create($context->getTraceID(), $context->getParentID(), null, true);
$endpoint = Endpoint::create($request->getModule());
$reporter = new Http(['endpoint_url' => $context->getReporterUrl()]);
$sampler = BinarySampler::createAsAlwaysSample();
$tracing = TracingBuilder::create()
->havingLocalEndpoint($endpoint)
->havingSampler($sampler)
->havingReporter($reporter)
->build();
$tracer = $tracing->getTracer();
$span = $tracer->newChild($traceContext);
$span->setName($request->getMethod());
$span->start();
$span->tag('请求参数', serialize($request->getParams()));
$request->setTraceContext($span->getContext()->getTraceId(), $span->getContext()
->getSpanId(), $context->getReporterUrl());
$start = microtime(true);
$result = $next($request);
$end = microtime(true);
$span->tag('响应状态码code', $result->code);
$span->tag('响应提示语msg', $result->msg);
$span->tag('响应耗时', $end - $start);
$span->finish();
$tracer->flush();
return $result;
}
}
... ...
<?php
namespace AukeySwrpc\Packer;
use AukeySwrpc\Request\Request;
/**
* Interface PackerInterface
*
* @package AukeySwrpc\Packer
* @author pengjch 202439 11:37:10
*/
interface PackerInterface
{
function pack(Request $data):string;
function unpack(string $data);
}
... ...
<?php
namespace AukeySwrpc\Packer;
use AukeySwrpc\Request\Request;
/**
* Class SerializeEofPacker
*
* @package AukeySwrpc\Packer
* @author pengjch 202439 11:37:17
*/
class SerializeEofPacker implements PackerInterface
{
/**
* @var string
*/
protected $eof;
public function __construct(array $options = [])
{
$this->eof = $options['settings']['package_eof'] ?? "\r\n";
}
public function pack(Request $data): string
{
return serialize($data);
}
public function unpack(string $data)
{
return unserialize($data);
}
}
... ...
<?php
namespace AukeySwrpc\Packer;
use AukeySwrpc\Request\Request;
/**
* Class SerializeLengthPacker
*
* @package AukeySwrpc\Packer
* @author pengjch 202439 11:37:27
*/
class SerializeLengthPacker implements PackerInterface
{
/**
* @var string
*/
protected $type;
/**
* @var int
*/
protected $length;
protected $defaultOptions
= [
'package_length_type' => 'N',
'package_body_offset' => 4,
];
public function __construct(array $options = [])
{
$options = array_merge($this->defaultOptions, $options['settings'] ?? []);
$this->type = $options['package_length_type'];
$this->length = $options['package_body_offset'];
}
public function pack(Request $data): string
{
$data = serialize($data);
return pack($this->type, strlen($data)) . $data;
}
public function unpack(string $data)
{
$package = unpack('N', $data);
$len = $package[1];
//合并unserialize和substr,以减少内存拷贝 https://wenda.swoole.com/detail/107587
if (function_exists('swoole_substr_unserialize')) {
return swoole_substr_unserialize($data, $this->length, $len);
}
$data = substr($data, $this->length, $len);
return unserialize($data);
}
}
... ...
<?php
namespace AukeySwrpc\Register;
use SensioLabs\Consul\ServiceFactory;
use SensioLabs\Consul\Services\Agent;
use SensioLabs\Consul\Services\AgentInterface as AgentInterfaceAlias;
use SensioLabs\Consul\Services\Catalog;
use SensioLabs\Consul\Services\CatalogInterface;
use SensioLabs\Consul\Services\Health;
use AukeySwrpc\Exceptions\RpcException;
class Consul implements RegisterInterface
{
protected $sf;
protected array $options;
protected array $serviceCache
= [
'ttl' => 10,
'services' => [],
'lastUpdateTime' => 0,
];
public function __construct($uri = 'http://127.0.0.1:8500', $options = [])
{
$this->options = $options;
$this->sf = new ServiceFactory([
'base_uri' => $uri
]);
}
public function getName(): string
{
return 'Consul';
}
/**
* 注册节点
*
* @param string $module
* @param string $host
* @param $port
* @param int $weight
* @author pengjch 202439 23:17:5
*/
public function register($module, $host, $port, $weight = 1)
{
$id = $host . '_' . $port;
/** @var Agent $agent */
$agent = $this->sf->get(AgentInterfaceAlias::class);
$agent->registerService([
'ID' => $id,
'Name' => $module,
'Port' => $port,
'Address' => $host,
'Tags' => [
'port_' . $port,
],
'Weights' => [
'Passing' => $weight,
'Warning' => 1,
],
'Check' => [
'TCP' => $host . ':' . $port,
'Interval' => $this->options['interval'] ?? '10s',
'Timeout' => $this->options['timeout'] ?? '5s',
'DeregisterCriticalServiceAfter' => $this->options['deregisterCriticalServiceAfter'] ?? '30s',
],
]);
}
/**
* 注销节点
* http://127.0.0.1:8500/v1/agent/service/deregister/service_id
*
* @param $host
* @param $port
* @author pengjch 202439 23:16:51
*/
public function unRegister($host, $port)
{
$id = $host . '_' . $port;
/** @var Agent $agent */
$agent = $this->sf->get(AgentInterfaceAlias::class);
$agent->deregisterService($id);
}
/**
* 获取模块下所有的服务
*
* @param string $module
* @return array
* @author pengjch 2024310 9:44:16
*/
public function getServices(string $module): array
{
$cache = $this->serviceCache;
$ttl = $this->options['ttl'] ?? $cache['ttl'];
//本地缓存所有节点信息,避免每次请求都要从consul拉一遍数据
if ($cache['lastUpdateTime'] + $ttl < time()) {
$health = new Health();
$servers = $health->service($module)->json();
if (empty($servers)) {
return [];
}
$result = [];
foreach ($servers as $server) {
$result[] = Service::build($server['Service']['Address'], $server['Service']['Port'], $server['Service']['Weights']['Passing']);
}
$cache['service'] = $result;
$cache['lastUpdateTime'] = time();
}
return $cache['service'];
}
/**
* 随机获取一个服务
*
* @param string $module
* @return Service
* @author pengjch 2024310 9:44:27
*/
public function getRandomService(string $module): Service
{
$services = $this->getServices($module);
if (!$services) {
throw new RpcException('It has not register module');
}
return $services[rand(0, count($services) - 1)];
}
/**
* 获取权重服务
*
* @param string $module
* @return Service
* @author pengjch 2024310 9:44:38
*/
public function getWeightService(string $module): Service
{
$serviceArr = [];
$totalWeight = 0;
$services = $this->getServices($module);
if (!$services) {
throw new RpcException('It has not register module');
}
/** @var Service $service */
foreach ($services as $service) {
$totalWeight += $service->getWeight();
$sort[] = $service->getWeight();
$serviceArr[] = $service->toArray();
}
array_multisort($serviceArr, SORT_DESC, $sort);
$start = 0;
$rand = rand(1, $totalWeight);
foreach ($serviceArr as $service) {
if ($start + $service['weight'] >= $rand) {
return Service::build($service['host'], $service['port'], $service['weight']);
}
$start = $start + $service['weight'];
}
}
}
... ...
<?php
namespace AukeySwrpc\Register;
/**
* Interface RegisterInterface
*
* @package AukeySwrpc\Register
* @author pengjch 202439 16:23:35
*/
interface RegisterInterface
{
function getName(): string;
function register($module, $host, $port, $weight = 1);
function unRegister($host, $port);
function getServices(string $module): array;
function getRandomService(string $module): Service;
function getWeightService(string $module): Service;
}
... ...
<?php
namespace AukeySwrpc\Register;
/**
* 注册中心服务
* Class Service
*
* @package AukeySwrpc\Register
* @author pengjch 2024311 10:25:46
*/
class Service
{
protected $host;
protected $port;
protected $weight;
public function __construct($host, $port, $weight)
{
$this->host = $host;
$this->port = $port;
$this->weight = $weight;
}
public static function build($host, $port, $weight)
{
return new static($host, $port, $weight);
}
public function getHost()
{
return $this->host;
}
public function getPort()
{
return $this->port;
}
public function getWeight()
{
return $this->weight;
}
public function toArray(): array
{
return [
'host' => $this->host,
'port' => $this->port,
'weight' => $this->weight
];
}
}
... ...
<?php
namespace AukeySwrpc\Request;
/**
* Class AsyncRequest
*
* @package AukeySwrpc\Request
* @author pengjch 2024313 9:10:2
*/
class AsyncRequest extends Request
{
public function init()
{
$this->setSync(false);
$this->setSystem(false);
}
}
... ...
<?php
namespace AukeySwrpc\Request;
use AukeySwrpc\Tracer\TracerContext;
abstract class Request
{
protected string $module;
protected string $method;
protected array $params;
protected bool $isSync = true; //是否同步请求,默认是
protected bool $isSystem = false; //是否系统请求,默认否
protected $error;
protected ?TracerContext $traceContext;
public static function create($method, $params, ?TracerContext $traceContext = null)
{
return new static ($method, $params, $traceContext);
}
public function __construct($method, $params, ?TracerContext $traceContext = null)
{
$this->method = $method;
$this->params = $params;
$this->traceContext = $traceContext;
$this->init();
}
abstract public function init();
public function getModule(): string
{
return $this->module;
}
public function getMethod(): string
{
return $this->method;
}
public function getParams(): array
{
return $this->params;
}
public function setParams(array $params)
{
$this->params = $params;
}
public function setModule(string $name)
{
$this->module = $name;
}
public function mergeParams(array $params)
{
$this->params = array_merge($this->params, $params);
}
public function getTraceContext(): ?TracerContext
{
return $this->traceContext;
}
public function setTraceContext($traceID, $parentID, $url)
{
$this->traceContext = TracerContext::create($traceID, $parentID, $url);
}
public function setSync(bool $value)
{
$this->isSync = $value;
}
public function isSync(): bool
{
return $this->isSync;
}
public function setSystem(bool $value)
{
$this->isSystem = $value;
}
public function isSystem(): bool
{
return $this->isSystem;
}
public function getError()
{
return $this->error;
}
public function setError($err)
{
$this->error = $err;
}
}
... ...
<?php
namespace AukeySwrpc\Request;
/**
* Class SyncRequest
*
* @package AukeySwrpc\Request
* @author pengjch 2024313 9:9:54
*/
class SyncRequest extends Request
{
public function init()
{
$this->setSync(true);
$this->setSystem(false);
}
}
... ...
<?php
namespace AukeySwrpc\Request;
/**
* Class AsyncRequest
*
* @package AukeySwrpc\Request
* @author pengjch 2024313 9:10:2
*/
class SystemRequest extends Request
{
public function init()
{
$this->setSync(true);
$this->setSystem(true);
}
}
... ...
<?php
namespace AukeySwrpc;
/**
* Class Response
*
* @package AukeySwrpc
* @author pengjch 202439 11:36:9
*/
class Response
{
const RES_ERROR = 0;
const RES_SUCCESS = 1;
public string $msg;
public int $code;
public array $data;
public function __construct($code, $msg, $data)
{
$this->data = $data;
$this->code = $code;
$this->msg = $msg;
}
public static function error($msg, $code = self::RES_ERROR, $data = []): Response
{
return new static($code, $msg, $data);
}
public static function success($data = [], $msg = 'success', $code = self::RES_SUCCESS): Response
{
return new static($code, $msg, $data);
}
}
... ...
<?php
namespace AukeySwrpc;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LoggerInterface;
use AukeySwrpc\Middlewares\TraceMiddleware;
use AukeySwrpc\Packer\SerializeLengthPacker;
use AukeySwrpc\Register\RegisterInterface;
use AukeySwrpc\Middlewares\MiddlewareInterface;
use AukeySwrpc\Packer\PackerInterface;
use AukeySwrpc\Request\Request;
/**
* Class Server
*
* @package AukeySwrpc
* @author pengjch 202439 11:35:52
*/
class Server
{
use Event;
protected string $module;
protected string $host;
protected int $port;
protected int $weight = 1;
protected array $options;
protected array $defaultOptions
= [
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 81920, //协议最大长度
];
/** @var PackerInterface $packer */
protected $packer;
/** @var Service $service */
protected $service;
/** @var LoggerInterface $logger */
protected $logger;
/** @var RegisterInterface $register */
protected $register;
/** @var \Swoole\Server $server */
protected \Swoole\Server $server;
private array $middlewares;
public function __construct(
string $module,
string $host,
int $port,
array $options = [],
$mode = SWOOLE_PROCESS,
$socketType = SWOOLE_SOCK_TCP,
LoggerInterface $logger = null
) {
$this->module = $module;
$this->host = $host;
$this->port = $port;
$this->setDefaultOptions($options);
$this->setDefaultLogger($logger);
$this->setCoreMiddleware();
$this->service = new Service($this->logger);
$server = new \Swoole\Server($host, $port, $mode ?: SWOOLE_PROCESS, $socketType ?: SWOOLE_SOCK_TCP);
$server->set($this->options);
$server->on('Start', [$this, 'onStart']);
$server->on('Shutdown', [$this, 'onShutdown']);
$server->on('WorkerStart', [$this, 'onWorkerStart']);
$server->on('Connect', [$this, 'OnConnect']);
$server->on('Receive', [$this, 'OnReceive']);
$server->on('Close', [$this, 'OnClose']);
$server->on('Task', [$this, 'OnTask']);
$server->on('Finish', [$this, 'OnFinish']);
$this->server = $server;
}
/**
* 设置节点权重
*
* @param int $weight
* @return Server
* @author pengjch 2024313 10:55:39
*/
public function weight(int $weight): Server
{
$this->weight = $weight;
return $this;
}
/**
* 设置默认选项
*
* @param $options
* @author pengjch 2024311 10:35:3
*/
protected function setDefaultOptions($options)
{
if (empty($options)) {
$options = $this->defaultOptions;
}
$this->options = $options;
//请求数量超过10000重启
if (empty($this->options['max_request'])) {
$this->options['max_request'] = 10000;
}
//默认task数量
if (empty($this->options['task_worker_num'])) {
$this->options['task_worker_num'] = swoole_cpu_num() * 2;
}
//task请求数超过10000则重启
if (empty($this->options['task_max_request'])) {
$this->options['task_max_request'] = 10000;
}
//10s没有数据传输就进行检测
if (empty($this->options['tcp_keepidle'])) {
$this->options['tcp_keepidle'] = 10;
}
//3s探测一次
if (empty($this->options['tcp_keepinterval'])) {
$this->options['tcp_keepinterval'] = 3;
}
//探测的次数,超过5次后还没回包close此连接
if (empty($this->options['tcp_keepcount'])) {
$this->options['tcp_keepcount'] = 5;
}
}
/**
* 设置默认日志处理器
*
* @param LoggerInterface|null $logger
* @author pengjch 2024311 10:34:19
*/
protected function setDefaultLogger(LoggerInterface $logger = null)
{
if (empty($logger)) {
$logger = new Logger('AukeySwrpc');
$logger->pushHandler(new StreamHandler(STDOUT, Logger::DEBUG));
}
$this->logger = $logger;
}
/**
* 设置核心中间件
*
* @author pengjch 2024311 10:34:5
*/
protected function setCoreMiddleware()
{
$this->middlewares[] = TraceMiddleware::class;
}
/**
* 添加中间件,支持匿名函数和实现类
* addMiddleware
*
* @param mixed ...$middlewares
* @author pengjch 202439 11:35:11
*/
public function addMiddleware(...$middlewares)
{
foreach ($middlewares as $middleware) {
if (is_string($middleware) && class_exists($middleware)) {
$middleware = new $middleware();
}
if (!($middleware instanceof \Closure) && !($middleware instanceof MiddlewareInterface)) {
$this->logger->warning('Skip illegal Middleware.');
continue;
}
$this->middlewares[] = $middleware;
}
}
/**
* 添加服务
* addService
*
* @param $service
* @param string $prefix
* @return Server
* @author pengjch 202439 11:35:2
*/
public function addService($service, $prefix = ''): Server
{
$this->service->addInstance($service, $prefix);
return $this;
}
/**
* @param $key
* @return mixed|null
* @author pengjch 2024312 16:11:12
*/
public function getService($key)
{
return $this->service->getService($key);
}
/**
* 注册发现中心
*
* @param $register
* @return Server
* @author pengjch 202439 16:38:51
*/
public function addRegister($register): Server
{
$this->register = $register;
return $this;
}
/**
* 添加日志处理器
*
* @param $logger
* @author pengjch 202439 12:20:57
*/
public function addLogger($logger)
{
$this->logger = $logger;
}
/**
* 添加包解析器
*
* @param $packer
* @author pengjch 202439 12:45:53
*/
public function addPacker($packer)
{
$this->packer = $packer;
}
/**
* 注册服务到consul
* onWorkerStart 和 onStart 回调是在不同进程中并行执行的,不存在先后顺序
*
* @param \Swoole\Server $server
* @author pengjch 202439 23:11:10
*/
public function onStart(\Swoole\Server $server)
{
if ($this->register) {
$this->logger->info(sprintf('Register server[%s:%d] to %s.', $this->host, $this->port, $this->register->getName()));
$this->register->register($this->module, $this->host, $this->port, $this->weight);
}
}
/**
* 注销服务
* 强制 kill 进程不会回调 onShutdown
* 需要使用 kill -15 来发送 SIGTERM 信号到主进程才能按照正常的流程终止
*
* @param \Swoole\Server $server
* @author pengjch 202439 23:14:40
*/
public function onShutdown(\Swoole\Server $server)
{
if ($this->register) {
$this->logger->info(sprintf('UnRegister server[%s:%d] from register.', $this->host, $this->port));
$this->register->unRegister($this->host, $this->port);
}
}
/**
* server接收请求
*
* @param \Swoole\Server $server
* @param $fd
* @param $reactor_id
* @param $data
* @return mixed
* @author pengjch 202439 11:34:0
*/
public function onReceive(\Swoole\Server $server, $fd, $reactor_id, $data)
{
/** @var Request $request */
$request = $this->packer->unpack($data);
//系统请求
if ($request->isSystem()) {
return $server->send($fd, serialize($this->doSystemRequest($request)));
}
//同步请求
if ($request->isSync()) {
return $server->send($fd, serialize($this->doRequest($request)));
}
//异步请求
$server->task($request);
return $server->send($fd, serialize(Response::success(['result' => 'success'])));
}
/**
* 执行请求
*
* @param Request $request
* @return Response
* @author pengjch 2024313 9:37:20
*/
public function doRequest(Request $request): Response
{
try {
$handler = $this->getRequestHandler();
} catch (\ReflectionException $e) {
return Response::error($e->getMessage());
}
$response = $handler($request);
if (!($response instanceof Response)) {
$msg = 'The middleware must return the response type';
$this->logger->error($msg);
$response = Response::error($msg);
}
return $response;
}
/**
* 系统请求
*
* @param Request $request
* @return Response
* @author pengjch 2024323 10:46:55
*/
public function doSystemRequest(Request $request): Response
{
if ($request->getMethod() == 'stats') {
return Response::success(['result' => $this->server->stats()]);
} else {
return Response::error($request->getMethod() . ' is not supported');
}
}
/**
* @return mixed
* @throws \ReflectionException
* @author pengjch 2024312 16:36:52
*/
public function getRequestHandler()
{
return array_reduce(array_reverse($this->middlewares), function ($stack, $next) {
return function ($request) use ($stack, $next) {
if ($next instanceof \Closure) {
return $next($request, $stack);
} elseif (is_string($next) && class_exists($next)) {
return (new $next())->handle($request, $stack);
} else {
return $next->handle($request, $stack);
}
};
}, function ($request) {
return $this->service->call($request);
});
}
/**
* 异步处理请求
*
* @param $server
* @param $taskID
* @param $reactorID
* @param $data
* @return Response
* @author pengjch 2024313 9:40:37
*/
public function OnTask($server, $taskID, $reactorID, $data): Response
{
$this->logger->debug('AsyncTask: Start', ['taskID' => $taskID]);
return $this->doRequest($data);
}
/**
* 完成异步任务回调
*
* @param $server
* @param $taskID
* @param $data
* @author pengjch 2024313 9:49:44
*/
public function OnFinish($server, $taskID, $data)
{
$this->logger->debug('AsyncTask: Finish', ['taskID' => $taskID, 'data' => $data]);
}
/**
* OnClose
*
* @param $server
* @param $fd
* @author pengjch 202439 11:34:48
*/
public function OnClose($server, $fd)
{
$this->logger->debug('Client: Close');
}
/**
* OnConnect
*
* @param $server
* @param $fd
* @author pengjch 202439 11:34:52
*/
public function OnConnect($server, $fd)
{
$this->logger->debug('Client: Connect.');
}
/**
* start
*
* @author pengjch 202439 11:34:56
*/
public function start(): bool
{
//可用服务数量
if ($this->service->count() == 0) {
$this->logger->error('There is no service available.');
return false;
}
//默认使用固定包头+包体方式解决粘包问题
if (empty($this->packer)) {
$this->packer = new SerializeLengthPacker([
'package_length_type' => $this->options['package_length_type'] ?? 'N',
'package_body_offset' => $this->options['package_body_offset'] ?? 4,
]);
}
$this->logger->info(sprintf('Rpc server[%s:%s] start.', $this->host, $this->port));
$this->server->start();
return true;
}
}
... ...
<?php
namespace AukeySwrpc;
use Psr\Log\LoggerInterface;
use ReflectionClass;
use ReflectionMethod;
use AukeySwrpc\Request\Request;
/**
* Class Service
*
* @package AukeySwrpc
* @author pengjch 202439 11:39:41
*/
class Service
{
private array $services = [];
protected array $filers
= [
'factory',
'initTracer',
'setModule',
'setTracerUrl',
'setParams',
'setTracerContext',
'getTracerContext'
];
/** @var LoggerInterface $logger */
private $logger;
public function __construct($logger)
{
$this->logger = $logger;
}
/**
* 注册服务实例
*
* @param $obj
* @param $prefix
* @return bool
* @author pengjch 202438 13:43:21
*/
public function addInstance($obj, $prefix = ''): bool
{
if (is_string($obj)) {
$obj = new $obj();
}
if (!is_object($obj)) {
$this->logger->error('Service is not an object.', ['service' => $obj]);
return false;
}
if (!($obj instanceof LogicService)) {
$this->logger->error('The Service does not inherit LogicService', ['service' => get_class($obj)]);
return false;
}
$className = get_class($obj);
$methods = get_class_methods($obj);
foreach ($methods as $method) {
if (in_array($method, $this->filers)) {
continue;
}
if (strlen($prefix) > 0) {
$key = $prefix . '_' . $className . '_' . $method;
} else {
$key = $className . '_' . $method;
}
$this->services[$key] = $className;
$this->logger->info(sprintf('import %s => %s.', $key, $className));
}
return true;
}
/**
* 获取服务
*
* @param $key
* @return mixed|null
* @author pengjch 202438 13:43:17
*/
public function getService($key)
{
return $this->services[$key] ?? null;
}
/**
* 获取所有服务
* getServices
*
* @return array
* @author pengjch 202438 15:23:58
*/
public function getServices(): array
{
return $this->services;
}
/**
* count
*
* @return int
* @author pengjch 202439 12:56:46
*/
public function count(): int
{
return count($this->services);
}
/**
* @param $key
* @return bool
* @author pengjch 202438 14:32:50
*/
public function isExist($key): bool
{
return isset($this->services[$key]);
}
/**
* 调用服务
*
* @param Request $request
* @return Response
* @throws \ReflectionException
* @author pengjch 202439 10:17:59
*/
public function call(Request $request): Response
{
if ($err = $request->getError()) {
return Response::error($err);
}
$service = $this->getService($request->getMethod());
if (!$service) {
$this->logger->debug('service is not exist.', ['method' => $request->getMethod()]);
return Response::error('service is not exist.');
}
$methodArr = explode('_', $request->getMethod());
$methodName = array_pop($methodArr);
$reflect = new ReflectionClass($service);
$instance = $reflect->newInstanceArgs();
if (!method_exists($instance, $methodName)) {
$this->logger->debug('method is not exist.', ['method' => $request->getMethod()]);
return Response::error(sprintf('%s method[%s] is not exist.', $service, $methodName));
}
$ctx = $request->getTraceContext();
if ($ctx && method_exists($instance, 'setTracerContext')) {
$instance->setTracerUrl($ctx->getReporterUrl())->setTracerContext($ctx);
}
try {
$methodObj = new ReflectionMethod($reflect->getName(), $methodName);
$result = $methodObj->invokeArgs($instance, $request->getParams());
} catch (\Throwable $e) {
return Response::error($e->getMessage());
}
return Response::success([
'result' => $result
]);
}
}
... ...
<?php
namespace AukeySwrpc\Tracer;
/**
* 链路追踪上下文
* Class TracerContext
*
* @package AukeySwrpc\Tracer
* @author pengjch 2024311 10:21:34
*/
class TracerContext
{
protected $traceID;
protected $parentID;
protected $reporterUrl;
public function __construct($traceID, $parentID, $reporterUrl)
{
$this->traceID = $traceID;
$this->parentID = $parentID;
$this->reporterUrl = $reporterUrl;
}
public static function create($traceID, $parentID, $reporterUrl)
{
return new static($traceID, $parentID, $reporterUrl);
}
public function setTraceID($traceID)
{
$this->traceID = $traceID;
}
public function setParentID($parentID)
{
$this->parentID = $parentID;
}
public function setReporterUrl($url)
{
$this->reporterUrl = $url;
}
public function getTraceID()
{
return $this->traceID;
}
public function getParentID()
{
return $this->parentID;
}
public function getReporterUrl()
{
return $this->reporterUrl;
}
}
... ...
<?php
namespace AukeySwrpcTests;
use PHPUnit\Framework\TestCase;
/**
* Class BootTest
*
* @author wuzhc
* @internal
*/
abstract class BootTest extends TestCase
{
const PID_FILE = __DIR__ . '/AukeySwrpc.pid';
const SERVER_LOG = __DIR__ . '/AukeySwrpc.log';
const SERVER_SCRIPT = __DIR__ . '/server.sh';
public static function setUpBeforeClass(): void
{
// fwrite(STDOUT, 'Starting rpc server...' . PHP_EOL);
$cmd = 'nohup ' . self::SERVER_SCRIPT . ' > ' . self::SERVER_LOG . ' 2>&1 &';
shell_exec($cmd);
sleep(5);
self::assertFileExists(self::PID_FILE, 'Run rpc server failed: ' . $cmd . '');
$pid = file_get_contents(self::PID_FILE);
self::assertNotEmpty($pid, 'Failed to start the rpc server.');
$res = shell_exec('ps aux | grep ' . $pid . ' | wc -l');
self::assertGreaterThanOrEqual(1, intval($res), 'Failed to start the rpc server.');
// fwrite(STDOUT, 'Rpc server started successfully.' . PHP_EOL);
}
public static function tearDownAfterClass(): void
{
if (\file_exists(self::PID_FILE)) {
$pid = file_get_contents(self::PID_FILE);
\shell_exec('kill -15 ' . $pid);
if (\file_exists(self::PID_FILE)) {
\unlink(self::PID_FILE);
}
\sleep(1);
}
}
}
... ...
<?php
namespace AukeySwrpcTests;
use AukeySwrpc\Register\Consul;
use AukeySwrpc\Register\Service;
use AukeySwrpc\Request;
use AukeySwrpc\Client;
/**
* 客户端单元测试
* php74 ../phpunit.phar tests/ClientTest.php --debug
* Class ClientTest
*
* @link http://www.phpunit.cn/manual/7.0/zh_cn/index.html
* @author pengjch 2024312 9:39:22
*/
class ClientTest extends BootTest
{
/**
* @return Client Client
* @author pengjch 2024312 14:29:28
*/
public function testClientConnect(): Client
{
$client = Client::create('School_Module', getenv('RPC_SERVER_HOST'), getenv('RPC_SERVER_PORT'));
$conn = $client->connect();
$this->assertIsBool($conn->isConnected(), 'Client connect failure.');
return $client;
}
/**
* @depends testClientConnect
* @param Client $client
* @author pengjch 2024312 14:29:20
*/
public function testClientSyncRequest($client)
{
$request = Request\SyncRequest::create('AukeySwrpcTests\services\SchoolService_getUserSchool', [1, 1]);
$res = $client->send($request);
$this->assertEquals('未来学校1', $res);
}
/**
* @depends testClientConnect
* @param $client
* @author pengjch 2024313 10:3:43
*/
public function testClientAsyncRequest($client)
{
$request = Request\AsyncRequest::create('AukeySwrpcTests\services\SchoolService_saveUserName', ['tony']);
$res = $client->send($request);
$this->assertEquals('success', $res);
sleep(3);
$this->assertFileExists('xxx.log', 'Async request failure.');
$value = file_get_contents('xxx.log');
$this->assertEquals('tony', $value);
@unlink('xxx.log');
}
}
... ...
<?php
namespace AukeySwrpcTests;
use PHPUnit\Framework\TestCase;
use AukeySwrpc\Request\SyncRequest;
/**
* Class PackerTest
* php74 ../phpunit.phar tests/ClientTest.php
*
* @package AukeySwrpcTests
* @author pengjch 2024312 16:5:14
*/
class PackerTest extends TestCase
{
/**
* 注意:Request类属性和方法发生变化时,这个测试案例就没有意义了
* @return string
* @author pengjch 2024312 14:57:32
*/
public function testSerializeLengthPack()
{
$packer = new \AukeySwrpc\Packer\SerializeLengthPacker();
$result = $packer->pack(SyncRequest::create('SchoolService_getName', [1, 1]));
$this->assertEquals('AAAAzU86MjU6IlN3cnBjXFJlcXVlc3RcU3luY1JlcXVlc3QiOjY6e3M6OToiACoAbWV0aG9kIjtzOjIxOiJTY2hvb2xTZXJ2aWNlX2dldE5hbWUiO3M6OToiACoAcGFyYW1zIjthOjI6e2k6MDtpOjE7aToxO2k6MTt9czo5OiIAKgBpc1N5bmMiO2I6MTtzOjExOiIAKgBpc1N5c3RlbSI7YjowO3M6ODoiACoAZXJyb3IiO047czoxNToiACoAdHJhY2VDb250ZXh0IjtOO30=', base64_encode($result));
return base64_encode($result);
}
/**
* @depends testSerializeLengthPack
* @author pengjch 2024312 14:57:23
*/
public function testSerializeLenghtUnpack($value)
{
$expect = SyncRequest::create('SchoolService_getName', [1, 1]);
$packer = new \AukeySwrpc\Packer\SerializeLengthPacker();
$result = $packer->unpack(base64_decode($value));
$this->assertSame(serialize($expect), serialize($result));
}
/**
* 注意:Request类属性和方法发生变化时,这个测试案例就没有意义了
* @return string
* @author pengjch 2024312 14:57:32
*/
public function testSerializeEofPack()
{
$packer = new \AukeySwrpc\Packer\SerializeEofPacker();
$result = $packer->pack(SyncRequest::create('SchoolService_getName', [1, 1]));
$this->assertEquals('TzoyNToiU3dycGNcUmVxdWVzdFxTeW5jUmVxdWVzdCI6Njp7czo5OiIAKgBtZXRob2QiO3M6MjE6IlNjaG9vbFNlcnZpY2VfZ2V0TmFtZSI7czo5OiIAKgBwYXJhbXMiO2E6Mjp7aTowO2k6MTtpOjE7aToxO31zOjk6IgAqAGlzU3luYyI7YjoxO3M6MTE6IgAqAGlzU3lzdGVtIjtiOjA7czo4OiIAKgBlcnJvciI7TjtzOjE1OiIAKgB0cmFjZUNvbnRleHQiO047fQ==', base64_encode($result));
return base64_encode($result);
}
/**
* @depends testSerializeEofPack
* @author pengjch 2024312 14:57:23
*/
public function testSerializeEofUnpack($value)
{
$expect = SyncRequest::create('SchoolService_getName', [1, 1]);
$packer = new \AukeySwrpc\Packer\SerializeEofPacker();
$result = $packer->unpack(base64_decode($value));
$this->assertSame(serialize($expect), serialize($result));
}
}
... ...
<?php
namespace AukeySwrpcTests;
use Monolog\Logger;
use AukeySwrpc\Register\Consul;
use AukeySwrpc\Register\Service;
use AukeySwrpc\Request\Request;
use AukeySwrpc\Request\SyncRequest;
use AukeySwrpc\Server;
use AukeySwrpcTests\services\UserService;
/**
* Class ServerTest
* php74 ../phpunit.phar tests/ClientTest.php
*
* @package AukeySwrpcTests
* @author pengjch 2024312 16:5:7
*/
class ServerTest extends \PHPUnit\Framework\TestCase
{
/**
* @author pengjch 2024312 17:8:31
*/
public function testServerRegisterToConsul()
{
$res = shell_exec('netstat anp | grep ' . getenv('CONSUL_PORT') . ' | wc -l');
$this->assertGreaterThanOrEqual(1, intval($res), 'Warning: Consul not started.');
$consul = new Consul('http://' . getenv('CONSUL_HOST') . ':' . getenv('CONSUL_PORT'));
$consul->register('test_module', '127.0.0.1', 8080);
$isSuccess = false;
$services = $consul->getServices('test_module');
/** @var Service $service */
foreach ($services as $service) {
if ($service->getHost() == '127.0.0.1' && $service->getPort() == 8080) {
$isSuccess = true;
break;
}
}
$this->assertIsBool($isSuccess);
return $consul;
}
/**
* @depends testServerRegisterToConsul
* @param Consul $consul
* @author pengjch 2024312 17:12:17
*/
public function testServerUnregisterFromConsul($consul)
{
$consul->unRegister('127.0.0.1', 8080);
$isSuccess = true;
$services = $consul->getServices('test_module');
/** @var Service $service */
foreach ($services as $service) {
if ($service->getHost() == '127.0.0.1' && $service->getPort() == 8080) {
$isSuccess = false;
break;
}
}
$this->assertIsBool($isSuccess);
}
/**
* @return Server
* @author pengjch 2024312 17:8:17
*/
public function testServerAddService()
{
$logger = new Logger('swprc');
$server = new Server('School_Module', getenv('RPC_SERVER_HOST'), getenv('RPC_SERVER_PORT'), [], null, null, $logger);
$server->addService(UserService::class);
$key = UserService::class . '_getName';
$value = $server->getService($key);
$this->assertEquals(UserService::class, $value);
return $server;
}
/**
* @depends testServerAddService
* @param $server
* @author pengjch 2024312 16:40:0
*/
public function testServerAddMiddleware($server)
{
$request = SyncRequest::create('AukeySwrpcTests\services\UserService_getFavoriteFood', ['肥胖']);
$server->addMiddleware(function (Request $request, $next) {
$request->setParams(['帅气']);
return $next($request);
});
$func = $server->getRequestHandler();
$result = $func($request);
$this->assertEquals('帅气的我喜欢吃苹果', $result->data['result']);
}
}
... ...
#!/usr/bin/env /opt/php74/bin/php
<?php
use AukeySwrpc\Register\Consul;
use AukeySwrpc\Server;
$basePath = dirname(dirname(__FILE__));
require_once $basePath . "/vendor/autoload.php";
$options = [
'enable_coroutine' => true,
'pid_file' => __DIR__ . '/AukeySwrpc.pid',
];
$server = new Server('School_Module', getenv('RPC_SERVER_HOST'), getenv('RPC_SERVER_PORT'), $options);
$server->addRegister(new Consul())
->addService(\AukeySwrpcTests\services\UserService::class)
->addService(\AukeySwrpcTests\services\SchoolService::class)
->addService(\AukeySwrpcTests\services\ClassService::class)
->start();
... ...
<?php
namespace AukeySwrpcTests\services;
use AukeySwrpc\Exceptions\RpcException;
use AukeySwrpc\LogicService;
use AukeySwrpc\Register\Consul;
use AukeySwrpc\Request\SyncRequest;
/**
* Class ClassService
*
* @package AukeySwrpcTests\services
* @author pengjch 2024313 9:15:21
*/
class ClassService extends LogicService
{
public function getUserClass($userID = 1): string
{
$register = new Consul();
try {
$classID = 111;
$client = \AukeySwrpc\Client::createBalancer('School_Module', $register, \AukeySwrpc\Client::STRATEGY_WEIGHT);
$result = $client->send(SyncRequest::create('SchoolService_getUserSchool', [
$userID,
$classID,
], $this->getTracerContext(__FUNCTION__)));
} catch (RpcException $e) {
return $e->getMessage() . PHP_EOL;
}
return '高一2班, school:' . $result;
}
}
... ...
<?php
namespace AukeySwrpcTests\services;
use AukeySwrpc\LogicService;
/**
* Class SchoolService
*
* @package AukeySwrpcTests\services
* @author pengjch 2024313 9:15:30
*/
class SchoolService extends LogicService
{
public function getUserSchool($userID, $classID): string
{
return '未来学校' . $userID;
}
public function saveUserName($name)
{
file_put_contents('xxx.log', $name);
}
}
... ...
<?php
namespace AukeySwrpcTests\services;
use AukeySwrpc\Exceptions\RpcException;
use AukeySwrpc\LogicService;
use AukeySwrpc\Register\Consul;
use AukeySwrpc\Request\SyncRequest;
/**
* Class UserService
*
* @package AukeySwrpcTests\services
* @author pengjch 2024313 9:15:52
*/
class UserService extends LogicService
{
/**
* @return UserService
* @author pengjch 2024311 11:32:35
*/
public static function factory()
{
return parent::factory();
}
public function getName(): string
{
$register = new Consul();
try {
$userID = 1;
$client = \AukeySwrpc\Client::createBalancer('Class_Module', $register, \AukeySwrpc\Client::STRATEGY_WEIGHT);
$result = $client->send(SyncRequest::create('ClassService_getUserClass', [$userID], $this->getTracerContext(__FUNCTION__)));
} catch (RpcException $e) {
return $e->getMessage() . PHP_EOL;
}
return 'user:wuzhc, class:' . $result;
}
public function getAge(): int
{
return 30;
}
public function getFavoriteFood($prefix)
{
return $prefix . '的我喜欢吃苹果';
}
}
... ...