服务发现

@注册服务

file:app/Process/Discovery.php

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    /**
* 自定义子进程 执行入口
* @param Process $process
*/
public function run(Process $process)
{
swoole_set_process_name(sprintf('php-im-cloud discovery process (%s)',ROOT));
$registerStatus = false;
while(!$registerStatus){
$registerStatus = provider()->select()->registerService();
if(!$registerStatus){
CLog::error("consul register false sleep 1 sec to reregiseter");
sleep(1);
}
}
$config = config("discovery");
$discovery = $config["consul"]["discovery"]["name"];
while (true){
$services = provider()->select()->getServiceList($discovery);
if(empty($services)){
Log::error("not find any instance node:$discovery");
LogicClient::updateService([]);
goto SLEEP;
}
LogicClient::updateService($services);
SLEEP:
sleep(5);
}
}

  • 1.注册服务 注册失败后重试
  • 2.发现服务 进行更新本地服务列表

@更新服务

发现服务到频率越高表示同步到时间约精确,这里会有一个问题,php到array数组是非线程安全更新和读取都在进行,并发是会很大几率出现错误,单纯都加锁反而会降低性能

swoole table 应运而生,底层是原子操作,多进程共享变量,完整的解决来问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Class LogicClient
* @Bean()
* @package App\Lib
*/
class LogicClient
{
/**
* servicelist
* @var MemoryTable
* [
* ip => [addr => ip]
* "127.0.0.1:9500" => ["Address" => "127.0.0.1","Port" => "9500"]
* ]
*/
public static $table = null;

/**
* LogicClient constructor.
*/
public function __construct()
{
$memorySize = (int)env("MEMORY_TABLE",1000);
$column = [
"Address" => [Type::String,20],
"Port" => [Type::String,10],
];
self::$table = Table::create($memorySize,$column);
}

/**
* 返回一个可用的grpc 客户端 和logic 节点进行交互
* @return mixed|null
*/
public static function getLogicClient(){
if(self::$table->count() == 0){
Log::error("not logic node find");
return false;
}

$node = \bean(RandomBalancer::class)->select(self::$table->getKeys());
return $node;
}

/**
* automic operation insert|update|del
* @param array $server
*/
public static function updateService(array $server)
{
//insert if not exist | update if not equal
$serverList = [];
foreach ($server as $ser) {
$addr = $ser["Address"].":".$ser["Port"];
$serverList[] = $addr;
if(!self::$table->exist($addr))
self::$table->set($addr,$ser);
}
//del not exist
foreach (self::$table as $k => $ser) {
if (!in_array($k, $serverList)) {
self::$table->del($k);
}
}
}
}
  • table 内存表的创建使用原生封装的库core/table
  • 需要加上@bean()注解,因为内存表需要在swoole启动之前创建,也就是注解扫描阶段就需要建立