#!/usr/bin/env bun
/// <reference types="bun-types" />
/**
 * Email MCP channel server for Exult Healthcare AI agent.
 *
 * Polls Microsoft Graph for new inbox messages and surfaces them as
 * MCP channel notifications. Provides reply/send/forward tools.
 *
 * Transports:
 *   - Default (no flag): HTTP-only mode. Used by the long-lived supervisor
 *     (~/run-email-channel.sh) to serve desktop apps + external clients on
 *     port 18810. Uses the per-session `serverFactory: buildServer` pattern
 *     so concurrent clients (Claude Desktop + Codex) each get their own
 *     `Server` instance; mcp.notification(...) calls fan out to every
 *     connected session via `mcpHandle.activeServers()` and spill to
 *     PENDING_FILE for later drain when nothing is connected.
 *   - --stdio (or STDIO=1): stdio transport for the local Claude parent
 *     spawned via .mcp.json. Skips HTTP listener (port would conflict with
 *     supervisor). This is a PRIMARY notification delivery path — the
 *     stdio Server has a real subscriber (the Claude parent) so push
 *     notifications actually land.
 *
 * Env vars:
 *   MS365_CLIENT_ID       - Azure AD app client ID (required)
 *   MS365_CLIENT_SECRET    - Azure AD app client secret (required)
 *   MS365_TENANT_ID        - Azure AD tenant ID (required)
 *   MS365_MAILBOX          - Mailbox to monitor (default: gautam@exulthealthcare.com)
 *   EMAIL_POLL_INTERVAL    - Poll interval in seconds (default: 20)
 *   MCP_BEARER_TOKEN       - Required for HTTP mode; ignored in --stdio mode
 *   STDIO                  - Set to "1" as an alternative to --stdio flag
 */

import { readFileSync, writeFileSync, mkdirSync, existsSync } from "fs";
import { appendFile, mkdir, writeFile } from "fs/promises";
import { homedir } from "os";
import { join } from "path";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  ListToolsRequestSchema,
  CallToolRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import type { CallToolRequest } from "@modelcontextprotocol/sdk/types.js";
import { serveMcpOverHttp, MCP_PORTS } from "../mcp-shared/index.ts";
import {
  htmlToPlainText,
  emailPreview,
  truncate,
  type GraphMailMessage,
  type GraphAttachment,
  type DeltaResponse,
} from "./utils.js";

// ---------------------------------------------------------------------------
// Mode selection
//
// `--stdio` (or STDIO=1) puts us in stdio-transport mode for the local
// Claude parent. We MUST skip the HTTP listener in that mode because the
// long-lived supervisor (~/run-email-channel.sh) already binds port 18810.
// ---------------------------------------------------------------------------

// True when the process was launched with `--stdio` on the command line or
// with STDIO=1 in the environment.
const STDIO_MODE =
  process.env.STDIO === "1" || process.argv.includes("--stdio");

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

/** Poll interval (seconds) used when EMAIL_POLL_INTERVAL is unset or unusable. */
const DEFAULT_POLL_INTERVAL_SEC = 20;

/**
 * Parse the EMAIL_POLL_INTERVAL setting.
 *
 * @param raw - Raw environment value (may be undefined).
 * @returns The parsed positive integer number of seconds, or
 *   DEFAULT_POLL_INTERVAL_SEC when the value is missing, non-numeric, zero,
 *   or negative. Without this guard a typo in the env var would produce NaN.
 */
function parsePollIntervalSec(raw: string | undefined): number {
  const parsed = parseInt(raw ?? "", 10);
  return Number.isFinite(parsed) && parsed > 0
    ? parsed
    : DEFAULT_POLL_INTERVAL_SEC;
}

// Azure AD app credentials; empty string when the variable is not set.
const CLIENT_ID = process.env.MS365_CLIENT_ID ?? "";
const CLIENT_SECRET = process.env.MS365_CLIENT_SECRET ?? "";
const TENANT_ID = process.env.MS365_TENANT_ID ?? "";
// Mailbox whose messages are accessed through Graph.
const MAILBOX = process.env.MS365_MAILBOX ?? "gautam@exulthealthcare.com";
const POLL_INTERVAL_SEC = parsePollIntervalSec(process.env.EMAIL_POLL_INTERVAL);
const GRAPH_BASE = "https://graph.microsoft.com/v1.0";

// Fail fast: all three Azure AD settings must be non-empty before anything
// else runs.
if ([CLIENT_ID, CLIENT_SECRET, TENANT_ID].some((value) => !value)) {
  process.stderr.write(
    "email-channel: MS365_CLIENT_ID, MS365_CLIENT_SECRET, and MS365_TENANT_ID are required\n",
  );
  process.exit(1);
}

// ---------------------------------------------------------------------------
// Unhandled error safety net (same pattern as teams-channel)
// ---------------------------------------------------------------------------

// Log promise rejections that nobody handled; the process keeps running.
process.on("unhandledRejection", (reason) => {
  process.stderr.write(
    `email-channel: unhandled rejection: ${String(reason)}\n`,
  );
});

// Log uncaught exceptions instead of letting them terminate the process.
// Broken-pipe style errors get one extra explanatory line.
process.on("uncaughtException", (error) => {
  const text = String(error);
  process.stderr.write(`email-channel: uncaught exception: ${text}\n`);

  const pipeBroken = ["EPIPE", "broken pipe", "write after end"].some(
    (marker) => text.includes(marker),
  );
  if (pipeBroken) {
    process.stderr.write(
      "email-channel: MCP pipe broken, poll loop continues\n",
    );
  }
});

// Errors emitted by stdout itself are logged to stderr and otherwise ignored.
process.stdout?.on?.("error", (error: Error) => {
  process.stderr.write(
    `email-channel: stdout error (ignored): ${error.message}\n`,
  );
});

// ---------------------------------------------------------------------------
// State directories
//
// Stdio mode uses a sibling subdir so it doesn't race with the supervisor's
// HTTP process on delta-state.json or pending-messages.jsonl. Both processes
// poll independently; the per-process seenMessageIds Set plus separate state
// files mean each process tracks its own surfacing without stomping.
//
// Trade-off: this doubles MS Graph quota usage during periods where both
// processes are alive (every new email is fetched twice). Acceptable today
// because: (a) inbox volume is low, (b) Graph throttle window is generous
// at our scale, and (c) the alternative — shared delta state via a file
// lock — adds complexity that we don't need yet. Revisit if Graph usage
// shows up in throttling logs or quota dashboards.
// ---------------------------------------------------------------------------

// ~/.claude/channels/email, with an extra "stdio" segment in stdio mode so
// the two modes never share state files. Created eagerly at startup.
const STATE_DIR = join(
  homedir(),
  ".claude",
  "channels",
  "email",
  ...(STDIO_MODE ? ["stdio"] : []),
);
mkdirSync(STATE_DIR, { recursive: true });

// Per-mode state files live under STATE_DIR.
const DELTA_STATE_FILE = join(STATE_DIR, "delta-state.json");
const PENDING_FILE = join(STATE_DIR, "pending-messages.jsonl");
// Fixed /tmp locations, independent of the mode.
const AUDIT_LOG = "/tmp/email-channel-audit.log";
const ATTACHMENT_DIR = "/tmp/email-attachments";

