AI Agent for Wait Time Management: Real-Time Updates and Queue Position Notifications

Build an AI agent that tracks patient queue positions in real time, estimates accurate wait times using historical data, sends proactive notifications, and offers rebooking options when delays occur.

Why Wait Time Transparency Matters

Patient satisfaction scores drop significantly when perceived wait times exceed expectations. The key word is "perceived" — patients who receive proactive updates about delays report higher satisfaction than those who wait the same amount of time without any communication. A wait time management agent provides real-time visibility into the queue, accurate time estimates, and actionable options when delays occur.

Queue Tracking System

The queue system tracks each patient's position from check-in through being called back. It monitors the actual flow of patients through each stage of their visit.

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum
import uuid


class PatientStage(Enum):
    CHECKED_IN = "checked_in"
    IN_WAITING_ROOM = "in_waiting_room"
    IN_OPERATORY = "in_operatory"
    WITH_PROVIDER = "with_provider"
    CHECKOUT = "checkout"
    DEPARTED = "departed"


@dataclass
class QueueEntry:
    id: str = field(
        default_factory=lambda: str(uuid.uuid4())
    )
    patient_id: str = ""
    patient_name: str = ""
    appointment_id: str = ""
    appointment_time: Optional[datetime] = None
    check_in_time: Optional[datetime] = None
    called_back_time: Optional[datetime] = None
    provider_id: str = ""
    appointment_type: str = ""
    estimated_duration_minutes: int = 30
    stage: PatientStage = PatientStage.CHECKED_IN
    position: int = 0
    estimated_wait_minutes: int = 0


class QueueManager:
    def __init__(self, db):
        self.db = db

    async def check_in_patient(
        self, appointment_id: str,
    ) -> QueueEntry:
        appt = await self.db.fetchrow("""
            SELECT a.id, a.patient_id,
                   p.first_name || ' ' || p.last_name AS name,
                   a.start_time, a.provider_id, a.type,
                   a.duration_minutes
            FROM appointments a
            JOIN patients p ON p.id = a.patient_id
            WHERE a.id = $1
        """, appointment_id)

        now = datetime.utcnow()
        position = await self._calculate_position(
            appt["provider_id"], now
        )

        entry = QueueEntry(
            patient_id=appt["patient_id"],
            patient_name=appt["name"],
            appointment_id=appointment_id,
            appointment_time=appt["start_time"],
            check_in_time=now,
            provider_id=appt["provider_id"],
            appointment_type=appt["type"],
            estimated_duration_minutes=appt["duration_minutes"],
            stage=PatientStage.CHECKED_IN,
            position=position,
        )

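        # _estimate_wait is defined on WaitTimeEstimator (shown below),
        # not on QueueManager, so delegate to an estimator instance.
        entry.estimated_wait_minutes = (
            await WaitTimeEstimator(self.db)._estimate_wait(entry)
        )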

        await self.db.execute("""
            INSERT INTO queue_entries
                (id, patient_id, appointment_id,
                 check_in_time, provider_id,
                 stage, position, estimated_wait)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        """, entry.id, entry.patient_id, appointment_id,
             now, entry.provider_id, entry.stage.value,
             position, entry.estimated_wait_minutes)

        return entry

    async def _calculate_position(
        self, provider_id: str, now: datetime,
    ) -> int:
        count = await self.db.fetchrow("""
            SELECT COUNT(*) AS ahead
            FROM queue_entries
            WHERE provider_id = $1
              AND stage IN ('checked_in', 'in_waiting_room')
              AND check_in_time < $2
              AND DATE(check_in_time) = DATE($2)
        """, provider_id, now)
        return (count["ahead"] or 0) + 1

    async def update_stage(
        self, queue_id: str, new_stage: PatientStage,
    ):
        now = datetime.utcnow()
        updates = {"stage": new_stage.value}
        if new_stage == PatientStage.IN_OPERATORY:
            updates["called_back_time"] = now

        set_clause = ", ".join(
            f"{k} = ${i+2}" for i, k in enumerate(updates)
        )
        values = [queue_id] + list(updates.values())
        await self.db.execute(
            f"UPDATE queue_entries SET {set_clause} "
            f"WHERE id = $1",
            *values,
        )

        if new_stage in (
            PatientStage.IN_OPERATORY,
            PatientStage.DEPARTED,
        ):
            await self._recalculate_positions(
                queue_id
            )

    async def _recalculate_positions(self, queue_id):
        entry = await self.db.fetchrow(
            "SELECT provider_id FROM queue_entries "
            "WHERE id = $1", queue_id,
        )
        waiting = await self.db.fetch("""
            SELECT id FROM queue_entries
            WHERE provider_id = $1
              AND stage IN ('checked_in', 'in_waiting_room')
              AND DATE(check_in_time) = CURRENT_DATE
            ORDER BY check_in_time
        """, entry["provider_id"])

        for i, row in enumerate(waiting):
            await self.db.execute(
                "UPDATE queue_entries SET position = $2 "
                "WHERE id = $1",
                row["id"], i + 1,
            )
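
To see the position logic without a database, the following sketch applies the same ordering rule in memory: patients who are still waiting are ranked by check-in time, and anyone who has been called back or has departed drops out of the ranking. `WaitingPatient` and `assign_positions` are hypothetical names used only for illustration; they are not part of the classes above.

```python
from dataclasses import dataclass
from datetime import datetime

# Stages that still count as "waiting", matching the SQL filter above.
WAITING_STAGES = {"checked_in", "in_waiting_room"}


@dataclass
class WaitingPatient:
    """Hypothetical in-memory stand-in for a queue_entries row."""
    id: str
    check_in_time: datetime
    stage: str = "checked_in"


def assign_positions(entries: list[WaitingPatient]) -> dict[str, int]:
    """Return 1-based queue positions for patients who are still waiting."""
    waiting = sorted(
        (e for e in entries if e.stage in WAITING_STAGES),
        key=lambda e: e.check_in_time,
    )
    return {e.id: i for i, e in enumerate(waiting, start=1)}


queue = [
    WaitingPatient("a", datetime(2024, 1, 8, 9, 0)),
    WaitingPatient("b", datetime(2024, 1, 8, 9, 5)),
    WaitingPatient("c", datetime(2024, 1, 8, 9, 10)),
]
before = assign_positions(queue)

# Patient "a" is called back, so everyone behind moves up one place.
queue[0].stage = "in_operatory"
after = assign_positions(queue)
```

Recomputing every position from the sorted list, rather than decrementing stored values, keeps the ranking correct even if an earlier update was missed.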

Wait Time Estimation

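Accurate estimates require more than a single overall average. The estimator below uses the last 90 days of historical waits for the same provider and day of week, scales that figure by queue position, and adds any overrun from the patient currently with the provider. It does not filter by procedure type; that would be a natural next refinement.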

class WaitTimeEstimator:
    def __init__(self, db):
        self.db = db

    async def _estimate_wait(
        self, entry: QueueEntry,
    ) -> int:
        historical = await self.db.fetchrow("""
            SELECT
                AVG(
                    EXTRACT(EPOCH FROM (
                        called_back_time - check_in_time
                    )) / 60
                ) AS avg_wait,
                PERCENTILE_CONT(0.75) WITHIN GROUP (
                    ORDER BY EXTRACT(EPOCH FROM (
                        called_back_time - check_in_time
                    )) / 60
                ) AS p75_wait
            FROM queue_entries
            WHERE provider_id = $1
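              AND EXTRACT(ISODOW FROM check_in_time) = $2
              AND called_back_time IS NOT NULL
              AND check_in_time > CURRENT_DATE
                  - INTERVAL '90 days'
        """, entry.provider_id,
             # ISODOW and isoweekday() both number Monday=1..Sunday=7;
             # DOW (Sunday=0) and weekday() (Monday=0) do not line up.
             datetime.utcnow().isoweekday())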

        if not historical or not historical["avg_wait"]:
            # No usable history: assume 15 minutes per queue position.
            return entry.position * 15

        base_wait = float(historical["avg_wait"])

        current_behind = await self.db.fetchrow("""
            SELECT
                SUM(
                    CASE WHEN stage = 'with_provider'
                    THEN EXTRACT(EPOCH FROM (
                        CURRENT_TIMESTAMP - called_back_time
                    )) / 60
                    ELSE 0 END
                ) AS current_overrun
            FROM queue_entries
            WHERE provider_id = $1
              AND stage = 'with_provider'
        """, entry.provider_id)

        overrun = float(
            current_behind["current_overrun"] or 0
        )
        # Only time beyond a 10-minute grace period counts as drift.
        schedule_drift = max(0, overrun - 10)

        estimated = (
            base_wait * entry.position + schedule_drift
        )
        return max(1, round(estimated))

    async def get_current_wait(
        self, patient_id: str,
    ) -> Optional[dict]:
        entry = await self.db.fetchrow("""
            SELECT * FROM queue_entries
            WHERE patient_id = $1
              AND stage IN ('checked_in', 'in_waiting_room')
              AND DATE(check_in_time) = CURRENT_DATE
        """, patient_id)

        if not entry:
            return None

        elapsed = (
            datetime.utcnow() - entry["check_in_time"]
        ).total_seconds() / 60

        return {
            "position": entry["position"],
            "estimated_wait": entry["estimated_wait"],
            "elapsed_minutes": round(elapsed),
            "remaining_minutes": max(
                0,
                entry["estimated_wait"] - round(elapsed),
            ),
            "stage": entry["stage"],
        }
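
The arithmetic inside `_estimate_wait` is easier to check when separated from the SQL. The sketch below restates the same formula as a pure function; `estimate_wait_minutes` and its parameter names are hypothetical, while the 10-minute grace period and the 15-minute fallback are the constants used in the code above.

```python
from typing import Optional


def estimate_wait_minutes(
    avg_wait: Optional[float],
    position: int,
    overrun_minutes: float,
    grace_minutes: float = 10,
    fallback_per_position: int = 15,
) -> int:
    """Mirror the estimator's formula without touching the database."""
    if not avg_wait:
        # No history for this provider and weekday: flat per-position guess.
        return position * fallback_per_position

    # Only the part of the overrun beyond the grace period counts as drift.
    schedule_drift = max(0, overrun_minutes - grace_minutes)
    estimated = avg_wait * position + schedule_drift
    return max(1, round(estimated))


# Second in line, 12-minute historical average, provider 25 minutes in:
# 12 * 2 + (25 - 10) = 39
running_late = estimate_wait_minutes(12.0, position=2, overrun_minutes=25)

# First in line, provider within the grace period: 12 * 1 + 0 = 12
on_time = estimate_wait_minutes(12.0, position=1, overrun_minutes=5)

# No history at all, third in line: 3 * 15 = 45
no_history = estimate_wait_minutes(None, position=3, overrun_minutes=0)
```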

Proactive Notification System

The agent sends notifications at key moments: when the patient checks in, when their estimated wait changes significantly, and when they are about to be called back.

class WaitTimeNotifier:
    def __init__(self, db, sms_client, push_service):
        self.db = db
        self.sms = sms_client
        self.push = push_service

    async def send_check_in_confirmation(
        self, entry: QueueEntry,
    ):
        message = (
            f"Hi {entry.patient_name.split()[0]}, "
            f"you are checked in. Your estimated wait is "
            f"about {entry.estimated_wait_minutes} minutes. "
            f"You are #{entry.position} in line. "
            f"We will text you when the provider is ready."
        )
        patient = await self.db.fetchrow(
            "SELECT phone FROM patients WHERE id = $1",
            entry.patient_id,
        )
        await self.sms.send(patient["phone"], message)

    async def check_for_delay_updates(self):
        waiting = await self.db.fetch("""
            SELECT qe.*, p.phone, p.first_name
            FROM queue_entries qe
            JOIN patients p ON p.id = qe.patient_id
            WHERE qe.stage IN ('checked_in', 'in_waiting_room')
              AND DATE(qe.check_in_time) = CURRENT_DATE
        """)

        estimator = WaitTimeEstimator(self.db)

        for entry_row in waiting:
            queue_entry = QueueEntry(
                id=entry_row["id"],
                patient_id=entry_row["patient_id"],
                provider_id=entry_row["provider_id"],
                position=entry_row["position"],
            )
            new_estimate = await estimator._estimate_wait(
                queue_entry
            )
            old_estimate = entry_row["estimated_wait"]

            if abs(new_estimate - old_estimate) >= 10:
                await self.db.execute(
                    "UPDATE queue_entries "
                    "SET estimated_wait = $2 WHERE id = $1",
                    entry_row["id"], new_estimate,
                )

                if new_estimate > old_estimate:
                    await self.sms.send(
                        entry_row["phone"],
                        f"Hi {entry_row['first_name']}, "
                        f"we are running a bit behind. "
                        f"Your updated wait is about "
                        f"{new_estimate} minutes. "
                        f"Thank you for your patience."
                    )

    async def send_ready_notification(
        self, queue_id: str,
    ):
        entry = await self.db.fetchrow("""
            SELECT qe.patient_id, p.phone, p.first_name
            FROM queue_entries qe
            JOIN patients p ON p.id = qe.patient_id
            WHERE qe.id = $1
        """, queue_id)

        await self.sms.send(
            entry["phone"],
            f"Hi {entry['first_name']}, we are ready "
            f"for you! Please come to the front desk.",
        )
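
The article does not show how `check_for_delay_updates` gets invoked. One plausible approach, sketched here under that assumption, is a background task that calls it on a fixed interval. `poll_delay_updates`, `max_cycles`, and `FakeNotifier` are hypothetical names; the fake stands in for `WaitTimeNotifier` so the example runs without a database or SMS client.

```python
import asyncio


async def poll_delay_updates(notifier, interval_seconds: float, max_cycles=None):
    """Call notifier.check_for_delay_updates() on a fixed interval.

    max_cycles is a hypothetical knob for tests and demos; leave it as
    None to loop until the task is cancelled.
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        try:
            await notifier.check_for_delay_updates()
        except Exception as exc:
            # Keep polling even if one cycle fails (e.g. a transient DB error).
            print(f"delay check failed: {exc!r}")
        cycles += 1
        await asyncio.sleep(interval_seconds)
    return cycles


class FakeNotifier:
    """Stand-in for WaitTimeNotifier that only counts calls."""

    def __init__(self):
        self.calls = 0

    async def check_for_delay_updates(self):
        self.calls += 1


fake = FakeNotifier()
completed = asyncio.run(
    poll_delay_updates(fake, interval_seconds=0.01, max_cycles=3)
)
```

In a real deployment the interval would more likely be a minute or two, since the notifier only messages patients when an estimate moves by 10 minutes or more.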

Rebooking Options for Excessive Delays

When the estimated wait exceeds a threshold, the agent proactively offers the patient an option to reschedule rather than continuing to wait.

class RebookingManager:
    DELAY_THRESHOLD_MINUTES = 30

    def __init__(self, db, schedule_manager, sms_client):
        self.db = db
        self.scheduler = schedule_manager
        self.sms = sms_client

    async def offer_rebooking(self, queue_id: str):
        entry = await self.db.fetchrow("""
            SELECT qe.*, p.phone, p.first_name,
                   a.type, a.provider_id
            FROM queue_entries qe
            JOIN patients p ON p.id = qe.patient_id
            JOIN appointments a ON a.id = qe.appointment_id
            WHERE qe.id = $1
        """, queue_id)

        if entry["estimated_wait"] < self.DELAY_THRESHOLD_MINUTES:
            return

        next_slots = await self.scheduler.find_available_slots(
            appointment_type=entry["type"],
            # Start the search from tomorrow's local date.
            preferred_date=datetime.now().date() + timedelta(days=1),
            provider_id=entry["provider_id"],
            search_days=5,
        )

        if next_slots:
            next_option = next_slots[0]
            await self.sms.send(
                entry["phone"],
                f"Hi {entry['first_name']}, we apologize "
                f"for the extended wait. If you would "
                f"prefer, we have an opening on "
                f"{next_option.start:%A at %I:%M %p}. "
                f"Reply REBOOK to reschedule or WAIT to "
                f"stay. Your current position is unchanged "
                f"either way."
            )

            await self.db.execute("""
                INSERT INTO rebooking_offers
                    (queue_id, offered_slot, offered_at)
                VALUES ($1, $2, $3)
            """, queue_id, next_option.start,
                 datetime.utcnow())

    async def process_rebooking_response(
        self, patient_id: str, response: str,
    ):
        if response.strip().upper() != "REBOOK":
            return {"action": "staying"}

        offer = await self.db.fetchrow("""
            SELECT rb.*, qe.appointment_id
            FROM rebooking_offers rb
            JOIN queue_entries qe ON qe.id = rb.queue_id
            WHERE qe.patient_id = $1
            ORDER BY rb.offered_at DESC LIMIT 1
        """, patient_id)

        if not offer:
            return {"action": "no_offer_found"}

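        # Flag the original appointment. Booking the offered slot as a
        # new appointment is left to the scheduler and is not shown here.
        await self.db.execute(
            "UPDATE appointments SET status = 'rescheduled' "
            "WHERE id = $1", offer["appointment_id"],
        )
        # This direct UPDATE bypasses QueueManager.update_stage, so the
        # positions of patients still waiting are not recalculated here.
        await self.db.execute(
            "UPDATE queue_entries SET stage = 'departed' "
            "WHERE id = $1", offer["queue_id"],
        )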

        return {
            "action": "rebooked",
            "new_time": offer["offered_slot"],
        }
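
`process_rebooking_response` treats every reply other than REBOOK as a decision to stay. A slightly stricter variant, sketched below, separates an explicit WAIT from an unrecognized reply so the agent could ask again instead of guessing. `should_offer_rebooking` and `parse_reply` are hypothetical helpers; the 30-minute threshold is the one from `RebookingManager`.

```python
DELAY_THRESHOLD_MINUTES = 30  # same value as RebookingManager above


def should_offer_rebooking(
    estimated_wait: int,
    threshold: int = DELAY_THRESHOLD_MINUTES,
) -> bool:
    """Offer a new slot once the estimate reaches the threshold."""
    return estimated_wait >= threshold


def parse_reply(text: str) -> str:
    """Map a free-text SMS reply to 'rebook', 'wait', or 'unknown'."""
    keyword = text.strip().upper()
    if keyword == "REBOOK":
        return "rebook"
    if keyword == "WAIT":
        return "wait"
    return "unknown"


offer_at_threshold = should_offer_rebooking(30)
offer_below_threshold = should_offer_rebooking(29)
replies = [parse_reply(r) for r in (" rebook ", "Wait", "maybe later")]
```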

FAQ

How does the agent estimate wait times accurately when procedures run longer than expected?

The estimator shown above combines two data sources: the historical average wait for the specific provider and day of week over the last 90 days, and the real-time status of the patient currently with the provider. Once that patient has been in the back for more than a 10-minute grace period, the excess is added to the estimate as schedule drift, and the periodic delay check pushes the revised estimate to everyone still waiting. The historical query also computes the P75 wait; substituting it for the average gives more conservative estimates that patients exceed less often. The scheduled durations of the patients ahead in the queue are available on each QueueEntry and could be added as a third input.
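
The gap between the average and the 75th percentile is easy to see on a small sample. The waits below are made-up numbers for illustration only; the sketch uses Python's standard `statistics` module rather than the SQL aggregate.

```python
from statistics import mean, quantiles

# Hypothetical sample of historical waits, in minutes.
waits = [10, 12, 14, 16, 18]

average_wait = mean(waits)

# method="inclusive" interpolates linearly between data points,
# the same kind of interpolation PERCENTILE_CONT performs in SQL.
# With n=4 the result holds the three quartiles; index 2 is P75.
p75_wait = quantiles(waits, n=4, method="inclusive")[2]
```

Here the average is 14 minutes and the P75 is 16, so quoting the P75 means roughly three out of four patients would be called back at or before the quoted time.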

What if patients leave the waiting room without telling the front desk?

The system integrates with check-in kiosks and can optionally use Bluetooth beacons or Wi-Fi presence detection to estimate whether a patient is still in the waiting area. If the system detects that a patient may have left, it sends a confirmation message asking if they are still waiting. After 15 minutes with no response and no detected presence, the queue entry is marked as "no show" and downstream patients' positions are updated automatically.
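
The timeout rule described here can be written as a small predicate. This is a minimal sketch, not code from the classes above: `should_mark_no_show` and its parameters are hypothetical, and a "no show" value would also need to be added to `PatientStage`, which does not currently define one.

```python
from datetime import datetime, timedelta

NO_SHOW_TIMEOUT = timedelta(minutes=15)


def should_mark_no_show(
    prompt_sent_at: datetime,
    now: datetime,
    patient_replied: bool,
    presence_detected: bool,
    timeout: timedelta = NO_SHOW_TIMEOUT,
) -> bool:
    """True once the timeout passes with no reply and no detected presence."""
    if patient_replied or presence_detected:
        return False
    return now - prompt_sent_at >= timeout


sent = datetime(2024, 1, 8, 10, 0)

still_pending = should_mark_no_show(
    sent, sent + timedelta(minutes=10), False, False
)
timed_out = should_mark_no_show(
    sent, sent + timedelta(minutes=15), False, False
)
replied_late = should_mark_no_show(
    sent, sent + timedelta(minutes=20), True, False
)
```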

Can the wait time system work across multiple providers and operatories simultaneously?

Yes. The queue tracks each provider independently, so a delay with one provider does not affect the wait estimates for another. The code in this article does not model shared resources such as operatories and hygienists. When multiple providers share operatories, the estimator would need to treat room availability as a constraint on top of provider availability to give an accurate picture of actual wait times.
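
One way to express the room constraint is to take the later of two times: when the provider is free and when the first shared operatory is free. This is a minimal sketch and not part of the code above; `minutes_until_start` and its inputs are hypothetical.

```python
def minutes_until_start(
    provider_free_in: float,
    rooms_free_in: list[float],
) -> float:
    """Earliest start given one provider and a pool of shared rooms.

    Both arguments are minutes from now. The visit cannot begin until
    the provider AND at least one room are available.
    """
    if not rooms_free_in:
        raise ValueError("at least one operatory is required")
    first_room_free = min(rooms_free_in)
    return max(provider_free_in, first_room_free)


# A room opens in 5 minutes, but the provider needs 10: provider-bound.
provider_bound = minutes_until_start(10, [25, 5])

# Provider is free in 10 minutes, but no room opens for 25: room-bound.
room_bound = minutes_until_start(10, [25, 30])
```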


Written by

CallSphere Team
