Conversation
Design spec for PLT-921: synchronous pipeline orchestrator with per-packet observability hooks, node protocols, and orchestrator-driven execution model. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move design specs from docs/superpowers/ to superpowers/ at project root. The docs/ directory is reserved for actual library documentation. Updated CLAUDE.md and .zed/rules with the new convention. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- process_packet on FunctionNode is now pure computation (no DB side effects) - store_result added to all three node protocols as uniform persistence step - Orchestrator always calls store_result; nodes decide internally what to do - FunctionNode.store_result handles both result cache and pipeline record writes - SourceNode.store_result persists source snapshot to pipeline DB if configured - Added FunctionNode purity tests and store_result tests to testing strategy Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Key design clarification: process_packet handles computation + function-level memoization (CachedFunctionPod, result DB). store_result handles pipeline provenance recording only (pipeline DB). This reflects two distinct persistence concerns: function-level memoization transcends pipelines, while pipeline provenance is execution-specific. Also fixes from plan review: - OperatorNodeProtocol uses public operator property (not _operator) - _materialize_as_stream uses selective columns matching existing pattern - populate_cache handles empty lists gracefully - _iter_packets_concurrent updated in backward compat list - No CachedFunctionPod modification needed (no private access) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…trator Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hestratorResult Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…or support Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…che, operator property Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…dd get_cached_results and populate_cache Split process_packet into compute-only (process_packet) and pipeline-provenance-only (store_result), with _process_and_store_packet for backward-compatible iter_packets behavior. Added get_cached_results for pipeline entry ID lookups and populate_cache for external cache population. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rename orchestrator.py to async_orchestrator.py, add orchestrator parameter to Pipeline.run(), and default to SyncPipelineOrchestrator for synchronous execution. Add _apply_results() to populate node caches from orchestrator output. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
This PR introduces a new synchronous “push-style” pipeline execution path via a SyncPipelineOrchestrator, adds node-facing protocols + observer hooks for per-packet observability, and refactors node persistence/caching responsibilities so Pipeline.run() can delegate execution cleanly.
Changes:
- Add
SyncPipelineOrchestratorwith per-node and per-packet observer hooks, returning anOrchestratorResultfor optional cache population. - Introduce node protocols (
SourceNodeProtocol,FunctionNodeProtocol,OperatorNodeProtocol) with TypeGuard dispatch helpers. - Refactor nodes to expose a uniform compute/store/cache interface (
store_result(),populate_cache(), plus function-node cache helpers), and updatePipeline.run()to default to the sync orchestrator.
Reviewed changes
Copilot reviewed 20 out of 23 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
src/orcapod/pipeline/sync_orchestrator.py |
New sync orchestrator that topologically executes the compiled node graph with buffers + observer hooks. |
src/orcapod/pipeline/graph.py |
Adds orchestrator parameter to Pipeline.run() and applies orchestrator results to node caches. |
src/orcapod/protocols/node_protocols.py |
New node protocols and TypeGuard helpers to decouple orchestrator from concrete node classes. |
src/orcapod/pipeline/observer.py |
Adds ExecutionObserver protocol and NoOpObserver. |
src/orcapod/pipeline/result.py |
Adds OrchestratorResult return type for orchestrators. |
src/orcapod/pipeline/async_orchestrator.py |
Renames/moves existing async orchestrator implementation. |
src/orcapod/pipeline/__init__.py |
Re-exports orchestrators and updates module import paths. |
src/orcapod/core/nodes/function_node.py |
Splits function computation vs pipeline provenance (process_packet vs store_result), adds cached lookup + cache population helpers. |
src/orcapod/core/nodes/operator_node.py |
Adds operator property, get_cached_output, store_result, and populate_cache. |
src/orcapod/core/nodes/source_node.py |
Adds in-memory cache population and cache-aware iter_packets; adds store_result hook (currently no-op). |
tests/test_pipeline/test_sync_orchestrator.py |
New orchestrator tests including observer hooks, integration with Pipeline.run(), and sync/async parity. |
tests/test_pipeline/test_observer.py |
Tests for ExecutionObserver/NoOpObserver. |
tests/test_protocols/test_node_protocols.py |
Tests for TypeGuard dispatch. |
tests/test_core/nodes/test_node_store_result.py |
Tests for store_result behavior across node types. |
tests/test_core/nodes/test_node_populate_cache.py |
Tests for populate_cache behavior across node types. |
tests/test_core/nodes/test_function_node_get_cached.py |
Tests for FunctionNode.get_cached_results. |
tests/test_core/function_pod/test_function_pod_node.py |
Updates expectations around pipeline provenance recording (moved to store step). |
superpowers/specs/2026-03-14-sync-orchestrator-design.md |
Design spec documenting orchestrator/protocol architecture and intended semantics. |
superpowers/plans/2026-03-14-sync-orchestrator-plan.md |
Implementation plan artifact for the new orchestrator/protocol refactor. |
CLAUDE.md / .zed/rules |
Repo guidance updated to place “superpowers” artifacts under superpowers/. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Nodes should self-cache during store_result and get_cached_results calls, eliminating the need for external populate_cache and Pipeline._apply_results(). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tore_result Nodes now self-cache during store_result and get_cached_results calls, eliminating the need for Pipeline._apply_results to push results back into nodes after orchestrator execution. - Remove populate_cache from all three node protocols and implementations - SourceNode.store_result populates _cached_results - OperatorNode.store_result populates _cached_output_stream (and writes to DB when applicable) - FunctionNode.store_result populates _cached_output_packets - FunctionNode.get_cached_results also populates _cached_output_packets - Remove Pipeline._apply_results (no longer needed) - Delete test_node_populate_cache.py, add self-caching tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Nodes become self-validating, self-persisting executors. Remove store_result, add schema validation, add process() to FunctionNode and OperatorNode. Clean pod vs node distinction: pods return lazy streams, nodes return materialized lists. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…), remove store_result - process_packet now validates input schema, computes, writes pipeline records, and caches results (previously only computed) - Add _validate_input_schema and _validate_stream_schema for schema checks - Add process(input_stream) bulk method with stream-level validation - Rename _process_and_store_packet → _process_packet_internal (core compute+persist+cache without validation) - Remove store_result (merged into _process_packet_internal) - Rename _async_process_and_store_packet → _async_process_packet_internal - Remove standalone async_process_packet (merged into async internal) - Update sync_orchestrator, tests, and async execute tests accordingly Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…result and operator property Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…packets call SourceNode now caches its packets internally on the first iter_packets() call instead of relying on the orchestrator to call store_result(). SourceNodeProtocol is updated to remove store_result, and the orchestrator no longer calls it after materializing source output. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ocess() FunctionNodeProtocol now declares process() instead of store_result, matching the node authority pattern where nodes handle their own persistence internally. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a new synchronous “push” execution path for pipelines via SyncPipelineOrchestrator, along with node protocols and observer hooks, while shifting toward a node-authority model where nodes validate, persist, and cache their own results.
Changes:
- Introduce
SyncPipelineOrchestrator,ExecutionObserver/NoOpObserver,OrchestratorResult, and node protocolTypeGuarddispatch. - Refactor nodes toward authority/self-caching with new
process()APIs and schema validation logic; integrate orchestrator selection intoPipeline.run(). - Add extensive unit/integration tests plus design/plan docs under
superpowers/.
Reviewed changes
Copilot reviewed 24 out of 27 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_protocols/test_node_protocols.py | Tests for node protocol TypeGuard dispatch helpers. |
| tests/test_protocols/init.py | Package marker for protocol tests. |
| tests/test_pipeline/test_sync_orchestrator.py | Integration tests for sync orchestrator + Pipeline.run() orchestration. |
| tests/test_pipeline/test_observer.py | Tests for ExecutionObserver protocol and NoOpObserver. |
| tests/test_core/nodes/test_node_process.py | Tests for node process()/process_packet() validation/persistence/caching. |
| tests/test_core/nodes/test_function_node_get_cached.py | Tests for FunctionNode.get_cached_results() behavior. |
| tests/test_core/nodes/init.py | Package marker for node tests. |
| tests/test_core/function_pod/test_function_pod_node.py | Updates to use new internal processing APIs in function node tests. |
| tests/test_channels/test_node_async_execute.py | Updates async/sequential routing tests to patch new internal methods. |
| superpowers/specs/2026-03-14-sync-orchestrator-design.md | Design doc for sync orchestrator + protocols/observer/result types. |
| superpowers/specs/2026-03-14-remove-populate-cache-design.md | Design doc for removing populate_cache and self-caching nodes. |
| superpowers/specs/2026-03-14-node-authority-design.md | Design doc for node-authority (self-validating/self-persisting) model. |
| superpowers/plans/2026-03-14-sync-orchestrator-plan.md | Implementation plan for sync orchestrator + protocols + integration. |
| superpowers/plans/2026-03-14-remove-populate-cache-plan.md | Implementation plan for removing populate_cache. |
| superpowers/plans/2026-03-14-node-authority-plan.md | Implementation plan for node-authority refactor. |
| src/orcapod/protocols/node_protocols.py | Defines node protocols and TypeGuard dispatch. |
| src/orcapod/pipeline/sync_orchestrator.py | Implements synchronous orchestrator with per-node/per-packet observer hooks. |
| src/orcapod/pipeline/result.py | Adds OrchestratorResult return type. |
| src/orcapod/pipeline/observer.py | Adds ExecutionObserver protocol and NoOpObserver. |
| src/orcapod/pipeline/graph.py | Adds orchestrator option to Pipeline.run() and defaults to sync orchestrator. |
| src/orcapod/pipeline/async_orchestrator.py | Renames/relocates the async orchestrator implementation. |
| src/orcapod/pipeline/init.py | Exports sync/async orchestrators. |
| src/orcapod/core/nodes/source_node.py | Adds source-node caching behavior for orchestrated runs. |
| src/orcapod/core/nodes/operator_node.py | Adds process() and input schema validation; integrates persistence/caching. |
| src/orcapod/core/nodes/function_node.py | Adds schema validation + process(); refactors internal processing/caching paths. |
| CLAUDE.md | Documents convention for placing “superpowers” artifacts. |
| .zed/rules | Mirrors the “superpowers artifacts” convention for editor rules. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…tream Build an empty ArrowTableStream with correct schema from upstream_node instead of raising ValueError. This handles cases where an upstream node produces zero packets (e.g., FunctionNode filters everything out). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… iteration, stale docs, index ordering - FunctionNode: use sorted() in validation error messages for deterministic output - FunctionNode._validate_stream_schema: compare Schema objects (type-aware) instead of set(keys) - OperatorNode._validate_input_schemas: compare Schema objects (type-aware) instead of set(keys) - OperatorNode.process(): only persist to DB on CacheMode.LOG, not REPLAY - SourceNode.iter_packets(): revert to lazy delegation instead of eager materialization - Pipeline.run() docstring: remove stale store_result references - FunctionNode._process_packet_internal: use input tag (not tag_out) for pipeline record - FunctionNode._async_process_packet_internal: same input tag fix - FunctionNode: add cache_index parameter to _process_packet_internal and _async_process_packet_internal - _iter_packets_concurrent: pass correct index to avoid cache index misalignment Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per-packet validation (_validate_input_schema) now uses Schema object comparison with system tags, matching the stream-level validation approach. Both FunctionNode and OperatorNode validation paths are now consistent: Schema-based (order-independent, type-aware) with system tag inclusion. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… DerivedSource tests (PLT-924) - ArrowTableStream.identity_structure() and pipeline_identity_structure() now delegate to super() when a producer is set, so materialized streams inherit the same identity as their origin node - SyncPipelineOrchestrator._materialize_as_stream() passes producer and upstreams from the upstream node so downstream operators embed correct pipeline hashes in system tag column names - Update NamedProducer mock in stream tests to include argument_symmetry() - Add TestMaterializedStreamIdentity test class covering pipeline_hash parity, content_hash parity, system tag schema preservation, and end-to-end operator system tag correctness - Mark two DerivedSource hash tests xfail(PLT-924) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a new synchronous “push-style” pipeline execution path built around a SyncPipelineOrchestrator, plus node-facing protocols and observer hooks to enable per-packet observability and node-authoritative execution (validation/persistence/caching handled by nodes).
Changes:
- Introduces
SyncPipelineOrchestrator,ExecutionObserver/NoOpObserver,OrchestratorResult, and node protocol TypeGuards for orchestrator↔node dispatch. - Refactors FunctionNode/OperatorNode to support node-authoritative bulk execution (
process) and schema validation, plus internal caching behaviors. - Updates
Pipeline.run()to accept an optional orchestrator and to default to sync orchestration (while keeping legacy async path via_run_async).
Reviewed changes
Copilot reviewed 27 out of 30 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/orcapod/protocols/node_protocols.py | Adds node protocols + TypeGuard helpers used by orchestrators for dispatch. |
| src/orcapod/pipeline/observer.py | Defines ExecutionObserver protocol and NoOpObserver. |
| src/orcapod/pipeline/result.py | Introduces OrchestratorResult container for orchestrator outputs. |
| src/orcapod/pipeline/sync_orchestrator.py | Implements synchronous orchestrator with per-node/per-packet hooks and buffering/materialization. |
| src/orcapod/pipeline/graph.py | Extends Pipeline.run() to accept an orchestrator and defaults to sync orchestrator execution. |
| src/orcapod/pipeline/async_orchestrator.py | Renames/moves async orchestrator module (import path update). |
| src/orcapod/pipeline/init.py | Exports SyncPipelineOrchestrator and updates async orchestrator import. |
| src/orcapod/core/nodes/function_node.py | Adds schema validation + process() + internal execution helpers and cached-result lookup. |
| src/orcapod/core/nodes/operator_node.py | Adds schema validation + process() and cached-output retrieval for REPLAY mode. |
| src/orcapod/core/nodes/source_node.py | Adds optional _cached_results pathway for future/limited source caching behavior. |
| src/orcapod/core/streams/arrow_table_stream.py | Adjusts identity/pipeline identity to delegate to StreamBase (producer/upstream identity semantics). |
| tests/test_protocols/init.py | Package marker for protocol tests. |
| tests/test_protocols/test_node_protocols.py | Tests TypeGuard dispatch functions for node protocols. |
| tests/test_pipeline/test_observer.py | Tests observer protocol conformance and NoOp behavior. |
| tests/test_pipeline/test_sync_orchestrator.py | Comprehensive sync orchestrator tests (linear/diamond/operator/observer/parity/materialization identity). |
| tests/test_pipeline/test_pipeline.py | Marks two DerivedSource system-tag schema tests as xfail (PLT-924). |
| tests/test_core/streams/test_streams.py | Updates producer test fixture to implement argument_symmetry for new identity behavior. |
| tests/test_core/nodes/init.py | Package marker for node tests. |
| tests/test_core/nodes/test_node_process.py | Tests new node process/process_packet behaviors: validation, persistence, caching. |
| tests/test_core/nodes/test_function_node_get_cached.py | Tests FunctionNode.get_cached_results() behavior and cache population. |
| tests/test_core/function_pod/test_function_pod_node.py | Updates function pod node tests to use internal processing path and adds coverage. |
| tests/test_channels/test_node_async_execute.py | Updates async execution tests to patch internal async processing functions. |
| superpowers/specs/2026-03-14-sync-orchestrator-design.md | Design spec for sync orchestrator + protocols/observer/result types. |
| superpowers/specs/2026-03-14-remove-populate-cache-design.md | Follow-up design spec: remove populate_cache in favor of self-caching nodes. |
| superpowers/specs/2026-03-14-node-authority-design.md | Design spec: node authority pattern (validation/persist/cache inside nodes). |
| superpowers/plans/2026-03-14-sync-orchestrator-plan.md | Implementation plan for sync orchestrator workstream. |
| superpowers/plans/2026-03-14-remove-populate-cache-plan.md | Plan for removing populate_cache and shifting to self-caching. |
| superpowers/plans/2026-03-14-node-authority-plan.md | Plan for node-authority refactor. |
| CLAUDE.md | Documents repo guidance on where to place “superpowers” artifacts. |
| .zed/rules | Mirrors “superpowers artifact” placement guidance for editor rules. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…ket on nodes Distinguishes node methods (internal, for orchestrators) from pod methods (public process/process_packet). Parallels the existing async_execute naming. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… the caller execute() and execute_packet() are internal methods for orchestrators. The caller guarantees input identity — no validation is performed. This removes _validate_input_schema, _validate_stream_schema, _validate_input_schemas and all associated tests. Also removes xfail markers on DerivedSource tests (no longer needed since validation was the cause of those failures). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a synchronous “push-style” pipeline orchestrator with per-node / per-packet observability, and refactors node/orchestrator interaction via lightweight protocols so Pipeline.run() can default to the new sync execution path.
Changes:
- Introduces
SyncPipelineOrchestrator,ExecutionObserver/NoOpObserver, andOrchestratorResultto run a compiled node graph topologically with materialized buffers. - Adds node protocols + TypeGuard dispatch, and updates core nodes to support orchestrator-driven execution (
execute_packet/execute, operator replay support, source cached-results plumbing). - Updates
Pipeline.run()to accept an optionalorchestratorand reorganizes async orchestrator module location/exports; expands test coverage and adds design/plan artifacts.
Reviewed changes
Copilot reviewed 27 out of 30 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/orcapod/protocols/node_protocols.py | Defines node protocols + TypeGuard helpers for orchestrator dispatch. |
| src/orcapod/pipeline/sync_orchestrator.py | Implements sync orchestrator with buffering, observer hooks, and stream materialization. |
| src/orcapod/pipeline/observer.py | Adds ExecutionObserver protocol and NoOpObserver. |
| src/orcapod/pipeline/result.py | Adds OrchestratorResult container for node outputs. |
| src/orcapod/pipeline/graph.py | Extends Pipeline.run() with orchestrator parameter; sync default + async import update. |
| src/orcapod/pipeline/async_orchestrator.py | Moves/renames async orchestrator module (legacy path). |
| src/orcapod/pipeline/init.py | Exports SyncPipelineOrchestrator and updated async orchestrator import. |
| src/orcapod/core/nodes/function_node.py | Adds orchestrator-facing execute_packet/execute, internal processing helpers, and DB cache lookup helper. |
| src/orcapod/core/nodes/operator_node.py | Adds orchestrator-facing execute and get_cached_output to support sync orchestration and replay. |
| src/orcapod/core/nodes/source_node.py | Adds cached-results slot and uses it when present during iteration. |
| src/orcapod/core/streams/arrow_table_stream.py | Aligns identity/pipeline-identity structures with StreamBase producer symmetry behavior. |
| tests/test_protocols/test_node_protocols.py | Tests TypeGuard dispatch for node protocols. |
| tests/test_pipeline/test_sync_orchestrator.py | Integration tests for sync orchestrator, pipeline.run integration, parity, and identity/system-tag preservation. |
| tests/test_pipeline/test_observer.py | Tests observer protocol satisfaction + no-op hooks. |
| tests/test_core/nodes/test_node_execute.py | Tests node execute methods for persistence/caching behavior. |
| tests/test_core/nodes/test_function_node_get_cached.py | Tests targeted cache lookup and cache population behavior. |
| tests/test_core/streams/test_streams.py | Updates producer behavior in tests to support argument symmetry. |
| tests/test_core/function_pod/test_function_pod_node.py | Updates tests to reflect execute vs process naming/refactors. |
| tests/test_core/function_pod/test_function_node_caching.py | Updates wording to execute_packet terminology. |
| tests/test_channels/test_node_async_execute.py | Updates routing tests to patched internal execute/process paths. |
| superpowers/specs/2026-03-14-sync-orchestrator-design.md | Design spec for sync orchestrator + protocols/observer/result patterns. |
| superpowers/specs/2026-03-14-remove-populate-cache-design.md | Follow-up design note: self-caching nodes without populate_cache. |
| superpowers/specs/2026-03-14-node-authority-design.md | Design spec for node-authority/self-validating/self-persisting node execution. |
| superpowers/plans/2026-03-14-sync-orchestrator-plan.md | Implementation plan artifact for sync orchestrator and related refactors. |
| superpowers/plans/2026-03-14-remove-populate-cache-plan.md | Plan artifact for removing populate-cache pattern. |
| superpowers/plans/2026-03-14-node-authority-plan.md | Plan artifact for node-authority changes. |
| CLAUDE.md | Documents repo guidance for storing “superpowers” artifacts outside docs/. |
| .zed/rules | Mirrors the same guidance for editor rules. |
| tests/test_protocols/init.py | Test package init (structure). |
| tests/test_core/nodes/init.py | Test package init (structure). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…n repeated runs - _process_packet_internal now clears _cached_output_table and _cached_content_hash_column when caching new entries (prevents stale as_table() results) - get_cached_results clears _cached_output_packets before repopulating (prevents duplicate rows on repeated orchestrator runs) - Fixed docstrings: removed stale "after validation" references, updated orchestrator docstring about source node caching Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
SyncPipelineOrchestratorthat walks a compiled node graph topologically with materialized buffers between nodes, per-packet observer hooks for function nodes, and a node authority pattern where nodes handle their own persistenceSourceNodeProtocol,FunctionNodeProtocol,OperatorNodeProtocol) withTypeGuarddispatch functions, decoupling the orchestrator from concrete node classesprocess()method to FunctionNode and OperatorNode — bulk execution with schema validation, returning materializedlist[tuple[Tag, Packet]](pod vs node distinction: pods return lazy streams, nodes return materialized lists)ExecutionObserverprotocol with dependency injection for logging, metrics, and debugging hooksPipeline.run()now accepts an optionalorchestratorparameter and defaults toSyncPipelineOrchestratorCloses PLT-921
Design Decisions
process_packet/processvalidate inputs, compute, persist, and cache — no externalstore_resultneededprocess_packet; pipeline provenance recording (pipeline DB) also happens insideprocess_packet— both are the node's concernStreamProtocol; Nodes return materializedlist[tuple[Tag, Packet]]. Nodes are eager executors with validation + persistence + cachingOut of Scope
Pipeline.run()orchestrator interface in this PR. The async orchestrator continues to work viaPipeline._run_async()(the legacy code path). Refactoring it to use the same node protocols and orchestrator interface will be done in a separate follow-up PR, informed by both working implementations.Test plan
🤖 Generated with Claude Code