diff --git a/app/Jobs/TrafficFetchJob.php b/app/Jobs/TrafficFetchJob.php index 69be777..a545713 100644 --- a/app/Jobs/TrafficFetchJob.php +++ b/app/Jobs/TrafficFetchJob.php @@ -20,7 +20,7 @@ class TrafficFetchJob implements ShouldQueue protected $childServer; protected $protocol; protected $timestamp; - public $tries = 3; + public $tries = 1; public $timeout = 10; /** @@ -28,46 +28,37 @@ class TrafficFetchJob implements ShouldQueue * * @return void */ - public function __construct(array $server, array $data, $protocol, int $timestamp, $nodeIp = null) + public function __construct(array $server, array $data, $protocol, int $timestamp, $childServer = null) { $this->onQueue('traffic_fetch'); $this->server = $server; $this->data = $data; $this->protocol = $protocol; $this->timestamp = $timestamp; - // 获取子节点 - $serverService = new ServerService(); - $this->childServer = ($this->server['parent_id'] == null && !blank($nodeIp)) ? $serverService->getChildServer($this->server['id'], $this->protocol, $nodeIp) : null; + $this->childServer = $childServer; } - /** - * Execute the job. - * - * @return void - */ public function handle(): void { - \DB::transaction(function () { - if ($this->attempts() === 1){ - $statService = new StatisticalService(); - $statService->setStartAt($this->timestamp); - $statService->setUserStats(); - $statService->setServerStats(); - } - - // 获取子节点\ - - $targetServer = $this->childServer ?? $this->server; - foreach ($this->data as $uid => $v) { + if ($this->attempts() === 1) { + $statService = new StatisticalService(); + $statService->setStartAt($this->timestamp); + $statService->setUserStats(); + $statService->setServerStats(); + } + // 获取子节点\ + $targetServer = $this->childServer ?? $this->server; + foreach ($this->data as $uid => $v) { + \DB::transaction(function () use ($uid, $v, $targetServer, $statService) { $u = $v[0]; $d = $v[1]; $user = \DB::table('v2_user')->lockForUpdate()->where('id', $uid)->first(); if (!$user) { - continue; + return; } - if ($this->attempts() === 1){ // 写缓存 + if ($this->attempts() === 1) { // 写缓存 $statService->statUser($targetServer['rate'], $uid, $u, $d); //如果存在子节点则使用子节点的倍率 - if(!blank($this->childServer)){ //如果存在子节点,则给子节点计算流量 + if (!blank($this->childServer)) { //如果存在子节点,则给子节点计算流量 $statService->statServer($this->childServer['id'], $this->protocol, $u, $d); } $statService->statServer($this->server['id'], $this->protocol, $u, $d); @@ -82,7 +73,7 @@ class TrafficFetchJob implements ShouldQueue 'u' => $newU, 'd' => $newD, ]); - } - }, 3); + }, 3); + } } } diff --git a/app/Services/UserService.php b/app/Services/UserService.php index ab769f1..79613bc 100644 --- a/app/Services/UserService.php +++ b/app/Services/UserService.php @@ -170,9 +170,13 @@ class UserService public function trafficFetch(array $server, string $protocol, array $data, string $nodeIp = null) { + // 获取子节点 + $childServer = ($server['parent_id'] == null && !blank($nodeIp)) + ? (new ServerService())->getChildServer($server['id'], $protocol, $nodeIp) + : null; $timestamp = strtotime(date('Y-m-d')); - collect($data)->chunk(1000)->each(function($chunk) use ($timestamp,$server,$protocol, $nodeIp){ - TrafficFetchJob::dispatch($server, $chunk->toArray(), $protocol, $timestamp, $nodeIp); + collect($data)->chunk(1000)->each(function($chunk) use ($timestamp,$server,$protocol, $childServer){ + TrafficFetchJob::dispatch($server, $chunk->toArray(), $protocol, $timestamp, $childServer); }); } }