79 changes: 79 additions & 0 deletions apps/sim/lib/mcp/resilience/ARCHITECTURE.md
@@ -0,0 +1,79 @@
# MCP Resilience Pipeline

**Objective:** Upgrade the core `executeTool` engine from a naive proxy to an Enterprise-Grade Resilience Pipeline, ensuring our AI workflows never suffer cascading failures from downstream server instability or LLM hallucinations.

---

## 1. The "Thundering Herd" Problem (Circuit Breaker)

**Before:**
When a downstream provider (e.g., a database or API) experienced latency or went down, our workflow engine would continuously retry. If 1,000 agents hit a struggling server simultaneously, they would overwhelm it (a DDOS-like "thundering herd"), crash our workflow executor, and severely degrade user experience across the platform.

**After (`CircuitBreakerMiddleware`):**
We implemented an intelligent State Machine with a **HALF-OPEN Concurrency Semaphore**.
- **The Trip:** If a server fails a configurable number of consecutive times (3 in the demo; the middleware defaults to 5), we cut the circuit (`OPEN` state). All subsequent requests instantly *fast-fail* locally (0ms latency), protecting the downstream server from being hammered.
- **The Elegant Recovery:** After a cooldown, we allow exactly **one** probe request through (`HALF-OPEN`). If it succeeds, the circuit closes. If it fails, it trips again.

#### Live Demo Flow

```mermaid
sequenceDiagram
participant Agent
participant Pipeline
participant TargetServer

Agent->>Pipeline: executeTool (Server Down)
Pipeline--xTargetServer: ❌ Fails (Attempt 1-3)
Note over Pipeline: 🔴 Tripped to OPEN
Agent->>Pipeline: executeTool
Pipeline-->>Agent: 🛑 Fast-Fail (0ms latency) - Target Protected
Note over Pipeline: ⏳ Cooldown... 🟡 HALF-OPEN
Agent->>Pipeline: executeTool (Probe)
Pipeline-->>TargetServer: Exactly 1 probe request allowed
TargetServer-->>Pipeline: ✅ Success
Note over Pipeline: 🟢 Reset to CLOSED
Agent->>Pipeline: executeTool
Pipeline-->>TargetServer: Resume normal traffic
```

---

## 2. LLM Hallucinated Arguments (Schema Validator)

**Before:**
If an LLM hallucinated arguments that didn't match a tool's JSON schema, the downstream server or our proxy would throw a fatal exception. The workflow would crash, requiring user intervention and wasting the compute/tokens already spent.

**After (`SchemaValidatorMiddleware`):**
We implemented high-performance **Zod Schema Caching**.
- We intercept the tool call *before* it leaves our system.
- If the arguments fail validation against the schema, we do *not* crash. Instead, we return a gracefully formatted, native MCP error: `{ isError: true, content: "Schema validation failed: [Zod Error Details]" }`.
- **The Magic:** The LLM receives this error, realizes its mistake, and natively **self-corrects** on the next turn, achieving autonomous self-healing without dropping the user's workflow (a minimal sketch of the returned shape follows).
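A minimal sketch of that error result, assuming Zod's standard error object (the helper name `toValidationErrorResult` is illustrative, not the actual function in `schema-validator.ts`):

```typescript
import { z } from 'zod'

// Sketch: build the graceful MCP-style error result instead of throwing.
function toValidationErrorResult(error: z.ZodError) {
  return {
    isError: true,
    content: [
      {
        type: 'text',
        text: `Schema validation failed: ${error.issues
          .map((issue) => `${issue.path.join('.')}: ${issue.message}`)
          .join('; ')}`,
      },
    ],
  }
}
```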

---

## 3. The "Black Box" Problem (Telemetry)

**Before:**
If a tool execution took 10 seconds or failed, we had no granular visibility into *why*. Was it a network timeout? A validation error? A 500 from the target?

**After (`TelemetryMiddleware`):**
Every single tool execution now generates rich metadata:
- `latency_ms`
- Exact `failure_reason` (e.g., `TIMEOUT`, `VALIDATION_ERROR`, `API_500`)
- `serverId` and `workspaceId`

This allows us to build real-time monitoring dashboards to detect struggling third-party integrations before our users even report them.
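As an illustration, the per-execution record might be shaped like the sketch below; only `latency_ms`, `failure_reason`, `serverId`, and `workspaceId` come from the list above, while the remaining fields are assumptions.

```typescript
// Illustrative shape of the telemetry record; fields beyond those documented
// above (toolName, success) are assumptions.
interface ToolExecutionTelemetry {
  serverId: string
  workspaceId: string
  toolName: string // assumed: useful for per-tool dashboards
  latency_ms: number
  success: boolean
  failure_reason?: 'TIMEOUT' | 'VALIDATION_ERROR' | 'API_500'
}
```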

---

## Architectural Impact: The Composable Pipeline

Perhaps the most significant engineering achievement is the **Architecture Shift**. We moved away from a brittle, monolithic proxy to a modern **Chain of Responsibility**.

```typescript
// The new elegant implementation in McpService
this.pipeline = new ResiliencePipeline()
.use(this.telemetry)
.use(this.schemaValidator)
.use(this.circuitBreaker)
```
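At execution time the pipeline wraps the terminal transport call. A hedged sketch of how `executeTool` might hand off to it (`callDownstreamServer` is an assumed stand-in for the existing transport logic, not a real helper in `McpService`):

```typescript
// Sketch only: the pipeline composes the middlewares around the terminal handler.
const result = await this.pipeline.execute(context, (ctx) =>
  this.callDownstreamServer(ctx.serverId, ctx.toolCall)
)
```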
45 changes: 45 additions & 0 deletions apps/sim/lib/mcp/resilience/README.md
@@ -0,0 +1,45 @@
### Part 1: Telemetry Hooks
Implement the foundation for tracking.
*(Change Rationale: Transitioning to a middleware pattern instead of a monolithic proxy, allowing telemetry to be composed easily).*
#### [NEW] `apps/sim/lib/mcp/resilience/telemetry.ts`
- Implement a telemetry middleware hook to capture `latency_ms` and `failure_reason` (e.g., `TIMEOUT`, `VALIDATION_ERROR`, `API_500`); a sketch follows below.
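A minimal sketch of such a hook, assuming the same middleware interface that `circuit-breaker.ts` imports from `./types`; the failure-reason classification is an illustrative assumption:

```typescript
import { createLogger } from '@sim/logger'
import type { McpToolResult } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'

const logger = createLogger('mcp:resilience:telemetry')

export class TelemetryMiddleware implements McpMiddleware {
  async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise<McpToolResult> {
    const start = Date.now()
    try {
      const result = await next(context)
      const latencyMs = Date.now() - start
      // Assumed classification: an isError result from a lower middleware is logged as VALIDATION_ERROR.
      const failureReason = result.isError ? 'VALIDATION_ERROR' : 'none'
      logger.info(
        `executeTool ${context.toolCall.name} server=${context.serverId} latency_ms=${latencyMs} failure_reason=${failureReason}`
      )
      return result
    } catch (error) {
      const latencyMs = Date.now() - start
      // Assumed classification: thrown errors (timeouts, 5xx responses) are coarsely bucketed here.
      logger.error(
        `executeTool ${context.toolCall.name} server=${context.serverId} latency_ms=${latencyMs} failure_reason=API_500`
      )
      throw error
    }
  }
}
```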

### Part 2: Circuit Breaker State Machine
Implement the state management logic.
*(Change Rationale: Added a HALF-OPEN concurrency lock (semaphore) to prevent the "thundering herd" issue on the downstream server. Documented that this operates on local, per-instance state using an LRU cache to prevent memory leaks).*
#### [NEW] `apps/sim/lib/mcp/resilience/circuit-breaker.ts`
- Implement the `CircuitBreaker` middleware with states: `CLOSED`, `OPEN`, and `HALF-OPEN`.
- Handle failure thresholds, reset timeouts, and logic for failing fast.
- **Concurrency Lock:** During `HALF-OPEN`, strictly gate the transition so only **one** probe request is allowed through. All other concurrent requests will fail-fast until the probe resolves.
- **Memory & State:** Use an LRU cache (or a lifecycle-scoped registry) for the CircuitBreaker state, binding the lifecycle of each breaker explicitly to the lifecycle of its MCP connection to prevent memory leaks. Note that the breaker operates on local, per-instance state.

### Part 3: Schema Validation
Implement the Zod validation logic for LLM arguments.
*(Change Rationale: Added schema compilation caching to avoid severe CPU bottlenecking per request, and returning `isError: true` on validation failures to natively trigger LLM self-correction).*
#### [NEW] `apps/sim/lib/mcp/resilience/schema-validator.ts`
- Logic to enforce schemas using `Zod` as a middleware.
- **Schema Caching:** Compile JSON Schemas to Zod schemas and cache them in a registry keyed by `toolId`, either during the initial discovery phase or lazily on first use. Flush cached validators when MCP lifecycle events signal changes (e.g., mid-session tool list updates); see the sketch after this list.
- **LLM Self-Correction:** Instead of throwing exceptions that crash the workflow engine when Zod validation fails, intercept validation errors and return a gracefully formatted MCP execution result: `{ isError: true, content: [{ type: "text", text: "Schema validation failed: [Zod Error Details]" }] }`.
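A hedged sketch of the caching pattern described above, with a deliberately simplified JSON-Schema-to-Zod conversion (flat objects with primitive properties only; a real converter would also handle nested objects, arrays, enums, and so on):

```typescript
import { z } from 'zod'

// Minimal JSON Schema subset handled by this sketch.
interface FlatJsonSchema {
  properties?: Record<string, { type?: string }>
  required?: string[]
}

// Cache of compiled validators keyed by toolId; flushed when the tool list changes.
const validatorCache = new Map<string, z.ZodTypeAny>()

// Deliberately simplified converter: flat objects with primitive properties only.
function compileJsonSchema(schema: FlatJsonSchema): z.ZodTypeAny {
  const shape: Record<string, z.ZodTypeAny> = {}
  for (const [key, prop] of Object.entries(schema.properties ?? {})) {
    let field: z.ZodTypeAny =
      prop.type === 'number' ? z.number() : prop.type === 'boolean' ? z.boolean() : z.string()
    if (!schema.required?.includes(key)) field = field.optional()
    shape[key] = field
  }
  return z.object(shape)
}

export function getValidator(toolId: string, jsonSchema: FlatJsonSchema): z.ZodTypeAny {
  let validator = validatorCache.get(toolId)
  if (!validator) {
    validator = compileJsonSchema(jsonSchema) // lazy compile on first use
    validatorCache.set(toolId, validator)
  }
  return validator
}

// Called from MCP lifecycle events (e.g., a mid-session tool list update notification).
export function invalidateValidators(toolIds?: string[]) {
  if (!toolIds) validatorCache.clear()
  else for (const id of toolIds) validatorCache.delete(id)
}
```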

### Part 4: Resilience Pipeline Integration
Route tool execution through a pipeline instead of a monolithic proxy.
*(Change Rationale: Switched from a God Object Proxy to a Middleware Pipeline to support granular, per-tool enablement).*
#### [NEW] `apps/sim/lib/mcp/resilience/pipeline.ts`
- Implement a chain of responsibility (interceptor/middleware pipeline) for `executeTool`.
- Provide a fluent API like `executeTool.use(telemetry).use(validate(cachedSchema)).use(circuitBreaker(config))` rather than a hardcoded sequence inside a rigid class.
- This composable architecture allows enabling or disabling specific middlewares dynamically per tool (e.g., untrusted vs. internal tools); a minimal sketch of the pipeline follows below.
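A minimal sketch of what `pipeline.ts` could look like, assuming the `McpMiddleware`/`McpMiddlewareNext` types that `circuit-breaker.ts` imports from `./types`:

```typescript
import type { McpToolResult } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'

export class ResiliencePipeline {
  private middlewares: McpMiddleware[] = []

  use(middleware: McpMiddleware): this {
    this.middlewares.push(middleware)
    return this // fluent API: pipeline.use(a).use(b).use(c)
  }

  // Compose middlewares right-to-left so the first one registered wraps the rest,
  // with the real transport call (`handler`) as the innermost layer.
  execute(context: McpExecutionContext, handler: McpMiddlewareNext): Promise<McpToolResult> {
    const chain = this.middlewares.reduceRight<McpMiddlewareNext>(
      (next, middleware) => (ctx) => middleware.execute(ctx, next),
      handler
    )
    return chain(context)
  }
}
```

In this sketch the first middleware registered (telemetry) ends up outermost, so it still observes circuit-breaker fast-fails and validation short-circuits further down the chain.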

#### [MODIFY] `apps/sim/lib/mcp/service.ts`
- Update `mcpService.executeTool` to run requests through the configurable `ResiliencePipeline`, rather than hardcoded proxy logic.

## Verification Plan
### Automated Tests
- Create a mock MCP server execution test suite.
- Write tests in `apps/sim/lib/mcp/resilience/pipeline.test.ts` to assert (a sketch of one such test follows below):
- Circuit Breaker trips to `OPEN` on simulated `API_500` failures and transitions to `HALF-OPEN` after the cooldown.
- **New Test:** Verify `HALF-OPEN` strictly allows exactly **one** probe request through under simulated concurrency.
- **New Test:** Schema validation returns the standard `isError: true` format for invalid LLM arguments without triggering execution.
- Telemetry correctly logs latency.
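A hedged sketch of one of these assertions, assuming a Vitest-style runner and the pipeline/breaker APIs shown elsewhere in this change:

```typescript
import { describe, expect, it } from 'vitest'
import { CircuitBreakerMiddleware } from './circuit-breaker'
import { ResiliencePipeline } from './pipeline'
import type { McpExecutionContext } from './types'

describe('CircuitBreakerMiddleware', () => {
  it('trips to OPEN after the failure threshold and fast-fails without hitting downstream', async () => {
    const pipeline = new ResiliencePipeline().use(
      new CircuitBreakerMiddleware({ failureThreshold: 3, resetTimeoutMs: 60_000 })
    )
    const context: McpExecutionContext = {
      toolCall: { name: 'flaky_tool', arguments: {} },
      serverId: 'test-server',
      userId: 'test-user',
      workspaceId: 'test-workspace',
    }

    let downstreamCalls = 0
    const failingHandler = async () => {
      downstreamCalls++
      throw new Error('API_500')
    }

    // Three failures trip the breaker...
    for (let i = 0; i < 3; i++) {
      await expect(pipeline.execute(context, failingHandler)).rejects.toThrow('API_500')
    }
    // ...the fourth call fast-fails locally and never reaches the handler.
    await expect(pipeline.execute(context, failingHandler)).rejects.toThrow(/OPEN/)
    expect(downstreamCalls).toBe(3)
  })
})
```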

### Manual Verification
- Execute tests generating visual output demonstrating the circuit breaker "tripping" and "recovering".
143 changes: 143 additions & 0 deletions apps/sim/lib/mcp/resilience/circuit-breaker.ts
@@ -0,0 +1,143 @@
import { createLogger } from '@sim/logger'
import type { McpToolResult } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'

// Configure standard cache size limit
const MAX_SERVER_STATES = 1000

export type CircuitState = 'CLOSED' | 'OPEN' | 'HALF-OPEN'

export interface CircuitBreakerConfig {
/** Number of failures before tripping to OPEN */
failureThreshold: number
/** How long to wait in OPEN before transitioning to HALF-OPEN (ms) */
resetTimeoutMs: number
}

interface ServerState {
state: CircuitState
failures: number
nextAttemptMs: number
isHalfOpenProbing: boolean
}

const logger = createLogger('mcp:resilience:circuit-breaker')

export class CircuitBreakerMiddleware implements McpMiddleware {
// Use a Map to maintain insertion order so the oldest entry can be evicted (FIFO) when the cap is hit.
// We constrain it to prevent memory leaks if thousands of ephemeral servers connect.
private registry = new Map<string, ServerState>()
private config: CircuitBreakerConfig

constructor(config: Partial<CircuitBreakerConfig> = {}) {
this.config = {
failureThreshold: config.failureThreshold ?? 5,
resetTimeoutMs: config.resetTimeoutMs ?? 30000,
}
}

private getState(serverId: string): ServerState {
let state = this.registry.get(serverId)
if (!state) {
state = {
state: 'CLOSED',
failures: 0,
nextAttemptMs: 0,
isHalfOpenProbing: false,
}
this.registry.set(serverId, state)
this.evictIfNecessary()
}
return state
}

private evictIfNecessary() {
if (this.registry.size > MAX_SERVER_STATES) {
// Evict the oldest entry (first inserted)
const firstKey = this.registry.keys().next().value
if (firstKey) {
this.registry.delete(firstKey)
}
}
}

async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise<McpToolResult> {
const { serverId, toolCall } = context
const serverState = this.getState(serverId)

// 1. Check current state and evaluate timeouts
if (serverState.state === 'OPEN') {
if (Date.now() > serverState.nextAttemptMs) {
// Time to try again, enter HALF-OPEN
logger.info(`Circuit breaker entering HALF-OPEN for server ${serverId}`)
serverState.state = 'HALF-OPEN'
serverState.isHalfOpenProbing = false
} else {
// Fast-fail
throw new Error(
`Circuit breaker is OPEN for server ${serverId}. Fast-failing request to ${toolCall.name}.`
)
}
}

if (serverState.state === 'HALF-OPEN') {
if (serverState.isHalfOpenProbing) {
// Another request is already probing. Fast-fail concurrent requests.
throw new Error(
`Circuit breaker is HALF-OPEN for server ${serverId}. A probe request is currently executing. Fast-failing concurrent request to ${toolCall.name}.`
)
}
// We are the chosen ones. Lock it down.
serverState.isHalfOpenProbing = true
}

try {
// 2. Invoke the next layer
const result = await next(context)

// 3. Handle result parsing (isError = true counts as failure for us)
if (result.isError) {
this.recordFailure(serverId, serverState)
} else {
this.recordSuccess(serverId, serverState)
}

return result
} catch (error) {
// Note: we record failure on ANY exception
this.recordFailure(serverId, serverState)
throw error // Re-throw to caller
}
}

private recordSuccess(serverId: string, state: ServerState) {
if (state.state !== 'CLOSED') {
logger.info(`Circuit breaker reset to CLOSED for server ${serverId}`)
}
state.state = 'CLOSED'
state.failures = 0
state.isHalfOpenProbing = false
}

private recordFailure(serverId: string, state: ServerState) {
if (state.state === 'HALF-OPEN') {
// The probe failed! Trip immediately back to OPEN.
logger.warn(`Circuit breaker probe failed. Tripping back to OPEN for server ${serverId}`)
this.tripToOpen(state)
} else if (state.state === 'CLOSED') {
state.failures++
if (state.failures >= this.config.failureThreshold) {
logger.error(
`Circuit breaker failure threshold reached (${state.failures}/${this.config.failureThreshold}). Tripping to OPEN for server ${serverId}`
)
this.tripToOpen(state)
}
}
}

private tripToOpen(state: ServerState) {
state.state = 'OPEN'
state.isHalfOpenProbing = false
state.nextAttemptMs = Date.now() + this.config.resetTimeoutMs
}
}
90 changes: 90 additions & 0 deletions apps/sim/lib/mcp/resilience/demo.ts
@@ -0,0 +1,90 @@
import { CircuitBreakerMiddleware } from './circuit-breaker'
import { ResiliencePipeline } from './pipeline'
import { SchemaValidatorMiddleware } from './schema-validator'
import { TelemetryMiddleware } from './telemetry'
import type { McpExecutionContext } from './types'
Comment on lines +1 to +5 (Contributor):
demo.ts should not live in the library source tree

This file is a runnable demonstration script that uses console.log, hard-coded timeouts, and a mutable module-level counter. Including it inside apps/sim/lib/mcp/resilience/ means it is part of the production bundle namespace and will be traversed by any tooling that imports from this directory.

Consider moving it to a dedicated scripts/ or examples/ directory (or removing it entirely in favour of the tests in pipeline.test.ts).


// Setup Pipeline with a fast 1.5s reset timeout for the demo
const pipeline = new ResiliencePipeline()
.use(new TelemetryMiddleware())
.use(new SchemaValidatorMiddleware())
.use(new CircuitBreakerMiddleware({ failureThreshold: 3, resetTimeoutMs: 1500 }))

const mockContext: McpExecutionContext = {
toolCall: { name: 'flaky_tool', arguments: {} },
serverId: 'demo-server',
userId: 'demo-user',
workspaceId: 'demo-workspace',
}

let attemptTracker = 0

// A mock downstream MCP execution handler that fails its first 3 invocations, then succeeds
const mockExecuteTool = async () => {
attemptTracker++
console.log(`\n--- Request #${attemptTracker} ---`)

// Simulate network latency
await new Promise((r) => setTimeout(r, 50))

if (attemptTracker <= 3) {
throw new Error('Connection Refused: Target server is down!')
}

return { content: [{ type: 'text', text: 'Success! Target server is back online.' }] }
}

async function runDemo() {
console.log('🚀 Starting Resilience Pipeline Demo...\n')

// Attempt 1: CLOSED -> Fails
try {
await pipeline.execute(mockContext, mockExecuteTool)
} catch (e: any) {
console.error(`❌ Result: ${e.message}`)
}

// Attempt 2: CLOSED -> Fails
try {
await pipeline.execute(mockContext, mockExecuteTool)
} catch (e: any) {
console.error(`❌ Result: ${e.message}`)
}

// Attempt 3: CLOSED -> Fails (Hits threshold, trips to OPEN)
try {
await pipeline.execute(mockContext, mockExecuteTool)
} catch (e: any) {
console.error(`❌ Result: ${e.message}`)
}

// Attempt 4: OPEN (Fast fails immediately without hitting downstream mockExecuteTool)
try {
await pipeline.execute(mockContext, mockExecuteTool)
} catch (e: any) {
console.error(`🛑 Fast-Fail Result: ${e.message}`)
}

console.log('\n⏳ Waiting 2 seconds for Circuit Breaker to cool down...')
await new Promise((r) => setTimeout(r, 2000))

// Attempt 5: HALF-OPEN -> Succeeds! (Transitions back to CLOSED)
try {
const result = await pipeline.execute(mockContext, mockExecuteTool)
console.log(`✅ Result: ${result.content?.[0].text}`)
} catch (e: any) {
console.error(`❌ Result: ${e.message}`)
}

// Attempt 6: CLOSED -> Succeeds normally
try {
const result = await pipeline.execute(mockContext, mockExecuteTool)
console.log(`✅ Result: ${result.content?.[0].text}`)
} catch (e: any) {
console.error(`❌ Result: ${e.message}`)
}

console.log('\n🎉 Demo Complete!')
}

runDemo().catch(console.error)