Promise.all 实现浏览器内高并发在复杂的爬虫开发中,我们常遇到两难困境:
for loop):稳定,能自动继承页面的 Cookie 和 Session,但速度慢(串行阻塞)。requests/aiohttp):速度快,但无法继承浏览器上下文,遇到 CSRF Token、签名验证或复杂 Header 时极易失效(报 400/403)。解决方案: 将并发的控制权移交给浏览器。通过注入 JavaScript 的 Promise.all,利用浏览器的非阻塞 I/O 能力并行发出请求。
这是一个完全独立的工具函数,可以直接放入你的 utils.py 中。它不包含任何具体业务逻辑,只负责“在浏览器里跑并发任务”。
def execute_browser_batch(page, task_list):
"""
通用浏览器并发请求执行器
:param page: Playwright 的 Page 对象
:param task_list: 任务列表,必须包含 'url' 字段,可包含其他元数据(id, type等)
:return: 包含响应结果和原始元数据的列表
"""
# 定义核心 JS 逻辑:接收任务 -> 并发 Fetch -> 返回结果
# 关键点:在 map 内部捕获异常,确保单个失败不影响整体
js_payload = """
async (tasks) => {
const promises = tasks.map(async (task) => {
const start = Date.now();
try {
// 发起 Fetch 请求
// 浏览器会自动附带当前域名的 Cookie、Referer 和 LocalStorage Token
const response = await fetch(task.url, {
method: 'GET',
headers: { 'Accept': 'application/json' }
});
// 统一处理 HTTP 错误状态
if (!response.ok) {
return {
status: 'failed',
meta: task, // 将原始任务信息带回,方便 Python 端匹配
error: `HTTP ${response.status}: ${response.statusText}`
};
}
const data = await response.json();
return {
status: 'success',
meta: task,
data: data,
latency: Date.now() - start
};
} catch (err) {
// 捕获网络层面的错误(如 DNS 解析失败、超时)
return {
status: 'error',
meta: task,
error: err.toString()
};
}
});
// 等待所有请求完成 (Parallel Execution)
return await Promise.all(promises);
}
"""
try:
# 将 Python 列表传递给浏览器执行
return page.evaluate(js_payload, task_list)
except Exception as e:
print(f"[System Error] JS 注入执行失败: {e}")
return []
假设我们需要从某财经网站采集 50 个加密货币的实时详情。
import time
import random
from playwright.sync_api import sync_playwright
class CryptoMarketScraper:
def __init__(self):
# 模拟待采集的目标 (ID 和 API 地址)
self.targets = [
{"id": "BTC", "symbol": "bitcoin", "api": "https://api.coincap.io/v2/assets/bitcoin"},
{"id": "ETH", "symbol": "ethereum", "api": "https://api.coincap.io/v2/assets/ethereum"},
{"id": "DOGE", "symbol": "dogecoin", "api": "https://api.coincap.io/v2/assets/dogecoin"},
# ... 假设这里还有 50 个 ...
]
def run(self):
with sync_playwright() as p:
# 启动浏览器
browser = p.chromium.launch(headless=False)
context = browser.new_context()
page = context.new_page()
# 1. 预加载:先访问主域,获取必要的 Cookie/Session/Token
print("正在初始化会话...")
page.goto("https://coincap.io")
page.wait_for_timeout(2000) # 等待各种 Token 初始化完成
# 2. 开始分批采集
self.process_in_batches(page)
browser.close()
def process_in_batches(self, page):
BATCH_SIZE = 5 # 并发控制:每批 5 个
total = len(self.targets)
print(f"开始任务,共 {total} 个目标...")
for i in range(0, total, BATCH_SIZE):
# A. 准备批次数据
batch_data = self.targets[i : i + BATCH_SIZE]
# B. 构造任务对象 (Task Construction)
# 我们将 url 和 meta data 打包在一起
tasks = []
for item in batch_data:
tasks.append({
"url": item['api'], # JS 需要的 URL
"asset_id": item['id'], # 传递给 JS,方便它原样返回
"name": item['symbol']
})
print(f" 正在并发请求第 {i+1} - {min(i+BATCH_SIZE, total)} 项...")
# C. 调用通用调度器
results = execute_browser_batch(page, tasks)
# D. 处理结果
for res in results:
# 提取元数据
meta = res.get('meta', {})
name = meta.get('name', 'Unknown')
if res['status'] == 'success':
price = res['data']['data']['priceUsd']
latency = res['latency']
print(f" [{name}] 价格: ${float(price):.2f} (耗时 {latency}ms)")
# TODO: 在这里写入数据库
else:
print(f" [{name}] 采集失败: {res.get('error')}")
# E. 批次间随机休眠 (模拟人类浏览行为)
time.sleep(random.uniform(1.0, 2.5))
if __name__ == "__main__":
spider = CryptoMarketScraper()
spider.run()
使用 Python requests 库时,最大的痛点是复刻浏览器环境。你需要手动提取 Cookie,手动计算 Authorization 头,甚至要破解 _tb_token_ 或 x-sign。
而在本模式中,fetch 是在页面上下文中执行的,浏览器自动为你附加了所有认证信息。只要页面是登录状态,接口请求就是登录状态。
注意代码中的 meta 字段。我们在 Python 端把 {"id": "BTC"} 传给 JS,JS 在返回结果时原样带回。
这意味着我们不需要去解析 URL 来判断这个 JSON 数据属于哪个商品或 ID,直接读 meta 即可,逻辑非常清晰。
在 JS 代码中,我们对每个 map 内部的 Promise 都做了 try...catch。
如果不这样做,Promise.all 只要遇到一个网络错误(比如其中一个 API 超时),整个批次的所有数据都会丢失(Fail Fast 机制)。加上 try...catch 后,我们可以实现“部分成功,部分重试”。