diff --git a/app/Http/Controllers/V1/Server/UniProxyController.php b/app/Http/Controllers/V1/Server/UniProxyController.php index e9baf1e..73e036f 100644 --- a/app/Http/Controllers/V1/Server/UniProxyController.php +++ b/app/Http/Controllers/V1/Server/UniProxyController.php @@ -89,12 +89,8 @@ class UniProxyController extends Controller $online_user = $onlineCollection->sum('online_user'); Cache::put(CacheKey::get('SERVER_' . strtoupper($this->nodeType) . '_ONLINE_USER', $this->nodeInfo->id), $online_user, 3600); Cache::put(CacheKey::get('SERVER_' . strtoupper($this->nodeType) . '_LAST_PUSH_AT', $this->nodeInfo->id), time(), 3600); - - // 查询是否存在子节点 - $childServer = null; - if ($this->nodeInfo->parent_id == null) $childServer = $this->serverService->getChildServer($this->nodeId, $this->nodeType, $ip); $userService = new UserService(); - $userService->trafficFetch($this->nodeInfo->toArray(), $this->nodeType, $data, $childServer ? $childServer->toArray() : null); + $userService->trafficFetch($this->nodeInfo->toArray(), $this->nodeType, $data , $ip); return response([ 'data' => true diff --git a/app/Jobs/TrafficFetchJob.php b/app/Jobs/TrafficFetchJob.php index b3ae3ad..69be777 100644 --- a/app/Jobs/TrafficFetchJob.php +++ b/app/Jobs/TrafficFetchJob.php @@ -4,6 +4,8 @@ namespace App\Jobs; use App\Models\User; use App\Services\MailService; +use App\Services\ServerService; +use App\Services\StatisticalService; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; @@ -13,12 +15,11 @@ use Illuminate\Queue\SerializesModels; class TrafficFetchJob implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; - protected $u; - protected $d; - protected $userId; + protected $data; protected $server; + protected $childServer; protected $protocol; - + protected $timestamp; public $tries = 3; public $timeout = 10; @@ -27,14 +28,16 @@ class TrafficFetchJob implements ShouldQueue * * @return void */ - public function __construct($u, $d, $userId, array $server, $protocol) + public function __construct(array $server, array $data, $protocol, int $timestamp, $nodeIp = null) { $this->onQueue('traffic_fetch'); - $this->u = $u; - $this->d = $d; - $this->userId = $userId; $this->server = $server; + $this->data = $data; $this->protocol = $protocol; + $this->timestamp = $timestamp; + // 获取子节点 + $serverService = new ServerService(); + $this->childServer = ($this->server['parent_id'] == null && !blank($nodeIp)) ? $serverService->getChildServer($this->server['id'], $this->protocol, $nodeIp) : null; } /** @@ -45,29 +48,40 @@ class TrafficFetchJob implements ShouldQueue public function handle(): void { \DB::transaction(function () { - $user = \DB::table('v2_user')->lockForUpdate()->where('id', $this->userId)->first(); - - if (!$user) { - return; + if ($this->attempts() === 1){ + $statService = new StatisticalService(); + $statService->setStartAt($this->timestamp); + $statService->setUserStats(); + $statService->setServerStats(); } - - $newTime = time(); - $newU = $user->u + ($this->u * $this->server['rate']); - $newD = $user->d + ($this->d * $this->server['rate']); - - $updatedRows = \DB::table('v2_user') - ->where('id', $this->userId) - ->update([ - 't' => $newTime, - 'u' => $newU, - 'd' => $newD, - ]); - - if (!$updatedRows) { - info("流量更新失败\n未记录用户ID:{$this->userId}\n未记录上行:{$this->u}\n未记录下行:{$this->d}"); - $this->fail(); - } else { - + + // 获取子节点\ + + $targetServer = $this->childServer ?? $this->server; + foreach ($this->data as $uid => $v) { + $u = $v[0]; + $d = $v[1]; + $user = \DB::table('v2_user')->lockForUpdate()->where('id', $uid)->first(); + if (!$user) { + continue; + } + if ($this->attempts() === 1){ // 写缓存 + $statService->statUser($targetServer['rate'], $uid, $u, $d); //如果存在子节点则使用子节点的倍率 + if(!blank($this->childServer)){ //如果存在子节点,则给子节点计算流量 + $statService->statServer($this->childServer['id'], $this->protocol, $u, $d); + } + $statService->statServer($this->server['id'], $this->protocol, $u, $d); + } + $newTime = time(); + $newU = $user->u + ($v[0] * $targetServer['rate']); + $newD = $user->d + ($v[1] * $targetServer['rate']); + \DB::table('v2_user') + ->where('id', $uid) + ->update([ + 't' => $newTime, + 'u' => $newU, + 'd' => $newD, + ]); } }, 3); } diff --git a/app/Services/ServerService.php b/app/Services/ServerService.php index 9919018..363cca1 100644 --- a/app/Services/ServerService.php +++ b/app/Services/ServerService.php @@ -363,7 +363,7 @@ class ServerService ->where('ips',"like", "%\"$nodeIp\"%") ->first(); default: - return false; + return null; } } } diff --git a/app/Services/UserService.php b/app/Services/UserService.php index ba0cbb7..ab769f1 100644 --- a/app/Services/UserService.php +++ b/app/Services/UserService.php @@ -168,26 +168,11 @@ class UserService return true; } - public function trafficFetch(array $server, string $protocol, array $data, array $childServer = null) + public function trafficFetch(array $server, string $protocol, array $data, string $nodeIp = null) { - $statService = new StatisticalService(); - $statService->setStartAt(strtotime(date('Y-m-d'))); - $statService->setUserStats(); - $statService->setServerStats(); - foreach (array_keys($data) as $userId) { - $u = $data[$userId][0]; - $d = $data[$userId][1]; - // 如果存在子节点则使用过子节点的倍率进行进行流量计算,该计算方式依赖服务器IP地址 - if(!blank($childServer)){ - TrafficFetchJob::dispatch($u, $d, $userId, $childServer, $protocol); - $statService->statUser($childServer['rate'], $userId, $u, $d); - $statService->statServer($childServer['id'], $protocol, $u, $d); - }else{ - TrafficFetchJob::dispatch($u, $d, $userId, $server, $protocol); - $statService->statUser($server['rate'], $userId, $u, $d); - } - $statService->statServer($server['id'], $protocol, $u, $d); - - } + $timestamp = strtotime(date('Y-m-d')); + collect($data)->chunk(1000)->each(function($chunk) use ($timestamp,$server,$protocol, $nodeIp){ + TrafficFetchJob::dispatch($server, $chunk->toArray(), $protocol, $timestamp, $nodeIp); + }); } }