#!/usr/bin/env bun
/// <reference types="bun-types" />
/**
 * Sendblue iMessage/SMS channel for Claude Code.
 *
 * Two-way MCP channel server using the Sendblue REST API.
 * - Inbound: HTTP webhook listener on port 18799 (proxied via Tailscale funnel on 8799)
 * - Outbound: POST to Sendblue API
 * - Permission relay: approve/deny tool use from your phone
 *
 * Env vars:
 *   SENDBLUE_API_KEY_ID     - Sendblue API key ID (required)
 *   SENDBLUE_API_SECRET_KEY - Sendblue API secret key (required)
 *   SENDBLUE_WEBHOOK_PORT   - Local webhook port (default: 18801)
 *   SENDBLUE_WEBHOOK_URL    - Public webhook base URL for status_callback (required)
 *   SENDBLUE_WEBHOOK_PATH   - Webhook URL path; treat as a shared secret (required)
 *   SENDBLUE_OWN_NUMBER     - The Sendblue number (required)
 *   SENDBLUE_STATE_DIR      - Override state dir (default: ~/.claude/channels/sendblue)
 *   SENDBLUE_HARNESS_DIR    - Override harness dir (default: $SENDBLUE_STATE_DIR/harness)
 */

// Polyfill Bun globals when running under Node (Linux native claude).
// No-op under Bun (MBP). Must be the first import.
import "./bun-polyfill.js";

