
Claude/logging#88

Closed
brian-arnold wants to merge 261 commits into nauticalab:main from brian-arnold:claude/logging

Conversation

@brian-arnold
Collaborator

Logging Infrastructure Overview

What it captures

Every time a packet function executes, the infrastructure captures stdout, stderr, Python logging output, and
tracebacks (on failure). These are bundled into a CapturedLogs dataclass.

How capture works

There are two capture mechanisms depending on where the function runs:

Local execution (LocalExecutor / no executor): LocalCaptureContext sets per-task/thread ContextVar buffers.
sys.stdout/sys.stderr are globally replaced (once, at observer creation) with ContextLocalTeeStream objects
that write to both the terminal and the active buffer. A ContextVarLoggingHandler on the root logger captures
Python logging the same way. Concurrent packets never intermingle because each asyncio task/thread gets its own
ContextVar copy.

Ray execution (RayExecutor): _make_capture_wrapper() returns a self-contained closure (no orcapod imports) that
does fd-level capture (os.dup2) on the worker process. It returns a plain 6-tuple over the Ray object store;
the driver reconstructs CapturedLogs from it.
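The fd-level trick can be sketched as follows — a simplified stand-in for the Ray wrapper (function name is illustrative; the real closure also captures stderr, Python logging, and tracebacks, and returns a plain tuple over the object store). Unlike sys.stdout patching, redirecting file descriptor 1 also catches output from C extensions and subprocesses:

```python
import os
import sys
import tempfile

def run_with_fd_capture(fn):
    """Run fn with fd 1 redirected into a temp file; return (result, captured)."""
    sys.stdout.flush()
    saved_fd = os.dup(1)                  # keep the original stdout fd alive
    with tempfile.TemporaryFile(mode="w+b") as tmp:
        os.dup2(tmp.fileno(), 1)          # fd 1 now points at the temp file
        try:
            result = fn()
        finally:
            os.dup2(saved_fd, 1)          # restore the original stdout
            os.close(saved_fd)
        tmp.seek(0)
        captured = tmp.read().decode()
    return result, captured

# os.write(1, ...) goes straight to fd 1, so it lands in the temp file.
res, out = run_with_fd_capture(lambda: os.write(1, b"from worker\n") and 42)
```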

How CapturedLogs flows through the call chain

User function
  ↓ catches exception, builds CapturedLogs
PythonPacketFunction.direct_call() → (PacketProtocol | None, CapturedLogs)
  ↓ or via executor.execute_callable(), which also returns (raw, CapturedLogs)
PythonPacketFunction.call() → (PacketProtocol | None, CapturedLogs)
  ↓
CachedPacketFunction.call() → (PacketProtocol | None, CapturedLogs)
  ↓ (cache hit returns empty CapturedLogs)
_FunctionPodBase.process_packet_with_capture() → (Tag, Packet | None, CapturedLogs)
  ↓
FunctionNode._process_packet_internal() → (Tag, Packet | None, CapturedLogs)
  ↓
FunctionNode.execute() — passes CapturedLogs to pkt_logger.record()

The key design decision: CapturedLogs travels as a return value, not through a ContextVar side-channel.
direct_call() catches user-function exceptions internally and returns (None, captured_failure) — it never
re-raises.
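A minimal sketch of this "return, don't re-raise" contract (the function shape and the dict fields are illustrative, not the actual orcapod signatures):

```python
import traceback

def direct_call(fn, *args):
    """Run fn; on failure return (None, captured_failure) instead of raising."""
    try:
        result = fn(*args)
        return result, {"traceback": None, "success": True}
    except Exception:
        # The traceback travels back as data, never as a raised exception.
        return None, {"traceback": traceback.format_exc(), "success": False}

ok = direct_call(lambda x: x + 1, 41)
bad = direct_call(lambda x: 1 / 0, 41)
```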

Observer/Logger interaction

The orchestrator injects an ExecutionObserverProtocol into each node's execute() call. The observer provides
lifecycle hooks (on_run_start, on_node_start, on_packet_start, etc.) and a factory method:

observer.create_packet_logger(node, tag, packet, pipeline_path=...) → PacketExecutionLoggerProtocol

FunctionNode.execute() calls this factory before each non-cached packet, then after execution:
tag_out, result, captured = self._process_packet_internal(tag, packet)
pkt_logger.record(captured) # writes to database
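The factory-plus-record contract can be sketched with Protocol classes — illustrative shapes only; the real ExecutionObserverProtocol has many more hooks and richer parameter types:

```python
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class PacketExecutionLogger(Protocol):
    def record(self, captured: Any) -> None: ...

@runtime_checkable
class ExecutionObserver(Protocol):
    def create_packet_logger(
        self, node, tag, packet, *, pipeline_path=None
    ) -> PacketExecutionLogger: ...

class NoOpLogger:
    def record(self, captured: Any) -> None:
        pass  # zero-cost no-op, like the default NoOpObserver/NoOpLogger pair

class NoOpObserver:
    def create_packet_logger(self, node, tag, packet, *, pipeline_path=None):
        return NoOpLogger()

# One logger per non-cached packet, then record() after execution.
logger = NoOpObserver().create_packet_logger(None, {}, {}, pipeline_path=("p",))
logger.record({"stdout": ""})
```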

Concrete implementations

  • NoOpObserver / NoOpLogger — Default when no observability is configured. Zero-cost no-ops.
  • LoggingObserver / PacketLogger — Writes structured log rows to any ArrowDatabaseProtocol
    (InMemoryArrowDatabase, DeltaTableDatabase, etc.).

Log storage structure

LoggingObserver stores logs at pipeline-path-mirrored locations. If a function node's pipeline path is
("my_pipeline", "transform_a", "v0", ...), its logs go to ("my_pipeline", "logs", "transform_a", "v0", ...).
Each function node gets its own table. Retrieve with obs.get_logs(pipeline_path=node.pipeline_path).

Each log row has fixed columns (log_id, run_id, node_label, stdout, stderr, python_logs, traceback, success,
timestamp) plus dynamic tag columns — each tag key becomes its own queryable column rather than being
JSON-encoded.
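The path-mirroring rule itself is a one-liner; a sketch (helper name is hypothetical, the tuple arithmetic matches the rule stated above):

```python
def logs_path(pipeline_path: tuple[str, ...]) -> tuple[str, ...]:
    # Insert a "logs" segment right after the pipeline name.
    return pipeline_path[:1] + ("logs",) + pipeline_path[1:]

mirrored = logs_path(("my_pipeline", "transform_a", "v0"))
```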

Changes to orcapod-python

  1. CapturedLogs as return values (removed ContextVar side-channel)

Core change: call(), direct_call(), async_call(), direct_async_call() on packet functions now return
tuple[PacketProtocol | None, CapturedLogs] instead of PacketProtocol | None. Similarly
executor.execute()/async_execute() return tuples. direct_call() catches user-function exceptions internally and
returns (None, captured_failure) instead of re-raising.

  • Protocols — packet_function.py, executor.py, node_protocols.py updated with new return types
  • Executors — base.py, local.py, ray.py pass through CapturedLogs; Ray's execute() now delegates to
    execute_callable()
  • packet_function.py — All _captured_logs.set() calls removed; PythonPacketFunction, CachedPacketFunction,
    PacketFunctionWrapper return tuples
  • function_pod.py — Added process_packet_with_capture() (returns 3-tuple with CapturedLogs); process_packet()
    unchanged (still 2-tuple, discards captured)
  • cached_function_pod.py — Added process_packet_with_capture() preserving pod-level caching
  • function_node.py — Uses process_packet_with_capture() to get CapturedLogs directly from the return value. No
    more _captured_logs ContextVar reads. Simplified execute logic (no try/except needed)
  2. Removed duplicate _ray_capture_wrapper
  • Deleted the module-level function from logging_capture.py (unused imports cleaned up too)
  • RayExecutor._make_capture_wrapper() is the single capture mechanism for Ray
  3. Pipeline-path-mirrored log storage with queryable tag columns
  • create_packet_logger() accepts pipeline_path kwarg on protocol, NoOpObserver, and LoggingObserver
  • Logs stored at pipeline_path[:1] + ("logs",) + pipeline_path[1:] — each function node gets its own log table
  • Tag data stored as individual columns (not a single JSON "tags" column)
  • get_logs(pipeline_path=...) retrieves node-specific logs
  4. Error handling modes
  • FunctionNode.execute() accepts error_policy: "continue" | "fail_fast"
  • SyncPipelineOrchestrator and AsyncPipelineOrchestrator accept error_policy and pass it through
  5. New files
  • src/orcapod/pipeline/logging_capture.py — Capture infrastructure (CapturedLogs, LocalCaptureContext, tee
    streams)
  • src/orcapod/pipeline/logging_observer.py — LoggingObserver + PacketLogger
  • src/orcapod/protocols/observability_protocols.py — Observer/Logger protocols
  • tests/test_pipeline/test_logging_capture.py — Unit tests for capture infrastructure
  • tests/test_pipeline/test_logging_observer_integration.py — 9 end-to-end integration tests
  6. Test updates

~20 test files updated to unpack the new tuple return types, and RecordingObserver classes updated to accept
pipeline_path kwarg.
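The error_policy switch from item 4 can be sketched as follows — the two mode names come from the PR, but the surrounding bookkeeping is illustrative:

```python
from typing import Literal

def handle_packet_error(
    policy: Literal["continue", "fail_fast"],
    exc: Exception,
    errors: list[Exception],
) -> None:
    if policy == "fail_fast":
        raise exc          # abort the run on the first failing packet
    errors.append(exc)     # "continue": record the failure, keep processing

errs: list[Exception] = []
handle_packet_error("continue", ValueError("boom"), errs)
try:
    handle_packet_error("fail_fast", ValueError("boom"), errs)
    raised = False
except ValueError:
    raised = True
```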

…citly define function pod constructor parameters.
Update class and protocol names across core modules and adjust imports.
Rename extract_function_typespecs to extract_function_schemas. Add optional
label, data_context and orcapod_config params to DynamicPodStream, enable
future annotations in streams, and apply minor docstring and typing fixes.
Move ContentHash into types and remove the duplicate implementation
from hashing_protocols. Replace PythonSchema/PythonSchemaLike with
Schema/SchemaLike and update imports across the codebase. Rename the
orcapod_object protocol to traceable and add minor protocol/doc edits.
Use future annotations in datagrams and change StreamBase.identity_structure
to return (None,) for non-sourced streams. Add unit tests for streams and
semantic types and add basedpyright to project dependencies.
- Align API to use config instead of orcapod_config across core
- Rename Operator base to OperatorPod and update subclasses
- Update StaticOutputPod to import Config and DataContext types
- Use ContentHash in object hasher type hints
- Refactor packet function schema handling and defaults
- Add version parsing and lazy output schema hash
Add a new content-based hashing subsystem:
- Implement BaseSemanticHasher and versioned factories
- Add TypeHandlerRegistry and BuiltinTypeHandlerRegistry with builtin
  handlers
- Provide ContentIdentifiableMixin and handler registration helpers
- Update contexts/defaults to expose semantic_hasher and
  type_handler_registry
- Add comprehensive tests for hasher, handlers, registry, and mixin
- Add optional_fields support to Schema and enforce defaults in type
  checks
- Export FunctionPod in core_protocols and adjust imports
- Extend Pod.process to accept an optional label and propagate outputs
- Update imports across modules to reference Schema
- Add tests for FunctionPod conformance and optional-field handling
Raise on variadic parameters in PythonPacketFunction since it maps
packet keys to fixed named inputs. This uses inspect.signature to
detect VAR_POSITIONAL and VAR_KEYWORD and raises ValueError with the
offending parameters list.

Tests cover: *args, **kwargs, a mixed variadic signature, and
acceptance of default values on fixed parameters.
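The variadic guard described in that commit can be sketched like this — the helper name is illustrative, not the actual orcapod API, but the inspect.signature check mirrors the described mechanism:

```python
import inspect

def check_no_variadics(fn) -> None:
    """Reject *args / **kwargs: packet keys map to fixed named inputs."""
    offending = [
        name
        for name, p in inspect.signature(fn).parameters.items()
        if p.kind in (inspect.Parameter.VAR_POSITIONAL,
                      inspect.Parameter.VAR_KEYWORD)
    ]
    if offending:
        raise ValueError(f"Variadic parameters are not supported: {offending}")

check_no_variadics(lambda a, b=1: a + b)  # defaults on fixed params are fine
try:
    check_no_variadics(lambda *args, **kwargs: None)
    raised = False
except ValueError:
    raised = True
```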
Export DeltaTableDatabase from the databases package and remove legacy
backends from the public API

Delete legacy Delta Lake/Arrow data stores and related test scaffolding

Update databases/__init__.py to expose only DeltaTableDatabase

Fix type annotation: CallableWithPod.pod now returns TrackedPacketFunctionPod

Validate record_path in DeltaTableDatabase.add_records to reject empty
or too-deep paths

Add tests for DeltaTableDatabase protocol conformance and basic
round-trip operations
- Propagate packet_function to the WrappedFunctionPod base
- Normalize IDs to strings in InMemoryArrowDatabase
- Add extended tests for function_pod coverage
Introduce parse_function_outputs(output_keys, values) to map raw
function outputs to keyed dicts. Update PythonPacketFunction to call it
with the output keys, and adjust tests to use the new signature,
removing the helper mock.
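A plausible sketch of that mapping helper — the single-value wrapping rule and the length check are assumptions, only the name and (output_keys, values) signature come from the commit message:

```python
from typing import Any, Sequence

def parse_function_outputs(
    output_keys: Sequence[str], values: Any
) -> dict[str, Any]:
    """Map a function's raw return value(s) onto the declared output keys."""
    if len(output_keys) == 1:
        return {output_keys[0]: values}   # bare value for single-output fns
    if len(values) != len(output_keys):
        raise ValueError("output count does not match declared keys")
    return dict(zip(output_keys, values))

single = parse_function_outputs(["x"], 5)
multi = parse_function_outputs(["x", "y"], (1, 2))
```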
Add guidance for DESIGN_ISSUES.md to CLAUDE.md and .zed/rules
to standardize issue tracking and workflow.
Switch output_keys to Sequence[str] instead of Collection[str].
Add a __call__ signature to CallableWithPod to document invocation.
Remove Collection from imports where relevant.
Guard dropping the CONTEXT_KEY column in FunctionPodStream.as_table
to avoid KeyError on empty streams.
Update DESIGN_ISSUES.md with the F9 fix and safeguards.
Add tests for function pod chaining and inactive-pod behavior to
ensure correct propagation of packets and tags.
Add DB-backed get_all_records support for FunctionPodNode
and tests covering empty and populated DB states, plus
column-config options (meta, source, system_tags, all_info)
and DB-cached iter_packets behavior.
- Add clear_cache on FunctionPodStream to reset in-memory caches
  and refresh state
- Call clear_cache when stream is stale in iter_packets
  to re-fetch input
- Fix TemporalMixin: do not store _modified_time in __init__; refresh
  via _update_modified_time
- Import timezone in static_output_pod for datetime compatibility
- Introduce RootSource as the base class for all sources, replacing
  SourceBase
- Update all sources to subclass RootSource and align protocol
  conformance
- Add legacy adapters under sources_legacy and wire bridging imports
- Extend SourceRegistry with safer register/unregister semantics and
  logging
- Add FieldNotResolvableError to inform field-resolution failures
- Implement new CSVSource, DataFrameSource, DictSource, ListSource
  backed by RootSource
- Ensure provenance tokens and output_schema shape remain compatible
Update core protocol definitions, type hints, imports and docstrings to
use the '*Protocol' naming convention. Adjust usages across core modules,
datagrams, hashing, databases, streams, operators, sources, and tests.
Add comprehensive source tests for the updated protocols.
- Introduce PipelineElementProtocol and DerivedSource to model
  pipeline elements and static sources, enabling shared pipeline DB
  tables
- Rename object_hasher to semantic_hasher across the hashing API; update
  defaults, references, and docs
- Add SourceProtocol and migrate core protocols to reflect
  pipeline-element semantics
- Implement DerivedSource as a root Stream exposing DB-computed results
  and delegating output_schema and keys
- Add tests for DerivedSource and pipeline hashing integration
eywalker and others added 26 commits March 14, 2026 19:22
… the caller

execute() and execute_packet() are internal methods for orchestrators.
The caller guarantees input identity — no validation is performed.
This removes _validate_input_schema, _validate_stream_schema,
_validate_input_schemas and all associated tests.

Also removes xfail markers on DerivedSource tests (no longer needed
since validation was the cause of those failures).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n repeated runs

- _process_packet_internal now clears _cached_output_table and
  _cached_content_hash_column when caching new entries (prevents
  stale as_table() results)
- get_cached_results clears _cached_output_packets before repopulating
  (prevents duplicate rows on repeated orchestrator runs)
- Fixed docstrings: removed stale "after validation" references,
  updated orchestrator docstring about source node caching

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Design for refactoring AsyncPipelineOrchestrator to use node protocols,
slim execute/async_execute interface, observer injection, and tightened
per-type async signatures.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove pipeline_config from async_execute signature entirely
- Add iter_packets removal from SourceNodeProtocol
- Clarify SourceNode.execute populates _cached_results
- Clarify OperatorNode.execute calls get_cached_output first
- Detail FunctionNode.execute internal per-packet logic
- Add buffer_size forwarding in Pipeline.run default async path
- Add notes on terminal channel draining and existing run() methods

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…-922)

14-task plan covering: protocol slimming, node execute/async_execute
with observer injection, sync/async orchestrator simplification,
Pipeline.run() update, and comprehensive test coverage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ith observer (PLT-922)

Replace old fine-grained protocol methods (iter_packets, get_cached_results,
compute_pipeline_entry_id, execute_packet, get_cached_output) with uniform
execute/async_execute signatures that accept an observer param. Add tests
verifying the new shapes and that old-only implementations no longer satisfy
the protocols.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…T-922)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ignature (PLT-922)

Update all test files that pass a list to FunctionNode.async_execute to
pass a single reader instead. Update concurrency tests to reflect
sequential execution (concurrency deferred to PLT-930).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…acket logic (PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…chestratorResult (PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ve _run_async (PLT-922)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fix SyncPipelineOrchestrator.run() to return empty node_outputs when
materialize_results=False (terminal nodes were previously left in the
buffer). Add TestMaterializeResults class covering sync/async variants.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…n tests (PLT-922)

Add TestAsyncOrchestratorFanOut, TestAsyncOrchestratorTerminalNode, and
TestAsyncOrchestratorErrorPropagation test classes. Fan-out test uses
distinct functions (double vs triple) to ensure two separate compiled
FunctionNodes are created, since the pipeline deduplicates identical nodes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tor ordering (PLT-922)

- Replace `observer: Any` with `observer: ExecutionObserver | None` in all node execute/async_execute methods
- Add __getattr__ to _CollectingWriter for delegating unknown attributes to the real writer
- Move terminal channel drain into TaskGroup to prevent deadlock
- Sort operator input readers by node.upstreams order for non-commutative operators
- Add TODO(PLT-930) for deferred concurrency limiting in FunctionNode.async_execute

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…PLT-922)

Sync orchestrator tests:
- Operator pipeline: all three node types fire hooks
- Cached flag: second run reports cached=True
- Diamond DAG: events follow topological order
- No observer: pipeline runs fine without one

Async orchestrator tests:
- Linear pipeline: source + function hooks fire
- Operator pipeline: all node types fire hooks, only function fires packet-level
- No observer: pipeline runs fine without one

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix async FunctionNode.async_execute DB mode: drive output from
  input_channel instead of bulk-replaying Phase 1. Cached packets now
  fire observer hooks with cached=True, matching sync behavior.
- Add predecessor-count validation for function nodes in async orchestrator
- Remove unused AsyncMock import and duplicate pytest import in tests
- Update test_sync_run_then_async_emits_from_cache to send input packets
  (output is now input-driven, not bulk-replayed)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@codecov

codecov bot commented Mar 17, 2026

Welcome to Codecov 🎉

Once you merge this PR into your default branch, you're all set! Codecov will compare coverage reports and display results in all future pull requests.

ℹ️ You can also turn on project coverage checks and project coverage reporting on Pull Request comment

Thanks for integrating Codecov - We've got you covered ☂️

@brian-arnold brian-arnold marked this pull request as ready for review March 17, 2026 18:49
@brian-arnold
Collaborator Author

cannot change target branch in UI