/**
 * Append one line to the audit log (fire-and-forget).
 *
 * Line format: `[ISO timestamp] [direction] [who] [subject] preview`, with
 * the preview capped at 120 characters.
 *
 * @param direction - "IN" for inbound, "OUT" for outbound activity.
 * @param who - Counterparty or mailbox the entry refers to.
 * @param subject - Subject line or action label.
 * @param preview - Short body preview.
 *
 * The write is not awaited and never throws to the caller; a failed write is
 * reported on stderr instead of being silently dropped.
 */
function audit(
  direction: "IN" | "OUT",
  who: string,
  subject: string,
  preview: string,
): void {
  // Collapse CR/LF and other control characters so caller-supplied text
  // cannot break out of its line and forge additional audit entries.
  const oneLine = (value: string): string =>
    value.replace(/[\x00-\x1f\x7f]+/g, " ");

  const line = `[${new Date().toISOString()}] [${direction}] [${oneLine(who)}] [${oneLine(subject)}] ${oneLine(preview).slice(0, 120)}\n`;

  // `mode` only takes effect when the file is created: the log holds message
  // previews, so a new file is created owner-readable only.
  appendFile(AUDIT_LOG, line, { mode: 0o600 }).catch((err: unknown) => {
    process.stderr.write(
      `email-channel: audit log write failed: ${String(err)}\n`,
    );
  });
}

// ---------------------------------------------------------------------------
// Token management (client_credentials flow via direct HTTP)
// ---------------------------------------------------------------------------

// Most recently acquired Graph token and its absolute expiry (epoch ms).
let cachedToken: { accessToken: string; expiresAt: number } | null = null;

/**
 * Return a Graph access token, reusing the cached one while it has more
 * than 60 seconds of validity left; otherwise request a new token with the
 * client_credentials grant and cache it.
 *
 * @returns The bearer token string.
 * @throws Error when the token endpoint answers with a non-2xx status.
 */
async function getGraphToken(): Promise<string> {
  const cached = cachedToken;
  if (cached !== null && cached.expiresAt - 60_000 > Date.now()) {
    return cached.accessToken;
  }

  const form = new URLSearchParams();
  form.set("client_id", CLIENT_ID);
  form.set("client_secret", CLIENT_SECRET);
  form.set("scope", "https://graph.microsoft.com/.default");
  form.set("grant_type", "client_credentials");

  const response = await fetch(
    `https://login.microsoftonline.com/${TENANT_ID}/oauth2/v2.0/token`,
    {
      method: "POST",
      headers: { "Content-Type": "application/x-www-form-urlencoded" },
      body: form.toString(),
    },
  );

  if (!response.ok) {
    const detail = await response.text();
    throw new Error(
      `Token acquisition failed (${response.status}): ${detail}`,
    );
  }

  const tokenResponse = (await response.json()) as {
    access_token: string;
    expires_in: number;
  };
  // expires_in is a duration in seconds; store an absolute deadline instead.
  const fresh = {
    accessToken: tokenResponse.access_token,
    expiresAt: Date.now() + tokenResponse.expires_in * 1000,
  };
  cachedToken = fresh;

  process.stderr.write("email-channel: Graph token acquired\n");
  return fresh.accessToken;
}

// ---------------------------------------------------------------------------
// Graph API helpers
// ---------------------------------------------------------------------------

/**
 * Issue an authenticated GET against Microsoft Graph and parse the JSON body.
 *
 * @param path - Either a path relative to GRAPH_BASE or an absolute URL
 *   (anything starting with "http" is used verbatim).
 * @returns The response body cast to `T` (not validated).
 * @throws Error carrying status and response text on a non-2xx answer.
 */
async function graphGet<T>(path: string): Promise<T> {
  const bearer = await getGraphToken();
  const target = path.startsWith("http") ? path : GRAPH_BASE + path;
  const response = await fetch(target, {
    headers: { Authorization: `Bearer ${bearer}` },
  });

  if (response.ok) {
    return (await response.json()) as T;
  }

  const detail = await response.text();
  throw new Error(`Graph GET ${path} failed (${response.status}): ${detail}`);
}

/**
 * Issue an authenticated JSON POST against Microsoft Graph.
 *
 * @param path - Path relative to GRAPH_BASE, or an absolute URL.
 * @param body - Value serialized with JSON.stringify as the request body.
 * @returns The parsed JSON body cast to `T`, or `undefined` when Graph
 *   answers 202 or 204 (no body is read in those cases).
 * @throws Error carrying status and response text on a non-2xx answer.
 */
async function graphPost<T>(
  path: string,
  body: unknown,
): Promise<T | undefined> {
  const bearer = await getGraphToken();
  const target = path.startsWith("http") ? path : GRAPH_BASE + path;
  const response = await fetch(target, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${bearer}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(body),
  });

  if (!response.ok) {
    const detail = await response.text();
    throw new Error(
      `Graph POST ${path} failed (${response.status}): ${detail}`,
    );
  }

  const withoutBody = [202, 204].includes(response.status);
  return withoutBody ? undefined : ((await response.json()) as T);
}

/**
 * Issue an authenticated JSON PATCH against Microsoft Graph.
 *
 * @param path - Path relative to GRAPH_BASE, or an absolute URL.
 * @param body - Value serialized with JSON.stringify as the request body.
 * @returns The parsed JSON body cast to `T`, or `undefined` for a 202/204
 *   response.
 * @throws Error carrying status and response text on a non-2xx answer.
 */
