/// <reference types="bun-types" />
/**
 * Shared Streamable HTTP transport + bearer middleware for in-house MCP
 * servers running on Bun.
 *
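 * Routes (all return JSON unless noted):
 *   GET  /health           -> 200 {ok:true, service} UNAUTHENTICATED
 *   ANY  /mcp              -> Streamable HTTP MCP (bearer required; JSON or
 *                             SSE per the MCP spec). The path defaults to
 *                             /mcp and is configurable via the `path` option.
 *   *                      -> delegated to `attachTo` if provided, else 404.
 *                             Non-GET /health requests also land here.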
 *
 * Legacy SSE (/sse + /messages) is deliberately NOT supported in Phase
 * A. A previous draft shipped a placeholder bridge that returned
 * -32601 for every tool call -- worse than a 404 because clients would
 * connect, receive the endpoint event, and then fail every request.
 * If a real legacy path is needed in Phase B/C it must be wired to a
 * second Server instance with its own transport.
 *
 * The same-process two-port layout (one port for the MCP server, one for an
 * existing webhook listener) is also a valid pattern. `attachTo` is just an
 * option for callers that already serve HTTP on the chosen port -- e.g.
 * `teams-channel` and `email-channel`, which have webhook handlers we don't
 * want to displace. When `attachTo` is set, this module does NOT call
 * `Bun.serve` -- it returns a composed fetch handler instead.
 *
 * Per-session transports AND per-session Server instances
 * -------------------------------------------------------
 * Every MCP client connection gets its OWN
 * WebStandardStreamableHTTPServerTransport instance AND its own
 * `Server` instance, keyed by the session id the SDK generates during
 * `initialize`. This matches the canonical multi-client pattern from
 * the MCP TypeScript SDK and the Python sibling library in
 * `./python/streamable_transport.py`.
 *
 * The SDK's `Protocol.connect()` (shared/protocol.js) stores its
 * transport in a single `_transport` slot and throws "Already connected
 * to a transport" on a second `connect()` call. That means one `Server`
 * instance can host AT MOST one live transport. To support concurrent
 * clients (e.g. Claude Desktop + Codex hitting the same MCP server) we
 * need a fresh `Server` per session, which is what `serverFactory`
 * provides.
 *
 * The previous implementation reused one transport for all requests.
 * The SDK tracks `_initialized` + `sessionId` per transport instance, so
 * any second client that POSTed `initialize` was rejected with HTTP 400
 * `Invalid Request: Server already initialized`. That blocked running
 * Claude Desktop and Codex against the same MCP server simultaneously,
 * and also broke any single client retrying after a TCP close.
 *
 * API: pass `serverFactory` (recommended) for true multi-client support.
 * The legacy `server` field still works for single-client deployments
 * but emits a one-time deprecation warning at startup; a second
 * concurrent `initialize` will hit the SDK's "Already connected to a
 * transport" error.
 */

import type { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { WebStandardStreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { requireBearer } from "./auth.js";

export interface ServeMcpOptions {
  /**
   * Legacy single-instance MCP Server. Mutually exclusive with
   * `serverFactory`. When set, this module emits a one-time deprecation
   * warning at startup and will only handle ONE concurrent client --
   * any second `initialize` POST hits the SDK's
   * `Protocol.connect()` "Already connected to a transport" error
   * because the same Server can't bind to two transports at once.
   *
   * Prefer `serverFactory` for new code. Kept here for back-compat with
   * single-client deployments (e.g. one Claude Desktop) where the
   * caller hasn't yet refactored their construction code into a
   * function.
   *
   * @deprecated for multi-client use; pass `serverFactory` instead.
   */
  server?: Server;
  /**
   * Factory that builds a fresh `Server` instance for each new MCP
   * session. The factory MUST register its own tool/resource handlers
   * before returning. Recommended for any caller that may have more
   * than one client connected at a time.
   *
   * The returned Server is connected to the per-session transport and
   * lives for the duration of that session. It is closed when the
   * session ends (transport `onclose` fires) so factories that allocate
   * per-instance resources (timers, sockets) MUST clean those up via
   * their Server's onclose hook -- mcp-shared does not own them.
   */
  serverFactory?: () => Server | Promise<Server>;
  /** TCP port to listen on (ignored when `attachTo` is provided). */
  port: number;
  /** MCP route path. Defaults to "/mcp". */
  path?: string;
  /**
   * Bearer token required on the MCP route (`path`). `GET /health` is
   * served without authentication. Requests delegated to `attachTo` are
   * NOT bearer-checked by this module -- the caller's handler is
   * responsible for authenticating its own routes.
   */
  token: string;
  /**
   * @deprecated Not supported in Phase A.
   *
   * Passing `legacySse: true` will throw at startup. The earlier
   * placeholder bridge returned a JSON-RPC -32601 for every dispatched
   * tool call, which is worse than a 404 because clients would happily
   * connect via /sse and then fail every subsequent request. Real
   * legacy support requires a second Server + transport instance and
   * will land in a follow-up PR.
   */
  legacySse?: boolean;
  /**
   * Optional fall-through fetch handler. Every request that is neither
   * `GET /health` nor addressed to the MCP route (`path`) is passed to
   * this handler; returning `undefined` (synchronously or via a promise)
   * falls back to the built-in JSON 404. Note that /sse and /messages are
   * not served by this module, so they also reach this handler.
   *
   * Requests handed to this handler are NOT bearer-authenticated here.
   *
   * In this mode `Bun.serve` is NOT called -- the caller is responsible for
   * wiring the returned fetch handler into their existing HTTP server.
   */
  attachTo?: (
    req: Request,
  ) => Response | undefined | Promise<Response | undefined>;
}

export interface ServeMcpResult {
  /**
   * The composed fetch handler: answers `GET /health`, bearer-checks and
   * dispatches the MCP route, and hands every other request to
   * `attachTo` (falling back to a JSON 404).
   */
  fetch: (req: Request) => Promise<Response>;
  /**
   * The Bun server instance, if one was started. Undefined when the
   * caller passed `attachTo` (in which case they own the server).
   */
  bunServer?: ReturnType<typeof Bun.serve>;
  /**
   * Tear down every active MCP session, closing each session's transport
   * and, in `serverFactory` mode, its per-session `Server`. Then stop the
   * Bun server if one was started. In `attachTo` mode only the sessions
   * are closed; the caller's HTTP server keeps running.
   */
  close: () => Promise<void>;
  /**
   * Snapshot of currently-connected per-session `Server` instances.
   * Callers that push notifications to clients (e.g. email-channel's
   * inbox poller, teams-channel's webhook bridge) MUST iterate this
   * list and call `.notification()` on each Server -- there is no
   * single "the Server" anymore. Returns an empty array when no
   * sessions are active.
   *
   * In legacy `server`-only mode this returns `[server]` while the
   * single session is active (i.e. after the initialize handshake
   * completes) and `[]` otherwise.
   */
  activeServers: () => Server[];
}

// Rate-limited bearer-failure logging.
//
// Behaviour of the returned callback:
//   * A failure that arrives when no window is open (including the very
//     first failure) is written to stderr immediately and opens a new
//     60s window.
//   * Failures inside an open window are only counted.
//   * The next failure after the window expires first prints that count
//     as a summary line, then logs itself and opens a fresh window.
//
// Limitation: the summary is emitted lazily. If a burst ends and no
// further failure ever arrives, its suppressed count is never printed.
// The first failure of the burst is always printed, though.
//
// Nothing about the request is logged -- not the offered token, the
// path, or any header value -- only counts.
function makeRateLimitedAuthLogger(): () => void {
  const WINDOW_MS = 60_000;
  const state = { openedAt: 0, dropped: 0 };

  return () => {
    const ts = Date.now();

    // Still inside the current window: just count it.
    if (ts - state.openedAt < WINDOW_MS) {
      state.dropped++;
      return;
    }

    // Window expired (or none opened yet): report what we swallowed,
    // log this failure, and start a new window.
    if (state.dropped > 0) {
      process.stderr.write(
        `mcp-shared: ${state.dropped} additional bearer auth failure(s) in previous window\n`,
      );
    }
    process.stderr.write("mcp-shared: bearer auth failure\n");
    state.openedAt = ts;
    state.dropped = 0;
  };
}

/**
 * Serialize `body` with `JSON.stringify` and wrap it in a Response
 * tagged `content-type: application/json`.
 *
 * @param body - any JSON-serializable value.
 * @param status - HTTP status code; defaults to 200.
 */
function jsonResponse(body: unknown, status = 200): Response {
  const payload = JSON.stringify(body);
  const headers = { "content-type": "application/json" };
  return new Response(payload, { status, headers });
}

/** Canonical JSON 404 used for any route this module does not serve. */
function notFound(): Response {
  const body = { error: "not_found" };
  return jsonResponse(body, 404);
}

/**
 * Build a JSON-RPC 2.0 error response (`id: null`) shaped like the SDK's
 * own error replies. It is used for errors this module answers itself,
 * before or instead of reaching a transport: an unknown session id, a
 * missing session header, unparseable JSON, and failure to set up a new
 * session. Keeping the shape identical means clients parse every error
 * the same way.
 *
 * @param status - HTTP status code for the response.
 * @param code - JSON-RPC error code (e.g. -32700 parse error).
 * @param message - human-readable error message.
 */
function jsonRpcError(
  status: number,
  code: number,
  message: string,
): Response {
  const envelope = { jsonrpc: "2.0", error: { code, message }, id: null };
  const headers = { "content-type": "application/json" };
  return new Response(JSON.stringify(envelope), { status, headers });
}

/**
 * Boot a Streamable HTTP MCP listener on `port`, OR return a composed
 * fetch handler suitable for mounting into an existing Bun.serve.
 *
 * @param opts - listener configuration; exactly one of `server` /
 *   `serverFactory` must be supplied.
 * @returns the composed fetch handler plus lifecycle helpers. `bunServer`
 *   is set only when this call bound a port (i.e. `attachTo` was omitted).
 * @throws Error synchronously if `legacySse: true` is passed, or if not
 *   exactly one of `server` / `serverFactory` is supplied.
 */
export function serveMcpOverHttp(opts: ServeMcpOptions): ServeMcpResult {
  const path = opts.path ?? "/mcp";
  if (opts.legacySse === true) {
    // Fail loudly rather than silently advertising a broken transport.
    // See the legacySse JSDoc on ServeMcpOptions for the full rationale.
    throw new Error(
      "mcp-shared: legacySse is not supported in Phase A. " +
        "The previous placeholder bridge returned -32601 for every " +
        "dispatched tool call, which breaks any client that probes /sse " +
        "successfully and then tries to actually use the connection. " +
        "Drop the option, or wait for the dedicated second-Server " +
        "implementation in a follow-up PR.",
    );
  }

  // Validate server vs serverFactory: exactly one is required. Both
  // together is a programmer mistake -- mcp-shared can't know whether
  // to use the singleton or spin up a fresh one per session.
  const hasServer = opts.server !== undefined;
  const hasFactory = opts.serverFactory !== undefined;
  if (hasServer && hasFactory) {
    throw new Error(
      "mcp-shared: pass EITHER 'server' (legacy single-instance) OR " +
        "'serverFactory' (recommended for multi-client), not both.",
    );
  }
  if (!hasServer && !hasFactory) {
    throw new Error(
      "mcp-shared: must pass either 'server' or 'serverFactory'.",
    );
  }
  if (hasServer && !hasFactory) {
    // One-time deprecation warning. The 'server' path still works for
    // single-client deployments but a second concurrent `initialize`
    // will hit Protocol.connect()'s "Already connected to a transport"
    // error because the same Server can't bind to two transports.
    process.stderr.write(
      "mcp-shared: 'server' (single-instance) is deprecated for " +
        "multi-session use -- pass 'serverFactory' instead. Concurrent " +
        'clients will receive "Already connected to a transport" ' +
        "errors.\n",
    );
  }

  const logAuthFailure = makeRateLimitedAuthLogger();

  // Per-session entries. Keyed by Mcp-Session-Id, populated via the SDK's
  // onsessioninitialized callback. Each entry pairs the per-session
  // transport with the Server instance bound to it.
  //
  // Why per-session Server? The SDK's Protocol.connect() stores its
  // transport in a single `_transport` slot and throws on a second
  // connect, so one Server can only host one live transport. Concurrent
  // clients therefore need separate Server instances -- supplied by the
  // caller's serverFactory.
  //
  // Legacy `server`-only mode keeps the same map shape but never holds
  // more than one entry at a time (the second initialize would throw at
  // server.connect()).
  interface SessionEntry {
    transport: WebStandardStreamableHTTPServerTransport;
    server: Server;
  }
  const sessions = new Map<string, SessionEntry>();

  // Resolve the service name once. In factory mode we don't have a
  // representative Server until the first session connects, so fall
  // back to a generic label. We could lazily upgrade after the first
  // session but the log line only fires at startup, so the label here
  // is fine as-is.
  const serviceName =
    (opts.server as unknown as { _serverInfo?: { name?: string } } | undefined)
      ?._serverInfo?.name ?? "mcp";

  /**
   * Best-effort close of a transport that never became a tracked session.
   * Without this, nothing would ever close it: it is not in `sessions`,
   * so neither DELETE nor closeAllSessions() can reach it.
   *
   * Closing fires the transport's onclose chain. Our handler (installed in
   * createAndConnectTransport) is a no-op for a transport without a
   * registered session id. The SDK's Protocol.connect() chains its own
   * onclose onto the transport, which detaches the connected Server. That
   * detach is what frees the legacy singleton for the next initialize.
   * (The chaining is SDK behaviour -- verify against the pinned SDK
   * version.)
   */
  async function discardTransport(
    transport: WebStandardStreamableHTTPServerTransport,
  ): Promise<void> {
    try {
      await transport.close();
    } catch {
      // Best-effort: callers are already on an error/cleanup path.
    }
  }

  /**
   * Create a new transport for an incoming initialize request, wire its
   * lifecycle callbacks to keep the map in sync, and connect a Server
   * to it. In factory mode the factory is invoked to build a fresh
   * Server; in legacy mode the singleton `opts.server` is reused.
   *
   * @throws whatever the factory or `server.connect()` throws; the new
   *   transport is discarded before rethrowing.
   */
  async function createAndConnectTransport(): Promise<WebStandardStreamableHTTPServerTransport> {
    // Build (or reuse) the Server BEFORE constructing the transport so
    // a factory throw doesn't leave a half-wired transport orphaned.
    const server: Server = opts.serverFactory
      ? await opts.serverFactory()
      : (opts.server as Server);

    const transport = new WebStandardStreamableHTTPServerTransport({
      sessionIdGenerator: () => crypto.randomUUID(),
      onsessioninitialized: (sid) => {
        sessions.set(sid, { transport, server });
      },
    });
    transport.onclose = () => {
      const sid = transport.sessionId;
      if (sid) {
        const entry = sessions.get(sid);
        if (entry && entry.transport === transport) {
          sessions.delete(sid);
          // In factory mode the Server is per-session, so close it too.
          // In legacy mode the Server is owned by the caller -- don't
          // close it or we'd break any subsequent initialize attempt.
          if (opts.serverFactory) {
            entry.server.close().catch(() => {
              // Best-effort: the transport is already gone, nothing to
              // bubble up to.
            });
          }
        }
      }
    };
    try {
      await server.connect(transport);
    } catch (err) {
      // Close only the transport we just built, never the Server itself.
      // In legacy mode, connect() most likely failed because the
      // singleton is still bound to ANOTHER live session, and closing the
      // Server would tear that session down. In factory mode, closing the
      // transport still reaches the per-session Server if connect()
      // attached before failing.
      await discardTransport(transport);
      throw err;
    }
    return transport;
  }

  /**
   * Dispatch a request to the MCP transport layer. Handles session
   * routing: existing sessions go to their transport, new initialize
   * requests get a fresh transport, everything else 400s.
   */
  async function dispatchMcp(req: Request): Promise<Response> {
    const sessionId = req.headers.get("mcp-session-id") ?? undefined;

    // Path 1: existing session -- look up and dispatch. POST/GET/DELETE
    // all funnel through the transport's handleRequest.
    if (sessionId) {
      const existing = sessions.get(sessionId);
      if (!existing) {
        // SDK uses 404 + -32001 for unknown sessions. Mirror that so
        // clients see consistent behaviour whether the session was
        // never created or expired between requests.
        return jsonRpcError(404, -32001, "Session not found");
      }
      return existing.transport.handleRequest(req);
    }

    // Path 2: no session id. Only POST `initialize` is legal here.
    // GET/DELETE without a session id can never succeed. Answer them
    // directly with the same error the SDK's validateSession produces
    // (400 / -32000 "Bad Request: Mcp-Session-Id header is required")
    // instead of spinning up a throw-away transport just to generate it.
    if (req.method !== "POST") {
      return jsonRpcError(
        400,
        -32000,
        "Bad Request: Mcp-Session-Id header is required",
      );
    }

    // POST without session id: peek at the body to confirm it's an
    // `initialize` request. We must NOT let the SDK call req.json() on
    // a body we've already consumed, so parse here and pass via
    // `parsedBody`. The SDK's HandleRequestOptions documents this
    // exact use case for body-parser middleware.
    let parsedBody: unknown;
    try {
      parsedBody = await req.json();
    } catch {
      return jsonRpcError(400, -32700, "Parse error: Invalid JSON");
    }

    const isInit = Array.isArray(parsedBody)
      ? parsedBody.some((m) => isInitializeRequest(m))
      : isInitializeRequest(parsedBody);

    if (!isInit) {
      // Non-initialize POST without a session id. The SDK would return
      // exactly this error via validateSession; replicate it without
      // spinning up a transport we'd immediately discard.
      return jsonRpcError(
        400,
        -32000,
        "Bad Request: Mcp-Session-Id header is required",
      );
    }

    // Genuine new-session initialize. Spin up a transport, connect, and
    // dispatch with the parsed body so the SDK doesn't try to read the
    // already-consumed stream.
    //
    // In legacy `server`-only mode this can throw if a previous session
    // is still attached to the same Server (SDK Protocol.connect()
    // refuses a second transport). We surface that as a 500 with the
    // SDK's exact error text so operators can debug.
    let transport: WebStandardStreamableHTTPServerTransport;
    try {
      transport = await createAndConnectTransport();
    } catch (err) {
      const msg = err instanceof Error ? err.message : String(err);
      // Internal error, but we still want JSON-RPC-shaped body so the
      // client can parse it.
      return jsonRpcError(500, -32000, `Internal error: ${msg}`);
    }

    let response: Response;
    try {
      response = await transport.handleRequest(req, { parsedBody });
    } catch (err) {
      await discardTransport(transport);
      throw err;
    }

    // The SDK assigns `sessionId` (via our sessionIdGenerator) only when
    // it accepts the initialize request. If it rejected this one -- e.g.
    // a missing Accept/Content-Type header, or initialize batched with
    // other messages -- the transport never entered `sessions` and
    // nothing else would close it. In legacy mode that orphan would keep
    // the singleton Server bound, and every later initialize would fail
    // with "Already connected to a transport". Release it now. The error
    // Response the SDK returned is a plain body, not a stream tied to
    // this transport, so it is still safe to send.
    if (transport.sessionId === undefined) {
      await discardTransport(transport);
    }
    return response;
  }

  const fetch = async (req: Request): Promise<Response> => {
    const url = new URL(req.url);
    const pathname = url.pathname;

    // Health check: unauthenticated, always 200.
    if (pathname === "/health" && req.method === "GET") {
      return jsonResponse({ ok: true, service: serviceName });
    }

    // Primary MCP path.
    if (pathname === path) {
      const authError = requireBearer(req, opts.token);
      if (authError) {
        logAuthFailure();
        return authError;
      }
      return dispatchMcp(req);
    }

    // Fall through to caller-supplied handler, if any. Note: these
    // requests are NOT bearer-checked here; the caller owns their auth.
    if (opts.attachTo) {
      const out = await opts.attachTo(req);
      if (out) return out;
    }

    return notFound();
  };

  async function closeAllSessions(): Promise<void> {
    // Snapshot first because transport.close() fires onclose which
    // mutates the map (and may call server.close() in factory mode).
    const snapshot = Array.from(sessions.values());
    sessions.clear();
    await Promise.allSettled(
      snapshot.flatMap((entry) => {
        const ps: Array<Promise<unknown>> = [entry.transport.close()];
        // In factory mode the Server is per-session and owned by us;
        // close it here as a safety net in case onclose didn't fire
        // (we already cleared the map above).
        if (opts.serverFactory) {
          ps.push(entry.server.close());
        }
        return ps;
      }),
    );
  }

  function activeServers(): Server[] {
    return Array.from(sessions.values(), (entry) => entry.server);
  }

  // attachTo mode: do not bind a port; caller owns the listener.
  if (opts.attachTo) {
    process.stderr.write(
      `mcp-shared: composed handler ready for service '${serviceName}' (path ${path}, attachTo mode)\n`,
    );
    return {
      fetch,
      close: async () => {
        await closeAllSessions();
      },
      activeServers,
    };
  }

  // Standalone mode: bind a Bun server.
  //
  // idleTimeout=0 disables Bun's default 10s request idle timeout, which would
  // otherwise close long-lived Streamable HTTP SSE streams mid-flight. A short
  // idle on a GET /mcp SSE connection causes mcp-remote / Claude Desktop bridges
  // to see "Failed to open SSE stream: Bad Gateway" via Tailscale Funnel (which
  // translates the upstream-reset into 502). The MCP spec expects SSE streams
  // to stay open for the lifetime of the session. The transport itself sends
  // its own heartbeats; idle timeout at the bun layer is the wrong place for
  // a deadline.
  const bunServer = Bun.serve({
    port: opts.port,
    fetch,
    idleTimeout: 0,
  });
  process.stderr.write(
    `mcp-shared: listening on :${opts.port} path ${path} (service '${serviceName}')\n`,
  );

  return {
    fetch,
    bunServer,
    close: async () => {
      await closeAllSessions();
      // Deliberately not awaited: with idleTimeout=0, idle keep-alive
      // connections never time out, so waiting for a graceful stop could
      // hang close() indefinitely.
      bunServer.stop();
    },
    activeServers,
  };
}
