作者 pengjch

dev: swrpc代码提交

... ... @@ -2,15 +2,21 @@
namespace AukeySwrpc;
use AukeySwrpc\Commands\SwrpcGenCom;
use AukeySwrpc\Commands\SwrcpTestCom;
use AukeySwrpc\Commands\SwrpcServerCom;
use Illuminate\Support\ServiceProvider;
class SwrpcProvider extends ServiceProvider
class AukeySwrpcProvider extends ServiceProvider
{
public function boot()
{
$this->loadMigrationsFrom(__DIR__ . '/database/migrations');
if ($this->app->runningInConsole()) {
$this->commands([
SwrpcServerCom::class,
SwrpcGenCom::class,
SwrcpTestCom::class,
]);
}
}
... ...
<?php
namespace AukeySwrpc;
use AukeySwrpc\Request\SyncRequest;
use AukeySwrpc\Tracer\TracerContext;
/**
* Class BaseService
*
* @package App\Clients
* @author pengjch 2024314 11:26:45
*/
class BaseService
{
protected $serviceKey;
protected $traceContext = null;
/**
* @return static
* @author 2021-03-14 11:25:18
*/
public static function factory()
{
return new static();
}
/**
* BaseService constructor.
*/
public function __construct()
{
$this->init();
}
/**
* @author pengjch 2024314 11:33:22
*/
protected function init()
{
}
/**
* @param $serviceKey
* @author pengjch 2024314 11:31:57
*/
protected function setService($serviceKey)
{
$this->serviceKey = $serviceKey;
}
/**
* 链路追踪上下文
*
* @param TracerContext|null $context
* @return $this
* @author pengjch 2024314 11:27:45
*/
public function trace(?TracerContext $context = null)
{
$this->traceContext = $context;
return $this;
}
/**
* 调用远程服务
*
* @param $method
* @param $args
* @return mixed
* @throws \Exception
* @author pengjch 2024-03-14 11:25:18
*/
protected function callRemoteService($method, $args)
{
$serviceNs = $this->getTargetServiceNamespace();
$method = $serviceNs . '_' . $method;
$client = ClientManger::getInstance($this->serviceKey);
$request = SyncRequest::create($method, $args, $this->traceContext);
return $client->send($request);
}
/**
* 调用本地服务
*
* @param $method
* @param $args
* @return string
* @author pengjch 2024-03-14 11:25:18
*/
protected function callLocalService($method, $args)
{
$serviceNs = $this->getTargetServiceNamespace();
return call_user_func_array([$serviceNs::factory(), $method], $args);
}
/**
* 获取目标调用类命名空间
*
* @return string
* @author pengjch 2024314 12:6:8
*/
protected function getTargetServiceNamespace(): string
{
$moduleName = $this->extractModuleName();
/** @var \Nwidart\Modules\Laravel\Module $module */
$module = \Nwidart\Modules\Facades\Module::find($moduleName);
$moduleNs = ltrim(str_replace([base_path(), '/'], ['', '\\'], $module->getPath()), '\\');
return $moduleNs . '\\Services\\' . $this->extractServiceName();
}
/**
* 根据调用类提前模块名称
*
* @return false|string
* @author pengjch 2024314 12:0:17
*/
protected function extractModuleName()
{
$calledClass = get_called_class();
$module = str_replace(['App\Clients\\', '\\'], ['', '_'], $calledClass);
return substr($module, 0, strrpos($module, '_'));
}
/**
* 根据调用类提前service名称
*
* @return false|string
* @author pengjch 2024314 12:0:34
*/
protected function extractServiceName()
{
$calledClass = get_called_class();
return substr($calledClass, strrpos($calledClass, '\\') + 1);
}
/**
* __call
*
* @param $method
* @param $args
* @return false|mixed
* @throws \Exception
* @author 2021-03-14 11:25:18
*/
public function __call($method, $args)
{
switch (env('SERVICE_CALL_STRATEGY', 'remote')) {
case 'remote':
return $this->callRemoteService($method, $args);
case 'local':
return $this->callLocalService($method, $args);
default:
throw new \Exception('error strategy');
}
}
}
... ...
<?php
namespace AukeySwrpc;
use Swoole\Client as SwClient;
use AukeySwrpc\Exceptions\RpcException;
use AukeySwrpc\Packer\PackerInterface;
use AukeySwrpc\Packer\SerializeLengthPacker;
use AukeySwrpc\Register\RegisterInterface;
use AukeySwrpc\Register\Service;
use AukeySwrpc\Request\Request;
/**
* Class Client
*
* @package Swrpc
* @author pengjch 202439 11:36:25
*/
class Client
{
protected $services = [];
protected $connects = [];
const STRATEGY_RANDOM = 1;
const STRATEGY_WEIGHT = 2;
protected $mode;
protected $timeout = 3;
protected array $options;
protected string $module;
protected int $strategy;
protected ?RegisterInterface $register = null;
protected ?PackerInterface $packer = null;
protected array $defaultOptions
= [
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 81920, //协议最大长度
];
/**
* Client constructor.
*
* @param string $module
* @param array $services
* @param int $mode
* @param int $timeout
* @param array $options
*/
public function __construct(string $module, array $services, $mode = SWOOLE_SOCK_TCP, $timeout = 3, $options = [])
{
$this->module = $module;
$this->services = $services;
$this->mode = $mode;
$this->timeout = $timeout;
if (empty($options)) {
$options = $this->defaultOptions;
}
$this->options = $options;
}
/**
* @param string $module
* @param string $host
* @param int $port
* @param int $mode
* @param array $options
* @return Client
* @author pengjch 2024313 18:31:17
*/
public static function create(
string $module,
string $host,
int $port,
$mode = SWOOLE_SOCK_TCP,
$timeout = 3,
$options = []
): Client {
$service = Service::build($host, $port, 1);
return new static($module, [$service], $mode, $timeout, $options);
}
/**
* @param string $module
* @param RegisterInterface $register
* @param int $strategy
* @param int $mode
* @param int $timeout
* @param array $options
* @return Client
* @author pengjch 2024313 18:31:22
*/
public static function createBalancer(
string $module,
RegisterInterface $register,
$strategy = self::STRATEGY_RANDOM,
$mode = SWOOLE_SOCK_TCP,
$timeout = 3,
$options = []
): Client {
$client = new static($module, [], $mode, $timeout, $options);
$client->strategy = $strategy;
$client->addRegister($register);
return $client;
}
/**
* @param RegisterInterface $register
* @return $this
* @author pengjch 2024313 18:27:20
*/
public function addRegister(RegisterInterface $register): Client
{
$this->register = $register;
$this->services = $this->register->getServices($this->module);
return $this;
}
/**
* @param PackerInterface $packer
* @return $this
* @author pengjch 2024313 18:27:24
*/
public function addPacker(PackerInterface $packer): Client
{
$this->packer = $packer;
return $this;
}
/**
* @return SwClient
* @throws RpcException
* @author pengjch 2024313 18:23:37
*/
public function connect(): SwClient
{
$n = count($this->services);
if ($n == 0) {
throw new RpcException('No services available');
}
/** @var Service $service */
if ($n == 1) { //单个服务节点
$service = $this->services[0];
$key = $service->getHost() . '_' . $service->getPort();
} else { //多个服务节点
$key = $this->getConnectKey();
}
if (isset($this->connects[$key]) && $this->connects[$key]->isConnected()) {
return $this->connects[$key];
}
$client = new SwClient($this->mode ?: SWOOLE_SOCK_TCP);
if (!$client->connect($service->getHost(), $service->getPort(), $this->timeout ?? 3)) {
throw new RpcException("connect failed. Error: {$client->errCode}");
}
$client->set($this->options);
$this->connects[$key] = $client;
return $this->connects[$key];
}
/**
* 发送请求
*
* @param Request $request
* @return mixed
* @throws RpcException
* @author pengjch 202439 13:35:25
*/
public function send(Request $request)
{
/** @var \Swoole\Client $conn */
$conn = $this->connect();
if (!$this->packer) {
$this->packer = new SerializeLengthPacker([
'package_length_type' => $options['package_length_type'] ?? 'N',
'package_body_offset' => $options['package_body_offset'] ?? 4,
]);
}
$request->setModule($this->module);
$conn->send($this->packer->pack($request));
/** @var Response $response */
$response = @unserialize($conn->recv());
if (!($response instanceof Response)) {
throw new RpcException('The server return type is not a Swrpc\Response');
}
if ($response->code == Response::RES_ERROR) {
throw new RpcException($response->msg);
}
return $response->data['result'] ?? null;
}
/**
* @return string
* @author pengjch 2024313 18:20:38
*/
public function getConnectKey(): string
{
/** @var Service $service */
if ($this->strategy == self::STRATEGY_RANDOM) {
$service = array_rand($this->services);
return $service->getHost() . '_' . $service->getPort();
} else {
/** @var Service $service */
foreach ($this->services as $service) {
$totalWeight += $service->getWeight();
$sort[] = $service->getWeight();
$serviceArr[] = $service->toArray();
}
array_multisort($serviceArr, SORT_DESC, $sort);
$start = 0;
$rand = rand(1, $totalWeight);
foreach ($serviceArr as $service) {
if ($start + $service['weight'] >= $rand) {
return $service['host'] . '_' . $service['port'];
}
$start = $start + $service['weight'];
}
}
}
/**
* 关闭客户端连接
*
* @return mixed
* @author pengjch 2024310 9:16:46
*/
public function close()
{
foreach ($this->connects as $connect) {
$connect->close(true);
}
}
/**
* 刷新节点服务信息
* 客户端使用长连接的情况下,需要起一个定时器来定时更新节点服务信息
*
* @author pengjch 2024313 18:24:23
*/
public function refreshServices()
{
if ($this->register) {
$this->services = $this->register->getServices($this->module);
$this->connects = [];
}
}
}
... ...
<?php
namespace AukeySwrpc;
use AukeySwrpc\Client;
use AukeySwrpc\Register\Consul;
/**
* 客户端管理器
* Class ClientManger
*
* @package App\Clients
* @author pengjch 202435 23:20:30
*/
class ClientManger
{
/** @var \Hprose\Client */
private static $clients;
private function __sleep()
{
}
private function __wakeup()
{
}
private function __construct()
{
}
/**
* @param $key
* @return Client
* @throws \Exception
* @author pengjch 2024314 12:34:8
*/
public static function getInstance($key): Client
{
if (!isset(self::$clients[$key])) {
$registerUri = env('SWRPC_REGISTER_URI');
if ($registerUri) { //从注册中心获取连接地址
self::$clients[$key] = Client::createBalancer($key, new Consul($registerUri));
} else { //直连模式
$conf = explode(':', env($key));
if (!$conf || count($conf) != 2) {
throw new \Exception('.env未配置' . $key);
}
self::$clients[$key] = Client::create($key, $conf[0], $conf[1]);
}
}
return self::$clients[$key];
}
}
... ...
<?php
namespace AukeySwrpc\Commands;
use Illuminate\Console\Command;
use Nwidart\Modules\Facades\Module;
use AukeySwrpc\LogicService;
/**
* Class SwrpcGenCom
*
* @package App\Console\Commands
* @author pengjch 2024314 12:13:7
*/
class SwrpcGenCom extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'swrpc:gen {-m|--module=}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'swrpc客户端代码生成器';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return int
* @throws \ReflectionException
*/
public function handle()
{
// $serviceKey = env('SWRPC_SERVER_NAME');
// if (!$serviceKey) {
// $this->error('SWRPC_SERVER_NAME未设置');
// return -1;
// }
$filters = [];
$specifyModules = $this->option('module') ? explode(',', $this->option('module')) : [];
$modules = Module::all();
print_r($modules);exit();
/** @var \Nwidart\Modules\Laravel\Module $module */
foreach ($modules as $module) {
if ($module->getName() == 'Common') {
continue;
}
//如果有指定moudle,则只有指定的moudle会生成,否则会生成全部
if (count($specifyModules) > 0 && !in_array($module->getName(), $specifyModules)) {
continue;
}
$moduleName = $module->getName();
$modulePath = str_replace('_', '/', $moduleName);
//create module dir
$basePath = base_path('app/Clients/' . $modulePath);
if (!is_dir($basePath)) {
$this->mkdir($basePath);
}
//target service namespace
$namespace = ltrim(str_replace([base_path(), '/'], ['', '\\'], $module->getPath()), '\\') . '\\Services';
//fetch file
$servicePath = $module->getPath() . '/Services';
if (!is_dir($servicePath)) {
continue;
}
$queues = scandir($servicePath);
while (count($queues) > 0) {
$file = array_shift($queues);
if ($file == '.' || $file == '..' || in_array($file, $filters)) {
continue;
}
//支持嵌套多层service目录
if (is_dir($servicePath . '/' . $file)) {
$childrenFiles = scandir($servicePath . '/' . $file);
foreach ($childrenFiles as $f) {
if ($f == '.' || $f == '..' || in_array($f, $filters)) {
continue;
}
array_push($queues, $file . '/' . $f);
}
continue;
}
$class = substr($file, 0, strrpos($file, '.'));
$funcs = $this->parseFunc($namespace, $class);
if (empty($funcs)) {
continue;
}
//generate proxy file
$proxyPath = $basePath . '/' . $file;
ob_start();
ob_implicit_flush(false);
include(base_path('app/Clients/client.template.php'));
$content = ob_get_clean();
if (!file_put_contents($proxyPath, $content)) {
$this->error($proxyPath . ' import failure.');
} else {
$this->info($proxyPath . ' import success.');
}
}
}
$this->info('完成.');
return 0;
}
/**
* @param $path
* @return bool
* @author pengjch 2024314 12:54:6
*/
protected function mkdir($path)
{
if (!is_dir($path)) {
$this->mkdir(dirname($path));
if (!mkdir($path, 0777)) {
return false;
}
}
return true;
}
/**
* parseFunc
*
* @param $namespace
* @param $class
* @return array
* @throws \ReflectionException
* @author pengjch 202435 16:3:16
*/
protected function parseFunc($namespace, $class): array
{
$serviceClass = str_replace('/', '\\', $namespace . '\\' . $class);
if (!class_exists($serviceClass)) {
$this->error($serviceClass . '类不存在');
return [];
}
try {
$rc = new \ReflectionClass($serviceClass);
} catch (\ReflectionException $e) {
$this->error($serviceClass . $e->getMessage());
return [];
}
$serviceObj = $rc->newInstance();
if (!($serviceObj instanceof LogicService)) {
$this->warn('没有继承\Swrpc\LogicService类,跳过'.get_class($serviceObj));
return [];
}
$funcs = [];
foreach ($rc->getMethods() as $method) {
if (in_array($method->getName(), [
'factory',
'initTracer',
'setModule',
'setTracerUrl',
'setParams',
'setTracerContext',
'getTracerContext'
])
) {
continue;
}
if (false === $method->isPublic()) {
continue;
}
$line = '';
if ($returnType = $method->getReturnType()) {
$line .= $returnType->getName();
}
$line .= ' ' . $method->getName() . '(';
$params = '';
foreach ($method->getParameters() as $parameter) {
$params .= '$' . $parameter->getName();
if ($parameter->isOptional() && $value = $parameter->getDefaultValue()) {
if (is_string($value)) {
$params .= '="' . $value . '"';
} else {
$params .= '=' . $value;
}
}
$params .= ',';
}
$line .= rtrim($params, ',');
$line .= ")\r\n";
$funcs[] = $line;
}
if (count($funcs) == 0) {
$this->warn('没有可用的方法', ['service' => $serviceObj->getName()]);
}
return $funcs;
}
}
... ...
<?php
namespace AukeySwrpc\Commands;
use AukeySwrpc\ClientManger;
use Illuminate\Console\Command;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Nwidart\Modules\Facades\Module;
use AukeySwrpc\Client;
use AukeySwrpc\LogicService;
use AukeySwrpc\Register\Consul;
use AukeySwrpc\Request\SystemRequest;
use AukeySwrpc\Server;
/**
* Class SwrpcServerCom
*
* @package App\Console\Commands
* @author pengjch 2024314 12:13:15
*/
class SwrpcServerCom extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'swrpc:server {action=start} {--module=}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'swrpc服务端';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
protected $pidFile = __DIR__ . '/swrpc.pid';
/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
$action = $this->argument('action');
if ($action == 'start') {
$this->start();
} elseif ($action == 'stop') {
$this->stop();
} elseif ($action == 'status') {
$this->status();
} elseif ($action == 'restart') {
$this->stop();
sleep(2);
$this->start();
}
return 0;
}
protected function status()
{
$pid = @file_get_contents($this->pidFile);
if (!$pid) {
$this->error('swrpc未启动');
return;
}
$res = posix_kill($pid, 0);
if (!$res) {
$this->error('swrpc未启动或已退出');
return;
}
$client = ClientManger::getInstance(env('SWRPC_SERVER_NAME'));
$records = $client->send(SystemRequest::create('stats', []));
$this->info('----------swrpc服务端状态---------');
foreach ((array)$records as $key => $value) {
$this->info($key . ': ' . $value);
}
}
protected function stop()
{
$pid = @file_get_contents($this->pidFile);
if (!$pid) {
$this->error('swrpc未启动');
return;
}
$res = posix_kill($pid, SIGTERM);
$res ? $this->info('swrpc停止成功') : $this->info('swrpc停止失败');
}
protected function start()
{
$pid = @file_get_contents($this->pidFile);
if ($pid && posix_kill($pid, 0)) {
$this->error('swrpc已启动,请不要重复启动');
return -1;
}
$name = env('SWRPC_SERVER_NAME');
if (!$name) {
$this->error('SWRPC_SERVER_NAME未设置');
return -1;
}
$host = env('SWRPC_SERVER_HOST');
if (!$host) {
$this->error('SWRPC_SERVER_HOST未设置');
return -1;
}
$port = env('SWRPC_SERVER_PORT');
if (!$port) {
$this->error('SWRPC_SERVER_PORT未设置');
return -1;
}
$logger = new Logger('swrpc');
$logger->pushHandler(new StreamHandler(STDOUT, Logger::INFO));
$server = new Server($name, $host, $port, [
'worker_num' => swoole_cpu_num(),
'pid_file' => $this->pidFile,
], SWOOLE_PROCESS, SWOOLE_SOCK_TCP, $logger);
if ($regiserUri = env('SWRPC_REGISTER_URI')) {
$server->addRegister(new Consul($regiserUri));
}
$filters = [];
$specifyModules = $this->option('module') ? explode(',', $this->option('module')) : [];
$modules = Module::all();
/** @var \Nwidart\Modules\Laravel\Module $module */
foreach ($modules as $module) {
if ($module->getName() == 'Common') {
continue;
}
//如果有指定moudle,则只有指定的moudle会生成,否则会生成全部
if (count($specifyModules) > 0 && !in_array($module->getName(), $specifyModules)) {
continue;
}
$namespace = ltrim(str_replace([base_path(), '/'], ['', '\\'], $module->getPath()), '\\') . '\\Services\\';
$servicePath = $module->getPath() . '/Services';
if (!is_dir($servicePath)) {
continue;
}
$queues = scandir($servicePath);
while (count($queues) > 0) {
$file = array_shift($queues);
if ($file == '.' || $file == '..' || in_array($file, $filters)) {
continue;
}
//支持嵌套多层service目录
if (is_dir($servicePath . '/' . $file)) {
$childrenFiles = scandir($servicePath . '/' . $file);
foreach ($childrenFiles as $f) {
if ($f == '.' || $f == '..' || in_array($f, $filters)) {
continue;
}
array_push($queues, $file . '/' . $f);
}
continue;
}
$class = substr($file, 0, strrpos($file, '.'));
$serviceObj = $this->build($namespace, $class);
if (!$serviceObj) {
continue;
}
if (!($serviceObj instanceof LogicService)) {
$this->warn('没有继承\Swrpc\LogicService类,跳过' . get_class($serviceObj));
continue;
}
$server->addService($serviceObj);
}
}
$server->start();
}
/**
* build
*
* @param $namespace
* @param $class
* @return object|null
* @author pengjch 202435 16:51:18
*/
protected function build($namespace, $class): ?object
{
$serviceClass = str_replace('/', '\\', $namespace . $class);
if (!class_exists($serviceClass)) {
$this->error($serviceClass . '类不存在');
return null;
}
try {
$rf = new \ReflectionClass($serviceClass);
} catch (\ReflectionException $e) {
$this->error($serviceClass . $e->getMessage());
return null;
}
try {
return $rf->newInstance();
} catch (\ReflectionException $e) {
$this->error($serviceClass . $e->getMessage());
return null;
}
}
}
... ...
<?php
namespace AukeySwrpc;
trait Event
{
public function OnWorkerStart(\Swoole\Server $server, int $workerId)
{
}
public function onStart(\Swoole\Server $server)
{
}
public function onShutdown(\Swoole\Server $server)
{
}
}
... ...
<?php
namespace AukeySwrpc\Exceptions;
use Exception;
/**
* Class RequestException
*
* @package AukeySwrpc\Exceptions
* @author pengjch 202439 11:38:5
*/
class RequestException extends Exception
{
}
... ...
<?php
namespace AukeySwrpc\Exceptions;
use Exception;
/**
* Class RpcException
*
* @package AukeySwrpc\Exceptions
* @author pengjch 202439 11:38:5
*/
class RpcException extends Exception
{
}
... ...
<?php
namespace AukeySwrpc\Exceptions;
use Exception;
/**
* Class TypeException
*
* @package AukeySwrpc\Exceptions
* @author pengjch 202439 11:38:5
*/
class TypeException extends Exception
{
}
... ...
... ... @@ -19,18 +19,18 @@ class RouteServiceProvider extends ServiceProvider{
* web路由设置
*/
protected function mapWebRoutes(){
Route::middleware('web')
->namespace('Swrpc\Http\Controllers')
->group(__DIR__ . '/Controllers/routes.php');
// Route::middleware('web')
// ->namespace('AukeySwrpc\Http\Controllers')
// ->group(__DIR__ . '/Controllers/routes.php');
}
/**
* api路由设置
*/
protected function mapApiRoutes(){
Route::prefix('api')
->middleware('api')
->namespace('Swrpc\Http\Controllers')
->group(__DIR__ . '/Controllers/routes.php');
// Route::prefix('api')
// ->middleware('api')
// ->namespace('AukeySwrpc\Http\Controllers')
// ->group(__DIR__ . '/Controllers/routes.php');
}
}
... ...
<?php
namespace AukeySwrpc;
use AukeySwrpc\Tracer\TracerContext;
use Zipkin\Endpoint;
use Zipkin\Reporters\Http;
use Zipkin\Samplers\BinarySampler;
use Zipkin\TracingBuilder;
/**
* Class LogicService
*
* @package Swrpc
* @author pengjch 2024311 11:10:18
*/
class LogicService
{
protected $params;
protected $module;
protected $tracerUrl;
protected $tracerContext;
protected $clients = [];
/**
* @return static
* @author pengjch 2024311 11:4:5
*/
public static function factory()
{
return new static();
}
/**
* 初始化链路追踪器
*
* @param $func
* @author pengjch 2024311 11:3:36
*/
public function initTracer($func)
{
$reporterUrl = $this->tracerUrl ?: 'http://127.0.0.1:9411/api/v2/spans';
$endpoint = Endpoint::create($this->module);
$reporter = new Http(['endpoint_url' => $reporterUrl]);
$sampler = BinarySampler::createAsAlwaysSample();
$tracing = TracingBuilder::create()
->havingLocalEndpoint($endpoint)
->havingSampler($sampler)
->havingReporter($reporter)
->build();
$tracer = $tracing->getTracer();
$span = $tracer->newTrace();
$span->setName($func);
$span->start();
$span->finish();
$tracer->flush();
$ctx = $span->getContext();
if ($this->tracerContext) {
$this->tracerContext->setTraceID($ctx->getTraceId());
$this->tracerContext->setParentID($ctx->getSpanId());
$this->tracerContext->setReporterUrl($reporterUrl);
} else {
$this->tracerContext = TracerContext::create($ctx->getTraceId(), $ctx->getSpanId(), $reporterUrl);
}
}
/**
* @param $context
* @return $this
* @author pengjch 2024311 11:18:43
*/
public function setTracerContext($context)
{
$this->tracerContext = $context;
return $this;
}
/**
* @param $func
* @return null
* @author pengjch 2024311 11:4:1
*/
public function getTracerContext($func)
{
if (empty($this->tracerUrl)) {
return null;
}
if (empty($this->tracerContext)) {
$this->initTracer($func);
}
return $this->tracerContext;
}
/**
* @param array $params
* @return $this
* @author pengjch 2024311 11:15:47
*/
public function setParams(array $params)
{
$this->params = $params;
return $this;
}
/**
* @param string $url
* @return static $this
* @author pengjch 2024311 11:15:35
*/
public function setTracerUrl(string $url)
{
$this->tracerUrl = $url;
return $this;
}
/**
* @param string $name
* @return $this
* @author pengjch 2024311 11:59:4
*/
public function setModule(string $name)
{
$this->module = $name;
return $this;
}
}
... ...
<?php
namespace AukeySwrpc\Middlewares;
use Closure;
use AukeySwrpc\Request\Request;
use AukeySwrpc\Response;
/**
* Interface MiddlewareInterface
*
* @package AukeySwrpc\Middlewares
* @author pengjch 202439 11:37:39
*/
interface MiddlewareInterface
{
function handle(Request $request, Closure $next): Response;
}
... ...
<?php
namespace AukeySwrpc\Middlewares;
use Closure;
use AukeySwrpc\Request\Request;
use AukeySwrpc\Response;
use Zipkin\Endpoint;
use Zipkin\Propagation\TraceContext;
use Zipkin\Reporters\Http;
use Zipkin\Samplers\BinarySampler;
use Zipkin\TracingBuilder;
/**
* 链路追踪中间件
* Class TraceMiddleware
*
* @package AukeySwrpc\Middlewares
* @author pengjch 2024310 16:41:6
*/
class TraceMiddleware implements MiddlewareInterface
{
function handle(Request $request, Closure $next): Response
{
$context = $request->getTraceContext();
if (!$context) {
return $next($request);
}
$traceContext = TraceContext::create($context->getTraceID(), $context->getParentID(), null, true);
$endpoint = Endpoint::create($request->getModule());
$reporter = new Http(['endpoint_url' => $context->getReporterUrl()]);
$sampler = BinarySampler::createAsAlwaysSample();
$tracing = TracingBuilder::create()
->havingLocalEndpoint($endpoint)
->havingSampler($sampler)
->havingReporter($reporter)
->build();
$tracer = $tracing->getTracer();
$span = $tracer->newChild($traceContext);
$span->setName($request->getMethod());
$span->start();
$span->tag('请求参数', serialize($request->getParams()));
$request->setTraceContext($span->getContext()->getTraceId(), $span->getContext()
->getSpanId(), $context->getReporterUrl());
$start = microtime(true);
$result = $next($request);
$end = microtime(true);
$span->tag('响应状态码code', $result->code);
$span->tag('响应提示语msg', $result->msg);
$span->tag('响应耗时', $end - $start);
$span->finish();
$tracer->flush();
return $result;
}
}
... ...
<?php
namespace AukeySwrpc\Packer;
use AukeySwrpc\Request\Request;
/**
* Interface PackerInterface
*
* @package AukeySwrpc\Packer
* @author pengjch 202439 11:37:10
*/
interface PackerInterface
{
function pack(Request $data):string;
function unpack(string $data);
}
... ...
<?php
namespace AukeySwrpc\Packer;
use AukeySwrpc\Request\Request;
/**
* Class SerializeEofPacker
*
* @package AukeySwrpc\Packer
* @author pengjch 202439 11:37:17
*/
class SerializeEofPacker implements PackerInterface
{
/**
* @var string
*/
protected $eof;
public function __construct(array $options = [])
{
$this->eof = $options['settings']['package_eof'] ?? "\r\n";
}
public function pack(Request $data): string
{
return serialize($data);
}
public function unpack(string $data)
{
return unserialize($data);
}
}
... ...
<?php
namespace AukeySwrpc\Packer;
use AukeySwrpc\Request\Request;
/**
* Class SerializeLengthPacker
*
* @package AukeySwrpc\Packer
* @author pengjch 202439 11:37:27
*/
class SerializeLengthPacker implements PackerInterface
{
/**
* @var string
*/
protected $type;
/**
* @var int
*/
protected $length;
protected $defaultOptions
= [
'package_length_type' => 'N',
'package_body_offset' => 4,
];
public function __construct(array $options = [])
{
$options = array_merge($this->defaultOptions, $options['settings'] ?? []);
$this->type = $options['package_length_type'];
$this->length = $options['package_body_offset'];
}
public function pack(Request $data): string
{
$data = serialize($data);
return pack($this->type, strlen($data)) . $data;
}
public function unpack(string $data)
{
$package = unpack('N', $data);
$len = $package[1];
//合并unserialize和substr,以减少内存拷贝 https://wenda.swoole.com/detail/107587
if (function_exists('swoole_substr_unserialize')) {
return swoole_substr_unserialize($data, $this->length, $len);
}
$data = substr($data, $this->length, $len);
return unserialize($data);
}
}
... ...
<?php
namespace AukeySwrpc\Register;
use SensioLabs\Consul\ServiceFactory;
use SensioLabs\Consul\Services\Agent;
use SensioLabs\Consul\Services\AgentInterface as AgentInterfaceAlias;
use SensioLabs\Consul\Services\Catalog;
use SensioLabs\Consul\Services\CatalogInterface;
use SensioLabs\Consul\Services\Health;
use AukeySwrpc\Exceptions\RpcException;
class Consul implements RegisterInterface
{
protected $sf;
protected array $options;
protected array $serviceCache
= [
'ttl' => 10,
'services' => [],
'lastUpdateTime' => 0,
];
public function __construct($uri = 'http://127.0.0.1:8500', $options = [])
{
$this->options = $options;
$this->sf = new ServiceFactory([
'base_uri' => $uri
]);
}
public function getName(): string
{
return 'Consul';
}
/**
* 注册节点
*
* @param string $module
* @param string $host
* @param $port
* @param int $weight
* @author pengjch 202439 23:17:5
*/
public function register($module, $host, $port, $weight = 1)
{
$id = $host . '_' . $port;
/** @var Agent $agent */
$agent = $this->sf->get(AgentInterfaceAlias::class);
$agent->registerService([
'ID' => $id,
'Name' => $module,
'Port' => $port,
'Address' => $host,
'Tags' => [
'port_' . $port,
],
'Weights' => [
'Passing' => $weight,
'Warning' => 1,
],
'Check' => [
'TCP' => $host . ':' . $port,
'Interval' => $this->options['interval'] ?? '10s',
'Timeout' => $this->options['timeout'] ?? '5s',
'DeregisterCriticalServiceAfter' => $this->options['deregisterCriticalServiceAfter'] ?? '30s',
],
]);
}
/**
* 注销节点
* http://127.0.0.1:8500/v1/agent/service/deregister/service_id
*
* @param $host
* @param $port
* @author pengjch 202439 23:16:51
*/
public function unRegister($host, $port)
{
$id = $host . '_' . $port;
/** @var Agent $agent */
$agent = $this->sf->get(AgentInterfaceAlias::class);
$agent->deregisterService($id);
}
/**
* 获取模块下所有的服务
*
* @param string $module
* @return array
* @author pengjch 2024310 9:44:16
*/
public function getServices(string $module): array
{
$cache = $this->serviceCache;
$ttl = $this->options['ttl'] ?? $cache['ttl'];
//本地缓存所有节点信息,避免每次请求都要从consul拉一遍数据
if ($cache['lastUpdateTime'] + $ttl < time()) {
$health = new Health();
$servers = $health->service($module)->json();
if (empty($servers)) {
return [];
}
$result = [];
foreach ($servers as $server) {
$result[] = Service::build($server['Service']['Address'], $server['Service']['Port'], $server['Service']['Weights']['Passing']);
}
$cache['service'] = $result;
$cache['lastUpdateTime'] = time();
}
return $cache['service'];
}
/**
* 随机获取一个服务
*
* @param string $module
* @return Service
* @author pengjch 2024310 9:44:27
*/
public function getRandomService(string $module): Service
{
$services = $this->getServices($module);
if (!$services) {
throw new RpcException('It has not register module');
}
return $services[rand(0, count($services) - 1)];
}
/**
* 获取权重服务
*
* @param string $module
* @return Service
* @author pengjch 2024310 9:44:38
*/
public function getWeightService(string $module): Service
{
$serviceArr = [];
$totalWeight = 0;
$services = $this->getServices($module);
if (!$services) {
throw new RpcException('It has not register module');
}
/** @var Service $service */
foreach ($services as $service) {
$totalWeight += $service->getWeight();
$sort[] = $service->getWeight();
$serviceArr[] = $service->toArray();
}
array_multisort($serviceArr, SORT_DESC, $sort);
$start = 0;
$rand = rand(1, $totalWeight);
foreach ($serviceArr as $service) {
if ($start + $service['weight'] >= $rand) {
return Service::build($service['host'], $service['port'], $service['weight']);
}
$start = $start + $service['weight'];
}
}
}
... ...
<?php
namespace AukeySwrpc\Register;
/**
* Interface RegisterInterface
*
* @package AukeySwrpc\Register
* @author pengjch 202439 16:23:35
*/
interface RegisterInterface
{
function getName(): string;
function register($module, $host, $port, $weight = 1);
function unRegister($host, $port);
function getServices(string $module): array;
function getRandomService(string $module): Service;
function getWeightService(string $module): Service;
}
... ...
<?php
namespace AukeySwrpc\Register;
/**
* 注册中心服务
* Class Service
*
* @package AukeySwrpc\Register
* @author pengjch 2024311 10:25:46
*/
class Service
{
protected $host;
protected $port;
protected $weight;
public function __construct($host, $port, $weight)
{
$this->host = $host;
$this->port = $port;
$this->weight = $weight;
}
public static function build($host, $port, $weight)
{
return new static($host, $port, $weight);
}
public function getHost()
{
return $this->host;
}
public function getPort()
{
return $this->port;
}
public function getWeight()
{
return $this->weight;
}
public function toArray(): array
{
return [
'host' => $this->host,
'port' => $this->port,
'weight' => $this->weight
];
}
}
... ...
<?php
namespace AukeySwrpc\Request;
/**
* Class AsyncRequest
*
* @package AukeySwrpc\Request
* @author pengjch 2024313 9:10:2
*/
class AsyncRequest extends Request
{
public function init()
{
$this->setSync(false);
$this->setSystem(false);
}
}
... ...
<?php
namespace AukeySwrpc\Request;
use AukeySwrpc\Tracer\TracerContext;
abstract class Request
{
protected string $module;
protected string $method;
protected array $params;
protected bool $isSync = true; //是否同步请求,默认是
protected bool $isSystem = false; //是否系统请求,默认否
protected $error;
protected ?TracerContext $traceContext;
public static function create($method, $params, ?TracerContext $traceContext = null)
{
return new static ($method, $params, $traceContext);
}
public function __construct($method, $params, ?TracerContext $traceContext = null)
{
$this->method = $method;
$this->params = $params;
$this->traceContext = $traceContext;
$this->init();
}
abstract public function init();
public function getModule(): string
{
return $this->module;
}
public function getMethod(): string
{
return $this->method;
}
public function getParams(): array
{
return $this->params;
}
public function setParams(array $params)
{
$this->params = $params;
}
public function setModule(string $name)
{
$this->module = $name;
}
public function mergeParams(array $params)
{
$this->params = array_merge($this->params, $params);
}
public function getTraceContext(): ?TracerContext
{
return $this->traceContext;
}
public function setTraceContext($traceID, $parentID, $url)
{
$this->traceContext = TracerContext::create($traceID, $parentID, $url);
}
public function setSync(bool $value)
{
$this->isSync = $value;
}
public function isSync(): bool
{
return $this->isSync;
}
public function setSystem(bool $value)
{
$this->isSystem = $value;
}
public function isSystem(): bool
{
return $this->isSystem;
}
public function getError()
{
return $this->error;
}
public function setError($err)
{
$this->error = $err;
}
}
... ...
<?php
namespace AukeySwrpc\Request;
/**
* Class SyncRequest
*
* @package AukeySwrpc\Request
* @author pengjch 2024313 9:9:54
*/
class SyncRequest extends Request
{
public function init()
{
$this->setSync(true);
$this->setSystem(false);
}
}
... ...
<?php
namespace AukeySwrpc\Request;
/**
* Class AsyncRequest
*
* @package AukeySwrpc\Request
* @author pengjch 2024313 9:10:2
*/
class SystemRequest extends Request
{
public function init()
{
$this->setSync(true);
$this->setSystem(true);
}
}
... ...
<?php
namespace AukeySwrpc;
/**
* Class Response
*
* @package Swrpc
* @author pengjch 202439 11:36:9
*/
class Response
{
const RES_ERROR = 0;
const RES_SUCCESS = 1;
public string $msg;
public int $code;
public array $data;
public function __construct($code, $msg, $data)
{
$this->data = $data;
$this->code = $code;
$this->msg = $msg;
}
public static function error($msg, $code = self::RES_ERROR, $data = []): Response
{
return new static($code, $msg, $data);
}
public static function success($data = [], $msg = 'success', $code = self::RES_SUCCESS): Response
{
return new static($code, $msg, $data);
}
}
... ...
<?php
namespace AukeySwrpc;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LoggerInterface;
use AukeySwrpc\Middlewares\TraceMiddleware;
use AukeySwrpc\Packer\SerializeLengthPacker;
use AukeySwrpc\Register\RegisterInterface;
use AukeySwrpc\Middlewares\MiddlewareInterface;
use AukeySwrpc\Packer\PackerInterface;
use AukeySwrpc\Request\Request;
/**
* Class Server
*
* @package Swrpc
* @author pengjch 202439 11:35:52
*/
class Server
{
use Event;
protected string $module;
protected string $host;
protected int $port;
protected int $weight = 1;
protected array $options;
protected array $defaultOptions
= [
'open_length_check' => true,
'package_length_type' => 'N',
'package_length_offset' => 0, //第N个字节是包长度的值
'package_body_offset' => 4, //第几个字节开始计算长度
'package_max_length' => 81920, //协议最大长度
];
/** @var PackerInterface $packer */
protected $packer;
/** @var Service $service */
protected $service;
/** @var LoggerInterface $logger */
protected $logger;
/** @var RegisterInterface $register */
protected $register;
/** @var \Swoole\Server $server */
protected \Swoole\Server $server;
private array $middlewares;
public function __construct(
string $module,
string $host,
int $port,
array $options = [],
$mode = SWOOLE_PROCESS,
$socketType = SWOOLE_SOCK_TCP,
LoggerInterface $logger = null
) {
$this->module = $module;
$this->host = $host;
$this->port = $port;
$this->setDefaultOptions($options);
$this->setDefaultLogger($logger);
$this->setCoreMiddleware();
$this->service = new Service($this->logger);
$server = new \Swoole\Server($host, $port, $mode ?: SWOOLE_PROCESS, $socketType ?: SWOOLE_SOCK_TCP);
$server->set($this->options);
$server->on('Start', [$this, 'onStart']);
$server->on('Shutdown', [$this, 'onShutdown']);
$server->on('WorkerStart', [$this, 'onWorkerStart']);
$server->on('Connect', [$this, 'OnConnect']);
$server->on('Receive', [$this, 'OnReceive']);
$server->on('Close', [$this, 'OnClose']);
$server->on('Task', [$this, 'OnTask']);
$server->on('Finish', [$this, 'OnFinish']);
$this->server = $server;
}
/**
* 设置节点权重
*
* @param int $weight
* @return Server
* @author pengjch 2024313 10:55:39
*/
public function weight(int $weight): Server
{
$this->weight = $weight;
return $this;
}
/**
* 设置默认选项
*
* @param $options
* @author pengjch 2024311 10:35:3
*/
protected function setDefaultOptions($options)
{
if (empty($options)) {
$options = $this->defaultOptions;
}
$this->options = $options;
//请求数量超过10000重启
if (empty($this->options['max_request'])) {
$this->options['max_request'] = 10000;
}
//默认task数量
if (empty($this->options['task_worker_num'])) {
$this->options['task_worker_num'] = swoole_cpu_num() * 2;
}
//task请求数超过10000则重启
if (empty($this->options['task_max_request'])) {
$this->options['task_max_request'] = 10000;
}
//10s没有数据传输就进行检测
if (empty($this->options['tcp_keepidle'])) {
$this->options['tcp_keepidle'] = 10;
}
//3s探测一次
if (empty($this->options['tcp_keepinterval'])) {
$this->options['tcp_keepinterval'] = 3;
}
//探测的次数,超过5次后还没回包close此连接
if (empty($this->options['tcp_keepcount'])) {
$this->options['tcp_keepcount'] = 5;
}
}
/**
* 设置默认日志处理器
*
* @param LoggerInterface|null $logger
* @author pengjch 2024311 10:34:19
*/
protected function setDefaultLogger(LoggerInterface $logger = null)
{
if (empty($logger)) {
$logger = new Logger('swrpc');
$logger->pushHandler(new StreamHandler(STDOUT, Logger::DEBUG));
}
$this->logger = $logger;
}
/**
* 设置核心中间件
*
* @author pengjch 2024311 10:34:5
*/
protected function setCoreMiddleware()
{
$this->middlewares[] = TraceMiddleware::class;
}
/**
* 添加中间件,支持匿名函数和实现类
* addMiddleware
*
* @param mixed ...$middlewares
* @author pengjch 202439 11:35:11
*/
public function addMiddleware(...$middlewares)
{
foreach ($middlewares as $middleware) {
if (is_string($middleware) && class_exists($middleware)) {
$middleware = new $middleware();
}
if (!($middleware instanceof \Closure) && !($middleware instanceof MiddlewareInterface)) {
$this->logger->warning('Skip illegal Middleware.');
continue;
}
$this->middlewares[] = $middleware;
}
}
/**
* 添加服务
* addService
*
* @param $service
* @param string $prefix
* @return Server
* @author pengjch 202439 11:35:2
*/
public function addService($service, $prefix = ''): Server
{
$this->service->addInstance($service, $prefix);
return $this;
}
/**
* @param $key
* @return mixed|null
* @author pengjch 2024312 16:11:12
*/
public function getService($key)
{
return $this->service->getService($key);
}
/**
* 注册发现中心
*
* @param $register
* @return Server
* @author pengjch 202439 16:38:51
*/
public function addRegister($register): Server
{
$this->register = $register;
return $this;
}
/**
* 添加日志处理器
*
* @param $logger
* @author pengjch 202439 12:20:57
*/
public function addLogger($logger)
{
$this->logger = $logger;
}
/**
* 添加包解析器
*
* @param $packer
* @author pengjch 202439 12:45:53
*/
public function addPacker($packer)
{
$this->packer = $packer;
}
/**
* 注册服务到consul
* onWorkerStart 和 onStart 回调是在不同进程中并行执行的,不存在先后顺序
*
* @param \Swoole\Server $server
* @author pengjch 202439 23:11:10
*/
public function onStart(\Swoole\Server $server)
{
if ($this->register) {
$this->logger->info(sprintf('Register server[%s:%d] to %s.', $this->host, $this->port, $this->register->getName()));
$this->register->register($this->module, $this->host, $this->port, $this->weight);
}
}
/**
* 注销服务
* 强制 kill 进程不会回调 onShutdown
* 需要使用 kill -15 来发送 SIGTERM 信号到主进程才能按照正常的流程终止
*
* @param \Swoole\Server $server
* @author pengjch 202439 23:14:40
*/
public function onShutdown(\Swoole\Server $server)
{
if ($this->register) {
$this->logger->info(sprintf('UnRegister server[%s:%d] from register.', $this->host, $this->port));
$this->register->unRegister($this->host, $this->port);
}
}
/**
* server接收请求
*
* @param \Swoole\Server $server
* @param $fd
* @param $reactor_id
* @param $data
* @return mixed
* @author pengjch 202439 11:34:0
*/
public function onReceive(\Swoole\Server $server, $fd, $reactor_id, $data)
{
/** @var Request $request */
$request = $this->packer->unpack($data);
//系统请求
if ($request->isSystem()) {
return $server->send($fd, serialize($this->doSystemRequest($request)));
}
//同步请求
if ($request->isSync()) {
return $server->send($fd, serialize($this->doRequest($request)));
}
//异步请求
$server->task($request);
return $server->send($fd, serialize(Response::success(['result' => 'success'])));
}
/**
* 执行请求
*
* @param Request $request
* @return Response
* @author pengjch 2024313 9:37:20
*/
public function doRequest(Request $request): Response
{
try {
$handler = $this->getRequestHandler();
} catch (\ReflectionException $e) {
return Response::error($e->getMessage());
}
$response = $handler($request);
if (!($response instanceof Response)) {
$msg = 'The middleware must return the response type';
$this->logger->error($msg);
$response = Response::error($msg);
}
return $response;
}
/**
* 系统请求
*
* @param Request $request
* @return Response
* @author pengjch 2024323 10:46:55
*/
public function doSystemRequest(Request $request): Response
{
if ($request->getMethod() == 'stats') {
return Response::success(['result' => $this->server->stats()]);
} else {
return Response::error($request->getMethod() . ' is not supported');
}
}
/**
* @return mixed
* @throws \ReflectionException
* @author pengjch 2024312 16:36:52
*/
public function getRequestHandler()
{
return array_reduce(array_reverse($this->middlewares), function ($stack, $next) {
return function ($request) use ($stack, $next) {
if ($next instanceof \Closure) {
return $next($request, $stack);
} elseif (is_string($next) && class_exists($next)) {
return (new $next())->handle($request, $stack);
} else {
return $next->handle($request, $stack);
}
};
}, function ($request) {
return $this->service->call($request);
});
}
/**
* 异步处理请求
*
* @param $server
* @param $taskID
* @param $reactorID
* @param $data
* @return Response
* @author pengjch 2024313 9:40:37
*/
public function OnTask($server, $taskID, $reactorID, $data): Response
{
$this->logger->debug('AsyncTask: Start', ['taskID' => $taskID]);
return $this->doRequest($data);
}
/**
* 完成异步任务回调
*
* @param $server
* @param $taskID
* @param $data
* @author pengjch 2024313 9:49:44
*/
public function OnFinish($server, $taskID, $data)
{
$this->logger->debug('AsyncTask: Finish', ['taskID' => $taskID, 'data' => $data]);
}
/**
* OnClose
*
* @param $server
* @param $fd
* @author pengjch 202439 11:34:48
*/
public function OnClose($server, $fd)
{
$this->logger->debug('Client: Close');
}
/**
* OnConnect
*
* @param $server
* @param $fd
* @author pengjch 202439 11:34:52
*/
public function OnConnect($server, $fd)
{
$this->logger->debug('Client: Connect.');
}
/**
* start
*
* @author pengjch 202439 11:34:56
*/
public function start(): bool
{
//可用服务数量
if ($this->service->count() == 0) {
$this->logger->error('There is no service available.');
return false;
}
//默认使用固定包头+包体方式解决粘包问题
if (empty($this->packer)) {
$this->packer = new SerializeLengthPacker([
'package_length_type' => $this->options['package_length_type'] ?? 'N',
'package_body_offset' => $this->options['package_body_offset'] ?? 4,
]);
}
$this->logger->info(sprintf('Rpc server[%s:%s] start.', $this->host, $this->port));
$this->server->start();
return true;
}
}
... ...
<?php
namespace AukeySwrpc;
use Psr\Log\LoggerInterface;
use ReflectionClass;
use ReflectionMethod;
use AukeySwrpc\Request\Request;
/**
* Class Service
*
* @package Swrpc
* @author pengjch 202439 11:39:41
*/
class Service
{
private array $services = [];
protected array $filers
= [
'factory',
'initTracer',
'setModule',
'setTracerUrl',
'setParams',
'setTracerContext',
'getTracerContext'
];
/** @var LoggerInterface $logger */
private $logger;
public function __construct($logger)
{
$this->logger = $logger;
}
/**
* 注册服务实例
*
* @param $obj
* @param $prefix
* @return bool
* @author pengjch 202438 13:43:21
*/
public function addInstance($obj, $prefix = ''): bool
{
if (is_string($obj)) {
$obj = new $obj();
}
if (!is_object($obj)) {
$this->logger->error('Service is not an object.', ['service' => $obj]);
return false;
}
if (!($obj instanceof LogicService)) {
$this->logger->error('The Service does not inherit LogicService', ['service' => get_class($obj)]);
return false;
}
$className = get_class($obj);
$methods = get_class_methods($obj);
foreach ($methods as $method) {
if (in_array($method, $this->filers)) {
continue;
}
if (strlen($prefix) > 0) {
$key = $prefix . '_' . $className . '_' . $method;
} else {
$key = $className . '_' . $method;
}
$this->services[$key] = $className;
$this->logger->info(sprintf('import %s => %s.', $key, $className));
}
return true;
}
/**
* 获取服务
*
* @param $key
* @return mixed|null
* @author pengjch 202438 13:43:17
*/
public function getService($key)
{
return $this->services[$key] ?? null;
}
/**
* 获取所有服务
* getServices
*
* @return array
* @author pengjch 202438 15:23:58
*/
public function getServices(): array
{
return $this->services;
}
/**
* count
*
* @return int
* @author pengjch 202439 12:56:46
*/
public function count(): int
{
return count($this->services);
}
/**
* @param $key
* @return bool
* @author pengjch 202438 14:32:50
*/
public function isExist($key): bool
{
return isset($this->services[$key]);
}
/**
* 调用服务
*
* @param Request $request
* @return Response
* @throws \ReflectionException
* @author pengjch 202439 10:17:59
*/
public function call(Request $request): Response
{
if ($err = $request->getError()) {
return Response::error($err);
}
$service = $this->getService($request->getMethod());
if (!$service) {
$this->logger->debug('service is not exist.', ['method' => $request->getMethod()]);
return Response::error('service is not exist.');
}
$methodArr = explode('_', $request->getMethod());
$methodName = array_pop($methodArr);
$reflect = new ReflectionClass($service);
$instance = $reflect->newInstanceArgs();
if (!method_exists($instance, $methodName)) {
$this->logger->debug('method is not exist.', ['method' => $request->getMethod()]);
return Response::error(sprintf('%s method[%s] is not exist.', $service, $methodName));
}
$ctx = $request->getTraceContext();
if ($ctx && method_exists($instance, 'setTracerContext')) {
$instance->setTracerUrl($ctx->getReporterUrl())->setTracerContext($ctx);
}
try {
$methodObj = new ReflectionMethod($reflect->getName(), $methodName);
$result = $methodObj->invokeArgs($instance, $request->getParams());
} catch (\Throwable $e) {
return Response::error($e->getMessage());
}
return Response::success([
'result' => $result
]);
}
}
... ...
<?php
namespace AukeySwrpc\Tracer;
/**
* 链路追踪上下文
* Class TracerContext
*
* @package AukeySwrpc\Tracer
* @author pengjch 2024311 10:21:34
*/
class TracerContext
{
protected $traceID;
protected $parentID;
protected $reporterUrl;
public function __construct($traceID, $parentID, $reporterUrl)
{
$this->traceID = $traceID;
$this->parentID = $parentID;
$this->reporterUrl = $reporterUrl;
}
public static function create($traceID, $parentID, $reporterUrl)
{
return new static($traceID, $parentID, $reporterUrl);
}
public function setTraceID($traceID)
{
$this->traceID = $traceID;
}
public function setParentID($parentID)
{
$this->parentID = $parentID;
}
public function setReporterUrl($url)
{
$this->reporterUrl = $url;
}
public function getTraceID()
{
return $this->traceID;
}
public function getParentID()
{
return $this->parentID;
}
public function getReporterUrl()
{
return $this->reporterUrl;
}
}
... ...