async function graphPatch<T>(
  path: string,
  body: unknown,
): Promise<T | undefined> {
  const bearer = await getGraphToken();
  const isAbsolute = path.startsWith("http");
  const response = await fetch(isAbsolute ? path : GRAPH_BASE + path, {
    method: "PATCH",
    headers: {
      Authorization: `Bearer ${bearer}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(body),
  });

  if (!response.ok) {
    const detail = await response.text();
    throw new Error(
      `Graph PATCH ${path} failed (${response.status}): ${detail}`,
    );
  }

  switch (response.status) {
    case 202:
    case 204:
      // Nothing to parse for these statuses.
      return undefined;
    default:
      return (await response.json()) as T;
  }
}

// ---------------------------------------------------------------------------
// Archive helper
//
// Move a message from Inbox → Archive folder. Used post-reply / post-forward
// so the inbox-as-queue contract holds: archived = handled, read-in-inbox =
// in-flight (agent took ownership but reply not yet sent), unread =
// pending. The move is non-fatal: a reply that succeeds but fails to archive
// is reported as "Reply sent (archive failed)" — the user can manually move
// the message later. Inbox itself stays as the durable work queue.
// ---------------------------------------------------------------------------

/**
 * Move a message to the "archive" destination via Graph's `/move` action.
 *
 * @param messageId - Graph message ID to move.
 * @returns `true` when the move request succeeded, `false` when it failed
 *   (the failure is logged to stderr and never thrown).
 */
async function archiveMessage(messageId: string): Promise<boolean> {
  try {
    const movePath = `/users/${encodeURIComponent(MAILBOX)}/messages/${encodeURIComponent(messageId)}/move`;
    await graphPost(movePath, { destinationId: "archive" });
  } catch (error) {
    process.stderr.write(
      `email-channel: archive failed for ${messageId}: ${String(error)}\n`,
    );
    return false;
  }
  return true;
}

// ---------------------------------------------------------------------------
// Delta state persistence
// ---------------------------------------------------------------------------

/** Poll cursor persisted to DELTA_STATE_FILE between runs. */
interface DeltaState {
  /** Stored delta link, or null when none has been saved yet. */
  deltaLink: string | null;
  /** ISO-8601 timestamp string. */
  lastPollTime: string;
}

/**
 * Turn an arbitrary parsed JSON value into a usable DeltaState.
 *
 * @returns `null` when the value is not a plain JSON object at all; otherwise
 *   the object with `deltaLink` / `lastPollTime` coerced to valid values
 *   (a missing or mistyped field falls back to its fresh-state default).
 */
function normalizeDeltaState(value: unknown): DeltaState | null {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    return null;
  }
  const candidate = value as Record<string, unknown>;
  return {
    ...candidate,
    deltaLink:
      typeof candidate.deltaLink === "string" ? candidate.deltaLink : null,
    lastPollTime:
      typeof candidate.lastPollTime === "string"
        ? candidate.lastPollTime
        : new Date().toISOString(),
  };
}

/**
 * Load the persisted delta state.
 *
 * @returns The stored state, or a fresh state (`deltaLink: null`,
 *   `lastPollTime: now`) when the file is missing, unreadable, not valid
 *   JSON, or valid JSON of the wrong shape (e.g. `null` or a bare string).
 *   Previously a wrong-shape file was returned as-is and would only fail
 *   later when a field was dereferenced.
 */
function loadDeltaState(): DeltaState {
  try {
    if (existsSync(DELTA_STATE_FILE)) {
      const parsed: unknown = JSON.parse(
        readFileSync(DELTA_STATE_FILE, "utf-8"),
      );
      const state = normalizeDeltaState(parsed);
      if (state !== null) return state;
      process.stderr.write("email-channel: corrupt delta state, resetting\n");
    }
  } catch {
    process.stderr.write("email-channel: corrupt delta state, resetting\n");
  }
  return { deltaLink: null, lastPollTime: new Date().toISOString() };
}

/**
 * Persist the delta state synchronously as pretty-printed JSON.
 *
 * @param state - State to write to DELTA_STATE_FILE.
 * @throws Whatever writeFileSync throws; callers decide whether that is fatal.
 */
function saveDeltaState(state: DeltaState): void {
  const serialized = JSON.stringify(state, null, 2);
  writeFileSync(DELTA_STATE_FILE, serialized, { mode: 0o600 });
}

// ---------------------------------------------------------------------------
// Message dedup + anti-loop
// ---------------------------------------------------------------------------

// Upper bound on remembered message IDs; the oldest entry is evicted first.
const MAX_SEEN = 5000;
// Insertion-ordered set of message IDs already handled by this process.
const seenMessageIds = new Set<string>();

/**
 * Record a message ID as seen.
 *
 * @param messageId - Graph message ID.
 * @returns `true` when the ID was new (and is now recorded), `false` when it
 *   had been recorded before.
 */
function markSeen(messageId: string): boolean {
  const alreadyKnown = seenMessageIds.has(messageId);
  if (alreadyKnown) {
    return false;
  }

  seenMessageIds.add(messageId);
  if (seenMessageIds.size <= MAX_SEEN) {
    return true;
  }

  // Over capacity: a Set iterates in insertion order, so the first element
  // is the oldest one.
  const [oldest] = seenMessageIds;
  if (oldest) {
    seenMessageIds.delete(oldest);
  }
  return true;
}

// ---------------------------------------------------------------------------
// Attachment handling
// ---------------------------------------------------------------------------

/**
 * Fetch up to 25 attachments of a message and write them under
 * ATTACHMENT_DIR/<sanitized message id>/.
 *
 * Skipped: inline attachments smaller than 100,000 bytes, and attachments
 * without `contentBytes`.
 *
 * @param messageId - Graph message ID.
 * @returns One entry per file written, with the original attachment name,
 *   the path on disk, and the content type.
 * @throws Whatever graphGet / mkdir / writeFile throw.
 */
async function downloadAttachments(
  messageId: string,
): Promise<Array<{ name: string; path: string; contentType: string }>> {
  const safeMessageId = messageId.replace(/[^a-zA-Z0-9-]/g, "_");
  const targetDir = join(ATTACHMENT_DIR, safeMessageId);
  await mkdir(targetDir, { recursive: true });

  const listing = await graphGet<{ value: GraphAttachment[] }>(
    `/users/${encodeURIComponent(MAILBOX)}/messages/${encodeURIComponent(messageId)}/attachments?$top=25`,
  );

  const saved: Array<{ name: string; path: string; contentType: string }> =
    [];
  for (const attachment of listing.value) {
    const smallInline = attachment.isInline && attachment.size < 100_000;
    if (smallInline) continue;

    const { contentBytes } = attachment;
    if (!contentBytes) continue;

    // Keep only filesystem-safe characters in the on-disk name.
    const safeName = attachment.name.replace(/[^a-zA-Z0-9._-]/g, "_");
    const destination = join(targetDir, safeName);
    await writeFile(destination, Buffer.from(contentBytes, "base64"));

    saved.push({
      name: attachment.name,
      path: destination,
      contentType: attachment.contentType,
    });
  }
  return saved;
}

// ---------------------------------------------------------------------------
// MCP Server
//
// Phase C consolidation: handlers are registered via `registerMcpHandlers`
// against a fresh `Server` each time `buildServer()` is called.
//   - In stdio mode (this process is spawned by the Claude parent via
//     .mcp.json): we call `buildServer()` once and connect it to a
//     StdioServerTransport. mcp.notification(...) is the primary path —
//     the stdio Server has a real subscriber.
//   - In HTTP mode (supervisor): we pass `buildServer` to mcp-shared as
//     `serverFactory`, which calls it once per session. The SDK's
//     Protocol.connect() refuses a second transport on the same Server,
//     so concurrent clients (Claude Desktop + Codex) each need their own
//     Server instance. notifyOrQueue() iterates `mcpHandle.activeServers()`
//     to fan out across all of them.
//
// All module-scoped state (Graph token cache, MAILBOX, PENDING_FILE,
// seenMessageIds, deltaState) is shared via closure so handlers behave
// identically across sessions.
// ---------------------------------------------------------------------------

// Identity reported by every Server instance this process creates.
const SERVER_INFO = { name: "email-channel", version: "1.0.0" } as const;

// Capabilities and client-facing instructions shared by every Server
// instance. The instructions text is seven newline-separated lines (two of
// them blank).
const SERVER_OPTIONS = {
  capabilities: {
    experimental: {
      "claude/channel": {},
      "claude/channel/permission": {},
    },
    tools: {},
  },
  instructions:
    "Email channel for Exult Healthcare AI agent.\n" +
    `Monitoring mailbox: ${MAILBOX}\n` +
    "\n" +
    'Email messages arrive as <channel source="email-channel" chat_id="..." sender="...">.\n' +
    "Reply with the reply tool, passing the message_id from the tag.\n" +
    "\n" +
    "Tools: reply, send, forward, mark_read, flag, list_thread, search, download_attachment.",
} as const;

// ---------------------------------------------------------------------------
// Tool implementations
// ---------------------------------------------------------------------------

/**
 * Convert a comma-separated address list into Graph recipient objects.
 *
 * @param csv - Addresses separated by commas; surrounding whitespace is
 *   trimmed and empty entries are ignored.
 * @returns One `{ emailAddress: { address } }` object per non-empty entry,
 *   in input order.
 */
function parseAddresses(
  csv: string,
): Array<{ emailAddress: { address: string } }> {
  const recipients: Array<{ emailAddress: { address: string } }> = [];
  for (const piece of csv.split(",")) {
    const address = piece.trim();
    if (address.length > 0) {
      recipients.push({ emailAddress: { address } });
    }
  }
  return recipients;
}

/**
 * Handler for the MCP list-tools request: returns the static catalogue of
 * the eight email tools with their JSON-schema input definitions. A fresh
 * object graph is built on every call.
 */
const listToolsHandler = async () => {
  type PropertySchema = { type: string; description: string };

  // Small builders to keep the catalogue below readable.
  const text = (description: string): PropertySchema => ({
    type: "string",
    description,
  });
  const numeric = (description: string): PropertySchema => ({
    type: "number",
    description,
  });
  const tool = (
    name: string,
    description: string,
    properties: Record<string, PropertySchema>,
    required: string[],
  ) => ({
    name,
    description,
    inputSchema: { type: "object" as const, properties, required },
  });

  return {
    tools: [
      tool(
        "reply",
        "Reply to an email. Maintains thread and conversation. On success the original inbound message is auto-moved to the Archive folder (inbox-as-queue contract: archived = handled).",
        {
          message_id: text("Graph message ID (from inbound message metadata)"),
          body: text("Reply body (HTML supported)"),
          cc: text("Optional comma-separated CC addresses"),
        },
        ["message_id", "body"],
      ),
      tool(
        "send",
        "Compose and send a new email.",
        {
          to: text("Comma-separated recipient addresses"),
          subject: text("Email subject"),
          body: text("Email body (HTML supported)"),
          cc: text("Optional comma-separated CC addresses"),
        },
        ["to", "subject", "body"],
      ),
      tool(
        "forward",
        "Forward an email to another recipient. On success the original inbound message is auto-moved to the Archive folder (same inbox-as-queue contract as reply).",
        {
          message_id: text("Graph message ID"),
          to: text("Comma-separated recipient addresses"),
          comment: text(
            "Optional comment to include above the forwarded message",
          ),
        },
        ["message_id", "to"],
      ),
      tool(
        "mark_read",
        "Mark a message as read.",
        { message_id: text("Graph message ID") },
        ["message_id"],
      ),
      tool(
        "flag",
        "Flag or unflag a message for follow-up.",
        {
          message_id: text("Graph message ID"),
          status: text("Flag status: flagged, complete, or notFlagged"),
        },
        ["message_id", "status"],
      ),
      tool(
        "list_thread",
        "Get all messages in an email conversation thread.",
        {
          conversation_id: text("Graph conversationId"),
          limit: numeric("Max messages to return (default: 10)"),
        },
        ["conversation_id"],
      ),
      tool(
        "search",
        "Search the mailbox using KQL. Returns matching messages.",
        {
          query: text('KQL search query (e.g., "from:john subject:invoice")'),
          folder: text(
            "Folder to search (default: all). Options: inbox, sentitems, drafts, archive",
          ),
          limit: numeric("Max results (default: 10)"),
        },
        ["query"],
      ),
      tool(
        "download_attachment",
        "Download a specific attachment from a message.",
        {
          message_id: text("Graph message ID"),
          attachment_id: text("Attachment ID"),
        },
        ["message_id", "attachment_id"],
      ),
    ],
  };
};

/** Flag statuses accepted by the `flag` tool. */
const FLAG_STATUSES: readonly string[] = ["flagged", "complete", "notFlagged"];

/**
 * Read a required tool argument as a string.
 *
 * @throws Error when the argument is missing, null, or blank. Without this
 *   check a missing argument would be coerced to the literal text
 *   "undefined" and sent to Graph.
 */
function requireStringArg(args: Record<string, unknown>, key: string): string {
  const value = args[key];
  if (value === undefined || value === null || String(value).trim() === "") {
    throw new Error(`Missing required argument: ${key}`);
  }
  return String(value);
}

/** Read an optional tool argument as a string; falsy values mean "absent". */
function optionalStringArg(
  args: Record<string, unknown>,
  key: string,
): string | undefined {
  const value = args[key];
  return value ? String(value) : undefined;
}

/**
 * Normalize a caller-supplied result limit to a positive integer.
 *
 * @returns The floored value when it is a finite number >= 1, else `fallback`.
 */
function clampLimit(raw: unknown, fallback = 10): number {
  const parsed = Number(raw);
  return Number.isFinite(parsed) && parsed >= 1 ? Math.floor(parsed) : fallback;
}

/**
 * Handler for the MCP call-tool request. Dispatches on the tool name and
 * performs the corresponding Microsoft Graph call(s).
 *
 * @param req - The call-tool request; `params.arguments` carries the tool
 *   arguments described by the list-tools schemas.
 * @returns An MCP tool result with a single text content item. Any failure
 *   (invalid arguments, Graph errors, filesystem errors) is caught, logged to
 *   stderr, and returned as a result with `isError: true`; this function does
 *   not throw.
 */
const callToolHandler = async (req: CallToolRequest) => {
  const { name } = req.params;
  const args = (req.params.arguments ?? {}) as Record<string, unknown>;

  try {
    const mailboxPath = `/users/${encodeURIComponent(MAILBOX)}`;

    switch (name) {
      case "reply": {
        const messageId = requireStringArg(args, "message_id");
        const body = requireStringArg(args, "body");
        const cc = optionalStringArg(args, "cc");

        const payload: Record<string, unknown> = {
          message: {
            body: { contentType: "HTML", content: body },
            ...(cc ? { ccRecipients: parseAddresses(cc) } : {}),
          },
        };

        await graphPost(
          `${mailboxPath}/messages/${encodeURIComponent(messageId)}/reply`,
          payload,
        );
        audit("OUT", MAILBOX, "reply", emailPreview(body));
        // Archiving is best-effort: the reply has already been sent.
        const archived = await archiveMessage(messageId);
        return {
          content: [
            {
              type: "text",
              text: archived
                ? `Reply sent to message ${truncate(messageId, 40)} (archived)`
                : `Reply sent to message ${truncate(messageId, 40)} (archive failed — message stays in inbox; please archive manually)`,
            },
          ],
        };
      }

      case "send": {
        const to = requireStringArg(args, "to");
        const subject = requireStringArg(args, "subject");
        const body = requireStringArg(args, "body");
        const cc = optionalStringArg(args, "cc");

        const payload = {
          message: {
            subject,
            body: { contentType: "HTML", content: body },
            toRecipients: parseAddresses(to),
            ...(cc ? { ccRecipients: parseAddresses(cc) } : {}),
          },
          saveToSentItems: true,
        };

        await graphPost(`${mailboxPath}/sendMail`, payload);
        audit("OUT", to, subject, emailPreview(body));
        return {
          content: [
            { type: "text", text: `Email sent to ${to}: "${subject}"` },
          ],
        };
      }

      case "forward": {
        const messageId = requireStringArg(args, "message_id");
        const to = requireStringArg(args, "to");
        const comment = optionalStringArg(args, "comment") ?? "";

        await graphPost(
          `${mailboxPath}/messages/${encodeURIComponent(messageId)}/forward`,
          {
            comment,
            toRecipients: parseAddresses(to),
          },
        );
        audit("OUT", to, "forward", comment || "(no comment)");
        // Archiving is best-effort: the forward has already been sent.
        const archived = await archiveMessage(messageId);
        return {
          content: [
            {
              type: "text",
              text: archived
                ? `Forwarded to ${to} (archived)`
                : `Forwarded to ${to} (archive failed — message stays in inbox; please archive manually)`,
            },
          ],
        };
      }

      case "mark_read": {
        const messageId = requireStringArg(args, "message_id");
        await graphPatch(
          `${mailboxPath}/messages/${encodeURIComponent(messageId)}`,
          { isRead: true },
        );
        return {
          content: [{ type: "text", text: "Marked as read" }],
        };
      }

      case "flag": {
        const messageId = requireStringArg(args, "message_id");
        const status = requireStringArg(args, "status");
        // Reject unknown values locally with a clear message instead of
        // forwarding them to Graph.
        if (!FLAG_STATUSES.includes(status)) {
          throw new Error(
            `Invalid flag status "${status}"; expected one of: ${FLAG_STATUSES.join(", ")}`,
          );
        }
        await graphPatch(
          `${mailboxPath}/messages/${encodeURIComponent(messageId)}`,
          { flag: { flagStatus: status } },
        );
        return {
          content: [
            { type: "text", text: `Flag status set to: ${status}` },
          ],
        };
      }

      case "list_thread": {
        // Single quotes are doubled per OData string-literal escaping, then
        // the whole expression is URL-encoded so characters such as `&`, `+`
        // or `#` inside the ID cannot alter the query string.
        const conversationId = requireStringArg(
          args,
          "conversation_id",
        ).replace(/'/g, "''");
        const limit = clampLimit(args.limit);
        const filter = encodeURIComponent(
          `conversationId eq '${conversationId}'`,
        );
        const orderBy = encodeURIComponent("receivedDateTime asc");
        const data = await graphGet<{ value: GraphMailMessage[] }>(
          `${mailboxPath}/messages?$filter=${filter}&$top=${limit}&$orderby=${orderBy}&$select=id,subject,from,receivedDateTime,bodyPreview,importance`,
        );
        const messages = data.value.map((m) => ({
          id: m.id,
          // Guarded: a message without a sender must not fail the whole call.
          from: m.from?.emailAddress?.address ?? null,
          subject: m.subject,
          preview: m.bodyPreview?.slice(0, 200),
          received: m.receivedDateTime,
          importance: m.importance,
        }));
        return {
          content: [
            {
              type: "text",
              text: JSON.stringify(messages, null, 2),
            },
          ],
        };
      }

      case "search": {
        const query = requireStringArg(args, "query");
        const folder = optionalStringArg(args, "folder");
        const limit = clampLimit(args.limit);
        const basePath = folder
          ? `${mailboxPath}/mailFolders/${encodeURIComponent(folder)}/messages`
          : `${mailboxPath}/messages`;
        // Backslash-escape embedded backslashes and double quotes so they
        // cannot terminate the quoted $search clause, then URL-encode the
        // clause so `&`, `#`, `+` etc. stay part of the search text.
        const escapedQuery = query.replace(/[\\"]/g, "\\$&");
        const search = encodeURIComponent(`"${escapedQuery}"`);
        const data = await graphGet<{ value: GraphMailMessage[] }>(
          `${basePath}?$search=${search}&$top=${limit}&$select=id,subject,from,receivedDateTime,bodyPreview,importance,hasAttachments`,
        );
        const results = data.value.map((m) => ({
          id: m.id,
          // Guarded: a message without a sender must not fail the whole call.
          from: m.from?.emailAddress?.address ?? null,
          subject: m.subject,
          preview: m.bodyPreview?.slice(0, 200),
          received: m.receivedDateTime,
          hasAttachments: m.hasAttachments,
        }));
        return {
          content: [
            { type: "text", text: JSON.stringify(results, null, 2) },
          ],
        };
      }

      case "download_attachment": {
        const messageId = requireStringArg(args, "message_id");
        const attachmentId = requireStringArg(args, "attachment_id");
        const att = await graphGet<GraphAttachment>(
          `${mailboxPath}/messages/${encodeURIComponent(messageId)}/attachments/${encodeURIComponent(attachmentId)}`,
        );
        if (!att.contentBytes) {
          return {
            content: [
              { type: "text", text: "Attachment has no downloadable content" },
            ],
          };
        }
        // Both path segments are reduced to a safe character set before
        // touching the filesystem.
        const dir = join(
          ATTACHMENT_DIR,
          messageId.replace(/[^a-zA-Z0-9-]/g, "_"),
        );
        await mkdir(dir, { recursive: true });
        const filePath = join(
          dir,
          att.name.replace(/[^a-zA-Z0-9._-]/g, "_"),
        );
        const buffer = Buffer.from(att.contentBytes, "base64");
        await writeFile(filePath, buffer);
        return {
          content: [
            {
              type: "text",
              text: `Downloaded: ${att.name} (${att.size} bytes) -> ${filePath}`,
            },
          ],
        };
      }

      default:
        return {
          content: [{ type: "text", text: `Unknown tool: ${name}` }],
          isError: true,
        };
    }
  } catch (err) {
    const errMsg = err instanceof Error ? err.message : String(err);
    process.stderr.write(`email-channel: tool ${name} error: ${errMsg}\n`);
    return {
      content: [{ type: "text", text: `Error: ${errMsg}` }],
      isError: true,
    };
  }
};

/**
 * Attach the two request handlers this channel implements (call-tool and
 * list-tools) to the given MCP Server. The same handler functions are used
 * for every Server instance, so they all operate on the same module-level
 * state.
 *
 * @param server - Server instance to register the handlers on.
 */
function registerMcpHandlers(server: Server): void {
  server.setRequestHandler(CallToolRequestSchema, callToolHandler);
  server.setRequestHandler(ListToolsRequestSchema, listToolsHandler);
}

/**
 * Create a new MCP `Server` configured with SERVER_INFO / SERVER_OPTIONS and
 * with the tool handlers already registered. Each call returns a distinct
 * instance, so it can serve as a factory wherever one Server per connection
 * is needed.
 *
 * @returns The ready-to-connect Server.
 */
function buildServer(): Server {
  const instance = new Server(SERVER_INFO, SERVER_OPTIONS);
  registerMcpHandlers(instance);
  return instance;
}

// ---------------------------------------------------------------------------
// Connect MCP transport.
//
// Stdio mode: build one Server and connect a StdioServerTransport so
// mcp.notification(...) lands at the local Claude parent (the real
// subscriber). HTTP listener is skipped — the long-lived supervisor on the
// same host owns port 18810. `stdioServer` is kept as a module-scope
// reference so notifyOrQueue() / drainPendingMessages() can push directly
// to it without going through mcpHandle.
//
// HTTP mode (supervisor): mount Streamable HTTP via mcp-shared on
// MCP_PORTS.email (18810) using `serverFactory: buildServer`. Every new
// session gets its own Server instance; notifyOrQueue() fans out across
// `mcpHandle.activeServers()`.
// ---------------------------------------------------------------------------

// Exactly one of these is populated, depending on the mode: `stdioServer`
// in stdio mode, `mcpHandle` in HTTP mode. The other stays null.
let stdioServer: Server | null = null;
let mcpHandle: ReturnType<typeof serveMcpOverHttp> | null = null;

if (STDIO_MODE) {
  stdioServer = buildServer();
  const transport = new StdioServerTransport();

  // Transport errors are logged only; they do not terminate the process.
  transport.onerror = (err: Error) => {
    process.stderr.write(
      `email-channel: stdio MCP transport error (non-fatal): ${err.message}\n`,
    );
  };
  transport.onclose = () => {
    // Parent disconnected. In stdio mode there is no purpose to keep this
    // process alive — there is no other consumer for poll output and the
    // long-lived supervisor owns the HTTP listener. Without process.exit
    // here the setInterval poll loop keeps the process alive indefinitely,
    // leaking an orphaned child every time the parent restarts.
    process.stderr.write(
      "email-channel: stdio MCP transport closed — exiting\n",
    );
    try {
      // `deltaState` is declared elsewhere in this file (outside this
      // section); any error raised while saving it is swallowed below.
      saveDeltaState(deltaState);
    } catch {
      // Best-effort; do not block exit on state save.
    }
    process.exit(0);
  };

  // Top-level await: module evaluation pauses here until the connect attempt
  // settles. A failed connect is logged and execution continues.
  try {
    await stdioServer.connect(transport);
    process.stderr.write("email-channel: MCP connected over stdio\n");
  } catch (err) {
    process.stderr.write(
      `email-channel: stdio MCP connect failed (poll loop still starts): ${String(err)}\n`,
    );
  }
} else {
  // HTTP mode: a bearer token is mandatory; exit with status 1 without it.
  const mcpToken = process.env.MCP_BEARER_TOKEN;
  if (!mcpToken) {
    process.stderr.write(
      "email-channel: MCP_BEARER_TOKEN required for HTTP mode\n",
    );
    process.exit(1);
  }

  // `buildServer` is handed over as the factory, so the HTTP layer decides
  // when Server instances are created.
  mcpHandle = serveMcpOverHttp({
    serverFactory: buildServer,
    port: MCP_PORTS.email,
    token: mcpToken,
  });

  process.stderr.write(
    `email-channel: MCP listening on http :${MCP_PORTS.email}\n`,
  );
}

/**
 * List the `Server` instances that notifications should currently be pushed
 * to.
 *
 * @returns In stdio mode, a one-element array holding `stdioServer` (or an
 *   empty array if it was never created). In HTTP mode, whatever
 *   `mcpHandle.activeServers()` reports (or an empty array if there is no
 *   handle).
 */
function activeServers(): Server[] {
  if (STDIO_MODE) {
    if (stdioServer) {
      return [stdioServer];
    }
    return [];
  }

  if (mcpHandle) {
    return mcpHandle.activeServers();
  }
  return [];
}

// ---------------------------------------------------------------------------
// Drain pending messages from offline period
// ---------------------------------------------------------------------------

// True while a drain pass is between its first delivery attempt and its
// queue rewrite. Prevents a second pass (e.g. the periodic timer firing
// during a slow drain) from re-delivering the same queued lines.
let drainInProgress = false;

/**
 * Deliver notifications queued in PENDING_FILE (one JSON object per line) to
 * every currently active MCP Server, then remove the handled lines.
 *
 * Behavior:
 *   - Returns immediately if another drain pass is running, the file is
 *     missing/unreadable/empty, or no session is connected.
 *   - A line counts as delivered when at least one Server accepts it.
 *   - If every Server rejects a line, the pass stops and that line and all
 *     later ones stay queued for the next pass.
 *   - A line that is not valid JSON, or lacks a string `content`, is logged
 *     and dropped. Previously such a line stopped the pass and, because it
 *     stayed at the head of the file, blocked the queue permanently.
 *   - The queue file is re-read before being rewritten and only the handled
 *     head lines are removed, so entries appended while this pass was
 *     awaiting deliveries are kept instead of being overwritten by a stale
 *     snapshot. (An append landing exactly between the re-read and the
 *     rewrite can still be lost; the window is now two adjacent synchronous
 *     calls rather than the whole delivery loop.)
 *
 * @throws Only if the final synchronous rewrite of PENDING_FILE fails.
 */
async function drainPendingMessages(): Promise<void> {
  if (drainInProgress) return;
  if (!existsSync(PENDING_FILE)) return;

  let raw: string;
  try {
    raw = readFileSync(PENDING_FILE, "utf-8").trim();
  } catch {
    return;
  }
  if (!raw) return;

  const lines = raw.split("\n").filter(Boolean);
  process.stderr.write(
    `email-channel: draining ${lines.length} queued message(s)\n`,
  );

  // Snapshot the active Servers once per pass; with none connected the queue
  // is left untouched for a later pass.
  const servers = activeServers();
  if (servers.length === 0) {
    process.stderr.write(
      "email-channel: no active MCP sessions; deferring drain\n",
    );
    return;
  }

  drainInProgress = true;
  let delivered = 0;
  let dropped = 0;
  try {
    for (const line of lines) {
      let payload: { content: string; meta: Record<string, string> };
      try {
        const parsed: unknown = JSON.parse(line);
        if (
          typeof parsed !== "object" ||
          parsed === null ||
          typeof (parsed as { content?: unknown }).content !== "string"
        ) {
          throw new Error("unexpected payload shape");
        }
        payload = parsed as { content: string; meta: Record<string, string> };
      } catch (err) {
        // Undeliverable no matter how often we retry: drop it so it cannot
        // block everything queued behind it.
        process.stderr.write(
          `email-channel: dropping malformed queued message: ${String(err)}\n`,
        );
        dropped++;
        continue;
      }

      try {
        const results = await Promise.allSettled(
          servers.map((s) =>
            s.notification({
              method: "notifications/claude/channel",
              params: { content: payload.content, meta: payload.meta },
            }),
          ),
        );
        const ok = results.some((r) => r.status === "fulfilled");
        if (!ok) {
          process.stderr.write(
            `email-channel: drain failed (all ${servers.length} sessions rejected)\n`,
          );
          break;
        }
      } catch (err) {
        process.stderr.write(`email-channel: drain failed: ${String(err)}\n`);
        break;
      }
      delivered++;
    }

    // Lines are only ever appended, so the handled ones are exactly the
    // first `consumed` non-empty lines of the current file content.
    const consumed = delivered + dropped;
    if (consumed > 0) {
      let current = lines;
      try {
        current = readFileSync(PENDING_FILE, "utf-8")
          .split("\n")
          .filter(Boolean);
      } catch {
        // Fall back to the snapshot taken at the start of this pass.
      }
      const remaining = current.slice(consumed);
      writeFileSync(
        PENDING_FILE,
        remaining.length > 0 ? remaining.join("\n") + "\n" : "",
        { mode: 0o600 },
      );
    }
  } finally {
    drainInProgress = false;
  }

  process.stderr.write(
    `email-channel: drained ${delivered}/${lines.length} queued messages\n`,
  );
}

// Kick off one drain immediately, then keep retrying on a fixed timer so a
// queue that could not be delivered earlier is flushed on a later pass
// without restarting the process. Failures are logged and never rethrown.
drainPendingMessages().then(undefined, (reason: unknown) => {
  process.stderr.write(`email-channel: drain error: ${String(reason)}\n`);
});

const DRAIN_INTERVAL_MS = 30_000;
setInterval(function redrainPending(): void {
  drainPendingMessages().then(undefined, (reason: unknown) => {
    process.stderr.write(
      `email-channel: periodic drain error: ${String(reason)}\n`,
    );
  });
}, DRAIN_INTERVAL_MS);

// ---------------------------------------------------------------------------
// Notify or queue
//
// In stdio mode the (single) stdio Server is the primary delivery path with
// a real subscriber (the Claude parent). In HTTP mode every connected
// session-Server gets its own push so concurrent clients (Claude Desktop +
// Codex) both see the message. If no sessions are connected OR every push
// rejects, fall back to the on-disk spill queue. Either way, notifyOrQueue
// is safe and never throws.
// ---------------------------------------------------------------------------

/**
 * Deliver one channel notification to every connected MCP session, or spill
 * it to the on-disk pending queue when no session received it.
 *
 * Delivery counts as successful as soon as at least one session accepts the
 * push. When there are no active sessions, or every push rejects, the payload
 * is appended to PENDING_FILE as a single JSON line.
 *
 * This function never throws: push failures fall through to the queue, and a
 * failed queue write is logged (the message is lost in that case, so the log
 * line is the only trace of it).
 *
 * @param content - Human-readable notification body.
 * @param meta - Flat string metadata forwarded alongside the content.
 */
async function notifyOrQueue(
  content: string,
  meta: Record<string, string>,
): Promise<void> {
  const servers = activeServers();
  if (servers.length > 0) {
    try {
      const results = await Promise.allSettled(
        servers.map((s) =>
          s.notification({
            method: "notifications/claude/channel",
            params: { content, meta },
          }),
        ),
      );
      if (results.some((r) => r.status === "fulfilled")) {
        return;
      }
      process.stderr.write(
        `email-channel: notification rejected by all ${servers.length} sessions, queuing\n`,
      );
    } catch (err) {
      // allSettled itself does not reject; this covers a synchronous throw
      // while building the push promises.
      process.stderr.write(
        `email-channel: notification failed, queuing: ${String(err)}\n`,
      );
    }
  }

  const line = JSON.stringify({ content, meta }) + "\n";
  try {
    // mode only applies when the file is created by this call; 0o600 matches
    // the permissions the drain path uses when it rewrites the queue, so the
    // spilled email content is not left readable by other users.
    await appendFile(PENDING_FILE, line, { mode: 0o600 });
  } catch (err) {
    // Stay non-throwing, but do not lose the message silently.
    process.stderr.write(
      `email-channel: failed to queue notification (message dropped): ${String(err)}\n`,
    );
  }
}

// ---------------------------------------------------------------------------
// Anti-loop: detect messages sent by the agent itself
// ---------------------------------------------------------------------------

/**
 * Anti-loop guard: reports whether the message's From address is the
 * monitored mailbox itself. The comparison is case-insensitive, and a
 * missing sender address is treated as the empty string.
 */
function isSentByAgent(msg: GraphMailMessage): boolean {
  const ownAddress = MAILBOX.toLowerCase();
  const fromAddress = msg.from?.emailAddress?.address ?? "";
  return fromAddress.toLowerCase() === ownAddress;
}

// ---------------------------------------------------------------------------
// Process a new email message
// ---------------------------------------------------------------------------

/**
 * Flatten a Graph recipient list into a comma-separated address string,
 * dropping entries without an address and any address equal to
 * `excludeLower` (which must already be lower-cased).
 */
function joinRecipientAddresses(
  recipients:
    | ReadonlyArray<{ emailAddress?: { address?: string | null } | null }>
    | null
    | undefined,
  excludeLower: string,
): string {
  return (recipients ?? [])
    .map((r) => r.emailAddress?.address)
    .filter(
      // typeof is a real narrowing guard; Boolean(x) is not, so the
      // predicate body type-checks under strictNullChecks.
      (addr): addr is string =>
        typeof addr === "string" &&
        addr !== "" &&
        addr.toLowerCase() !== excludeLower,
    )
    .join(",");
}

/**
 * Turn one inbox message from the delta feed into a channel notification.
 *
 * The message is skipped when it was already seen, when it was sent by the
 * monitored mailbox itself, or when it is already marked read. Otherwise its
 * body is reduced to plain text, attachments are downloaded (best-effort),
 * the event is audited, and the notification is handed to notifyOrQueue.
 *
 * @param msg - Message as returned by the Graph inbox delta query.
 */
async function processMessage(msg: GraphMailMessage): Promise<void> {
  if (!markSeen(msg.id)) return;
  if (isSentByAgent(msg)) return;
  // Inbox-as-queue echo filter: when the outlook-agent calls mark_read, the
  // MS Graph Delta API replays the message with isRead=true. We treat any
  // delta event whose isRead is already true as a state-change echo (or as
  // an email the agent has already accepted ownership of via boot-sweep) and
  // do NOT re-surface it — the agent would otherwise see a duplicate
  // notification for work it just took on. See agents/outlook-agent.md
  // "Inbox-as-Queue Design" for the full contract.
  if (msg.isRead) return;

  const senderAddr = msg.from?.emailAddress?.address ?? "unknown";
  const senderName = msg.from?.emailAddress?.name ?? senderAddr;
  const subject = msg.subject ?? "(no subject)";
  const bodyText =
    msg.body?.contentType === "html"
      ? htmlToPlainText(msg.body.content)
      : msg.body?.content ?? msg.bodyPreview ?? "";

  // Download attachments if present. A failure here is logged and the email
  // is still surfaced, just without attachment paths.
  let attachmentPaths: string[] = [];
  if (msg.hasAttachments) {
    try {
      const downloaded = await downloadAttachments(msg.id);
      attachmentPaths = downloaded.map((a) => a.path);
    } catch (err) {
      process.stderr.write(
        `email-channel: attachment download failed: ${String(err)}\n`,
      );
    }
  }

  const content = `Email from ${senderName} <${senderAddr}>\nSubject: ${subject}\n\n${bodyText}`;

  // Surface To + Cc recipient lists so the outlook-agent can preserve the
  // Cc set when replying (per agents/outlook-agent.md "Cc preservation
  // contract"). Comma-separated email addresses; the agent applies the
  // @exulthealthcare.com allowlist before passing them back via the cc
  // param on `reply`. We strip the agent's own MAILBOX from both lists so
  // the agent can't accidentally re-Cc itself (would cause a reply loop).
  const mailboxLower = MAILBOX.toLowerCase();
  const toRecipients = joinRecipientAddresses(msg.toRecipients, mailboxLower);
  const ccRecipients = joinRecipientAddresses(msg.ccRecipients, mailboxLower);

  const meta: Record<string, string> = {
    source: "email-channel",
    sender_email: senderAddr,
    sender_name: senderName,
    subject,
    has_attachments: String(msg.hasAttachments),
    attachment_paths: JSON.stringify(attachmentPaths),
    received_at: msg.receivedDateTime,
    importance: msg.importance ?? "normal",
    is_read: String(msg.isRead),
    conversation_id: msg.conversationId ?? "",
    message_id: msg.id,
    to_recipients: toRecipients,
    cc_recipients: ccRecipients,
  };

  audit("IN", senderAddr, subject, emailPreview(bodyText));
  await notifyOrQueue(content, meta);
  // We do NOT mark-as-read here — the outlook-agent does, on task
  // acceptance. See agents/outlook-agent.md "Inbox-as-Queue Design".

  process.stderr.write(
    `email-channel: surfaced email from ${senderAddr}: "${truncate(subject, 60)}"\n`,
  );
}

// ---------------------------------------------------------------------------
// Poll loop
// ---------------------------------------------------------------------------

let deltaState = loadDeltaState();

/**
 * Baseline sync against the inbox delta endpoint.
 *
 * Walks every page of the initial delta response and records each message id
 * in seenMessageIds so mail that is already in the inbox is not surfaced.
 * When the last page carries a deltaLink, it is stored (with the current
 * time) in deltaState and persisted for later polls.
 *
 * Errors are logged and swallowed; this function does not reject on a failed
 * Graph request.
 */
async function initDelta(): Promise<void> {
  const selectFields = [
    "id",
    "subject",
    "from",
    "toRecipients",
    "ccRecipients",
    "receivedDateTime",
    "importance",
    "hasAttachments",
    "isRead",
    "body",
    "bodyPreview",
    "conversationId",
    "parentFolderId",
  ].join(",");
  const url = `/users/${MAILBOX}/mailFolders/inbox/messages/delta?$select=${selectFields}&$top=25`;

  try {
    let page = await graphGet<DeltaResponse>(url);
    for (;;) {
      // Everything returned by the baseline sync counts as already seen.
      for (const { id } of page.value) {
        seenMessageIds.add(id);
      }
      const nextLink = page["@odata.nextLink"];
      if (!nextLink) break;
      page = await graphGet<DeltaResponse>(nextLink);
    }

    // Only the final page is checked for the deltaLink.
    const finalLink = page["@odata.deltaLink"];
    if (finalLink) {
      deltaState.deltaLink = finalLink;
      deltaState.lastPollTime = new Date().toISOString();
      saveDeltaState(deltaState);
    }
    process.stderr.write(
      `email-channel: initial sync complete, ${seenMessageIds.size} existing messages indexed\n`,
    );
  } catch (err) {
    process.stderr.write(
      `email-channel: initial delta failed: ${String(err)}\n`,
    );
  }
}

/**
 * Run one poll cycle against the stored Graph delta link.
 *
 * Without a stored delta link this only performs the baseline sync
 * (initDelta) and returns. Otherwise it collects every page of changes,
 * persists the new delta link, and then processes the collected messages one
 * at a time.
 *
 * Fetch errors are logged and never rethrown. If the error looks like an
 * expired or invalid delta token, the stored link is cleared so the next
 * cycle re-runs the baseline sync.
 */
async function pollOnce(): Promise<void> {
  if (!deltaState.deltaLink) {
    await initDelta();
    return;
  }

  try {
    const newMessages: GraphMailMessage[] = [];
    let data = await graphGet<DeltaResponse>(deltaState.deltaLink);
    for (;;) {
      for (const msg of data.value) {
        // Entries lacking a sender or received time are skipped
        // (presumably tombstones / partial change records — TODO confirm).
        if (msg.from && msg.receivedDateTime) {
          newMessages.push(msg);
        }
      }
      const nextLink = data["@odata.nextLink"];
      if (!nextLink) break;
      data = await graphGet<DeltaResponse>(nextLink);
    }

    if (data["@odata.deltaLink"]) {
      deltaState.deltaLink = data["@odata.deltaLink"];
      deltaState.lastPollTime = new Date().toISOString();
      saveDeltaState(deltaState);
    }

    // The delta link has already been advanced above, so this batch will not
    // be returned again. Isolate each message: one failure must not drop the
    // remaining messages, and must not reach the outer catch where its text
    // could be mistaken for a delta-token error.
    for (const msg of newMessages) {
      try {
        await processMessage(msg);
      } catch (err) {
        process.stderr.write(
          `email-channel: failed to process message ${msg.id}: ${String(err)}\n`,
        );
      }
    }
  } catch (err) {
    const errMsg = String(err);
    process.stderr.write(`email-channel: poll error: ${errMsg}\n`);
    // If delta link expired, reset. Graph signals this with a 40X-series
    // status (including 410 Gone) and/or a syncStateNotFound error code,
    // whose capitalisation varies, hence the case-insensitive match.
    const deltaLinkInvalid =
      errMsg.includes("404") ||
      /\b410\b/.test(errMsg) ||
      /syncStateNotFound/i.test(errMsg);
    if (deltaLinkInvalid) {
      process.stderr.write(
        "email-channel: delta link expired, re-initializing\n",
      );
      deltaState.deltaLink = null;
      saveDeltaState(deltaState);
    }
  }
}

// ---------------------------------------------------------------------------
// Acquire token + start poll loop
//
// Token failure is non-fatal: the MCP transport (stdio or HTTP) must stay
// up so the Claude parent / clients can connect; the MS365 poller is
// best-effort and retries on its normal interval.
// ---------------------------------------------------------------------------

// Step 1: acquire a Graph token up front so authentication problems show up
// in the log at boot. A failure is only logged; startup continues.
try {
  await getGraphToken();
  process.stderr.write(
    `email-channel: authenticated, monitoring ${MAILBOX}\n`,
  );
} catch (err) {
  process.stderr.write(
    `email-channel: token acquisition failed (non-fatal, will retry on poll): ${String(err)}\n`,
  );
}

// Initial sync — also non-fatal so a transient Graph outage at boot
// doesn't take down the MCP transport.
// initDelta() already catches and logs its own Graph failures, so the catch
// below is a defensive backstop.
// NOTE(review): this runs unconditionally, even when loadDeltaState()
// restored a delta link from disk. initDelta() replaces that link and marks
// every message currently returned by the baseline sync as seen, so mail
// that arrived while the process was down is not surfaced by this server.
// Confirm this is intended (processMessage's comments mention an agent-side
// boot-sweep).
try {
  await initDelta();
} catch (err) {
  process.stderr.write(
    `email-channel: initDelta failed (non-fatal, will retry on poll): ${String(err)}\n`,
  );
}

// Poll loop
// POLL_INTERVAL_SEC is in seconds; setInterval takes milliseconds.
const pollIntervalMs = POLL_INTERVAL_SEC * 1000;
process.stderr.write(
  `email-channel: starting poll loop (every ${POLL_INTERVAL_SEC}s, mode=${STDIO_MODE ? "stdio" : "http"})\n`,
);

// The first poll fires one full interval after this point. setInterval does
// not wait for the previous pollOnce() to settle, so a poll that runs longer
// than the interval can overlap with the next one.
// pollOnce() logs its own fetch errors; the .catch here keeps any other
// rejection from becoming an unhandled promise rejection.
setInterval(() => {
  pollOnce().catch((err) => {
    process.stderr.write(`email-channel: poll loop error: ${String(err)}\n`);
  });
}, pollIntervalMs);

// Graceful shutdown: write the in-memory delta state to disk before exiting,
// with the aim of preventing duplicate delivery after a restart.
function shutdown(signal: string): void {
  const notice = `email-channel: ${signal} received, saving state and exiting\n`;
  process.stderr.write(notice);
  saveDeltaState(deltaState);
  process.exit(0);
}
for (const sig of ["SIGTERM", "SIGINT"] as const) {
  process.on(sig, () => shutdown(sig));
}

// Startup banner. This log line does not itself keep the process running;
// the interval timers registered earlier in this file do.
process.stderr.write(
  "email-channel: server running (mode=" +
    (STDIO_MODE ? "stdio" : "http") +
    ")\n",
);
