Skip to content

Commit 335a0da

Browse files
committed
refactor: decouple PartitionSpec from Schema
1 parent 407c3d1 commit 335a0da

20 files changed

+120
-95
lines changed

src/iceberg/json_internal.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -519,8 +519,7 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
519519
std::move(transform));
520520
}
521521

522-
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
523-
const std::shared_ptr<Schema>& schema, const nlohmann::json& json) {
522+
Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(const nlohmann::json& json) {
524523
ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, kSpecId));
525524
ICEBERG_ASSIGN_OR_RAISE(auto fields, GetJsonValue<nlohmann::json>(json, kFields));
526525

@@ -529,7 +528,7 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
529528
ICEBERG_ASSIGN_OR_RAISE(auto partition_field, PartitionFieldFromJson(field_json));
530529
partition_fields.push_back(std::move(*partition_field));
531530
}
532-
return std::make_unique<PartitionSpec>(schema, spec_id, std::move(partition_fields));
531+
return std::make_unique<PartitionSpec>(spec_id, std::move(partition_fields));
533532
}
534533

535534
Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& json) {
@@ -853,7 +852,6 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
853852
///
854853
/// \param[in] json The JSON object to parse.
855854
/// \param[in] format_version The format version of the table.
856-
/// \param[in] current_schema The current schema.
857855
/// \param[out] default_spec_id The default partition spec ID.
858856
/// \param[out] partition_specs The list of partition specs.
859857
Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
@@ -870,8 +868,7 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
870868
ICEBERG_ASSIGN_OR_RAISE(default_spec_id, GetJsonValue<int32_t>(json, kDefaultSpecId));
871869

872870
for (const auto& spec_json : spec_array) {
873-
ICEBERG_ASSIGN_OR_RAISE(auto spec,
874-
PartitionSpecFromJson(current_schema, spec_json));
871+
ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecFromJson(spec_json));
875872
partition_specs.push_back(std::move(spec));
876873
}
877874
} else {
@@ -902,8 +899,8 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
902899
std::move(field->transform()));
903900
}
904901

905-
auto spec = std::make_unique<PartitionSpec>(
906-
current_schema, PartitionSpec::kInitialSpecId, std::move(fields));
902+
auto spec =
903+
std::make_unique<PartitionSpec>(PartitionSpec::kInitialSpecId, std::move(fields));
907904
default_spec_id = spec->spec_id();
908905
partition_specs.push_back(std::move(spec));
909906
}

src/iceberg/json_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_s
177177
/// \return An `expected` value containing either a `PartitionSpec` object or an error. If
178178
/// the JSON is malformed or missing expected fields, an error will be returned.
179179
ICEBERG_EXPORT Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
180-
const std::shared_ptr<Schema>& schema, const nlohmann::json& json);
180+
const nlohmann::json& json);
181181

182182
/// \brief Serializes a `SnapshotRef` object to JSON.
183183
///

src/iceberg/manifest_adapter.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,11 @@ Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryType()
152152
if (partition_spec_ == nullptr) [[unlikely]] {
153153
return ManifestEntry::TypeFromPartitionType(nullptr);
154154
}
155-
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
155+
std::shared_ptr<StructType> partition_type = nullptr;
156+
if (table_schema_ != nullptr) {
157+
ICEBERG_ASSIGN_OR_RAISE(partition_type,
158+
partition_spec_->PartitionType(table_schema_));
159+
}
156160
return ManifestEntry::TypeFromPartitionType(std::move(partition_type));
157161
}
158162

src/iceberg/manifest_adapter.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@ class ICEBERG_EXPORT ManifestAdapter {
6161
/// Implemented by different versions with version-specific schemas.
6262
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
6363
public:
64-
explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
65-
: partition_spec_(std::move(partition_spec)) {}
64+
ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
65+
std::shared_ptr<Schema> table_schema)
66+
: partition_spec_(std::move(partition_spec)),
67+
table_schema_(std::move(table_schema)) {}
6668
~ManifestEntryAdapter() override;
6769

6870
virtual Status Append(const ManifestEntry& entry) = 0;
@@ -96,6 +98,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
9698

9799
protected:
98100
std::shared_ptr<PartitionSpec> partition_spec_;
101+
std::shared_ptr<Schema> table_schema_;
99102
std::shared_ptr<Schema> manifest_schema_;
100103
};
101104