import {
  readFileSync,
  writeFileSync,
  mkdirSync,
  appendFileSync,
  existsSync,
  readdirSync,
} from "fs";
import { appendFile } from "fs/promises";
import { homedir } from "os";
import { join } from "path";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { ListToolsRequestSchema, CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import {
  appendGroupHistory,
  initSeenGroups,
  isGroupAllowed,
  isGroupId,
  loadGroupContext,
  recordGroupActivity,
} from "./groups.js";

// ---------------------------------------------------------------------------
// Non-blocking receipt log helper (avoids appendFileSync in the hot path)
// ---------------------------------------------------------------------------

const RECEIPT_LOG = "/tmp/sendblue-auto-receipt.log";

/** Append to receipt log without blocking the event loop. */
function logReceipt(msg: string): void {
  appendFile(RECEIPT_LOG, `[${new Date().toISOString()}] ${msg}\n`).catch(() => {});
}

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

const API_KEY_ID = process.env.SENDBLUE_API_KEY_ID ?? "";
const API_SECRET = process.env.SENDBLUE_API_SECRET_KEY ?? "";
const WEBHOOK_PORT = Number(process.env.SENDBLUE_WEBHOOK_PORT ?? 18801);
const OWN_NUMBER = process.env.SENDBLUE_OWN_NUMBER ?? "";
const SENDBLUE_API = "https://api.sendblue.co";
const WEBHOOK_BASE = process.env.SENDBLUE_WEBHOOK_URL ?? "";
const WEBHOOK_PATH = process.env.SENDBLUE_WEBHOOK_PATH ?? "";
// Full callback URL we hand to Sendblue. The server only serves the webhook
// path (see request handler below), so omitting the path would cause Sendblue
// to POST status updates to the base URL and get a 404.
const WEBHOOK_CALLBACK_URL =
  WEBHOOK_BASE.replace(/\/+$/, "") + (WEBHOOK_PATH.startsWith("/") ? WEBHOOK_PATH : `/${WEBHOOK_PATH}`);

// Fail fast if required configuration is missing rather than silently using
// previously-baked-in defaults (which leaked operational endpoints into the
// committed source).
const missingEnv: string[] = [];
if (!API_KEY_ID) missingEnv.push("SENDBLUE_API_KEY_ID");
if (!API_SECRET) missingEnv.push("SENDBLUE_API_SECRET_KEY");
if (!OWN_NUMBER) missingEnv.push("SENDBLUE_OWN_NUMBER");
if (!WEBHOOK_BASE) missingEnv.push("SENDBLUE_WEBHOOK_URL");
if (!WEBHOOK_PATH) missingEnv.push("SENDBLUE_WEBHOOK_PATH");
if (missingEnv.length > 0) {
  process.stderr.write(
    `sendblue-channel: missing required env: ${missingEnv.join(", ")}\n`,
  );
  process.exit(1);
}

// ---------------------------------------------------------------------------
// Message deduplication — persistent seen-message set
// ---------------------------------------------------------------------------

const SEEN_MESSAGES_FILE = "/tmp/sendblue-seen-messages.json";
const SEEN_MAX_AGE_MS = 60 * 60 * 1000; // 1 hour
const SEEN_PRUNE_INTERVAL_MS = 5 * 60 * 1000; // prune every 5 minutes

/** Map of message GUID -> timestamp (ms since epoch) */
let seenMessages: Map<string, number> = new Map();

function loadSeenMessages(): void {
  try {
    if (existsSync(SEEN_MESSAGES_FILE)) {
      const raw = readFileSync(SEEN_MESSAGES_FILE, "utf-8");
      const parsed = JSON.parse(raw) as Record<string, number>;
      seenMessages = new Map(Object.entries(parsed));
      process.stderr.write(`sendblue-channel: loaded ${seenMessages.size} seen message GUIDs\n`);
    }
  } catch (err) {
    process.stderr.write(
      `sendblue-channel: seen-messages file unreadable, starting fresh: ${String(err)}\n`,
    );
    seenMessages = new Map();
  }
}

/** Flush seen-messages map to disk (synchronous). */
function saveSeenMessagesSync(): void {
  try {
    const obj: Record<string, number> = {};
    for (const [guid, ts] of seenMessages) {
      obj[guid] = ts;
    }
    writeFileSync(SEEN_MESSAGES_FILE, JSON.stringify(obj) + "\n");
  } catch (err) {
    process.stderr.write(`sendblue-channel: failed to write seen-messages: ${String(err)}\n`);
  }
}

/** Debounced save: coalesces rapid claim/unclaim writes into one flush. */
let saveSeenTimer: ReturnType<typeof setTimeout> | null = null;
function saveSeenMessages(): void {
  if (saveSeenTimer !== null) {
    return;
  }
  saveSeenTimer = setTimeout(() => {
    saveSeenTimer = null;
    saveSeenMessagesSync();
  }, 50); // flush within 50ms — fast enough for durability, avoids blocking hot path
}

function pruneSeenMessages(): void {
  const cutoff = Date.now() - SEEN_MAX_AGE_MS;
  let pruned = 0;
  for (const [guid, ts] of seenMessages) {
    if (ts < cutoff) {
      seenMessages.delete(guid);
      pruned++;
    }
  }
  if (pruned > 0) {
    process.stderr.write(
      `sendblue-channel: pruned ${pruned} expired seen-message entries (${seenMessages.size} remaining)\n`,
    );
    saveSeenMessages();
  }
}

/**
 * Atomically check-and-mark a message as claimed for processing.
 * Prevents the race where both webhook and poll process the same message
 * concurrently across await boundaries.
 * Returns true if this caller claimed it, false if already seen/claimed.
 */
function claimMessage(guid: string): boolean {
  if (!guid) {
    return true; // no handle = can't dedup, allow processing
  }
  if (seenMessages.has(guid)) {
    return false; // already processed
  }
  // Mark immediately (synchronous) before any await point
  seenMessages.set(guid, Date.now());
  saveSeenMessages();
  return true;
}

function unclaimMessage(guid: string): void {
  if (!guid) {
    return;
  }
  seenMessages.delete(guid);
  saveSeenMessages();
}

// Load on startup
loadSeenMessages();

// Periodic pruning
setInterval(pruneSeenMessages, SEEN_PRUNE_INTERVAL_MS);

// ---------------------------------------------------------------------------
// Unhandled error safety net
// ---------------------------------------------------------------------------

process.on("unhandledRejection", (err) => {
  process.stderr.write(`sendblue-channel: unhandled rejection: ${String(err)}\n`);
});
process.on("uncaughtException", (err) => {
  process.stderr.write(`sendblue-channel: uncaught exception: ${String(err)}\n`);
});

// ---------------------------------------------------------------------------
// Sendblue API client
// ---------------------------------------------------------------------------

const API_HEADERS = {
  "Content-Type": "application/json",
  "sb-api-key-id": API_KEY_ID,
  "sb-api-secret-key": API_SECRET,
};

async function sendbluePost<T = Record<string, unknown>>(
  path: string,
  body: Record<string, unknown>,
): Promise<T> {
  const url = `${SENDBLUE_API}${path}`;
  let res: Response;
  try {
    res = await fetch(url, {
      method: "POST",
      headers: API_HEADERS,
      body: JSON.stringify(body),
    });
  } catch (err) {
    throw new Error(`Sendblue network error: ${err instanceof Error ? err.message : String(err)}`, {
      cause: err,
    });
  }
  if (!res.ok) {
    const text = await res.text().catch(() => "(no body)");
    throw new Error(`Sendblue API error ${res.status}: ${text}`);
  }
  return (await res.json()) as T;
}

/**
 * Fire-and-forget variant: sends the request but doesn't await the response
 * body. Used for mark-read / typing-indicator where latency matters and we
 * don't need the parsed result.
 */
function sendbluePostFireAndForget(path: string, body: Record<string, unknown>): void {
  const url = `${SENDBLUE_API}${path}`;
  fetch(url, {
    method: "POST",
    headers: API_HEADERS,
    body: JSON.stringify(body),
  })
    .then((res) => {
      if (!res.ok) {
        // Drain body to free resources, but don't block on it
        res.text().catch(() => {});
        process.stderr.write(`sendblue-channel: fire-and-forget ${path} returned ${res.status}\n`);
      }
      // Don't parse response body — we don't need it
    })
    .catch((err) => {
      process.stderr.write(
        `sendblue-channel: fire-and-forget ${path} error: ${err instanceof Error ? err.message : String(err)}\n`,
      );
    });
}

async function sendblueGet<T = Record<string, unknown>>(
  path: string,
  params?: Record<string, string>,
): Promise<T> {
  const url = new URL(`${SENDBLUE_API}${path}`);
  if (params) {
    for (const [k, v] of Object.entries(params)) {
      url.searchParams.set(k, v);
    }
  }
  let res: Response;
  try {
    res = await fetch(url.toString(), { headers: API_HEADERS });
  } catch (err) {
    throw new Error(`Sendblue network error: ${err instanceof Error ? err.message : String(err)}`, {
      cause: err,
    });
  }
  if (!res.ok) {
    const text = await res.text().catch(() => "(no body)");
    throw new Error(`Sendblue API error ${res.status}: ${text}`);
  }
  return (await res.json()) as T;
}

// ---------------------------------------------------------------------------
// Contact harness: per-contact memory, persona, history
// ---------------------------------------------------------------------------

// HARNESS_DIR holds per-contact memory/persona/history. Defaults to a path
// under STATE_DIR (which is itself env-driven, ~/.claude/channels/sendblue by
// default), so different hosts/checkouts don't trample each other's state.
// Override with SENDBLUE_HARNESS_DIR. Note: STATE_DIR is defined below; we
// inline its resolution here so harness paths are stable before STATE_DIR
// initialization order matters.
const HARNESS_DIR =
  process.env.SENDBLUE_HARNESS_DIR ??
  join(
    process.env.SENDBLUE_STATE_DIR ?? join(homedir(), ".claude", "channels", "sendblue"),
    "harness",
  );
const HISTORY_MAX = 50;

function normalizeNumber(number: string): string {
  const digits = number.replace(/\D/g, "");
  if (digits.length === 10) {
    return `+1${digits}`;
  }
  if (digits.length === 11 && digits.startsWith("1")) {
    return `+${digits}`;
  }
  return `+${digits}`;
}

function contactDir(number: string): string {
  return join(HARNESS_DIR, normalizeNumber(number));
}

function ensureContactDir(number: string): string {
  const dir = contactDir(number);
  mkdirSync(join(dir, "tasks"), { recursive: true });
  return dir;
}

interface HistoryEntry {
  ts: string;
  dir: "inbound" | "outbound";
  from?: string;
  to?: string;
  content: string;
  handle?: string;
}

function appendHistory(number: string, entry: HistoryEntry): void {
  try {
    const dir = ensureContactDir(number);
    const histFile = join(dir, "history.jsonl");
    appendFileSync(histFile, JSON.stringify(entry) + "\n");
    // Rotate: keep last HISTORY_MAX lines
    const lines = readFileSync(histFile, "utf-8").trim().split("\n").filter(Boolean);
    if (lines.length > HISTORY_MAX) {
      writeFileSync(histFile, lines.slice(-HISTORY_MAX).join("\n") + "\n");
    }
  } catch (err) {
    process.stderr.write(
      `sendblue-channel: history write failed: ${err instanceof Error ? err.message : String(err)}\n`,
    );
  }
}

function loadContactContext(number: string): string {
  const dir = contactDir(number);
  const parts: string[] = [];

  // Memory
  const memFile = join(dir, "memory.md");
  if (existsSync(memFile)) {
    const mem = readFileSync(memFile, "utf-8").trim();
    if (mem) {
      parts.push(`<memory>\n${mem}\n</memory>`);
    }
  }

  // Persona
  const personaFile = join(dir, "persona.md");
  if (existsSync(personaFile)) {
    const persona = readFileSync(personaFile, "utf-8").trim();
    if (persona) {
      parts.push(`<persona>\n${persona}\n</persona>`);
    }
  }

  // History (last 20 entries)
  const histFile = join(dir, "history.jsonl");
  if (existsSync(histFile)) {
    const lines = readFileSync(histFile, "utf-8").trim().split("\n").filter(Boolean);
    const recent = lines.slice(-20).map((l) => {
      try {
        const e = JSON.parse(l) as HistoryEntry;
        const who = e.dir === "inbound" ? (e.from ?? number) : "me";
        return `[${e.ts}] ${who}: ${e.content}`;
      } catch {
        return l;
      }
    });
    if (recent.length > 0) {
      parts.push(`<history>\n${recent.join("\n")}\n</history>`);
    }
  }

  // Active tasks
  const tasksDir = join(dir, "tasks");
  if (existsSync(tasksDir)) {
    try {
      const taskFiles = readdirSync(tasksDir).filter((f) => f.endsWith(".md"));
      if (taskFiles.length > 0) {
        const taskSummaries = taskFiles
          .map((f) => {
            const first = readFileSync(join(tasksDir, f), "utf-8").split("\n")[0] ?? f;
            return `- ${first}`;
          })
          .join("\n");
        parts.push(`<active-tasks>\n${taskSummaries}\n</active-tasks>`);
      }
    } catch {
      // ignore
    }
  }

  if (parts.length === 0) {
    return "";
  }
  return `<contact-context number="${normalizeNumber(number)}">\n${parts.join("\n")}\n</contact-context>\n\n`;
}

// ---------------------------------------------------------------------------
// Access control
// ---------------------------------------------------------------------------

const STATE_DIR =
  process.env.SENDBLUE_STATE_DIR ?? join(homedir(), ".claude", "channels", "sendblue");

// Initialize seen-groups registry (used to gate outbound to group chats).
mkdirSync(STATE_DIR, { recursive: true, mode: 0o700 });
initSeenGroups(STATE_DIR);

const ACCESS_FILE = join(STATE_DIR, "access.json");

interface AccessConfig {
  dmPolicy: "allowlist" | "disabled";
  allowFrom: string[];
}

function loadAccess(): AccessConfig {
  try {
    const raw = readFileSync(ACCESS_FILE, "utf-8");
    return JSON.parse(raw) as AccessConfig;
  } catch {
    // Default: empty allowlist. Operators must explicitly add their number
    // to access.json (or use an out-of-band provisioning step) — we no
    // longer bake a phone number into the committed source.
    const defaults: AccessConfig = {
      dmPolicy: "allowlist",
      allowFrom: [],
    };
    saveAccess(defaults);
    return defaults;
  }
}

function saveAccess(access: AccessConfig): void {
  mkdirSync(STATE_DIR, { recursive: true, mode: 0o700 });
  writeFileSync(ACCESS_FILE, JSON.stringify(access, null, 2) + "\n", { mode: 0o600 });
}

const access = loadAccess();
const allowedHandles = new Set(access.allowFrom.map((h) => normalizeAddress(h)));

function normalizeAddress(addr: string): string {
  const trimmed = addr.trim().toLowerCase();
  const digitsOnly = trimmed.replace(/\D/g, "");
  if (digitsOnly.length >= 10) {
    if (digitsOnly.length === 11 && digitsOnly.startsWith("1")) {
      return `+1${digitsOnly.slice(1)}`;
    }
    if (digitsOnly.length === 10) {
      return `+1${digitsOnly}`;
    }
    return `+${digitsOnly}`;
  }
  return trimmed;
}

function isAllowed(sender: string): boolean {
  if (access.dmPolicy === "disabled") {
    return false;
  }
  return allowedHandles.has(normalizeAddress(sender));
}

/** Predicate exposed to the groups module — uses the same allowlist. */
function isAllowedHandle(handle: string): boolean {
  if (!handle) {
    return false;
  }
  return allowedHandles.has(normalizeAddress(handle));
}

/**
 * Unified outbound access check: phone numbers go through the user allowlist;
 * group_ids go through the seen-groups registry (which only marks a group
 * allowed after an allowed sender has been observed participating).
 */
function isOutboundAllowed(chatId: string): boolean {
  if (isGroupId(chatId)) {
    return isGroupAllowed(chatId);
  }
  return isAllowed(chatId);
}

// ---------------------------------------------------------------------------
// MCP server
// ---------------------------------------------------------------------------

const mcp = new Server(
  { name: "sendblue-channel", version: "0.1.0" },
  {
    capabilities: {
      experimental: {
        "claude/channel": {},
        "claude/channel/permission": {},
      },
      tools: {},
    },
    instructions: [
      `Sendblue iMessage/SMS channel. Claude's number: ${OWN_NUMBER}.`,
      "",
      "You are the LEAD ORCHESTRATOR. iMessages arrive as your user turns.",
      "You have full computer access. Spawn code/task subagents for execution work.",
      "Reply concisely — 140 char max per message; split long responses into multiple reply() calls.",
      "Do NOT use markdown (bold/italic/bullets render as literal characters in iMessage).",
      "",
      'Messages arrive as <channel source="sendblue-channel" chat_id="..." sender="..." [group_id="..." participants="..."]>.',
      "chat_id may be either a phone number (e.g. +1...) OR a Sendblue group_id (UUID/group_ prefix).",
      "For group messages the inbound tag also includes group_id and participants attributes; reply with the same chat_id (the group_id) and the server will route to the group.",
      "Use chat_messages to fetch recent message history for a number or group.",
      "",
      "IMPORTANT — single orchestrator: Only YOU (the orchestrator) may use sendblue-channel MCP tools.",
      "Spawned workers do NOT have Sendblue access. Workers report status via /tmp/orchestrator-status.jsonl",
      "and TaskUpdate. You monitor worker status and relay results to the user.",
      "See CLAUDE.md section 12 for the full worker communication protocol.",
      "",
      "Contact context is injected automatically before each inbound message as <contact-context> (1:1) or <group-context> (group).",
      "Use memory/persona to calibrate your style. Update contact data as you learn more.",
    ].join("\n"),
  },
);

// ---------------------------------------------------------------------------
// Tools: reply + chat_messages
// ---------------------------------------------------------------------------

mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "reply",
      description:
        "Send an iMessage or SMS via Sendblue. chat_id can be a phone number (1:1 chat) or a Sendblue group_id (group chat). For groups the server routes to /api/send-group-message automatically.",
      inputSchema: {
        type: "object" as const,
        properties: {
          chat_id: {
            type: "string",
            description:
              'Recipient phone number (e.g. "+15551234567") OR Sendblue group_id (UUID, optionally with a "group_" prefix)',
          },
          text: {
            type: "string",
            description: "The message text to send",
          },
          media_url: {
            type: "string",
            description: "Optional URL of media to attach (image, etc.)",
          },
        },
        required: ["chat_id", "text"],
      },
    },
    {
      name: "chat_messages",
      description:
        "Fetch recent messages exchanged with a phone number or in a Sendblue group. Pass a phone number for 1:1 history or a group_id for group history.",
      inputSchema: {
        type: "object" as const,
        properties: {
          chat_id: {
            type: "string",
            description: "Phone number or group_id to fetch messages for",
          },
          limit: {
            type: "number",
            description: "Max messages to return (default 20)",
          },
        },
        required: ["chat_id"],
      },
    },
    {
      name: "send_reaction",
      description:
        "Send a tapback/reaction to a specific message via Sendblue. Requires the message_handle from the inbound webhook payload. chat_id may be a phone number or group_id.",
      inputSchema: {
        type: "object" as const,
        properties: {
          chat_id: {
            type: "string",
            description: "Phone number of the conversation OR Sendblue group_id",
          },
          message_handle: {
            type: "string",
            description: "The message_handle UUID from the inbound message webhook",
          },
          reaction: {
            type: "string",
            description: "Reaction type: love, like, dislike, laugh, emphasis, question",
          },
        },
        required: ["chat_id", "message_handle", "reaction"],
      },
    },
    {
      name: "mark_read",
      description:
        "Send a read receipt to a contact or group via Sendblue. chat_id may be a phone number or group_id.",
      inputSchema: {
        type: "object" as const,
        properties: {
          chat_id: {
            type: "string",
            description: "Phone number or group_id to mark messages as read for",
          },
        },
        required: ["chat_id"],
      },
    },
    {
      name: "typing_indicator",
      description:
        "Show a typing indicator to a contact or group via Sendblue. chat_id may be a phone number or group_id.",
      inputSchema: {
        type: "object" as const,
        properties: {
          chat_id: {
            type: "string",
            description: "Phone number or group_id to show typing indicator to",
          },
        },
        required: ["chat_id"],
      },
    },
  ],
}));

