Skip to content

refactor: slim node protocols, observer injection, async orchestrator alignment (PLT-922)#86

Merged
eywalker merged 23 commits intodevfrom
eywalker/plt-922-refactor-asyncpipelineorchestrator-to-use-node-protocols-and
Mar 15, 2026
Merged

refactor: slim node protocols, observer injection, async orchestrator alignment (PLT-922)#86
eywalker merged 23 commits intodevfrom
eywalker/plt-922-refactor-asyncpipelineorchestrator-to-use-node-protocols-and

Conversation

@eywalker
Copy link
Contributor

Summary

Refactors both orchestrators to use slim node protocols where nodes own their execution and orchestrators are pure topology schedulers.

  • Node protocols slimmed to execute() + async_execute() with observer injection — removed iter_packets, get_cached_results, execute_packet, compute_pipeline_entry_id, get_cached_output from protocol surface
  • AsyncExecutableProtocol deleted entirely
  • SourceNode gained execute(), async_execute() tightened (no inputs param)
  • FunctionNode execute() now owns per-packet cache logic + observer hooks, async_execute() tightened (single input_channel, no pipeline_config)
  • OperatorNode execute() gains REPLAY cache check + observer, async_execute() gains observer
  • SyncPipelineOrchestrator simplified to pure topology scheduler — calls node.execute() directly
  • AsyncPipelineOrchestrator takes graph: nx.DiGraph, returns OrchestratorResult, TypeGuard dispatch, materialize_results support via _CollectingWriter
  • Pipeline.run() updated, _run_async() removed

Deferred to follow-up issues

  • PLT-929: prefer_async flag on FunctionNode
  • PLT-930: Move async concurrency config to node-level construction

Test plan

  • Protocol shape tests verify new contracts (execute + async_execute required, old methods rejected)
  • Node-level tests for execute/async_execute with observer on all three node types
  • Sync orchestrator: observer injection flows through to nodes (linear, operator, diamond DAG)
  • Async orchestrator: observer injection flows through to nodes (linear, operator pipelines)
  • Cached flag: second run with DB reports cached=True via observer
  • materialize_results=True/False for both orchestrators
  • Fan-out (BroadcastChannel), terminal node, error propagation tests
  • Sync/async parity tests produce identical DB results
  • Full suite: 2185 tests passing

🤖 Generated with Claude Code

eywalker and others added 19 commits March 15, 2026 09:46
Design for refactoring AsyncPipelineOrchestrator to use node protocols,
slim execute/async_execute interface, observer injection, and tightened
per-type async signatures.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove pipeline_config from async_execute signature entirely
- Add iter_packets removal from SourceNodeProtocol
- Clarify SourceNode.execute populates _cached_results
- Clarify OperatorNode.execute calls get_cached_output first
- Detail FunctionNode.execute internal per-packet logic
- Add buffer_size forwarding in Pipeline.run default async path
- Add notes on terminal channel draining and existing run() methods

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…-922)

14-task plan covering: protocol slimming, node execute/async_execute
with observer injection, sync/async orchestrator simplification,
Pipeline.run() update, and comprehensive test coverage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ith observer (PLT-922)

Replace old fine-grained protocol methods (iter_packets, get_cached_results,
compute_pipeline_entry_id, execute_packet, get_cached_output) with uniform
execute/async_execute signatures that accept an observer param. Add tests
verifying the new shapes and that old-only implementations no longer satisfy
the protocols.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…T-922)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ignature (PLT-922)