src/iceberg/manifest_entry.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ bool ManifestEntry::operator==(const ManifestEntry& other) const {
3838

3939
std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
4040
if (!partition_type) {
41-
partition_type = PartitionSpec::Unpartitioned()->schema();
41+
partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
4242
}
4343
return std::make_shared<StructType>(std::vector<SchemaField>{
4444
kContent,

src/iceberg/manifest_writer.cc

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,10 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
6868

6969
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
7070
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
71-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
72-
auto adapter =
73-
std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(partition_spec));
71+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
72+
std::shared_ptr<Schema> table_schema) {
73+
auto adapter = std::make_unique<ManifestEntryAdapterV1>(
74+
snapshot_id, std::move(partition_spec), std::move(table_schema));
7475
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
7576
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
7677

@@ -83,9 +84,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
8384

8485
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
8586
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
86-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
87-
auto adapter =
88-
std::make_unique<ManifestEntryAdapterV2>(snapshot_id, std::move(partition_spec));
87+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
88+
std::shared_ptr<Schema> table_schema) {
89+
auto adapter = std::make_unique<ManifestEntryAdapterV2>(
90+
snapshot_id, std::move(partition_spec), std::move(table_schema));
8991
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
9092
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
9193

@@ -99,9 +101,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
99101
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
100102
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
101103
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
102-
std::shared_ptr<PartitionSpec> partition_spec) {
103-
auto adapter = std::make_unique<ManifestEntryAdapterV3>(snapshot_id, first_row_id,
104-
std::move(partition_spec));
104+
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> table_schema) {
105+
auto adapter = std::make_unique<ManifestEntryAdapterV3>(
106+
snapshot_id, first_row_id, std::move(partition_spec), std::move(table_schema));
105107
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
106108
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
107109

src/iceberg/manifest_writer.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,41 @@ class ICEBERG_EXPORT ManifestWriter {
5959
/// \param manifest_location Path to the manifest file.
6060
/// \param file_io File IO implementation to use.
6161
/// \param partition_spec Partition spec for the manifest.
62+
/// \param table_schema Schema containing the source fields referenced by partition
63+
/// spec.
6264
/// \return A Result containing the writer or an error.
6365
static Result<std::unique_ptr<ManifestWriter>> MakeV1Writer(
6466
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
65-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
67+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
68+
std::shared_ptr<Schema> table_schema);
6669

6770
/// \brief Creates a writer for a manifest file.
6871
/// \param snapshot_id ID of the snapshot.
6972
/// \param manifest_location Path to the manifest file.
7073
/// \param file_io File IO implementation to use.
7174
/// \param partition_spec Partition spec for the manifest.
75+
/// \param table_schema Schema containing the source fields referenced by partition
76+
/// spec.
7277
/// \return A Result containing the writer or an error.
7378
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
7479
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
75-
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
80+
std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec,
81+
std::shared_ptr<Schema> table_schema);
7682

7783
/// \brief Creates a writer for a manifest file.
7884
/// \param snapshot_id ID of the snapshot.
7985
/// \param first_row_id First row ID of the snapshot.
8086
/// \param manifest_location Path to the manifest file.
8187
/// \param file_io File IO implementation to use.
8288
/// \param partition_spec Partition spec for the manifest.
89+
/// \param table_schema Schema containing the source fields referenced by partition
90+
/// spec.
8391
/// \return A Result containing the writer or an error.
8492
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
8593
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
8694
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
87-
std::shared_ptr<PartitionSpec> partition_spec);
95+
std::shared_ptr<PartitionSpec> partition_spec,
96+
std::shared_ptr<Schema> table_schema);
8897

8998
private:
9099
static constexpr int64_t kBatchSize = 1024;

src/iceberg/partition_spec.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <algorithm>
2323
#include <format>
24+
#include <memory>
2425
#include <ranges>
2526

2627
#include "iceberg/schema.h"
@@ -31,10 +32,9 @@
3132