mcp.setRequestHandler(CallToolRequestSchema, async (req) => {
  const args = req.params.arguments ?? {};
  try {
    switch (req.params.name) {
      case "reply": {
        const chatId = args.chat_id as string;
        const content = args.text as string;
        const mediaUrl = args.media_url as string | undefined;

        if (!chatId || !content) {
          throw new Error("chat_id and text are required");
        }

        // Outbound access control (handles both numbers and group ids)
        if (!isOutboundAllowed(chatId)) {
          throw new Error(
            `Outbound blocked: ${chatId} is not allowed (number not in allowlist, or group has no allowed participants yet)`,
          );
        }

        if (isGroupId(chatId)) {
          const body: Record<string, unknown> = {
            group_id: chatId,
            content,
            from_number: OWN_NUMBER,
            status_callback: WEBHOOK_CALLBACK_URL,
          };
          if (mediaUrl) {
            body.media_url = mediaUrl;
          }
          await sendbluePost("/api/send-group-message", body);
          appendGroupHistory(HARNESS_DIR, {
            ts: new Date().toISOString(),
            dir: "outbound",
            group_id: chatId,
            content,
          });
          return { content: [{ type: "text", text: "sent (group)" }] };
        }

        const body: Record<string, unknown> = {
          number: chatId,
          content,
          from_number: OWN_NUMBER,
          status_callback: WEBHOOK_CALLBACK_URL,
        };
        if (mediaUrl) {
          body.media_url = mediaUrl;
        }

        await sendbluePost("/api/send-message", body);

        // Log outbound to history
        appendHistory(chatId, {
          ts: new Date().toISOString(),
          dir: "outbound",
          to: chatId,
          content,
        });

        return { content: [{ type: "text", text: "sent" }] };
      }

      case "chat_messages": {
        const chatId = args.chat_id as string;
        const limit = (args.limit as number) ?? 20;

        if (!chatId) {
          throw new Error("chat_id is required");
        }

        if (!isOutboundAllowed(chatId)) {
          throw new Error(`Access blocked: ${chatId} is not allowed`);
        }

        // Fetch messages from Sendblue
        const response = await sendblueGet<{ data: Array<Record<string, unknown>> }>(
          "/api/v2/messages",
        );

        const groupMode = isGroupId(chatId);

        // Filter: groups by group_id, 1:1 by number on either side
        const messages = (response.data ?? [])
          .filter((m) => {
            if (groupMode) {
              return (m.group_id as string | undefined) === chatId;
            }
            return (
              m.from_number === chatId || m.to_number === chatId || m.number === chatId
            );
          })
          .slice(0, limit);
        if (messages.length === 0) {
          return { content: [{ type: "text", text: "(no messages)" }] };
        }

        const formatted = messages
          .map((m) => {
            const ts =
              (m.date_sent as string | undefined) ?? (m.date_updated as string | undefined) ?? "";
            const dir = m.is_outbound ? "me" : ((m.from_number as string | undefined) ?? "unknown");
            const text = (m.content as string | undefined) ?? "";
            return `[${ts}] ${dir}: ${text}`;
          })
          .join("\n");

        return { content: [{ type: "text", text: formatted }] };
      }

      case "send_reaction": {
        const chatId = args.chat_id as string;
        const messageHandle = args.message_handle as string;
        const reaction = args.reaction as string;
        if (!chatId || !messageHandle || !reaction) {
          throw new Error("chat_id, message_handle, and reaction are required");
        }
        if (!isOutboundAllowed(chatId)) {
          throw new Error(`Blocked: ${chatId} not allowed`);
        }
        const reactionBody: Record<string, unknown> = {
          from_number: OWN_NUMBER,
          message_handle: messageHandle,
          reaction,
        };
        if (isGroupId(chatId)) {
          reactionBody.group_id = chatId;
        } else {
          reactionBody.number = chatId;
        }
        await sendbluePost("/api/send-reaction", reactionBody);
        return { content: [{ type: "text", text: `${reaction} reaction sent` }] };
      }

      case "mark_read": {
        const chatId = args.chat_id as string;
        if (!chatId) {
          throw new Error("chat_id is required");
        }
        if (!isOutboundAllowed(chatId)) {
          throw new Error(`Blocked: ${chatId} not allowed`);
        }
        const markBody: Record<string, unknown> = { from_number: OWN_NUMBER };
        if (isGroupId(chatId)) {
          markBody.group_id = chatId;
        } else {
          markBody.number = chatId;
        }
        await sendbluePost("/api/mark-read", markBody);
        return { content: [{ type: "text", text: "mark-read sent" }] };
      }

      case "typing_indicator": {
        const chatId = args.chat_id as string;
        if (!chatId) {
          throw new Error("chat_id is required");
        }
        if (!isOutboundAllowed(chatId)) {
          throw new Error(`Blocked: ${chatId} not allowed`);
        }
        const typingBody: Record<string, unknown> = { from_number: OWN_NUMBER };
        if (isGroupId(chatId)) {
          typingBody.group_id = chatId;
        } else {
          typingBody.number = chatId;
        }
        await sendbluePost("/api/send-typing-indicator", typingBody);
        return { content: [{ type: "text", text: "typing indicator sent" }] };
      }

      default:
        return {
          content: [{ type: "text", text: `unknown tool: ${req.params.name}` }],
          isError: true,
        };
    }
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    process.stderr.write(`sendblue-channel: ${req.params.name} error: ${msg}\n`);
    return {
      content: [{ type: "text", text: `${req.params.name} failed: ${msg}` }],
      isError: true,
    };
  }
});

