-
Notifications
You must be signed in to change notification settings - Fork 99
feat(puffin): add format constants, utilities, and JSON serialization #603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include "iceberg/puffin/puffin_format.h" | ||
|
|
||
| #include <utility> | ||
|
|
||
| #include "iceberg/util/endian.h" | ||
| #include "iceberg/util/macros.h" | ||
|
|
||
| namespace iceberg::puffin { | ||
|
|
||
| namespace { | ||
|
|
||
| constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a comment to improve the readability? |
||
| switch (flag) { | ||
| case PuffinFlag::kFooterPayloadCompressed: | ||
| return {0, 0}; | ||
| } | ||
| std::unreachable(); | ||
| } | ||
|
|
||
| } // namespace | ||
|
|
||
| /// \brief Report whether the bit assigned to `flag` is set in the 4-byte `flags` span. | ||
| /// | ||
| /// The byte index and bit index come from GetFlagPosition(flag). | ||
| bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) { | ||
| const auto [byte_index, bit_index] = GetFlagPosition(flag); | ||
| const int mask = 1 << bit_index; | ||
| return (flags[byte_index] & mask) != 0; | ||
| } | ||
|
|
||
| /// \brief Turn on the bit assigned to `flag` in the 4-byte `flags` span. | ||
| /// | ||
| /// Only the bit located by GetFlagPosition(flag) is OR-ed in; other bits keep their value. | ||
| void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) { | ||
| const auto [byte_index, bit_index] = GetFlagPosition(flag); | ||
| auto& target_byte = flags[byte_index]; | ||
| target_byte |= (1 << bit_index); | ||
| } | ||
|
|
||
| /// \brief Write `value` into the 4-byte `output` span by delegating to WriteLittleEndian. | ||
| void WriteInt32LittleEndian(int32_t value, std::span<uint8_t, 4> output) { | ||
| uint8_t* destination = output.data(); | ||
| WriteLittleEndian(value, destination); | ||
| } | ||
|
|
||
| /// \brief Read an int32_t from the 4-byte `input` span by delegating to ReadLittleEndian. | ||
| int32_t ReadInt32LittleEndian(std::span<const uint8_t, 4> input) { | ||
| const uint8_t* source = input.data(); | ||
| return ReadLittleEndian<int32_t>(source); | ||
| } | ||
|
|
||
| int32_t ReadInt32LittleEndian(std::span<const uint8_t> data, int32_t offset) { | ||
zhaoxuan1994 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ICEBERG_DCHECK(offset >= 0, "Offset must be non-negative"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ICEBERG_DCHECK is a debug check and should only be used in a rare case. If this is a generic function and cannot guarantee the input, we should return |
||
| ICEBERG_DCHECK(static_cast<size_t>(offset) + 4 <= data.size(), "Offset out of bounds"); | ||
| return ReadInt32LittleEndian(std::span<const uint8_t, 4>(data.data() + offset, 4)); | ||
| } | ||
|
|
||
| /// \brief Compress `input` with the requested codec. | ||
| /// | ||
| /// kNone yields an owned byte-for-byte copy of `input`. kLz4 and kZstd currently | ||
| /// return a NotSupported error. | ||
| Result<std::vector<uint8_t>> Compress(PuffinCompressionCodec codec, | ||
| std::span<const uint8_t> input) { | ||
| switch (codec) { | ||
| case PuffinCompressionCodec::kZstd: | ||
| return NotSupported("Zstd compression is not yet supported"); | ||
| case PuffinCompressionCodec::kLz4: | ||
| return NotSupported("LZ4 compression is not yet supported"); | ||
| case PuffinCompressionCodec::kNone: { | ||
| // No codec: hand back a copy of the input bytes unchanged. | ||
| std::vector<uint8_t> passthrough(input.begin(), input.end()); | ||
| return passthrough; | ||
| } | ||
| } | ||
| // Only reached for a codec value that matches none of the cases above. | ||
| std::unreachable(); | ||
| } | ||
|
|
||
| /// \brief Decompress `input` that was encoded with the given codec. | ||
| /// | ||
| /// kNone yields an owned byte-for-byte copy of `input`. kLz4 and kZstd currently | ||
| /// return a NotSupported error. | ||
| Result<std::vector<uint8_t>> Decompress(PuffinCompressionCodec codec, | ||
| std::span<const uint8_t> input) { | ||
| switch (codec) { | ||
| case PuffinCompressionCodec::kZstd: | ||
| return NotSupported("Zstd decompression is not yet supported"); | ||
| case PuffinCompressionCodec::kLz4: | ||
| return NotSupported("LZ4 decompression is not yet supported"); | ||
| case PuffinCompressionCodec::kNone: { | ||
| // No codec: hand back a copy of the input bytes unchanged. | ||
| std::vector<uint8_t> passthrough(input.begin(), input.end()); | ||
| return passthrough; | ||
| } | ||
| } | ||
| // Only reached for a codec value that matches none of the cases above. | ||
| std::unreachable(); | ||
| } | ||
|
|
||
| } // namespace iceberg::puffin | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,89 @@ | ||||||
| /* | ||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||
| * or more contributor license agreements. See the NOTICE file | ||||||
| * distributed with this work for additional information | ||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||
| * to you under the Apache License, Version 2.0 (the | ||||||
| * "License"); you may not use this file except in compliance | ||||||
| * with the License. You may obtain a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, | ||||||
| * software distributed under the License is distributed on an | ||||||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
| * KIND, either express or implied. See the License for the | ||||||
| * specific language governing permissions and limitations | ||||||
| * under the License. | ||||||
| */ | ||||||
|
|
||||||
| #pragma once | ||||||
|
|
||||||
| /// \file iceberg/puffin/puffin_format.h | ||||||
| /// Puffin file format constants and utilities. | ||||||
|
|
||||||
| #include <array> | ||||||
| #include <cstdint> | ||||||
| #include <span> | ||||||
| #include <vector> | ||||||
|
|
||||||
| #include "iceberg/iceberg_export.h" | ||||||
| #include "iceberg/puffin/file_metadata.h" | ||||||
| #include "iceberg/result.h" | ||||||
|
|
||||||
| namespace iceberg::puffin { | ||||||
|
|
||||||
| /// \brief Puffin file format constants. | ||||||
| struct ICEBERG_EXPORT PuffinFormat { | ||||||
| /// Magic bytes: "PFA1" (Puffin Fratercula arctica, version 1) | ||||||
| static constexpr std::array<uint8_t, 4> kMagic = {0x50, 0x46, 0x41, 0x31}; | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
Let's make V1 explicit. |
||||||
|
|
||||||
| static constexpr int32_t kMagicLength = 4; | ||||||
| static constexpr int32_t kFooterStartMagicOffset = 0; | ||||||
| static constexpr int32_t kFooterStartMagicLength = kMagicLength; | ||||||
| static constexpr int32_t kFooterStructPayloadSizeOffset = 0; | ||||||
| static constexpr int32_t kFooterStructFlagsOffset = kFooterStructPayloadSizeOffset + 4; | ||||||
| static constexpr int32_t kFooterStructFlagsLength = 4; | ||||||
| static constexpr int32_t kFooterStructMagicOffset = | ||||||
| kFooterStructFlagsOffset + kFooterStructFlagsLength; | ||||||
|
|
||||||
| /// Total length of the footer struct: payload_size(4) + flags(4) + magic(4) | ||||||
| static constexpr int32_t kFooterStructLength = kFooterStructMagicOffset + kMagicLength; | ||||||
|
|
||||||
| /// Default compression codec for footer payload. | ||||||
| static constexpr PuffinCompressionCodec kFooterCompressionCodec = | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| PuffinCompressionCodec::kLz4; | ||||||
|
||||||
| PuffinCompressionCodec::kLz4; | |
| PuffinCompressionCodec::kNone; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping kLz4 as it reflects the Puffin spec's default footer compression codec, consistent with the Java implementation. This constant is a spec-level declaration — actual usage in the writer will check the Result from Compress(), so unsupported codecs surface as explicit errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is actually a real compatibility concern: Java Puffin readers would likely break if a compressed footer was ever properly written. Java has never implemented LZ4 compression: https://github.com/apache/iceberg/blob/8a51a685894eace51474f512b46a187566f27202/core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java#L112 which IIUC means anything written with a compressed footer would not be readable from Java. I might be misreading the code, but if you agree with this assessment it might be worth a discussion on the mailing list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking more closely it looks like java would just throw unsupported codec for compressed footers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree the spec calls for lz4, so there is maybe nothing to be done here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is very similar code for this in the roaring bitmap code; it should probably be standardized in a common place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Added WriteLittleEndian/ReadLittleEndian templates to endian.h as public API. Updated both puffin_format.cc and roaring_position_bitmap.cc to use them, removing the duplicated local helpers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not the right place to put compression logic here as this can be shared by TableMetadata compression as well. We can consider a single place to unify the codec implementation (as well as the codec availability).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Roaring bitmap is using string_view as collection of bytes, we should probably standardize on one of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used span here since it feels more natural for binary data. But I can see the argument for string_view given that CRoaring already uses std::string for serialization. Do you have a preference on which one the project should standardize on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're on C++23, so std::byte is definitely available. Together with my previous comment about a generic codec interface/implementation, we can use std::span<const std::byte> as input and std::vector<std::byte> as output, but with overloads accepting std::string_view and returning std::string, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wgtmac I guess my main concern here would be to try to maintain internal consistency within all the code in the project. For consuming use-cases the overloads might be worth it. I wonder if it would be too premature to consider having a Buffer object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm hesitant to introduce a Buffer (and corresponding MemoryPool) at this moment. They are only used as ephemeral input/output buffers for CompressionCodec and FileIO.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include "iceberg/puffin/puffin_json_internal.h" | ||
|
|
||
| #include <nlohmann/json.hpp> | ||
|
|
||
| #include "iceberg/util/json_util_internal.h" | ||
| #include "iceberg/util/macros.h" | ||
|
|
||
| namespace iceberg::puffin { | ||
|
|
||
| namespace { | ||
| // Keys of the top-level file-metadata object ("properties" is also written per blob). | ||
| constexpr std::string_view kBlobs{"blobs"}; | ||
| constexpr std::string_view kProperties{"properties"}; | ||
| // Keys of each blob-metadata object. | ||
| constexpr std::string_view kType{"type"}; | ||
| constexpr std::string_view kFields{"fields"}; | ||
| constexpr std::string_view kSnapshotId{"snapshot-id"}; | ||
| constexpr std::string_view kSequenceNumber{"sequence-number"}; | ||
| constexpr std::string_view kOffset{"offset"}; | ||
| constexpr std::string_view kLength{"length"}; | ||
| constexpr std::string_view kCompressionCodec{"compression-codec"}; | ||
| } // namespace | ||
|
|
||
| /// \brief Serialize a BlobMetadata into a JSON object. | ||
| /// | ||
| /// type, fields, snapshot-id, sequence-number, offset and length are always written. | ||
| /// compression-codec and properties are handed to SetOptionalStringField and | ||
| /// SetContainerField respectively. | ||
| nlohmann::json ToJson(const BlobMetadata& blob_metadata) { | ||
| nlohmann::json blob_json; | ||
| // Unconditionally written fields. | ||
| blob_json[kType] = blob_metadata.type; | ||
| blob_json[kFields] = blob_metadata.input_fields; | ||
| blob_json[kSnapshotId] = blob_metadata.snapshot_id; | ||
| blob_json[kSequenceNumber] = blob_metadata.sequence_number; | ||
| blob_json[kOffset] = blob_metadata.offset; | ||
| blob_json[kLength] = blob_metadata.length; | ||
| // Fields whose emission is decided by the shared JSON helpers. | ||
| SetOptionalStringField(blob_json, kCompressionCodec, blob_metadata.compression_codec); | ||
| SetContainerField(blob_json, kProperties, blob_metadata.properties); | ||
| return blob_json; | ||
| } | ||
|
|
||
| /// \brief Build a BlobMetadata from its JSON object form. | ||
| /// | ||
| /// Fields are looked up one at a time in the order below. Each lookup goes through | ||
| /// ICEBERG_ASSIGN_OR_RAISE, so the order determines which lookup is attempted first. | ||
| /// compression-codec is read with GetJsonValueOrDefault and properties with FromJsonMap. | ||
| Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json) { | ||
| BlobMetadata parsed; | ||
| // Fields read with GetJsonValue. | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.type, GetJsonValue<std::string>(json, kType)); | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.input_fields, | ||
| GetJsonValue<std::vector<int32_t>>(json, kFields)); | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId)); | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.sequence_number, | ||
| GetJsonValue<int64_t>(json, kSequenceNumber)); | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.offset, GetJsonValue<int64_t>(json, kOffset)); | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.length, GetJsonValue<int64_t>(json, kLength)); | ||
| // Fields read with the defaulting / map helpers. | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.compression_codec, | ||
| GetJsonValueOrDefault<std::string>(json, kCompressionCodec)); | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.properties, FromJsonMap<std::string>(json, kProperties)); | ||
| return parsed; | ||
| } | ||
|
|
||
| /// \brief Serialize a FileMetadata into a JSON object. | ||
| /// | ||
| /// "blobs" is always written as an array with one element per blob, in the order | ||
| /// they appear in `file_metadata.blobs`. properties is handed to SetContainerField. | ||
| nlohmann::json ToJson(const FileMetadata& file_metadata) { | ||
| // Serialize every blob first so the array can be moved into the result. | ||
| nlohmann::json blob_array = nlohmann::json::array(); | ||
| for (const auto& entry : file_metadata.blobs) { | ||
| blob_array.push_back(ToJson(entry)); | ||
| } | ||
| nlohmann::json file_json; | ||
| file_json[kBlobs] = std::move(blob_array); | ||
| SetContainerField(file_json, kProperties, file_metadata.properties); | ||
| return file_json; | ||
| } | ||
|
|
||
| /// \brief Build a FileMetadata from its JSON object form. | ||
| /// | ||
| /// The "blobs" value is fetched with GetJsonValue and must be a JSON array; any | ||
| /// other JSON type produces a JsonParseError. Each element is converted with | ||
| /// BlobMetadataFromJson, then properties is read with FromJsonMap. | ||
| Result<FileMetadata> FileMetadataFromJson(const nlohmann::json& json) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto blob_array, GetJsonValue<nlohmann::json>(json, kBlobs)); | ||
| if (!blob_array.is_array()) { | ||
| return JsonParseError("Cannot parse blobs from non-array: {}", | ||
| SafeDumpJson(blob_array)); | ||
| } | ||
| FileMetadata parsed; | ||
| // Convert the blob entries in array order. | ||
| for (const auto& entry : blob_array) { | ||
| ICEBERG_ASSIGN_OR_RAISE(auto blob, BlobMetadataFromJson(entry)); | ||
| parsed.blobs.push_back(std::move(blob)); | ||
| } | ||
| ICEBERG_ASSIGN_OR_RAISE(parsed.properties, FromJsonMap<std::string>(json, kProperties)); | ||
| return parsed; | ||
| } | ||
|
|
||
| /// \brief Render a FileMetadata as a JSON string. | ||
| /// | ||
| /// \param pretty when true the output is dumped with an indent of 2; otherwise the | ||
| /// default dump() form is used. | ||
| std::string ToJsonString(const FileMetadata& file_metadata, bool pretty) { | ||
| const nlohmann::json document = ToJson(file_metadata); | ||
| if (pretty) { | ||
| return document.dump(2); | ||
| } | ||
| return document.dump(); | ||
| } | ||
|
|
||
| /// \brief Parse a JSON string into a FileMetadata. | ||
| /// | ||
| /// An empty string is rejected up front with a JsonParseError. A | ||
| /// nlohmann::json::parse_error raised inside the try block is converted into a | ||
| /// JsonParseError carrying the exception message. | ||
| Result<FileMetadata> FileMetadataFromJsonString(std::string_view json_string) { | ||
| if (json_string.empty()) { | ||
| return JsonParseError("Cannot parse empty JSON string"); | ||
| } | ||
| try { | ||
| // Conversion stays inside the try so the handler covers the same calls as before. | ||
| const nlohmann::json document = nlohmann::json::parse(json_string); | ||
| return FileMetadataFromJson(document); | ||
| } catch (const nlohmann::json::parse_error& parse_failure) { | ||
| return JsonParseError("Failed to parse JSON: {}", parse_failure.what()); | ||
| } | ||
| } | ||
|
|
||
| } // namespace iceberg::puffin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename to
puffin/json_serde_internal.h and puffin/json_serde.cc respectively for consistency.