# feat(core): Implement Resilience Pipeline for MCP Execution #3498

**Open** · Shanyu-Dabbiru wants to merge 9 commits into `simstudioai:main` from `Shanyu-Dabbiru:main` · **+1,011 −1**
**Commits (9):**

- `6eb788a` type contracts + impl plan created
- `7b23bc1` chore(mcp): config setup for types and dependencies
- `80917d2` feat(mcp): implement telemetry middleware
- `df97084` feat(mcp): implement resilience middleware pipeline
- `1dd4353` test(mcp): implement resilience pipeline suite
- `a94e66b` test(mcp): fix telemetry middleware test assertions for custom logger
- `df2dbb6` feat(mcp): integrate resilience pipeline into McpService
- `e1b3d1f` feat(mcp): implement enterprise resilience pipeline (circuit breaker,…
- `3bcb102` style: run biome formatting across resilience pipeline
# MCP Resilience Pipeline

**Objective:** Upgrade the core `executeTool` engine from a naive proxy to an Enterprise-Grade Resilience Pipeline, ensuring our AI workflows never suffer cascading failures from downstream server instability or LLM hallucinations.

---

## 1. The "Thundering Herd" Problem (Circuit Breaker)

**Before:**
When a downstream provider (e.g., a database or API) experienced latency or went down, our workflow engine would continuously retry. If 1,000 agents hit a struggling server simultaneously, they would overwhelm it (a DDoS-like "thundering herd"), crash our workflow executor, and severely degrade user experience across the platform.

**After (`CircuitBreakerMiddleware`):**
We implemented an intelligent State Machine with a **HALF-OPEN Concurrency Semaphore**.
- **The Trip:** If a server fails a configured number of times in a row (3 in the demo; the middleware defaults to 5), we cut the circuit (`OPEN` state). All subsequent requests instantly *fast-fail* locally (0 ms latency), protecting the downstream server from being hammered.
- **The Elegant Recovery:** After a cooldown, we allow exactly **one** probe request through (`HALF-OPEN`). If it succeeds, the circuit closes. If it fails, it trips again.
#### Live Demo Output

```mermaid
sequenceDiagram
    participant Agent
    participant Pipeline
    participant TargetServer

    Agent->>Pipeline: executeTool (Server Down)
    Pipeline--xTargetServer: ❌ Fails (Attempt 1-3)
    Note over Pipeline: 🔴 Tripped to OPEN
    Agent->>Pipeline: executeTool
    Pipeline-->>Agent: 🛑 Fast-Fail (0ms latency) - Target Protected
    Note over Pipeline: ⏳ Cooldown... 🟡 HALF-OPEN
    Agent->>Pipeline: executeTool (Probe)
    Pipeline-->>TargetServer: Exactly 1 request allowed
    TargetServer-->>Pipeline: ✅ Success
    Note over Pipeline: 🟢 Reset to CLOSED
    Agent->>Pipeline: executeTool
    Pipeline-->>TargetServer: Resume normal traffic
```

---
## 2. LLM Hallucinated Arguments (Schema Validator)

**Before:**
If an LLM hallucinated arguments that didn't match a tool's JSON Schema, the downstream server or our proxy would throw a fatal exception. The workflow would crash, requiring user intervention and wasting the compute/tokens already spent.

**After (`SchemaValidatorMiddleware`):**
We implemented high-performance **Zod Schema Caching**.
- We intercept the tool call *before* it leaves our system.
- If the arguments fail validation, we do *not* crash. Instead, we return a gracefully formatted, native MCP error: `{ isError: true, content: "Schema validation failed: [Zod Error Details]" }`.
- **The Magic:** The LLM receives this error, realizes its mistake, and natively **self-corrects** on the next turn, achieving autonomous self-healing without dropping the user's workflow.
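The validate-then-return flow can be sketched as middleware. This is a minimal sketch with simplified types: a hypothetical hand-rolled required-field check stands in for the cached Zod validators of the real `SchemaValidatorMiddleware`.

```typescript
// Minimal sketch of the validator contract. A trivial required-field check
// stands in for the compiled-and-cached Zod schemas used in the real code.
type ToolResult = { isError?: boolean; content: { type: string; text: string }[] }
type Context = { toolCall: { name: string; arguments: Record<string, unknown> } }
type Next = (ctx: Context) => Promise<ToolResult>

// Hypothetical per-tool requirements (the real registry holds Zod schemas)
const requiredArgs: Record<string, string[]> = { get_user: ['userId'] }

async function schemaValidator(ctx: Context, next: Next): Promise<ToolResult> {
  const missing = (requiredArgs[ctx.toolCall.name] ?? []).filter(
    (key) => !(key in ctx.toolCall.arguments)
  )
  if (missing.length > 0) {
    // Return a well-formed MCP error instead of throwing, so the LLM can
    // read the failure and self-correct on its next turn.
    return {
      isError: true,
      content: [
        { type: 'text', text: `Schema validation failed: missing ${missing.join(', ')}` },
      ],
    }
  }
  return next(ctx)
}

// Usage: hallucinated arguments produce an isError result, not a crash.
schemaValidator(
  { toolCall: { name: 'get_user', arguments: {} } },
  async () => ({ content: [{ type: 'text', text: 'ok' }] })
).then((result) => console.log(result.isError, result.content[0].text))
```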
---

## 3. The "Black Box" Problem (Telemetry)

**Before:**
If a tool execution took 10 seconds or failed, we had no granular visibility into *why*. Was it a network timeout? A validation error? A 500 from the target?
**After (`TelemetryMiddleware`):**
Every single tool execution now generates rich metadata:
- `latency_ms`
- exact `failure_reason` (e.g., `TIMEOUT`, `VALIDATION_ERROR`, `API_500`)
- `serverId` and `workspaceId`

This allows us to build real-time monitoring dashboards that detect struggling third-party integrations before our users even report them.
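A middleware of this shape can be sketched as follows. This is a minimal sketch, not the shipped `TelemetryMiddleware`: the `emit` sink and the `classify` error classifier are hypothetical stand-ins for the real logger/metrics backend.

```typescript
// Minimal sketch of a telemetry middleware: wrap the next handler, measure
// latency, and record a failure_reason. `emit` and `classify` are
// hypothetical stand-ins for the real logging/metrics plumbing.
type ToolResult = { isError?: boolean; content: { type: string; text: string }[] }
type Context = { toolCall: { name: string }; serverId: string; workspaceId: string }
type Next = (ctx: Context) => Promise<ToolResult>

interface TelemetryEvent {
  tool: string
  serverId: string
  workspaceId: string
  latency_ms: number
  failure_reason?: string // e.g. 'TIMEOUT', 'VALIDATION_ERROR', 'API_500'
}

const events: TelemetryEvent[] = []
const emit = (event: TelemetryEvent) => events.push(event) // stand-in sink

// Simplified classifier; a real one would inspect error codes / HTTP status.
const classify = (error: unknown): string =>
  error instanceof Error && /timeout/i.test(error.message) ? 'TIMEOUT' : 'UNKNOWN'

async function telemetry(ctx: Context, next: Next): Promise<ToolResult> {
  const start = Date.now()
  const base = {
    tool: ctx.toolCall.name,
    serverId: ctx.serverId,
    workspaceId: ctx.workspaceId,
  }
  try {
    const result = await next(ctx)
    emit({
      ...base,
      latency_ms: Date.now() - start,
      // An isError result is a soft failure from the validator or target
      failure_reason: result.isError ? 'VALIDATION_ERROR' : undefined,
    })
    return result
  } catch (error) {
    emit({ ...base, latency_ms: Date.now() - start, failure_reason: classify(error) })
    throw error // telemetry observes; it never swallows errors
  }
}

// Usage: a successful call records latency with no failure_reason.
telemetry(
  { toolCall: { name: 'demo' }, serverId: 's1', workspaceId: 'w1' },
  async () => ({ content: [{ type: 'text', text: 'ok' }] })
).then(() => console.log(events.find((e) => e.tool === 'demo')))
```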
---
## Architectural Impact: The Composable Pipeline

Perhaps the most significant engineering achievement is the **Architecture Shift**. We moved away from a brittle, monolithic proxy to a modern **Chain of Responsibility**.

```typescript
// The new elegant implementation in McpService
this.pipeline = new ResiliencePipeline()
  .use(this.telemetry)
  .use(this.schemaValidator)
  .use(this.circuitBreaker)
```
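The `pipeline.ts` file itself is not shown in this diff; a chain-of-responsibility composer matching the `use()` API above can be sketched as follows. This is a minimal sketch with simplified types, not the actual implementation.

```typescript
// Minimal sketch of a composable middleware pipeline (chain of
// responsibility). Each middleware receives the context plus a `next`
// continuation; `use()` appends a middleware and returns `this` for chaining.
type Context = { toolCall: { name: string } }
type Result = { content: { type: string; text: string }[] }
type Next = (ctx: Context) => Promise<Result>
type Middleware = (ctx: Context, next: Next) => Promise<Result>

class Pipeline {
  private middlewares: Middleware[] = []

  use(mw: Middleware): this {
    this.middlewares.push(mw)
    return this
  }

  // Fold right-to-left so the first .use() runs outermost around the handler.
  execute(ctx: Context, handler: Next): Promise<Result> {
    const chain = this.middlewares.reduceRight<Next>(
      (next, mw) => (c) => mw(c, next),
      handler
    )
    return chain(ctx)
  }
}

// Usage: middlewares run in registration order around the final handler.
const order: string[] = []
const tag =
  (name: string): Middleware =>
  async (ctx, next) => {
    order.push(name)
    return next(ctx)
  }

new Pipeline()
  .use(tag('telemetry'))
  .use(tag('validator'))
  .use(tag('breaker'))
  .execute({ toolCall: { name: 'demo' } }, async () => ({
    content: [{ type: 'text', text: 'ok' }],
  }))
  .then(() => console.log(order.join(' → '))) // telemetry → validator → breaker
```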
---

### Part 1: Telemetry Hooks
Implement the foundation for tracking.
*(Change Rationale: Transitioning to a middleware pattern instead of a monolithic proxy, allowing telemetry to be composed easily.)*
#### [NEW] `apps/sim/lib/mcp/resilience/telemetry.ts`
- Implement a telemetry middleware hook to capture `latency_ms` and `failure_reason` (e.g., `TIMEOUT`, `VALIDATION_ERROR`, `API_500`).
### Part 2: Circuit Breaker State Machine
Implement the state management logic.
*(Change Rationale: Added a HALF-OPEN concurrency lock (semaphore) to prevent the "thundering herd" issue on the downstream server. Documented that this operates on local, per-instance state using an LRU cache to prevent memory leaks.)*
#### [NEW] `apps/sim/lib/mcp/resilience/circuit-breaker.ts`
- Implement the `CircuitBreaker` middleware with states: `CLOSED`, `OPEN`, and `HALF-OPEN`.
- Handle failure thresholds, reset timeouts, and fail-fast logic.
- **Concurrency Lock:** During `HALF-OPEN`, strictly gate the transition so only **one** probe request is allowed through. All other concurrent requests fail fast until the probe resolves.
- **Memory & State:** Use an LRU cache (or otherwise scope the registry's lifetime) for the `CircuitBreaker` registry, binding the lifecycle of each breaker explicitly to the lifecycle of its MCP connection to prevent memory leaks. Note that this operates on local, per-instance state.
### Part 3: Schema Validation
Implement the Zod validation logic for LLM arguments.
*(Change Rationale: Added schema compilation caching to avoid a severe per-request CPU bottleneck, and returning `isError: true` on validation failures to natively trigger LLM self-correction.)*
#### [NEW] `apps/sim/lib/mcp/resilience/schema-validator.ts`
- Enforce schemas using Zod as a middleware.
- **Schema Caching:** Compile JSON Schemas to Zod schemas and cache them in a registry keyed by `toolId`, either during the initial discovery phase or lazily on first compile. Flush cached validators dynamically by listening for MCP lifecycle events (e.g., mid-session tool list updates).
- **LLM Self-Correction:** Instead of throwing exceptions that crash the workflow engine when Zod validation fails, intercept validation errors and return a gracefully formatted MCP execution result: `{ isError: true, content: [{ type: "text", text: "Schema validation failed: [Zod Error Details]" }] }`.
### Part 4: Resilience Pipeline Integration
Wrap the tools via a pipeline instead of a monolithic proxy.
*(Change Rationale: Switched from a God Object proxy to a middleware pipeline to support granular, per-tool enablement.)*
#### [NEW] `apps/sim/lib/mcp/resilience/pipeline.ts`
- Implement a chain of responsibility (interceptor/middleware pipeline) for `executeTool`.
- Provide an API like `executeTool.use(telemetry).use(validate(cachedSchema)).use(circuitBreaker(config))` rather than a hardcoded sequence inside a rigid class.
- This composable architecture allows enabling or disabling specific middlewares dynamically per tool (e.g., untrusted vs. internal tools).

#### [MODIFY] `apps/sim/lib/mcp/service.ts`
- Update `mcpService.executeTool` to run requests through the configurable `ResiliencePipeline` rather than hardcoded proxy logic.
## Verification Plan
### Automated Tests
- Create a mock MCP server execution test suite.
- Write tests in `apps/sim/lib/mcp/resilience/pipeline.test.ts` to assert:
  - The circuit breaker trips to `OPEN` on a simulated `API_500` and transitions to `HALF-OPEN` after a cooldown.
  - **New Test:** `HALF-OPEN` strictly allows exactly **one** of several simulated concurrent probe requests through.
  - **New Test:** Schema validation returns the standard `isError: true` format for improper LLM args without triggering execution.
  - Telemetry correctly logs latency.

### Manual Verification
- Execute tests generating visual output demonstrating the circuit breaker "tripping" and "recovering".
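The single-probe behavior in the plan above is easy to demonstrate standalone. The following is a self-contained simulation using a stripped-down probe gate, not the real `CircuitBreakerMiddleware`: five concurrent requests are fired while the gate is in its HALF-OPEN-like state, and exactly one reaches the target.

```typescript
// Self-contained simulation of the HALF-OPEN probe gate. A stripped-down
// gate (not the real CircuitBreakerMiddleware) lets exactly one in-flight
// request through; concurrent requests fast-fail until the probe resolves.
class ProbeGate {
  private probing = false

  async call<T>(target: () => Promise<T>): Promise<T> {
    if (this.probing) throw new Error('HALF-OPEN: probe in flight, fast-failing')
    this.probing = true // synchronous check-and-set acts as the semaphore
    try {
      return await target()
    } finally {
      this.probing = false
    }
  }
}

async function simulate(): Promise<{ reachedTarget: number; succeeded: number }> {
  const gate = new ProbeGate()
  let reachedTarget = 0
  const target = async () => {
    reachedTarget++
    await new Promise((r) => setTimeout(r, 20)) // simulated network latency
    return 'ok'
  }

  // Fire 5 concurrent requests through the gate.
  const results = await Promise.allSettled([1, 2, 3, 4, 5].map(() => gate.call(target)))
  const succeeded = results.filter((r) => r.status === 'fulfilled').length
  return { reachedTarget, succeeded }
}

simulate().then(({ reachedTarget, succeeded }) =>
  // prints "reached target: 1, succeeded: 1"
  console.log(`reached target: ${reachedTarget}, succeeded: ${succeeded}`)
)
```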
**`apps/sim/lib/mcp/resilience/circuit-breaker.ts`**

```typescript
import { createLogger } from '@sim/logger'
import type { McpToolResult } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'

// Configure standard cache size limit
const MAX_SERVER_STATES = 1000

export type CircuitState = 'CLOSED' | 'OPEN' | 'HALF-OPEN'

export interface CircuitBreakerConfig {
  /** Number of failures before tripping to OPEN */
  failureThreshold: number
  /** How long to wait in OPEN before transitioning to HALF-OPEN (ms) */
  resetTimeoutMs: number
}

interface ServerState {
  state: CircuitState
  failures: number
  nextAttemptMs: number
  isHalfOpenProbing: boolean
}

const logger = createLogger('mcp:resilience:circuit-breaker')

export class CircuitBreakerMiddleware implements McpMiddleware {
  // A Map preserves insertion order, giving us simple FIFO eviction of the
  // oldest entry. We cap the size to prevent memory leaks if thousands of
  // ephemeral servers connect.
  private registry = new Map<string, ServerState>()
  private config: CircuitBreakerConfig

  constructor(config: Partial<CircuitBreakerConfig> = {}) {
    this.config = {
      failureThreshold: config.failureThreshold ?? 5,
      resetTimeoutMs: config.resetTimeoutMs ?? 30000,
    }
  }

  private getState(serverId: string): ServerState {
    let state = this.registry.get(serverId)
    if (!state) {
      state = {
        state: 'CLOSED',
        failures: 0,
        nextAttemptMs: 0,
        isHalfOpenProbing: false,
      }
      this.registry.set(serverId, state)
      this.evictIfNecessary()
    }
    return state
  }

  private evictIfNecessary() {
    if (this.registry.size > MAX_SERVER_STATES) {
      // Evict the oldest entry (first inserted)
      const firstKey = this.registry.keys().next().value
      if (firstKey) {
        this.registry.delete(firstKey)
      }
    }
  }

  async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise<McpToolResult> {
    const { serverId, toolCall } = context
    const serverState = this.getState(serverId)

    // 1. Check current state and evaluate timeouts
    if (serverState.state === 'OPEN') {
      if (Date.now() > serverState.nextAttemptMs) {
        // Time to try again, enter HALF-OPEN
        logger.info(`Circuit breaker entering HALF-OPEN for server ${serverId}`)
        serverState.state = 'HALF-OPEN'
        serverState.isHalfOpenProbing = false
      } else {
        // Fast-fail
        throw new Error(
          `Circuit breaker is OPEN for server ${serverId}. Fast-failing request to ${toolCall.name}.`
        )
      }
    }

    if (serverState.state === 'HALF-OPEN') {
      if (serverState.isHalfOpenProbing) {
        // Another request is already probing. Fast-fail concurrent requests.
        throw new Error(
          `Circuit breaker is HALF-OPEN for server ${serverId}. A probe request is currently executing. Fast-failing concurrent request to ${toolCall.name}.`
        )
      }
      // We are the chosen probe. Lock the gate.
      serverState.isHalfOpenProbing = true
    }

    try {
      // 2. Invoke the next layer
      const result = await next(context)

      // 3. Handle the result (isError = true counts as a failure for us)
      if (result.isError) {
        this.recordFailure(serverId, serverState)
      } else {
        this.recordSuccess(serverId, serverState)
      }

      return result
    } catch (error) {
      // Note: we record a failure on ANY exception
      this.recordFailure(serverId, serverState)
      throw error // Re-throw to caller
    }
  }

  private recordSuccess(serverId: string, state: ServerState) {
    if (state.state !== 'CLOSED') {
      logger.info(`Circuit breaker reset to CLOSED for server ${serverId}`)
    }
    state.state = 'CLOSED'
    state.failures = 0
    state.isHalfOpenProbing = false
  }

  private recordFailure(serverId: string, state: ServerState) {
    if (state.state === 'HALF-OPEN') {
      // The probe failed! Trip immediately back to OPEN.
      logger.warn(`Circuit breaker probe failed. Tripping back to OPEN for server ${serverId}`)
      this.tripToOpen(state)
    } else if (state.state === 'CLOSED') {
      state.failures++
      if (state.failures >= this.config.failureThreshold) {
        logger.error(
          `Circuit breaker failure threshold reached (${state.failures}/${this.config.failureThreshold}). Tripping to OPEN for server ${serverId}`
        )
        this.tripToOpen(state)
      }
    }
  }

  private tripToOpen(state: ServerState) {
    state.state = 'OPEN'
    state.isHalfOpenProbing = false
    state.nextAttemptMs = Date.now() + this.config.resetTimeoutMs
  }
}
```
**`apps/sim/lib/mcp/resilience/demo.ts`**

```typescript
import { CircuitBreakerMiddleware } from './circuit-breaker'
import { ResiliencePipeline } from './pipeline'
import { SchemaValidatorMiddleware } from './schema-validator'
import { TelemetryMiddleware } from './telemetry'
import type { McpExecutionContext } from './types'

// Set up the pipeline with a fast 1.5s reset timeout for the demo
const pipeline = new ResiliencePipeline()
  .use(new TelemetryMiddleware())
  .use(new SchemaValidatorMiddleware())
  .use(new CircuitBreakerMiddleware({ failureThreshold: 3, resetTimeoutMs: 1500 }))

const mockContext: McpExecutionContext = {
  toolCall: { name: 'flaky_tool', arguments: {} },
  serverId: 'demo-server',
  userId: 'demo-user',
  workspaceId: 'demo-workspace',
}

let attemptTracker = 0

// A mock downstream MCP execution handler that fails the first 3 times, then succeeds
const mockExecuteTool = async () => {
  attemptTracker++
  console.log(`\n--- Request #${attemptTracker} ---`)

  // Simulate network latency
  await new Promise((r) => setTimeout(r, 50))

  if (attemptTracker <= 3) {
    throw new Error('Connection Refused: Target server is down!')
  }

  return { content: [{ type: 'text', text: 'Success! Target server is back online.' }] }
}

async function runDemo() {
  console.log('🚀 Starting Resilience Pipeline Demo...\n')

  // Attempt 1: CLOSED -> Fails
  try {
    await pipeline.execute(mockContext, mockExecuteTool)
  } catch (e: any) {
    console.error(`❌ Result: ${e.message}`)
  }

  // Attempt 2: CLOSED -> Fails
  try {
    await pipeline.execute(mockContext, mockExecuteTool)
  } catch (e: any) {
    console.error(`❌ Result: ${e.message}`)
  }

  // Attempt 3: CLOSED -> Fails (hits the threshold, trips to OPEN)
  try {
    await pipeline.execute(mockContext, mockExecuteTool)
  } catch (e: any) {
    console.error(`❌ Result: ${e.message}`)
  }

  // Attempt 4: OPEN (fast-fails immediately without hitting the downstream mockExecuteTool)
  try {
    await pipeline.execute(mockContext, mockExecuteTool)
  } catch (e: any) {
    console.error(`🛑 Fast-Fail Result: ${e.message}`)
  }

  console.log('\n⏳ Waiting 2 seconds for the Circuit Breaker to cool down...')
  await new Promise((r) => setTimeout(r, 2000))

  // Attempt 5: HALF-OPEN -> Succeeds! (Transitions back to CLOSED)
  try {
    const result = await pipeline.execute(mockContext, mockExecuteTool)
    console.log(`✅ Result: ${result.content?.[0].text}`)
  } catch (e: any) {
    console.error(`❌ Result: ${e.message}`)
  }

  // Attempt 6: CLOSED -> Succeeds normally
  try {
    const result = await pipeline.execute(mockContext, mockExecuteTool)
    console.log(`✅ Result: ${result.content?.[0].text}`)
  } catch (e: any) {
    console.error(`❌ Result: ${e.message}`)
  }

  console.log('\n🎉 Demo Complete!')
}

runDemo().catch(console.error)
```
**`demo.ts` should not live in the library source tree**

This file is a runnable demonstration script that uses `console.log`, hard-coded timeouts, and a mutable module-level counter. Including it inside `apps/sim/lib/mcp/resilience/` means it is part of the production bundle namespace and will be traversed by any tooling that imports from this directory.

Consider moving it to a dedicated `scripts/` or `examples/` directory (or removing it entirely in favour of the tests in `pipeline.test.ts`).