feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads#3046

Open
sumedhsakdeo wants to merge 32 commits into apache:main from sumedhsakdeo:fix/arrow-scan-benchmark-3036

Conversation

@sumedhsakdeo sumedhsakdeo commented Feb 15, 2026

Summary

Addresses #3036 — ArrowScan.to_record_batches() uses executor.map + list() which eagerly materializes all record batches per file into memory, causing OOM on large tables.

This PR adds a new order parameter to to_arrow_batch_reader() with two implementations:

  • TaskOrder (default) — preserves existing behavior: batches grouped by file in task submission order, each file fully materialized before proceeding to the next.
  • ArrivalOrder — yields batches as they are produced across files without materializing entire files into memory. Accepts three sub-parameters:
    • concurrent_streams: int — number of files to read concurrently (default: 8). A per-scan ThreadPoolExecutor(max_workers=concurrent_streams) bounds concurrency.
    • batch_size: int | None — number of rows per batch passed to PyArrow's ds.Scanner (default: PyArrow's built-in 131,072).
    • max_buffered_batches: int — size of the bounded queue between producers and consumer (default: 16), providing backpressure to cap memory usage.

Problem

The current implementation materializes all batches from each file via list() inside executor.map, which runs up to min(32, cpu_count+4) files in parallel. For large files this means all batches from ~20 files are held in memory simultaneously before any are yielded to the consumer.
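
The problematic pattern can be sketched in miniature (`read_file_batches` and the task strings are placeholders, not pyiceberg internals):

```python
from concurrent.futures import ThreadPoolExecutor

def read_file_batches(task):
    # Stand-in for reading one Parquet file: yields batches lazily.
    for i in range(3):
        yield f"{task}-batch{i}"

def eager_scan(tasks):
    with ThreadPoolExecutor() as pool:
        # list() forces every batch of every in-flight file into memory
        # at once, defeating the laziness of the per-file generator.
        for per_file in pool.map(lambda t: list(read_file_batches(t)), tasks):
            yield from per_file

print(list(eager_scan(["f0", "f1"])))
```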

Solution

Before: OOM on large tables

```python
batches = table.scan().to_arrow_batch_reader()
```

After: bounded memory, tunable parallelism

```python
from pyiceberg.table import ArrivalOrder

batches = table.scan().to_arrow_batch_reader(
    order=ArrivalOrder(concurrent_streams=4, batch_size=10000),
)
```

Default behavior is unchanged — TaskOrder preserves the existing executor.map + list() path for backwards compatibility.

Architecture

When order=ArrivalOrder(...), batches flow through _bounded_concurrent_batches:

  1. All file tasks are submitted to a per-scan ThreadPoolExecutor(max_workers=concurrent_streams)
  2. Workers push batches into a bounded Queue(maxsize=max_buffered_batches) — when full, workers block (backpressure)
  3. The consumer yields batches from the queue via blocking queue.get()
  4. A sentinel value signals completion — no timeout-based polling
  5. On early termination (consumer stops), a cancel event is set and the queue is drained until the sentinel to unblock all stuck workers
  6. The executor context manager handles deterministic shutdown
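
The steps above can be sketched roughly as follows. This is a simplified assumption of the shape of `_bounded_concurrent_batches`, not the PR's actual code; error propagation from workers is omitted for brevity:

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

_SENTINEL = object()

def bounded_concurrent_batches(tasks, read_batches, concurrent_streams=8,
                               max_buffered_batches=16):
    if not tasks:
        return
    q = queue.Queue(maxsize=max_buffered_batches)  # bounded: backpressure
    cancel = threading.Event()
    remaining = len(tasks)
    lock = threading.Lock()

    def worker(task):
        nonlocal remaining
        try:
            for batch in read_batches(task):
                if cancel.is_set():
                    break
                q.put(batch)  # blocks when the queue is full
        finally:
            with lock:
                remaining -= 1
                last = remaining == 0
            if last:
                q.put(_SENTINEL)  # put outside the lock; signals completion

    with ThreadPoolExecutor(max_workers=concurrent_streams) as pool:
        for t in tasks:
            pool.submit(worker, t)
        done = False
        try:
            while True:
                item = q.get()  # blocking get, no timeout polling
                if item is _SENTINEL:
                    done = True
                    break
                yield item
        finally:
            if not done:  # consumer stopped early
                cancel.set()
                # Drain until the sentinel: each get() wakes one blocked
                # worker, which observes cancel and finishes; the last
                # worker eventually puts the sentinel.
                while q.get() is not _SENTINEL:
                    pass
```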

Refactored to_record_batches into helpers: _prepare_tasks_and_deletes, _iter_batches_arrival, _iter_batches_materialized, _apply_limit.

Ordering semantics

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| TaskOrder() (default) | Batches grouped by file, in task submission order | Row order |
| ArrivalOrder(concurrent_streams=1) | Grouped by file, sequential | Row order |
| ArrivalOrder(concurrent_streams>1) | Interleaved (no grouping guarantee) | Row order within each file |

PR Stack

Breakdown of this large PR into smaller PRs:

  1. PR 0: batch_size forwarding
  2. PR 1: TaskOrder/ArrivalOrder enum — stop materializing entire files
  3. PR 2: concurrent_streams — bounded concurrent reads in arrival order
  4. PR 3: benchmark + docs guidance

Benchmark results

32 files × 500K rows, 5 columns (int64, float64, string, bool, timestamp), batch_size=131,072 (PyArrow default):

| Config | Throughput (rows/s) | TTFR (ms) | Peak Arrow Memory |
|---|---|---|---|
| default (TaskOrder) | 190,250,192 | 73.4 | 642.2 MB |
| ArrivalOrder(cs=1) | 59,317,085 | 27.7 | 10.3 MB |
| ArrivalOrder(cs=2) | 105,414,909 | 28.8 | 42.0 MB |
| ArrivalOrder(cs=4) | 175,840,782 | 28.4 | 105.5 MB |
| ArrivalOrder(cs=8) | 211,922,538 | 32.3 | 271.7 MB |
| ArrivalOrder(cs=16) | 209,011,424 | 45.0 | 473.3 MB |

TTFR = Time to First Record, cs = concurrent_streams

Note on throughput plateau at cs=8: This benchmark runs against local filesystem where Parquet reads are CPU-bound (decompression + decoding). Throughput plateaus once enough threads saturate available cores. On cloud storage (S3/GCS/ADLS), reads are I/O-bound with 50-200ms per-file latency, so higher concurrent_streams values (16-64+) would continue to show throughput gains until network bandwidth saturates. The optimal concurrent_streams will be higher for remote storage than what this local benchmark suggests.

Positional deletes, row filters, and limit are handled correctly in all modes.

Are these changes tested?

Yes. 25 new unit tests across two test files, plus a micro-benchmark:

  • tests/io/test_pyarrow.py (16 tests): batch_size controls rows per batch, arrival order yields all rows correctly, arrival order respects limit, within-file ordering preserved, positional deletes applied correctly in all three modes (task order, arrival order, concurrent), positional deletes with limit, concurrent_streams < 1 raises ValueError
  • tests/io/test_bounded_concurrent_batches.py (10 tests): single/multi-file correctness, incremental streaming, backpressure blocks producers when queue is full, error propagation from workers to consumer, early termination cancels workers cleanly, no deadlock when concurrent_streams > max_buffered_batches on early termination, concurrency limit enforced, empty task list, ArrowScan integration with limit
  • tests/benchmark/test_read_benchmark.py: read throughput micro-benchmark across 6 configurations measuring rows/sec, TTFR, and peak Arrow memory

Are there any user-facing changes?

Yes. New order parameter on DataScan.to_arrow_batch_reader():

  • order: ScanOrder | None — controls batch ordering. Pass TaskOrder() (default) or ArrivalOrder(concurrent_streams=N, batch_size=B, max_buffered_batches=M).

New public classes TaskOrder and ArrivalOrder (subclasses of ScanOrder) exported from pyiceberg.table.

All parameters are optional with backwards-compatible defaults. Existing code is unaffected.

Documentation updated in mkdocs/docs/api.md with usage examples, ordering semantics, and configuration guidance table.

@sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch 3 times, most recently from ab8c31b to 7ad9910 on February 15, 2026 at 01:47
Add batch_size parameter to _task_to_record_batches,
_record_batches_from_scan_tasks_and_deletes, ArrowScan.to_record_batches,
and DataScan.to_arrow_batch_reader so users can control the number of
rows per RecordBatch returned by PyArrow's Scanner.

Closes partially apache#3036

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from 7ad9910 to c86f0be on February 15, 2026 at 02:10
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from c86f0be to 05e07d1 on February 15, 2026 at 02:27
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch 2 times, most recently from 65a5007 to 1da7eb6 on February 17, 2026 at 03:39
sumedhsakdeo and others added 13 commits February 16, 2026 21:00
Introduce ScanOrder.TASK (default) and ScanOrder.ARRIVAL to control
batch ordering. TASK materializes each file before yielding; ARRIVAL
yields batches as produced for lower memory usage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add _bounded_concurrent_batches() with proper lock discipline:
- Queue backpressure caps memory (scan.max-buffered-batches, default 16)
- Semaphore limits concurrent file reads (concurrent_files param)
- Cancel event with timeouts on all blocking ops (no lock over IO)
- Error propagation and early termination support

When streaming=True and concurrent_files > 1, batches are yielded
as they arrive from parallel file reads. File ordering is not
guaranteed (documented).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace shared ExecutorFactory + Semaphore with per-scan
ThreadPoolExecutor(max_workers=concurrent_files) for deterministic
shutdown and simpler concurrency control.

Refactor to_record_batches into helpers:
- _prepare_tasks_and_deletes: resolve delete files
- _iter_batches_streaming: bounded concurrent streaming path
- _iter_batches_materialized: executor.map materialization path
- _apply_limit: unified row limit logic (was duplicated)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tests and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Setting `mock.call_count = 0` does not actually reset the mock's
internal call tracking, causing the second assertion to see accumulated
calls from both test phases. Use `reset_mock()` instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a parametrized benchmark case for default (executor.map) with
max_workers=4 to compare memory/throughput against unbounded threading.
Add TTFR (time to first record) measurement across all configurations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a "which config should I use?" tip box with recommended starting
points for common use cases, and clarify that batch_size is an advanced
tuning knob.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove @pytest.mark.benchmark so the read throughput tests are included
in the default `make test` filter as parametrize-marked tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…and docs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from 1da7eb6 to afb244c on February 17, 2026 at 05:07
sumedhsakdeo and others added 3 commits February 18, 2026 12:34
Replace ScanOrder.TASK/ARRIVAL with TaskOrder()/ArrivalOrder() instances.
Update concurrent_files → concurrent_streams parameter usage.

All existing test scenarios preserved with new type-safe API.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Restructure parameterized benchmark tests to use ScanOrder class instances:
- TaskOrder() for default behavior
- ArrivalOrder(concurrent_streams=N) for streaming configurations

Simplifies test parameters by eliminating separate concurrent_files argument.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Replace ScanOrder enum examples with new class-based API:
- TaskOrder() for default behavior
- ArrivalOrder(concurrent_streams=N) for streaming
- ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) for memory control

Add configuration guidance table and update ordering semantics.
Rename concurrent_files → concurrent_streams throughout examples.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
sumedhsakdeo and others added 10 commits February 18, 2026 17:29
- Remove ABC inheritance from ScanOrder since no abstract methods are defined
- Remove unused enum.Enum import
- Fix B008 error by moving TaskOrder() call from function default to inside function
- Clean up dataclass formatting

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Break long line in _iter_batches_arrival call for better readability
- Fix B008 error by moving TaskOrder() call from function default to inside function
- Sort imports alphabetically

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Update all test function calls to use concurrent_streams parameter
- Fix parameter name mismatch with _bounded_concurrent_batches function signature
- Update variable names and comments to match new parameter name

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Sort imports alphabetically as required by ruff formatting
- No functional changes

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Add batch_size parameter to ArrivalOrder class with comprehensive documentation
- Include memory formula: Peak memory ≈ concurrent_streams × batch_size × max_buffered_batches × (average row size)
- Update default concurrent_streams from 1 to 8 for better performance out-of-the-box
- Remove batch_size parameter from to_arrow_batch_reader() and to_record_batches() methods
- Simplify API by putting batch_size where it has direct memory impact (streaming orders)
- TaskOrder uses PyArrow defaults, ArrivalOrder provides full memory control

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Update benchmark tests to use simplified parameter structure
- Remove separate batch_size parameter from test calls
- Fix concurrent_streams validation error message in unit tests
- Maintain all existing test coverage and functionality

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Update all examples to use new ArrivalOrder(batch_size=X) syntax
- Add comprehensive memory formula with row size calculation
- Remove backward compatibility references (batch_size is new in this PR)
- Include performance characteristics and use case recommendations
- Provide clear guidance on TaskOrder vs ArrivalOrder memory behavior

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Import TaskOrder from pyiceberg.table in pyarrow.py
- Change to_record_batches signature to use TaskOrder() as default
  instead of None, ensuring consistent default scan ordering behavior

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace TaskOrder() function call in argument default with a
module-level singleton _DEFAULT_SCAN_ORDER to satisfy ruff B008
(no function calls in argument defaults).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When concurrent_files > 1 and max_buffered_batches is small, multiple
workers can be blocked on batch_queue.put() at the moment the consumer
closes early (e.g. due to a limit). The previous drain loop used
get_nowait() + empty() which had a race: empty() could return True
before a just-notified worker had a chance to put, leaving remaining
workers stuck on put() forever while executor.shutdown(wait=True) hung.

Fix: replace the racy drain loop with a blocking drain-until-sentinel
loop. Each get() naturally wakes one blocked worker via not_full.notify();
that worker checks cancel and returns, eventually allowing the last worker
to put the sentinel. Stopping only on the sentinel guarantees all workers
have finished before we exit.

Also move batch_queue.put(_QUEUE_SENTINEL) outside remaining_lock to
avoid holding a lock during a potentially blocking call.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@sumedhsakdeo changed the title from "feat: Add ScanOrder and concurrent_files to ArrowScan for bounded-memory reads" to "feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads" on Feb 20, 2026

@ShreyeshArangath ShreyeshArangath left a comment:

Overall LGTM, just a few comments

@kevinjqliu (Contributor) commented:

Thanks for the PR! This is touching a lot of different areas. I wonder if there's a simpler solution to resolve #3036. It seems like we just need to stop calling list() on iterators.

Let me take a closer look at the problem

- Add __post_init__ to ArrivalOrder raising ValueError if
  concurrent_streams < 1 or max_buffered_batches < 1. Previously
  max_buffered_batches=0 would silently create an unbounded queue.
- Split the ArrivalOrder row in the ordering semantics table to
  clarify that interleaving only occurs with concurrent_streams > 1;
  concurrent_streams=1 reads files sequentially.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@sumedhsakdeo force-pushed the fix/arrow-scan-benchmark-3036 branch from d6bb913 to 039e91b on February 28, 2026 at 06:17
@sumedhsakdeo (Author) replied:

Thanks @kevinjqliu! You're right that removing list() is the core fix — and ArrivalOrder does exactly that. Two reasons I kept TaskOrder as the default rather than just swapping list() out:

  1. Semantic change: today's behavior groups batches by file in submission order. Replacing list() with interleaved streaming would be a silent breaking change for any code that relies on per-file grouping. TaskOrder preserves the existing contract; ArrivalOrder opts in.

  2. No backpressure: executor.map submits all N tasks immediately even with lazy iteration — 1000 files still means 1000 concurrent threads. ArrivalOrder(concurrent_streams=N) bounds both concurrency and memory via a bounded queue.

Time-to-first-record (TTFR) is particularly critical for ML training workloads where the GPU stalls waiting for the first batch — executor.map blocks on file 0 completing before yielding anything, even if files 1–15 are already done (head-of-line blocking). On simulated S3 workloads (variable TTFB), ArrivalOrder shows 7x lower TTFR and ~11% higher throughput vs TaskOrder at the same thread count. Locally it's ~15% faster for the same reason.

The PR is broken into a stack if you'd prefer to review incrementally — happy to land them in order.
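
The head-of-line blocking described above (executor.map yields results in submission order, so a slow first file delays everything) can be reproduced in miniature; the sleep durations here are illustrative only:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def read_file(i):
    # File 0 is artificially slow; files 1-3 finish almost immediately.
    time.sleep(0.3 if i == 0 else 0.05)
    return f"file{i}"

t0 = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = pool.map(read_file, range(4))  # yields in submission order
    first = next(results)
    ttfr = time.monotonic() - t0
# first is "file0" and ttfr is ~0.3 s, even though files 1-3
# completed at ~0.05 s: their results wait behind file 0.
```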

Replace the defensive isinstance(order, ScanOrder) guard with the
already-present isinstance(order, ArrivalOrder) branch logic. The
guard was causing ValueError in CI due to a module-identity mismatch
between the ScanOrder imported by pyarrow.py and the one used to
subclass TaskOrder. The code is correct without it: ArrivalOrder
takes the fast path, everything else falls through to TaskOrder
(materialized) behavior. Type safety is provided statically by the
annotation and _DEFAULT_SCAN_ORDER sentinel.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@geruh geruh (Member) left a comment:

Thanks for raising this, @sumedhsakdeo! I took an initial pass on this, and if I'm following correctly there are two parts here: the bounded-memory fix and the "arrival ordering" optimization. To me these seem like two separate concerns that could be broken up.

However, I'm not sure about the ScanOrder abstraction from a user perspective. Right now it is fundamentally just a choice of whether or not to stream the tasks through a bounded queue. I took a look at a few other projects and noticed PyArrow uses params like fragment_readahead and batch_readahead. From my understanding, ArrivalOrder(max_buffered_batches=N, concurrent_streams=M) is pretty similar to those configs, and TaskOrder is similar to what we get with the executor logic today. That makes it seem like we could consider a simpler param approach, unless there is a future strategy that motivates keeping the abstraction.

Could the ordered path also use bounded streaming? You'd block on a slow file, but users would get the memory fix without giving up deterministic ordering.

```python
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
return lambda datafile: residual_evaluator_of(
```

A reviewer (Member) commented:

nit: this is a cosmetic change


A reviewer (Member) commented:

I think you could just rebase to reduce the noise here


```python
Run with: uv run pytest tests/benchmark/test_read_benchmark.py -v -s
"""
```


A reviewer (Member) commented:

Can you mark this with @pytest.mark.benchmark?

@Fokko Fokko (Contributor) left a comment:

Sorry, but I'm not a big fan of this change because it adds a lot of threading/locking.

I think it should be possible to do a full streaming of the tasks, where we open up the manifests sequentially.

@kevinjqliu (Contributor) commented:

I took some time to understand how ArrowScan is implemented and used today; and also where the potential bottlenecks are. Here's a summary:

ArrowScan has 2 public functions for reading; to_table and to_record_batches.

  • to_table materializes the entire table as an arrow table
  • to_record_batches return an iterator with the expectation of lazy materialization

Let's focus on to_record_batches since to_table can just be implemented by consuming all the record batches.
We're looking at this problem both in terms of throughput (how much data can be read) and memory usage (how much data is buffered in memory).

ArrowScan can utilize multiple threads, via the ExecutorFactory, to spread out tasks to worker threads.
I realized a few things while reviewing the original code:

  1. There's an implicit ordering of the record batches; the ordering is deterministic based on task order (task 0, task 1, ...). Batches from later tasks are not emitted before earlier tasks complete.
  2. Because of the strict ordering and with multiple worker threads, there's inherent head-of-line blocking (as mentioned above). Later tasks may finish work, but their results cannot be emitted until earlier tasks complete. This blocking diminishes the overall throughput of ArrowScan's ability to produce record batches.
  3. Each worker materializes the entire task/file to memory (via list(self._record_batches_from_scan_tasks_and_deletes)) before returning results. This causes higher than expected memory utilization and imo is a bug because it's not the expected behavior for to_record_batches's lazy materialization.

To summarize, the current implementation fans out tasks to different workers. Each worker reads and materializes the entire task/file in memory before returning record batches, but only the first worker's record batches are consumed while the other workers wait.

The problem I would really like to solve immediately is (3), which drastically increases memory utilization (by # of workers * size of file in memory). I think the right behavior for to_record_batches is for each worker to not materialize the entire task/file in memory.

I think the bounded queue is a great idea for maximizing throughput while bounding memory usage. But it would only be useful when we relax the constraint for ordering. It would be great to brainstorm ways to introduce this change into the codebase while ensuring that it's maintainable.

@kevinjqliu (Contributor) commented:

My first instinct is to create a new Scan class which implements the bounded queue. This will isolate the changes to the new class and we just need to expose a way for table to execute reads with this implementation.
The new Scan class will allow us to be explicit about trading off strict ordering in order to maximize throughput (while also bounding memory usage)

@kevinjqliu (Contributor) commented:

created a discussion thread #3122

cbb330 added a commit to linkedin/openhouse that referenced this pull request Mar 11, 2026
## Summary

Point the dataloader's pyiceberg dependency at
[apache/iceberg-python#3046](apache/iceberg-python#3046)
which adds `ArrivalOrder` — bounded-memory concurrent record batch
streaming — to `ArrowScan.to_record_batches()`. This is a prerequisite
for the dataloader to leverage concurrent file reads in a future PR.

Thanks to @sumedhsakdeo for the upstream pyiceberg contribution that
this PR depends on and the guidance towards this solution.

### Dependency pinning approach

We use a **PEP 508 direct reference** to pin pyiceberg to the fork's
commit SHA:

```toml
"pyiceberg @ git+https://github.com/sumedhsakdeo/iceberg-python@75ba28bf..."
```

This gets baked into the published wheel's `Requires-Dist` metadata, so
any consumer (including lipy-openhouse) that installs
openhouse-dataloader will resolve pyiceberg from the pinned fork commit
— not from PyPI or lerna. Both dev/CI (`uv sync`) and deployed artifacts
use the same fork.

### Options evaluated

| Option | Pros | Cons | Verdict |
|--------|------|------|---------|
| **`[tool.uv.sources]` override** | Simple to configure; works for local dev and CI (`uv sync`); does not change the published dependency specifier (`pyiceberg~=0.11.0`) | uv-only — not part of PEP 621; **not baked into wheel metadata**, so consumers (including lipy-openhouse) would resolve stock pyiceberg from PyPI/Artifactory, missing the ArrivalOrder API; dev and deploy diverge silently | Insufficient — only covers dev/CI, not deployed artifacts |
| **JFrog Artifactory upload** | Immutable artifact, deterministic resolution, works with any Python tooling (pip, uv, poetry) | Must manually build and upload a wheel for each PR revision; artifact has no traceability back to source; stale the moment the upstream PR gets a new commit; need to clean up the custom artifact once the upstream PR merges and a release is cut; unclear if ELR would need updating for a custom-built version of an already-approved library, adding process risk | Overkill for tracking a pre-merge upstream PR that is still evolving |
| **LinkedIn fork (`linkedin/iceberg-python`)** | Same pros as PEP 508 direct reference; LinkedIn-owned repo gives organizational control over the fork | Blocked on internal process to open a new OSS project; CI is more complicated and requires new repo setup and ELR process; adds ongoing maintenance burden for a temporary dependency that will revert to upstream once the PR merges | Too slow for the temporary pin we need now; we will require more patches in future, so the process should be started in parallel |
| **PEP 508 direct reference** | Clear source provenance; pinned commit SHA in both pyproject.toml and wheel metadata; reproducible across dev, CI, and production; no ELR ambiguity since it's the same OSS project at an unreleased commit | Pinned to a fork (upstream `apache/` repo doesn't expose PR commits to uv's git resolver); must manually bump SHA if the upstream PR updates; requires `allow-direct-references = true` in hatchling config | **Best fit** — portable, reproducible, and explicitly pinned for both dev and deploy |

Once the upstream PR merges and a new pyiceberg release includes
ArrivalOrder, we revert to a standard version specifier (e.g.
`pyiceberg~=0.12.0`) and remove `allow-direct-references`.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

**`integrations/python/dataloader/pyproject.toml`** — Changed pyiceberg
from `~=0.11.0` version specifier to a PEP 508 direct reference pinned
at commit `75ba28bf` on `sumedhsakdeo/iceberg-python`. Added
`[tool.hatch.metadata] allow-direct-references = true` for hatchling.

**`integrations/python/dataloader/tests/test_arrival_order.py`** — New
test file verifying:
- `ScanOrder`, `TaskOrder`, `ArrivalOrder` are importable from
`pyiceberg.table`
- `ArrivalOrder` dataclass defaults and custom parameters work correctly
- `ArrivalOrder` validates `concurrent_streams >= 1` and
`max_buffered_batches >= 1`
- `ArrowScan.to_record_batches()` accepts `order=TaskOrder()` and
`order=ArrivalOrder()` and returns correct data

**`integrations/python/dataloader/uv.lock`** — Updated to resolve
pyiceberg from git.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

`make verify` passes — all 102 tests pass (10 new + 92 existing), lint,
format, and mypy checks all green. Built wheel and confirmed
`Requires-Dist: pyiceberg @ git+https://...@75ba28bf...` is in the
published metadata.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

No breaking changes. The existing `to_record_batches()` call in
`DataLoaderSplit.__iter__` continues to use the default `TaskOrder()`
behavior. A follow-up PR will use `ArrivalOrder` to enable concurrent
reads.

---------

Co-authored-by: Sumedh Sakdeo <sumedhsakdeo@gmail.com>
7 participants