Consul.php
4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
<?php
namespace AukeySwrpc\Register;
use SensioLabs\Consul\ServiceFactory;
use SensioLabs\Consul\Services\Agent;
use SensioLabs\Consul\Services\AgentInterface as AgentInterfaceAlias;
use SensioLabs\Consul\Services\Catalog;
use SensioLabs\Consul\Services\CatalogInterface;
use SensioLabs\Consul\Services\Health;
use AukeySwrpc\Exceptions\RpcException;
/**
 * Service registry backed by a Consul agent.
 *
 * Registers/deregisters RPC nodes through the Consul agent API and resolves
 * the nodes of a module through the Consul health API. Lookups are cached
 * in-process, per module, for a short TTL so that not every RPC call has to
 * query Consul.
 */
class Consul implements RegisterInterface
{
    /** @var ServiceFactory Factory bound to the Consul base URI given to the constructor. */
    protected $sf;

    /**
     * Caller-supplied options. Keys read by this class: `interval`, `timeout`,
     * `deregisterCriticalServiceAfter` (health-check settings) and `ttl`
     * (lookup cache lifetime in seconds).
     */
    protected array $options;

    /**
     * In-process lookup cache.
     *
     * - `ttl`: default cache lifetime in seconds (overridden by options['ttl']).
     * - `services`: map of module name => ['list' => Service[], 'updatedAt' => int].
     * - `lastUpdateTime`: unix timestamp of the most recent refresh of any module.
     */
    protected array $serviceCache
        = [
            'ttl' => 10,
            'services' => [],
            'lastUpdateTime' => 0,
        ];

    /**
     * @param string $uri     Base URI of the Consul HTTP API.
     * @param array  $options See {@see Consul::$options} for the recognised keys.
     */
    public function __construct($uri = 'http://127.0.0.1:8500', $options = [])
    {
        $this->options = $options;
        $this->sf = new ServiceFactory([
            'base_uri' => $uri,
        ]);
    }

    /**
     * Name of this registry implementation.
     */
    public function getName(): string
    {
        return 'Consul';
    }

    /**
     * Register a node with the Consul agent.
     *
     * The service ID is "<host>_<port>"; a TCP health check against
     * host:port is attached, configured from the constructor options.
     *
     * @param string     $module Service name the node belongs to.
     * @param string     $host   Address the node listens on.
     * @param int|string $port   Port the node listens on.
     * @param int        $weight Weight reported while the check is passing.
     * @author pengjch 202439 23:17:5
     */
    public function register($module, $host, $port, $weight = 1)
    {
        $id = $host . '_' . $port;
        /** @var Agent $agent */
        $agent = $this->sf->get(AgentInterfaceAlias::class);
        $agent->registerService([
            'ID' => $id,
            'Name' => $module,
            // Consul expects integers here; callers may pass numeric strings.
            'Port' => (int) $port,
            'Address' => $host,
            'Tags' => [
                'port_' . $port,
            ],
            'Weights' => [
                'Passing' => (int) $weight,
                'Warning' => 1,
            ],
            'Check' => [
                'TCP' => $host . ':' . $port,
                'Interval' => $this->options['interval'] ?? '10s',
                'Timeout' => $this->options['timeout'] ?? '5s',
                'DeregisterCriticalServiceAfter' => $this->options['deregisterCriticalServiceAfter'] ?? '30s',
            ],
        ]);
    }

    /**
     * Deregister a node from the Consul agent.
     * http://127.0.0.1:8500/v1/agent/service/deregister/service_id
     *
     * @param string     $host Address the node was registered with.
     * @param int|string $port Port the node was registered with.
     * @author pengjch 202439 23:16:51
     */
    public function unRegister($host, $port)
    {
        // Must match the ID built in register().
        $id = $host . '_' . $port;
        /** @var Agent $agent */
        $agent = $this->sf->get(AgentInterfaceAlias::class);
        $agent->deregisterService($id);
    }

    /**
     * Get all nodes registered under a module.
     *
     * Results are cached per module for `ttl` seconds so that not every call
     * pulls the node list from Consul. An empty answer from Consul is never
     * cached, so newly registered nodes become visible on the next call.
     *
     * @param string $module Service name to look up.
     * @return Service[] List of nodes; empty when Consul reports none.
     * @author pengjch 2024310 9:44:16
     */
    public function getServices(string $module): array
    {
        $ttl = (int) ($this->options['ttl'] ?? $this->serviceCache['ttl']);
        $now = time();

        $entry = $this->serviceCache['services'][$module] ?? null;
        if (
            is_array($entry)
            && isset($entry['list'], $entry['updatedAt'])
            && $entry['updatedAt'] + $ttl >= $now
        ) {
            return $entry['list'];
        }

        // Resolve the health client through the factory so it talks to the
        // configured Consul URI rather than the SDK's default address.
        /** @var Health $health */
        $health = $this->sf->get(\SensioLabs\Consul\Services\HealthInterface::class);
        $servers = $health->service($module)->json();
        if (empty($servers)) {
            unset($this->serviceCache['services'][$module]);

            return [];
        }

        $result = [];
        foreach ($servers as $server) {
            $node = $server['Service'];
            // Weights may be absent in the response; fall back to a neutral weight.
            $result[] = Service::build($node['Address'], $node['Port'], $node['Weights']['Passing'] ?? 1);
        }

        $this->serviceCache['services'][$module] = [
            'list' => $result,
            'updatedAt' => $now,
        ];
        $this->serviceCache['lastUpdateTime'] = $now;

        return $result;
    }

    /**
     * Pick one node of a module uniformly at random.
     *
     * @param string $module Service name to look up.
     * @return Service
     * @throws RpcException When the module has no registered nodes.
     * @author pengjch 2024310 9:44:27
     */
    public function getRandomService(string $module): Service
    {
        $services = $this->getServices($module);
        if (!$services) {
            throw new RpcException('It has not register module');
        }

        return $services[mt_rand(0, count($services) - 1)];
    }

    /**
     * Pick one node of a module at random, proportionally to its weight.
     *
     * When no node carries a positive weight the choice degrades to a
     * uniform random pick instead of failing.
     *
     * @param string $module Service name to look up.
     * @return Service
     * @throws RpcException When the module has no registered nodes.
     * @author pengjch 2024310 9:44:38
     */
    public function getWeightService(string $module): Service
    {
        $services = $this->getServices($module);
        if (!$services) {
            throw new RpcException('It has not register module');
        }

        $totalWeight = 0;
        /** @var Service $service */
        foreach ($services as $service) {
            $totalWeight += max(0, (int) $service->getWeight());
        }

        if ($totalWeight <= 0) {
            return $services[mt_rand(0, count($services) - 1)];
        }

        // Walk the cumulative weights until the drawn ticket is covered.
        $ticket = mt_rand(1, $totalWeight);
        $cumulative = 0;
        foreach ($services as $service) {
            $cumulative += max(0, (int) $service->getWeight());
            if ($cumulative >= $ticket) {
                return $service;
            }
        }

        // Not reachable while $ticket <= $totalWeight; kept so the declared
        // return type is always honoured.
        return $services[count($services) - 1];
    }
}