Conversation
Design for refactoring AsyncPipelineOrchestrator to use node protocols, slim execute/async_execute interface, observer injection, and tightened per-type async signatures. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove pipeline_config from async_execute signature entirely - Add iter_packets removal from SourceNodeProtocol - Clarify SourceNode.execute populates _cached_results - Clarify OperatorNode.execute calls get_cached_output first - Detail FunctionNode.execute internal per-packet logic - Add buffer_size forwarding in Pipeline.run default async path - Add notes on terminal channel draining and existing run() methods Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…-922) 14-task plan covering: protocol slimming, node execute/async_execute with observer injection, sync/async orchestrator simplification, Pipeline.run() update, and comprehensive test coverage. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ith observer (PLT-922) Replace old fine-grained protocol methods (iter_packets, get_cached_results, compute_pipeline_entry_id, execute_packet, get_cached_output) with uniform execute/async_execute signatures that accept an observer param. Add tests verifying the new shapes and that old-only implementations no longer satisfy the protocols. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…T-922) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ignature (PLT-922) Update all test files that pass a list to FunctionNode.async_execute to pass a single reader instead. Update concurrency tests to reflect sequential execution (concurrency deferred to PLT-930). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…acket logic (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…chestratorResult (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ve _run_async (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fix SyncPipelineOrchestrator.run() to return empty node_outputs when materialize_results=False (terminal nodes were previously left in the buffer). Add TestMaterializeResults class covering sync/async variants. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n tests (PLT-922) Add TestAsyncOrchestratorFanOut, TestAsyncOrchestratorTerminalNode, and TestAsyncOrchestratorErrorPropagation test classes. Fan-out test uses distinct functions (double vs triple) to ensure two separate compiled FunctionNodes are created, since the pipeline deduplicates identical nodes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tor ordering (PLT-922) - Replace `observer: Any` with `observer: ExecutionObserver | None` in all node execute/async_execute methods - Add __getattr__ to _CollectingWriter for delegating unknown attributes to the real writer - Move terminal channel drain into TaskGroup to prevent deadlock - Sort operator input readers by node.upstreams order for non-commutative operators - Add TODO(PLT-930) for deferred concurrency limiting in FunctionNode.async_execute Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922) Sync orchestrator tests: - Operator pipeline: all three node types fire hooks - Cached flag: second run reports cached=True - Diamond DAG: events follow topological order - No observer: pipeline runs fine without one Async orchestrator tests: - Linear pipeline: source + function hooks fire - Operator pipeline: all node types fire hooks, only function fires packet-level - No observer: pipeline runs fine without one Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
Contributor
There was a problem hiding this comment.
Pull request overview
Refactors pipeline execution to align sync/async orchestrators around slim node protocols where nodes own execution (including caching + observer hooks) and orchestrators act as topology schedulers over compiled node graphs.
Changes:
- Slim node protocol surface to
execute()/async_execute()with observer injection; removeAsyncExecutableProtocol. - Refactor
SyncPipelineOrchestratorandAsyncPipelineOrchestratorto run compilednx.DiGraphgraphs and (async) returnOrchestratorResultwith optional result materialization. - Update/expand tests to match new orchestrator/node APIs (observer injection, materialize_results, fan-out, terminal nodes, error propagation, parity).
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_pipeline/test_sync_orchestrator.py | Updates async parity invocation to compiled graph + adds observer/materialize_results coverage. |
| tests/test_pipeline/test_orchestrator.py | Reworks async orchestrator tests for run(graph) + adds fan-out/terminal/error/observer tests. |
| tests/test_pipeline/test_node_protocols.py | New tests asserting revised protocol shapes + node execute/async_execute observer behavior. |
| tests/test_channels/test_pipeline_async_integration.py | Updates async pipeline integration to compile/run/flush with graph-based orchestrator. |
| tests/test_channels/test_node_async_execute.py | Removes AsyncExecutableProtocol conformance tests; updates FunctionNode async signature usage and concurrency expectations. |
| tests/test_channels/test_copilot_review_issues.py | Updates concurrency-related assertions after concurrency limiting removal; updates async_execute signature usage. |
| tests/test_channels/test_async_execute.py | Removes AsyncExecutableProtocol conformance tests section. |
| superpowers/specs/2026-03-15-async-orchestrator-refactor-design.md | Adds design doc for PLT-922 refactor. |
| superpowers/plans/2026-03-15-async-orchestrator-refactor-plan.md | Adds detailed implementation plan/checklist for PLT-922. |
| src/orcapod/protocols/node_protocols.py | Updates node protocols to execute/async_execute with observer + tightened async signatures per node type. |
| src/orcapod/protocols/core_protocols/async_executable.py | Deletes AsyncExecutableProtocol. |
| src/orcapod/protocols/core_protocols/init.py | Removes AsyncExecutableProtocol export. |
| src/orcapod/pipeline/sync_orchestrator.py | Simplifies sync orchestrator to delegate to node.execute(observer=...). |
| src/orcapod/pipeline/graph.py | Updates Pipeline.run() async path to call orchestrator with _node_graph; removes _run_async(). |
| src/orcapod/pipeline/async_orchestrator.py | Refactors async orchestrator to operate on nx.DiGraph, TypeGuard dispatch, result materialization, observer forwarding. |
| src/orcapod/core/nodes/source_node.py | Adds SourceNode.execute(observer=...) + tightens async_execute signature with observer support. |
| src/orcapod/core/nodes/operator_node.py | Adds observer support to execute and async_execute; execute checks REPLAY cache first. |
| src/orcapod/core/nodes/function_node.py | Moves per-packet cache/observer logic into execute; tightens async_execute signature, removes pipeline_config-based concurrency limiting, adds observer hooks. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Fix async FunctionNode.async_execute DB mode: drive output from input_channel instead of bulk-replaying Phase 1. Cached packets now fire observer hooks with cached=True, matching sync behavior. - Add predecessor-count validation for function nodes in async orchestrator - Remove unused AsyncMock import and duplicate pytest import in tests - Update test_sync_run_then_async_emits_from_cache to send input packets (output is now input-driven, not bulk-replayed) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Refactors both orchestrators to use slim node protocols where nodes own their execution and orchestrators are pure topology schedulers.
execute()+async_execute()with observer injection — removediter_packets,get_cached_results,execute_packet,compute_pipeline_entry_id,get_cached_outputfrom protocol surfaceAsyncExecutableProtocoldeleted entirelyexecute(),async_execute()tightened (no inputs param)execute()now owns per-packet cache logic + observer hooks,async_execute()tightened (single input_channel, no pipeline_config)execute()gains REPLAY cache check + observer,async_execute()gains observernode.execute()directlygraph: nx.DiGraph, returnsOrchestratorResult, TypeGuard dispatch,materialize_resultssupport via_CollectingWriter_run_async()removedDeferred to follow-up issues
prefer_asyncflag on FunctionNodeTest plan
cached=Truevia observermaterialize_results=True/Falsefor both orchestrators🤖 Generated with Claude Code