// ---------------------------------------------------------------------------
// Permission relay
// ---------------------------------------------------------------------------

const PermissionRequestSchema = z.object({
  method: z.literal("notifications/claude/channel/permission_request"),
  params: z.object({
    request_id: z.string(),
    tool_name: z.string(),
    description: z.string(),
    input_preview: z.string(),
  }),
});

mcp.setNotificationHandler(PermissionRequestSchema, async ({ params }) => {
  // Send permission prompt to all allowed numbers
  const prompt =
    `Claude wants to run ${params.tool_name}: ${params.description}\n\n` +
    `Reply "yes ${params.request_id}" or "no ${params.request_id}"`;

  for (const handle of allowedHandles) {
    try {
      await sendbluePost("/api/send-message", {
        number: handle,
        content: prompt,
        from_number: OWN_NUMBER,
      });
    } catch (err) {
      process.stderr.write(
        `sendblue-channel: failed to send permission prompt to ${handle}: ${String(err)}\n`,
      );
    }
  }
});

// ---------------------------------------------------------------------------
// Connect MCP
// ---------------------------------------------------------------------------

await mcp.connect(new StdioServerTransport());
process.stderr.write(`sendblue-channel: MCP connected\n`);

// ---------------------------------------------------------------------------
// Inbound webhook listener (with port retry)
// ---------------------------------------------------------------------------

