网站首页 > ⽹⻚⾃动化 > 数据采集 > Playwright 进阶模式:利用 Promise.all 实现浏览器内高并发

Playwright 进阶模式:利用 Promise.all 实现浏览器内高并发

  • 作者:互联网
  • 时间:2026-02-11 15:55:01

Playwright 进阶模式:利用 Promise.all 实现浏览器内高并发

在复杂的爬虫开发中,我们常遇到两难困境:

  1. Python 循环 (for loop):稳定,能自动继承页面的 Cookie 和 Session,但速度慢(串行阻塞)。
  2. Python 多线程 (requests/aiohttp):速度快,但无法继承浏览器上下文,遇到 CSRF Token、签名验证或复杂 Header 时极易失效(报 400/403)。

解决方案: 将并发的控制权移交给浏览器。通过注入 JavaScript 的 Promise.all,利用浏览器的非阻塞 I/O 能力并行发出请求。

核心代码模式

1. 通用调度器 (The Scheduler)

这是一个完全独立的工具函数,可以直接放入你的 utils.py 中。它不包含任何具体业务逻辑,只负责“在浏览器里跑并发任务”。

def execute_browser_batch(page, task_list):
    """
    通用浏览器并发请求执行器
    :param page: Playwright 的 Page 对象
    :param task_list: 任务列表,必须包含 'url' 字段,可包含其他元数据(id, type等)
    :return: 包含响应结果和原始元数据的列表
    """
    
    # 定义核心 JS 逻辑:接收任务 -> 并发 Fetch -> 返回结果
    # 关键点:在 map 内部捕获异常,确保单个失败不影响整体
    js_payload = """
    async (tasks) => {
        const promises = tasks.map(async (task) => {
            const start = Date.now();
            try {
                // 发起 Fetch 请求
                // 浏览器会自动附带当前域名的 Cookie、Referer 和 LocalStorage Token
                const response = await fetch(task.url, {
                    method: 'GET',
                    headers: { 'Accept': 'application/json' }
                });

                // 统一处理 HTTP 错误状态
                if (!response.ok) {
                    return {
                        status: 'failed',
                        meta: task, // 将原始任务信息带回,方便 Python 端匹配
                        error: `HTTP ${response.status}: ${response.statusText}`
                    };
                }

                const data = await response.json();
                return {
                    status: 'success',
                    meta: task,
                    data: data,
                    latency: Date.now() - start
                };

            } catch (err) {
                // 捕获网络层面的错误(如 DNS 解析失败、超时)
                return {
                    status: 'error',
                    meta: task,
                    error: err.toString()
                };
            }
        });

        // 等待所有请求完成 (Parallel Execution)
        return await Promise.all(promises);
    }
    """

    try:
        # 将 Python 列表传递给浏览器执行
        return page.evaluate(js_payload, task_list)
    except Exception as e:
        print(f"[System Error] JS 注入执行失败: {e}")
        return []

2. 业务实现示例:加密货币价格监控

假设我们需要从某财经网站采集 50 个加密货币的实时详情。

import time
import random
from playwright.sync_api import sync_playwright

class CryptoMarketScraper:
    def __init__(self):
        # 模拟待采集的目标 (ID 和 API 地址)
        self.targets = [
            {"id": "BTC", "symbol": "bitcoin", "api": "https://api.coincap.io/v2/assets/bitcoin"},
            {"id": "ETH", "symbol": "ethereum", "api": "https://api.coincap.io/v2/assets/ethereum"},
            {"id": "DOGE", "symbol": "dogecoin", "api": "https://api.coincap.io/v2/assets/dogecoin"},
            # ... 假设这里还有 50 个 ...
        ]
        
    def run(self):
        with sync_playwright() as p:
            # 启动浏览器
            browser = p.chromium.launch(headless=False)
            context = browser.new_context()
            page = context.new_page()

            # 1. 预加载:先访问主域,获取必要的 Cookie/Session/Token
            print("正在初始化会话...")
            page.goto("https://coincap.io") 
            page.wait_for_timeout(2000) # 等待各种 Token 初始化完成

            # 2. 开始分批采集
            self.process_in_batches(page)

            browser.close()

    def process_in_batches(self, page):
        BATCH_SIZE = 5 # 并发控制:每批 5 个
        total = len(self.targets)
        
        print(f"开始任务,共 {total} 个目标...")

        for i in range(0, total, BATCH_SIZE):
            # A. 准备批次数据
            batch_data = self.targets[i : i + BATCH_SIZE]
            
            # B. 构造任务对象 (Task Construction)
            # 我们将 url 和 meta data 打包在一起
            tasks = []
            for item in batch_data:
                tasks.append({
                    "url": item['api'],     # JS 需要的 URL
                    "asset_id": item['id'], # 传递给 JS,方便它原样返回
                    "name": item['symbol']
                })

            print(f" 正在并发请求第 {i+1} - {min(i+BATCH_SIZE, total)} 项...")

            # C. 调用通用调度器
            results = execute_browser_batch(page, tasks)

            # D. 处理结果
            for res in results:
                # 提取元数据
                meta = res.get('meta', {})
                name = meta.get('name', 'Unknown')

                if res['status'] == 'success':
                    price = res['data']['data']['priceUsd']
                    latency = res['latency']
                    print(f"    [{name}] 价格: ${float(price):.2f} (耗时 {latency}ms)")
                    # TODO: 在这里写入数据库
                else:
                    print(f"    [{name}] 采集失败: {res.get('error')}")

            # E. 批次间随机休眠 (模拟人类浏览行为)
            time.sleep(random.uniform(1.0, 2.5))

if __name__ == "__main__":
    spider = CryptoMarketScraper()
    spider.run()

为什么要用这种模式?(技术总结)

1. 解决了 "上下文断裂" 问题

使用 Python requests 库时,最大的痛点是复刻浏览器环境。你需要手动提取 Cookie,手动计算 Authorization 头,甚至要破解 _tb_token_x-sign。 而在本模式中,fetch 是在页面上下文中执行的,浏览器自动为你附加了所有认证信息。只要页面是登录状态,接口请求就是登录状态。

2. 性能提升显著

  • 串行模式:请求 A (1s) -> 请求 B (1s) -> 请求 C (1s) = 总耗时 3s。
  • Promise.all 模式:请求 A, B, C 同时发出 -> 等待最慢的一个返回 (1s) = 总耗时 1s。
  • 效率对比:通常能获得 5~10 倍 的速度提升。

3. 元数据透传 (Metadata Passthrough)

注意代码中的 meta 字段。我们在 Python 端把 {"id": "BTC"} 传给 JS,JS 在返回结果时原样带回。 这意味着我们不需要去解析 URL 来判断这个 JSON 数据属于哪个商品或 ID,直接读 meta 即可,逻辑非常清晰。

4. 鲁棒性设计

在 JS 代码中,我们对每个 map 内部的 Promise 都做了 try...catch。 如果不这样做,Promise.all 只要遇到一个网络错误(比如其中一个 API 超时),整个批次的所有数据都会丢失(Fail Fast 机制)。加上 try...catch 后,我们可以实现“部分成功,部分重试”。