Playwright with Async Python: Concurrent Browser Automation for AI Agents

Learn how to use Playwright's async API with Python asyncio to run concurrent browser sessions, parallelize page interactions, and build high-throughput AI agent automation pipelines.

Why Async Matters for Browser Automation

Browser automation is inherently I/O-bound — most of the time is spent waiting for pages to load, elements to appear, and network requests to complete. Synchronous Playwright wastes this idle time by blocking the Python thread. Async Playwright, using Python's asyncio, lets your AI agent do useful work while waiting: processing data from a previous page, launching another browser tab, or calling an LLM API.

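For agents that need to scrape multiple sites, interact with multiple accounts, or run parallel browser sessions, async Playwright can deliver large throughput improvements over synchronous code, on the order of 5-10x when most of the run is spent waiting on the network. The actual gain depends on page weight, target-site rate limits, and how many sessions the machine can sustain.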
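To make the I/O-bound argument concrete, here is a minimal sketch that needs no browser at all. It assumes a hypothetical fake_page_load() coroutine that stands in for page.goto() by sleeping; the URLs and delays are illustrative only.

```python
import asyncio
import time


async def fake_page_load(url: str, delay: float) -> str:
    """Hypothetical stand-in for page.goto(): it only waits, like an I/O-bound load."""
    await asyncio.sleep(delay)
    return url


async def run_sequential(urls, delay):
    # Each load starts only after the previous one has finished.
    return [await fake_page_load(u, delay) for u in urls]


async def run_concurrent(urls, delay):
    # All loads are started together; the waits overlap.
    return await asyncio.gather(*(fake_page_load(u, delay) for u in urls))


def timed(coro_factory):
    """Run a coroutine to completion and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_factory())
    return result, time.perf_counter() - start


if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(5)]
    _, seq = timed(lambda: run_sequential(urls, 0.1))
    _, con = timed(lambda: run_concurrent(urls, 0.1))
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```

With five equal delays, the sequential run takes roughly the sum of the waits and the concurrent run roughly one wait. Real pages add CPU and memory costs that this sketch ignores.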

Async Playwright Basics

The async API mirrors the sync API exactly, but every method that performs I/O becomes a coroutine:

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://example.com")

        title = await page.title()
        print(f"Title: {title}")

        content = await page.locator("h1").text_content()
        print(f"Heading: {content}")

        await browser.close()

asyncio.run(main())

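Notice the pattern: sync_playwright() becomes async_playwright(), and every Playwright method that performs I/O gets an await prefix. Calls that only build an object without talking to the browser, such as page.locator(), stay un-awaited. The import changes from playwright.sync_api to playwright.async_api.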
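A common slip when porting sync code is dropping an await. The sketch below shows what that looks like, using a hypothetical FakePage class in place of a real Playwright page so that it runs without a browser. The class and its title() method are assumptions for illustration, not the Playwright API.

```python
import asyncio
import inspect


class FakePage:
    """Hypothetical stand-in for a Playwright page; not the real API."""

    async def title(self) -> str:
        await asyncio.sleep(0)  # pretend to round-trip to the browser
        return "Example Domain"


async def main():
    page = FakePage()

    forgotten = page.title()  # no await: this is a coroutine object, not a str
    was_coroutine = inspect.iscoroutine(forgotten)

    title = await forgotten  # awaiting it produces the actual value
    return was_coroutine, title


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If a value prints as a coroutine object, or Python warns that a coroutine was never awaited, a missing await is the usual cause.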

Running Multiple Pages Concurrently

The real power of async Playwright is running multiple pages at the same time:

import asyncio
from playwright.async_api import async_playwright

async def scrape_page(browser, url: str) -> dict:
    """Scrape a single page in its own context."""
    context = await browser.new_context()
    page = await context.new_page()

    try:
        await page.goto(url, wait_until="networkidle", timeout=15000)
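        # Use .first so pages with several <h1> elements don't trip strict mode
        headings = page.locator("h1")
        heading = (
            await headings.first.text_content()
            if await headings.count() > 0
            else None
        )
        return {
            "url": url,
            "title": await page.title(),
            "heading": heading,
        }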
    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
        "https://jsonplaceholder.typicode.com",
        "https://reqres.in",
        "https://dummyjson.com",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        # Scrape all pages concurrently
        tasks = [scrape_page(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            if "error" in result:
                print(f"FAILED: {result['url']} - {result['error']}")
            else:
                print(f"OK: {result['title']} ({result['url']})")

        await browser.close()

asyncio.run(main())

This scrapes all five pages simultaneously rather than sequentially. On a fast connection, this completes in roughly the time of the slowest single page load, not the sum of all five.
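Two properties of asyncio.gather are worth seeing in isolation: results come back in input order regardless of which task finishes first, and return_exceptions=True is an alternative to catching errors inside each task. The sketch below assumes a hypothetical fake_scrape() coroutine in place of scrape_page(), with made-up URLs and delays.

```python
import asyncio


async def fake_scrape(url: str, delay: float) -> dict:
    """Hypothetical stand-in for scrape_page(); fails on purpose for one URL."""
    await asyncio.sleep(delay)
    if "broken" in url:
        raise RuntimeError(f"could not load {url}")
    return {"url": url}


async def main():
    jobs = [
        ("https://example.com/slow", 0.03),
        ("https://example.com/broken", 0.01),
        ("https://example.com/fast", 0.0),
    ]
    # return_exceptions=True places the exception object in the result list
    # instead of raising it out of gather().
    return await asyncio.gather(
        *(fake_scrape(url, delay) for url, delay in jobs),
        return_exceptions=True,
    )


if __name__ == "__main__":
    for item in asyncio.run(main()):
        print("FAILED" if isinstance(item, Exception) else "OK", item)
```

Catching inside the task, as scrape_page() does, keeps the URL attached to the error. return_exceptions=True is less code but leaves the caller to match failures back to inputs by position.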

Controlling Concurrency with Semaphores

Unlimited concurrency can overwhelm the browser or trigger rate limiting. Use an asyncio.Semaphore to cap parallel sessions:

import asyncio
from playwright.async_api import async_playwright

async def scrape_with_limit(browser, url: str, semaphore: asyncio.Semaphore):
    async with semaphore:
        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle")
            title = await page.title()
            return {"url": url, "title": title}
        except Exception as e:
            return {"url": url, "error": str(e)}
        finally:
            await context.close()

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(20)]

    # Allow at most 5 concurrent browser contexts
    semaphore = asyncio.Semaphore(5)

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        tasks = [scrape_with_limit(browser, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

        success = sum(1 for r in results if "error" not in r)
        print(f"Completed: {success}/{len(urls)} pages")

        await browser.close()

asyncio.run(main())

The semaphore ensures that no more than 5 contexts are open at any time. All 20 coroutines are scheduled up front, but each one waits at async with semaphore until a slot frees up, which bounds memory use while still keeping significant parallelism.
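One way to check that a semaphore really caps concurrency is to count how many jobs are inside the guarded section at once. This sketch swaps the browser work for a short sleep. The limited_job() helper and the state dictionary are illustrative assumptions.

```python
import asyncio


async def limited_job(i: int, semaphore: asyncio.Semaphore, state: dict) -> int:
    async with semaphore:
        # Track how many jobs hold a slot right now, and the highest count seen.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for new_context() + page.goto()
        state["active"] -= 1
        return i


async def main(num_jobs: int = 20, limit: int = 5):
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(limited_job(i, semaphore, state) for i in range(num_jobs))
    )
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(f"jobs finished: {len(results)}, peak concurrency: {peak}")
```

The same counter can be dropped temporarily into scrape_with_limit() during development to confirm the cap behaves as expected.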

Async Event Handling

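Handlers registered with page.on() can be coroutine functions, so a handler can await work such as reading a response body. Here a response handler collects JSON payloads from API calls made by the page: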

import asyncio
from playwright.async_api import async_playwright

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()

        api_responses = []

        async def on_response(response):
            if "/api/" in response.url and response.status == 200:
                try:
                    data = await response.json()
                    api_responses.append({
                        "url": response.url,
                        "data": data,
                    })
                except Exception:
                    # Body was not valid JSON or could not be read; skip it
                    pass

        page.on("response", on_response)
        await page.goto("https://example.com")
        await page.wait_for_load_state("networkidle")

        print(f"Captured {len(api_responses)} API responses")
        await browser.close()

asyncio.run(main())

Combining Playwright with Other Async Operations

Async pays off most when browser automation is combined with other I/O operations such as API calls, database queries, and LLM requests:

import asyncio
from openai import AsyncOpenAI
from playwright.async_api import async_playwright

client = AsyncOpenAI()

async def scrape_and_analyze(browser, url: str) -> dict:
    """Scrape a page and analyze its content with an LLM."""
    context = await browser.new_context()
    page = await context.new_page()

    try:
        await page.goto(url, wait_until="networkidle")
        title = await page.title()
        body_text = await page.locator("body").text_content()

        # Truncate to avoid token limits
        body_text = body_text[:3000] if body_text else ""

        # Analyze with LLM while we have the page data
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Summarize the following web page content "
                               "in 2-3 sentences.",
                },
                {"role": "user", "content": f"Title: {title}\n{body_text}"},
            ],
            max_tokens=200,
        )

        summary = response.choices[0].message.content
        return {"url": url, "title": title, "summary": summary}

    except Exception as e:
        return {"url": url, "error": str(e)}
    finally:
        await context.close()

async def main():
    urls = [
        "https://example.com",
        "https://httpbin.org",
    ]

    async with async_playwright() as p:
        browser = await p.chromium.launch()
        tasks = [scrape_and_analyze(browser, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for r in results:
            if "summary" in r:
                print(f"\n{r['title']}:")
                print(f"  {r['summary']}")

        await browser.close()

asyncio.run(main())
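When browser work and LLM calls share one event loop, it can help to limit each resource separately, since browser contexts are bounded by memory while LLM calls are bounded by API rate limits. The following is a sketch under stated assumptions: fake_scrape() and fake_summarize() are hypothetical stand-ins for the Playwright and OpenAI calls above, and the limits of 3 and 2 are arbitrary.

```python
import asyncio


async def fake_scrape(url: str) -> str:
    """Hypothetical stand-in for loading a page and reading its text."""
    await asyncio.sleep(0.01)
    return f"text of {url}"


async def fake_summarize(text: str) -> str:
    """Hypothetical stand-in for an LLM request."""
    await asyncio.sleep(0.01)
    return text.upper()


async def scrape_and_analyze(url, browser_limit, llm_limit) -> dict:
    async with browser_limit:
        text = await fake_scrape(url)
    # The browser slot is released here, so another page can start loading
    # while this task waits for its LLM slot.
    async with llm_limit:
        summary = await fake_summarize(text)
    return {"url": url, "summary": summary}


async def main():
    urls = [f"https://example.com/page/{i}" for i in range(6)]
    browser_limit = asyncio.Semaphore(3)  # assumed cap on open contexts
    llm_limit = asyncio.Semaphore(2)      # assumed cap on in-flight LLM calls
    return await asyncio.gather(
        *(scrape_and_analyze(u, browser_limit, llm_limit) for u in urls)
    )


if __name__ == "__main__":
    for row in asyncio.run(main()):
        print(row)
```

In the real version, closing the browser context before the LLM call has the same effect as releasing the browser slot here.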

Async Producer-Consumer Pattern

For high-throughput scraping, use a queue-based producer-consumer pattern:

import asyncio
from playwright.async_api import async_playwright

async def worker(name: str, browser, queue: asyncio.Queue, results: list):
    """Worker that processes URLs from a shared queue."""
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break

        context = await browser.new_context()
        page = await context.new_page()
        try:
            await page.goto(url, wait_until="networkidle", timeout=10000)
            results.append({
                "url": url,
                "title": await page.title(),
                "worker": name,
            })
            print(f"[{name}] Scraped: {url}")
        except Exception as e:
            print(f"[{name}] Failed: {url} ({e})")
        finally:
            await context.close()
            queue.task_done()

async def main():
    urls = [f"https://example.com/item/{i}" for i in range(15)]
    num_workers = 3

    queue = asyncio.Queue()
    results = []

    for url in urls:
        await queue.put(url)

    # Add poison pills to stop workers
    for _ in range(num_workers):
        await queue.put(None)

    async with async_playwright() as p:
        browser = await p.chromium.launch()

        workers = [
            asyncio.create_task(
                worker(f"W{i}", browser, queue, results)
            )
            for i in range(num_workers)
        ]

        await asyncio.gather(*workers)
        print(f"\nTotal scraped: {len(results)}")

        await browser.close()

asyncio.run(main())
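An alternative to poison pills is to wait on queue.join() and then cancel the workers. The sketch below shows that variant with the browser work replaced by a short sleep. The worker body and URL list are illustrative assumptions, not the code above.

```python
import asyncio


async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        url = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for the scrape
            results.append(url)
        finally:
            # Always mark the item done, even if the scrape raised.
            queue.task_done()


async def main(num_urls: int = 15, num_workers: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    for i in range(num_urls):
        queue.put_nowait(f"https://example.com/item/{i}")

    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(num_workers)
    ]

    await queue.join()  # returns once every queued item has been marked done

    # Workers are now idle in queue.get(); cancel them and wait for cleanup.
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(f"Total scraped: {len(asyncio.run(main()))}")
```

This form lets producers keep adding URLs while workers run, because nothing in the queue signals "stop". The poison-pill version is simpler when the full URL list is known up front.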

FAQ

When should I use async vs sync Playwright?

Use sync Playwright for simple scripts, debugging, and prototyping — it is easier to read and write. Switch to async when you need concurrent page operations, integration with other async libraries (FastAPI, aiohttp, OpenAI async client), or high-throughput automation with many pages. If your AI agent framework is already async (most modern ones are), use async Playwright to avoid blocking the event loop.
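To illustrate what "blocking the event loop" means in practice, this sketch runs a hypothetical heartbeat task alongside either a blocking time.sleep() or a cooperative asyncio.sleep() and counts how often the heartbeat gets to run. The names and durations are illustrative.

```python
import asyncio
import time


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record a timestamp roughly every 10 ms until asked to stop."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def count_ticks(blocking: bool) -> int:
    ticks: list = []
    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick

    if blocking:
        time.sleep(0.1)  # blocks the whole thread; the heartbeat cannot run
    else:
        await asyncio.sleep(0.1)  # yields; the heartbeat keeps ticking

    stop.set()
    await task
    return len(ticks)


if __name__ == "__main__":
    print("ticks during blocking call:", asyncio.run(count_ticks(True)))
    print("ticks during awaited call: ", asyncio.run(count_ticks(False)))
```

A sync Playwright call inside an async agent behaves like the time.sleep() branch: every other task on the loop stalls until it returns.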

Does asyncio.gather run tasks in separate threads?

No. asyncio.gather runs coroutines concurrently within a single thread using cooperative multitasking. When one coroutine hits an await (waiting for a page to load, for example), the event loop switches to another coroutine that is ready to run. This works well for I/O-bound tasks like browser automation. For blocking calls, asyncio.to_thread() moves the work off the event loop thread; for CPU-bound Python code, a ProcessPoolExecutor (via loop.run_in_executor()) is usually the better fit because threads still contend for the GIL.
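The single-thread claim can be checked directly by recording the thread ID inside each coroutine. This sketch assumes Python 3.9 or later for asyncio.to_thread(). The report_thread() helper is illustrative.

```python
import asyncio
import threading


async def report_thread(delay: float) -> int:
    await asyncio.sleep(delay)
    return threading.get_ident()


async def main():
    loop_thread = threading.get_ident()

    # Five coroutines run concurrently via gather, all on the loop's thread.
    gathered_ids = await asyncio.gather(*(report_thread(0.01) for _ in range(5)))

    # to_thread() runs the callable in a worker thread instead.
    worker_thread = await asyncio.to_thread(threading.get_ident)

    return loop_thread, set(gathered_ids), worker_thread


if __name__ == "__main__":
    loop_thread, gathered, worker_thread = asyncio.run(main())
    print("gather stayed on loop thread:", gathered == {loop_thread})
    print("to_thread used another thread:", worker_thread != loop_thread)
```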

How many concurrent browser pages can async Playwright handle?

The practical limit depends on RAM and the complexity of the pages being loaded. As a rough guide, each page/context uses about 20-50 MB for lightweight pages, and heavy, script-laden pages can use considerably more. On a 16 GB machine, 50-100 concurrent lightweight pages is a reasonable starting range. Use a semaphore to cap concurrency at a level your machine can handle, and monitor memory usage during development to find the right number.
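As a back-of-the-envelope aid, the per-page estimate above can be turned into a starting value for the semaphore. The 4096 MB budget below is an assumption (a quarter of a 16 GB machine reserved for browser contexts), and max_contexts() is a hypothetical helper, not part of Playwright.

```python
def max_contexts(ram_budget_mb: int, per_context_mb: int) -> int:
    """Return how many contexts fit in the budget, never fewer than one."""
    return max(1, ram_budget_mb // per_context_mb)


if __name__ == "__main__":
    budget_mb = 4096  # assumed share of RAM reserved for browser contexts
    for per_context_mb in (20, 50):
        cap = max_contexts(budget_mb, per_context_mb)
        print(f"{per_context_mb} MB per context -> semaphore cap of {cap}")
```

Treat the result as an upper bound to test against, then lower it if memory monitoring shows pages weighing more than estimated.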