const PERMISSION_REPLY_RE = /^\s*(y|yes|n|no)\s+([a-z0-9]{5})\s*$/i;

// Retry binding the webhook port. Other Claude sessions may hold a stale
// listener on the same port; we kill them and retry.
const PORT_BIND_MAX_RETRIES = 10;
const PORT_BIND_RETRY_DELAY_MS = 3000;

async function startWebhookServer(): Promise<void> {
  for (let attempt = 1; attempt <= PORT_BIND_MAX_RETRIES; attempt++) {
    try {
      // `await` is required so the Node polyfill (bun-polyfill.js) can
      // surface EADDRINUSE via Promise rejection. Bun-native serve()
      // returns a server object synchronously; awaiting it is a no-op.
      //
      // NOTE: reusePort is intentionally OFF. On Linux/macOS, SO_REUSEPORT
      // lets multiple processes silently load-balance on the same port,
      // which (a) defeats the EADDRINUSE retry/kill logic below and (b)
      // means two stale Claude sessions can both receive Sendblue webhooks
      // and double-dispatch messages. We want exactly one listener; if the
      // port is already held by a stale process, we detect it via the bind
      // failure and kill the holder in the catch block.
      await Bun.serve({
        port: WEBHOOK_PORT,
        hostname: "0.0.0.0", // Accept from tunnel
        async fetch(req) {
          return handleWebhookRequest(req);
        },
      });
      process.stderr.write(`sendblue-channel: webhook listener on port ${WEBHOOK_PORT}\n`);
      return;
    } catch (err) {
      const msg = err instanceof Error ? err.message : String(err);
      process.stderr.write(
        `sendblue-channel: port ${WEBHOOK_PORT} bind failed (attempt ${attempt}/${PORT_BIND_MAX_RETRIES}): ${msg}\n`,
      );

      // Try to kill the stale process holding the port
      try {
        const proc = Bun.spawnSync({
          cmd: ["lsof", "-t", "-iTCP:" + String(WEBHOOK_PORT), "-sTCP:LISTEN"],
        });
        const pids = new TextDecoder().decode(proc.stdout).trim().split("\n").filter(Boolean);
        const myPid = String(process.pid);
        for (const pid of pids) {
          if (pid !== myPid) {
            process.stderr.write(
              `sendblue-channel: killing stale listener PID ${pid} on port ${WEBHOOK_PORT}\n`,
            );
            try {
              process.kill(parseInt(pid, 10), "SIGTERM");
            } catch {
              // already dead
            }
          }
        }
      } catch {
        // lsof not available or failed — just wait and retry
      }

      if (attempt < PORT_BIND_MAX_RETRIES) {
        await new Promise((r) => setTimeout(r, PORT_BIND_RETRY_DELAY_MS));
      }
    }
  }
  process.stderr.write(
    `sendblue-channel: FATAL — could not bind port ${WEBHOOK_PORT} after ${PORT_BIND_MAX_RETRIES} attempts. Webhook listener disabled; polling only.\n`,
  );
}

