…citly define function pod constructor parameters.
…a` and refine `ContentHash` methods.
Update class and protocol names across core modules and adjust imports. Rename extract_function_typespecs to extract_function_schemas. Add optional label, data_context and orcapod_config params to DynamicPodStream, enable future annotations in streams, and apply minor docstring and typing fixes.
Move ContentHash into types and remove the duplicate implementation from hashing_protocols. Replace PythonSchema/PythonSchemaLike with Schema/SchemaLike and update imports across the codebase. Rename the orcapod_object protocol to traceable and add minor protocol/doc edits. Use future annotations in datagrams and change StreamBase.identity_structure to return (None,) for non-sourced streams. Add unit tests for streams and semantic types and add basedpyright to project dependencies.
- Align API to use config instead of orcapod_config across core
- Rename Operator base to OperatorPod and update subclasses
- Update StaticOutputPod to import Config and DataContext types
- Use ContentHash in object hasher type hints
- Refactor packet function schema handling and defaults
- Add version parsing and lazy output schema hash
Add a new content-based hashing subsystem:
- Implement BaseSemanticHasher and versioned factories
- Add TypeHandlerRegistry and BuiltinTypeHandlerRegistry with builtin handlers
- Provide ContentIdentifiableMixin and handler registration helpers
- Update contexts/defaults to expose semantic_hasher and type_handler_registry
- Add comprehensive tests for hasher, handlers, registry, and mixin
- Add optional_fields support to Schema and enforce defaults in type checks
- Export FunctionPod in core_protocols and adjust imports
- Extend Pod.process to accept an optional label and propagate outputs
- Update imports across modules to reference Schema
- Add tests for FunctionPod conformance and optional-field handling
Raise on variadic parameters in PythonPacketFunction since it maps packet keys to fixed named inputs. This uses inspect.signature to detect VAR_POSITIONAL and VAR_KEYWORD and raises ValueError with the offending parameters list. Tests cover: *args, **kwargs, a mixed variadic signature, and acceptance of default values on fixed parameters.
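The detection this commit describes can be sketched with `inspect.signature`; `reject_variadic` and the sample functions below are illustrative stand-ins, not the actual `PythonPacketFunction` code:

```python
import inspect

def reject_variadic(fn):
    """Raise if fn declares *args or **kwargs, since packet keys map to fixed named inputs."""
    offending = [
        name
        for name, param in inspect.signature(fn).parameters.items()
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    ]
    if offending:
        raise ValueError(f"variadic parameters are not supported: {offending}")

def ok(a, b=1):            # fixed parameters, defaults allowed
    return a + b

def bad(a, *args, **kwargs):
    return a

reject_variadic(ok)        # passes: defaults on fixed parameters are fine
```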
Export DeltaTableDatabase from the databases package and remove legacy backends from the public API
Delete legacy Delta Lake/Arrow data stores and related test scaffolding
Update databases/__init__.py to expose only DeltaTableDatabase
Fix type annotation: CallableWithPod.pod now returns TrackedPacketFunctionPod
Validate record_path in DeltaTableDatabase.add_records to reject empty or too-deep paths
Add tests for DeltaTableDatabase protocol conformance and basic round-trip operations
- Propagate packet_function to the WrappedFunctionPod base
- Normalize IDs to strings in InMemoryArrowDatabase
- Add extended tests for function_pod coverage
Introduce parse_function_outputs(output_keys, values) to map raw function outputs to keyed dicts. Update PythonPacketFunction to call with the output keys and adjust tests to use the new signature, removing the helper mock
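A minimal sketch of what such a helper might do, assuming a single output key wraps the scalar return while multiple keys expect a matching-length tuple or list (the real semantics may differ):

```python
from collections.abc import Sequence
from typing import Any

def parse_function_outputs(output_keys: Sequence[str], values: Any) -> dict[str, Any]:
    """Map raw function return values onto the declared output keys (illustrative)."""
    if len(output_keys) == 0:
        if values is not None:
            raise ValueError("function returned a value but no output keys are declared")
        return {}
    if len(output_keys) == 1:
        return {output_keys[0]: values}      # single key wraps whatever was returned
    if not isinstance(values, (tuple, list)) or len(values) != len(output_keys):
        raise ValueError(f"expected {len(output_keys)} outputs, got {values!r}")
    return dict(zip(output_keys, values))
```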
Add guidance for DESIGN_ISSUES.md to CLAUDE.md and .zed/rules to standardize issue tracking and workflow.
Switch output_keys to Sequence[str] instead of Collection[str]. Add a __call__ signature to CallableWithPod to document invocation. Remove Collection from imports where relevant.
Guard dropping the CONTEXT_KEY column in FunctionPodStream.as_table to avoid KeyError on empty streams. Update DESIGN_ISSUES.md with the F9 fix and safeguards. Add tests for function pod chaining and inactive-pod behavior to ensure correct propagation of packets and tags.
Add DB-backed get_all_records support for FunctionPodNode and tests covering empty and populated DB states, plus column-config options (meta, source, system_tags, all_info) and DB-cached iter_packets behavior.
- Add clear_cache on FunctionPodStream to reset in-memory caches and refresh state
- Call clear_cache when stream is stale in iter_packets to re-fetch input
- Fix TemporalMixin: do not store _modified_time in __init__; refresh via _update_modified_time
- Import timezone in static_output_pod for datetime compatibility
- Introduce RootSource as the base class for all sources, replacing SourceBase
- Update all sources to subclass RootSource and align protocol conformance
- Add legacy adapters under sources_legacy and wire bridging imports
- Extend SourceRegistry with safer register/unregister semantics and logging
- Add FieldNotResolvableError to signal field-resolution failures
- Implement new CSVSource, DataFrameSource, DictSource, ListSource backed by RootSource
- Ensure provenance tokens and output_schema shape remain compatible
Update core protocol definitions, type hints, imports and docstrings to use the '*Protocol' naming convention. Adjust usages across core modules, datagrams, hashing, databases, streams, operators, sources, and tests. Add comprehensive source tests for the updated protocols.
- Introduce PipelineElementProtocol and DerivedSource to model pipeline elements and static sources, enabling shared pipeline DB tables
- Rename object_hasher to semantic_hasher across the hashing API; update defaults, references, and docs
- Add SourceProtocol and migrate core protocols to reflect pipeline-element semantics
- Implement DerivedSource as a root Stream exposing DB-computed results and delegating output_schema and keys
- Add tests for DerivedSource and pipeline hashing integration
… the caller execute() and execute_packet() are internal methods for orchestrators. The caller guarantees input identity — no validation is performed. This removes _validate_input_schema, _validate_stream_schema, _validate_input_schemas and all associated tests. Also removes xfail markers on DerivedSource tests (no longer needed since validation was the cause of those failures). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n repeated runs
- _process_packet_internal now clears _cached_output_table and _cached_content_hash_column when caching new entries (prevents stale as_table() results)
- get_cached_results clears _cached_output_packets before repopulating (prevents duplicate rows on repeated orchestrator runs)
- Fixed docstrings: removed stale "after validation" references, updated orchestrator docstring about source node caching
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Design for refactoring AsyncPipelineOrchestrator to use node protocols, slim execute/async_execute interface, observer injection, and tightened per-type async signatures. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove pipeline_config from async_execute signature entirely
- Remove iter_packets from SourceNodeProtocol
- Clarify that SourceNode.execute populates _cached_results
- Clarify that OperatorNode.execute calls get_cached_output first
- Detail FunctionNode.execute internal per-packet logic
- Add buffer_size forwarding in Pipeline.run default async path
- Add notes on terminal channel draining and existing run() methods
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…-922) 14-task plan covering: protocol slimming, node execute/async_execute with observer injection, sync/async orchestrator simplification, Pipeline.run() update, and comprehensive test coverage. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ith observer (PLT-922) Replace old fine-grained protocol methods (iter_packets, get_cached_results, compute_pipeline_entry_id, execute_packet, get_cached_output) with uniform execute/async_execute signatures that accept an observer param. Add tests verifying the new shapes and that old-only implementations no longer satisfy the protocols. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…T-922) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ignature (PLT-922) Update all test files that pass a list to FunctionNode.async_execute to pass a single reader instead. Update concurrency tests to reflect sequential execution (concurrency deferred to PLT-930). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…acket logic (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…chestratorResult (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ve _run_async (PLT-922) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fix SyncPipelineOrchestrator.run() to return empty node_outputs when materialize_results=False (terminal nodes were previously left in the buffer). Add TestMaterializeResults class covering sync/async variants. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n tests (PLT-922) Add TestAsyncOrchestratorFanOut, TestAsyncOrchestratorTerminalNode, and TestAsyncOrchestratorErrorPropagation test classes. Fan-out test uses distinct functions (double vs triple) to ensure two separate compiled FunctionNodes are created, since the pipeline deduplicates identical nodes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tor ordering (PLT-922)
- Replace `observer: Any` with `observer: ExecutionObserver | None` in all node execute/async_execute methods
- Add __getattr__ to _CollectingWriter for delegating unknown attributes to the real writer
- Move terminal channel drain into TaskGroup to prevent deadlock
- Sort operator input readers by node.upstreams order for non-commutative operators
- Add TODO(PLT-930) for deferred concurrency limiting in FunctionNode.async_execute
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922)
Sync orchestrator tests:
- Operator pipeline: all three node types fire hooks
- Cached flag: second run reports cached=True
- Diamond DAG: events follow topological order
- No observer: pipeline runs fine without one
Async orchestrator tests:
- Linear pipeline: source + function hooks fire
- Operator pipeline: all node types fire hooks, only function fires packet-level
- No observer: pipeline runs fine without one
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix async FunctionNode.async_execute DB mode: drive output from input_channel instead of bulk-replaying Phase 1. Cached packets now fire observer hooks with cached=True, matching sync behavior.
- Add predecessor-count validation for function nodes in async orchestrator
- Remove unused AsyncMock import and duplicate pytest import in tests
- Update test_sync_run_then_async_emits_from_cache to send input packets (output is now input-driven, not bulk-replayed)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
cannot change target branch in UI
Logging Infrastructure Overview
What it captures
Every time a packet function executes, the infrastructure captures stdout, stderr, Python logging output, and
tracebacks (on failure). These are bundled into a CapturedLogs dataclass.
How capture works
There are two capture mechanisms depending on where the function runs:
Local execution (LocalExecutor / no executor): LocalCaptureContext sets per-task/thread ContextVar buffers.
sys.stdout/sys.stderr are globally replaced (once, at observer creation) with ContextLocalTeeStream objects
that write to both the terminal and the active buffer. A ContextVarLoggingHandler on the root logger captures
Python logging the same way. Concurrent packets never intermingle because each asyncio task/thread gets its own
ContextVar copy.
Ray execution (RayExecutor): _make_capture_wrapper() returns a self-contained closure (no orcapod imports) that
does fd-level capture (os.dup2) on the worker process. It returns a plain 6-tuple over the Ray object store;
the driver reconstructs CapturedLogs from it.
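The fd-level trick can be sketched as follows; `run_with_fd_capture` is a hypothetical helper, not the actual wrapper, and the real closure also captures stderr and ships a 6-tuple:

```python
import os
import tempfile

def run_with_fd_capture(fn):
    """Minimal fd-level stdout capture, loosely modeled on the wrapper above.

    Redirecting file descriptor 1 catches writes that bypass sys.stdout
    entirely (C extensions, subprocesses), which is why the Ray wrapper
    uses os.dup2 instead of swapping sys.stdout.
    """
    saved_fd = os.dup(1)                   # remember the real stdout fd
    with tempfile.TemporaryFile(mode="w+b") as tmp:
        os.dup2(tmp.fileno(), 1)           # point fd 1 at the temp file
        try:
            result = fn()
        finally:
            os.dup2(saved_fd, 1)           # restore the real stdout
            os.close(saved_fd)
        tmp.seek(0)
        return result, tmp.read().decode()

def work():
    os.write(1, b"from the worker\n")      # fd-level write, like a C extension
    return 42

result, out = run_with_fd_capture(work)
```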
How CapturedLogs flows through the call chain
User function
↓ catches exception, builds CapturedLogs
PythonPacketFunction.direct_call() → (PacketProtocol | None, CapturedLogs)
↓ or via executor.execute_callable() which also returns (raw, CapturedLogs)
PythonPacketFunction.call() → (PacketProtocol | None, CapturedLogs)
↓
CachedPacketFunction.call() → (PacketProtocol | None, CapturedLogs)
↓ (cache hit returns empty CapturedLogs)
_FunctionPodBase.process_packet_with_capture() → (Tag, Packet | None, CapturedLogs)
↓
FunctionNode._process_packet_internal() → (Tag, Packet | None, CapturedLogs)
↓
FunctionNode.execute() — passes CapturedLogs to pkt_logger.record()
The key design decision: CapturedLogs travels as a return value, not through a ContextVar side-channel.
direct_call() catches user-function exceptions internally and returns (None, captured_failure) — it never
re-raises.
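A toy illustration of that never-re-raises contract (a plain dict stands in for CapturedLogs here):

```python
import traceback

def direct_call(fn, **packet):
    """Illustrative: a user-function exception becomes a failure record in the pair."""
    try:
        result = fn(**packet)
        return result, {"success": True, "traceback": None}
    except Exception:
        # Swallow the exception; report it through the captured record instead.
        return None, {"success": False, "traceback": traceback.format_exc()}

good, good_captured = direct_call(lambda x: x * 2, x=3)   # normal return
bad, bad_captured = direct_call(lambda x: 1 / 0, x=3)     # failure, no raise
```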
Observer/Logger interaction
The orchestrator injects an ExecutionObserverProtocol into each node's execute() call. The observer provides
lifecycle hooks (on_run_start, on_node_start, on_packet_start, etc.) and a factory method:
observer.create_packet_logger(node, tag, packet, pipeline_path=...) → PacketExecutionLoggerProtocol
FunctionNode.execute() calls this factory before each non-cached packet, then after execution:
tag_out, result, captured = self._process_packet_internal(tag, packet)
pkt_logger.record(captured) # writes to database
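A minimal in-memory observer shaped like this factory/record flow; the class names are illustrative, not the real protocol implementations:

```python
class RecordingPacketLogger:
    """Toy packet logger: collects what record() receives instead of writing to a DB."""
    def __init__(self, sink, node, tag):
        self.sink, self.node, self.tag = sink, node, tag

    def record(self, captured):
        self.sink.append((self.node, self.tag, captured))

class RecordingObserver:
    """Minimal stand-in for an observer exposing the create_packet_logger factory."""
    def __init__(self):
        self.records = []

    def create_packet_logger(self, node, tag, packet, pipeline_path=None):
        return RecordingPacketLogger(self.records, node, tag)

obs = RecordingObserver()
pkt_logger = obs.create_packet_logger(
    "transform_a", {"sample": "s01"}, {"x": 2},
    pipeline_path=("my_pipeline", "transform_a"),
)
pkt_logger.record({"stdout": "hi\n", "success": True})
```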
Concrete implementations
LoggingObserver implements these protocols and persists log rows through a pluggable database backend
(InMemoryArrowDatabase, DeltaTableDatabase, etc.).
Log storage structure
LoggingObserver stores logs at pipeline-path-mirrored locations. If a function node's pipeline path is
("my_pipeline", "transform_a", "v0", ...), its logs go to ("my_pipeline", "logs", "transform_a", "v0", ...).
Each function node gets its own table. Retrieve with obs.get_logs(pipeline_path=node.pipeline_path).
Each log row has fixed columns (log_id, run_id, node_label, stdout, stderr, python_logs, traceback, success,
timestamp) plus dynamic tag columns — each tag key becomes its own queryable column rather than being
JSON-encoded.
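The fixed-plus-dynamic row layout can be sketched as follows; `build_log_row` is a hypothetical helper for illustration, not the actual schema code:

```python
import datetime
import uuid

FIXED_COLUMNS = ("log_id", "run_id", "node_label", "stdout", "stderr",
                 "python_logs", "traceback", "success", "timestamp")

def build_log_row(run_id, node_label, captured, tag):
    """Build one flat log row where each tag key becomes its own column."""
    row = {
        "log_id": str(uuid.uuid4()),
        "run_id": run_id,
        "node_label": node_label,
        "stdout": captured.get("stdout", ""),
        "stderr": captured.get("stderr", ""),
        "python_logs": captured.get("python_logs", ""),
        "traceback": captured.get("traceback"),
        "success": captured.get("success", True),
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    for key, value in tag.items():
        row[key] = value   # dynamic tag column; assumes tag keys don't collide with fixed names
    return row

row = build_log_row("run-1", "transform_a",
                    {"stdout": "hi\n", "success": True},
                    {"sample": "s01", "rep": 2})
```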
Changes to orcapod-python
Core change: call(), direct_call(), async_call(), direct_async_call() on packet functions now return
tuple[PacketProtocol | None, CapturedLogs] instead of PacketProtocol | None. Similarly
executor.execute()/async_execute() return tuples. direct_call() catches user-function exceptions internally and
returns (None, captured_failure) instead of re-raising.
Other affected pieces:
- execute_callable() and PacketFunctionWrapper now return tuples as well
- some call paths remain unchanged (still a 2-tuple, discards captured)
- no more _captured_logs ContextVar reads; execute logic simplified (no try/except needed)
~20 test files updated to unpack the new tuple return types, and RecordingObserver classes updated to accept
pipeline_path kwarg.