feat: add sync pipeline orchestrator with per-packet observability #85

Merged
eywalker merged 26 commits into dev from eywalker/plt-921-design-and-implement-sync-pipeline-orchestrator
Mar 15, 2026

@eywalker eywalker commented Mar 14, 2026

Summary

  • Implement SyncPipelineOrchestrator that walks a compiled node graph topologically with materialized buffers between nodes, per-packet observer hooks for function nodes, and a node authority pattern where nodes handle their own persistence
  • Introduce node protocols (SourceNodeProtocol, FunctionNodeProtocol, OperatorNodeProtocol) with TypeGuard dispatch functions, decoupling the orchestrator from concrete node classes
  • Nodes are self-validating executors: they validate input schemas (including system tags), compute, persist to DB, and cache results internally — the orchestrator never reaches into node internals
  • Add process() method to FunctionNode and OperatorNode — bulk execution with schema validation, returning materialized list[tuple[Tag, Packet]] (pod vs node distinction: pods return lazy streams, nodes return materialized lists)
  • Add ExecutionObserver protocol with dependency injection for logging, metrics, and debugging hooks
  • Pipeline.run() now accepts an optional orchestrator parameter and defaults to SyncPipelineOrchestrator
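
The topological walk with materialized buffers described above can be sketched roughly as follows. This is a minimal illustration, not the orchestrator's actual code: the toy graph format, the `run_sync` function, and the node names are all hypothetical, and the standard-library `graphlib.TopologicalSorter` stands in for whatever ordering the compiled node graph provides.

```python
from graphlib import TopologicalSorter
from typing import Callable

# Hypothetical toy graph: each node name maps to (upstream names, function over
# the upstream buffers). This is not the orcapod API, only the traversal idea.
Node = tuple[list[str], Callable[[list[list[int]]], list[int]]]


def run_sync(graph: dict[str, Node]) -> dict[str, list[int]]:
    """Execute nodes in topological order, keeping one materialized buffer per node."""
    sorter = TopologicalSorter({name: set(ups) for name, (ups, _) in graph.items()})
    buffers: dict[str, list[int]] = {}
    for name in sorter.static_order():
        upstreams, fn = graph[name]
        # Every upstream has already run, so its buffer is a complete list.
        buffers[name] = fn([buffers[u] for u in upstreams])
    return buffers


graph: dict[str, Node] = {
    "source": ([], lambda _: [1, 2, 3]),
    "double": (["source"], lambda ins: [x * 2 for x in ins[0]]),
    "square": (["source"], lambda ins: [x * x for x in ins[0]]),
    # Diamond join: combine the two branches element-wise.
    "join": (["double", "square"], lambda ins: [a + b for a, b in zip(*ins)]),
}
result = run_sync(graph)
print(result["join"])
```

Because each buffer is a plain list, a diamond topology can read the same upstream output twice without re-running the upstream node.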

Closes PLT-921

Design Decisions

  • Node authority: Nodes are the sole authority over their results. process_packet / process validate inputs, compute, persist, and cache — no external store_result needed
  • Schema validation: Input tag + packet + system tag column names are validated against expected upstream schemas. System tags are critical because they encode pipeline topology
  • Two levels of persistence for FunctionNode: function-level result memoization (result DB, transcends pipelines) happens inside process_packet; pipeline provenance recording (pipeline DB) also happens inside process_packet — both are the node's concern
  • Pod vs Node: Pods return lazy StreamProtocol; Nodes return materialized list[tuple[Tag, Packet]]. Nodes are eager executors with validation + persistence + caching
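
A rough sketch of the pod vs node distinction and the two persistence levels, under simplifying assumptions: `Tag` and `Packet` are replaced by `str` and `int`, the two databases are an in-memory dict and list, and `pod_process` and `ToyFunctionNode` are hypothetical names rather than the real classes.

```python
from typing import Callable, Iterable, Iterator

Tag = str      # stand-in for the real Tag type (assumption)
Packet = int   # stand-in for the real Packet type (assumption)


def pod_process(
    fn: Callable[[Packet], Packet], stream: Iterable[tuple[Tag, Packet]]
) -> Iterator[tuple[Tag, Packet]]:
    """Pod-style: lazy. Nothing is computed until the caller iterates."""
    for tag, packet in stream:
        yield tag, fn(packet)


class ToyFunctionNode:
    """Node-style: eager. Computes, persists, and caches on its own."""

    def __init__(
        self,
        fn: Callable[[Packet], Packet],
        result_db: dict[Packet, Packet],
        pipeline_db: list[tuple[Tag, Packet, Packet]],
    ) -> None:
        self.fn = fn
        self.result_db = result_db      # function-level memoization, shared across pipelines
        self.pipeline_db = pipeline_db  # provenance records for this pipeline only
        self.cached: list[tuple[Tag, Packet]] = []

    def process_packet(self, tag: Tag, packet: Packet) -> tuple[Tag, Packet]:
        if packet not in self.result_db:
            self.result_db[packet] = self.fn(packet)   # compute once per input
        out = self.result_db[packet]
        self.pipeline_db.append((tag, packet, out))    # always record provenance
        self.cached.append((tag, out))                 # internal cache
        return tag, out

    def process(self, stream: Iterable[tuple[Tag, Packet]]) -> list[tuple[Tag, Packet]]:
        return [self.process_packet(t, p) for t, p in stream]


calls: list[Packet] = []


def inc(x: Packet) -> Packet:
    calls.append(x)
    return x + 1


shared_results: dict[Packet, Packet] = {}
node_a = ToyFunctionNode(inc, shared_results, [])
node_b = ToyFunctionNode(inc, shared_results, [])
out_a = node_a.process([("a", 1), ("b", 2)])
out_b = node_b.process([("a", 1)])    # reuses the memoized result for input 1
lazy = pod_process(inc, [("c", 10)])  # inc has not been called for 10 yet
```

In this sketch the second node still writes its own provenance record even though the computation itself was served from the shared result store, which is the sense in which the two persistence concerns are separate.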

Out of Scope

  • AsyncPipelineOrchestrator is intentionally not updated to conform to the new node protocol / Pipeline.run() orchestrator interface in this PR. The async orchestrator continues to work via Pipeline._run_async() (the legacy code path). Refactoring it to use the same node protocols and orchestrator interface will be done in a separate follow-up PR, informed by both working implementations.

Test plan

  • Node protocol TypeGuard dispatch tests (6 tests)
  • ExecutionObserver / NoOpObserver tests (5 tests)
  • FunctionNode process_packet: schema validation, pipeline record writes, result DB memoization, internal caching (6 tests)
  • FunctionNode process(): bulk execution, schema validation (4 tests)
  • OperatorNode process(): schema validation, DB persistence, caching (6 tests)
  • FunctionNode get_cached_results: targeted lookup, filtering, cache population (5 tests)
  • SyncPipelineOrchestrator: linear, operator, diamond, observer, unknown-type (5 tests)
  • Pipeline.run() integration: default orchestrator, explicit orchestrator, cache population (3 tests)
  • Sync vs async parity: linear + diamond topologies produce identical DB results (2 tests)
  • Full regression suite: 2159 tests passing

🤖 Generated with Claude Code

eywalker and others added 12 commits March 14, 2026 09:48
Design spec for PLT-921: synchronous pipeline orchestrator with
per-packet observability hooks, node protocols, and orchestrator-driven
execution model.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move design specs from docs/superpowers/ to superpowers/ at project root.
The docs/ directory is reserved for actual library documentation.
Updated CLAUDE.md and .zed/rules with the new convention.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- process_packet on FunctionNode is now pure computation (no DB side effects)
- store_result added to all three node protocols as uniform persistence step
- Orchestrator always calls store_result; nodes decide internally what to do
- FunctionNode.store_result handles both result cache and pipeline record writes
- SourceNode.store_result persists source snapshot to pipeline DB if configured
- Added FunctionNode purity tests and store_result tests to testing strategy

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Key design clarification: process_packet handles computation +
function-level memoization (CachedFunctionPod, result DB). store_result
handles pipeline provenance recording only (pipeline DB). This reflects
two distinct persistence concerns: function-level memoization transcends
pipelines, while pipeline provenance is execution-specific.

Also fixes from plan review:
- OperatorNodeProtocol uses public operator property (not _operator)
- _materialize_as_stream uses selective columns matching existing pattern
- populate_cache handles empty lists gracefully
- _iter_packets_concurrent updated in backward compat list
- No CachedFunctionPod modification needed (no private access)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…trator

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hestratorResult

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…or support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…che, operator property

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…dd get_cached_results and populate_cache

Split process_packet into compute-only (process_packet) and
pipeline-provenance-only (store_result), with _process_and_store_packet
for backward-compatible iter_packets behavior. Added get_cached_results
for pipeline entry ID lookups and populate_cache for external cache
population.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rename orchestrator.py to async_orchestrator.py, add orchestrator
parameter to Pipeline.run(), and default to SyncPipelineOrchestrator
for synchronous execution. Add _apply_results() to populate node
caches from orchestrator output.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 14, 2026 18:25

codecov-commenter commented Mar 14, 2026

⚠️ Please install the Codecov app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 86.02941% with 38 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/orcapod/pipeline/sync_orchestrator.py | 78.37% | 24 Missing ⚠️ |
| src/orcapod/core/nodes/function_node.py | 94.44% | 4 Missing ⚠️ |
| src/orcapod/core/nodes/operator_node.py | 76.47% | 4 Missing ⚠️ |
| src/orcapod/pipeline/observer.py | 90.00% | 2 Missing ⚠️ |
| src/orcapod/protocols/node_protocols.py | 92.85% | 2 Missing ⚠️ |
| src/orcapod/core/nodes/source_node.py | 75.00% | 1 Missing ⚠️ |
| src/orcapod/pipeline/result.py | 87.50% | 1 Missing ⚠️ |

Copilot AI left a comment

Pull request overview

This PR introduces a new synchronous “push-style” pipeline execution path via a SyncPipelineOrchestrator, adds node-facing protocols + observer hooks for per-packet observability, and refactors node persistence/caching responsibilities so Pipeline.run() can delegate execution cleanly.

Changes:

  • Add SyncPipelineOrchestrator with per-node and per-packet observer hooks, returning an OrchestratorResult for optional cache population.
  • Introduce node protocols (SourceNodeProtocol, FunctionNodeProtocol, OperatorNodeProtocol) with TypeGuard dispatch helpers.
  • Refactor nodes to expose a uniform compute/store/cache interface (store_result(), populate_cache(), plus function-node cache helpers), and update Pipeline.run() to default to the sync orchestrator.

Reviewed changes

Copilot reviewed 20 out of 23 changed files in this pull request and generated 2 comments.

File Description
src/orcapod/pipeline/sync_orchestrator.py New sync orchestrator that topologically executes the compiled node graph with buffers + observer hooks.
src/orcapod/pipeline/graph.py Adds orchestrator parameter to Pipeline.run() and applies orchestrator results to node caches.
src/orcapod/protocols/node_protocols.py New node protocols and TypeGuard helpers to decouple orchestrator from concrete node classes.
src/orcapod/pipeline/observer.py Adds ExecutionObserver protocol and NoOpObserver.
src/orcapod/pipeline/result.py Adds OrchestratorResult return type for orchestrators.
src/orcapod/pipeline/async_orchestrator.py Renames/moves existing async orchestrator implementation.
src/orcapod/pipeline/__init__.py Re-exports orchestrators and updates module import paths.
src/orcapod/core/nodes/function_node.py Splits function computation vs pipeline provenance (process_packet vs store_result), adds cached lookup + cache population helpers.
src/orcapod/core/nodes/operator_node.py Adds operator property, get_cached_output, store_result, and populate_cache.
src/orcapod/core/nodes/source_node.py Adds in-memory cache population and cache-aware iter_packets; adds store_result hook (currently no-op).
tests/test_pipeline/test_sync_orchestrator.py New orchestrator tests including observer hooks, integration with Pipeline.run(), and sync/async parity.
tests/test_pipeline/test_observer.py Tests for ExecutionObserver/NoOpObserver.
tests/test_protocols/test_node_protocols.py Tests for TypeGuard dispatch.
tests/test_core/nodes/test_node_store_result.py Tests for store_result behavior across node types.
tests/test_core/nodes/test_node_populate_cache.py Tests for populate_cache behavior across node types.
tests/test_core/nodes/test_function_node_get_cached.py Tests for FunctionNode.get_cached_results.
tests/test_core/function_pod/test_function_pod_node.py Updates expectations around pipeline provenance recording (moved to store step).
superpowers/specs/2026-03-14-sync-orchestrator-design.md Design spec documenting orchestrator/protocol architecture and intended semantics.
superpowers/plans/2026-03-14-sync-orchestrator-plan.md Implementation plan artifact for the new orchestrator/protocol refactor.
CLAUDE.md / .zed/rules Repo guidance updated to place “superpowers” artifacts under superpowers/.

eywalker and others added 7 commits March 14, 2026 18:38
Nodes should self-cache during store_result and get_cached_results
calls, eliminating the need for external populate_cache and
Pipeline._apply_results().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tore_result

Nodes now self-cache during store_result and get_cached_results calls,
eliminating the need for Pipeline._apply_results to push results back
into nodes after orchestrator execution.

- Remove populate_cache from all three node protocols and implementations
- SourceNode.store_result populates _cached_results
- OperatorNode.store_result populates _cached_output_stream (and writes
  to DB when applicable)
- FunctionNode.store_result populates _cached_output_packets
- FunctionNode.get_cached_results also populates _cached_output_packets
- Remove Pipeline._apply_results (no longer needed)
- Delete test_node_populate_cache.py, add self-caching tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Nodes become self-validating, self-persisting executors. Remove
store_result, add schema validation, add process() to FunctionNode
and OperatorNode. Clean pod vs node distinction: pods return lazy
streams, nodes return materialized lists.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…), remove store_result

- process_packet now validates input schema, computes, writes pipeline
  records, and caches results (previously only computed)
- Add _validate_input_schema and _validate_stream_schema for schema checks
- Add process(input_stream) bulk method with stream-level validation
- Rename _process_and_store_packet → _process_packet_internal (core
  compute+persist+cache without validation)
- Remove store_result (merged into _process_packet_internal)
- Rename _async_process_and_store_packet → _async_process_packet_internal
- Remove standalone async_process_packet (merged into async internal)
- Update sync_orchestrator, tests, and async execute tests accordingly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…result and operator property

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…packets call

SourceNode now caches its packets internally on the first iter_packets()
call instead of relying on the orchestrator to call store_result().
SourceNodeProtocol is updated to remove store_result, and the orchestrator
no longer calls it after materializing source output.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ocess()

FunctionNodeProtocol now declares process() instead of store_result,
matching the node authority pattern where nodes handle their own
persistence internally.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

Adds a new synchronous “push” execution path for pipelines via SyncPipelineOrchestrator, along with node protocols and observer hooks, while shifting toward a node-authority model where nodes validate, persist, and cache their own results.

Changes:

  • Introduce SyncPipelineOrchestrator, ExecutionObserver/NoOpObserver, OrchestratorResult, and node protocol TypeGuard dispatch.
  • Refactor nodes toward authority/self-caching with new process() APIs and schema validation logic; integrate orchestrator selection into Pipeline.run().
  • Add extensive unit/integration tests plus design/plan docs under superpowers/.
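
As an illustration of observer hooks injected into execution, here is a minimal sketch. The hook names (`on_node_start`, `on_packet_end`, `on_node_end`), the `run_node` helper, and the recording observer are assumptions made for the example and are not taken from `observer.py`.

```python
from typing import Any, Callable, Optional, Protocol


class Observer(Protocol):
    """Hypothetical observer interface with per-node and per-packet hooks."""

    def on_node_start(self, node: str) -> None: ...
    def on_packet_end(self, node: str, tag: Any, packet: Any) -> None: ...
    def on_node_end(self, node: str) -> None: ...


class NoOp:
    """Default observer: every hook does nothing."""

    def on_node_start(self, node: str) -> None:
        pass

    def on_packet_end(self, node: str, tag: Any, packet: Any) -> None:
        pass

    def on_node_end(self, node: str) -> None:
        pass


class Recording:
    """Observer that records events, e.g. for tests or debugging."""

    def __init__(self) -> None:
        self.events: list[tuple] = []

    def on_node_start(self, node: str) -> None:
        self.events.append(("start", node))

    def on_packet_end(self, node: str, tag: Any, packet: Any) -> None:
        self.events.append(("packet", node, tag, packet))

    def on_node_end(self, node: str) -> None:
        self.events.append(("end", node))


def run_node(
    name: str,
    fn: Callable[[int], int],
    inputs: list[tuple[str, int]],
    observer: Optional[Observer] = None,
) -> list[tuple[str, int]]:
    obs = observer if observer is not None else NoOp()  # injected dependency
    obs.on_node_start(name)
    out = []
    for tag, packet in inputs:
        result = fn(packet)
        obs.on_packet_end(name, tag, result)  # per-packet hook
        out.append((tag, result))
    obs.on_node_end(name)
    return out


rec = Recording()
out = run_node("inc", lambda x: x + 1, [("a", 1), ("b", 2)], rec)
print(rec.events)
```

Defaulting to a no-op observer keeps the execution loop free of `if observer:` checks while still letting callers plug in logging or metrics.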

Reviewed changes

Copilot reviewed 24 out of 27 changed files in this pull request and generated 8 comments.

File Description
tests/test_protocols/test_node_protocols.py Tests for node protocol TypeGuard dispatch helpers.
tests/test_protocols/__init__.py Package marker for protocol tests.
tests/test_pipeline/test_sync_orchestrator.py Integration tests for sync orchestrator + Pipeline.run() orchestration.
tests/test_pipeline/test_observer.py Tests for ExecutionObserver protocol and NoOpObserver.
tests/test_core/nodes/test_node_process.py Tests for node process()/process_packet() validation/persistence/caching.
tests/test_core/nodes/test_function_node_get_cached.py Tests for FunctionNode.get_cached_results() behavior.
tests/test_core/nodes/__init__.py Package marker for node tests.
tests/test_core/function_pod/test_function_pod_node.py Updates to use new internal processing APIs in function node tests.
tests/test_channels/test_node_async_execute.py Updates async/sequential routing tests to patch new internal methods.
superpowers/specs/2026-03-14-sync-orchestrator-design.md Design doc for sync orchestrator + protocols/observer/result types.
superpowers/specs/2026-03-14-remove-populate-cache-design.md Design doc for removing populate_cache and self-caching nodes.
superpowers/specs/2026-03-14-node-authority-design.md Design doc for node-authority (self-validating/self-persisting) model.
superpowers/plans/2026-03-14-sync-orchestrator-plan.md Implementation plan for sync orchestrator + protocols + integration.
superpowers/plans/2026-03-14-remove-populate-cache-plan.md Implementation plan for removing populate_cache.
superpowers/plans/2026-03-14-node-authority-plan.md Implementation plan for node-authority refactor.
src/orcapod/protocols/node_protocols.py Defines node protocols and TypeGuard dispatch.
src/orcapod/pipeline/sync_orchestrator.py Implements synchronous orchestrator with per-node/per-packet observer hooks.
src/orcapod/pipeline/result.py Adds OrchestratorResult return type.
src/orcapod/pipeline/observer.py Adds ExecutionObserver protocol and NoOpObserver.
src/orcapod/pipeline/graph.py Adds orchestrator option to Pipeline.run() and defaults to sync orchestrator.
src/orcapod/pipeline/async_orchestrator.py Renames/relocates the async orchestrator implementation.
src/orcapod/pipeline/__init__.py Exports sync/async orchestrators.
src/orcapod/core/nodes/source_node.py Adds source-node caching behavior for orchestrated runs.
src/orcapod/core/nodes/operator_node.py Adds process() and input schema validation; integrates persistence/caching.
src/orcapod/core/nodes/function_node.py Adds schema validation + process(); refactors internal processing/caching paths.
CLAUDE.md Documents convention for placing “superpowers” artifacts.
.zed/rules Mirrors the “superpowers artifacts” convention for editor rules.

eywalker and others added 4 commits March 14, 2026 19:56
…tream

Build an empty ArrowTableStream with correct schema from upstream_node
instead of raising ValueError. This handles cases where an upstream
node produces zero packets (e.g., FunctionNode filters everything out).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… iteration, stale docs, index ordering

- FunctionNode: use sorted() in validation error messages for deterministic output
- FunctionNode._validate_stream_schema: compare Schema objects (type-aware) instead of set(keys)
- OperatorNode._validate_input_schemas: compare Schema objects (type-aware) instead of set(keys)
- OperatorNode.process(): only persist to DB on CacheMode.LOG, not REPLAY
- SourceNode.iter_packets(): revert to lazy delegation instead of eager materialization
- Pipeline.run() docstring: remove stale store_result references
- FunctionNode._process_packet_internal: use input tag (not tag_out) for pipeline record
- FunctionNode._async_process_packet_internal: same input tag fix
- FunctionNode: add cache_index parameter to _process_packet_internal and _async_process_packet_internal
- _iter_packets_concurrent: pass correct index to avoid cache index misalignment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per-packet validation (_validate_input_schema) now uses Schema object
comparison with system tags, matching the stream-level validation
approach. Both FunctionNode and OperatorNode validation paths are
now consistent: Schema-based (order-independent, type-aware) with
system tag inclusion.
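
The difference between key-set comparison and type-aware, order-independent schema comparison can be illustrated with plain dictionaries. This is only a sketch: the real `Schema` class is not shown in this PR page, and the column names below (including the system-tag-style one) are made up.

```python
def keys_match(expected: dict[str, type], actual: dict[str, type]) -> bool:
    """Older style of check: only column names are compared."""
    return set(expected) == set(actual)


def schemas_match(expected: dict[str, type], actual: dict[str, type]) -> bool:
    """Type-aware check: dict equality ignores insertion order but compares values."""
    return expected == actual


# Hypothetical schema including a system-tag-like column.
expected = {"id": int, "score": float, "_tag::source:abc": str}
reordered = {"score": float, "_tag::source:abc": str, "id": int}
wrong_type = {"id": str, "score": float, "_tag::source:abc": str}

print(keys_match(expected, wrong_type))     # names agree, so the mismatch slips through
print(schemas_match(expected, wrong_type))  # the type difference is caught
print(schemas_match(expected, reordered))   # column order does not matter
```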

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… DerivedSource tests (PLT-924)

- ArrowTableStream.identity_structure() and pipeline_identity_structure() now
  delegate to super() when a producer is set, so materialized streams inherit
  the same identity as their origin node
- SyncPipelineOrchestrator._materialize_as_stream() passes producer and upstreams
  from the upstream node so downstream operators embed correct pipeline hashes
  in system tag column names
- Update NamedProducer mock in stream tests to include argument_symmetry()
- Add TestMaterializedStreamIdentity test class covering pipeline_hash parity,
  content_hash parity, system tag schema preservation, and end-to-end operator
  system tag correctness
- Mark two DerivedSource hash tests xfail(PLT-924)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

Adds a new synchronous “push-style” pipeline execution path built around a SyncPipelineOrchestrator, plus node-facing protocols and observer hooks to enable per-packet observability and node-authoritative execution (validation/persistence/caching handled by nodes).

Changes:

  • Introduces SyncPipelineOrchestrator, ExecutionObserver/NoOpObserver, OrchestratorResult, and node protocol TypeGuards for orchestrator↔node dispatch.
  • Refactors FunctionNode/OperatorNode to support node-authoritative bulk execution (process) and schema validation, plus internal caching behaviors.
  • Updates Pipeline.run() to accept an optional orchestrator and to default to sync orchestration (while keeping legacy async path via _run_async).
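
The optional-orchestrator pattern for `Pipeline.run()` might look roughly like the sketch below. All class names here are toy stand-ins, and the `run(graph)` signature is an assumption chosen for the example rather than the real interface.

```python
from typing import Optional, Protocol


class Orchestrator(Protocol):
    """Hypothetical orchestrator interface: run a graph, return per-node outputs."""

    def run(self, graph: dict[str, list[int]]) -> dict[str, list[int]]: ...


class ToySyncOrchestrator:
    def run(self, graph: dict[str, list[int]]) -> dict[str, list[int]]:
        return {name: list(values) for name, values in graph.items()}


class ToyReversingOrchestrator:
    """An alternative strategy, only here to show that the choice is pluggable."""

    def run(self, graph: dict[str, list[int]]) -> dict[str, list[int]]:
        return {name: list(reversed(values)) for name, values in graph.items()}


class ToyPipeline:
    def __init__(self, graph: dict[str, list[int]]) -> None:
        self.graph = graph

    def run(self, orchestrator: Optional[Orchestrator] = None) -> dict[str, list[int]]:
        # Default to the synchronous strategy when the caller passes nothing.
        if orchestrator is None:
            orchestrator = ToySyncOrchestrator()
        return orchestrator.run(self.graph)


pipeline = ToyPipeline({"source": [1, 2, 3]})
default_result = pipeline.run()
custom_result = pipeline.run(orchestrator=ToyReversingOrchestrator())
```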

Reviewed changes

Copilot reviewed 27 out of 30 changed files in this pull request and generated 2 comments.

File Description
src/orcapod/protocols/node_protocols.py Adds node protocols + TypeGuard helpers used by orchestrators for dispatch.
src/orcapod/pipeline/observer.py Defines ExecutionObserver protocol and NoOpObserver.
src/orcapod/pipeline/result.py Introduces OrchestratorResult container for orchestrator outputs.
src/orcapod/pipeline/sync_orchestrator.py Implements synchronous orchestrator with per-node/per-packet hooks and buffering/materialization.
src/orcapod/pipeline/graph.py Extends Pipeline.run() to accept an orchestrator and defaults to sync orchestrator execution.
src/orcapod/pipeline/async_orchestrator.py Renames/moves async orchestrator module (import path update).
src/orcapod/pipeline/__init__.py Exports SyncPipelineOrchestrator and updates async orchestrator import.
src/orcapod/core/nodes/function_node.py Adds schema validation + process() + internal execution helpers and cached-result lookup.
src/orcapod/core/nodes/operator_node.py Adds schema validation + process() and cached-output retrieval for REPLAY mode.
src/orcapod/core/nodes/source_node.py Adds optional _cached_results pathway for future/limited source caching behavior.
src/orcapod/core/streams/arrow_table_stream.py Adjusts identity/pipeline identity to delegate to StreamBase (producer/upstream identity semantics).
tests/test_protocols/__init__.py Package marker for protocol tests.
tests/test_protocols/test_node_protocols.py Tests TypeGuard dispatch functions for node protocols.
tests/test_pipeline/test_observer.py Tests observer protocol conformance and NoOp behavior.
tests/test_pipeline/test_sync_orchestrator.py Comprehensive sync orchestrator tests (linear/diamond/operator/observer/parity/materialization identity).
tests/test_pipeline/test_pipeline.py Marks two DerivedSource system-tag schema tests as xfail (PLT-924).
tests/test_core/streams/test_streams.py Updates producer test fixture to implement argument_symmetry for new identity behavior.
tests/test_core/nodes/__init__.py Package marker for node tests.
tests/test_core/nodes/test_node_process.py Tests new node process/process_packet behaviors: validation, persistence, caching.
tests/test_core/nodes/test_function_node_get_cached.py Tests FunctionNode.get_cached_results() behavior and cache population.
tests/test_core/function_pod/test_function_pod_node.py Updates function pod node tests to use internal processing path and adds coverage.
tests/test_channels/test_node_async_execute.py Updates async execution tests to patch internal async processing functions.
superpowers/specs/2026-03-14-sync-orchestrator-design.md Design spec for sync orchestrator + protocols/observer/result types.
superpowers/specs/2026-03-14-remove-populate-cache-design.md Follow-up design spec: remove populate_cache in favor of self-caching nodes.
superpowers/specs/2026-03-14-node-authority-design.md Design spec: node authority pattern (validation/persist/cache inside nodes).
superpowers/plans/2026-03-14-sync-orchestrator-plan.md Implementation plan for sync orchestrator workstream.
superpowers/plans/2026-03-14-remove-populate-cache-plan.md Plan for removing populate_cache and shifting to self-caching.
superpowers/plans/2026-03-14-node-authority-plan.md Plan for node-authority refactor.
CLAUDE.md Documents repo guidance on where to place “superpowers” artifacts.
.zed/rules Mirrors “superpowers artifact” placement guidance for editor rules.

eywalker and others added 2 commits March 14, 2026 22:11
…ket on nodes

Distinguishes node methods (internal, for orchestrators) from pod methods
(public process/process_packet). Parallels the existing async_execute naming.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… the caller

execute() and execute_packet() are internal methods for orchestrators.
The caller guarantees input identity — no validation is performed.
This removes _validate_input_schema, _validate_stream_schema,
_validate_input_schemas and all associated tests.

Also removes xfail markers on DerivedSource tests (no longer needed
since validation was the cause of those failures).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Pull request overview

Adds a synchronous “push-style” pipeline orchestrator with per-node / per-packet observability, and refactors node/orchestrator interaction via lightweight protocols so Pipeline.run() can default to the new sync execution path.

Changes:

  • Introduces SyncPipelineOrchestrator, ExecutionObserver/NoOpObserver, and OrchestratorResult to run a compiled node graph topologically with materialized buffers.
  • Adds node protocols + TypeGuard dispatch, and updates core nodes to support orchestrator-driven execution (execute_packet/execute, operator replay support, source cached-results plumbing).
  • Updates Pipeline.run() to accept an optional orchestrator and reorganizes async orchestrator module location/exports; expands test coverage and adds design/plan artifacts.

Reviewed changes

Copilot reviewed 27 out of 30 changed files in this pull request and generated 3 comments.

File Description
src/orcapod/protocols/node_protocols.py Defines node protocols + TypeGuard helpers for orchestrator dispatch.
src/orcapod/pipeline/sync_orchestrator.py Implements sync orchestrator with buffering, observer hooks, and stream materialization.
src/orcapod/pipeline/observer.py Adds ExecutionObserver protocol and NoOpObserver.
src/orcapod/pipeline/result.py Adds OrchestratorResult container for node outputs.
src/orcapod/pipeline/graph.py Extends Pipeline.run() with orchestrator parameter; sync default + async import update.
src/orcapod/pipeline/async_orchestrator.py Moves/renames async orchestrator module (legacy path).
src/orcapod/pipeline/__init__.py Exports SyncPipelineOrchestrator and updated async orchestrator import.
src/orcapod/core/nodes/function_node.py Adds orchestrator-facing execute_packet/execute, internal processing helpers, and DB cache lookup helper.
src/orcapod/core/nodes/operator_node.py Adds orchestrator-facing execute and get_cached_output to support sync orchestration and replay.
src/orcapod/core/nodes/source_node.py Adds cached-results slot and uses it when present during iteration.
src/orcapod/core/streams/arrow_table_stream.py Aligns identity/pipeline-identity structures with StreamBase producer symmetry behavior.
tests/test_protocols/test_node_protocols.py Tests TypeGuard dispatch for node protocols.
tests/test_pipeline/test_sync_orchestrator.py Integration tests for sync orchestrator, pipeline.run integration, parity, and identity/system-tag preservation.
tests/test_pipeline/test_observer.py Tests observer protocol satisfaction + no-op hooks.
tests/test_core/nodes/test_node_execute.py Tests node execute methods for persistence/caching behavior.
tests/test_core/nodes/test_function_node_get_cached.py Tests targeted cache lookup and cache population behavior.
tests/test_core/streams/test_streams.py Updates producer behavior in tests to support argument symmetry.
tests/test_core/function_pod/test_function_pod_node.py Updates tests to reflect execute vs process naming/refactors.
tests/test_core/function_pod/test_function_node_caching.py Updates wording to execute_packet terminology.
tests/test_channels/test_node_async_execute.py Updates routing tests to patched internal execute/process paths.
superpowers/specs/2026-03-14-sync-orchestrator-design.md Design spec for sync orchestrator + protocols/observer/result patterns.
superpowers/specs/2026-03-14-remove-populate-cache-design.md Follow-up design note: self-caching nodes without populate_cache.
superpowers/specs/2026-03-14-node-authority-design.md Design spec for node-authority/self-validating/self-persisting node execution.
superpowers/plans/2026-03-14-sync-orchestrator-plan.md Implementation plan artifact for sync orchestrator and related refactors.
superpowers/plans/2026-03-14-remove-populate-cache-plan.md Plan artifact for removing populate-cache pattern.
superpowers/plans/2026-03-14-node-authority-plan.md Plan artifact for node-authority changes.
CLAUDE.md Documents repo guidance for storing “superpowers” artifacts outside docs/.
.zed/rules Mirrors the same guidance for editor rules.
tests/test_protocols/__init__.py Test package init (structure).
tests/test_core/nodes/__init__.py Test package init (structure).

…n repeated runs

- _process_packet_internal now clears _cached_output_table and
  _cached_content_hash_column when caching new entries (prevents
  stale as_table() results)
- get_cached_results clears _cached_output_packets before repopulating
  (prevents duplicate rows on repeated orchestrator runs)
- Fixed docstrings: removed stale "after validation" references,
  updated orchestrator docstring about source node caching
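
The two cache fixes above follow a common pattern: invalidate derived views when the underlying cache changes, and clear before repopulating. A minimal sketch, assuming a list-backed packet cache and a lazily built table (the class and attribute names are hypothetical, not the node's real fields):

```python
class ToyNodeCache:
    """Toy cache with a per-packet list and a lazily derived table view."""

    def __init__(self) -> None:
        self.packets: list[tuple[str, int]] = []
        self._table: list[tuple[str, int]] | None = None  # derived, built on demand

    def add(self, tag: str, packet: int) -> None:
        self.packets.append((tag, packet))
        # Invalidate the derived view so as_table() cannot return stale rows.
        self._table = None

    def load_from_db(self, rows: list[tuple[str, int]]) -> None:
        # Clear first, so a repeated run does not append the same rows twice.
        self.packets.clear()
        self._table = None
        for tag, packet in rows:
            self.add(tag, packet)

    def as_table(self) -> list[tuple[str, int]]:
        if self._table is None:
            self._table = sorted(self.packets)
        return self._table


cache = ToyNodeCache()
db_rows = [("b", 2), ("a", 1)]
cache.load_from_db(db_rows)
cache.load_from_db(db_rows)       # second run: still two rows, not four
first_view = cache.as_table()
cache.add("c", 3)                 # new entry invalidates the cached table
second_view = cache.as_table()
```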

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@eywalker eywalker merged commit 291c451 into dev Mar 15, 2026
4 checks passed
@eywalker eywalker deleted the eywalker/plt-921-design-and-implement-sync-pipeline-orchestrator branch March 15, 2026 02:22

3 participants