高并发下PHP异步处理与并发控制攻略
一、背景与挑战
在高并发场景下(如电商秒杀、社交动态流、API 网关),PHP 应用面临以下主要挑战:
阻塞等待带宽与资源浪费
- 传统 PHP 是同步阻塞模式:发起一次远程接口调用或数据库查询,需要等待 I/O 完成后才能继续下一个操作。
- 若同时有上千个请求进入,数百个慢接口轮询会导致大量进程或协程处于“睡眠等待”状态,CPU 资源无法被充分利用。
并发任务数量失控导致资源耗尽
- 如果不对并发并行任务数量加以限制,瞬时并发过多会导致内存、文件描述符、数据库连接池耗尽,从而引发请求失败或服务崩溃。
- 必须在吞吐与资源可承受之间找到平衡,并对“并发度”进行动态或静态约束。
传统锁与阻塞带来性能瓶颈
- 在并发写共享资源(如缓存、日志、文件)时,若使用简单的互斥锁(
flock()
、Mutex
),会导致大量进程/协程等待锁释放,降低吞吐。 - 异步非阻塞模型可以通过队列化或原子操作等方式减少锁竞争开销。
- 在并发写共享资源(如缓存、日志、文件)时,若使用简单的互斥锁(
为应对上述挑战,本文将从 PHP 异步处理 与 并发控制 两个维度展开,重点借助 Swoole 协程(也兼顾 ReactPHP/Amp 等方案)示例,展示如何在高并发场景下:
- 非阻塞地执行网络/数据库 I/O
- 有效控制并发数量,避免资源耗尽
- 构建任务队列与限流策略
- 处理并发写冲突与锁优化
二、PHP 异步基础:从阻塞到非阻塞
2.1 同步阻塞模式
在传统 PHP 脚本中,读取远程接口或数据库都会阻塞当前进程或线程,示例代码:
<?php
function fetchData(string $url): string {
// 这是阻塞 I/O,同一时刻只能执行一条请求
$response = file_get_contents($url);
return $response ?: '';
}
// 串行发起多个请求
$urls = [
'http://api.example.com/user/1',
'http://api.example.com/user/2',
'http://api.example.com/user/3',
];
$results = [];
$start = microtime(true);
foreach ($urls as $url) {
$results[] = fetchData($url);
}
$end = microtime(true);
echo "同步完成,用时: " . round($end - $start, 3) . " 秒\n";
- 若每个
fetchData()
需要 1 秒,3 个请求依次执行耗时约 3 秒。 - 并发量一旦增大,阻塞等待会累加,导致吞吐急剧下降。
2.2 非阻塞/异步模型
异步 I/O 可以让单个进程在等待网络或磁盘操作时“挂起”该操作,并切换到下一任务,完成后再回来“续写”回调逻辑,实现“并发”效果。常见 PHP 异步方案包括:
- Swoole 协程:借助底层 epoll/kqueue,将 I/O 操作切换为协程挂起,不阻塞进程。
- ReactPHP / Amp:基于事件循环(Event Loop),使用回调或
yield
关键字实现异步非阻塞。 - Parallel / pthreads:多线程模型,将每个任务交给独立线程执行,本质上是并行而非真正“异步”。
下文将重点以 Swoole 协程 为主,兼顾 ReactPHP 思路,并展示如何借助这些模型让代码从“线性阻塞”变为“并发异步”。
三、方案一:Swoole 协程下的异步处理
3.1 Swoole 协程简介
- 协程(Coroutine):一种“用户态”轻量线程,具有非常快速的上下文切换。
- 当协程执行到阻塞 I/O(如 HTTP 请求、MySQL 查询、Redis 操作)时,会自动将该协程挂起,让出 CPU 给其他协程。I/O 完成后再恢复。
- Swoole 通过底层 hook 系统函数,将传统阻塞函数转换为可挂起的异步调用。
只需在脚本中调用 Swoole\Coroutine\run()
创建协程容器,之后在任意位置使用 go(function(){…})
即可开启协程。
3.2 示例:并发发起多 HTTP 请求
<?php
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine;
// 并发请求列表
$urls = [
'http://httpbin.org/delay/1',
'http://httpbin.org/delay/2',
'http://httpbin.org/delay/3',
'http://httpbin.org/get?param=4',
'http://httpbin.org/uuid'
];
Co\run(function() use ($urls) {
$responses = [];
$wg = new Swoole\Coroutine\WaitGroup();
foreach ($urls as $idx => $url) {
$wg->add();
go(function() use ($idx, $url, &$responses, $wg) {
$parts = parse_url($url);
$host = $parts['host'];
$port = $parts['scheme'] === 'https' ? 443 : 80;
$path = $parts['path'] . (isset($parts['query']) ? '?' . $parts['query'] : '');
$cli = new Client($host, $port, $parts['scheme'] === 'https');
$cli->set(['timeout' => 5]);
$cli->get($path);
$responses[$idx] = [
'status' => $cli->statusCode,
'body' => substr($cli->body, 0, 100) . '…'
];
$cli->close();
echo "[协程 {$idx}] 请求 {$url} 完成,状态码={$responses[$idx]['status']}\n";
$wg->done();
});
}
$wg->wait();
echo "[主协程] 所有请求已完成,共 " . count($responses) . " 条。\n";
print_r($responses);
});
ASCII 流程图
┌─────────────────────────────────────────────────────────────────┐
│ 主协程 (Coroutine) │
│ (Co\run 内部作为主调度) │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │ │
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
│协程 0 │ │协程 1 │ │协程 2 │ │协程 3 │ │协程 4 │
│发起 GET…│ │发起 GET…│ │发起 GET…│ │发起 GET…│ │发起 GET…│
└───┬───┘ └───┬───┘ └───┬───┘ └───┬───┘ └───┬───┘
│ │ │ │ │
│ │ │ │ │
I/O 阻塞 I/O 阻塞 I/O 阻塞 I/O 阻塞 I/O 阻塞
│ │ │ │ │
[挂起协程 0] [挂起协程 1] [挂起协程 2] [挂起协程 3] [挂起协程 4]
↓ ↓ ↓ ↓ ↓
Swoole 底层 挂起 I/O 等待异步事件完成
↓ ↓ ↓ ↓ ↓
I/O 完成 I/O 完成 I/O 完成 I/O 完成 I/O 完成
│ │ │ │ │
恢复协程 0 恢复协程 1 恢复协程 2 恢复协程 3 恢复协程 4
│ │ │ │ │
处理响应 处理响应 处理响应 处理响应 处理响应
│ │ │ │ │
$wg->done() $wg->done() $wg->done() $wg->done() $wg->done()
└─────────────────────────────────────────┘
↓
主协程 调用 $wg->wait() 解除阻塞,继续执行
↓
输出所有响应并退出脚本
3.3 并发控制:限制协程数量
在高并发场景中,如果一次性开启上千个协程,可能出现以下风险:
- 突发大量并发 I/O,造成网络带宽瞬间拥堵
- PHP 进程内存分配不够,一次性分配大量协程栈空间导致 OOM
限制协程并发数示例
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
$urls = []; // 假设有上千个 URL 列表
for ($i = 0; $i < 1000; $i++) {
$urls[] = "http://httpbin.org/delay/" . rand(1, 3);
}
// 最大并发协程数
$maxConcurrency = 50;
// 使用 Channel 作为“令牌桶”或“协程池”
Co\run(function() use ($urls, $maxConcurrency) {
$sem = new Channel($maxConcurrency);
// 初始化令牌桶:放入 $maxConcurrency 个令牌
for ($i = 0; $i < $maxConcurrency; $i++) {
$sem->push(1);
}
$wg = new Swoole\Coroutine\WaitGroup();
foreach ($urls as $idx => $url) {
// 从令牌桶取出一个令牌;若为空则挂起等待
$sem->pop();
$wg->add();
go(function() use ($idx, $url, $sem, $wg) {
echo "[协程 {$idx}] 开始请求 {$url}\n";
$parts = parse_url($url);
$cli = new \Swoole\Coroutine\Http\Client($parts['host'], 80);
$cli->get($parts['path']);
$cli->close();
echo "[协程 {$idx}] 完成请求\n";
// 任务完成后归还令牌,让下一个协程能够启动
$sem->push(1);
$wg->done();
});
}
$wg->wait();
echo "[主协程] 所有请求已完成。\n";
});
原理与说明
Channel 令牌桶:
- 创建一个容量为
$maxConcurrency
的 Channel,并预先push()
同样数量的“令牌”(任意占位符)。 - 每次要启动新协程前,先
pop()
一个令牌;如果 Channel 为空,则意味着当前已有 $maxConcurrency 个协程在运行,新的协程会被挂起等待令牌。 - 协程执行完毕后
push()
回一个令牌,让后续被挂起的协程继续运行。
- 创建一个容量为
并发控制:
- 该方案等效于“协程池(Coroutine Pool)”,始终只维持最多
$maxConcurrency
个协程并发执行。 - 避免瞬时并发过大导致 PHP 内存或系统资源耗尽。
- 该方案等效于“协程池(Coroutine Pool)”,始终只维持最多
- ASCII 图解:并发限制流程
┌─────────────────────────────────────────────────────────┐
│ 主协程 (Coroutine) │
└─────────────────────────────────────────────────────────┘
│ │ │ │
pop │ │ │ │
─────────┼────────────┼────────────┼────────────┤
▼ (取令牌) ▼ (取令牌) ▼ (取令牌) ▼ (取令牌)
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ 协程 1 │ │ 协程 2 │ │ 协程 3 │ │ 协程 4 │
│ 执行请求 │ │ 执行请求 │ │ 执行请求 │ │ 执行请求 │
└────┬──────┘ └────┬──────┘ └────┬──────┘ └────┬──────┘
│ │ │ │
完成 完成 完成 完成
│ │ │ │
push push push push
(归还令牌) (归还令牌) (归还令牌) (归还令牌)
└──────────────┴──────────────┴──────────────┘
↓
下一个协程获取到令牌,继续启动
四、方案二:ReactPHP / Amp 异步事件循环
除了 Swoole,常见的 PHP 异步框架还有 ReactPHP 和 Amp。它们并不依赖扩展,而是基于事件循环(Event Loop) + 回调/Promise模式实现异步:
- ReactPHP:Node.js 式的事件循环,提供
react/http
、react/mysql
、react/redis
等组件。 - Amp:基于
yield
/await
的协程式语法糖,更接近同步写法,底层也是事件循环。
下面以 ReactPHP 为例,展示如何发起并发 HTTP 请求并控制并发量。
4.1 安装 ReactPHP
composer require react/event-loop react/http react/http-client react/promise react/promise-stream
4.2 并发请求示例(ReactPHP)
<?php
require 'vendor/autoload.php';
use React\EventLoop\Loop;
use React\Http\Browser;
use React\Promise\PromiseInterface;
// 要并发请求的 URL 列表
$urls = [
'http://httpbin.org/delay/1',
'http://httpbin.org/delay/2',
'http://httpbin.org/get?foo=bar',
'http://httpbin.org/status/200',
'http://httpbin.org/uuid'
];
// 并发限制
$maxConcurrency = 3;
$inFlight = 0;
$queue = new SplQueue();
// 存放结果
$results = [];
// 创建 HTTP 客户端
$client = new Browser(Loop::get());
// 将 URL 推入队列
foreach ($urls as $idx => $url) {
$queue->enqueue([$idx, $url]);
}
function processNext() {
global $queue, $inFlight, $maxConcurrency, $client, $results;
while ($inFlight < $maxConcurrency && !$queue->isEmpty()) {
list($idx, $url) = $queue->dequeue();
$inFlight++;
/** @var PromiseInterface $promise */
$promise = $client->get($url);
$promise->then(
function (\Psr\Http\Message\ResponseInterface $response) use ($idx, $url) {
global $inFlight, $results;
$results[$idx] = [
'url' => $url,
'status' => $response->getStatusCode(),
'body' => substr((string)$response->getBody(), 0, 100) . '…'
];
echo "[主循环] 请求 {$url} 完成,状态码=". $response->getStatusCode() . "\n";
$inFlight--;
processNext(); // 继续处理下一批任务
},
function (Exception $e) use ($idx, $url) {
global $inFlight, $results;
$results[$idx] = [
'url' => $url,
'error' => $e->getMessage()
];
echo "[主循环] 请求 {$url} 失败: " . $e->getMessage() . "\n";
$inFlight--;
processNext();
}
);
}
// 当队列空且 inFlight=0 时可以结束循环
if ($queue->isEmpty() && $inFlight === 0) {
// 打印所有结果
echo "[主循环] 所有请求完成,共 " . count($results) . " 条\n";
print_r($results);
Loop::stop();
}
}
// 启动处理
processNext();
Loop::run();
ASCII 流程图
┌───────────────────────────────────────────────────────────┐
│ ReactPHP 事件循环 │
└───────────────────────────────────────────────────────────┘
│ │ │ │ │
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
[HTTP get] [HTTP get] [HTTP get] [队列等待] [队列等待]
(url1) (url2) (url3) (url4) (url5)
│ │ │
│ inFlight=3 │ (并发达到 max=3) 等待 等待
▼ ▼ ▼
I/O await I/O await I/O await
(挂起) (挂起) (挂起)
│ │ │
HTTP 响应1 HTTP 响应2 HTTP 响应3
│ │ │
inFlight-- inFlight-- inFlight--
└┬──────┐ └┬──────┐ └┬──────┐
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
processNext processNext processNext ...
检查队列 & 检查队列 & 检查队列 &
并发数<3 并发数<3 并发数<3
↓ ↓ ↓
发起 next HTTP 请求 …
五、并发控制与资源管理
无论异步模型如何,在高并发场景下,必须对并发度进行有效管理,否则可能出现:
- 内存耗尽:过多协程/进程同时运行,导致内存飙升。
- 连接池耗尽:如 MySQL/Redis 连接池不足,导致请求被拒绝。
- 下游接口限制:第三方 API 有 QPS 限制,过高并发会被封禁。
常见并发控制手段包括:
- 令牌桶/信号量:通过 Channel、Semaphore 等机制限制并发量。
- 任务队列/进程池/协程池:预先创建固定数量的“工作单元”,并从队列中取任务执行。
- 速率限制(Rate Limiting):使用 Leaky Bucket、Token Bucket 或滑动窗口算法限速。
- 超时与重试策略:对超时的异步任务及时取消或重试,避免僵死协程/进程。
下面以 Swoole 协程为例,介绍信号量与限速两种并发控制方式。
5.1 信号量(Semaphore)并发控制
Swoole 协程提供了 Swoole\Coroutine\Semaphore
类,可用于限制并发访问某段代码。
示例:并发查询多个数据库并限制并发数
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\MySQL;
use Swoole\Coroutine\Semaphore;
// 假设有若干用户 ID,需要并发查询用户详细信息
$userIds = range(1, 100);
// 最大并发协程数
$maxConcurrency = 10;
// 创建信号量
$sem = new Semaphore($maxConcurrency);
Co\run(function() use ($userIds, $sem) {
$results = [];
$wg = new Swoole\Coroutine\WaitGroup();
foreach ($userIds as $id) {
// 从信号量中获取一个票,若已达上限,挂起等待
$sem->wait();
$wg->add();
go(function() use ($id, $sem, &$results, $wg) {
// 连接数据库
$db = new MySQL();
$db->connect([
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => '',
'database' => 'test',
]);
$res = $db->query("SELECT * FROM users WHERE id = {$id}");
$results[$id] = $res;
$db->close();
echo "[协程] 查询用户 {$id} 完成\n";
// 释放一个信号量票
$sem->release();
$wg->done();
});
}
$wg->wait();
echo "[主协程] 所有用户查询完成,共 " . count($results) . " 条数据\n";
// 处理 $results
});
原理与说明
new Semaphore($maxConcurrency)
:创建一个最大并发数为$maxConcurrency
的信号量。$sem->wait()
:用于“申请”一个资源票(P 操作);若当前已有$maxConcurrency
条协程已持有票,则其他协程会被挂起等待。$sem->release()
:释放一个资源票(V 操作),如果有协程在等待,会唤醒其中一个。- 结合
WaitGroup
,保证所有查询完成后再继续后续逻辑。
5.2 速率限制(限速)示例
在高并发场景,有时需要对同一个下游接口或资源进行限速,避免瞬时并发过多触发封禁。常用算法有 令牌桶(Token Bucket)、漏桶(Leaky Bucket)、滑动窗口。本文以“令牌桶”算法为例,在协程中简单实现 API QPS 限制。
示例:令牌桶限速
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
// 目标 QPS(每秒最多 5 次请求)
$qps = 5;
// 创建一个容量为 $qps 令牌桶 Channel
$bucket = new Channel($qps);
// 持续向桶中投放令牌
go(function() use ($qps, $bucket) {
while (true) {
// 如果桶未满,则放入一个令牌
if (!$bucket->isFull()) {
$bucket->push(1);
}
// 每隔 1/$qps 秒产生一个令牌
Coroutine::sleep(1 / $qps);
}
});
// 需要并发发起的总任务数
$totalTasks = 20;
// 等待组
$wg = new Swoole\Coroutine\WaitGroup();
for ($i = 1; $i <= $totalTasks; $i++) {
$wg->add();
go(function() use ($i, $bucket, $wg) {
// 从桶中取出一个令牌,若桶空则等待
$bucket->pop();
// 令牌取到后即可发起请求
echo "[协程 {$i}] 获取令牌,开始请求 API 时间:" . date('H:i:s') . "\n";
Coroutine::sleep(0.1); // 模拟 API 请求耗时 100ms
echo "[协程 {$i}] 请求完成 时间:" . date('H:i:s') . "\n";
$wg->done();
});
}
$wg->wait();
echo "[主协程] 所有任务完成。\n";
ASCII 图解:令牌桶限速
(桶满 5 个令牌后,多余的生产操作会 skip)
┌─────────────────────────────────────────────────────────────┐
│ 令牌桶(Channel) │
│ capacity = 5 (Max Token) │
│ ┌───┬───┬───┬───┬───┐ │
│ │ 1 │ 1 │ 1 │ 1 │ 1 │ <- 初始填满 5 个 │
│ └───┴───┴───┴───┴───┘ │
└─────────────────────────────────────────────────────────────┘
↑ ↑ ↑
│ │ │
[协程 1 pop] [协程 2 pop] [协程 3 pop]
│ │ │
发起请求 发起请求 发起请求
(now bucket has 2 tokens) (1 token) (0 token)
│ │ │
多余 Pop 时协程会被挂起 │
└───────────────┬─────────────┘
│
令牌生产协程每 0.2 秒推 1 令牌
│
┌────────────────┼────────────────┐
│ │ │
T+0.2s T+0.4s T+0.6s
bucket:1 bucket:2 bucket:3
│ │ │
[协程 4 pop] [协程 5 pop] [协程 6 pop]
发起请求 发起请求 发起请求
- 桶初始放满 5 个令牌,因此前 5 个协程几乎可瞬时拿到令牌并发起请求。
- 之后只有当令牌按
1/$qps
秒速率补充时,新的协程才能从桶中拿到令牌并发起请求,从而平滑控制请求 QPS。
六、并发写冲突与锁优化
在高并发写共享资源(如缓存、日志、队列)时,必须避免过度的锁竞争,否则会变成串行模式,扼杀并发增益。
6.1 缓存原子更新示例
假设要对 Redis 或 APCu 中的计数器执行自增操作,传统方式可能是:
<?php
// 非原子操作示例:读-改-写
$count = apcu_fetch('page_view') ?: 0;
$count++;
apcu_store('page_view', $count);
- 当并发高时,两个进程可能都
fetch=100
,然后同时写入101
,导致计数丢失。
原子操作示例
<?php
// 使用 APCu 内置原子自增函数
$newCount = apcu_inc('page_view', 1, $success);
if (!$success) {
// 如果键不存在,则先写入 1
apcu_store('page_view', 1);
$newCount = 1;
}
apcu_inc
是原子操作,内部会做加锁,确保并发自增结果准确无误。
6.2 文件锁与异步队列
如果需要对同一个文件或日志进行并发写入,可以将日志写入“异步队列”(如 Channel 或消息队列),然后在单独的写日志协程/进程中顺序消费,避免并发锁:
示例:协程队列写日志
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
Co\run(function() {
// 日志队列 Channel(容量1000)
$logQueue = new Channel(1000);
// 日志写入协程(单独一个),顺序消费
go(function() use ($logQueue) {
$fp = fopen(__DIR__ . '/app.log', 'a');
while (true) {
$entry = $logQueue->pop(); // 阻塞等待日志
if ($entry === false) {
// Channel 关闭
break;
}
fwrite($fp, $entry . "\n");
}
fclose($fp);
});
// 模拟多个业务协程并发写日志
for ($i = 1; $i <= 50; $i++) {
go(function() use ($i, $logQueue) {
$msg = "[协程 {$i}] 这是一条日志,时间:" . date('H:i:s');
$logQueue->push($msg);
});
}
// 等待一定时间后关闭日志队列
Coroutine::sleep(1);
$logQueue->close(); // 关闭 Channel,让日志写入协程退出
});
- 原理:所有协程都将日志数据
push
到共享队列,单独的日志写协程依次pop
并写入文件,避免多协程同时fopen
/fwrite
竞争。 - 该模式也可用于“任务队列消费”、“图片处理队列”等高并发写场景。
七、总结与最佳实践
在高并发场景下,PHP 应用要想获得优异性能,需要结合业务场景与技术选型,合理利用异步与并发控制。本文从以下几个方面给出了详尽示例与说明:
阻塞 vs 非阻塞
- 传统同步阻塞模型容易导致请求累加等待,吞吐下降。
- 通过 Swoole 协程、ReactPHP、Amp 等框架可实现异步非阻塞,提升 I/O 并发度。
Swoole 协程示例
- 并发发 HTTP 请求:利用
go()
+WaitGroup
实现简单并发调用。 - 并发控制:借助
Channel
或Semaphore
实现令牌桶或协程池,限制同时运行的协程数量,保护系统资源。
- 并发发 HTTP 请求:利用
ReactPHP 事件循环示例
- 使用事件循环与 Promise 模式对大批量请求进行异步并发,并通过手动队列管理控制并发度。
并发写冲突与异步队列
- 对共享资源(如文件、日志、缓存)并发写时,应尽量使用原子操作或将写操作集中到单独的协程/进程中顺序执行,避免锁竞争。
速率限制(Rate Limiting)
- 通过令牌桶算法简单实现 QPS 控制,确保下游接口调用不会被超载或封禁。
常见 Pitfall 与注意事项
- PCNTL、Parallel、Swoole 各有使用场景与系统依赖,不同场合下需要灵活选型。
- 异步代码中要避免使用阻塞 I/O,否则整个协程/事件循环会被挂起。
- 必须对“并发度”进行限制,避免系统瞬间创建过多协程/进程导致资源耗尽。
- 在协程环境下,原生函数会被 hook,确保使用 Swoole 协程安全的客户端(如
Swoole\Coroutine\MySQL
、Swoole\Coroutine\Http\Client
等)。
最佳实践总结:
- 如果项目仅需并发简单任务(比如几百个独立操作),可优先选择 Swoole 协程,开发成本低、性能佳;
- 如果需要兼容更底层的 PHP 版本,或只需在 CLI 环境下快速多进程,可选择 PCNTL;
- 若需要在纯 PHP 生态(无扩展)下实现异步,且对回调/Promise 接受度高,可使用 ReactPHP 或 Amp。
评论已关闭