Update all test files that pass a list to FunctionNode.async_execute to
pass a single reader instead. Update concurrency tests to reflect
sequential execution (concurrency deferred to PLT-930).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…acket logic (PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…chestratorResult (PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ve _run_async (PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fix SyncPipelineOrchestrator.run() to return empty node_outputs when
materialize_results=False (terminal nodes were previously left in the
buffer). Add TestMaterializeResults class covering sync/async variants.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n tests (PLT-922)

Add TestAsyncOrchestratorFanOut, TestAsyncOrchestratorTerminalNode, and
TestAsyncOrchestratorErrorPropagation test classes. Fan-out test uses
distinct functions (double vs triple) to ensure two separate compiled
FunctionNodes are created, since the pipeline deduplicates identical nodes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tor ordering (PLT-922)

- Replace `observer: Any` with `observer: ExecutionObserver | None` in all node execute/async_execute methods
- Add __getattr__ to _CollectingWriter for delegating unknown attributes to the real writer
- Move terminal channel drain into TaskGroup to prevent deadlock
- Sort operator input readers by node.upstreams order for non-commutative operators
- Add TODO(PLT-930) for deferred concurrency limiting in FunctionNode.async_execute

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922)

Sync orchestrator tests:
- Operator pipeline: all three node types fire hooks
- Cached flag: second run reports cached=True
- Diamond DAG: events follow topological order
- No observer: pipeline runs fine without one

Async orchestrator tests:
- Linear pipeline: source + function hooks fire
- Operator pipeline: all node types fire hooks, only function fires packet-level
- No observer: pipeline runs fine without one

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@codecov-commenter
Copy link

codecov-commenter commented Mar 15, 2026

⚠️ Please install the 'codecov app svg image' to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 88.96104% with 17 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/orcapod/core/nodes/operator_node.py 73.68% 5 Missing ⚠️
src/orcapod/pipeline/async_orchestrator.py 90.00% 5 Missing ⚠️
src/orcapod/core/nodes/function_node.py 93.87% 3 Missing ⚠️
src/orcapod/core/nodes/source_node.py 88.23% 2 Missing ⚠️
src/orcapod/protocols/node_protocols.py 71.42% 2 Missing ⚠️

📢 Thoughts on this report? Let us know!

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors pipeline execution to align sync/async orchestrators around slim node protocols where nodes own execution (including caching + observer hooks) and orchestrators act as topology schedulers over compiled node graphs.

Changes:

  • Slim node protocol surface to execute() / async_execute() with observer injection; remove AsyncExecutableProtocol.
  • Refactor SyncPipelineOrchestrator and AsyncPipelineOrchestrator to run compiled nx.DiGraph graphs and (async) return OrchestratorResult with optional result materialization.
  • Update/expand tests to match new orchestrator/node APIs (observer injection, materialize_results, fan-out, terminal nodes, error propagation, parity).

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/test_pipeline/test_sync_orchestrator.py Updates async parity invocation to compiled graph + adds observer/materialize_results coverage.
tests/test_pipeline/test_orchestrator.py Reworks async orchestrator tests for run(graph) + adds fan-out/terminal/error/observer tests.
tests/test_pipeline/test_node_protocols.py New tests asserting revised protocol shapes + node execute/async_execute observer behavior.
tests/test_channels/test_pipeline_async_integration.py Updates async pipeline integration to compile/run/flush with graph-based orchestrator.
tests/test_channels/test_node_async_execute.py Removes AsyncExecutableProtocol conformance tests; updates FunctionNode async signature usage and concurrency expectations.
tests/test_channels/test_copilot_review_issues.py Updates concurrency-related assertions after concurrency limiting removal; updates async_execute signature usage.
tests/test_channels/test_async_execute.py Removes AsyncExecutableProtocol conformance tests section.
superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md Adds design doc for PLT-922 refactor.
superpowers/plans/2026-03-15-async-orchestrator-refactor-plan.md Adds detailed implementation plan/checklist for PLT-922.
src/orcapod/protocols/node_protocols.py Updates node protocols to execute/async_execute with observer + tightened async signatures per node type.
src/orcapod/protocols/core_protocols/async_executable.py Deletes AsyncExecutableProtocol.
src/orcapod/protocols/core_protocols/init.py Removes AsyncExecutableProtocol export.
src/orcapod/pipeline/sync_orchestrator.py Simplifies sync orchestrator to delegate to node.execute(observer=...).
src/orcapod/pipeline/graph.py Updates Pipeline.run() async path to call orchestrator with _node_graph; removes _run_async().
src/orcapod/pipeline/async_orchestrator.py Refactors async orchestrator to operate on nx.DiGraph, TypeGuard dispatch, result materialization, observer forwarding.
src/orcapod/core/nodes/source_node.py Adds SourceNode.execute(observer=...) + tightens async_execute signature with observer support.
src/orcapod/core/nodes/operator_node.py Adds observer support to execute and async_execute; execute checks REPLAY cache first.
src/orcapod/core/nodes/function_node.py Moves per-packet cache/observer logic into execute; tightens async_execute signature, removes pipeline_config-based concurrency limiting, adds observer hooks.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

eywalker and others added 4 commits March 15, 2026 20:30
- Fix async FunctionNode.async_execute DB mode: drive output from
  input_channel instead of bulk-replaying Phase 1. Cached packets now
  fire observer hooks with cached=True, matching sync behavior.
- Add predecessor-count validation for function nodes in async orchestrator
- Remove unused AsyncMock import and duplicate pytest import in tests
- Update test_sync_run_then_async_emits_from_cache to send input packets
  (output is now input-driven, not bulk-replayed)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@eywalker eywalker merged commit 847d748 into dev Mar 15, 2026
4 checks passed
@eywalker eywalker deleted the eywalker/plt-922-refactor-asyncpipelineorchestrator-to-use-node-protocols-and branch March 15, 2026 20:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants