feat: integrate S3 with Arrow filesystem #548
MisterRaindrop wants to merge 2 commits into apache:main
Conversation
Thanks for adding this!
Yes, I believe this is worth doing. I suppose we could reuse
There is a related discussion with regard to
I recommend using MinIO. It is relatively stable and suitable for the current project development phase. Once the community reaches a consensus, the cost of replacing MinIO will not be high.
I think it is fine to use MinIO at this moment to unblock us. Let me know what you think of my proposed approach above. We might also need to add a FileIO registry to provide default implementations and enable users to override them with their own implementations of S3 and others. The key in the FileIO registry can be associated with a table property

We may also need to add top-level CMake options like
FYI, there is a PR to replace MinIO with RustFS, apache/iceberg#14928
ArrowFileSystemFileIO is OK. I referenced MakeLocalFileIO and implemented a simple MakeS3FileIO interface using the Arrow filesystem.
You mean this: it's equivalent to setting the io-impl string in the catalog's properties, and then RestCatalog looks up the implementation for the io-impl key in the FileIORegistry. Is that roughly how it works? If so, I can try implementing some simple code to see if it's correct.
Yes, I think it looks reasonable.
Force-pushed 8436b72 to 5197fa9
The current code is only a simple implementation. Could you help me check whether it is OK?

Thanks for the update! I'm busy with some internal stuff these days. Will try to review this as soon as possible.
wgtmac left a comment
I just took a quick pass on the latest commit. Can we simplify the implementation like this:
- Use a CMake option to enable S3.
- Define reserved Iceberg properties for S3 and add functions to convert them to Arrow S3 options.
- To create a concrete S3FileIO, use the Arrow API to create an S3FileSystem and wrap it with ArrowFileSystemFileIO.
- Register the factory that creates the S3 FileIO with the registry before use.
- Add a FileIO utility to create the FileIO instance based on various conditions.
# Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
set(ARROW_IPC ON)
set(ARROW_FILESYSTEM ON)
set(ARROW_S3 ON)
Can we add a CMake option ICEBERG_S3 and only toggle ARROW_S3 on when ICEBERG_S3 is on?
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/localfs.h>
#if __has_include(<arrow/filesystem/s3fs.h>)
If we add ICEBERG_S3 option, we don't need to deal with this check.
  impl_name = io_impl->second;
} else {
  // Use default based on warehouse URI scheme
  if (warehouse.rfind("s3://", 0) == 0) {
BTW, shouldn't we use uri instead of warehouse property?
/// This implementation is thread-safe as it creates a new FileSystem instance
/// for each operation. However, it may be less efficient than caching the
/// FileSystem. S3 initialization is done once per process.
class ArrowUriFileIO : public FileIO {
Why do we need this instead of reusing ArrowFileSystemFileIO?
///
/// \param properties The configuration properties map.
/// \return Configured S3Options.
::arrow::fs::S3Options ConfigureS3Options(
I agree this is something that we need.
/// This overload automatically creates an appropriate FileIO based on the "io-impl"
/// property or the warehouse location URI scheme.
///
/// FileIO selection logic:
It is better to add an iceberg/util/file_io_util.h to handle this logic and support reuse. Please note that Arrow Filesystem support is only available in the iceberg-bundle library, so we can only talk to the FileIO registry to create a FileIO instance.
Key changes based on reviewer (wgtmac) feedback:
1. Add ICEBERG_S3 CMake option to conditionally enable ARROW_S3,
replacing the unconditional `set(ARROW_S3 ON)`.
2. Replace `#if __has_include(<arrow/filesystem/s3fs.h>)` with
`#ifdef ICEBERG_HAVE_S3` compile definition controlled by CMake.
3. Remove ArrowUriFileIO class - reuse existing ArrowFileSystemFileIO
by wrapping Arrow S3FileSystem created via Arrow API.
4. Create missing files:
- s3_properties.h: S3 configuration property key constants
- file_io_registry.h/.cc: FileIO factory registry (Register/Load)
- file_io_register.cc: Arrow FileIO factory registration
- file_io_util.h/.cc: Reusable FileIO creation utility
- file_io_registry_test.cc: Unit tests for the registry
5. Extract FileIO creation logic from rest_catalog.cc into
iceberg/util/file_io_util.h for reusability.
6. Fix code issues:
- Implement path-style access (force_virtual_addressing = false)
- Add timeout value validation (non-negative check)
- Replace rfind("s3://", 0) with starts_with("s3://")
- Fix format string bug in rest_catalog_test.cc
7. Update CI workflow and build script to pass ICEBERG_S3=ON.
https://claude.ai/code/session_01GzV7A8VoYyWUN8QdtqgiMq
Do you want to revive this?

Are you referring to this Claude commit? I'm really sorry. I was just curious and tried it out. Actually, I did work on part of it, but I've been really busy lately and haven't been able to spare more time. I think things might ease up next week or the week after. 😭😭😭

No worries. I just want to check if there is any update. Take your time :)
Force-pushed 5197fa9 to 3cdf39a
@wgtmac I updated it, please review, thanks
wgtmac left a comment
Thanks for updating this! I haven't looked at the test yet and have some initial comments.
class ICEBERG_EXPORT FileIORegistry {
 public:
  /// Well-known implementation names
  static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO";
Let's use std::string_view instead of C-style string.
 public:
  /// Well-known implementation names
  static constexpr const char* kArrowLocalFileIO = "org.apache.iceberg.arrow.ArrowFileIO";
  static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO";
It looks odd to use the Java classpath style here. I haven't looked at other impls yet, perhaps it is worth investigating their conventions as well? My initial idea is just to use default keys like "local" and "s3" to locate the FileIO implementations.
  static constexpr const char* kArrowS3FileIO = "org.apache.iceberg.arrow.ArrowS3FileIO";

  /// Factory function type for creating FileIO instances.
  using Factory = std::function<Result<std::shared_ptr<FileIO>>(
It is better to return unique_ptr by default.
  /// Factory function type for creating FileIO instances.
  using Factory = std::function<Result<std::shared_ptr<FileIO>>(
      const std::string& warehouse,
- const std::string& warehouse,
+ const std::string& name,
// Register Arrow local filesystem FileIO
FileIORegistry::Register(
    FileIORegistry::kArrowLocalFileIO,
    [](const std::string& /*warehouse*/,
- [](const std::string& /*warehouse*/,
+ [](const std::string& /*name*/,
Same for below.
option(ICEBERG_BUILD_BUNDLE "Build the battery included library" ON)
option(ICEBERG_BUILD_REST "Build rest catalog client" ON)
option(ICEBERG_BUILD_REST_INTEGRATION_TESTS "Build rest catalog integration tests" OFF)
option(ICEBERG_S3 "Build with S3 support" ON)
It is worth noting that ICEBERG_S3 should be disabled if ICEBERG_BUILD_BUNDLE is OFF.
Should we disable it by default?
Currently, it appears that the entire project will prioritize development around the REST catalog, which primarily interacts with S3. Are we certain we want to disable ICEBERG_S3 by default?
Yes, but it will incur unnecessary burden to developers who do not care about S3. For example, Arrow will need to download and bundle AWS SDK and use docker to run the test.
  }
  return {};
#else
  return NotImplemented("Arrow S3 support is not enabled");
- return NotImplemented("Arrow S3 support is not enabled");
+ return NotSupported("Arrow S3 support is not enabled");
static std::once_flag init_flag;
static ::arrow::Status init_status = ::arrow::Status::OK();
std::call_once(init_flag, []() {
  ::arrow::fs::S3GlobalOptions options;
nit: add a TODO comment to support options supported by ::arrow::fs::S3GlobalOptions.
auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs);
if (connect_timeout_it != properties.end()) {
  try {
    options.connect_timeout = std::stod(connect_timeout_it->second) / 1000.0;
Can we use from_chars just like ParseInteger does?
Result<std::unique_ptr<FileIO>> MakeS3FileIO(
    const std::string& uri,
    const std::unordered_map<std::string, std::string>& properties) {
  if (!uri.starts_with("s3://")) {
Define a constant for the magic s3://. s3a should be supported as well.
s3a, as far as I know, is a URI scheme used by Hadoop. Shouldn't we just support the standard s3?
I have implemented an Arrow FileSystem to access S3, but I'm still not sure whether it meets the requirements.
There are still tasks and questions to complete for the current PR, and it is not ready for merging yet.
Question:
Currently, the object storage options include Azure, AWS, and GCS. Is it OK that I have chosen AWS as the implementation for now?
Task:
I need to deploy MinIO to facilitate testing access to S3, but I'm not sure where it would be best to set it up.