/** Handle an incoming HTTP request on the webhook port. */
async function handleWebhookRequest(req: Request): Promise<Response> {
  const url = new URL(req.url);

  // Health check
  if (req.method === "GET" && url.pathname === "/health") {
    return new Response(JSON.stringify({ ok: true, channel: "sendblue" }), {
      headers: { "Content-Type": "application/json" },
    });
  }

  // Sendblue webhook callback (inbound messages + status updates).
  // WEBHOOK_PATH is a shared secret loaded at startup; see config block above.
  if (req.method === "POST" && url.pathname === WEBHOOK_PATH) {
    let body: Record<string, unknown>;
    try {
      body = (await req.json()) as Record<string, unknown>;
    } catch {
      return new Response("bad json", { status: 400 });
    }

    // Must respond 200 quickly to avoid Sendblue retries
    const respond = () => new Response("ok", { status: 200 });

    // Status callback (outbound delivery status) — log and ignore
    if (body.is_outbound === true) {
      process.stderr.write(
        `sendblue-channel: outbound status: ${String(body.status)} to ${String(body.number)}\n`,
      );
      return respond();
    }

    // Inbound message
    const sender = (body.from_number as string) ?? "";
    const toNumber = (body.to_number as string) ?? (body.number as string) ?? "";
    const content = (body.content as string) ?? "";
    const mediaUrl = body.media_url as string | undefined;
    const messageHandle = (body.message_handle as string) ?? "";
    const groupIdRaw = (body.group_id as string) ?? "";
    const participantsRaw = body.participants;
    const participants: string[] = Array.isArray(participantsRaw)
      ? (participantsRaw as unknown[]).filter((p): p is string => typeof p === "string")
      : [];
    // Strict validation: only treat the message as a group message when the
    // claimed group_id matches the Sendblue group_id shape. This blocks path-
    // traversal / shell-meta attacks where a hostile webhook supplies a
    // crafted `group_id` (e.g. `../etc/x`) and the rest of the handler then
    // uses it as a filesystem segment via `groupDir()`. If the value is
    // present but malformed, log it and fall through to 1:1 handling.
    const isGroup = isGroupId(groupIdRaw);
    const groupId = isGroup ? groupIdRaw : "";
    if (!isGroup && groupIdRaw) {
      process.stderr.write(
        `sendblue-channel: ignoring malformed group_id from webhook: ${JSON.stringify(groupIdRaw).slice(0, 80)}\n`,
      );
    }

    if (!sender || (!content && !mediaUrl)) {
      return respond();
    }

    // For 1:1, reject messages not addressed to our number. For groups,
    // to_number may be a participant rather than OWN_NUMBER — skip this check.
    if (!isGroup && toNumber && toNumber !== OWN_NUMBER) {
      process.stderr.write(
        `sendblue-channel: dropping webhook for ${toNumber} (not ours: ${OWN_NUMBER})\n`,
      );
      return respond();
    }

    // Access control:
    //   - 1:1: sender must be in allowlist
    //   - group: sender must be in allowlist OR the group must already have
    //     been promoted to `allowed` by a prior message from an allowed
    //     sender. The attacker-controlled `participants` field is no longer
    //     consulted for this decision (it was previously trusted, which let
    //     a hostile webhook spoof an allowed participant to bypass the
    //     allowlist).
    if (isGroup) {
      if (!isAllowedHandle(sender) && !isGroupAllowed(groupId)) {
        process.stderr.write(
          `sendblue-channel: dropped group ${groupId} message — sender ${sender} not allowed and group not previously promoted\n`,
        );
        return respond();
      }
    } else if (!isAllowed(sender)) {
      process.stderr.write(`sendblue-channel: dropped message from non-allowed ${sender}\n`);
      return respond();
    }

    // Record group activity so outbound to this group_id is allowed.
    if (isGroup) {
      recordGroupActivity(groupId, sender, participants, isAllowedHandle);
    }

    // Instant read receipt + typing indicator BEFORE dedup or any processing.
    // Even duplicate/already-seen messages get an immediate read receipt.
    const ackBody: Record<string, unknown> = { from_number: OWN_NUMBER };
    if (isGroup) {
      ackBody.group_id = groupId;
    } else {
      ackBody.number = sender;
    }
    sendbluePostFireAndForget("/api/mark-read", ackBody);
    sendbluePostFireAndForget("/api/send-typing-indicator", ackBody);
    logReceipt(`WEBHOOK: mark-read + typing for ${isGroup ? `group ${groupId}` : sender}`);

    // Deduplication: atomically claim this message before any await.
    // This prevents the poll (running on a timer) from also processing it
    // while we await mcp.notification() below.
    if (messageHandle && !claimMessage(messageHandle)) {
      process.stderr.write(
        `sendblue-channel: [webhook] dedup — ${messageHandle} from ${sender} (already delivered by poll)\n`,
      );
      logReceipt(`WEBHOOK-DEDUP: ${messageHandle} from ${sender}`);
      return respond();
    }

    // Check for permission verdict (1:1 only — group permission flow not supported)
    if (!isGroup) {
      const m = PERMISSION_REPLY_RE.exec(content);
      if (m) {
        await mcp.notification({
          method: "notifications/claude/channel/permission" as `notifications/${string}`,
          params: {
            request_id: m[2].toLowerCase(),
            behavior: m[1].toLowerCase().startsWith("y") ? "allow" : "deny",
          },
        });
        process.stderr.write(
          `sendblue-channel: permission verdict from ${sender}: ${m[1]} ${m[2]}\n`,
        );
        return respond();
      }
    }

    // Log inbound to history (per-contact for 1:1, per-group for groups)
    if (isGroup) {
      appendGroupHistory(HARNESS_DIR, {
        ts: new Date().toISOString(),
        dir: "inbound",
        from: sender,
        group_id: groupId,
        participants,
        content,
        handle: messageHandle || undefined,
      });
    } else {
      appendHistory(sender, {
        ts: new Date().toISOString(),
        dir: "inbound",
        from: sender,
        content,
        handle: messageHandle || undefined,
      });
    }

    // Forward to Claude as channel event, prepending contact or group context
    const rawBody = mediaUrl ? `${content}\n[attachment: ${mediaUrl}]` : content;
    const context = isGroup
      ? loadGroupContext(HARNESS_DIR, groupId)
      : loadContactContext(sender);
    const messageBody = context + rawBody;

    const meta: Record<string, unknown> = {
      chat_id: isGroup ? groupId : sender,
      sender,
      message_handle: messageHandle,
      source: "imessage",
    };
    if (isGroup) {
      meta.group_id = groupId;
      meta.participants = JSON.stringify(participants);
    }

    try {
      await mcp.notification({
        method: "notifications/claude/channel",
        params: {
          content: messageBody,
          meta,
        },
      });
    } catch (err) {
      // Notification failed — unclaim so poll can retry delivery
      unclaimMessage(messageHandle);
      process.stderr.write(
        `sendblue-channel: [webhook] notification FAILED for ${messageHandle}: ${String(err)}\n`,
      );
      return respond();
    }

    // Message already claimed before the await — no separate markMessageSeen needed
    process.stderr.write(
      `sendblue-channel: [webhook] delivered ${messageHandle} from ${sender}${isGroup ? ` (group ${groupId})` : ""}: ${content.slice(0, 80)}\n`,
    );
    return respond();
  }

  return new Response("not found", { status: 404 });
}

