Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ set(ICEBERG_SOURCES
partition_spec.cc
partition_summary.cc
puffin/file_metadata.cc
puffin/puffin_format.cc
puffin/puffin_json_internal.cc
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename to puffin/json_serde_internal.h and puffin/json_serde.cc respectively for consistency.

row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
row/partition_values.cc
Expand Down
30 changes: 4 additions & 26 deletions src/iceberg/deletes/roaring_position_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,6 @@ int64_t ToPosition(int32_t key, uint32_t pos32) {
return (int64_t{key} << 32) | int64_t{pos32};
}

void WriteLE64(char* buf, int64_t value) {
auto le = ToLittleEndian(static_cast<uint64_t>(value));
std::memcpy(buf, &le, sizeof(le));
}

void WriteLE32(char* buf, int32_t value) {
auto le = ToLittleEndian(static_cast<uint32_t>(value));
std::memcpy(buf, &le, sizeof(le));
}

int64_t ReadLE64(const char* buf) {
uint64_t v;
std::memcpy(&v, buf, sizeof(v));
return static_cast<int64_t>(FromLittleEndian(v));
}

int32_t ReadLE32(const char* buf) {
uint32_t v;
std::memcpy(&v, buf, sizeof(v));
return static_cast<int32_t>(FromLittleEndian(v));
}

Status ValidatePosition(int64_t pos) {
if (pos < 0 || pos > RoaringPositionBitmap::kMaxPosition) {
return InvalidArgument("Bitmap supports positions that are >= 0 and <= {}: {}",
Expand Down Expand Up @@ -189,12 +167,12 @@ Result<std::string> RoaringPositionBitmap::Serialize() const {
char* buf = result.data();

// Write bitmap count (array length including empties)
WriteLE64(buf, static_cast<int64_t>(impl_->bitmaps.size()));
WriteLittleEndian(static_cast<int64_t>(impl_->bitmaps.size()), buf);
buf += kBitmapCountSizeBytes;

// Write each bitmap with its key
for (int32_t key = 0; std::cmp_less(key, impl_->bitmaps.size()); ++key) {
WriteLE32(buf, key);
WriteLittleEndian(key, buf);
buf += kBitmapKeySizeBytes;
size_t written = impl_->bitmaps[key].write(buf, /*portable=*/true);
buf += written;
Expand All @@ -210,7 +188,7 @@ Result<RoaringPositionBitmap> RoaringPositionBitmap::Deserialize(std::string_vie
ICEBERG_PRECHECK(remaining >= kBitmapCountSizeBytes,
"Buffer too small for bitmap count: {} bytes", remaining);

int64_t bitmap_count = ReadLE64(buf);
auto bitmap_count = ReadLittleEndian<int64_t>(buf);
buf += kBitmapCountSizeBytes;
remaining -= kBitmapCountSizeBytes;

Expand All @@ -226,7 +204,7 @@ Result<RoaringPositionBitmap> RoaringPositionBitmap::Deserialize(std::string_vie
ICEBERG_PRECHECK(remaining >= kBitmapKeySizeBytes,
"Buffer too small for bitmap key: {} bytes", remaining);

int32_t key = ReadLE32(buf);
auto key = ReadLittleEndian<int32_t>(buf);
buf += kBitmapKeySizeBytes;
remaining -= kBitmapKeySizeBytes;

Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ iceberg_sources = files(
'partition_spec.cc',
'partition_summary.cc',
'puffin/file_metadata.cc',
'puffin/puffin_format.cc',
'puffin/puffin_json_internal.cc',
'row/arrow_array_wrapper.cc',
'row/manifest_wrapper.cc',
'row/partition_values.cc',
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/puffin/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.

install_headers(['file_metadata.h'], subdir: 'iceberg/puffin')
install_headers(['file_metadata.h', 'puffin_format.h'], subdir: 'iceberg/puffin')
91 changes: 91 additions & 0 deletions src/iceberg/puffin/puffin_format.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/puffin/puffin_format.h"

#include <utility>

#include "iceberg/util/endian.h"
#include "iceberg/util/macros.h"

namespace iceberg::puffin {

namespace {

constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment to improve the readability?

switch (flag) {
case PuffinFlag::kFooterPayloadCompressed:
return {0, 0};
}
std::unreachable();
}

} // namespace

bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
return (flags[byte_num] & (1 << bit_num)) != 0;
}

void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
auto [byte_num, bit_num] = GetFlagPosition(flag);
flags[byte_num] |= (1 << bit_num);
}

void WriteInt32LittleEndian(int32_t value, std::span<uint8_t, 4> output) {
WriteLittleEndian(value, output.data());
}

int32_t ReadInt32LittleEndian(std::span<const uint8_t, 4> input) {
return ReadLittleEndian<int32_t>(input.data());
}

int32_t ReadInt32LittleEndian(std::span<const uint8_t> data, int32_t offset) {
ICEBERG_DCHECK(offset >= 0, "Offset must be non-negative");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ICEBERG_DCHECK is a debug check and should only be used in a rare case. If this is a generic function and cannot guarantee the input, we should return Result<int32_t> and replace it by ICEBERG_PRECHECK here.

ICEBERG_DCHECK(static_cast<size_t>(offset) + 4 <= data.size(), "Offset out of bounds");
return ReadInt32LittleEndian(std::span<const uint8_t, 4>(data.data() + offset, 4));
}

Result<std::vector<uint8_t>> Compress(PuffinCompressionCodec codec,
std::span<const uint8_t> input) {
switch (codec) {
case PuffinCompressionCodec::kNone:
return std::vector<uint8_t>(input.begin(), input.end());
case PuffinCompressionCodec::kLz4:
return NotSupported("LZ4 compression is not yet supported");
case PuffinCompressionCodec::kZstd:
return NotSupported("Zstd compression is not yet supported");
}
std::unreachable();
}

Result<std::vector<uint8_t>> Decompress(PuffinCompressionCodec codec,
std::span<const uint8_t> input) {
switch (codec) {
case PuffinCompressionCodec::kNone:
return std::vector<uint8_t>(input.begin(), input.end());
case PuffinCompressionCodec::kLz4:
return NotSupported("LZ4 decompression is not yet supported");
case PuffinCompressionCodec::kZstd:
return NotSupported("Zstd decompression is not yet supported");
}
std::unreachable();
}

} // namespace iceberg::puffin
89 changes: 89 additions & 0 deletions src/iceberg/puffin/puffin_format.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/puffin/puffin_format.h
/// Puffin file format constants and utilities.

#include <array>
#include <cstdint>
#include <span>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/puffin/file_metadata.h"
#include "iceberg/result.h"

namespace iceberg::puffin {

/// \brief Puffin file format constants.
struct ICEBERG_EXPORT PuffinFormat {
/// Magic bytes: "PFA1" (Puffin Fratercula arctica, version 1)
static constexpr std::array<uint8_t, 4> kMagic = {0x50, 0x46, 0x41, 0x31};
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static constexpr std::array<uint8_t, 4> kMagic = {0x50, 0x46, 0x41, 0x31};
static constexpr std::array<uint8_t, 4> kMagicV1 = {0x50, 0x46, 0x41, 0x31};

Let's make V1 explicit.


static constexpr int32_t kMagicLength = 4;
static constexpr int32_t kFooterStartMagicOffset = 0;
static constexpr int32_t kFooterStartMagicLength = kMagicLength;
static constexpr int32_t kFooterStructPayloadSizeOffset = 0;
static constexpr int32_t kFooterStructFlagsOffset = kFooterStructPayloadSizeOffset + 4;
static constexpr int32_t kFooterStructFlagsLength = 4;
static constexpr int32_t kFooterStructMagicOffset =
kFooterStructFlagsOffset + kFooterStructFlagsLength;

/// Total length of the footer struct: payload_size(4) + flags(4) + magic(4)
static constexpr int32_t kFooterStructLength = kFooterStructMagicOffset + kMagicLength;

/// Default compression codec for footer payload.
static constexpr PuffinCompressionCodec kFooterCompressionCodec =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static constexpr PuffinCompressionCodec kFooterCompressionCodec =
static constexpr PuffinCompressionCodec kDefaultFooterCompressionCodec =

PuffinCompressionCodec::kLz4;
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PuffinFormat::kFooterCompressionCodec is set to kLz4, but Compress/Decompress currently return NotSupported for LZ4. This constant is likely to be used as the default when writing/reading footer payloads, and will cause immediate failures once that code lands. Consider defaulting to kNone until LZ4 support is implemented, or clearly documenting that the default codec is not yet supported and must not be used for writing.

Suggested change
PuffinCompressionCodec::kLz4;
PuffinCompressionCodec::kNone;

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping kLz4 as it reflects the Puffin spec's default footer compression codec, consistent with the Java implementation. This constant is a spec-level declaration — actual usage in the writer will check the Result from Compress(), so unsupported codecs surface as explicit errors.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is actually a real compatibility concern where java puffin readers would likely break if a compressed footer was ever properly written. Java has never implemented lz4 compression: https://github.com/apache/iceberg/blob/8a51a685894eace51474f512b46a187566f27202/core/src/main/java/org/apache/iceberg/puffin/PuffinFormat.java#L112 which IIUC means anything written with a compressed footer would not be readable from Java. If you agree with this assessment it might be misreading the code but might be worth a discussion on the mailing list.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking more closely it looks like java would just throw unsupported codec for compressed footers

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the spec calls for lz4, so there is maybe nothing to be done here

};

/// \brief Footer flags for Puffin files.
enum class PuffinFlag : uint8_t {
/// Whether the footer payload is compressed.
kFooterPayloadCompressed = 0,
};

/// \brief Check if a flag is set in the flags bytes.
ICEBERG_EXPORT bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag);

/// \brief Set a flag in the flags bytes.
ICEBERG_EXPORT void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag);

/// \brief Write a 32-bit integer in little-endian format.
ICEBERG_EXPORT void WriteInt32LittleEndian(int32_t value, std::span<uint8_t, 4> output);

/// \brief Read a 32-bit integer from a fixed-size span in little-endian format.
ICEBERG_EXPORT int32_t ReadInt32LittleEndian(std::span<const uint8_t, 4> input);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is very code for this in the roaring bitmap code it should probably standardized in a common place?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added WriteLittleEndian/ReadLittleEndian templates to endian.h as public API. Updated both puffin_format.cc and roaring_position_bitmap.cc to use them, removing the duplicated local helpers.


/// \brief Read a 32-bit integer from a buffer at the given offset in little-endian
/// format.
ICEBERG_EXPORT int32_t ReadInt32LittleEndian(std::span<const uint8_t> data,
int32_t offset);

/// \brief Compress data using the specified codec.
ICEBERG_EXPORT Result<std::vector<uint8_t>> Compress(PuffinCompressionCodec codec,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not the right place to put compression logic here as this can be shared by TableMetadata compression as well. We can consider a single place to unify the codec implementation (as well as the codec availability).

std::span<const uint8_t> input);

/// \brief Decompress data using the specified codec.
ICEBERG_EXPORT Result<std::vector<uint8_t>> Decompress(PuffinCompressionCodec codec,
std::span<const uint8_t> input);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Roaring bitmap is using string_view as collection of bytes, we should probably standardize on one of them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used span here since it feels more natural for binary data. But I can see the argument for string_view given that CRoaring already uses std::string for serialization. Do you have a preference on which one the project should standardize on?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't. Another things to decide is if we are using C++17 or above if std::byte should be used instead. This is probably something that should be discussed on the mailing list I would guess, or at least get @wgtmac and other committers opinion on.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're on C++23 so std::byte is definitely available. Together with my previous comment for a generic codec interface/implementation, we can use std::span<const std::byte> as input and std::vector<std::byte> as output but with overloads that accepting std::string_view and returning std::string, etc.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wgtmac I guess my main concern here would be to try to maintain internal consistency within all the code in the project. For consuming use-cases the overloads might be worth it. I wonder if it would be too premature to consider having a Buffer object?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hesitant to introduce a Buffer (and corresponding MemoryPool) at this moment. They are only used as ephemeral input/output buffers for CompressionCodec and FileIO.


} // namespace iceberg::puffin
127 changes: 127 additions & 0 deletions src/iceberg/puffin/puffin_json_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/puffin/puffin_json_internal.h"

#include <nlohmann/json.hpp>

#include "iceberg/util/json_util_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg::puffin {

namespace {
constexpr std::string_view kBlobs = "blobs";
constexpr std::string_view kProperties = "properties";
constexpr std::string_view kType = "type";
constexpr std::string_view kFields = "fields";
constexpr std::string_view kSnapshotId = "snapshot-id";
constexpr std::string_view kSequenceNumber = "sequence-number";
constexpr std::string_view kOffset = "offset";
constexpr std::string_view kLength = "length";
constexpr std::string_view kCompressionCodec = "compression-codec";
} // namespace

nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
nlohmann::json json;
json[kType] = blob_metadata.type;
json[kFields] = blob_metadata.input_fields;
json[kSnapshotId] = blob_metadata.snapshot_id;
json[kSequenceNumber] = blob_metadata.sequence_number;
json[kOffset] = blob_metadata.offset;
json[kLength] = blob_metadata.length;

SetOptionalStringField(json, kCompressionCodec, blob_metadata.compression_codec);
SetContainerField(json, kProperties, blob_metadata.properties);

return json;
}

Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json) {
BlobMetadata blob_metadata;

ICEBERG_ASSIGN_OR_RAISE(blob_metadata.type, GetJsonValue<std::string>(json, kType));
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.input_fields,
GetJsonValue<std::vector<int32_t>>(json, kFields));
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.snapshot_id,
GetJsonValue<int64_t>(json, kSnapshotId));
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.sequence_number,
GetJsonValue<int64_t>(json, kSequenceNumber));
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.offset, GetJsonValue<int64_t>(json, kOffset));
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.length, GetJsonValue<int64_t>(json, kLength));
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.compression_codec,
GetJsonValueOrDefault<std::string>(json, kCompressionCodec));
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.properties,
FromJsonMap<std::string>(json, kProperties));