3233
namespace iceberg {
3334

34-
PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
35-
std::vector<PartitionField> fields,
35+
PartitionSpec::PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
3636
std::optional<int32_t> last_assigned_field_id)
37-
: schema_(std::move(schema)), spec_id_(spec_id), fields_(std::move(fields)) {
37+
: spec_id_(spec_id), fields_(std::move(fields)) {
3838
if (last_assigned_field_id) {
3939
last_assigned_field_id_ = last_assigned_field_id.value();
4040
} else if (fields_.empty()) {
@@ -48,19 +48,17 @@ PartitionSpec::PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
4848

4949
const std::shared_ptr<PartitionSpec>& PartitionSpec::Unpartitioned() {
5050
static const std::shared_ptr<PartitionSpec> unpartitioned =
51-
std::make_shared<PartitionSpec>(
52-
/*schema=*/std::make_shared<Schema>(std::vector<SchemaField>{}), kInitialSpecId,
53-
std::vector<PartitionField>{}, kLegacyPartitionDataIdStart - 1);
51+
std::make_shared<PartitionSpec>(kInitialSpecId, std::vector<PartitionField>{},
52+
kLegacyPartitionDataIdStart - 1);
5453
return unpartitioned;
5554
}
5655

57-
const std::shared_ptr<Schema>& PartitionSpec::schema() const { return schema_; }
58-
5956
int32_t PartitionSpec::spec_id() const { return spec_id_; }
6057

6158
std::span<const PartitionField> PartitionSpec::fields() const { return fields_; }
6259

63-
Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
60+
Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType(
61+
std::shared_ptr<Schema> schema) {
6462
if (fields_.empty()) {
6563
return nullptr;
6664
}
@@ -75,7 +73,7 @@ Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
7573
for (const auto& partition_field : fields_) {
7674
// Get the source field from the original schema by source_id
7775
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
78-
schema_->FindFieldById(partition_field.source_id()));
76+
schema->FindFieldById(partition_field.source_id()));
7977
if (!source_field.has_value()) {
8078
// TODO(xiao.dong) when source field is missing,
8179
// should return an error or just use UNKNOWN type

src/iceberg/partition_spec.h

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
/// Partition specs for Iceberg tables.
2424

2525
#include <cstdint>
26+
#include <memory>
2627
#include <mutex>
2728
#include <optional>
2829
#include <span>
@@ -32,6 +33,7 @@
3233
#include "iceberg/iceberg_export.h"
3334
#include "iceberg/partition_field.h"
3435
#include "iceberg/result.h"
36+
#include "iceberg/schema.h"
3537
#include "iceberg/util/formattable.h"
3638

3739
namespace iceberg {
@@ -56,24 +58,20 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
5658
/// \param fields The partition fields.
5759
/// \param last_assigned_field_id The last assigned field ID. If not provided, it will
5860
/// be calculated from the fields.
59-
PartitionSpec(std::shared_ptr<Schema> schema, int32_t spec_id,
60-
std::vector<PartitionField> fields,
61+
PartitionSpec(int32_t spec_id, std::vector<PartitionField> fields,
6162
std::optional<int32_t> last_assigned_field_id = std::nullopt);
6263

6364
/// \brief Get an unsorted partition spec singleton.
6465
static const std::shared_ptr<PartitionSpec>& Unpartitioned();
6566

66-
/// \brief Get the table schema
67-
const std::shared_ptr<Schema>& schema() const;
68-
6967
/// \brief Get the spec ID.
7068
int32_t spec_id() const;
7169

7270
/// \brief Get a list view of the partition fields.
7371
std::span<const PartitionField> fields() const;
7472

7573
/// \brief Get the partition type.
76-
Result<std::shared_ptr<StructType>> PartitionType();
74+
Result<std::shared_ptr<StructType>> PartitionType(std::shared_ptr<Schema> schema);
7775

7876
std::string ToString() const override;
7977

@@ -87,7 +85,6 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
8785
/// \brief Compare two partition specs for equality.
8886
bool Equals(const PartitionSpec& other) const;
8987

90-
std::shared_ptr<Schema> schema_;
9188
const int32_t spec_id_;
9289
std::vector<PartitionField> fields_;
9390
int32_t last_assigned_field_id_;

src/iceberg/table_scan.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,12 +269,17 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
269269

270270
std::vector<std::shared_ptr<FileScanTask>> tasks;
271271
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, context_.table_metadata->PartitionSpec());
272-
auto partition_schema = partition_spec->schema();
272+
// auto partition_schema = context_.table_metadata->Schema().value();
273+
274+
// Get the table schema and partition type
275+
ICEBERG_ASSIGN_OR_RAISE(auto table_schema, context_.table_metadata->Schema());
276+
ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
277+
partition_spec->PartitionType(table_schema));
273278

274279
for (const auto& manifest_file : manifest_files) {
275280
ICEBERG_ASSIGN_OR_RAISE(
276281
auto manifest_reader,
277-
ManifestReader::Make(manifest_file, file_io_, partition_schema));
282+
ManifestReader::Make(manifest_file, file_io_, partition_type));
278283
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
279284

280285
// TODO(gty404): filter manifests using partition spec and filter expression

0 commit comments

Comments
 (0)