推送实现

推送流程

总的来说,logic 节点对外暴露 API 接口服务,处理推送的 HTTP 请求,并把推送任务投递到队列,由 job 节点消费。最后 job 节点通过 gRPC 通知 cloud 节点,由 cloud 节点向实际的客户端连接推送消息。

logic -> job -> cloud

@logic

file:app/Service/Dao/Queue.php

简单粗暴,直接把消息丢到 RabbitMQ 队列即可。

/**
 * Queue a push request so that a job node can consume it.
 *
 * Bundles the arguments together with the PushMsg\Type::PUSH type marker,
 * logs the JSON-encoded bundle, and passes it through the Producer bean to
 * producer()->produce().
 *
 * @param int    $operation stored under the "operation" key of the payload
 * @param string $server    stored under the "server" key of the payload
 * @param array  $keys      stored under the "keys" key of the payload
 * @param mixed  $msg       stored under the "msg" key of the payload
 * @throws \Throwable
 */
public function pushMsg(int $operation,string $server,array $keys, $msg)
{
    // Same keys, in the same order, as the queue consumer receives them.
    $payload = [
        "type"      => PushMsg\Type::PUSH,
        "operation" => $operation,
        "server"    => $server,
        "keys"      => $keys,
        "msg"       => $msg,
    ];
    Log::info("push msg to job node data:".json_encode($payload));

    /** @var Producer $messageFactory */
    $messageFactory = \bean(Producer::class);
    // Presumably Producer::producer() wraps the payload as a queue message — confirm.
    $queueMessage = $messageFactory->producer($payload);

    // Publish to the queue.
    producer()->produce($queueMessage);
}

@job

file:app/Task/pushKey.php
job 节点消费队列中的数据,从连接池获取连接,然后直接 push 到 cloud 节点。

/**
 * Talk to a cloud node over grpc to push a message.
 *
 * Wraps the operation and body in a Proto (version fixed to 1), attaches it
 * together with the sub keys to a PushMsgReq, and calls
 * GrpcCloudClient::PushMsg() when $server is present in CloudClient::$table.
 * When the server is not present an error is logged and nothing is sent.
 *
 * @param int    $operation operation code, set on both the Proto and the request
 * @param string $server    identifier looked up in CloudClient::$table
 * @param array  $subkey    keys attached to the request
 * @param mixed  $body      payload placed in the Proto body
 */
public function push(int $operation ,string $server , array $subkey , $body)
{
    $frame = new Proto();
    $frame->setOp($operation);
    $frame->setVer(1);
    $frame->setBody($body);

    $request = new PushMsgReq();
    $request->setKeys($subkey);
    $request->setProto($frame);
    $request->setProtoOp($operation);

    if (CloudClient::$table->exist($server)) {
        GrpcCloudClient::PushMsg($server, $request);
        return;
    }

    // No client registered for this server: report it and drop the request.
    Log::error("pushkey not exist grpc client server: $server ");
}

@cloud

file:app/Grpc/Cloud.php
gRPC 服务端收到请求后,从 Swoole Table 内存表中取出对应的连接信息,然后进行推送。

/**
 * Handle a grpc push request from a logic or job node and push the message
 * to each requested key individually.
 *
 * The reply is written and the response ended before the request is
 * validated, so the caller receives the same PushMsgReply whether or not the
 * request is rejected below.
 *
 * @return void
 */
public function pushMsg()
{
    Log::debug("cloud node: pushmsg");

    $replyPayload = Parser::serializeMessage(new PushMsgReply());
    /** @var PushMsgReq $request */
    $request = Parser::deserializeMessage([PushMsgReq::class, null], request()->getRawBody());

    // Answer the caller right away; the actual push happens afterwards.
    response()->withContent($replyPayload)->end();

    // NOTE(review): empty() is always false for an object. If getKeys() returns
    // a repeated-field object rather than a plain array, an empty key list
    // would not be caught here — confirm.
    if (empty($request->getKeys()) || empty($request->getProto())) {
        Log::error("cloud grpc pushmsg keys proto is empty raw data:".json_encode($request));
        return;
    }

    $operation = $request->getProtoOp();
    $payload = $request->getProto()->getBody();

    // One Push::push task is delivered per key (the original note says this
    // runs in a coroutine).
    foreach ($request->getKeys() as $subKey) {
        \bean(Task::class)->deliver(Push::class, "push", [$subKey, $operation, $payload]);
    }
}