await startWebhookServer();

// ---------------------------------------------------------------------------
// Inbound polling (fallback for when webhooks don't fire on free tier)
// ---------------------------------------------------------------------------

const POLL_INTERVAL_MS = 5000; // 5s — fast fallback when webhook misses
let lastSeenDate = new Date().toISOString(); // Only deliver messages newer than startup
let stopped = false;

async function pollInbound(): Promise<void> {
  try {
    const response = await sendblueGet<{
      data: Array<Record<string, unknown>>;
    }>("/api/v2/messages");

    const messages = response.data ?? [];
    // Filter to inbound messages newer than our watermark
    const newInbound = messages.filter((m) => {
      if (m.is_outbound) {
        return false;
      }
      const dateSent = m.date_sent as string;
      if (!dateSent || dateSent <= lastSeenDate) {
        return false;
      }
      // Group messages: keep regardless of to_number (the group is identified
      // by group_id, not by to_number).
      if (m.group_id) {
        return true;
      }
      const toNum = (m.to_number as string) ?? (m.number as string) ?? "";
      if (toNum && toNum !== OWN_NUMBER) {
        return false;
      }
      return true;
    });

    if (newInbound.length === 0) {
      return;
    }

    // Process each new message (oldest first). The watermark is advanced only
    // AFTER a message has been successfully handed off to mcp.notification()
    // (or explicitly filtered out for reasons that are stable across polls,
    // e.g. access-control denial). If mcp.notification() throws, we leave
    // the watermark at the latest *successfully* delivered message so the
    // failed message — and anything newer — is re-fetched on the next poll.
    // Previously the watermark was bumped to the newest fetched timestamp
    // before delivery, so a single failed notification permanently dropped
    // the message and every newer one in the same batch.
    newInbound.sort((a, b) => (a.date_sent as string).localeCompare(b.date_sent as string));
    let highestProcessedDate: string | null = null;
    let deliveryFailed = false;

    for (const msg of newInbound) {
      if (deliveryFailed) {
        // Bail out of the batch on first delivery failure. Older successful
        // deliveries still advance the watermark via highestProcessedDate;
        // this message and anything after it stay above the watermark and
        // will be retried (claimMessage() dedups if they later succeed via
        // webhook).
        break;
      }
      const msgDate = msg.date_sent as string;
      const sender = (msg.from_number as string) ?? "";
      const content = (msg.content as string) ?? "";
      const mediaUrl = msg.media_url as string | undefined;
      const messageHandle = (msg.message_handle as string) ?? "";
      const groupIdRaw = (msg.group_id as string) ?? "";
      const participantsRaw = msg.participants;
      const participants: string[] = Array.isArray(participantsRaw)
        ? (participantsRaw as unknown[]).filter((p): p is string => typeof p === "string")
        : [];
      // See webhook handler above — only treat as a group message when the
      // claimed group_id matches the strict Sendblue shape. Malformed values
      // (path traversal attempts, etc.) are logged and the message is skipped
      // so it cannot be used to escape the per-group history directory.
      const isGroup = isGroupId(groupIdRaw);
      const groupId = isGroup ? groupIdRaw : "";
      if (!isGroup && groupIdRaw) {
        process.stderr.write(
          `sendblue-channel: [poll] ignoring malformed group_id: ${JSON.stringify(groupIdRaw).slice(0, 80)}\n`,
        );
        // Stable skip — advance past this message so we don't re-evaluate it.
        highestProcessedDate = msgDate;
        continue;
      }

      if (!sender || (!content && !mediaUrl)) {
        highestProcessedDate = msgDate;
        continue;
      }
      // Access control: same logic as webhook path. Do not consult the
      // attacker-controlled `participants` field; require either the sender
      // itself to be on the allowlist, or the group to have already been
      // promoted by a prior allowed sender.
      if (isGroup) {
        if (!isAllowedHandle(sender) && !isGroupAllowed(groupId)) {
          highestProcessedDate = msgDate;
          continue;
        }
        recordGroupActivity(groupId, sender, participants, isAllowedHandle);
      } else if (!isAllowed(sender)) {
        highestProcessedDate = msgDate;
        continue;
      }

      // Deduplication: atomically claim this message before any await
      if (messageHandle && !claimMessage(messageHandle)) {
        // Already delivered (likely by the webhook path) — safe to advance.
        highestProcessedDate = msgDate;
        continue;
      }

      // Instant read receipt + typing indicator — fire-and-forget, zero blocking
      logReceipt(`POLL: firing mark-read + typing for ${isGroup ? `group ${groupId}` : sender}`);
      const ackBody: Record<string, unknown> = { from_number: OWN_NUMBER };
      if (isGroup) {
        ackBody.group_id = groupId;
      } else {
        ackBody.number = sender;
      }
      sendbluePostFireAndForget("/api/mark-read", ackBody);
      sendbluePostFireAndForget("/api/send-typing-indicator", ackBody);

      // Check for permission verdict (1:1 only)
      if (!isGroup) {
        const m = PERMISSION_REPLY_RE.exec(content);
        if (m) {
          try {
            await mcp.notification({
              method: "notifications/claude/channel/permission" as `notifications/${string}`,
              params: {
                request_id: m[2].toLowerCase(),
                behavior: m[1].toLowerCase().startsWith("y") ? "allow" : "deny",
              },
            });
          } catch (err) {
            unclaimMessage(messageHandle);
            process.stderr.write(
              `sendblue-channel: [poll] permission notification FAILED for ${messageHandle}: ${String(err)}\n`,
            );
            deliveryFailed = true;
            break;
          }
          process.stderr.write(
            `sendblue-channel: [poll] permission verdict from ${sender}: ${m[1]} ${m[2]}\n`,
          );
          highestProcessedDate = msgDate;
          continue;
        }
      }

      // Log inbound to history
      if (isGroup) {
        appendGroupHistory(HARNESS_DIR, {
          ts: new Date().toISOString(),
          dir: "inbound",
          from: sender,
          group_id: groupId,
          participants,
          content,
          handle: messageHandle || undefined,
        });
      } else {
        appendHistory(sender, {
          ts: new Date().toISOString(),
          dir: "inbound",
          from: sender,
          content,
          handle: messageHandle || undefined,
        });
      }

      // Forward to Claude as channel event
      const rawBody =
        mediaUrl && mediaUrl !== "null" ? `${content}\n[attachment: ${mediaUrl}]` : content;
      const context = isGroup
        ? loadGroupContext(HARNESS_DIR, groupId)
        : loadContactContext(sender);
      const messageBody = context + rawBody;

      const meta: Record<string, unknown> = {
        chat_id: isGroup ? groupId : sender,
        sender,
        message_handle: messageHandle,
        source: "imessage",
      };
      if (isGroup) {
        meta.group_id = groupId;
        meta.participants = JSON.stringify(participants);
      }

      try {
        await mcp.notification({
          method: "notifications/claude/channel",
          params: {
            content: messageBody,
            meta,
          },
        });
      } catch (err) {
        // Notification failed — unclaim so webhook or next poll can retry.
        // Stop advancing the watermark at the most recent successfully
        // processed message; this one (and anything newer in the same batch)
        // will be re-fetched on the next poll.
        unclaimMessage(messageHandle);
        process.stderr.write(
          `sendblue-channel: [poll] notification FAILED for ${messageHandle}: ${String(err)}\n`,
        );
        deliveryFailed = true;
        break;
      }

      // Message already claimed before the await — no separate markMessageSeen needed
      highestProcessedDate = msgDate;
      process.stderr.write(
        `sendblue-channel: [poll] delivered ${messageHandle} from ${sender}${isGroup ? ` (group ${groupId})` : ""}: ${content.slice(0, 80)}\n`,
      );
    }

    // Advance the watermark only past messages we successfully handed off
    // (or that we explicitly chose not to deliver for stable reasons). If
    // delivery failed mid-batch, we leave the watermark at the highest
    // delivered message and re-fetch the rest next poll.
    if (highestProcessedDate !== null) {
      lastSeenDate = highestProcessedDate;
    }
  } catch (err) {
    process.stderr.write(
      `sendblue-channel: poll error: ${err instanceof Error ? err.message : String(err)}\n`,
    );
  }
}

