perf: 优化流量消费相关代码性能,解决流量纪录与已用流量有概率不一致的问题

This commit is contained in:
xboard 2024-04-27 17:06:57 +08:00
parent 4438fee3ca
commit be9ed269fa
12 changed files with 246 additions and 171 deletions

View File

@ -44,7 +44,7 @@ class CheckOrder extends Command
public function handle()
{
ini_set('memory_limit', -1);
$orders = Order::whereIn('status', [0, 1])
$orders = Order::whereIn('status', [Order::STATUS_PENDING, Order::STATUS_PROCESSING])
->orderBy('created_at', 'ASC')
->get();
foreach ($orders as $order) {

View File

@ -58,7 +58,6 @@ class XboardStatistics extends Command
$recordAt = strtotime('-1 day', strtotime(date('Y-m-d')));
$statService = new StatisticalService();
$statService->setStartAt($recordAt);
$statService->setServerStats();
$stats = $statService->getStatServer();
foreach ($stats as $stat) {
if (!StatServer::insert([
@ -90,7 +89,6 @@ class XboardStatistics extends Command
$recordAt = strtotime('-1 day', strtotime(date('Y-m-d')));
$statService = new StatisticalService();
$statService->setStartAt($recordAt);
$statService->setUserStats();
$stats = $statService->getStatUser();
foreach ($stats as $stat) {
if (!StatUser::insert([

View File

@ -28,21 +28,21 @@ class Kernel extends ConsoleKernel
{
Cache::put(CacheKey::get('SCHEDULE_LAST_CHECK_AT', null), time());
// v2board
$schedule->command('xboard:statistics')->dailyAt('0:10');
$schedule->command('xboard:statistics')->dailyAt('0:10')->onOneServer();
// check
$schedule->command('check:order')->everyMinute();
$schedule->command('check:commission')->everyMinute();
$schedule->command('check:ticket')->everyMinute();
$schedule->command('check:order')->everyMinute()->onOneServer();
$schedule->command('check:commission')->everyMinute()->onOneServer();
$schedule->command('check:ticket')->everyMinute()->onOneServer();
// reset
$schedule->command('reset:traffic')->daily();
$schedule->command('reset:log')->daily();
$schedule->command('reset:traffic')->daily()->onOneServer();
$schedule->command('reset:log')->daily()->onOneServer();
// send
$schedule->command('send:remindMail')->dailyAt('11:30');
$schedule->command('send:remindMail')->dailyAt('11:30')->onOneServer();
// horizon metrics
$schedule->command('horizon:snapshot')->everyFiveMinutes();
$schedule->command('horizon:snapshot')->everyFiveMinutes()->onOneServer();
// backup Timing
if (env('ENABLE_AUTO_BACKUP_AND_UPDATE', false)) {
$schedule->command('backup:database',['true'])->daily();
$schedule->command('backup:database', ['true'])->daily()->onOneServer();
}
}

View File

@ -110,13 +110,11 @@ class StatController extends Controller
$recordAt = strtotime(date('Y-m-d'));
$statService = new StatisticalService();
$statService->setStartAt($recordAt);
$statService->setServerStats();
$stats = $statService->getStatServer();
$statistics = collect($stats)->map(function ($item){
$item['total'] = $item['u'] + $item['d'];
return $item;
})->sortByDesc('total')->values()->all();
// return json_encode($statistics);
foreach ($statistics as $k => $v) {
foreach ($servers[$v['server_type']] as $server) {
if ($server['id'] === $v['server_id']) {
@ -190,7 +188,6 @@ class StatController extends Controller
$recordAt = strtotime(date('Y-m-d'));
$statService = new StatisticalService();
$statService->setStartAt($recordAt);
$statService->setUserStats();
$todayTraffics = $statService->getStatUserByUserID($request->input('user_id'));
if (($current == 1) && count($todayTraffics) > 0) {
foreach ($todayTraffics as $todayTraffic){

View File

@ -35,11 +35,9 @@ class UniProxyController extends Controller
public function push(Request $request)
{
$res = json_decode(get_request_content(), true);
$data = Validator::make($res, [
'*.0' => 'integer',
'*.1' => 'integer',
])->validate();
$data = array_filter($res, function ($item) {
return is_array($item) && count($item) === 2 && is_numeric($item[0]) && is_numeric($item[1]);
});
$nodeType = $request->input('node_type');
$nodeId = $request->input('node_id');
// 增加单节点多服务器统计在线人数

View File

@ -24,7 +24,6 @@ class StatController extends Controller
$recordAt = strtotime(date('Y-m-d'));
$statService = new StatisticalService();
$statService->setStartAt($recordAt);
$statService->setUserStats();
$todayTraffics = $statService->getStatUserByUserID($request->user['id']);
if (count($todayTraffics) > 0) {
$todayTraffics = collect($todayTraffics)->map(function ($todayTraffic) {

View File

@ -0,0 +1,69 @@
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class BatchTrafficFetchJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $data;
protected $server;
protected $childServer;
protected $protocol;
protected $timestamp;
public $tries = 1;
public $timeout = 10;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(array $server, array $data, $protocol, int $timestamp, $childServer = null)
{
$this->onQueue('batch_traffic_fetch');
$this->server = $server;
$this->data = $data;
$this->protocol = $protocol;
$this->timestamp = $timestamp;
$this->childServer = $childServer;
}
public function handle(): void
{
// 获取子节点
$targetServer = $this->childServer ?? $this->server;
foreach ($this->data as $uid => $v) {
$u = $v[0];
$d = $v[1];
$result = \DB::transaction(function () use ($uid, $u, $d, $targetServer) {
$user = \DB::table('v2_user')->lockForUpdate()->where('id', $uid)->first();
if (!$user) {
return true;
}
$newTime = time();
$newU = $user->u + ($u * $targetServer['rate']);
$newD = $user->d + ($d * $targetServer['rate']);
$rows = \DB::table('v2_user')
->where('id', $uid)
->update([
't' => $newTime,
'u' => $newU,
'd' => $newD,
]);
if ($rows === 0) {
return false;
}
return true;
}, 3);
if (!$result) {
TrafficFetchJob::dispatch($u, $d, $uid, $targetServer, $this->protocol);
}
}
}
}

View File

@ -2,7 +2,7 @@
namespace App\Jobs;
use App\Services\StatisticalService;
use App\Models\User;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
@ -12,12 +12,13 @@ use Illuminate\Queue\SerializesModels;
class TrafficFetchJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $data;
protected $u;
protected $d;
protected $userId;
protected $server;
protected $childServer;
protected $protocol;
protected $timestamp;
public $tries = 1;
public $tries = 3;
public $timeout = 10;
/**
@ -25,55 +26,33 @@ class TrafficFetchJob implements ShouldQueue
*
* @return void
*/
public function __construct(array $server, array $data, $protocol, int $timestamp, $childServer = null)
public function __construct($u, $d, $userId, array $server, $protocol)
{
$this->onQueue('traffic_fetch');
$this->u = $u;
$this->d = $d;
$this->userId = $userId;
$this->server = $server;
$this->data = $data;
$this->protocol = $protocol;
$this->timestamp = $timestamp;
$this->childServer = $childServer;
}
public function handle(): void
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
if ($this->attempts() === 1) {
$statService = new StatisticalService();
$statService->setStartAt($this->timestamp);
$statService->setUserStats();
$statService->setServerStats();
}
// 获取子节点\
$targetServer = $this->childServer ?? $this->server;
foreach ($this->data as $uid => $v) {
\DB::transaction(function () use ($uid, $v, $targetServer, $statService) {
$u = $v[0];
$d = $v[1];
$user = \DB::table('v2_user')->lockForUpdate()->where('id', $uid)->first();
if (!$user) {
\DB::transaction(function () {
$user = User::lockForUpdate()->find($this->userId);
if (!$user)
return;
$user->t = time();
$user->u = $user->u + ($this->u * $this->server['rate']);
$user->d = $user->d + ($this->d * $this->server['rate']);
if (!$user->save()) {
info("流量更新失败\n未记录用户ID:{$this->userId}\n未记录上行:{$user->u}\n未记录下行:{$user->d}");
}
if ($this->attempts() === 1) { // 写缓存
$statService->statUser($targetServer['rate'], $uid, $u, $d); //如果存在子节点则使用子节点的倍率
if (!blank($this->childServer)) { //如果存在子节点,则给子节点计算流量
$statService->statServer($this->childServer['id'], $this->protocol, $u, $d);
}
$statService->statServer($this->server['id'], $this->protocol, $u, $d);
}
$newTime = time();
$newU = $user->u + ($v[0] * $targetServer['rate']);
$newD = $user->d + ($v[1] * $targetServer['rate']);
$rows = \DB::table('v2_user')
->where('id', $uid)
->update([
't' => $newTime,
'u' => $newU,
'd' => $newD,
]);
if($rows === 0){
\Log::error("流量更新失败\n未记录用户ID:{$uid}\n未记录上行:{$user->u}\n未记录下行:{$user->d}");
}
}, 3);
}
});
}
}

View File

@ -165,7 +165,7 @@ class OrderService
{
$lastOneTimeOrder = Order::where('user_id', $user->id)
->where('period', 'onetime_price')
->where('status', 3)
->where('status', Order::STATUS_COMPLETED)
->orderBy('id', 'DESC')
->first();
if (!$lastOneTimeOrder) return;
@ -176,7 +176,7 @@ class OrderService
$trafficUnitPrice = $paidTotalAmount / $nowUserTraffic;
$notUsedTraffic = $nowUserTraffic - (($user->u + $user->d) / 1073741824);
$result = $trafficUnitPrice * $notUsedTraffic;
$orderModel = Order::where('user_id', $user->id)->where('period', '!=', 'reset_price')->where('status', 3);
$orderModel = Order::where('user_id', $user->id)->where('period', '!=', 'reset_price')->where('status', Order::STATUS_COMPLETED);
$order->surplus_amount = $result > 0 ? $result : 0;
$order->surplus_order_ids = array_column($orderModel->get()->toArray(), 'id');
}
@ -184,9 +184,8 @@ class OrderService
private function getSurplusValueByPeriod(User $user, Order $order)
{
$orders = Order::where('user_id', $user->id)
->where('period', '!=', 'reset_price')
->where('period', '!=', 'onetime_price')
->where('status', 3)
->whereNotIn('period', ['reset_price', 'onetime_price'])
->where('status', Order::STATUS_COMPLETED)
->get()
->toArray();
if (!$orders) return;

View File

@ -7,44 +7,41 @@ use App\Models\Stat;
use App\Models\StatServer;
use App\Models\StatUser;
use App\Models\User;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;
class StatisticalService {
class StatisticalService
{
protected $userStats;
protected $startAt;
protected $endAt;
protected $serverStats;
protected $statServerKey;
protected $statUserKey;
protected $redis;
public function __construct()
{
ini_set('memory_limit', -1);
$this->redis = Redis::connection();
}
public function setStartAt($timestamp) {
public function setStartAt($timestamp)
{
$this->startAt = $timestamp;
$this->statServerKey = "stat_server_{$this->startAt}";
$this->statUserKey = "stat_user_{$this->startAt}";
}
public function setEndAt($timestamp) {
public function setEndAt($timestamp)
{
$this->endAt = $timestamp;
}
public function setServerStats() {
$this->serverStats = Cache::get("stat_server_{$this->startAt}");
$this->serverStats = json_decode($this->serverStats, true) ?? [];
if (!is_array($this->serverStats)) {
$this->serverStats = [];
}
}
public function setUserStats() {
$this->userStats = Cache::get("stat_user_{$this->startAt}");
$this->userStats = json_decode($this->userStats, true) ?? [];
if (!is_array($this->userStats)) {
$this->userStats = [];
}
}
/**
* 生成统计报表
*/
public function generateStatData(): array
{
$startAt = $this->startAt;
@ -86,91 +83,114 @@ class StatisticalService {
return $data;
}
/**
* 往服务器报表缓存正追加流量使用数据
*/
public function statServer($serverId, $serverType, $u, $d)
{
$this->serverStats[$serverType] = $this->serverStats[$serverType] ?? [];
if (isset($this->serverStats[$serverType][$serverId])) {
$this->serverStats[$serverType][$serverId][0] += $u;
$this->serverStats[$serverType][$serverId][1] += $d;
} else {
$this->serverStats[$serverType][$serverId] = [$u, $d];
}
Cache::set("stat_server_{$this->startAt}", json_encode($this->serverStats));
$u_menber = "{$serverType}_{$serverId}_u"; //储存上传流量的集合成员
$d_menber = "{$serverType}_{$serverId}_d"; //储存下载流量的集合成员
$this->redis->zincrby($this->statServerKey, $u, $u_menber);
$this->redis->zincrby($this->statServerKey, $d, $d_menber);
}
/**
* 追加用户使用流量
*/
public function statUser($rate, $userId, $u, $d)
{
$this->userStats[$rate] = $this->userStats[$rate] ?? [];
if (isset($this->userStats[$rate][$userId])) {
$this->userStats[$rate][$userId][0] += $u;
$this->userStats[$rate][$userId][1] += $d;
} else {
$this->userStats[$rate][$userId] = [$u, $d];
}
Cache::set("stat_user_{$this->startAt}", json_encode($this->userStats));
$u_menber = "{$rate}_{$userId}_u"; //储存上传流量的集合成员
$d_menber = "{$rate}_{$userId}_d"; //储存下载流量的集合成员
$this->redis->zincrby($this->statUserKey, $u, $u_menber);
$this->redis->zincrby($this->statUserKey, $d, $d_menber);
}
/**
* 获取指定用户的流量使用情况
*/
public function getStatUserByUserID($userId): array
{
$stats = [];
foreach (array_keys($this->userStats) as $rate) {
if (!isset($this->userStats[$rate][$userId])) continue;
$stats[] = [
$statsUser = $this->redis->zrange($this->statUserKey, 0, -1, true);
foreach ($statsUser as $member => $value) {
list($rate, $uid, $type) = explode('_', $member);
if ($uid !== $userId)
continue;
$key = "{$rate}_{$uid}";
$stats[$key] = $stats[$key] ?? [
'record_at' => $this->startAt,
'server_rate' => $rate,
'u' => $this->userStats[$rate][$userId][0],
'd' => $this->userStats[$rate][$userId][1],
'user_id' => $userId
'server_rate' => floatval($rate),
'u' => 0,
'd' => 0,
'user_id' => intval($userId),
];
$stats[$key][$type] += $value;
}
return $stats;
return array_values($stats);
}
/**
* 获取缓存中的用户报表
*/
public function getStatUser()
{
$stats = [];
foreach ($this->userStats as $k => $v) {
foreach (array_keys($v) as $userId) {
if (isset($v[$userId])) {
$stats[] = [
'server_rate' => $k,
'u' => $v[$userId][0],
'd' => $v[$userId][1],
'user_id' => $userId
$statsUser = $this->redis->zrange($this->statUserKey, 0, -1, true);
foreach ($statsUser as $member => $value) {
list($rate, $uid, $type) = explode('_', $member);
$key = "{$rate}_{$uid}";
$stats[$key] = $stats[$key] ?? [
'record_at' => $this->startAt,
'server_rate' => $rate,
'u' => 0,
'd' => 0,
'user_id' => intval($uid),
];
$stats[$key][$type] += $value;
}
}
}
return $stats;
return array_values($stats);
}
/**
* 获取缓存中的服务器爆表
*/
public function getStatServer()
{
$stats = [];
foreach ($this->serverStats as $serverType => $v) {
foreach (array_keys($v) as $serverId) {
if (isset($v[$serverId])) {
$stats[] = [
'server_id' => $serverId,
$statsServer = $this->redis->zrange($this->statServerKey, 0, -1, true);
foreach ($statsServer as $member => $value) {
list($serverType, $serverId, $type) = explode('_', $member);
$key = "{$serverType}_{$serverId}";
if (!isset($stats[$key])) {
$stats[$key] = [
'server_id' => intval($serverId),
'server_type' => $serverType,
'u' => $v[$serverId][0],
'd' => $v[$serverId][1],
'u' => 0,
'd' => 0,
];
}
$stats[$key][$type] += $value;
}
}
return $stats;
return array_values($stats);
}
/**
* 清除用户报表缓存数据
*/
public function clearStatUser()
{
Cache::forget("stat_user_{$this->startAt}");
$this->redis->del($this->statUserKey);
}
/**
* 清除服务器报表缓存数据
*/
public function clearStatServer()
{
Cache::forget("stat_server_{$this->startAt}");
$this->redis->del($this->statServerKey);
}
public function getStatRecord($type)
@ -236,7 +256,8 @@ class StatisticalService {
$users = User::whereIn('id', $stats->pluck('invite_user_id')->toArray())->get()->keyBy('id');
foreach ($stats as $k => $v) {
if (!isset($users[$v['invite_user_id']])) continue;
if (!isset($users[$v['invite_user_id']]))
continue;
$stats[$k]['email'] = $users[$v['invite_user_id']]['email'];
}
return $stats;
@ -258,7 +279,8 @@ class StatisticalService {
->get();
$users = User::whereIn('id', $stats->pluck('user_id')->toArray())->get()->keyBy('id');
foreach ($stats as $k => $v) {
if (!isset($users[$v['user_id']])) continue;
if (!isset($users[$v['user_id']]))
continue;
$stats[$k]['email'] = $users[$v['user_id']]['email'];
}
return $stats;

View File

@ -2,10 +2,11 @@
namespace App\Services;
use App\Jobs\TrafficFetchJob;
use App\Jobs\BatchTrafficFetchJob;
use App\Models\Order;
use App\Models\Plan;
use App\Models\User;
use Illuminate\Support\Facades\Bus;
class UserService
{
@ -53,9 +54,11 @@ class UserService
if (!isset($user->plan)) {
$user->plan = Plan::find($user->plan_id);
}
if ($user->expired_at <= time() || $user->expired_at === NULL) return null;
if ($user->expired_at <= time() || $user->expired_at === NULL)
return null;
// if reset method is not reset
if ($user->plan->reset_traffic_method === 2) return null;
if ($user->plan->reset_traffic_method === 2)
return null;
switch (true) {
case ($user->plan->reset_traffic_method === NULL): {
$resetTrafficMethod = admin_setting('reset_traffic_method', 0);
@ -168,13 +171,24 @@ class UserService
public function trafficFetch(array $server, string $protocol, array $data, string $nodeIp = null)
{
// 获取子节点
$childServer = ($server['parent_id'] == null && !blank($nodeIp))
? ServerService::getChildServer($server['id'], $protocol, $nodeIp)
: null;
$timestamp = strtotime(date('Y-m-d'));
$statService = new StatisticalService();
$statService->setStartAt($timestamp);
// 获取子节点
$childServer = ($server['parent_id'] == null && $nodeIp) ? ServerService::getChildServer($server['id'], $protocol, $nodeIp) : null;
foreach ($data as $uid => $v) {
$u = $v[0];
$d = $v[1];
$targetServer = $childServer ?? $server;
$statService->statUser($targetServer['rate'], $uid, $u, $d); //如果存在子节点则使用子节点的倍率
if (!blank($childServer)) { //如果存在子节点,则给子节点计算流量
$statService->statServer($childServer['id'], $protocol, $u, $d);
}
$statService->statServer($server['id'], $protocol, $u, $d);
}
collect($data)->chunk(1000)->each(function ($chunk) use ($timestamp, $server, $protocol, $childServer) {
TrafficFetchJob::dispatch($server, $chunk->toArray(), $protocol, $timestamp, $childServer);
BatchTrafficFetchJob::dispatch($server, $chunk->toArray(), $protocol, $timestamp, $childServer);
});
}
}

File diff suppressed because one or more lines are too long