return blob_metadata;
}

nlohmann::json ToJson(const FileMetadata& file_metadata) {
nlohmann::json json;

nlohmann::json blobs_json = nlohmann::json::array();
for (const auto& blob : file_metadata.blobs) {
blobs_json.push_back(ToJson(blob));
}
json[kBlobs] = std::move(blobs_json);

SetContainerField(json, kProperties, file_metadata.properties);

return json;
}

Result<FileMetadata> FileMetadataFromJson(const nlohmann::json& json) {
FileMetadata file_metadata;

ICEBERG_ASSIGN_OR_RAISE(auto blobs_json, GetJsonValue<nlohmann::json>(json, kBlobs));
if (!blobs_json.is_array()) {
return JsonParseError("Cannot parse blobs from non-array: {}",
SafeDumpJson(blobs_json));
}

for (const auto& blob_json : blobs_json) {
ICEBERG_ASSIGN_OR_RAISE(auto blob, BlobMetadataFromJson(blob_json));
file_metadata.blobs.push_back(std::move(blob));
}

ICEBERG_ASSIGN_OR_RAISE(file_metadata.properties,
FromJsonMap<std::string>(json, kProperties));

return file_metadata;
}

std::string ToJsonString(const FileMetadata& file_metadata, bool pretty) {
auto json = ToJson(file_metadata);
return pretty ? json.dump(2) : json.dump();
}

Result<FileMetadata> FileMetadataFromJsonString(std::string_view json_string) {
if (json_string.empty()) {
return JsonParseError("Cannot parse empty JSON string");
}
try {
auto json = nlohmann::json::parse(json_string);
return FileMetadataFromJson(json);
} catch (const nlohmann::json::parse_error& e) {
return JsonParseError("Failed to parse JSON: {}", e.what());
}
}

} // namespace iceberg::puffin
Loading
Loading