async function pollLoop(): Promise<void> {
  try {
    await pollInbound();
  } catch (err) {
    process.stderr.write(
      `sendblue-channel: poll loop error: ${err instanceof Error ? err.message : String(err)}\n`,
    );
  }
  if (!stopped) {
    setTimeout(pollLoop, POLL_INTERVAL_MS);
  }
}
void pollLoop();
process.stderr.write(`sendblue-channel: polling every ${POLL_INTERVAL_MS}ms\n`);

// ---------------------------------------------------------------------------
// Graceful shutdown
// ---------------------------------------------------------------------------

let shuttingDown = false;
function shutdown(): void {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;
  stopped = true;
  // Flush any pending debounced seen-messages write
  if (saveSeenTimer !== null) {
    clearTimeout(saveSeenTimer);
    saveSeenMessagesSync();
  }
  process.stderr.write("sendblue-channel: shutting down\n");
  process.exit(0);
}
process.stdin.on("end", shutdown);
process.stdin.on("close", shutdown);
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
process.on("SIGHUP", shutdown);

// Orphan detection: stdin close events don't always fire when the MCP client
// (Claude CLI / Codex) exits abnormally. Poll the parent PID — once we detect
// we've been reparented to launchd (ppid=1), shut down. Without this, crashed
// agent sessions leave bun server.ts processes running forever. 5s cadence is
// fine; the event loop is mostly idle when no MCP requests are in flight.
const INITIAL_PPID = process.ppid;
if (INITIAL_PPID !== 1) {
  const orphanCheck = setInterval(() => {
    if (process.ppid === 1) {
      process.stderr.write(
        `sendblue-channel: parent died (was ppid=${INITIAL_PPID}), shutting down\n`,
      );
      clearInterval(orphanCheck);
      shutdown();
    }
  }, 5000);
  // Don't keep the event loop alive for this check alone.
  orphanCheck.unref();
}
