From 6eb788afc79e1360781530e4522e7102e3f7a184 Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 13:34:21 -0700 Subject: [PATCH 1/9] type contracts + impl plan created --- apps/sim/lib/mcp/resilience/README.md | 57 +++++++++++++++++++++++++++ apps/sim/lib/mcp/resilience/types.ts | 37 +++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 apps/sim/lib/mcp/resilience/README.md create mode 100644 apps/sim/lib/mcp/resilience/types.ts diff --git a/apps/sim/lib/mcp/resilience/README.md b/apps/sim/lib/mcp/resilience/README.md new file mode 100644 index 00000000000..eb5ac87469e --- /dev/null +++ b/apps/sim/lib/mcp/resilience/README.md @@ -0,0 +1,57 @@ +# Goal Description +Implement a "Day 2" reliability layer for the simstudioai/sim workflow engine by building a composable Resilience Interceptor/Middleware Pipeline for the MCP `executeTool` logic. This pipeline ensures enterprise-grade stability by introducing a Circuit Breaker State Machine, Zod-based Schema Enforcement for LLM outputs, and detailed Telemetry for latency and failure analysis, while addressing high-concurrency Node/TS environments. + +## User Review Required +- Please confirm if `apps/sim/lib/mcp/service.ts` is the correct core injection point for wrapping `executeTool`. +- Note on file path: `apps/sim/lib/workflow/executor.ts` was not found. Instead, `apps/sim/executor/execution/executor.ts` and `apps/sim/tools/workflow/executor.ts` were analyzed. Ensure intercepting `McpService`'s `executeTool` serves your architectural needs. +- Please confirm the schema enforcement approach: we will compile and cache JSON Schemas to Zod validators upon MCP server discovery or lazily, instead of parsing dynamically per request. + +## Proposed Changes + +We will split the implementation into discrete PRs / Commits to maintain structure. + +### Part 1: Telemetry Hooks +Implement the foundation for tracking. +*(Change Rationale: Transitioning to a middleware pattern instead of a monolithic proxy, allowing telemetry to be composed easily).* +#### [NEW] `apps/sim/lib/mcp/resilience/telemetry.ts` +- Implement telemetry middleware hook to capture `latency_ms` and `failure_reason` (e.g., `TIMEOUT`, `VALIDATION_ERROR`, `API_500`). + +### Part 2: Circuit Breaker State Machine +Implement the state management logic. +*(Change Rationale: Added a HALF-OPEN concurrency lock (semaphore) to prevent the "thundering herd" issue on the downstream server. Documented that this operates on local, per-instance state using an LRU cache to prevent memory leaks).* +#### [NEW] `apps/sim/lib/mcp/resilience/circuit-breaker.ts` +- Implement the `CircuitBreaker` middleware with states: `CLOSED`, `OPEN`, and `HALF-OPEN`. +- Handle failure thresholds, reset timeouts, and logic for failing fast. +- **Concurrency Lock:** During `HALF-OPEN`, strictly gate the transition so only **one** probe request is allowed through. All other concurrent requests will fail-fast until the probe resolves. +- **Memory & State:** Use an LRU cache or scoped ties for the CircuitBreaker registry, binding the lifecycle of the breaker explicitly to the lifecycle of the MCP connection to prevent memory leaks. Also, this operates on local, per-instance state. + +### Part 3: Schema Validation +Implement the Zod validation logic for LLM arguments. +*(Change Rationale: Added schema compilation caching to avoid severe CPU bottlenecking per request, and returning `isError: true` on validation failures to natively trigger LLM self-correction).* +#### [NEW] `apps/sim/lib/mcp/resilience/schema-validator.ts` +- Logic to enforce schemas using `Zod` as a middleware. +- **Schema Caching:** Compile JSON Schemas to Zod schemas and cache them in a registry mapped to `toolId` during the initial discovery phase or lazily on first compile. Flush cached validators dynamically when listening for MCP lifecycle events (e.g., mid-session tool list updates). +- **LLM Self-Correction:** Instead of throwing exceptions that crash the workflow engine when Zod validation fails, intercept validation errors and return a gracefully formatted MCP execution result: `{ isError: true, content: [{ type: "text", text: "Schema validation failed: [Zod Error Details]" }] }`. + +### Part 4: Resilience Pipeline Integration +Wrap up the tools via a Pipeline instead of a monolithic proxy. +*(Change Rationale: Switched from a God Object Proxy to a Middleware Pipeline to support granular, per-tool enablement).* +#### [NEW] `apps/sim/lib/mcp/resilience/pipeline.ts` +- Implement a chain of responsibility (interceptor/middleware pipeline) for `executeTool`. +- Provide an API like `executeTool.use(telemetry).use(validate(cachedSchema)).use(circuitBreaker(config))` rather than a sequential sequence inside a rigid class. +- This composable architecture allows enabling or disabling specific middlewares dynamically per tool (e.g., un-trusted vs internal tools). + +#### [MODIFY] `apps/sim/lib/mcp/service.ts` +- Update `mcpService.executeTool` to run requests through the configurable `ResiliencePipeline`, rather than hardcoded proxy logic. + +## Verification Plan +### Automated Tests +- Create a mock MCP server execution test suite. +- Write tests in `apps/sim/lib/mcp/resilience/pipeline.test.ts` to assert: + - Circuit Breaker trips to `OPEN` on simulated `API_500` and trips to `HALF-OPEN` after a cooldown. + - **New Test:** Verify HALF-OPEN strictly allows exactly **one** simulated concurrent probe request through. + - **New Test:** Schema validation returns `isError: true` standard format for improper LLM args without triggering execution. +- Telemetry correctly logs latency. + +### Manual Verification +- Execute tests generating visual output demonstrating the circuit breaker "tripping" and "recovering". diff --git a/apps/sim/lib/mcp/resilience/types.ts b/apps/sim/lib/mcp/resilience/types.ts new file mode 100644 index 00000000000..4338078062f --- /dev/null +++ b/apps/sim/lib/mcp/resilience/types.ts @@ -0,0 +1,37 @@ +import type { McpToolCall, McpToolResult } from '@/lib/mcp/types' + +/** + * Context passed through the Resilience Pipeline + */ +export interface McpExecutionContext { + toolCall: McpToolCall + serverId: string + userId: string + workspaceId: string + /** + * Additional parameters passed directly by the executeTool caller + */ + extraHeaders?: Record +} + +/** + * Standardized function signature for invoking the NEXT component in the pipeline + */ +export type McpMiddlewareNext = ( + context: McpExecutionContext +) => Promise + +/** + * Interface that all Resilience Middlewares must implement + */ +export interface McpMiddleware { + /** + * Execute the middleware logic + * @param context The current execution context + * @param next The next middleware/tool in the chain + */ + execute( + context: McpExecutionContext, + next: McpMiddlewareNext + ): Promise +} From 7b23bc160f994fcd2d5cd3619ca17766ef4f4da5 Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 14:19:00 -0700 Subject: [PATCH 2/9] chore(mcp): config setup for types and dependencies --- apps/sim/package.json | 1 + apps/sim/tsconfig.json | 74 +++++++++++++++++++++++++++--------- bun.lock | 9 +++++ packages/logger/package.json | 2 +- 4 files changed, 66 insertions(+), 20 deletions(-) diff --git a/apps/sim/package.json b/apps/sim/package.json index 9068e49c3a0..06bb9bf7b17 100644 --- a/apps/sim/package.json +++ b/apps/sim/package.json @@ -174,6 +174,7 @@ "@sim/tsconfig": "workspace:*", "@testing-library/jest-dom": "^6.6.3", "@trigger.dev/build": "4.1.2", + "@types/bun": "1.3.10", "@types/fluent-ffmpeg": "2.1.28", "@types/html-to-text": "9.0.4", "@types/js-yaml": "4.0.9", diff --git a/apps/sim/tsconfig.json b/apps/sim/tsconfig.json index adb44824922..ed09bbfc4f5 100644 --- a/apps/sim/tsconfig.json +++ b/apps/sim/tsconfig.json @@ -3,24 +3,60 @@ "compilerOptions": { "baseUrl": ".", "paths": { - "@/*": ["./*"], - "@/components/*": ["components/*"], - "@/lib/*": ["./lib/*"], - "@/stores": ["./stores"], - "@/stores/*": ["./stores/*"], - "@/hooks/*": ["./hooks/*"], - "@/blocks": ["./blocks"], - "@/blocks/*": ["./blocks/*"], - "@/providers/*": ["./providers/*"], - "@/providers": ["./providers"], - "@/tools": ["./tools"], - "@/tools/*": ["./tools/*"], - "@/serializer": ["./serializer"], - "@/serializer/*": ["./serializer/*"], - "@sim/db": ["../../packages/db"], - "@sim/db/*": ["../../packages/db/*"], - "@/executor": ["./executor"], - "@/executor/*": ["./executor/*"] + "@/*": [ + "./*" + ], + "@/components/*": [ + "components/*" + ], + "@/lib/*": [ + "./lib/*" + ], + "@/stores": [ + "./stores" + ], + "@/stores/*": [ + "./stores/*" + ], + "@/hooks/*": [ + "./hooks/*" + ], + "@/blocks": [ + "./blocks" + ], + "@/blocks/*": [ + "./blocks/*" + ], + "@/providers/*": [ + "./providers/*" + ], + "@/providers": [ + "./providers" + ], + "@/tools": [ + "./tools" + ], + "@/tools/*": [ + "./tools/*" + ], + "@/serializer": [ + "./serializer" + ], + "@/serializer/*": [ + "./serializer/*" + ], + "@sim/db": [ + "../../packages/db" + ], + "@sim/db/*": [ + "../../packages/db/*" + ], + "@/executor": [ + "./executor" + ], + "@/executor/*": [ + "./executor/*" + ] } }, "include": [ @@ -38,4 +74,4 @@ "vitest.config.ts", "vitest.setup.ts" ] -} +} \ No newline at end of file diff --git a/bun.lock b/bun.lock index 99a678eb72e..cee1f7c4c66 100644 --- a/bun.lock +++ b/bun.lock @@ -207,6 +207,7 @@ "@sim/tsconfig": "workspace:*", "@testing-library/jest-dom": "^6.6.3", "@trigger.dev/build": "4.1.2", + "@types/bun": "1.3.10", "@types/fluent-ffmpeg": "2.1.28", "@types/html-to-text": "9.0.4", "@types/js-yaml": "4.0.9", @@ -1459,6 +1460,8 @@ "@types/babel__traverse": ["@types/babel__traverse@7.28.0", "", { "dependencies": { "@babel/types": "^7.28.2" } }, "sha512-8PvcXf70gTDZBgt9ptxJ8elBeBjcLOAcOtoO/mPJjtji1+CdGbHgm77om1GrsPxsiE+uXIpNSK64UYaIwQXd4Q=="], + "@types/bun": ["@types/bun@1.3.10", "", { "dependencies": { "bun-types": "1.3.10" } }, "sha512-0+rlrUrOrTSskibryHbvQkDOWRJwJZqZlxrUs1u4oOoTln8+WIXBPmAuCF35SWB2z4Zl3E84Nl/D0P7803nigQ=="], + "@types/chai": ["@types/chai@5.2.3", "", { "dependencies": { "@types/deep-eql": "*", "assertion-error": "^2.0.1" } }, "sha512-Mw558oeA9fFbv65/y4mHtXDs9bPnFMZAL/jxdPFUpOHHIXX91mcgEHbS5Lahr+pwZFR8A7GQleRWeI6cGFC2UA=="], "@types/cookie": ["@types/cookie@0.4.1", "", {}, "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q=="], @@ -1787,6 +1790,8 @@ "buildcheck": ["buildcheck@0.0.7", "", {}, "sha512-lHblz4ahamxpTmnsk+MNTRWsjYKv965MwOrSJyeD588rR3Jcu7swE+0wN5F+PbL5cjgu/9ObkhfzEPuofEMwLA=="], + "bun-types": ["bun-types@1.3.10", "", { "dependencies": { "@types/node": "*" } }, "sha512-tcpfCCl6XWo6nCVnpcVrxQ+9AYN1iqMIzgrSKYMB/fjLtV2eyAVEg7AxQJuCq/26R6HpKWykQXuSOq/21RYcbg=="], + "bytes": ["bytes@3.1.2", "", {}, "sha512-/Nf7TyzTx6S3yRJObOAV7956r8cr2+Oj8AC5dt8wSP3BQAoeX58NoHyCU8P8zGkNXStjTSi6fzO6F0pBdcYbEg=="], "c12": ["c12@3.1.0", "", { "dependencies": { "chokidar": "^4.0.3", "confbox": "^0.2.2", "defu": "^6.1.4", "dotenv": "^16.6.1", "exsolve": "^1.0.7", "giget": "^2.0.0", "jiti": "^2.4.2", "ohash": "^2.0.11", "pathe": "^2.0.3", "perfect-debounce": "^1.0.0", "pkg-types": "^2.2.0", "rc9": "^2.1.2" }, "peerDependencies": { "magicast": "^0.3.5" }, "optionalPeers": ["magicast"] }, "sha512-uWoS8OU1MEIsOv8p/5a82c3H31LsWVR5qiyXVfBNOzfffjUWtPnhAb4BYI2uG2HfGmZmFjCtui5XNWaps+iFuw=="], @@ -4043,6 +4048,8 @@ "body-parser/iconv-lite": ["iconv-lite@0.7.1", "", { "dependencies": { "safer-buffer": ">= 2.1.2 < 3.0.0" } }, "sha512-2Tth85cXwGFHfvRgZWszZSvdo+0Xsqmw8k8ZwxScfcBneNUraK+dxRxRm24nszx80Y0TVio8kKLt5sLE7ZCLlw=="], + "bun-types/@types/node": ["@types/node@24.2.1", "", { "dependencies": { "undici-types": "~7.10.0" } }, "sha512-DRh5K+ka5eJic8CjH7td8QpYEV6Zo10gfRkjHCO3weqZHWDtAaSTFtl4+VMqOJ4N5jcuhZ9/l+yy8rVgw7BQeQ=="], + "c12/chokidar": ["chokidar@4.0.3", "", { "dependencies": { "readdirp": "^4.0.1" } }, "sha512-Qgzu8kfBvo+cA4962jnP1KkS6Dop5NS6g7R5LFYJr4b8Ub94PPQXUksCw9PvXoeXPRRddRNC5C1JQUR2SMGtnA=="], "c12/confbox": ["confbox@0.2.4", "", {}, "sha512-ysOGlgTFbN2/Y6Cg3Iye8YKulHw+R2fNXHrgSmXISQdMnomY6eNDprVdW9R5xBguEqI954+S6709UyiO7B+6OQ=="], @@ -4521,6 +4528,8 @@ "bl/readable-stream/string_decoder": ["string_decoder@1.3.0", "", { "dependencies": { "safe-buffer": "~5.2.0" } }, "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA=="], + "bun-types/@types/node/undici-types": ["undici-types@7.10.0", "", {}, "sha512-t5Fy/nfn+14LuOc2KNYg75vZqClpAiqscVvMygNnlsHBFpSXdJaYtXMcdNLpl/Qvc3P2cB3s6lOV51nqsFq4ag=="], + "c12/chokidar/readdirp": ["readdirp@4.1.2", "", {}, "sha512-GDhwkLfywWL2s6vEjyhri+eXmfH6j1L7JE27WhqLeYzoh/A3DBaYGEj2H/HFZCn/kMfim73FXxEJTw06WtxQwg=="], "cheerio/htmlparser2/entities": ["entities@7.0.1", "", {}, "sha512-TWrgLOFUQTH994YUyl1yT4uyavY5nNB5muff+RtWaqNVCAK408b5ZnnbNAUEWLTCpum9w6arT70i1XdQ4UeOPA=="], diff --git a/packages/logger/package.json b/packages/logger/package.json index 6602fe0aaba..6a6a20e92e8 100644 --- a/packages/logger/package.json +++ b/packages/logger/package.json @@ -27,4 +27,4 @@ "typescript": "^5.7.3", "vitest": "^3.0.8" } -} +} \ No newline at end of file From 80917d2ba49f984f3454bdc95aa7e969bd0a6d9c Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 14:19:04 -0700 Subject: [PATCH 3/9] feat(mcp): implement telemetry middleware --- apps/sim/lib/mcp/resilience/telemetry.ts | 56 ++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 apps/sim/lib/mcp/resilience/telemetry.ts diff --git a/apps/sim/lib/mcp/resilience/telemetry.ts b/apps/sim/lib/mcp/resilience/telemetry.ts new file mode 100644 index 00000000000..d3bc0eea663 --- /dev/null +++ b/apps/sim/lib/mcp/resilience/telemetry.ts @@ -0,0 +1,56 @@ +import { createLogger } from '@sim/logger' +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' +import type { McpToolResult } from '@/lib/mcp/types' + +const logger = createLogger('mcp:telemetry') + +export class TelemetryMiddleware implements McpMiddleware { + async execute( + context: McpExecutionContext, + next: McpMiddlewareNext + ): Promise { + const startTime = performance.now() + + try { + const result = await next(context) + + const latency_ms = Math.round(performance.now() - startTime) + const isError = result.isError === true + + logger.info('MCP Tool Execution Completed', { + toolName: context.toolCall.name, + serverId: context.serverId, + workspaceId: context.workspaceId, + latency_ms, + success: !isError, + ...(isError && { failure_reason: 'TOOL_ERROR' }) + }) + + return result + } catch (error) { + const latency_ms = Math.round(performance.now() - startTime) + + // Attempt to determine failure reason based on error + let failure_reason = 'API_500' // General failure fallback + if (error instanceof Error) { + const lowerMsg = error.message.toLowerCase() + if (error.name === 'TimeoutError' || lowerMsg.includes('timeout')) { + failure_reason = 'TIMEOUT' + } else if (lowerMsg.includes('validation') || error.name === 'ZodError') { + failure_reason = 'VALIDATION_ERROR' + } + } + + logger.error('MCP Tool Execution Failed', { + toolName: context.toolCall.name, + serverId: context.serverId, + workspaceId: context.workspaceId, + latency_ms, + failure_reason, + err: error instanceof Error ? error.message : String(error) + }) + + throw error // Re-throw to allow upstream handling (e.g. circuit breaker) + } + } +} From df970844096414907dc6c0e06611f0b713f77ed2 Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 14:19:10 -0700 Subject: [PATCH 4/9] feat(mcp): implement resilience middleware pipeline --- apps/sim/lib/mcp/resilience/pipeline.ts | 42 +++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 apps/sim/lib/mcp/resilience/pipeline.ts diff --git a/apps/sim/lib/mcp/resilience/pipeline.ts b/apps/sim/lib/mcp/resilience/pipeline.ts new file mode 100644 index 00000000000..eef80a103e7 --- /dev/null +++ b/apps/sim/lib/mcp/resilience/pipeline.ts @@ -0,0 +1,42 @@ +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' +import type { McpToolResult } from '@/lib/mcp/types' + +export class ResiliencePipeline { + private middlewares: McpMiddleware[] = [] + + /** + * Add a middleware to the pipeline chain. + */ + use(middleware: McpMiddleware): this { + this.middlewares.push(middleware) + return this + } + + /** + * Execute the pipeline, processing the context through all middlewares, + * and finally invoking the terminal handler. + */ + async execute( + context: McpExecutionContext, + finalHandler: McpMiddlewareNext + ): Promise { + let index = -1 + + const dispatch = async (i: number): Promise => { + if (i <= index) { + throw new Error('next() called multiple times') + } + index = i + + // If we reached the end of the middlewares, call the final handler + if (i === this.middlewares.length) { + return finalHandler(context) + } + + const middleware = this.middlewares[i] + return middleware.execute(context, () => dispatch(i + 1)) + } + + return dispatch(0) + } +} From 1dd4353fbe34593610c3aaf960fca5edac6b2e06 Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 14:19:10 -0700 Subject: [PATCH 5/9] test(mcp): implement resilience pipeline suite --- apps/sim/lib/mcp/resilience/pipeline.test.ts | 136 +++++++++++++++++++ apps/sim/test/setup.ts | 0 2 files changed, 136 insertions(+) create mode 100644 apps/sim/lib/mcp/resilience/pipeline.test.ts create mode 100644 apps/sim/test/setup.ts diff --git a/apps/sim/lib/mcp/resilience/pipeline.test.ts b/apps/sim/lib/mcp/resilience/pipeline.test.ts new file mode 100644 index 00000000000..08eb7e7bd9e --- /dev/null +++ b/apps/sim/lib/mcp/resilience/pipeline.test.ts @@ -0,0 +1,136 @@ +/// +import { test, expect, describe, mock } from 'bun:test' +import { ResiliencePipeline } from './pipeline' +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' + +const infoInfo = mock() +const errorError = mock() + +// Mock logger before any imports of telemetry +mock.module('@sim/logger', () => ({ + createLogger: () => ({ + info: infoInfo, + error: errorError, + warn: mock(), + debug: mock() + }) +})) + +// Dynamically import TelemetryMiddleware so the mock applies +const { TelemetryMiddleware } = await import('./telemetry') +import { createLogger } from '@sim/logger' + +describe('ResiliencePipeline', () => { + const mockContext: McpExecutionContext = { + toolCall: { name: 'test_tool', arguments: {} }, + serverId: 'server-1', + userId: 'user-1', + workspaceId: 'workspace-1' + } + + test('should execute middlewares in order', async () => { + const pipeline = new ResiliencePipeline() + const order: number[] = [] + + const m1: McpMiddleware = { + execute: async (ctx, next) => { + order.push(1) + const res = await next(ctx) + order.push(4) + return res + } + } + + const m2: McpMiddleware = { + execute: async (ctx, next) => { + order.push(2) + const res = await next(ctx) + order.push(3) + return res + } + } + + pipeline.use(m1).use(m2) + + const finalHandler: McpMiddlewareNext = async () => { + return { content: [{ type: 'text', text: 'success' }] } + } + + const result = await pipeline.execute(mockContext, finalHandler) + + expect(order).toEqual([1, 2, 3, 4]) + expect(result.content?.[0].text).toBe('success') + }) +}) + +describe('TelemetryMiddleware', () => { + const mockContext: McpExecutionContext = { + toolCall: { name: 'telemetry_tool', arguments: {} }, + serverId: 'server-2', + userId: 'user-2', + workspaceId: 'workspace-2' + } + + test('should log success with latency', async () => { + infoInfo.mockClear() + + const telemetry = new TelemetryMiddleware() + + const finalHandler: McpMiddlewareNext = async () => { + // simulate some latency + await new Promise(r => setTimeout(r, 10)) + return { content: [] } + } + + await telemetry.execute(mockContext, finalHandler) + + expect(infoInfo).toHaveBeenCalled() + const logMsg = infoInfo.mock.calls[0][0] + const logCtx = infoInfo.mock.calls[0][1] + expect(logMsg).toBe('MCP Tool Execution Completed') + expect(logCtx.toolName).toBe('telemetry_tool') + expect(logCtx.latency_ms).toBeGreaterThanOrEqual(10) + expect(logCtx.success).toBe(true) + }) + + test('should log TOOL_ERROR when tool result has isError: true', async () => { + infoInfo.mockClear() + + const telemetry = new TelemetryMiddleware() + + const finalHandler: McpMiddlewareNext = async () => { + return { isError: true, content: [] } + } + + await telemetry.execute(mockContext, finalHandler) + + expect(infoInfo).toHaveBeenCalled() + const logCtx = infoInfo.mock.calls[0][1] + expect(logCtx.success).toBe(false) + expect(logCtx.failure_reason).toBe('TOOL_ERROR') + }) + + test('should log exception and rethrow with TIMEOUT explanation', async () => { + errorError.mockClear() + + const telemetry = new TelemetryMiddleware() + + const finalHandler: McpMiddlewareNext = async () => { + throw new Error('Connection timeout occurred') + } + + let caughtError: Error | null = null + try { + await telemetry.execute(mockContext, finalHandler) + } catch (e: any) { + caughtError = e + } + + expect(caughtError).toBeDefined() + expect(errorError).toHaveBeenCalled() + const logMsg = errorError.mock.calls[0][0] + const logCtx = errorError.mock.calls[0][1] + expect(logMsg).toBe('MCP Tool Execution Failed') + expect(logCtx.failure_reason).toBe('TIMEOUT') + }) +}) diff --git a/apps/sim/test/setup.ts b/apps/sim/test/setup.ts new file mode 100644 index 00000000000..e69de29bb2d From a94e66b72d8ceaae6c472510efdd48bda04e7670 Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 14:23:44 -0700 Subject: [PATCH 6/9] test(mcp): fix telemetry middleware test assertions for custom logger --- apps/sim/lib/mcp/resilience/pipeline.test.ts | 29 ++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/apps/sim/lib/mcp/resilience/pipeline.test.ts b/apps/sim/lib/mcp/resilience/pipeline.test.ts index 08eb7e7bd9e..c15208c45bc 100644 --- a/apps/sim/lib/mcp/resilience/pipeline.test.ts +++ b/apps/sim/lib/mcp/resilience/pipeline.test.ts @@ -1,4 +1,3 @@ -/// import { test, expect, describe, mock } from 'bun:test' import { ResiliencePipeline } from './pipeline' import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' @@ -85,12 +84,12 @@ describe('TelemetryMiddleware', () => { await telemetry.execute(mockContext, finalHandler) expect(infoInfo).toHaveBeenCalled() - const logMsg = infoInfo.mock.calls[0][0] - const logCtx = infoInfo.mock.calls[0][1] - expect(logMsg).toBe('MCP Tool Execution Completed') - expect(logCtx.toolName).toBe('telemetry_tool') - expect(logCtx.latency_ms).toBeGreaterThanOrEqual(10) - expect(logCtx.success).toBe(true) + const msg = infoInfo.mock.calls[0][0] + const logCall = infoInfo.mock.calls[0][1] + expect(msg).toBe('MCP Tool Execution Completed') + expect(logCall.toolName).toBe('telemetry_tool') + expect(logCall.latency_ms).toBeGreaterThanOrEqual(10) + expect(logCall.success).toBe(true) }) test('should log TOOL_ERROR when tool result has isError: true', async () => { @@ -105,9 +104,11 @@ describe('TelemetryMiddleware', () => { await telemetry.execute(mockContext, finalHandler) expect(infoInfo).toHaveBeenCalled() - const logCtx = infoInfo.mock.calls[0][1] - expect(logCtx.success).toBe(false) - expect(logCtx.failure_reason).toBe('TOOL_ERROR') + const msg = infoInfo.mock.calls[0][0] + const logCall = infoInfo.mock.calls[0][1] + expect(msg).toBe('MCP Tool Execution Completed') + expect(logCall.success).toBe(false) + expect(logCall.failure_reason).toBe('TOOL_ERROR') }) test('should log exception and rethrow with TIMEOUT explanation', async () => { @@ -128,9 +129,9 @@ describe('TelemetryMiddleware', () => { expect(caughtError).toBeDefined() expect(errorError).toHaveBeenCalled() - const logMsg = errorError.mock.calls[0][0] - const logCtx = errorError.mock.calls[0][1] - expect(logMsg).toBe('MCP Tool Execution Failed') - expect(logCtx.failure_reason).toBe('TIMEOUT') + const msg = errorError.mock.calls[0][0] + const logCall = errorError.mock.calls[0][1] + expect(msg).toBe('MCP Tool Execution Failed') + expect(logCall.failure_reason).toBe('TIMEOUT') }) }) From df2dbb63ed2742e29836872c8b40ba91e11916af Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 15:01:27 -0700 Subject: [PATCH 7/9] feat(mcp): integrate resilience pipeline into McpService This commit wires up the ResiliencePipeline (Telemetry, Circuit Breaker, Schema Validator) into the McpService executeTool logic. It also includes the previously missing context variables and updates the pipeline tests. --- .../sim/lib/mcp/resilience/circuit-breaker.ts | 148 +++++++++++++++ apps/sim/lib/mcp/resilience/demo.ts | 70 +++++++ apps/sim/lib/mcp/resilience/pipeline.test.ts | 179 ++++++++++++++++++ .../lib/mcp/resilience/schema-validator.ts | 152 +++++++++++++++ apps/sim/lib/mcp/service.ts | 32 +++- 5 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 apps/sim/lib/mcp/resilience/circuit-breaker.ts create mode 100644 apps/sim/lib/mcp/resilience/demo.ts create mode 100644 apps/sim/lib/mcp/resilience/schema-validator.ts diff --git a/apps/sim/lib/mcp/resilience/circuit-breaker.ts b/apps/sim/lib/mcp/resilience/circuit-breaker.ts new file mode 100644 index 00000000000..a084e8e39a4 --- /dev/null +++ b/apps/sim/lib/mcp/resilience/circuit-breaker.ts @@ -0,0 +1,148 @@ +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' +import type { McpToolResult } from '@/lib/mcp/types' +import { createLogger } from '@sim/logger' + +// Configure standard cache size limit +const MAX_SERVER_STATES = 1000 + +export type CircuitState = 'CLOSED' | 'OPEN' | 'HALF-OPEN' + +export interface CircuitBreakerConfig { + /** Number of failures before tripping to OPEN */ + failureThreshold: number + /** How long to wait in OPEN before transitioning to HALF-OPEN (ms) */ + resetTimeoutMs: number +} + +interface ServerState { + state: CircuitState + failures: number + nextAttemptMs: number + isHalfOpenProbing: boolean +} + +const logger = createLogger('mcp:resilience:circuit-breaker') + +export class CircuitBreakerMiddleware implements McpMiddleware { + // Use a Map to maintain insertion order for standard LRU-like eviction if necessary. + // We constrain it to prevent memory leaks if thousands of ephemeral servers connect. + private registry = new Map() + private config: CircuitBreakerConfig + + constructor(config: Partial = {}) { + this.config = { + failureThreshold: config.failureThreshold ?? 5, + resetTimeoutMs: config.resetTimeoutMs ?? 30000, + } + } + + private getState(serverId: string): ServerState { + let state = this.registry.get(serverId) + if (!state) { + state = { + state: 'CLOSED', + failures: 0, + nextAttemptMs: 0, + isHalfOpenProbing: false, + } + this.registry.set(serverId, state) + this.evictIfNecessary() + } + return state + } + + private evictIfNecessary() { + if (this.registry.size > MAX_SERVER_STATES) { + // Evict the oldest entry (first inserted) + const firstKey = this.registry.keys().next().value + if (firstKey) { + this.registry.delete(firstKey) + } + } + } + + async execute( + context: McpExecutionContext, + next: McpMiddlewareNext + ): Promise { + const { serverId, toolCall } = context + const serverState = this.getState(serverId) + + // 1. Check current state and evaluate timeouts + if (serverState.state === 'OPEN') { + if (Date.now() > serverState.nextAttemptMs) { + // Time to try again, enter HALF-OPEN + logger.info(`Circuit breaker entering HALF-OPEN for server ${serverId}`) + serverState.state = 'HALF-OPEN' + serverState.isHalfOpenProbing = false + } else { + // Fast-fail + throw new Error( + `Circuit breaker is OPEN for server ${serverId}. Fast-failing request to ${toolCall.name}.` + ) + } + } + + if (serverState.state === 'HALF-OPEN') { + if (serverState.isHalfOpenProbing) { + // Another request is already probing. Fast-fail concurrent requests. + throw new Error( + `Circuit breaker is HALF-OPEN for server ${serverId}. A probe request is currently executing. Fast-failing concurrent request to ${toolCall.name}.` + ) + } + // We are the chosen ones. Lock it down. + serverState.isHalfOpenProbing = true + } + + try { + // 2. Invoke the next layer + const result = await next(context) + + // 3. Handle result parsing (isError = true counts as failure for us) + if (result.isError) { + this.recordFailure(serverId, serverState) + } else { + this.recordSuccess(serverId, serverState) + } + + return result + } catch (error) { + // Note: we record failure on ANY exception + this.recordFailure(serverId, serverState) + throw error // Re-throw to caller + } + } + + private recordSuccess(serverId: string, state: ServerState) { + if (state.state !== 'CLOSED') { + logger.info(`Circuit breaker reset to CLOSED for server ${serverId}`) + } + state.state = 'CLOSED' + state.failures = 0 + state.isHalfOpenProbing = false + } + + private recordFailure(serverId: string, state: ServerState) { + if (state.state === 'HALF-OPEN') { + // The probe failed! Trip immediately back to OPEN. + logger.warn( + `Circuit breaker probe failed. Tripping back to OPEN for server ${serverId}` + ) + this.tripToOpen(state) + } else if (state.state === 'CLOSED') { + state.failures++ + if (state.failures >= this.config.failureThreshold) { + logger.error( + `Circuit breaker failure threshold reached (${state.failures}/${this.config.failureThreshold}). Tripping to OPEN for server ${serverId}` + ) + this.tripToOpen(state) + } + } + } + + private tripToOpen(state: ServerState) { + state.state = 'OPEN' + state.isHalfOpenProbing = false + state.nextAttemptMs = Date.now() + this.config.resetTimeoutMs + } +} diff --git a/apps/sim/lib/mcp/resilience/demo.ts b/apps/sim/lib/mcp/resilience/demo.ts new file mode 100644 index 00000000000..bc74922b1d4 --- /dev/null +++ b/apps/sim/lib/mcp/resilience/demo.ts @@ -0,0 +1,70 @@ +import { ResiliencePipeline } from './pipeline' +import { TelemetryMiddleware } from './telemetry' +import { CircuitBreakerMiddleware } from './circuit-breaker' +import { SchemaValidatorMiddleware } from './schema-validator' +import type { McpExecutionContext } from './types' + +// Setup Pipeline with a fast 1.5s reset timeout for the demo +const pipeline = new ResiliencePipeline() + .use(new TelemetryMiddleware()) + .use(new SchemaValidatorMiddleware()) + .use(new CircuitBreakerMiddleware({ failureThreshold: 3, resetTimeoutMs: 1500 })) + +const mockContext: McpExecutionContext = { + toolCall: { name: 'flaky_tool', arguments: {} }, + serverId: 'demo-server', + userId: 'demo-user', + workspaceId: 'demo-workspace' +} + +let attemptTracker = 0 + +// A mock downstream MCP execution handler that fails the first 4 times, then succeeds +const mockExecuteTool = async () => { + attemptTracker++ + console.log(`\n--- Request #${attemptTracker} ---`) + + // Simulate network latency + await new Promise(r => setTimeout(r, 50)) + + if (attemptTracker <= 3) { + throw new Error('Connection Refused: Target server is down!') + } + + return { content: [{ type: 'text', text: 'Success! Target server is back online.' }] } +} + +async function runDemo() { + console.log("šŸš€ Starting Resilience Pipeline Demo...\n") + + // Attempt 1: CLOSED -> Fails + try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } + + // Attempt 2: CLOSED -> Fails + try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } + + // Attempt 3: CLOSED -> Fails (Hits threshold, trips to OPEN) + try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } + + // Attempt 4: OPEN (Fast fails immediately without hitting downstream mockExecuteTool) + try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`šŸ›‘ Fast-Fail Result: ${e.message}`) } + + console.log("\nā³ Waiting 2 seconds for Circuit Breaker to cool down...") + await new Promise(r => setTimeout(r, 2000)) + + // Attempt 5: HALF-OPEN -> Succeeds! (Transitions back to CLOSED) + try { + const result = await pipeline.execute(mockContext, mockExecuteTool) + console.log(`āœ… Result: ${result.content?.[0].text}`) + } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } + + // Attempt 6: CLOSED -> Succeeds normally + try { + const result = await pipeline.execute(mockContext, mockExecuteTool) + console.log(`āœ… Result: ${result.content?.[0].text}`) + } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } + + console.log("\nšŸŽ‰ Demo Complete!") +} + +runDemo().catch(console.error) diff --git a/apps/sim/lib/mcp/resilience/pipeline.test.ts b/apps/sim/lib/mcp/resilience/pipeline.test.ts index c15208c45bc..cc482204cec 100644 --- a/apps/sim/lib/mcp/resilience/pipeline.test.ts +++ b/apps/sim/lib/mcp/resilience/pipeline.test.ts @@ -135,3 +135,182 @@ describe('TelemetryMiddleware', () => { expect(logCall.failure_reason).toBe('TIMEOUT') }) }) + +const { CircuitBreakerMiddleware } = await import('./circuit-breaker') + +describe('CircuitBreakerMiddleware', () => { + const mockContext: McpExecutionContext = { + toolCall: { name: 'cb_tool', arguments: {} }, + serverId: 'cb-server-1', + userId: 'user-1', + workspaceId: 'workspace-1' + } + + test('should trip to OPEN after threshold failures', async () => { + const cb = new CircuitBreakerMiddleware({ failureThreshold: 2, resetTimeoutMs: 1000 }) + const errorMsg = 'Tool Failed' + const failingHandler: McpMiddlewareNext = async () => { throw new Error(errorMsg) } + + // 1st failure (CLOSED -> CLOSED) + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow(errorMsg) + + // 2nd failure (CLOSED -> OPEN) + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow(errorMsg) + + // 3rd attempt (OPEN -> Fail Fast) + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow( + 'Circuit breaker is OPEN for server cb-server-1. Fast-failing request to cb_tool.' + ) + }) + + test('should transition CLOSED -> OPEN -> HALF-OPEN lock correctly', async () => { + const resetTimeoutMs = 50 + const cb = new CircuitBreakerMiddleware({ failureThreshold: 1, resetTimeoutMs }) + const failingHandler: McpMiddlewareNext = async () => { throw new Error('Fail') } + + // Trip to OPEN + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow('Fail') + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow('OPEN') + + // Wait for timeout to enter HALF-OPEN + await new Promise(r => setTimeout(r, resetTimeoutMs + 10)) + + // Create a Slow Probe Handler to mimic latency and hold the lock + let probeInProgress = false + const slowProbeHandler: McpMiddlewareNext = async () => { + probeInProgress = true + await new Promise(r => setTimeout(r, 100)) + return { content: [{ type: 'text', text: 'Probe Success' }] } + } + + // Send a batch of 3 concurrent requests while the reset timeout has passed + // The first should acquire HALF-OPEN, the rest should fail fast. + const promises = [ + cb.execute(mockContext, slowProbeHandler), + cb.execute(mockContext, async () => { return { content: [] } }), + cb.execute(mockContext, async () => { return { content: [] } }) + ] + + const results = await Promise.allSettled(promises) + + // Exactly one should succeed (the slow probe) + const fulfilled = results.filter(r => r.status === 'fulfilled') + expect(fulfilled.length).toBe(1) + expect((fulfilled[0] as PromiseFulfilledResult).value.content[0].text).toBe('Probe Success') + + // Exactly two should fail-fast due to HALF-OPEN lock + const rejected = results.filter(r => r.status === 'rejected') + expect(rejected.length).toBe(2) + expect((rejected[0] as PromiseRejectedResult).reason.message).toContain('Circuit breaker is HALF-OPEN') + + expect(probeInProgress).toBe(true) + + // Subsequent requests should now succeed (CLOSED again) + const newResult = await cb.execute(mockContext, async () => { return { content: [{ type: 'text', text: 'Normal' }] } }) + expect(newResult.content?.[0].text).toBe('Normal') + }) +}) + +const { SchemaValidatorMiddleware } = await import('./schema-validator') + +describe('SchemaValidatorMiddleware', () => { + const mockSchemaTool: any = { + name: 'test_schema_tool', + serverId: 's1', + serverName: 's1', + inputSchema: { + type: 'object', + properties: { + requiredStr: { type: 'string' }, + optionalNum: { type: 'number' }, + enumVal: { type: 'string', enum: ['A', 'B'] } + }, + required: ['requiredStr'] + } + } + + test('should compile, cache, and successfully validate valid args', async () => { + let providerCalled = 0 + const toolProvider = async (name: string) => { + providerCalled++ + return name === 'test_schema_tool' ? mockSchemaTool : undefined + } + + const validator = new SchemaValidatorMiddleware({ toolProvider }) + + const mockContext: any = { + toolCall: { + name: 'test_schema_tool', + arguments: { + requiredStr: 'hello', + enumVal: 'A' + } + }, + serverId: 'server-1', + userId: 'user-1', + workspaceId: 'workspace-1' + } + + let nextCalled = false + const nextHandler: any = async (ctx: any) => { + nextCalled = true + expect(ctx.toolCall.arguments).toEqual({ + requiredStr: 'hello', + enumVal: 'A' + }) + return { content: [{ type: 'text', text: 'ok' }] } + } + + const result1 = await validator.execute({ + ...mockContext, + toolCall: { name: 'test_schema_tool', arguments: { requiredStr: 'hello', enumVal: 'A' } } + }, nextHandler) + expect(result1.content?.[0].text).toBe('ok') + expect(nextCalled).toBe(true) + expect(providerCalled).toBe(1) + + // Second call should hit the cache + nextCalled = false + const result2 = await validator.execute({ + ...mockContext, + toolCall: { name: 'test_schema_tool', arguments: { requiredStr: 'hello', enumVal: 'A' } } + }, nextHandler) + expect(result2.content?.[0].text).toBe('ok') + expect(nextCalled).toBe(true) + expect(providerCalled).toBe(1) // from cache + }) + + test('should intercept validation failure and return gracefully formatted error', async () => { + const validator = new SchemaValidatorMiddleware() + validator.cacheTool(mockSchemaTool) + + const mockContext: any = { + toolCall: { + name: 'test_schema_tool', + arguments: { + // missing requiredStr + enumVal: 'C' // invalid enum + } + }, + serverId: 'server-1', + userId: 'user-1', + workspaceId: 'workspace-1' + } + + let nextCalled = false + const nextHandler: any = async () => { + nextCalled = true + return { content: [] } + } + + const result = await validator.execute(mockContext, nextHandler) + expect(nextCalled).toBe(false) + expect(result.isError).toBe(true) + expect(result.content?.[0].type).toBe('text') + + const errorText = result.content?.[0].text as string + expect(errorText).toContain('Schema validation failed') + expect(errorText).toContain('requiredStr') + expect(errorText).toContain('enumVal') + }) +}) diff --git a/apps/sim/lib/mcp/resilience/schema-validator.ts b/apps/sim/lib/mcp/resilience/schema-validator.ts new file mode 100644 index 00000000000..3d2c4891c2d --- /dev/null +++ b/apps/sim/lib/mcp/resilience/schema-validator.ts @@ -0,0 +1,152 @@ +import { z } from 'zod' +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' +import type { McpTool, McpToolSchemaProperty, McpToolSchema } from '@/lib/mcp/types' +import type { McpToolResult } from '@/lib/mcp/types' +import { createLogger } from '@sim/logger' + +const logger = createLogger('mcp:schema-validator') + +export type ToolProvider = (toolName: string) => McpTool | undefined | Promise + +export class SchemaValidatorMiddleware implements McpMiddleware { + private schemaCache = new Map() + private toolProvider?: ToolProvider + + constructor(options?: { toolProvider?: ToolProvider }) { + this.toolProvider = options?.toolProvider + } + + /** + * Cache a tool's schema explicitly (e.g. during server discovery) + */ + cacheTool(tool: McpTool) { + if (!this.schemaCache.has(tool.name)) { + const zodSchema = this.compileSchema(tool.inputSchema) + this.schemaCache.set(tool.name, zodSchema) + } + } + + /** + * Clear caches, either for a specific tool or globally. + */ + clearCache(toolName?: string) { + if (toolName) { + this.schemaCache.delete(toolName) + } else { + this.schemaCache.clear() + } + } + + async execute( + context: McpExecutionContext, + next: McpMiddlewareNext + ): Promise { + const { toolCall } = context + const toolName = toolCall.name + + let zodSchema = this.schemaCache.get(toolName) + + if (!zodSchema && this.toolProvider) { + const tool = await this.toolProvider(toolName) + if (tool) { + zodSchema = this.compileSchema(tool.inputSchema) + this.schemaCache.set(toolName, zodSchema) + } + } + + if (zodSchema) { + const parseResult = await zodSchema.safeParseAsync(toolCall.arguments) + if (!parseResult.success) { + // Return natively formatted error payload + const errorDetails = parseResult.error.errors + .map(e => `${e.path.join('.') || 'root'}: ${e.message}`) + .join(', ') + + logger.warn('Schema validation failed', { toolName, error: errorDetails }) + + return { + isError: true, + content: [ + { + type: 'text', + text: `Schema validation failed: [${errorDetails}]` + } + ] + } + } + + // Sync successfully parsed / defaulted arguments back to context + context.toolCall.arguments = parseResult.data + } + + return next(context) + } + + private compileSchema(schema: McpToolSchema): z.ZodObject { + return this.compileObject(schema.properties || {}, schema.required || []) as z.ZodObject + } + + private compileObject(properties: Record, required: string[]): z.ZodTypeAny { + const shape: Record = {} + + for (const [key, prop] of Object.entries(properties)) { + let zodType = this.compileProperty(prop) + + if (!required.includes(key)) { + zodType = zodType.optional() + } + + shape[key] = zodType + } + + return z.object(shape) + } + + private compileProperty(prop: McpToolSchemaProperty): z.ZodTypeAny { + let baseType: z.ZodTypeAny = z.any() + + switch (prop.type) { + case 'string': + baseType = z.string() + break + case 'number': + case 'integer': + baseType = z.number() + break + case 'boolean': + baseType = z.boolean() + break + case 'array': + if (prop.items) { + baseType = z.array(this.compileProperty(prop.items)) + } else { + baseType = z.array(z.any()) + } + break + case 'object': + baseType = this.compileObject(prop.properties || {}, prop.required || []) + break + } + + // Apply Enum mappings + if (prop.enum && prop.enum.length > 0) { + if (prop.enum.length === 1) { + baseType = z.literal(prop.enum[0]) + } else { + // We use mapped literals injected into an array + const literals = prop.enum.map(e => z.literal(e)) + baseType = z.union(literals as any) + } + } + + if (prop.description) { + baseType = baseType.describe(prop.description) + } + + if (prop.default !== undefined) { + baseType = baseType.default(prop.default) + } + + return baseType + } +} diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index 69e7cc81178..63a9f3f76af 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -27,6 +27,10 @@ import type { McpTransport, } from '@/lib/mcp/types' import { MCP_CONSTANTS } from '@/lib/mcp/utils' +import { ResiliencePipeline } from '@/lib/mcp/resilience/pipeline' +import { TelemetryMiddleware } from '@/lib/mcp/resilience/telemetry' +import { CircuitBreakerMiddleware } from '@/lib/mcp/resilience/circuit-breaker' +import { SchemaValidatorMiddleware } from '@/lib/mcp/resilience/schema-validator' const logger = createLogger('McpService') @@ -35,10 +39,23 @@ class McpService { private readonly cacheTimeout = MCP_CONSTANTS.CACHE_TIMEOUT private unsubscribeConnectionManager?: () => void + private pipeline: ResiliencePipeline + private schemaValidator: SchemaValidatorMiddleware + private circuitBreaker: CircuitBreakerMiddleware + private telemetry: TelemetryMiddleware + constructor() { this.cacheAdapter = createMcpCacheAdapter() logger.info(`MCP Service initialized with ${getMcpCacheType()} cache`) + this.schemaValidator = new SchemaValidatorMiddleware() + this.circuitBreaker = new CircuitBreakerMiddleware() + this.telemetry = new TelemetryMiddleware() + this.pipeline = new ResiliencePipeline() + .use(this.telemetry) + .use(this.schemaValidator) + .use(this.circuitBreaker) + if (mcpConnectionManager) { this.unsubscribeConnectionManager = mcpConnectionManager.subscribe((event) => { this.clearCache(event.workspaceId) @@ -194,7 +211,16 @@ class McpService { const client = await this.createClient(resolvedConfig) try { - const result = await client.callTool(toolCall) + const context = { + serverId, + workspaceId, + userId, + toolCall, + extraHeaders, + } + const result = await this.pipeline.execute(context, async (ctx) => { + return await client.callTool(ctx.toolCall) + }) logger.info(`[${requestId}] Successfully executed tool ${toolCall.name}`) return result } finally { @@ -322,6 +348,7 @@ class McpService { try { const cached = await this.cacheAdapter.get(cacheKey) if (cached) { + cached.tools.forEach((t: McpTool) => this.schemaValidator.cacheTool(t)) return cached.tools } } catch (error) { @@ -414,6 +441,7 @@ class McpService { logger.info( `[${requestId}] Discovered ${allTools.length} tools from ${servers.length - failedCount}/${servers.length} servers` ) + allTools.forEach((t: McpTool) => this.schemaValidator.cacheTool(t)) return allTools } catch (error) { logger.error(`[${requestId}] Failed to discover MCP tools for user ${userId}:`, error) @@ -450,6 +478,7 @@ class McpService { try { const tools = await client.listTools() logger.info(`[${requestId}] Discovered ${tools.length} tools from server ${config.name}`) + tools.forEach((t: McpTool) => this.schemaValidator.cacheTool(t)) return tools } finally { await client.disconnect() @@ -533,6 +562,7 @@ class McpService { await this.cacheAdapter.clear() logger.debug('Cleared all MCP tool cache') } + this.schemaValidator.clearCache() } catch (error) { logger.warn('Failed to clear cache:', error) } From e1b3d1f8d2ed42f28dd3e172f704bf5932f1e1c3 Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 16:32:02 -0700 Subject: [PATCH 8/9] feat(mcp): implement enterprise resilience pipeline (circuit breaker, schema validation) --- apps/sim/lib/mcp/resilience/ARCHITECTURE.md | 79 +++++++++++++++++++++ apps/sim/lib/mcp/resilience/README.md | 12 ---- 2 files changed, 79 insertions(+), 12 deletions(-) create mode 100644 apps/sim/lib/mcp/resilience/ARCHITECTURE.md diff --git a/apps/sim/lib/mcp/resilience/ARCHITECTURE.md b/apps/sim/lib/mcp/resilience/ARCHITECTURE.md new file mode 100644 index 00000000000..a66a3b4b821 --- /dev/null +++ b/apps/sim/lib/mcp/resilience/ARCHITECTURE.md @@ -0,0 +1,79 @@ +# MCP Resilience Pipeline + +**Objective:** Upgrade the core `executeTool` engine from a naive proxy to an Enterprise-Grade Resilience Pipeline, ensuring our AI workflows never suffer cascading failures from downstream server instability or LLM hallucinations. + +--- + +## 1. The "Thundering Herd" Problem (Circuit Breaker) + +**Before:** +When a downstream provider (e.g., a database or API) experienced latency or went down, our workflow engine would continuously retry. If 1,000 agents hit a struggling server simultaneously, they would overwhelm it (a DDOS-like "thundering herd"), crash our workflow executor, and severely degrade user experience across the platform. + +**After (`CircuitBreakerMiddleware`):** +We implemented an intelligent State Machine with a **HALF-OPEN Concurrency Semaphore**. +- **The Trip:** If a server fails 3 times, we cut the circuit (`OPEN` state). All subsequent requests instantly *fast-fail* locally (0ms latency), protecting the downstream server from being hammered. +- **The Elegant Recovery:** After a cooldown, we allow exactly **one** probe request through (`HALF-OPEN`). If it succeeds, the circuit closes. If it fails, it trips again. + +#### Live Demo Output + +```mermaid +sequenceDiagram + participant Agent + participant Pipeline + participant TargetServer + + Agent->>Pipeline: executeTool (Server Down) + Pipeline--xTargetServer: āŒ Fails (Attempt 1-3) + Note over Pipeline: šŸ”“ Tripped to OPEN + Agent->>Pipeline: executeTool + Pipeline-->>Agent: šŸ›‘ Fast-Fail (0ms latency) - Target Protected + Note over Pipeline: ā³ Cooldown... 🟔 HALF-OPEN + Agent->>Pipeline: executeTool (Probe) + Pipeline-->>TargetServer: Exact 1 request allowed + TargetServer-->>Pipeline: āœ… Success + Note over Pipeline: 🟢 Reset to CLOSED + Agent->>Pipeline: executeTool + Pipeline-->>TargetServer: Resume normal traffic +``` + +--- + +## 2. LLM Hallucinated Arguments (Schema Validator) + +**Before:** +If an LLM hallucinated arguments that didn't match a tool's JSON schema, the downstream server or our proxy would throw a fatal exception. The workflow would crash, requiring user intervention, and wasting the compute/tokens already spent. + +**After (`SchemaValidatorMiddleware`):** +We implemented high-performance **Zod Schema Caching**. +- We intercept the tool call *before* it leaves our system. +- If the schema is invalid, we do *not* crash. Instead, we return a gracefully formatted, native MCP error: `{ isError: true, content: "Schema validation failed: [Zod Error Details]" }`. +- **The Magic:** The LLM receives this error, realizes its mistake, and natively **self-corrects** on the next turn, achieving autonomous self-healing without dropping the user's workflow. + +--- + +## 3. The "Black Box" Problem (Telemetry) + +**Before:** +If a tool execution tool 10 seconds or failed, we had no granular visibility into *why*. Was it a network timeout? A validation error? A 500 from the target? + +**After (`TelemetryMiddleware`):** +Every single tool execution now generates rich metadata: +- `latency_ms` +- Exact `failure_reason` (e.g., `TIMEOUT`, `VALIDATION_ERROR`, `API_500`) +- `serverId` and `workspaceId` + +This allows us to build real-time monitoring dashboards to detect struggling third-party integrations before our users even report them. + +--- + +## Architectural Impact: The Composable Pipeline + +Perhaps the most significant engineering achievement is the **Architecture Shift**. We moved away from a brittle, monolithic proxy to a modern **Chain of Responsibility**. + +```typescript +// The new elegant implementation in McpService +this.pipeline = new ResiliencePipeline() + .use(this.telemetry) + .use(this.schemaValidator) + .use(this.circuitBreaker) +``` diff --git a/apps/sim/lib/mcp/resilience/README.md b/apps/sim/lib/mcp/resilience/README.md index eb5ac87469e..214fdb205b4 100644 --- a/apps/sim/lib/mcp/resilience/README.md +++ b/apps/sim/lib/mcp/resilience/README.md @@ -1,15 +1,3 @@ -# Goal Description -Implement a "Day 2" reliability layer for the simstudioai/sim workflow engine by building a composable Resilience Interceptor/Middleware Pipeline for the MCP `executeTool` logic. This pipeline ensures enterprise-grade stability by introducing a Circuit Breaker State Machine, Zod-based Schema Enforcement for LLM outputs, and detailed Telemetry for latency and failure analysis, while addressing high-concurrency Node/TS environments. - -## User Review Required -- Please confirm if `apps/sim/lib/mcp/service.ts` is the correct core injection point for wrapping `executeTool`. -- Note on file path: `apps/sim/lib/workflow/executor.ts` was not found. Instead, `apps/sim/executor/execution/executor.ts` and `apps/sim/tools/workflow/executor.ts` were analyzed. Ensure intercepting `McpService`'s `executeTool` serves your architectural needs. -- Please confirm the schema enforcement approach: we will compile and cache JSON Schemas to Zod validators upon MCP server discovery or lazily, instead of parsing dynamically per request. - -## Proposed Changes - -We will split the implementation into discrete PRs / Commits to maintain structure. - ### Part 1: Telemetry Hooks Implement the foundation for tracking. *(Change Rationale: Transitioning to a middleware pattern instead of a monolithic proxy, allowing telemetry to be composed easily).* From 3bcb1027a4e957195e4a5c5b69c9160e01bd89e6 Mon Sep 17 00:00:00 2001 From: Shanyu Dabbiru Date: Mon, 9 Mar 2026 16:47:55 -0700 Subject: [PATCH 9/9] style: run biome formatting across resilience pipeline --- .../sim/lib/mcp/resilience/circuit-breaker.ts | 237 ++++---- apps/sim/lib/mcp/resilience/demo.ts | 114 ++-- apps/sim/lib/mcp/resilience/pipeline.test.ts | 541 +++++++++--------- apps/sim/lib/mcp/resilience/pipeline.ts | 64 +-- .../lib/mcp/resilience/schema-validator.ts | 243 ++++---- apps/sim/lib/mcp/resilience/telemetry.ts | 91 ++- apps/sim/lib/mcp/resilience/types.ts | 9 +- apps/sim/lib/mcp/service.ts | 8 +- apps/sim/tsconfig.json | 74 +-- packages/logger/package.json | 2 +- 10 files changed, 686 insertions(+), 697 deletions(-) diff --git a/apps/sim/lib/mcp/resilience/circuit-breaker.ts b/apps/sim/lib/mcp/resilience/circuit-breaker.ts index a084e8e39a4..350714d4f69 100644 --- a/apps/sim/lib/mcp/resilience/circuit-breaker.ts +++ b/apps/sim/lib/mcp/resilience/circuit-breaker.ts @@ -1,6 +1,6 @@ -import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' -import type { McpToolResult } from '@/lib/mcp/types' import { createLogger } from '@sim/logger' +import type { McpToolResult } from '@/lib/mcp/types' +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' // Configure standard cache size limit const MAX_SERVER_STATES = 1000 @@ -8,141 +8,136 @@ const MAX_SERVER_STATES = 1000 export type CircuitState = 'CLOSED' | 'OPEN' | 'HALF-OPEN' export interface CircuitBreakerConfig { - /** Number of failures before tripping to OPEN */ - failureThreshold: number - /** How long to wait in OPEN before transitioning to HALF-OPEN (ms) */ - resetTimeoutMs: number + /** Number of failures before tripping to OPEN */ + failureThreshold: number + /** How long to wait in OPEN before transitioning to HALF-OPEN (ms) */ + resetTimeoutMs: number } interface ServerState { - state: CircuitState - failures: number - nextAttemptMs: number - isHalfOpenProbing: boolean + state: CircuitState + failures: number + nextAttemptMs: number + isHalfOpenProbing: boolean } const logger = createLogger('mcp:resilience:circuit-breaker') export class CircuitBreakerMiddleware implements McpMiddleware { - // Use a Map to maintain insertion order for standard LRU-like eviction if necessary. - // We constrain it to prevent memory leaks if thousands of ephemeral servers connect. - private registry = new Map() - private config: CircuitBreakerConfig - - constructor(config: Partial = {}) { - this.config = { - failureThreshold: config.failureThreshold ?? 5, - resetTimeoutMs: config.resetTimeoutMs ?? 30000, - } + // Use a Map to maintain insertion order for standard LRU-like eviction if necessary. + // We constrain it to prevent memory leaks if thousands of ephemeral servers connect. + private registry = new Map() + private config: CircuitBreakerConfig + + constructor(config: Partial = {}) { + this.config = { + failureThreshold: config.failureThreshold ?? 5, + resetTimeoutMs: config.resetTimeoutMs ?? 30000, } - - private getState(serverId: string): ServerState { - let state = this.registry.get(serverId) - if (!state) { - state = { - state: 'CLOSED', - failures: 0, - nextAttemptMs: 0, - isHalfOpenProbing: false, - } - this.registry.set(serverId, state) - this.evictIfNecessary() - } - return state + } + + private getState(serverId: string): ServerState { + let state = this.registry.get(serverId) + if (!state) { + state = { + state: 'CLOSED', + failures: 0, + nextAttemptMs: 0, + isHalfOpenProbing: false, + } + this.registry.set(serverId, state) + this.evictIfNecessary() } - - private evictIfNecessary() { - if (this.registry.size > MAX_SERVER_STATES) { - // Evict the oldest entry (first inserted) - const firstKey = this.registry.keys().next().value - if (firstKey) { - this.registry.delete(firstKey) - } - } + return state + } + + private evictIfNecessary() { + if (this.registry.size > MAX_SERVER_STATES) { + // Evict the oldest entry (first inserted) + const firstKey = this.registry.keys().next().value + if (firstKey) { + this.registry.delete(firstKey) + } } - - async execute( - context: McpExecutionContext, - next: McpMiddlewareNext - ): Promise { - const { serverId, toolCall } = context - const serverState = this.getState(serverId) - - // 1. Check current state and evaluate timeouts - if (serverState.state === 'OPEN') { - if (Date.now() > serverState.nextAttemptMs) { - // Time to try again, enter HALF-OPEN - logger.info(`Circuit breaker entering HALF-OPEN for server ${serverId}`) - serverState.state = 'HALF-OPEN' - serverState.isHalfOpenProbing = false - } else { - // Fast-fail - throw new Error( - `Circuit breaker is OPEN for server ${serverId}. Fast-failing request to ${toolCall.name}.` - ) - } - } - - if (serverState.state === 'HALF-OPEN') { - if (serverState.isHalfOpenProbing) { - // Another request is already probing. Fast-fail concurrent requests. - throw new Error( - `Circuit breaker is HALF-OPEN for server ${serverId}. A probe request is currently executing. Fast-failing concurrent request to ${toolCall.name}.` - ) - } - // We are the chosen ones. Lock it down. - serverState.isHalfOpenProbing = true - } - - try { - // 2. Invoke the next layer - const result = await next(context) - - // 3. Handle result parsing (isError = true counts as failure for us) - if (result.isError) { - this.recordFailure(serverId, serverState) - } else { - this.recordSuccess(serverId, serverState) - } - - return result - } catch (error) { - // Note: we record failure on ANY exception - this.recordFailure(serverId, serverState) - throw error // Re-throw to caller - } + } + + async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise { + const { serverId, toolCall } = context + const serverState = this.getState(serverId) + + // 1. Check current state and evaluate timeouts + if (serverState.state === 'OPEN') { + if (Date.now() > serverState.nextAttemptMs) { + // Time to try again, enter HALF-OPEN + logger.info(`Circuit breaker entering HALF-OPEN for server ${serverId}`) + serverState.state = 'HALF-OPEN' + serverState.isHalfOpenProbing = false + } else { + // Fast-fail + throw new Error( + `Circuit breaker is OPEN for server ${serverId}. Fast-failing request to ${toolCall.name}.` + ) + } } - private recordSuccess(serverId: string, state: ServerState) { - if (state.state !== 'CLOSED') { - logger.info(`Circuit breaker reset to CLOSED for server ${serverId}`) - } - state.state = 'CLOSED' - state.failures = 0 - state.isHalfOpenProbing = false + if (serverState.state === 'HALF-OPEN') { + if (serverState.isHalfOpenProbing) { + // Another request is already probing. Fast-fail concurrent requests. + throw new Error( + `Circuit breaker is HALF-OPEN for server ${serverId}. A probe request is currently executing. Fast-failing concurrent request to ${toolCall.name}.` + ) + } + // We are the chosen ones. Lock it down. + serverState.isHalfOpenProbing = true } - private recordFailure(serverId: string, state: ServerState) { - if (state.state === 'HALF-OPEN') { - // The probe failed! Trip immediately back to OPEN. - logger.warn( - `Circuit breaker probe failed. Tripping back to OPEN for server ${serverId}` - ) - this.tripToOpen(state) - } else if (state.state === 'CLOSED') { - state.failures++ - if (state.failures >= this.config.failureThreshold) { - logger.error( - `Circuit breaker failure threshold reached (${state.failures}/${this.config.failureThreshold}). Tripping to OPEN for server ${serverId}` - ) - this.tripToOpen(state) - } - } + try { + // 2. Invoke the next layer + const result = await next(context) + + // 3. Handle result parsing (isError = true counts as failure for us) + if (result.isError) { + this.recordFailure(serverId, serverState) + } else { + this.recordSuccess(serverId, serverState) + } + + return result + } catch (error) { + // Note: we record failure on ANY exception + this.recordFailure(serverId, serverState) + throw error // Re-throw to caller } + } - private tripToOpen(state: ServerState) { - state.state = 'OPEN' - state.isHalfOpenProbing = false - state.nextAttemptMs = Date.now() + this.config.resetTimeoutMs + private recordSuccess(serverId: string, state: ServerState) { + if (state.state !== 'CLOSED') { + logger.info(`Circuit breaker reset to CLOSED for server ${serverId}`) } + state.state = 'CLOSED' + state.failures = 0 + state.isHalfOpenProbing = false + } + + private recordFailure(serverId: string, state: ServerState) { + if (state.state === 'HALF-OPEN') { + // The probe failed! Trip immediately back to OPEN. + logger.warn(`Circuit breaker probe failed. Tripping back to OPEN for server ${serverId}`) + this.tripToOpen(state) + } else if (state.state === 'CLOSED') { + state.failures++ + if (state.failures >= this.config.failureThreshold) { + logger.error( + `Circuit breaker failure threshold reached (${state.failures}/${this.config.failureThreshold}). Tripping to OPEN for server ${serverId}` + ) + this.tripToOpen(state) + } + } + } + + private tripToOpen(state: ServerState) { + state.state = 'OPEN' + state.isHalfOpenProbing = false + state.nextAttemptMs = Date.now() + this.config.resetTimeoutMs + } } diff --git a/apps/sim/lib/mcp/resilience/demo.ts b/apps/sim/lib/mcp/resilience/demo.ts index bc74922b1d4..d62f722e239 100644 --- a/apps/sim/lib/mcp/resilience/demo.ts +++ b/apps/sim/lib/mcp/resilience/demo.ts @@ -1,70 +1,90 @@ -import { ResiliencePipeline } from './pipeline' -import { TelemetryMiddleware } from './telemetry' import { CircuitBreakerMiddleware } from './circuit-breaker' +import { ResiliencePipeline } from './pipeline' import { SchemaValidatorMiddleware } from './schema-validator' +import { TelemetryMiddleware } from './telemetry' import type { McpExecutionContext } from './types' // Setup Pipeline with a fast 1.5s reset timeout for the demo const pipeline = new ResiliencePipeline() - .use(new TelemetryMiddleware()) - .use(new SchemaValidatorMiddleware()) - .use(new CircuitBreakerMiddleware({ failureThreshold: 3, resetTimeoutMs: 1500 })) + .use(new TelemetryMiddleware()) + .use(new SchemaValidatorMiddleware()) + .use(new CircuitBreakerMiddleware({ failureThreshold: 3, resetTimeoutMs: 1500 })) const mockContext: McpExecutionContext = { - toolCall: { name: 'flaky_tool', arguments: {} }, - serverId: 'demo-server', - userId: 'demo-user', - workspaceId: 'demo-workspace' + toolCall: { name: 'flaky_tool', arguments: {} }, + serverId: 'demo-server', + userId: 'demo-user', + workspaceId: 'demo-workspace', } let attemptTracker = 0 // A mock downstream MCP execution handler that fails the first 4 times, then succeeds const mockExecuteTool = async () => { - attemptTracker++ - console.log(`\n--- Request #${attemptTracker} ---`) + attemptTracker++ + console.log(`\n--- Request #${attemptTracker} ---`) - // Simulate network latency - await new Promise(r => setTimeout(r, 50)) + // Simulate network latency + await new Promise((r) => setTimeout(r, 50)) - if (attemptTracker <= 3) { - throw new Error('Connection Refused: Target server is down!') - } + if (attemptTracker <= 3) { + throw new Error('Connection Refused: Target server is down!') + } - return { content: [{ type: 'text', text: 'Success! Target server is back online.' }] } + return { content: [{ type: 'text', text: 'Success! Target server is back online.' }] } } async function runDemo() { - console.log("šŸš€ Starting Resilience Pipeline Demo...\n") - - // Attempt 1: CLOSED -> Fails - try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } - - // Attempt 2: CLOSED -> Fails - try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } - - // Attempt 3: CLOSED -> Fails (Hits threshold, trips to OPEN) - try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } - - // Attempt 4: OPEN (Fast fails immediately without hitting downstream mockExecuteTool) - try { await pipeline.execute(mockContext, mockExecuteTool) } catch (e: any) { console.error(`šŸ›‘ Fast-Fail Result: ${e.message}`) } - - console.log("\nā³ Waiting 2 seconds for Circuit Breaker to cool down...") - await new Promise(r => setTimeout(r, 2000)) - - // Attempt 5: HALF-OPEN -> Succeeds! (Transitions back to CLOSED) - try { - const result = await pipeline.execute(mockContext, mockExecuteTool) - console.log(`āœ… Result: ${result.content?.[0].text}`) - } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } - - // Attempt 6: CLOSED -> Succeeds normally - try { - const result = await pipeline.execute(mockContext, mockExecuteTool) - console.log(`āœ… Result: ${result.content?.[0].text}`) - } catch (e: any) { console.error(`āŒ Result: ${e.message}`) } - - console.log("\nšŸŽ‰ Demo Complete!") + console.log('šŸš€ Starting Resilience Pipeline Demo...\n') + + // Attempt 1: CLOSED -> Fails + try { + await pipeline.execute(mockContext, mockExecuteTool) + } catch (e: any) { + console.error(`āŒ Result: ${e.message}`) + } + + // Attempt 2: CLOSED -> Fails + try { + await pipeline.execute(mockContext, mockExecuteTool) + } catch (e: any) { + console.error(`āŒ Result: ${e.message}`) + } + + // Attempt 3: CLOSED -> Fails (Hits threshold, trips to OPEN) + try { + await pipeline.execute(mockContext, mockExecuteTool) + } catch (e: any) { + console.error(`āŒ Result: ${e.message}`) + } + + // Attempt 4: OPEN (Fast fails immediately without hitting downstream mockExecuteTool) + try { + await pipeline.execute(mockContext, mockExecuteTool) + } catch (e: any) { + console.error(`šŸ›‘ Fast-Fail Result: ${e.message}`) + } + + console.log('\nā³ Waiting 2 seconds for Circuit Breaker to cool down...') + await new Promise((r) => setTimeout(r, 2000)) + + // Attempt 5: HALF-OPEN -> Succeeds! (Transitions back to CLOSED) + try { + const result = await pipeline.execute(mockContext, mockExecuteTool) + console.log(`āœ… Result: ${result.content?.[0].text}`) + } catch (e: any) { + console.error(`āŒ Result: ${e.message}`) + } + + // Attempt 6: CLOSED -> Succeeds normally + try { + const result = await pipeline.execute(mockContext, mockExecuteTool) + console.log(`āœ… Result: ${result.content?.[0].text}`) + } catch (e: any) { + console.error(`āŒ Result: ${e.message}`) + } + + console.log('\nšŸŽ‰ Demo Complete!') } runDemo().catch(console.error) diff --git a/apps/sim/lib/mcp/resilience/pipeline.test.ts b/apps/sim/lib/mcp/resilience/pipeline.test.ts index cc482204cec..0f7655fba94 100644 --- a/apps/sim/lib/mcp/resilience/pipeline.test.ts +++ b/apps/sim/lib/mcp/resilience/pipeline.test.ts @@ -1,4 +1,4 @@ -import { test, expect, describe, mock } from 'bun:test' +import { describe, expect, mock, test } from 'bun:test' import { ResiliencePipeline } from './pipeline' import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' @@ -7,310 +7,329 @@ const errorError = mock() // Mock logger before any imports of telemetry mock.module('@sim/logger', () => ({ - createLogger: () => ({ - info: infoInfo, - error: errorError, - warn: mock(), - debug: mock() - }) + createLogger: () => ({ + info: infoInfo, + error: errorError, + warn: mock(), + debug: mock(), + }), })) // Dynamically import TelemetryMiddleware so the mock applies const { TelemetryMiddleware } = await import('./telemetry') -import { createLogger } from '@sim/logger' describe('ResiliencePipeline', () => { - const mockContext: McpExecutionContext = { - toolCall: { name: 'test_tool', arguments: {} }, - serverId: 'server-1', - userId: 'user-1', - workspaceId: 'workspace-1' + const mockContext: McpExecutionContext = { + toolCall: { name: 'test_tool', arguments: {} }, + serverId: 'server-1', + userId: 'user-1', + workspaceId: 'workspace-1', + } + + test('should execute middlewares in order', async () => { + const pipeline = new ResiliencePipeline() + const order: number[] = [] + + const m1: McpMiddleware = { + execute: async (ctx, next) => { + order.push(1) + const res = await next(ctx) + order.push(4) + return res + }, } - test('should execute middlewares in order', async () => { - const pipeline = new ResiliencePipeline() - const order: number[] = [] - - const m1: McpMiddleware = { - execute: async (ctx, next) => { - order.push(1) - const res = await next(ctx) - order.push(4) - return res - } - } - - const m2: McpMiddleware = { - execute: async (ctx, next) => { - order.push(2) - const res = await next(ctx) - order.push(3) - return res - } - } - - pipeline.use(m1).use(m2) - - const finalHandler: McpMiddlewareNext = async () => { - return { content: [{ type: 'text', text: 'success' }] } - } - - const result = await pipeline.execute(mockContext, finalHandler) - - expect(order).toEqual([1, 2, 3, 4]) - expect(result.content?.[0].text).toBe('success') - }) -}) + const m2: McpMiddleware = { + execute: async (ctx, next) => { + order.push(2) + const res = await next(ctx) + order.push(3) + return res + }, + } -describe('TelemetryMiddleware', () => { - const mockContext: McpExecutionContext = { - toolCall: { name: 'telemetry_tool', arguments: {} }, - serverId: 'server-2', - userId: 'user-2', - workspaceId: 'workspace-2' + pipeline.use(m1).use(m2) + + const finalHandler: McpMiddlewareNext = async () => { + return { content: [{ type: 'text', text: 'success' }] } } - test('should log success with latency', async () => { - infoInfo.mockClear() + const result = await pipeline.execute(mockContext, finalHandler) - const telemetry = new TelemetryMiddleware() + expect(order).toEqual([1, 2, 3, 4]) + expect(result.content?.[0].text).toBe('success') + }) +}) - const finalHandler: McpMiddlewareNext = async () => { - // simulate some latency - await new Promise(r => setTimeout(r, 10)) - return { content: [] } - } +describe('TelemetryMiddleware', () => { + const mockContext: McpExecutionContext = { + toolCall: { name: 'telemetry_tool', arguments: {} }, + serverId: 'server-2', + userId: 'user-2', + workspaceId: 'workspace-2', + } + + test('should log success with latency', async () => { + infoInfo.mockClear() + + const telemetry = new TelemetryMiddleware() + + const finalHandler: McpMiddlewareNext = async () => { + // simulate some latency + await new Promise((r) => setTimeout(r, 10)) + return { content: [] } + } - await telemetry.execute(mockContext, finalHandler) + await telemetry.execute(mockContext, finalHandler) - expect(infoInfo).toHaveBeenCalled() - const msg = infoInfo.mock.calls[0][0] - const logCall = infoInfo.mock.calls[0][1] - expect(msg).toBe('MCP Tool Execution Completed') - expect(logCall.toolName).toBe('telemetry_tool') - expect(logCall.latency_ms).toBeGreaterThanOrEqual(10) - expect(logCall.success).toBe(true) - }) + expect(infoInfo).toHaveBeenCalled() + const msg = infoInfo.mock.calls[0][0] + const logCall = infoInfo.mock.calls[0][1] + expect(msg).toBe('MCP Tool Execution Completed') + expect(logCall.toolName).toBe('telemetry_tool') + expect(logCall.latency_ms).toBeGreaterThanOrEqual(10) + expect(logCall.success).toBe(true) + }) - test('should log TOOL_ERROR when tool result has isError: true', async () => { - infoInfo.mockClear() + test('should log TOOL_ERROR when tool result has isError: true', async () => { + infoInfo.mockClear() - const telemetry = new TelemetryMiddleware() + const telemetry = new TelemetryMiddleware() - const finalHandler: McpMiddlewareNext = async () => { - return { isError: true, content: [] } - } + const finalHandler: McpMiddlewareNext = async () => { + return { isError: true, content: [] } + } - await telemetry.execute(mockContext, finalHandler) + await telemetry.execute(mockContext, finalHandler) - expect(infoInfo).toHaveBeenCalled() - const msg = infoInfo.mock.calls[0][0] - const logCall = infoInfo.mock.calls[0][1] - expect(msg).toBe('MCP Tool Execution Completed') - expect(logCall.success).toBe(false) - expect(logCall.failure_reason).toBe('TOOL_ERROR') - }) + expect(infoInfo).toHaveBeenCalled() + const msg = infoInfo.mock.calls[0][0] + const logCall = infoInfo.mock.calls[0][1] + expect(msg).toBe('MCP Tool Execution Completed') + expect(logCall.success).toBe(false) + expect(logCall.failure_reason).toBe('TOOL_ERROR') + }) - test('should log exception and rethrow with TIMEOUT explanation', async () => { - errorError.mockClear() + test('should log exception and rethrow with TIMEOUT explanation', async () => { + errorError.mockClear() - const telemetry = new TelemetryMiddleware() + const telemetry = new TelemetryMiddleware() - const finalHandler: McpMiddlewareNext = async () => { - throw new Error('Connection timeout occurred') - } + const finalHandler: McpMiddlewareNext = async () => { + throw new Error('Connection timeout occurred') + } - let caughtError: Error | null = null - try { - await telemetry.execute(mockContext, finalHandler) - } catch (e: any) { - caughtError = e - } + let caughtError: Error | null = null + try { + await telemetry.execute(mockContext, finalHandler) + } catch (e: any) { + caughtError = e + } - expect(caughtError).toBeDefined() - expect(errorError).toHaveBeenCalled() - const msg = errorError.mock.calls[0][0] - const logCall = errorError.mock.calls[0][1] - expect(msg).toBe('MCP Tool Execution Failed') - expect(logCall.failure_reason).toBe('TIMEOUT') - }) + expect(caughtError).toBeDefined() + expect(errorError).toHaveBeenCalled() + const msg = errorError.mock.calls[0][0] + const logCall = errorError.mock.calls[0][1] + expect(msg).toBe('MCP Tool Execution Failed') + expect(logCall.failure_reason).toBe('TIMEOUT') + }) }) const { CircuitBreakerMiddleware } = await import('./circuit-breaker') describe('CircuitBreakerMiddleware', () => { - const mockContext: McpExecutionContext = { - toolCall: { name: 'cb_tool', arguments: {} }, - serverId: 'cb-server-1', - userId: 'user-1', - workspaceId: 'workspace-1' + const mockContext: McpExecutionContext = { + toolCall: { name: 'cb_tool', arguments: {} }, + serverId: 'cb-server-1', + userId: 'user-1', + workspaceId: 'workspace-1', + } + + test('should trip to OPEN after threshold failures', async () => { + const cb = new CircuitBreakerMiddleware({ failureThreshold: 2, resetTimeoutMs: 1000 }) + const errorMsg = 'Tool Failed' + const failingHandler: McpMiddlewareNext = async () => { + throw new Error(errorMsg) } - test('should trip to OPEN after threshold failures', async () => { - const cb = new CircuitBreakerMiddleware({ failureThreshold: 2, resetTimeoutMs: 1000 }) - const errorMsg = 'Tool Failed' - const failingHandler: McpMiddlewareNext = async () => { throw new Error(errorMsg) } + // 1st failure (CLOSED -> CLOSED) + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow(errorMsg) - // 1st failure (CLOSED -> CLOSED) - await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow(errorMsg) + // 2nd failure (CLOSED -> OPEN) + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow(errorMsg) - // 2nd failure (CLOSED -> OPEN) - await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow(errorMsg) + // 3rd attempt (OPEN -> Fail Fast) + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow( + 'Circuit breaker is OPEN for server cb-server-1. Fast-failing request to cb_tool.' + ) + }) - // 3rd attempt (OPEN -> Fail Fast) - await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow( - 'Circuit breaker is OPEN for server cb-server-1. Fast-failing request to cb_tool.' - ) - }) + test('should transition CLOSED -> OPEN -> HALF-OPEN lock correctly', async () => { + const resetTimeoutMs = 50 + const cb = new CircuitBreakerMiddleware({ failureThreshold: 1, resetTimeoutMs }) + const failingHandler: McpMiddlewareNext = async () => { + throw new Error('Fail') + } - test('should transition CLOSED -> OPEN -> HALF-OPEN lock correctly', async () => { - const resetTimeoutMs = 50 - const cb = new CircuitBreakerMiddleware({ failureThreshold: 1, resetTimeoutMs }) - const failingHandler: McpMiddlewareNext = async () => { throw new Error('Fail') } - - // Trip to OPEN - await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow('Fail') - await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow('OPEN') - - // Wait for timeout to enter HALF-OPEN - await new Promise(r => setTimeout(r, resetTimeoutMs + 10)) - - // Create a Slow Probe Handler to mimic latency and hold the lock - let probeInProgress = false - const slowProbeHandler: McpMiddlewareNext = async () => { - probeInProgress = true - await new Promise(r => setTimeout(r, 100)) - return { content: [{ type: 'text', text: 'Probe Success' }] } - } - - // Send a batch of 3 concurrent requests while the reset timeout has passed - // The first should acquire HALF-OPEN, the rest should fail fast. - const promises = [ - cb.execute(mockContext, slowProbeHandler), - cb.execute(mockContext, async () => { return { content: [] } }), - cb.execute(mockContext, async () => { return { content: [] } }) - ] - - const results = await Promise.allSettled(promises) - - // Exactly one should succeed (the slow probe) - const fulfilled = results.filter(r => r.status === 'fulfilled') - expect(fulfilled.length).toBe(1) - expect((fulfilled[0] as PromiseFulfilledResult).value.content[0].text).toBe('Probe Success') - - // Exactly two should fail-fast due to HALF-OPEN lock - const rejected = results.filter(r => r.status === 'rejected') - expect(rejected.length).toBe(2) - expect((rejected[0] as PromiseRejectedResult).reason.message).toContain('Circuit breaker is HALF-OPEN') - - expect(probeInProgress).toBe(true) - - // Subsequent requests should now succeed (CLOSED again) - const newResult = await cb.execute(mockContext, async () => { return { content: [{ type: 'text', text: 'Normal' }] } }) - expect(newResult.content?.[0].text).toBe('Normal') + // Trip to OPEN + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow('Fail') + await expect(cb.execute(mockContext, failingHandler)).rejects.toThrow('OPEN') + + // Wait for timeout to enter HALF-OPEN + await new Promise((r) => setTimeout(r, resetTimeoutMs + 10)) + + // Create a Slow Probe Handler to mimic latency and hold the lock + let probeInProgress = false + const slowProbeHandler: McpMiddlewareNext = async () => { + probeInProgress = true + await new Promise((r) => setTimeout(r, 100)) + return { content: [{ type: 'text', text: 'Probe Success' }] } + } + + // Send a batch of 3 concurrent requests while the reset timeout has passed + // The first should acquire HALF-OPEN, the rest should fail fast. + const promises = [ + cb.execute(mockContext, slowProbeHandler), + cb.execute(mockContext, async () => { + return { content: [] } + }), + cb.execute(mockContext, async () => { + return { content: [] } + }), + ] + + const results = await Promise.allSettled(promises) + + // Exactly one should succeed (the slow probe) + const fulfilled = results.filter((r) => r.status === 'fulfilled') + expect(fulfilled.length).toBe(1) + expect((fulfilled[0] as PromiseFulfilledResult).value.content[0].text).toBe( + 'Probe Success' + ) + + // Exactly two should fail-fast due to HALF-OPEN lock + const rejected = results.filter((r) => r.status === 'rejected') + expect(rejected.length).toBe(2) + expect((rejected[0] as PromiseRejectedResult).reason.message).toContain( + 'Circuit breaker is HALF-OPEN' + ) + + expect(probeInProgress).toBe(true) + + // Subsequent requests should now succeed (CLOSED again) + const newResult = await cb.execute(mockContext, async () => { + return { content: [{ type: 'text', text: 'Normal' }] } }) + expect(newResult.content?.[0].text).toBe('Normal') + }) }) const { SchemaValidatorMiddleware } = await import('./schema-validator') describe('SchemaValidatorMiddleware', () => { - const mockSchemaTool: any = { + const mockSchemaTool: any = { + name: 'test_schema_tool', + serverId: 's1', + serverName: 's1', + inputSchema: { + type: 'object', + properties: { + requiredStr: { type: 'string' }, + optionalNum: { type: 'number' }, + enumVal: { type: 'string', enum: ['A', 'B'] }, + }, + required: ['requiredStr'], + }, + } + + test('should compile, cache, and successfully validate valid args', async () => { + let providerCalled = 0 + const toolProvider = async (name: string) => { + providerCalled++ + return name === 'test_schema_tool' ? mockSchemaTool : undefined + } + + const validator = new SchemaValidatorMiddleware({ toolProvider }) + + const mockContext: any = { + toolCall: { name: 'test_schema_tool', - serverId: 's1', - serverName: 's1', - inputSchema: { - type: 'object', - properties: { - requiredStr: { type: 'string' }, - optionalNum: { type: 'number' }, - enumVal: { type: 'string', enum: ['A', 'B'] } - }, - required: ['requiredStr'] - } + arguments: { + requiredStr: 'hello', + enumVal: 'A', + }, + }, + serverId: 'server-1', + userId: 'user-1', + workspaceId: 'workspace-1', } - test('should compile, cache, and successfully validate valid args', async () => { - let providerCalled = 0 - const toolProvider = async (name: string) => { - providerCalled++ - return name === 'test_schema_tool' ? mockSchemaTool : undefined - } - - const validator = new SchemaValidatorMiddleware({ toolProvider }) - - const mockContext: any = { - toolCall: { - name: 'test_schema_tool', - arguments: { - requiredStr: 'hello', - enumVal: 'A' - } - }, - serverId: 'server-1', - userId: 'user-1', - workspaceId: 'workspace-1' - } - - let nextCalled = false - const nextHandler: any = async (ctx: any) => { - nextCalled = true - expect(ctx.toolCall.arguments).toEqual({ - requiredStr: 'hello', - enumVal: 'A' - }) - return { content: [{ type: 'text', text: 'ok' }] } - } - - const result1 = await validator.execute({ - ...mockContext, - toolCall: { name: 'test_schema_tool', arguments: { requiredStr: 'hello', enumVal: 'A' } } - }, nextHandler) - expect(result1.content?.[0].text).toBe('ok') - expect(nextCalled).toBe(true) - expect(providerCalled).toBe(1) - - // Second call should hit the cache - nextCalled = false - const result2 = await validator.execute({ - ...mockContext, - toolCall: { name: 'test_schema_tool', arguments: { requiredStr: 'hello', enumVal: 'A' } } - }, nextHandler) - expect(result2.content?.[0].text).toBe('ok') - expect(nextCalled).toBe(true) - expect(providerCalled).toBe(1) // from cache - }) + let nextCalled = false + const nextHandler: any = async (ctx: any) => { + nextCalled = true + expect(ctx.toolCall.arguments).toEqual({ + requiredStr: 'hello', + enumVal: 'A', + }) + return { content: [{ type: 'text', text: 'ok' }] } + } - test('should intercept validation failure and return gracefully formatted error', async () => { - const validator = new SchemaValidatorMiddleware() - validator.cacheTool(mockSchemaTool) - - const mockContext: any = { - toolCall: { - name: 'test_schema_tool', - arguments: { - // missing requiredStr - enumVal: 'C' // invalid enum - } - }, - serverId: 'server-1', - userId: 'user-1', - workspaceId: 'workspace-1' - } - - let nextCalled = false - const nextHandler: any = async () => { - nextCalled = true - return { content: [] } - } - - const result = await validator.execute(mockContext, nextHandler) - expect(nextCalled).toBe(false) - expect(result.isError).toBe(true) - expect(result.content?.[0].type).toBe('text') - - const errorText = result.content?.[0].text as string - expect(errorText).toContain('Schema validation failed') - expect(errorText).toContain('requiredStr') - expect(errorText).toContain('enumVal') - }) + const result1 = await validator.execute( + { + ...mockContext, + toolCall: { name: 'test_schema_tool', arguments: { requiredStr: 'hello', enumVal: 'A' } }, + }, + nextHandler + ) + expect(result1.content?.[0].text).toBe('ok') + expect(nextCalled).toBe(true) + expect(providerCalled).toBe(1) + + // Second call should hit the cache + nextCalled = false + const result2 = await validator.execute( + { + ...mockContext, + toolCall: { name: 'test_schema_tool', arguments: { requiredStr: 'hello', enumVal: 'A' } }, + }, + nextHandler + ) + expect(result2.content?.[0].text).toBe('ok') + expect(nextCalled).toBe(true) + expect(providerCalled).toBe(1) // from cache + }) + + test('should intercept validation failure and return gracefully formatted error', async () => { + const validator = new SchemaValidatorMiddleware() + validator.cacheTool(mockSchemaTool) + + const mockContext: any = { + toolCall: { + name: 'test_schema_tool', + arguments: { + // missing requiredStr + enumVal: 'C', // invalid enum + }, + }, + serverId: 'server-1', + userId: 'user-1', + workspaceId: 'workspace-1', + } + + let nextCalled = false + const nextHandler: any = async () => { + nextCalled = true + return { content: [] } + } + + const result = await validator.execute(mockContext, nextHandler) + expect(nextCalled).toBe(false) + expect(result.isError).toBe(true) + expect(result.content?.[0].type).toBe('text') + + const errorText = result.content?.[0].text as string + expect(errorText).toContain('Schema validation failed') + expect(errorText).toContain('requiredStr') + expect(errorText).toContain('enumVal') + }) }) diff --git a/apps/sim/lib/mcp/resilience/pipeline.ts b/apps/sim/lib/mcp/resilience/pipeline.ts index eef80a103e7..13288dcb27f 100644 --- a/apps/sim/lib/mcp/resilience/pipeline.ts +++ b/apps/sim/lib/mcp/resilience/pipeline.ts @@ -1,42 +1,42 @@ -import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' import type { McpToolResult } from '@/lib/mcp/types' +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' export class ResiliencePipeline { - private middlewares: McpMiddleware[] = [] + private middlewares: McpMiddleware[] = [] - /** - * Add a middleware to the pipeline chain. - */ - use(middleware: McpMiddleware): this { - this.middlewares.push(middleware) - return this - } + /** + * Add a middleware to the pipeline chain. + */ + use(middleware: McpMiddleware): this { + this.middlewares.push(middleware) + return this + } - /** - * Execute the pipeline, processing the context through all middlewares, - * and finally invoking the terminal handler. - */ - async execute( - context: McpExecutionContext, - finalHandler: McpMiddlewareNext - ): Promise { - let index = -1 + /** + * Execute the pipeline, processing the context through all middlewares, + * and finally invoking the terminal handler. + */ + async execute( + context: McpExecutionContext, + finalHandler: McpMiddlewareNext + ): Promise { + let index = -1 - const dispatch = async (i: number): Promise => { - if (i <= index) { - throw new Error('next() called multiple times') - } - index = i + const dispatch = async (i: number): Promise => { + if (i <= index) { + throw new Error('next() called multiple times') + } + index = i - // If we reached the end of the middlewares, call the final handler - if (i === this.middlewares.length) { - return finalHandler(context) - } + // If we reached the end of the middlewares, call the final handler + if (i === this.middlewares.length) { + return finalHandler(context) + } - const middleware = this.middlewares[i] - return middleware.execute(context, () => dispatch(i + 1)) - } - - return dispatch(0) + const middleware = this.middlewares[i] + return middleware.execute(context, () => dispatch(i + 1)) } + + return dispatch(0) + } } diff --git a/apps/sim/lib/mcp/resilience/schema-validator.ts b/apps/sim/lib/mcp/resilience/schema-validator.ts index 3d2c4891c2d..11077f623a2 100644 --- a/apps/sim/lib/mcp/resilience/schema-validator.ts +++ b/apps/sim/lib/mcp/resilience/schema-validator.ts @@ -1,152 +1,151 @@ +import { createLogger } from '@sim/logger' import { z } from 'zod' +import type { McpTool, McpToolResult, McpToolSchema, McpToolSchemaProperty } from '@/lib/mcp/types' import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' -import type { McpTool, McpToolSchemaProperty, McpToolSchema } from '@/lib/mcp/types' -import type { McpToolResult } from '@/lib/mcp/types' -import { createLogger } from '@sim/logger' const logger = createLogger('mcp:schema-validator') export type ToolProvider = (toolName: string) => McpTool | undefined | Promise export class SchemaValidatorMiddleware implements McpMiddleware { - private schemaCache = new Map() - private toolProvider?: ToolProvider - - constructor(options?: { toolProvider?: ToolProvider }) { - this.toolProvider = options?.toolProvider + private schemaCache = new Map() + private toolProvider?: ToolProvider + + constructor(options?: { toolProvider?: ToolProvider }) { + this.toolProvider = options?.toolProvider + } + + /** + * Cache a tool's schema explicitly (e.g. during server discovery) + */ + cacheTool(tool: McpTool) { + if (!this.schemaCache.has(tool.name)) { + const zodSchema = this.compileSchema(tool.inputSchema) + this.schemaCache.set(tool.name, zodSchema) } - - /** - * Cache a tool's schema explicitly (e.g. during server discovery) - */ - cacheTool(tool: McpTool) { - if (!this.schemaCache.has(tool.name)) { - const zodSchema = this.compileSchema(tool.inputSchema) - this.schemaCache.set(tool.name, zodSchema) - } + } + + /** + * Clear caches, either for a specific tool or globally. + */ + clearCache(toolName?: string) { + if (toolName) { + this.schemaCache.delete(toolName) + } else { + this.schemaCache.clear() } + } - /** - * Clear caches, either for a specific tool or globally. - */ - clearCache(toolName?: string) { - if (toolName) { - this.schemaCache.delete(toolName) - } else { - this.schemaCache.clear() - } - } + async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise { + const { toolCall } = context + const toolName = toolCall.name - async execute( - context: McpExecutionContext, - next: McpMiddlewareNext - ): Promise { - const { toolCall } = context - const toolName = toolCall.name - - let zodSchema = this.schemaCache.get(toolName) - - if (!zodSchema && this.toolProvider) { - const tool = await this.toolProvider(toolName) - if (tool) { - zodSchema = this.compileSchema(tool.inputSchema) - this.schemaCache.set(toolName, zodSchema) - } - } + let zodSchema = this.schemaCache.get(toolName) + + if (!zodSchema && this.toolProvider) { + const tool = await this.toolProvider(toolName) + if (tool) { + zodSchema = this.compileSchema(tool.inputSchema) + this.schemaCache.set(toolName, zodSchema) + } + } - if (zodSchema) { - const parseResult = await zodSchema.safeParseAsync(toolCall.arguments) - if (!parseResult.success) { - // Return natively formatted error payload - const errorDetails = parseResult.error.errors - .map(e => `${e.path.join('.') || 'root'}: ${e.message}`) - .join(', ') - - logger.warn('Schema validation failed', { toolName, error: errorDetails }) - - return { - isError: true, - content: [ - { - type: 'text', - text: `Schema validation failed: [${errorDetails}]` - } - ] - } - } - - // Sync successfully parsed / defaulted arguments back to context - context.toolCall.arguments = parseResult.data + if (zodSchema) { + const parseResult = await zodSchema.safeParseAsync(toolCall.arguments) + if (!parseResult.success) { + // Return natively formatted error payload + const errorDetails = parseResult.error.errors + .map((e) => `${e.path.join('.') || 'root'}: ${e.message}`) + .join(', ') + + logger.warn('Schema validation failed', { toolName, error: errorDetails }) + + return { + isError: true, + content: [ + { + type: 'text', + text: `Schema validation failed: [${errorDetails}]`, + }, + ], } + } - return next(context) + // Sync successfully parsed / defaulted arguments back to context + context.toolCall.arguments = parseResult.data } - private compileSchema(schema: McpToolSchema): z.ZodObject { - return this.compileObject(schema.properties || {}, schema.required || []) as z.ZodObject - } + return next(context) + } - private compileObject(properties: Record, required: string[]): z.ZodTypeAny { - const shape: Record = {} + private compileSchema(schema: McpToolSchema): z.ZodObject { + return this.compileObject(schema.properties || {}, schema.required || []) as z.ZodObject + } - for (const [key, prop] of Object.entries(properties)) { - let zodType = this.compileProperty(prop) + private compileObject( + properties: Record, + required: string[] + ): z.ZodTypeAny { + const shape: Record = {} - if (!required.includes(key)) { - zodType = zodType.optional() - } + for (const [key, prop] of Object.entries(properties)) { + let zodType = this.compileProperty(prop) - shape[key] = zodType - } + if (!required.includes(key)) { + zodType = zodType.optional() + } - return z.object(shape) + shape[key] = zodType } - private compileProperty(prop: McpToolSchemaProperty): z.ZodTypeAny { - let baseType: z.ZodTypeAny = z.any() - - switch (prop.type) { - case 'string': - baseType = z.string() - break - case 'number': - case 'integer': - baseType = z.number() - break - case 'boolean': - baseType = z.boolean() - break - case 'array': - if (prop.items) { - baseType = z.array(this.compileProperty(prop.items)) - } else { - baseType = z.array(z.any()) - } - break - case 'object': - baseType = this.compileObject(prop.properties || {}, prop.required || []) - break - } - - // Apply Enum mappings - if (prop.enum && prop.enum.length > 0) { - if (prop.enum.length === 1) { - baseType = z.literal(prop.enum[0]) - } else { - // We use mapped literals injected into an array - const literals = prop.enum.map(e => z.literal(e)) - baseType = z.union(literals as any) - } + return z.object(shape) + } + + private compileProperty(prop: McpToolSchemaProperty): z.ZodTypeAny { + let baseType: z.ZodTypeAny = z.any() + + switch (prop.type) { + case 'string': + baseType = z.string() + break + case 'number': + case 'integer': + baseType = z.number() + break + case 'boolean': + baseType = z.boolean() + break + case 'array': + if (prop.items) { + baseType = z.array(this.compileProperty(prop.items)) + } else { + baseType = z.array(z.any()) } + break + case 'object': + baseType = this.compileObject(prop.properties || {}, prop.required || []) + break + } - if (prop.description) { - baseType = baseType.describe(prop.description) - } + // Apply Enum mappings + if (prop.enum && prop.enum.length > 0) { + if (prop.enum.length === 1) { + baseType = z.literal(prop.enum[0]) + } else { + // We use mapped literals injected into an array + const literals = prop.enum.map((e) => z.literal(e)) + baseType = z.union(literals as any) + } + } - if (prop.default !== undefined) { - baseType = baseType.default(prop.default) - } + if (prop.description) { + baseType = baseType.describe(prop.description) + } - return baseType + if (prop.default !== undefined) { + baseType = baseType.default(prop.default) } + + return baseType + } } diff --git a/apps/sim/lib/mcp/resilience/telemetry.ts b/apps/sim/lib/mcp/resilience/telemetry.ts index d3bc0eea663..f124dcef75f 100644 --- a/apps/sim/lib/mcp/resilience/telemetry.ts +++ b/apps/sim/lib/mcp/resilience/telemetry.ts @@ -1,56 +1,53 @@ import { createLogger } from '@sim/logger' -import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' import type { McpToolResult } from '@/lib/mcp/types' +import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types' const logger = createLogger('mcp:telemetry') export class TelemetryMiddleware implements McpMiddleware { - async execute( - context: McpExecutionContext, - next: McpMiddlewareNext - ): Promise { - const startTime = performance.now() - - try { - const result = await next(context) - - const latency_ms = Math.round(performance.now() - startTime) - const isError = result.isError === true - - logger.info('MCP Tool Execution Completed', { - toolName: context.toolCall.name, - serverId: context.serverId, - workspaceId: context.workspaceId, - latency_ms, - success: !isError, - ...(isError && { failure_reason: 'TOOL_ERROR' }) - }) - - return result - } catch (error) { - const latency_ms = Math.round(performance.now() - startTime) - - // Attempt to determine failure reason based on error - let failure_reason = 'API_500' // General failure fallback - if (error instanceof Error) { - const lowerMsg = error.message.toLowerCase() - if (error.name === 'TimeoutError' || lowerMsg.includes('timeout')) { - failure_reason = 'TIMEOUT' - } else if (lowerMsg.includes('validation') || error.name === 'ZodError') { - failure_reason = 'VALIDATION_ERROR' - } - } - - logger.error('MCP Tool Execution Failed', { - toolName: context.toolCall.name, - serverId: context.serverId, - workspaceId: context.workspaceId, - latency_ms, - failure_reason, - err: error instanceof Error ? error.message : String(error) - }) - - throw error // Re-throw to allow upstream handling (e.g. circuit breaker) + async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise { + const startTime = performance.now() + + try { + const result = await next(context) + + const latency_ms = Math.round(performance.now() - startTime) + const isError = result.isError === true + + logger.info('MCP Tool Execution Completed', { + toolName: context.toolCall.name, + serverId: context.serverId, + workspaceId: context.workspaceId, + latency_ms, + success: !isError, + ...(isError && { failure_reason: 'TOOL_ERROR' }), + }) + + return result + } catch (error) { + const latency_ms = Math.round(performance.now() - startTime) + + // Attempt to determine failure reason based on error + let failure_reason = 'API_500' // General failure fallback + if (error instanceof Error) { + const lowerMsg = error.message.toLowerCase() + if (error.name === 'TimeoutError' || lowerMsg.includes('timeout')) { + failure_reason = 'TIMEOUT' + } else if (lowerMsg.includes('validation') || error.name === 'ZodError') { + failure_reason = 'VALIDATION_ERROR' } + } + + logger.error('MCP Tool Execution Failed', { + toolName: context.toolCall.name, + serverId: context.serverId, + workspaceId: context.workspaceId, + latency_ms, + failure_reason, + err: error instanceof Error ? error.message : String(error), + }) + + throw error // Re-throw to allow upstream handling (e.g. circuit breaker) } + } } diff --git a/apps/sim/lib/mcp/resilience/types.ts b/apps/sim/lib/mcp/resilience/types.ts index 4338078062f..bbcb4ee9569 100644 --- a/apps/sim/lib/mcp/resilience/types.ts +++ b/apps/sim/lib/mcp/resilience/types.ts @@ -17,9 +17,7 @@ export interface McpExecutionContext { /** * Standardized function signature for invoking the NEXT component in the pipeline */ -export type McpMiddlewareNext = ( - context: McpExecutionContext -) => Promise +export type McpMiddlewareNext = (context: McpExecutionContext) => Promise /** * Interface that all Resilience Middlewares must implement @@ -30,8 +28,5 @@ export interface McpMiddleware { * @param context The current execution context * @param next The next middleware/tool in the chain */ - execute( - context: McpExecutionContext, - next: McpMiddlewareNext - ): Promise + execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise } diff --git a/apps/sim/lib/mcp/service.ts b/apps/sim/lib/mcp/service.ts index 63a9f3f76af..e1c36bc9996 100644 --- a/apps/sim/lib/mcp/service.ts +++ b/apps/sim/lib/mcp/service.ts @@ -11,6 +11,10 @@ import { generateRequestId } from '@/lib/core/utils/request' import { McpClient } from '@/lib/mcp/client' import { mcpConnectionManager } from '@/lib/mcp/connection-manager' import { isMcpDomainAllowed, validateMcpDomain } from '@/lib/mcp/domain-check' +import { CircuitBreakerMiddleware } from '@/lib/mcp/resilience/circuit-breaker' +import { ResiliencePipeline } from '@/lib/mcp/resilience/pipeline' +import { SchemaValidatorMiddleware } from '@/lib/mcp/resilience/schema-validator' +import { TelemetryMiddleware } from '@/lib/mcp/resilience/telemetry' import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config' import { createMcpCacheAdapter, @@ -27,10 +31,6 @@ import type { McpTransport, } from '@/lib/mcp/types' import { MCP_CONSTANTS } from '@/lib/mcp/utils' -import { ResiliencePipeline } from '@/lib/mcp/resilience/pipeline' -import { TelemetryMiddleware } from '@/lib/mcp/resilience/telemetry' -import { CircuitBreakerMiddleware } from '@/lib/mcp/resilience/circuit-breaker' -import { SchemaValidatorMiddleware } from '@/lib/mcp/resilience/schema-validator' const logger = createLogger('McpService') diff --git a/apps/sim/tsconfig.json b/apps/sim/tsconfig.json index ed09bbfc4f5..adb44824922 100644 --- a/apps/sim/tsconfig.json +++ b/apps/sim/tsconfig.json @@ -3,60 +3,24 @@ "compilerOptions": { "baseUrl": ".", "paths": { - "@/*": [ - "./*" - ], - "@/components/*": [ - "components/*" - ], - "@/lib/*": [ - "./lib/*" - ], - "@/stores": [ - "./stores" - ], - "@/stores/*": [ - "./stores/*" - ], - "@/hooks/*": [ - "./hooks/*" - ], - "@/blocks": [ - "./blocks" - ], - "@/blocks/*": [ - "./blocks/*" - ], - "@/providers/*": [ - "./providers/*" - ], - "@/providers": [ - "./providers" - ], - "@/tools": [ - "./tools" - ], - "@/tools/*": [ - "./tools/*" - ], - "@/serializer": [ - "./serializer" - ], - "@/serializer/*": [ - "./serializer/*" - ], - "@sim/db": [ - "../../packages/db" - ], - "@sim/db/*": [ - "../../packages/db/*" - ], - "@/executor": [ - "./executor" - ], - "@/executor/*": [ - "./executor/*" - ] + "@/*": ["./*"], + "@/components/*": ["components/*"], + "@/lib/*": ["./lib/*"], + "@/stores": ["./stores"], + "@/stores/*": ["./stores/*"], + "@/hooks/*": ["./hooks/*"], + "@/blocks": ["./blocks"], + "@/blocks/*": ["./blocks/*"], + "@/providers/*": ["./providers/*"], + "@/providers": ["./providers"], + "@/tools": ["./tools"], + "@/tools/*": ["./tools/*"], + "@/serializer": ["./serializer"], + "@/serializer/*": ["./serializer/*"], + "@sim/db": ["../../packages/db"], + "@sim/db/*": ["../../packages/db/*"], + "@/executor": ["./executor"], + "@/executor/*": ["./executor/*"] } }, "include": [ @@ -74,4 +38,4 @@ "vitest.config.ts", "vitest.setup.ts" ] -} \ No newline at end of file +} diff --git a/packages/logger/package.json b/packages/logger/package.json index 6a6a20e92e8..6602fe0aaba 100644 --- a/packages/logger/package.json +++ b/packages/logger/package.json @@ -27,4 +27,4 @@ "typescript": "^5.7.3", "vitest": "^3.0.8" } -} \ No newline at end of file +}