#!/usr/bin/env python3
"""Exult RingCentral Admin MCP Server.

Wraps the RingCentral Platform API with the Remote Admin JWT stored in
/Users/agent/pi-mono/.config/exult/ringcentral.json. Exposes read-only admin
and mid-write operations that Keragon's RC MCP does NOT cover:

  - get_account_info
  - get_auth_context            (JWT app scopes and owner, no secrets)
  - list_extensions              (all extensions on the account)
  - get_extension                (single extension detail)
  - list_call_queues
  - get_call_queue_members
  - list_extension_devices       (find Other Phone / Existing Phone devices)
  - read_device_sip_info         (SIP credentials for supported devices)
  - pull_call_log                (detailed, paginated, with recording URLs)
  - get_voicemails               (list message store + transcripts when present)
  - get_voicemail_transcript     (AudioTranscription attachment)
  - get_ivr_menus
  - list_phone_numbers
  - get_service_status
  - create_ai_scheduling_extension (write-gated; exact approval text required)

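Read-only by default. The write tools (update_call_queue_members,
set_call_queue_ring_type, set_call_queue_member_order,
create_ai_scheduling_extension) are DISABLED unless the env var
EXULT_RC_ALLOW_WRITES=1 is set at launch time — this is the Gautam-approval gate
from the amd/rc skill playbooks. Revealing raw SIP credentials through
read_device_sip_info is gated separately by EXULT_RC_ALLOW_SECRET_READS=1.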

All PHI-bearing responses are returned as-is to the MCP client; downstream
consumers are responsible for not persisting them outside approved vaults.
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import re
import sys
import time
from pathlib import Path
from typing import Any, Optional

import httpx
from mcp.server import Server
from mcp.types import Tool, TextContent

# Streamable HTTP transport (Phase C of MCP consolidation). Replaces the
# stdio launcher with a bearer-gated HTTP listener served from the VM and
# fronted by a Tailscale funnel at /ringcentral-admin/mcp. The shared
# library lives at tools/mcp-shared/python and is installed into this
# server's venv (see entry-point note in README/pyproject).
from mcp_shared import serve_mcp_over_http  # type: ignore[import-not-found]

# Reserved port table parallel to tools/mcp-shared/ports.ts.
# Kept in sync manually; ringcentral-admin owns 18814.
MCP_PORT_RINGCENTRAL_ADMIN = 18814

# Location of the RingCentral credentials JSON. EXULT_RC_CONFIG overrides the
# default path; _load_creds() reads this file once and caches the result.
CONFIG_PATH = Path(
    os.environ.get(
        "EXULT_RC_CONFIG",
        "/Users/agent/pi-mono/.config/exult/ringcentral.json",
    )
)
# Operator-set write gate, evaluated once at import time. Only the exact value
# "1" enables it.
ALLOW_WRITES = os.environ.get("EXULT_RC_ALLOW_WRITES") == "1"
# Separate operator-set gate for revealing raw secrets (e.g. SIP credentials).
# Set at launch time only — a caller cannot self-grant it, unlike an
# approval_text the tool itself returns.
ALLOW_SECRET_READS = os.environ.get("EXULT_RC_ALLOW_SECRET_READS") == "1"
# OAuth token-response keys that _get_token() copies into the token cache.
# The access_token is kept because every API request needs it; it is still a
# bearer secret. Fields not listed here (e.g. refresh_token) are never stored.
_TOKEN_CACHE_KEYS = (
    "access_token",
    "token_type",
    "expires_in",
    "scope",
    "owner_id",
    "endpoint_id",
    "expireTime",
)
# Fixed identity of the AI scheduling extension and its device. All four values
# are interpolated into _ai_extension_approval_text(); the number and name are
# also what _find_ai_extension() matches on.
AI_EXTENSION_NUMBER = "9001"
AI_EXTENSION_NAME = "AI Scheduling Backup"
AI_EXTENSION_EMAIL = "agent@exulthealthcare.com"
AI_DEVICE_NAME = "AI Scheduling Backup SIP"

# Process-wide token cache: the whitelisted token-response fields plus a
# computed "expires_at" epoch timestamp, both written by _get_token().
_token_cache: dict[str, Any] = {}
# Parsed contents of CONFIG_PATH; None until _load_creds() first succeeds.
_creds_cache: Optional[dict] = None

# Characters allowed in an id that gets interpolated into a REST path: ASCII
# letters, digits, "~", "_" and "-". The pattern ends with \Z rather than $
# because $ also matches just before a trailing newline, which would let
# "123\n" through.
_SAFE_ID = re.compile(r"^[a-zA-Z0-9~_-]+\Z")


def _validate_id(value: Any, name: str) -> str:
    """Return *value* as a string after checking it is safe to put in a URL path.

    Args:
        value: The caller-supplied id (normally a string; integers are
            accepted and stringified).
        name: Argument name used in the error message.

    Returns:
        The id as a string consisting only of characters matched by _SAFE_ID.

    Raises:
        ValueError: The value is None, a boolean, empty, or contains any
            character outside the allowed set (including a trailing newline).
    """
    # str(None) == "None" and str(True) == "True" would both pass the pattern
    # and be sent to the API as if they were ids, so reject them up front.
    if value is None or isinstance(value, bool):
        raise ValueError(f"Invalid {name}: must be alphanumeric")
    s = str(value)
    if not _SAFE_ID.fullmatch(s):
        raise ValueError(f"Invalid {name}: must be alphanumeric")
    return s


def _load_creds() -> dict:
    """Return the RingCentral credentials, reading CONFIG_PATH on first use.

    The parsed JSON is stored in the module-level _creds_cache, so the file is
    read at most once per process; later calls return the cached object.

    Returns:
        The parsed contents of the credentials JSON file.

    Raises:
        FileNotFoundError: CONFIG_PATH does not exist.
        json.JSONDecodeError: The file is not valid JSON.
    """
    global _creds_cache
    if _creds_cache is not None:
        return _creds_cache
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(CONFIG_PATH, encoding="utf-8") as f:
            loaded = json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        raise FileNotFoundError(
            f"RingCentral config not found at {CONFIG_PATH}. Set EXULT_RC_CONFIG or create the file."
        ) from None
    # Cache only after a successful parse so a failed read is retried next call.
    _creds_cache = loaded
    return loaded


async def _get_token() -> str:
    """Return a bearer access token, reusing the cached one while it is fresh.

    A cached token is reused only while it has more than 60 seconds left.
    Otherwise the JWT from the credentials file is exchanged at the OAuth token
    endpoint (client id/secret as HTTP auth), the whitelisted response fields
    are cached, and the new access token is returned.

    Raises:
        httpx.HTTPStatusError: The token endpoint answered with a 4xx/5xx.
    """
    requested_at = time.time()
    still_fresh = _token_cache.get("expires_at", 0) > requested_at + 60
    if still_fresh:
        return _token_cache["access_token"]

    creds = _load_creds()
    async with httpx.AsyncClient(timeout=30.0) as client:
        token_url = f"{creds['server']}/restapi/oauth/token"
        basic_auth = (creds["client_id"], creds["client_secret"])
        form = {
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "assertion": creds["jwt"],
        }
        resp = await client.post(token_url, auth=basic_auth, data=form)
        resp.raise_for_status()
        body = resp.json()

    # Keep only the whitelisted fields rather than the whole OAuth body, so
    # anything that later logs or serializes _token_cache cannot expose fields
    # such as refresh_token.
    _token_cache.update(
        {key: body[key] for key in _TOKEN_CACHE_KEYS if key in body}
    )
    # Expiry is measured from before the request was sent, which errs on the
    # side of refreshing slightly early.
    _token_cache["expires_at"] = requested_at + body.get("expires_in", 3600)
    return body["access_token"]


async def _get_auth_context() -> dict[str, Any]:
    """Describe the current token without exposing the token itself.

    Ensures a token is cached, then copies the non-credential fields present in
    the cache. When a string ``scope`` is available it is also split into a
    sorted ``scopes`` list, with convenience flags for the EditAccounts and
    EditExtensions scopes.
    """
    await _get_token()

    context: dict[str, Any] = {}
    for field in (
        "token_type",
        "expires_in",
        "expireTime",
        "scope",
        "owner_id",
        "endpoint_id",
    ):
        if field in _token_cache:
            context[field] = _token_cache.get(field)

    raw_scope = context.get("scope")
    if not isinstance(raw_scope, str):
        return context

    granted = sorted(raw_scope.split())
    context["scopes"] = granted
    context["has_edit_accounts"] = "EditAccounts" in granted
    context["has_edit_extensions"] = "EditExtensions" in granted
    return context


def _api_response_payload(resp: "httpx.Response") -> Any:
    """Turn an HTTP response into the JSON-serializable value the tools return.

    - status >= 400: an error dict with the status and the first 2000
      characters of the body (no exception is raised).
    - otherwise: the decoded JSON body.
    - otherwise, if the body is not decodable JSON (e.g. empty 204 or binary):
      a dict with the status, content type and the first 2000 characters.
    """
    if resp.status_code >= 400:
        return {
            "error": True,
            "status": resp.status_code,
            "body": resp.text[:2000],
        }
    try:
        return resp.json()
    except ValueError:
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
        # which is raised when the body is not valid UTF-8.
        return {
            "status": resp.status_code,
            "content_type": resp.headers.get("content-type", ""),
            "body": resp.text[:2000],
        }


async def _api_request(
    method: str,
    path: str,
    *,
    params: Optional[dict] = None,
    body: Optional[dict[str, Any]] = None,
) -> Any:
    """Send one authenticated request to the RingCentral server.

    Args:
        method: HTTP method name ("GET", "POST", "PUT").
        path: Path appended to the configured server base URL.
        params: Optional query-string parameters.
        body: Optional JSON request body.

    Returns:
        The value produced by _api_response_payload() for the response.
    """
    creds = _load_creds()
    token = await _get_token()
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.request(
            method,
            f"{creds['server']}{path}",
            params=params,
            json=body,
            headers={"Authorization": f"Bearer {token}"},
        )
        return _api_response_payload(resp)


async def _api_get(path: str, params: Optional[dict] = None) -> Any:
    """GET *path* with optional query *params*; see _api_response_payload()."""
    return await _api_request("GET", path, params=params)


async def _api_get_binary(path: str) -> bytes:
    """GET *path* and return the raw response bytes.

    Unlike the JSON helpers this raises on HTTP errors instead of returning an
    error dict, and uses a longer (120 s) timeout.

    Raises:
        httpx.HTTPStatusError: The server answered with a 4xx/5xx.
    """
    creds = _load_creds()
    token = await _get_token()
    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.get(
            f"{creds['server']}{path}",
            headers={"Authorization": f"Bearer {token}"},
        )
        resp.raise_for_status()
        return resp.content


async def _api_post(path: str, body: dict[str, Any]) -> Any:
    """POST *body* as JSON to *path*; see _api_response_payload()."""
    return await _api_request("POST", path, body=body)


async def _api_put(path: str, body: dict[str, Any]) -> Any:
    """PUT *body* as JSON to *path*; see _api_response_payload()."""
    return await _api_request("PUT", path, body=body)


def _queue_members_approval_text(
    qid: str, add_ids: list[str], remove_ids: list[str]
) -> str:
    """Build the approval sentence a caller must echo for a membership write.

    Both id lists are sorted before rendering, so the same set of changes
    always yields the same text regardless of argument order.
    """
    additions = sorted(add_ids)
    removals = sorted(remove_ids)
    template = (
        "Approve RingCentral queue write: update_call_queue_members on queue {}; "
        "add={}; remove={}."
    )
    return template.format(qid, additions, removals)


def _ring_type_approval_text(qid: str, rule_id: str, ring_type: str) -> str:
    """Build the approval sentence a caller must echo for a ring-type write.

    The text names the queue, the answering rule, and the requested
    transferMode, so it is only valid for that exact change.
    """
    template = (
        "Approve RingCentral routing write: set_call_queue_ring_type on queue {}, "
        "rule {}; transferMode={}."
    )
    return template.format(qid, rule_id, ring_type)


def _member_order_approval_text(
    qid: str, rule_id: str, ordered_ids: list[str]
) -> str:
    """Build the approval sentence a caller must echo for a member-order write.

    The ids are rendered in the order given (not sorted), because the order is
    the thing being approved.
    """
    target = (
        f"Approve RingCentral routing write: set_call_queue_member_order on queue {qid}, "
    )
    change = f"rule {rule_id}; order={ordered_ids}."
    return target + change


def _sip_reveal_approval_text(device_id: str) -> str:
    """Build the approval sentence a caller must echo to see raw SIP secrets.

    The text embeds the device id, so an approval for one device cannot be
    reused for another.
    """
    prefix = "Approve RingCentral secret read: reveal raw SIP credentials for "
    return prefix + f"device {device_id}."


# Names of sip-info fields whose values are masked by _redact_sip_info().
# They stay masked unless an explicit, approved reveal is requested.
_SIP_SECRET_KEYS = frozenset(
    ("password", "authorizationId", "userName", "domain", "outboundProxy")
)


def _redact_sip_info(obj: Any) -> Any:
    """Return a copy of a sip-info payload with secret-bearing fields masked.

    Dicts and lists are walked recursively. Any dict entry whose key is in
    _SIP_SECRET_KEYS has its whole value replaced by "<redacted>"; every other
    value is returned as-is. The input is not modified.
    """
    if isinstance(obj, list):
        return [_redact_sip_info(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    masked = {}
    for key, value in obj.items():
        if key in _SIP_SECRET_KEYS:
            masked[key] = "<redacted>"
        else:
            masked[key] = _redact_sip_info(value)
    return masked


def _ai_extension_approval_text() -> str:
    """Return the fixed, multi-line approval text for the AI extension write.

    The text is a header line followed by three numbered items (create the
    extension, add the device, leave everything else untouched), joined with
    newlines and filled in from the AI_* module constants.
    """
    header = "Approve these RingCentral writes only:"
    create_extension = (
        f"1. Create a new enabled RingCentral extension named '{AI_EXTENSION_NAME}', "
        f"extension number '{AI_EXTENSION_NUMBER}', using '{AI_EXTENSION_EMAIL}' if "
        "RingCentral requires an email, with no direct phone number assignment."
    )
    add_device = (
        f"2. Add one Existing Phone / Other Phone device under extension "
        f"'{AI_EXTENSION_NUMBER}', named '{AI_DEVICE_NAME}', for Cloud Phone SDK "
        "SIP registration."
    )
    leave_untouched = (
        "3. Do not change queue 55, IVR 2000, after-hours routing, greetings, "
        "forwarding, caller ID, or phone-number assignments."
    )
    return "\n".join((header, create_extension, add_device, leave_untouched))


async def _find_ai_extension() -> Optional[dict[str, Any]]:
    """Return the existing AI scheduling extension record, or None if absent.

    Pages through the account's extension list (250 per page) and returns the
    first record whose extensionNumber equals AI_EXTENSION_NUMBER or whose
    name equals AI_EXTENSION_NAME.

    The lookup fails closed: if any page cannot be read (HTTP error dict from
    _api_get, non-JSON body, or a payload without a "records" field) the result
    is inconclusive. Reporting "not found" then could lead a caller to attempt
    a duplicate create, so an exception is raised instead.

    Returns:
        The matching extension record, or None once every page has been read
        without a match.

    Raises:
        RuntimeError: A page of the extension list could not be retrieved or
            did not have the expected shape.
    """
    # Page through all extensions so accounts with >250 extensions can't hide
    # an existing AI extension and trigger a duplicate-create attempt.
    page = 1
    while True:
        body = await _api_get(
            "/restapi/v1.0/account/~/extension",
            {"perPage": 250, "page": page},
        )
        if (
            not isinstance(body, dict)
            or body.get("error")
            or "records" not in body
        ):
            status = body.get("status") if isinstance(body, dict) else None
            raise RuntimeError(
                "Could not list RingCentral extensions "
                f"(page {page}, status {status}); refusing to report the AI "
                "scheduling extension as absent."
            )
        records = body.get("records") or []
        for extension in records:
            if (
                str(extension.get("extensionNumber")) == AI_EXTENSION_NUMBER
                or extension.get("name") == AI_EXTENSION_NAME
            ):
                return extension
        navigation = body.get("navigation") or {}
        has_next = bool(navigation.get("nextPage"))
        if not has_next and not records:
            return None
        paging = body.get("paging") or {}
        total_pages = paging.get("totalPages")
        if total_pages is not None and page >= int(total_pages):
            return None
        if not has_next and total_pages is None:
            # No paging metadata and no explicit next page: assume single page.
            return None
        page += 1


# The MCP server instance; the decorated handlers below register against it.
app = Server("exult-ringcentral-admin")


@app.list_tools()
async def list_tools() -> list[Tool]:
    """Return the static catalogue of tools this server advertises.

    Every tool is listed regardless of the write/secret environment gates; each
    tool's description states which gate and approval_text a call requires.
    Each entry carries a JSON Schema (inputSchema) describing its arguments.
    """
    tools = [
        # --- Read-only account, auth and extension lookups ---
        Tool(
            name="get_account_info",
            description="Fetch the Exult RingCentral account summary (id, status, service plan, main number).",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="get_auth_context",
            description="Return the current RingCentral JWT auth context and scopes without access tokens or secrets.",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="list_extensions",
            description=(
                "List all extensions on the Exult account. Supports filtering by type "
                "(User, Department, Announcement, Voicemail, SharedLinesGroup, IvrMenu, "
                "ParkLocation, ApplicationExtension, Bot) and status (Enabled, Disabled, "
                "NotActivated, Unassigned)."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "extension_type": {"type": "string"},
                    "status": {"type": "string"},
                    "per_page": {"type": "integer", "default": 250},
                },
            },
        ),
        Tool(
            name="get_extension",
            description="Get detail on a single extension by its extension_id (the numeric id, not the dialable number).",
            inputSchema={
                "type": "object",
                "properties": {"extension_id": {"type": "string"}},
                "required": ["extension_id"],
            },
        ),
        # --- Call queues: read ---
        Tool(
            name="list_call_queues",
            description="List all call queues (a.k.a. departments) configured on the Exult account.",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="get_call_queue_members",
            description="List the extensions that are members of a given call queue.",
            inputSchema={
                "type": "object",
                "properties": {"queue_id": {"type": "string"}},
                "required": ["queue_id"],
            },
        ),
        # --- Call queues: write-gated (dry run by default, approval_text for live) ---
        Tool(
            name="update_call_queue_members",
            description=(
                "WRITE-GATED. Update the member list of a call queue via the "
                "bulk-assign endpoint. Requires EXULT_RC_ALLOW_WRITES=1. "
                "add_extension_ids and remove_extension_ids are arrays of "
                "extension IDs (the numeric id, not the dialable number). "
                "Dry run by default — must set dry_run=false to actually mutate. "
                "The live write also requires an exact approval_text matching the "
                "specific queue/member change; run dry_run=true first to obtain it. "
                "Returns before/after member lists for review."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "queue_id": {"type": "string"},
                    "add_extension_ids": {
                        "type": "array",
                        "items": {"type": "string"},
                        "default": [],
                    },
                    "remove_extension_ids": {
                        "type": "array",
                        "items": {"type": "string"},
                        "default": [],
                    },
                    "dry_run": {"type": "boolean", "default": True},
                    "approval_text": {
                        "type": "string",
                        "description": (
                            "Exact approval string for this specific change "
                            "(returned by the dry_run preview). Required for the "
                            "live write."
                        ),
                    },
                },
                "required": ["queue_id"],
            },
        ),
        Tool(
            name="set_call_queue_ring_type",
            description=(
                "WRITE-GATED. Set the ring (transfer) mode on a call queue's "
                "answering rule. Requires EXULT_RC_ALLOW_WRITES=1. ring_type "
                "must be one of: Simultaneous, Rotating, FixedOrder. "
                "rule_id defaults to 'business-hours-rule'; pass a custom "
                "rule id (e.g. 'after-hours-rule' or a Custom rule id) to "
                "target a different schedule. Dry run by default — must set "
                "dry_run=false AND pass the exact approval_text (from the dry_run "
                "preview) to actually mutate. Returns before/after rule "
                "snapshots so the caller can confirm. Note: this only changes "
                "queue.transferMode on the targeted rule; existing "
                "fixedOrderAgents are preserved. Use "
                "set_call_queue_member_order to set the ranked order."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "queue_id": {"type": "string"},
                    "ring_type": {
                        "type": "string",
                        "enum": ["Simultaneous", "Rotating", "FixedOrder"],
                    },
                    "rule_id": {
                        "type": "string",
                        "default": "business-hours-rule",
                    },
                    "dry_run": {"type": "boolean", "default": True},
                    "approval_text": {
                        "type": "string",
                        "description": (
                            "Exact approval string for this specific routing "
                            "change (returned by the dry_run preview). Required "
                            "for the live write."
                        ),
                    },
                },
                "required": ["queue_id", "ring_type"],
            },
        ),
        Tool(
            name="set_call_queue_member_order",
            description=(
                "WRITE-GATED. Set the ranked agent order on a call queue's "
                "answering rule. Requires EXULT_RC_ALLOW_WRITES=1. "
                "ordered_extension_ids is a list of extension IDs in the "
                "desired ring order; first id rings first. rule_id defaults "
                "to 'business-hours-rule'. The list MUST cover the same "
                "extensions currently members of the queue — any extension "
                "in the list that is not a queue member, or any current "
                "member not in the list, will be reported and the call "
                "aborted (use update_call_queue_members first). This only "
                "writes queue.fixedOrderAgents on the targeted rule; ring "
                "type is unchanged (use set_call_queue_ring_type to flip "
                "to FixedOrder). Dry run by default; the live write also "
                "requires the exact approval_text from the dry_run preview."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "queue_id": {"type": "string"},
                    "ordered_extension_ids": {
                        "type": "array",
                        "items": {"type": "string"},
                    },
                    "rule_id": {
                        "type": "string",
                        "default": "business-hours-rule",
                    },
                    "dry_run": {"type": "boolean", "default": True},
                    "approval_text": {
                        "type": "string",
                        "description": (
                            "Exact approval string for this specific member-order "
                            "change (returned by the dry_run preview). Required "
                            "for the live write."
                        ),
                    },
                },
                "required": ["queue_id", "ordered_extension_ids"],
            },
        ),
        # --- Devices and SIP info (secrets redacted unless explicitly revealed) ---
        Tool(
            name="list_extension_devices",
            description=(
                "List devices under one extension. Cloud Phone SDK requires an "
                "'Other Phone' device, equivalent to Existing Phone in the admin portal."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "extension_id": {"type": "string"},
                    "per_page": {"type": "integer", "default": 100},
                },
                "required": ["extension_id"],
            },
        ),
        Tool(
            name="read_device_sip_info",
            description=(
                "Read SIP info for a RingCentral device. By default the response "
                "is REDACTED (password, authorizationId, userName, domain and "
                "outboundProxy are masked). To return raw SIP credentials, the "
                "server must be started with EXULT_RC_ALLOW_SECRET_READS=1 "
                "(operator gate) AND the call must pass reveal_secrets=true with "
                "the exact approval_text returned by the redacted call. Use only "
                "for an approved/dedicated Other Phone device."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "device_id": {"type": "string"},
                    "reveal_secrets": {
                        "type": "boolean",
                        "default": False,
                        "description": (
                            "Return raw SIP credentials instead of the redacted "
                            "view. Requires the exact approval_text."
                        ),
                    },
                    "approval_text": {
                        "type": "string",
                        "description": (
                            "Exact approval string to reveal raw secrets "
                            "(returned by the redacted call)."
                        ),
                    },
                },
                "required": ["device_id"],
            },
        ),
        # --- Call log and voicemail ---
        Tool(
            name="pull_call_log",
            description=(
                "Pull the detailed company call log for a date range. Returns recording "
                "URLs when present. Paginates until the API stops returning a nextPage. "
                "WARNING: can be large — prefer <= 7 day windows for account-wide pulls. "
                "Timestamps are ISO 8601. Types: Voice, Fax. Directions: Inbound, Outbound."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "date_from": {"type": "string", "description": "ISO 8601 with tz, e.g. 2026-04-01T00:00:00-05:00"},
                    "date_to": {"type": "string"},
                    "extension_id": {
                        "type": "string",
                        "description": "Optional extension id to scope to a single extension; omit for account-wide.",
                    },
                    "call_type": {"type": "string", "description": "Voice or Fax; omit for all"},
                    "direction": {"type": "string", "description": "Inbound or Outbound; omit for all"},
                    "max_pages": {"type": "integer", "default": 10},
                },
                "required": ["date_from", "date_to"],
            },
        ),
        Tool(
            name="get_voicemails",
            description=(
                "List voicemail messages on an extension's message store. Returns "
                "message metadata including attachment ids for audio and transcription."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "extension_id": {"type": "string", "description": "Defaults to the authenticated extension (~)"},
                    "date_from": {"type": "string"},
                    "date_to": {"type": "string"},
                    "per_page": {"type": "integer", "default": 100},
                },
            },
        ),
        Tool(
            name="get_voicemail_transcript",
            description=(
                "Fetch the AudioTranscription attachment body for a voicemail message. "
                "Returns the text transcript, or an error if the VM has not been "
                "transcribed yet."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "message_id": {"type": "string"},
                    "attachment_id": {"type": "string"},
                    "extension_id": {"type": "string", "default": "~"},
                },
                "required": ["message_id", "attachment_id"],
            },
        ),
        # --- Account-level listings and platform status ---
        Tool(
            name="get_ivr_menus",
            description="List auto-receptionist IVR menus on the account.",
            inputSchema={"type": "object", "properties": {}},
        ),
        Tool(
            name="list_phone_numbers",
            description="List all phone numbers assigned to the account (main, DIDs, fax, toll-free).",
            inputSchema={
                "type": "object",
                "properties": {"per_page": {"type": "integer", "default": 250}},
            },
        ),
        Tool(
            name="get_service_status",
            description="Check the RingCentral platform service status (outage indicator).",
            inputSchema={"type": "object", "properties": {}},
        ),
        # --- Write-gated creation of the single approved AI extension ---
        Tool(
            name="create_ai_scheduling_extension",
            description=(
                "WRITE-GATED. Create only the approved AI Scheduling Backup extension. "
                "Requires EXULT_RC_ALLOW_WRITES=1. Call with dry_run=true (default) "
                "to preview and obtain the exact approval_text, then call with "
                "dry_run=false and that approval_text to perform the write. Does not "
                "create devices, routing, forwarding, queues, IVRs, greetings, caller "
                "ID, or phone-number assignments."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "approval_text": {
                        "type": "string",
                        "description": (
                            "Exact approval string (returned by the dry_run "
                            "preview). Required for the live write."
                        ),
                    },
                    "dry_run": {"type": "boolean", "default": True},
                },
            },
        ),
    ]
    return tools


def _ok(obj: Any) -> list[TextContent]:
    """Wrap a successful result as a single MCP text item of indented JSON.

    Values json cannot encode natively are rendered with str().
    """
    rendered = json.dumps(obj, indent=2, default=str)
    return [TextContent(type="text", text=rendered)]


def _err(msg: str) -> list[TextContent]:
    """Wrap an error message as a single MCP text item of compact JSON."""
    payload = {"error": True, "message": msg}
    return [TextContent(type="text", text=json.dumps(payload))]


@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch one MCP tool invocation to the matching RingCentral API call.

    Args:
        name: Name of the tool being invoked.
        arguments: Tool arguments supplied by the client.

    Returns:
        A one-item ``TextContent`` list containing JSON. Successful calls
        carry the API payload or a result summary (via ``_ok``). An unknown
        tool name, a failed gate/validation check, or any exception raised
        while handling the call yields ``{"error": true, "message": ...}``
        (via ``_err``); exceptions are never propagated to the caller.

    Write tools share a preview-then-confirm protocol: ``dry_run`` defaults
    to true and returns the request body plus the exact ``approval_text``;
    the live call must echo that text back and additionally requires
    ``ALLOW_WRITES`` to be set.
    """
    try:
        if name == "get_account_info":
            return _ok(await _api_get("/restapi/v1.0/account/~"))

        if name == "get_auth_context":
            return _ok(await _get_auth_context())

        if name == "list_extensions":
            params: dict[str, Any] = {"perPage": arguments.get("per_page", 250)}
            if arguments.get("extension_type"):
                params["type"] = arguments["extension_type"]
            if arguments.get("status"):
                params["status"] = arguments["status"]
            return _ok(await _api_get("/restapi/v1.0/account/~/extension", params))

        if name == "get_extension":
            ext_id = _validate_id(arguments["extension_id"], "extension_id")
            return _ok(await _api_get(f"/restapi/v1.0/account/~/extension/{ext_id}"))

        if name == "list_call_queues":
            # Call queues are listed as extensions of type "Department".
            return _ok(
                await _api_get(
                    "/restapi/v1.0/account/~/extension",
                    {"type": "Department", "perPage": 250},
                )
            )

        if name == "get_call_queue_members":
            qid = _validate_id(arguments["queue_id"], "queue_id")
            return _ok(
                await _api_get(f"/restapi/v1.0/account/~/call-queues/{qid}/members")
            )

        if name == "update_call_queue_members":
            qid = _validate_id(arguments["queue_id"], "queue_id")
            add_ids = [
                _validate_id(x, "add_extension_ids item")
                for x in arguments.get("add_extension_ids", []) or []
            ]
            remove_ids = [
                _validate_id(x, "remove_extension_ids item")
                for x in arguments.get("remove_extension_ids", []) or []
            ]
            if not add_ids and not remove_ids:
                return _err("update_call_queue_members: must pass at least one of add_extension_ids or remove_extension_ids")
            dry_run = bool(arguments.get("dry_run", True))
            approval_required = _queue_members_approval_text(qid, add_ids, remove_ids)
            before = await _api_get(f"/restapi/v1.0/account/~/call-queues/{qid}/members")
            # Stop if the "before" snapshot could not be read, matching the
            # other queue write tools: otherwise a dry run would report
            # ok=True with an error body, and a live call would write without
            # a valid snapshot.
            if isinstance(before, dict) and before.get("error"):
                return _ok({"ok": False, "error": before, "phase": "fetch_members"})
            body = {"addedExtensionIds": add_ids, "removedExtensionIds": remove_ids}
            if dry_run:
                return _ok({
                    "ok": True,
                    "dry_run": True,
                    "allow_writes": ALLOW_WRITES,
                    "endpoint": f"POST /restapi/v1.0/account/~/call-queues/{qid}/bulk-assign",
                    "body": body,
                    "before_members": before,
                    "approval_text": approval_required,
                })
            if arguments.get("approval_text") != approval_required:
                return _err(
                    "approval_text did not match the exact required text for this "
                    "queue/member change. Run with dry_run=true to obtain it."
                )
            if not ALLOW_WRITES:
                return _err("EXULT_RC_ALLOW_WRITES=1 is required at server startup for this write")
            assign_resp = await _api_post(
                f"/restapi/v1.0/account/~/call-queues/{qid}/bulk-assign",
                body,
            )
            # Re-read membership so the caller can compare before and after.
            after = await _api_get(f"/restapi/v1.0/account/~/call-queues/{qid}/members")
            return _ok({
                "ok": not (isinstance(assign_resp, dict) and assign_resp.get("error")),
                "assign_response": assign_resp,
                "before_members": before,
                "after_members": after,
            })

        if name == "set_call_queue_ring_type":
            qid = _validate_id(arguments["queue_id"], "queue_id")
            ring_type = arguments["ring_type"]
            allowed = {"Simultaneous", "Rotating", "FixedOrder"}
            if ring_type not in allowed:
                return _err(f"ring_type must be one of {sorted(allowed)}")
            # rule_id may be 'business-hours-rule', 'after-hours-rule', or a numeric Custom rule id
            rule_id_raw = arguments.get("rule_id") or "business-hours-rule"
            if rule_id_raw not in {"business-hours-rule", "after-hours-rule"}:
                rule_id = _validate_id(rule_id_raw, "rule_id")
            else:
                rule_id = rule_id_raw
            dry_run = bool(arguments.get("dry_run", True))
            rule_path = f"/restapi/v1.0/account/~/extension/{qid}/answering-rule/{rule_id}"
            before = await _api_get(rule_path)
            if isinstance(before, dict) and before.get("error"):
                return _ok({"ok": False, "error": before, "endpoint": rule_path})
            body = {"queue": {"transferMode": ring_type}}
            approval_required = _ring_type_approval_text(qid, rule_id, ring_type)
            if dry_run:
                return _ok({
                    "ok": True,
                    "dry_run": True,
                    "allow_writes": ALLOW_WRITES,
                    "endpoint": f"PUT {rule_path}",
                    "body": body,
                    "before_transferMode": (before.get("queue") or {}).get("transferMode"),
                    "before_rule": before,
                    "approval_text": approval_required,
                })
            if arguments.get("approval_text") != approval_required:
                return _err(
                    "approval_text did not match the exact required text for this "
                    "ring-type change. Run with dry_run=true to obtain it."
                )
            if not ALLOW_WRITES:
                return _err("EXULT_RC_ALLOW_WRITES=1 is required at server startup for this write")
            put_resp = await _api_put(rule_path, body)
            # Re-read the rule so the response reflects what the API now reports.
            after = await _api_get(rule_path)
            return _ok({
                "ok": not (isinstance(put_resp, dict) and put_resp.get("error")),
                "put_response": put_resp,
                "before_transferMode": (before.get("queue") or {}).get("transferMode"),
                "after_transferMode": (after.get("queue") or {}).get("transferMode") if isinstance(after, dict) else None,
                "after_rule": after,
            })

        if name == "set_call_queue_member_order":
            qid = _validate_id(arguments["queue_id"], "queue_id")
            ordered_ids = [
                _validate_id(x, "ordered_extension_ids item")
                for x in arguments.get("ordered_extension_ids", []) or []
            ]
            if not ordered_ids:
                return _err("ordered_extension_ids must be non-empty")
            if len(set(ordered_ids)) != len(ordered_ids):
                return _err("ordered_extension_ids contains duplicates")
            rule_id_raw = arguments.get("rule_id") or "business-hours-rule"
            if rule_id_raw not in {"business-hours-rule", "after-hours-rule"}:
                rule_id = _validate_id(rule_id_raw, "rule_id")
            else:
                rule_id = rule_id_raw
            dry_run = bool(arguments.get("dry_run", True))
            # Confirm the ordered list matches the queue's current membership.
            members_resp = await _api_get(f"/restapi/v1.0/account/~/call-queues/{qid}/members")
            if isinstance(members_resp, dict) and members_resp.get("error"):
                return _ok({"ok": False, "error": members_resp, "phase": "fetch_members"})
            current_member_ids = {
                str(rec.get("id"))
                for rec in (members_resp.get("records") or [])
                if rec.get("id") is not None
            }
            ordered_set = set(ordered_ids)
            missing_from_members = ordered_set - current_member_ids
            missing_from_order = current_member_ids - ordered_set
            if missing_from_members or missing_from_order:
                return _err(
                    "ordered_extension_ids must exactly cover the queue's current members. "
                    f"in_order_but_not_member={sorted(missing_from_members)}, "
                    f"member_but_not_in_order={sorted(missing_from_order)}. "
                    "Run update_call_queue_members first."
                )
            rule_path = f"/restapi/v1.0/account/~/extension/{qid}/answering-rule/{rule_id}"
            before_rule = await _api_get(rule_path)
            if isinstance(before_rule, dict) and before_rule.get("error"):
                return _ok({"ok": False, "error": before_rule, "endpoint": rule_path})
            # Positions sent to the API are 1-based.
            fixed_order_agents = [
                {"extension": {"id": ext_id}, "index": idx + 1}
                for idx, ext_id in enumerate(ordered_ids)
            ]
            body = {"queue": {"fixedOrderAgents": fixed_order_agents}}
            before_agents = [
                {"id": str((a.get("extension") or {}).get("id")), "index": a.get("index")}
                for a in ((before_rule.get("queue") or {}).get("fixedOrderAgents") or [])
            ]
            approval_required = _member_order_approval_text(qid, rule_id, ordered_ids)
            if dry_run:
                return _ok({
                    "ok": True,
                    "dry_run": True,
                    "allow_writes": ALLOW_WRITES,
                    "endpoint": f"PUT {rule_path}",
                    "body": body,
                    "before_fixedOrderAgents": before_agents,
                    "approval_text": approval_required,
                })
            if arguments.get("approval_text") != approval_required:
                return _err(
                    "approval_text did not match the exact required text for this "
                    "member-order change. Run with dry_run=true to obtain it."
                )
            if not ALLOW_WRITES:
                return _err("EXULT_RC_ALLOW_WRITES=1 is required at server startup for this write")
            put_resp = await _api_put(rule_path, body)
            after_rule = await _api_get(rule_path)
            after_agents = [
                {"id": str((a.get("extension") or {}).get("id")), "index": a.get("index")}
                for a in ((after_rule.get("queue") or {}).get("fixedOrderAgents") or [])
            ] if isinstance(after_rule, dict) else None
            return _ok({
                "ok": not (isinstance(put_resp, dict) and put_resp.get("error")),
                "put_response": put_resp,
                "before_fixedOrderAgents": before_agents,
                "after_fixedOrderAgents": after_agents,
            })

        if name == "list_extension_devices":
            ext_id = _validate_id(arguments["extension_id"], "extension_id")
            return _ok(
                await _api_get(
                    f"/restapi/v1.0/account/~/extension/{ext_id}/device",
                    {"perPage": arguments.get("per_page", 100)},
                )
            )

        if name == "read_device_sip_info":
            device_id = _validate_id(arguments["device_id"], "device_id")
            sip_info = await _api_get(
                f"/restapi/v1.0/account/~/device/{device_id}/sip-info"
            )
            reveal = bool(arguments.get("reveal_secrets", False))
            approval_required = _sip_reveal_approval_text(device_id)
            # Default: return a redacted view (safe to read). Raw SIP
            # credentials (password, authorizationId, etc.) are returned only
            # with an explicit reveal_secrets=true AND the exact approval_text.
            if not reveal:
                return _ok({
                    "redacted": True,
                    "sip_info": _redact_sip_info(sip_info),
                    "note": (
                        "SIP credentials are redacted. To return raw secrets, "
                        "call again with reveal_secrets=true and the exact "
                        "approval_text below."
                    ),
                    "approval_text": approval_required,
                })
            # Operator gate the caller cannot self-grant: revealing raw secrets
            # requires EXULT_RC_ALLOW_SECRET_READS=1 set at server startup.
            if not ALLOW_SECRET_READS:
                return _err(
                    "EXULT_RC_ALLOW_SECRET_READS=1 must be set at server startup to "
                    "reveal raw SIP credentials. Default (redacted) reads do not "
                    "require it."
                )
            if arguments.get("approval_text") != approval_required:
                return _err(
                    "approval_text did not match the exact required text to reveal "
                    "SIP credentials for this device. Call without reveal_secrets "
                    "first to obtain it."
                )
            return _ok({"redacted": False, "sip_info": sip_info})

        if name == "pull_call_log":
            date_from = arguments["date_from"]
            date_to = arguments["date_to"]
            max_pages = int(arguments.get("max_pages", 10))
            ext_id = arguments.get("extension_id")
            # Account-wide call log unless a specific extension is requested.
            base = "/restapi/v1.0/account/~"
            if ext_id:
                ext_id = _validate_id(ext_id, "extension_id")
                base += f"/extension/{ext_id}"
            base += "/call-log"
            records: list[Any] = []
            page = 1
            # Count completed fetches separately from the page cursor: when
            # the loop stops because max_pages was reached, `page` has already
            # been advanced past the last page actually requested.
            pages_fetched = 0
            # Set once a response without a nextPage link is seen; if it stays
            # False the result was cut short by the max_pages cap.
            reached_last_page = False
            while page <= max_pages:
                params: dict[str, Any] = {
                    "view": "Detailed",
                    "dateFrom": date_from,
                    "dateTo": date_to,
                    "perPage": 250,
                    "page": page,
                }
                if arguments.get("call_type"):
                    params["type"] = arguments["call_type"]
                if arguments.get("direction"):
                    params["direction"] = arguments["direction"]
                body = await _api_get(base, params)
                if isinstance(body, dict) and body.get("error"):
                    # Hand back whatever was collected before the failure.
                    return _ok({"partial": records, "error": body})
                pages_fetched += 1
                page_records = body.get("records", []) if isinstance(body, dict) else []
                records.extend(page_records)
                nav = body.get("navigation", {}) if isinstance(body, dict) else {}
                if not nav.get("nextPage"):
                    reached_last_page = True
                    break
                page += 1
            return _ok({
                "count": len(records),
                "records": records,
                "pages_fetched": pages_fetched,
                "truncated": not reached_last_page,
            })

        if name == "get_voicemails":
            ext_id = _validate_id(arguments.get("extension_id", "~"), "extension_id")
            params = {
                "messageType": "VoiceMail",
                "perPage": arguments.get("per_page", 100),
            }
            if arguments.get("date_from"):
                params["dateFrom"] = arguments["date_from"]
            if arguments.get("date_to"):
                params["dateTo"] = arguments["date_to"]
            return _ok(
                await _api_get(
                    f"/restapi/v1.0/account/~/extension/{ext_id}/message-store",
                    params,
                )
            )

        if name == "get_voicemail_transcript":
            ext_id = _validate_id(arguments.get("extension_id", "~"), "extension_id")
            mid = _validate_id(arguments["message_id"], "message_id")
            aid = _validate_id(arguments["attachment_id"], "attachment_id")
            content = await _api_get_binary(
                f"/restapi/v1.0/account/~/extension/{ext_id}/message-store/{mid}/content/{aid}"
            )
            return _ok(
                {
                    "message_id": mid,
                    "attachment_id": aid,
                    # Undecodable bytes are replaced rather than raising.
                    "transcript": content.decode("utf-8", errors="replace"),
                }
            )

        if name == "get_ivr_menus":
            return _ok(await _api_get("/restapi/v1.0/account/~/ivr-menus"))

        if name == "list_phone_numbers":
            return _ok(
                await _api_get(
                    "/restapi/v1.0/account/~/phone-number",
                    {"perPage": arguments.get("per_page", 250)},
                )
            )

        if name == "get_service_status":
            return _ok(await _api_get("/restapi/v1.0/status"))

        if name == "create_ai_scheduling_extension":
            expected = _ai_extension_approval_text()
            dry_run = bool(arguments.get("dry_run", True))
            existing = await _find_ai_extension()
            body = {
                "contact": {
                    "firstName": "AI Scheduling",
                    "lastName": "Backup",
                    "email": AI_EXTENSION_EMAIL,
                },
                "extensionNumber": AI_EXTENSION_NUMBER,
                "status": "Enabled",
                "type": "User",
            }
            # If the extension is already present, report it and skip the
            # write, for dry-run and live calls alike.
            if existing:
                return _ok(
                    {
                        "ok": True,
                        "already_exists": True,
                        "extension": existing,
                        "next_manual_step": (
                            f"Add Existing Phone / Other Phone device named {AI_DEVICE_NAME} "
                            f"under extension {AI_EXTENSION_NUMBER}."
                        ),
                    }
                )
            if dry_run:
                # Consistent with the other write tools: dry-run returns the
                # exact approval_text so the caller can preview-then-confirm.
                return _ok(
                    {
                        "ok": True,
                        "dry_run": True,
                        "allow_writes": ALLOW_WRITES,
                        "endpoint": "POST /restapi/v1.0/account/~/extension",
                        "body": body,
                        "approval_text": expected,
                    }
                )
            if arguments.get("approval_text") != expected:
                return _err(
                    "approval_text did not match the exact required RingCentral "
                    "approval text. Run with dry_run=true to obtain it."
                )
            if not ALLOW_WRITES:
                return _err("EXULT_RC_ALLOW_WRITES=1 is required at server startup for this write")
            created = await _api_post("/restapi/v1.0/account/~/extension", body)
            return _ok(
                {
                    "ok": not (isinstance(created, dict) and created.get("error")),
                    "created": created,
                    "next_manual_step": (
                        f"Add Existing Phone / Other Phone device named {AI_DEVICE_NAME} "
                        f"under extension {AI_EXTENSION_NUMBER}."
                    ),
                }
            )

        return _err(f"unknown tool: {name}")
    except Exception as exc:
        # Top-level boundary: report every failure (missing required argument,
        # validation error, HTTP error) as a JSON error instead of raising.
        return _err(f"{type(exc).__name__}: {exc}")


def main() -> None:
    """Start the Streamable HTTP MCP listener.

    Reads the bearer token from the ``MCP_BEARER_TOKEN`` environment
    variable; when it is missing or empty, writes a message to stderr and
    exits with status 1. Otherwise configures INFO logging, writes a startup
    banner showing the port and the write / secret-read gate states to
    stderr, and passes the app, port and token to ``serve_mcp_over_http``.
    """
    bearer = os.environ.get("MCP_BEARER_TOKEN")
    if not bearer:
        sys.stderr.write("ringcentral-admin: MCP_BEARER_TOKEN required\n")
        sys.exit(1)

    logging.basicConfig(level=logging.INFO)

    # Show the gate states in the banner so the operator can see at launch
    # whether writes and raw-secret reads are enabled.
    writes_state = "ENABLED" if ALLOW_WRITES else "disabled"
    secret_reads_state = "ENABLED" if ALLOW_SECRET_READS else "disabled"
    banner = (
        f"ringcentral-admin: starting Streamable HTTP on :{MCP_PORT_RINGCENTRAL_ADMIN} "
        f"(writes={writes_state}, "
        f"secret_reads={secret_reads_state})\n"
    )
    sys.stderr.write(banner)

    serve_mcp_over_http(app, port=MCP_PORT_RINGCENTRAL_ADMIN, token=bearer)


# Script entry point: start the server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
