diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 11961658..f274fdf8 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -21,7 +21,7 @@ jobs: - name: Install Rust toolchain uses: actions-rs/toolchain@v1 with: - toolchain: 1.85.1 + toolchain: 1.86.0 components: llvm-tools-preview, rustfmt, clippy default: true override: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ea9ffb2c..94d85399 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -94,7 +94,7 @@ jobs: # Ensure installed pg_config is first on path export PATH=$PATH:/usr/lib/postgresql/${{ matrix.postgres }}/bin - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --profile minimal --default-toolchain 1.85.1 && \ + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --profile minimal --default-toolchain 1.86.0 && \ rustup --version && \ rustc --version && \ cargo --version diff --git a/.github/workflows/release_wasm_fdw.yml b/.github/workflows/release_wasm_fdw.yml index 78f2e41d..2a0c1964 100644 --- a/.github/workflows/release_wasm_fdw.yml +++ b/.github/workflows/release_wasm_fdw.yml @@ -29,7 +29,7 @@ jobs: - name: Set up Rust run: | # install Rust - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --profile minimal --default-toolchain 1.85.1 && \ + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --profile minimal --default-toolchain 1.86.0 && \ rustup --version && \ rustc --version && \ cargo --version diff --git a/.github/workflows/test_supabase_wrappers.yml b/.github/workflows/test_supabase_wrappers.yml index 65df4297..5d89f058 100644 --- a/.github/workflows/test_supabase_wrappers.yml +++ b/.github/workflows/test_supabase_wrappers.yml @@ -19,7 +19,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: - toolchain: 1.85.1 + toolchain: 1.86.0 default: true override: true components: rustfmt, clippy diff --git a/.github/workflows/test_wrappers.yml b/.github/workflows/test_wrappers.yml index 7a0032e5..fae0657b 100644 --- a/.github/workflows/test_wrappers.yml +++ b/.github/workflows/test_wrappers.yml @@ -27,7 +27,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: - toolchain: 1.85.1 + toolchain: 1.86.0 default: true override: true components: rustfmt, clippy @@ -76,7 +76,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: - toolchain: 1.85.1 + toolchain: 1.86.0 default: true override: true components: rustfmt, clippy diff --git a/Cargo.lock b/Cargo.lock index cd24445c..8457c9b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,6 +432,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "as-any" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f477b951e452a0b6b4a10b53ccd569042d1d01729b519e02074a9c0958a063" + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -732,9 +738,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.6.3" +version = "1.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02a18fd934af6ae7ca52410d4548b98eb895aab0f1ea417d168d85db1434a141" +checksum = "8bc1b40fb26027769f16960d2f4a6bc20c4bb755d403e552c8c1a73af433c246" dependencies = [ "aws-credential-types", "aws-runtime", @@ -922,9 +928,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.70.0" +version = "1.84.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83447efb7179d8e2ad2afb15ceb9c113debbc2ecdf109150e338e2e28b86190b" +checksum = "357a841807f6b52cb26123878b3326921e2a25faca412fabdd32bd35b7edd5d3" dependencies = [ "aws-credential-types", "aws-runtime", @@ -944,9 +950,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.71.0" +version = "1.85.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f9bfbbda5e2b9fe330de098f14558ee8b38346408efe9f2e9cee82dc1636a4" +checksum = "67e05f33b6c9026fecfe9b3b6740f34d41bc6ff641a6a32dabaab60209245b75" dependencies = [ "aws-credential-types", "aws-runtime", @@ -966,9 +972,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.71.0" +version = "1.86.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e17b984a66491ec08b4f4097af8911251db79296b3e4a763060b45805746264f" +checksum = "e7d835f123f307cafffca7b9027c14979f1d403b417d8541d67cf252e8a21e35" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1138,9 +1144,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3946acbe1ead1301ba6862e712c7903ca9bb230bdf1fbd1b5ac54158ef2ab1f" +checksum = "4fa63ad37685ceb7762fa4d73d06f1d5493feb88e3f27259b9ed277f4c01b185" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1205,9 +1211,9 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.9" +version = "0.60.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" +checksum = "3db87b96cb1b16c024980f133968d52882ca0daaee3a086c6decc500f6c99728" dependencies = [ "xmlparser", ] @@ -1228,9 +1234,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.5.0" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd0b50b1b78dbadd44ab18b3c794e496f3a139abb9fbc27d9c94c4eebbb96496" +checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" dependencies = [ "fastrand 2.3.0", "gloo-timers", @@ -2560,6 +2566,12 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "dissimilar" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8975ffdaa0ef3661bfe02dbdcc06c9f829dfafe6a3c474de366a8d5e44276921" + [[package]] name = "dlv-list" version = "0.5.2" @@ -2810,6 +2822,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "expect-test" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63af43ff4431e848fb47472a920f14fa71c24de13255a5692e93d4e90302acb0" +dependencies = [ + "dissimilar", + "once_cell", +] + [[package]] name = "eyre" version = "0.6.12" @@ -3687,9 +3709,9 @@ dependencies = [ [[package]] name = "iceberg" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ad4b76a13ef469b09493330c4360630499949f4984478b77d6a70d252caf8b4" +checksum = "306fd4bf70d30687dc765110ecd19fc2bb21f16c3d5c188bc53a0d573bb6e675" dependencies = [ "anyhow", "apache-avro", @@ -3702,12 +3724,15 @@ dependencies = [ "arrow-schema", "arrow-select", "arrow-string", + "as-any", "async-trait", + "backon", "base64 0.22.1", "bimap", "bytes", "chrono", "derive_builder", + "expect-test", "fnv", "futures", "itertools 0.13.0", @@ -3728,6 +3753,7 @@ dependencies = [ "serde_json", 
"serde_repr", "serde_with", + "strum 0.27.2", "thrift", "tokio", "typed-builder 0.20.1", @@ -3738,9 +3764,9 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d744269adcd9a89cea578b6e8443186cb9d5e7d41a47679f3b1d5b71842820e" +checksum = "3e6a0dc30703b0cbb7d3c245126936d92015f93ab3ac52e20edc011f42934628" dependencies = [ "async-trait", "chrono", @@ -3759,8 +3785,8 @@ dependencies = [ [[package]] name = "iceberg-catalog-s3tables" -version = "0.5.1" -source = "git+https://github.com/burmecia/iceberg-rust?rev=e565bc43c1b9fa6b25a601f68bcec1423a984cc1#e565bc43c1b9fa6b25a601f68bcec1423a984cc1" +version = "0.6.0" +source = "git+https://github.com/burmecia/iceberg-rust?rev=6548db2cc02b8ecd65e698e58d372d7dfb342b9c#6548db2cc02b8ecd65e698e58d372d7dfb342b9c" dependencies = [ "anyhow", "async-trait", @@ -3978,6 +4004,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -4882,12 +4919,11 @@ checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" [[package]] name = "opendal" -version = "0.53.3" +version = "0.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f947c4efbca344c1a125753366033c8107f552b2e3f8251815ed1908f116ca3e" +checksum = "ffb9838d0575c6dbaf3fcec7255af8d5771996d4af900bbb6fa9a314dec00a1a" dependencies = [ "anyhow", - "async-trait", "backon", "base64 0.22.1", "bytes", @@ -7233,6 +7269,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "spin" version = "0.9.8" @@ -7733,20 +7779,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.1" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.10", + "slab", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6a1c21e0..e9edcccb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" [workspace.package] edition = "2021" -rust-version = "1.85.1" +rust-version = "1.86.0" homepage = "https://github.com/supabase/wrappers" repository = "https://github.com/supabase/wrappers" diff --git a/docs/catalog/iceberg.md b/docs/catalog/iceberg.md index 731cdf36..3bd4cd1b 100644 --- a/docs/catalog/iceberg.md +++ b/docs/catalog/iceberg.md @@ -11,7 +11,7 @@ tags: [Apache Iceberg](https://iceberg.apache.org/) is a high performance open-source format for large analytic tables. -The Iceberg Wrapper allows you to read data from Apache Iceberg within your Postgres database. 
+The Iceberg Wrapper allows you to read from and write to Apache Iceberg within your Postgres database.

 ## Preparation

@@ -173,6 +173,7 @@ create schema if not exists iceberg;
 The full list of foreign table options are below:

 - `table` - Fully qualified source table name with all namespaces in Iceberg, required.
+- `rowid_column` - The column to use as the row identifier for INSERT operations, required for data insertion.

 ## Entities

@@ -219,7 +220,7 @@ Ref: [Iceberg Table Spec](https://iceberg.apache.org/spec/#iceberg-table-spec)

 | Object | Select | Insert | Update | Delete | Truncate |
 | ---------- | :----: | :----: | :----: | :----: | :------: |
-| table | ✅ | ❌ | ❌ | ❌ | ❌ |
+| table | ✅ | ✅ | ❌ | ❌ | ❌ |

 #### Usage

@@ -234,7 +235,8 @@ create foreign table iceberg.guides (
 )
   server iceberg_server
   options (
-    table 'docs_example.guides'
+    table 'docs_example.guides',
+    rowid_column 'id'
   );
 ```

@@ -282,12 +284,62 @@ This FDW supports `where` clause pushdown with below operators.
 | bytea | binary |
 | uuid | uuid |

+## Data Insertion
+
+The Iceberg FDW supports inserting data into Iceberg tables using standard SQL `INSERT` statements.
+
+### Basic Insert
+
+```sql
+-- insert a single row
+insert into iceberg.guides (id, title, content, created_at)
+values (1, 'Getting Started', 'Welcome to our guides', now());
+
+-- insert multiple rows
+insert into iceberg.guides (id, title, content, created_at)
+values
+  (2, 'Advanced Guide', 'Advanced topics', now()),
+  (3, 'Best Practices', 'Tips and tricks', now());
+```
+
+### Insert from Select
+
+```sql
+-- insert data from another table
+insert into iceberg.guides (id, title, content, created_at)
+select id, title, content, created_at
+from some_other_table
+where condition = true;
+```
+
+### Partition Considerations
+
+When inserting data into partitioned Iceberg tables, the FDW automatically handles partitioning based on the table's partition spec. Data will be written to the appropriate partition directories.
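+
+For reference, the `iceberg.sales` table used in the example below can be declared like any other Iceberg foreign table. This is a minimal sketch only: the `docs_example.sales` source table, its column types, and the choice of `product_id` as `rowid_column` are illustrative assumptions, not part of this change.
+
+```sql
+-- hypothetical foreign table over a source table partitioned by sale_date
+create foreign table iceberg.sales (
+  product_id bigint,
+  amount numeric,
+  sale_date date
+)
+  server iceberg_server
+  options (
+    table 'docs_example.sales',
+    rowid_column 'product_id'
+  );
+```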
+
+```sql
+-- for a table partitioned by sale_date, data is automatically partitioned
+insert into iceberg.sales (product_id, amount, sale_date)
+values (123, 99.99, '2025-01-15');
+```
+
+### Performance Tips
+
+- **Batch Inserts**: Use multi-row inserts for better performance
+- **Partition Awareness**: When possible, insert data in partition order to optimize file organization
+- **Transaction Size**: Consider breaking very large inserts into smaller transactions
+
+### Limitations for Inserts
+
+- Schema evolution during insert is not supported
+- Only append operations are supported (no upserts)
+- Complex data types (nested structs, arrays, maps) have limited support
+
 ## Limitations

 This section describes important limitations and considerations when using this FDW:

 - Only supports specific data type mappings between Postgres and Iceberg
-- Only supports read operations (no INSERT, UPDATE, DELETE, or TRUNCATE)
+- UPDATE, DELETE, and TRUNCATE operations are not supported
 - [Apache Iceberg schema evolution](https://iceberg.apache.org/spec/#schema-evolution) is not supported
 - When using Iceberg REST catalog, only supports AWS S3 (or compatible) as the storage
 - Materialized views using these foreign tables may fail during logical backups
@@ -325,7 +377,8 @@ create foreign table if not exists iceberg.guides (
 )
   server iceberg_server
   options (
-    table 'docs_example.guides'
+    table 'docs_example.guides',
+    rowid_column 'id'
   );
 ```

@@ -375,3 +428,32 @@ where created_at >= timestamp '2025-05-16 12:34:56';
 select * from iceberg.guides where id > 42 and title like 'Supabase%';
 ```
+### Data Insertion Examples
+
+```sql
+-- insert a single record
+insert into iceberg.guides (id, title, content, created_at)
+values (100, 'New Guide', 'This is a new guide', now());
+
+-- insert multiple records at once
+insert into iceberg.guides (id, title, content, created_at)
+values
+  (101, 'Guide A', 'Content for Guide A', now()),
+  (102, 'Guide B', 'Content for Guide B', now()),
+  (103, 'Guide C', 'Content for Guide C', now());
+
+-- insert data from a SELECT query
+insert into iceberg.guides (id, title, content, created_at)
+select
+  id + 1000,
+  'Migrated: ' || title,
+  content,
+  created_at
+from other_guides
+where id < 10;
+
+-- verify the inserted data
+select count(*) from iceberg.guides;
+select * from iceberg.guides where id >= 100 order by id;
+```
+
diff --git a/docs/catalog/index.md b/docs/catalog/index.md
index eec4fff0..9bd16c9c 100644
--- a/docs/catalog/index.md
+++ b/docs/catalog/index.md
@@ -24,7 +24,7 @@ Each FDW documentation includes a detailed "Limitations" section that describes
 | Firebase | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
 | Gravatar | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
 | HubSpot | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
-| Iceberg | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
+| Iceberg | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ |
 | Logflare | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
 | Notion | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
 | Orb | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ |
diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml
index b4384cca..89303063 100644
--- a/wrappers/Cargo.toml
+++ b/wrappers/Cargo.toml
@@ -174,6 +174,7 @@ iceberg_fdw = [
   "arrow-json",
   "chrono",
   "futures",
+  "parquet",
   "rust_decimal",
   "serde_json",
   "thiserror",
@@ -250,7 +251,7 @@ regex = { version = "1", optional = true }
 url = { version = "2.3", optional = true }

 # for s3_fdw and s3vectors_fdw
-aws-config = { version = "1.6.3", features = ["behavior-version-latest"], optional = true }
+aws-config = { version = "1.8.6", features = ["behavior-version-latest"], optional = true }

 # for s3_fdw
 aws-sdk-s3 = { version =
"1.86.0", optional = true } @@ -312,9 +313,9 @@ anyhow = { version = "1.0.81", optional = true } uuid = { version = "1.18.0", features = ["v7"], optional = true } # for iceberg_fdw -iceberg = { version = "0.5.1", optional = true } -iceberg-catalog-s3tables = { git = "https://github.com/burmecia/iceberg-rust", rev = "e565bc43c1b9fa6b25a601f68bcec1423a984cc1", package="iceberg-catalog-s3tables", optional = true } -iceberg-catalog-rest = { version = "0.5.1", optional = true } +iceberg = { version = "0.6.0", optional = true } +iceberg-catalog-s3tables = { git = "https://github.com/burmecia/iceberg-rust", rev = "6548db2cc02b8ecd65e698e58d372d7dfb342b9c", package="iceberg-catalog-s3tables", optional = true } +iceberg-catalog-rest = { version = "0.6.0", optional = true } rust_decimal = { version = "1.37.1", optional = true } # for duckdb_fdw diff --git a/wrappers/dockerfiles/s3/Dockerfile b/wrappers/dockerfiles/s3/Dockerfile index 05eee220..4af704c3 100644 --- a/wrappers/dockerfiles/s3/Dockerfile +++ b/wrappers/dockerfiles/s3/Dockerfile @@ -3,6 +3,6 @@ FROM python:3.13-slim RUN apt-get update && apt-get install -y --no-install-recommends wget -RUN pip install "pyiceberg[s3fs,pyarrow]" \ +RUN pip install "pyiceberg[s3fs,pyarrow]==0.10.0" \ && wget https://dl.min.io/client/mc/release/linux-amd64/mc \ && chmod +x mc diff --git a/wrappers/dockerfiles/s3/iceberg_seed.py b/wrappers/dockerfiles/s3/iceberg_seed.py index e240f8ed..1312f226 100644 --- a/wrappers/dockerfiles/s3/iceberg_seed.py +++ b/wrappers/dockerfiles/s3/iceberg_seed.py @@ -5,20 +5,22 @@ # Table: docs_example.bids # ============================================================================ -from datetime import date, datetime, timedelta +from datetime import date, datetime, time, timedelta from zoneinfo import ZoneInfo from pyiceberg.catalog import load_catalog from pyiceberg.schema import Schema from pyiceberg.partitioning import PartitionSpec, PartitionField -from pyiceberg.transforms import DayTransform +from pyiceberg.transforms import DayTransform, HourTransform, MonthTransform, YearTransform, IdentityTransform from pyiceberg.table.sorting import SortOrder, SortField -from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BinaryType, BooleanType, DateType, + TimeType, TimestampType, TimestamptzType, + TimestampNanoType, + TimestamptzNanoType, IntegerType, LongType, FloatType, @@ -49,99 +51,240 @@ ) namespace = "docs_example" -tblname = f"{namespace}.bids" - -schema = Schema( - NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), - NestedField(field_id=2, name="symbol", field_type=StringType(), required=True), - NestedField(field_id=3, name="bid", field_type=FloatType(), required=False), - NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False), - NestedField( - field_id=5, - name="details", - field_type=StructType( - NestedField( - field_id=54, name="created_by", field_type=StringType(), required=False + +def create_bids_table(catalog, namespace): + tblname = f"{namespace}.bids" + schema = Schema( + NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), + NestedField(field_id=2, name="symbol", field_type=StringType(), required=True), + NestedField(field_id=3, name="bid", field_type=FloatType(), required=False), + NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False), + NestedField( + field_id=5, + name="details", + required=False, + field_type=StructType( + NestedField( + field_id=54, 
name="created_by", field_type=StringType(), required=False + ), + NestedField( + field_id=55, name="balance", field_type=FloatType(), required=False + ), + NestedField( + field_id=56, name="balance2", field_type=DoubleType(), required=False + ), + NestedField( + field_id=57, name="count", field_type=IntegerType(), required=False + ), + NestedField( + field_id=58, name="count2", field_type=LongType(), required=False + ), + NestedField( + field_id=59, name="valid", field_type=BooleanType(), required=False + ), ), ), - required=False, - ), - NestedField(field_id=6, name="amt", field_type=DecimalType(32, 3), required=False), - NestedField(field_id=7, name="dt", field_type=DateType(), required=False), - NestedField(field_id=8, name="tstz", field_type=TimestamptzType(), required=False), - NestedField(field_id=9, name="uid", field_type=UUIDType(), required=False), - NestedField(field_id=10, name="bin", field_type=BinaryType(), required=False), - NestedField(field_id=11, name="bcol", field_type=BooleanType(), required=False), - NestedField(field_id=12, name="list", field_type=ListType( - element_id=50, - element_type=StringType(), - required=False - ), required=False), - NestedField(field_id=13, name="icol", field_type=IntegerType(), required=False), - NestedField(field_id=14, name="map", field_type=MapType( - key_id=100, - key_type=StringType(), - value_id=102, - value_type=StringType(), - ), required=False), - NestedField(field_id=15, name="lcol", field_type=LongType(), required=False), - #NestedField(field_id=16, name="Upcol", field_type=StringType(), required=False), - #NestedField(field_id=17, name="space col", field_type=StringType(), required=False), -) + NestedField(field_id=6, name="amt", field_type=DecimalType(32, 3), required=False), + NestedField(field_id=7, name="dt", field_type=DateType(), required=False), + NestedField(field_id=8, name="tstz", field_type=TimestamptzType(), required=False), + NestedField(field_id=9, name="uid", field_type=UUIDType(), required=False), + NestedField(field_id=10, name="bin", field_type=BinaryType(), required=False), + NestedField(field_id=11, name="bcol", field_type=BooleanType(), required=False), + NestedField(field_id=12, name="list", field_type=ListType( + element_id=50, element_type=StringType(), required=False + ), required=False), + NestedField(field_id=13, name="icol", field_type=IntegerType(), required=False), + NestedField(field_id=14, name="map", field_type=MapType( + key_id=100, key_type=StringType(), value_id=102, value_type=StringType(), + ), required=False), + NestedField(field_id=15, name="lcol", field_type=LongType(), required=False), + #NestedField(field_id=16, name="Upcol", field_type=StringType(), required=False), + #NestedField(field_id=17, name="space col", field_type=StringType(), required=False), + NestedField(field_id=21, name="tcol", field_type=TimeType(), required=False), + + NestedField(field_id=22, name="map2", field_type=MapType( + key_id=200, key_type=StringType(), value_id=202, value_type=FloatType(), + ), required=False), + NestedField(field_id=23, name="map3", field_type=MapType( + key_id=204, key_type=StringType(), value_id=206, value_type=BooleanType(), value_required=False, + ), required=False), + NestedField(field_id=24, name="map4", field_type=MapType( + key_id=208, key_type=StringType(), value_id=210, value_type=IntegerType(), value_required=False, + ), required=False), + NestedField(field_id=25, name="map5", field_type=MapType( + key_id=212, key_type=StringType(), value_id=214, value_type=LongType(), 
value_required=False, + ), required=False), + NestedField(field_id=26, name="map6", field_type=MapType( + key_id=216, key_type=StringType(), value_id=218, value_type=DoubleType(), value_required=False, + ), required=False), -partition_spec = PartitionSpec( - PartitionField( - source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day" + NestedField(field_id=27, name="list2", field_type=ListType( + element_id=300, element_type=LongType(), element_required=False, required=False + ), required=False), + NestedField(field_id=28, name="list3", field_type=ListType( + element_id=302, element_type=FloatType(), element_required=False, required=False + ), required=False), + NestedField(field_id=29, name="list4", field_type=ListType( + element_id=304, element_type=BooleanType(), element_required=False, required=False + ), required=False), + NestedField(field_id=30, name="list5", field_type=ListType( + element_id=306, element_type=IntegerType(), element_required=False, required=False + ), required=False), + NestedField(field_id=31, name="list6", field_type=ListType( + element_id=308, element_type=DoubleType(), element_required=False, required=False + ), required=False), + + NestedField(field_id=32, name="pat_col_year", field_type=TimestampType(), required=False), + NestedField(field_id=33, name="pat_col_month", field_type=DateType(), required=False), + NestedField(field_id=34, name="pat_col_hour", field_type=TimestampType(), required=False), + NestedField(field_id=35, name="pat_bcol", field_type=BooleanType(), required=False), + NestedField(field_id=36, name="pat_icol", field_type=IntegerType(), required=False), + NestedField(field_id=37, name="pat_lcol", field_type=LongType(), required=False), + NestedField(field_id=38, name="pat_tcol", field_type=TimestampType(), required=False), + + identifier_field_ids=[1], ) -) -sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day" + ), + PartitionField( + source_id=2, field_id=1002, transform=IdentityTransform(), name="symbol_ident" + ), + PartitionField( + source_id=3, field_id=1004, transform=IdentityTransform(), name="bid_ident" + ), + PartitionField( + source_id=4, field_id=1006, transform=IdentityTransform(), name="ask_ident" + ), + PartitionField( + source_id=7, field_id=1008, transform=DayTransform(), name="dt_day" + ), + PartitionField( + source_id=11, field_id=1010, transform=IdentityTransform(), name="bcol_ident" + ), + PartitionField( + source_id=13, field_id=1012, transform=IdentityTransform(), name="icol_ident" + ), + PartitionField( + source_id=32, field_id=1014, transform=YearTransform(), name="pat_year" + ), + PartitionField( + source_id=33, field_id=1016, transform=MonthTransform(), name="pat_month" + ), + PartitionField( + source_id=34, field_id=1018, transform=HourTransform(), name="pat_hour" + ), + PartitionField( + source_id=35, field_id=1020, transform=IdentityTransform(), name="pat_bcol_ident" + ), + PartitionField( + source_id=36, field_id=1022, transform=IdentityTransform(), name="pat_icol_ident" + ), + PartitionField( + source_id=37, field_id=1024, transform=IdentityTransform(), name="pat_lcol_ident" + ), + PartitionField( + source_id=38, field_id=1026, transform=MonthTransform(), name="pat_tcol_month" + ), + ) -catalog.create_namespace_if_not_exists(namespace) + sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) -ns = catalog.list_namespaces() 
-tables = catalog.list_tables(namespace) + if catalog.table_exists(tblname): + catalog.purge_table(tblname) + tbl = catalog.create_table( + identifier=tblname, + schema=schema, + #location="s3://iceberg", + partition_spec=partition_spec, + sort_order=sort_order, + ) + table = catalog.load_table(tblname) -if catalog.table_exists(tblname): - catalog.purge_table(tblname) -tbl = catalog.create_table( - identifier=tblname, - schema=schema, - #location="s3://iceberg", - partition_spec=partition_spec, - sort_order=sort_order, -) -table = catalog.load_table(tblname) -data = table.scan().to_arrow() - -df = pa.Table.from_pylist( - [ - { - "datetime": datetime.now() - timedelta(days=1), - "symbol": "APL", - "bid": 12.34, "ask": 54.32, "amt": 998, - "tstz": datetime(2025, 5, 16, 12, 34, 56, tzinfo=ZoneInfo("Asia/Singapore")), - "details": {"created_by": "alice"}, - "map": { "nn": "qq", "nn2": "pp" }, - "bcol": True, - }, - { - "datetime": datetime.now(timezone.utc), - "symbol": "MCS", - "bid": 33.44, "ask": 11.22, "dt": date(2025, 5, 16), - "uid": uuid.UUID(bytes=bytes([0x42] * 16)).bytes, - "bin": bytes([0x43] * 16), - "list": ["xx", "yy"], - "map": { "kk": "val", "kk2": "123.4" }, - "icol": 1234, "lcol": 5678, - #"Upcol": "uppercase col name", - #"space col": "space in col name", - }, - ], - schema=tbl.schema().as_arrow(), -) + df = pa.Table.from_pylist( + [ + { + "datetime": datetime.now() - timedelta(days=1), + "symbol": "APL", + "bid": 12.34, "ask": 54.32, "amt": 998, + "tstz": datetime(2025, 5, 16, 12, 34, 56, tzinfo=ZoneInfo("Asia/Singapore")), + "details": { + "created_by": "alice", + "balance": 222.33, + "count": 42, + "valid": True, + }, + "map": { "nn": "qq", "nn2": "pp" }, + "bcol": True, + "pat_col_year": datetime.now() - timedelta(days=1), + "pat_col_month": date(2025, 5, 16), + "tcol": time.fromisoformat('04:23:01'), + }, + { + "datetime": datetime.now(timezone.utc), + "symbol": "MCS", + "bid": 33.44, "ask": 11.22, "dt": date(2025, 5, 16), + "uid": uuid.UUID(bytes=bytes([0x42] * 16)).bytes, + "bin": bytes([0x43] * 16), + "list": ["xx", "yy"], + "map": { "kk": "val", "kk2": "123.4" }, + "icol": 1234, "lcol": 5678, + #"Upcol": "uppercase col name", + #"space col": "space in col name", + }, + ], + schema=tbl.schema().as_arrow(), + ) + + tbl.overwrite(df) + + data = tbl.scan().to_arrow() + print(data) + + +def create_asks_table(catalog, namespace): + tblname = f"{namespace}.asks" + schema = Schema( + NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), + NestedField(field_id=2, name="symbol", field_type=StringType(), required=True), + NestedField(field_id=3, name="ask", field_type=DoubleType(), required=False), + identifier_field_ids=[1], + ) -tbl.overwrite(df) + sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform())) + + if catalog.table_exists(tblname): + catalog.purge_table(tblname) + tbl = catalog.create_table( + identifier=tblname, + schema=schema, + #location="s3://iceberg", + sort_order=sort_order, + ) + table = catalog.load_table(tblname) + + df = pa.Table.from_pylist( + [ + { + "datetime": datetime.now() - timedelta(days=1), + "symbol": "APL", + "ask": 12.34, + }, + ], + schema=tbl.schema().as_arrow(), + ) + + tbl.overwrite(df) + + data = tbl.scan().to_arrow() + print(data) + + +catalog.create_namespace_if_not_exists(namespace) +ns = catalog.list_namespaces() +tables = catalog.list_tables(namespace) -data = tbl.scan().to_arrow() -print(data) +create_bids_table(catalog, namespace) +create_asks_table(catalog, namespace) diff --git 
a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs index 58c689e3..23642f95 100644 --- a/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs +++ b/wrappers/src/fdw/airtable_fdw/airtable_fdw.rs @@ -173,7 +173,7 @@ impl ForeignDataWrapper for AirtableFdw { if !result.is_empty() { return Ok(result .drain(0..1) - .last() + .next_back() .map(|src_row| row.replace_with(src_row))); } } diff --git a/wrappers/src/fdw/duckdb_fdw/tests.rs b/wrappers/src/fdw/duckdb_fdw/tests.rs index 6bdfe1ad..8c1dd3e2 100644 --- a/wrappers/src/fdw/duckdb_fdw/tests.rs +++ b/wrappers/src/fdw/duckdb_fdw/tests.rs @@ -108,6 +108,7 @@ mod tests { .select( "SELECT datetime,symbol,bid,ask,details,amt,dt,tstz,bin,bcol,list,icol,map,lcol FROM duckdb.iceberg_docs_example_bids + WHERE symbol in ('APL', 'MCS') order by symbol", None, &[], @@ -129,7 +130,15 @@ mod tests { .filter_map(|r| r.get_by_name::("details").unwrap()) .map(|v| v.0.clone()) .collect::>(); - assert_eq!(results, vec![json!({ "created_by": "alice" })]); + assert_eq!( + results, + vec![json!({ + "created_by": "alice", + "balance": 222.33, + "count": 42, + "valid": true + })] + ); }); } diff --git a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs index b0ac14fa..98c81152 100644 --- a/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs +++ b/wrappers/src/fdw/firebase_fdw/firebase_fdw.rs @@ -331,7 +331,7 @@ impl ForeignDataWrapper for FirebaseFdw { Ok(self .scan_result .drain(0..1) - .last() + .next_back() .map(|src_row| row.replace_with(src_row))) } } diff --git a/wrappers/src/fdw/iceberg_fdw/README.md b/wrappers/src/fdw/iceberg_fdw/README.md index da9d16a7..f2f30a09 100644 --- a/wrappers/src/fdw/iceberg_fdw/README.md +++ b/wrappers/src/fdw/iceberg_fdw/README.md @@ -10,6 +10,7 @@ This is a foreign data wrapper for [Apache Iceberg](https://iceberg.apache.org/) | Version | Date | Notes | | ------- | ---------- | ---------------------------------------------------- | +| 0.1.3 | 2025-09-20 | Add data insertion support | | 0.1.2 | 2025-07-30 | Large data set query performance improvement | | 0.1.1 | 2025-05-15 | Refactor server options passdown | | 0.1.0 | 2025-05-07 | Initial version | diff --git a/wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs b/wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs index 88779f47..de01b00c 100644 --- a/wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs +++ b/wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs @@ -1,33 +1,42 @@ -use arrow_array::{array::ArrayRef, RecordBatch}; +use arrow_array::{array::ArrayRef, builder::ArrayBuilder, Array, RecordBatch}; use futures::StreamExt; use iceberg::{ expr::Predicate, scan::ArrowRecordBatchStream, - spec::{NestedFieldRef, PrimitiveType, Type}, + spec::{DataFileFormat, NestedFieldRef, PrimitiveType, Type}, table::Table, + transaction::{ApplyTransactionAction, Transaction}, + writer::{ + base_writer::data_file_writer::DataFileWriterBuilder, file_writer::ParquetWriterBuilder, + IcebergWriter, IcebergWriterBuilder, + }, Catalog, NamespaceIdent, TableIdent, }; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_catalog_s3tables::{S3TablesCatalog, S3TablesCatalogConfig}; +use parquet::file::properties::WriterProperties; use pgrx::pg_sys; use std::collections::{HashMap, HashSet, VecDeque}; +use std::sync::Arc; use supabase_wrappers::prelude::*; -use super::{mapper::Mapper, pushdown::try_pushdown, IcebergFdwError, IcebergFdwResult}; +use super::{ + mapper::Mapper, + pushdown::try_pushdown, + utils, + writer::{FileNameGenerator, 
LocationGenerator}, + IcebergFdwError, IcebergFdwResult, +}; use crate::stats; -// copy an option to another in an option HashMap, if the target option -// doesn't exist -fn copy_option(map: &mut HashMap, from_key: &str, to_key: &str) { - if !map.contains_key(to_key) { - let value = map.get(from_key).cloned().unwrap_or_default(); - map.insert(to_key.to_string(), value); - } +#[derive(Debug, Clone)] +struct InputRow { + cells: Vec>, } #[wrappers_fdw( - version = "0.1.2", + version = "0.1.3", author = "Supabase", website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/iceberg_fdw", error_type = "IcebergFdwError" @@ -55,6 +64,9 @@ pub(crate) struct IcebergFdw { // for stats: total number of records and bytes read num_rows: usize, bytes_in: usize, + + // for insertion: buffer for accumulating input rows before sorting + input_rows: Vec, } impl IcebergFdw { @@ -138,7 +150,7 @@ impl IcebergFdw { let mut scan_files = self.rt.block_on(scan.plan_files())?; while let Some(sf) = self.rt.block_on(scan_files.next()) { let sf = sf.unwrap(); - log_debug1(&format!( + report_info(&format!( "file scan: {:?}, {}", sf.record_count, sf.data_file_path )); @@ -157,6 +169,116 @@ impl IcebergFdw { self.row_data.clear(); self.mapper.reset(); } + + // sort input rows by partition column values + fn sort_rows_by_partition( + &mut self, + metadata: &iceberg::spec::TableMetadata, + schema: &iceberg::spec::Schema, + ) -> IcebergFdwResult> { + let partition_spec = metadata.default_partition_spec(); + + // if no partition spec, return original order + if partition_spec.fields().is_empty() { + return Ok(self.input_rows.clone()); + } + + let mut rows_with_keys = Vec::new(); + + // compute partition key for each row + for row in &self.input_rows { + let partition_key = self.compute_partition_key_for_input_row(metadata, schema, row)?; + rows_with_keys.push((row.clone(), partition_key)); + } + + // sort by partition key + rows_with_keys.sort_by(|a, b| a.1.cmp(&b.1)); + + // extract sorted rows + let sorted_rows = rows_with_keys.into_iter().map(|(row, _)| row).collect(); + Ok(sorted_rows) + } + + // compute partition key for an input row + fn compute_partition_key_for_input_row( + &self, + metadata: &iceberg::spec::TableMetadata, + schema: &iceberg::spec::Schema, + row: &InputRow, + ) -> IcebergFdwResult { + let partition_spec = metadata.default_partition_spec(); + let mut key_parts = Vec::new(); + + for partition_field in partition_spec.fields() { + let source_field_id = partition_field.source_id; + let field_name = &partition_field.name; + + // find the column index for this field ID in the schema + let mut source_column_index = None; + for (idx, field) in schema.as_struct().fields().iter().enumerate() { + if field.id == source_field_id { + source_column_index = Some(idx); + break; + } + } + let column_index = source_column_index.ok_or_else(|| { + IcebergFdwError::ColumnNotFound(format!( + "cannot find source column with ID {} for partition field", + source_field_id + )) + })?; + + // get the cell value for this column + if let Some(Some(cell)) = row.cells.get(column_index) { + // for now, just use string representation + // in a full implementation, you'd handle different transforms + key_parts.push(format!("{}={}", field_name, cell)); + } else { + key_parts.push(format!("{}=null", field_name)); + } + } + + Ok(key_parts.join("/")) + } + + // build record batch from sorted input rows + fn build_record_batch_from_rows( + &mut self, + schema: &iceberg::spec::Schema, + sorted_rows: &[InputRow], + ) -> 
IcebergFdwResult { + let schema: arrow_schema::Schema = schema.try_into()?; + let mut builders: Vec> = Vec::new(); + + for field in &schema.fields { + let builder = arrow_array::builder::make_builder(field.data_type(), sorted_rows.len()); + builders.push(builder); + } + + // populate builders with sorted row data + for row in sorted_rows { + for (col_idx, cell) in row.cells.iter().enumerate() { + let builder = &mut builders[col_idx]; + let field_type = &schema.fields[col_idx].data_type(); + self.mapper + .append_array_value(builder, field_type, cell.as_ref())?; + } + } + + // convert builders to arrays + let mut arrays: Vec = Vec::new(); + for mut builder in builders.drain(..) { + let array = builder.finish(); + arrays.push(array); + } + + // create record batch + let rb_option = arrow_array::RecordBatchOptions::new().with_match_field_names(false); + let record_batch = + RecordBatch::try_new_with_options(Arc::new(schema.clone()), arrays, &rb_option)?; + + Ok(record_batch) + } } impl ForeignDataWrapper for IcebergFdw { @@ -186,9 +308,9 @@ impl ForeignDataWrapper for IcebergFdw { .collect(); // copy AWS credentials if they're not set by user - copy_option(&mut props, "aws_access_key_id", "s3.access-key-id"); - copy_option(&mut props, "aws_secret_access_key", "s3.secret-access-key"); - copy_option(&mut props, "region_name", "s3.region"); + utils::copy_option(&mut props, "aws_access_key_id", "s3.access-key-id"); + utils::copy_option(&mut props, "aws_secret_access_key", "s3.secret-access-key"); + utils::copy_option(&mut props, "region_name", "s3.region"); let batch_size = require_option_or("batch_size", &server.options, "4096") .parse::() @@ -233,6 +355,7 @@ impl ForeignDataWrapper for IcebergFdw { mapper: Mapper::default(), num_rows: 0, bytes_in: 0, + input_rows: Vec::new(), }) } @@ -292,6 +415,106 @@ impl ForeignDataWrapper for IcebergFdw { Ok(()) } + fn begin_modify(&mut self, options: &HashMap) -> IcebergFdwResult<()> { + let tbl_ident = TableIdent::from_strs(require_option("table", options)?.split("."))?; + self.table = self + .rt + .block_on(self.catalog.load_table(&tbl_ident))? 
+ .into(); + self.input_rows.clear(); + + Ok(()) + } + + fn insert(&mut self, src: &Row) -> IcebergFdwResult<()> { + // save the row in the input row buffer + self.input_rows.push(InputRow { + cells: src.cells.clone(), + }); + + Ok(()) + } + + fn end_modify(&mut self) -> IcebergFdwResult<()> { + // only write if we have rows + if self.input_rows.is_empty() { + return Ok(()); + } + + // clone the table to avoid borrowing conflicts + let table = match &self.table { + Some(table) => table.clone(), + None => return Ok(()), + }; + + let metadata = table.metadata(); + let schema = metadata.current_schema(); + + // sort input_rows by partition column values + let sorted_rows = self.sort_rows_by_partition(metadata, schema)?; + + // build record batch from sorted rows + let record_batch = self.build_record_batch_from_rows(schema, &sorted_rows)?; + + // split the record batch by partition values + let partition_batches = utils::split_record_batch_by_partition(metadata, record_batch)?; + + let mut data_files = Vec::new(); + + // write each partition batch separately + for partition_batch in partition_batches.iter() { + let location_generator = LocationGenerator::new(metadata, partition_batch)?; + let file_name_generator = FileNameGenerator::new(DataFileFormat::Parquet); + + // get partition value from location generator + let partition_value = location_generator.partition_value(); + + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + schema.clone(), + table.file_io().clone(), + location_generator, + file_name_generator, + ); + let data_file_writer_builder = DataFileWriterBuilder::new( + parquet_writer_builder, + partition_value, + metadata.default_partition_spec().spec_id(), + ); + let mut data_file_writer = self.rt.block_on(data_file_writer_builder.build())?; + + // write the record batch to Iceberg and close the writer and get + // the data file + self.rt + .block_on(data_file_writer.write(partition_batch.clone()))?; + let mut part_data_files = self.rt.block_on(data_file_writer.close())?; + + data_files.append(&mut part_data_files); + } + + // create transaction and commit the changes to update table metadata + let tx = Transaction::new(&table); + let append_action = tx.fast_append().add_data_files(data_files.clone()); + let tx = append_action.apply(tx)?; + let updated_table = self.rt.block_on(tx.commit(self.catalog.as_ref()))?; + + // update the cached table reference with the new metadata + self.table = Some(updated_table); + + if cfg!(debug_assertions) { + for data_file in &data_files { + report_info(&format!( + "Data file: {}, records: {}, size: {} bytes", + data_file.file_path(), + data_file.record_count(), + data_file.file_size_in_bytes() + )); + } + } + + Ok(()) + } + fn import_foreign_schema( &mut self, stmt: ImportForeignSchemaStmt, @@ -348,6 +571,7 @@ impl ForeignDataWrapper for IcebergFdw { } PrimitiveType::String => "text", PrimitiveType::Date => "date", + PrimitiveType::Time => "time", PrimitiveType::Timestamp => "timestamp", PrimitiveType::Timestamptz => "timestamp with time zone", PrimitiveType::Uuid => "uuid", @@ -371,15 +595,25 @@ impl ForeignDataWrapper for IcebergFdw { } if !fields.is_empty() { + let ident_field_ids: Vec = schema.identifier_field_ids().collect(); + let rowid_column = if ident_field_ids.len() == 1 { + schema + .field_by_id(ident_field_ids[0]) + .map(|field| format!(", rowid_column '{}'", field.name)) + } else { + None + }; + ret.push(format!( r#"create foreign table if not exists {} ( {} ) - server {} options (table '{}')"#, 
+ server {} options (table '{}'{})"#, tbl.identifier().name, fields.join(","), stmt.server_name, tbl.identifier(), + rowid_column.unwrap_or_default(), )); } } diff --git a/wrappers/src/fdw/iceberg_fdw/mapper.rs b/wrappers/src/fdw/iceberg_fdw/mapper.rs index 4bca5557..c44fb662 100644 --- a/wrappers/src/fdw/iceberg_fdw/mapper.rs +++ b/wrappers/src/fdw/iceberg_fdw/mapper.rs @@ -1,6 +1,7 @@ -use arrow_array::{array, timezone::Tz, RecordBatch}; +use arrow_array::{array, builder::*, timezone::Tz, RecordBatch}; use arrow_json::ArrayWriter; -use chrono::{DateTime, NaiveDateTime}; +use arrow_schema::DataType; +use chrono::{DateTime, NaiveDateTime, Timelike}; use iceberg::spec::{PrimitiveType, Type}; use pgrx::{ datum::{self, datetime_support::DateTimeConversionError, JsonB}, @@ -40,7 +41,27 @@ fn parse_tz(s: Option<&str>) -> IcebergFdwResult { Ok(tz) } -// Iceberg cell to Wrappers cell mapper +// fill an struct with null to all fields +fn fill_empty_struct(struct_builder: &mut StructBuilder) { + struct_builder.append_null(); + for field_builder in struct_builder.field_builders_mut() { + if let Some(b) = field_builder.as_any_mut().downcast_mut::() { + b.append_null(); + } else if let Some(b) = field_builder.as_any_mut().downcast_mut::() { + b.append_null(); + } else if let Some(b) = field_builder.as_any_mut().downcast_mut::() { + b.append_null(); + } else if let Some(b) = field_builder.as_any_mut().downcast_mut::() { + b.append_null(); + } else if let Some(b) = field_builder.as_any_mut().downcast_mut::() { + b.append_null(); + } else if let Some(b) = field_builder.as_any_mut().downcast_mut::() { + b.append_null(); + } + } +} + +// iceberg cell to Wrappers cell mapper #[derive(Default)] pub(super) struct Mapper { // record batch in JSON format @@ -154,57 +175,30 @@ impl Mapper { } pg_sys::TIMEOID => { if let Type::Primitive(PrimitiveType::Time) = src_type { - if let Some(dt) = array - .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) - .or_else(|| { - array - .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) - }) + if let Some(t) = array + .downcast_ref::() + .and_then(|a| a.value_as_time(rec_offset)) .or_else(|| { array .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) + .and_then(|a| a.value_as_time(rec_offset)) }) .or_else(|| { array .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) + .and_then(|a| a.value_as_time(rec_offset)) }) { - let ts = naive_datetime_to_ts(dt)?; - cell = Some(Cell::Time(datum::Time::from(ts))); + let time = + datum::Time::new(t.hour() as _, t.minute() as _, t.second() as _)?; + cell = Some(Cell::Time(time)); } } } pg_sys::TIMESTAMPOID => { if let Type::Primitive(PrimitiveType::Timestamp) = src_type { if let Some(dt) = array - .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) - .or_else(|| { - array - .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) - }) - .or_else(|| { - array - .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) - }) - .or_else(|| { - array - .downcast_ref::() - .and_then(|a| a.value_as_datetime(rec_offset)) - }) - { - let ts = naive_datetime_to_ts(dt)?; - cell = Some(Cell::Timestamp(ts)); - } - } else if let Type::Primitive(PrimitiveType::TimestampNs) = src_type { - if let Some(dt) = array - .downcast_ref::() + .downcast_ref::() .and_then(|a| a.value_as_datetime(rec_offset)) { let ts = naive_datetime_to_ts(dt)?; @@ -215,46 +209,7 @@ impl Mapper { pg_sys::TIMESTAMPTZOID => { if let Type::Primitive(PrimitiveType::Timestamptz) = src_type { if 
let Some(dt) = array - .downcast_ref::() - .and_then(|a| { - parse_tz(a.timezone()) - .map(|tz| a.value_as_datetime_with_tz(rec_offset, tz)) - .transpose() - }) - .or_else(|| { - array - .downcast_ref::() - .and_then(|a| { - parse_tz(a.timezone()) - .map(|tz| a.value_as_datetime_with_tz(rec_offset, tz)) - .transpose() - }) - }) - .or_else(|| { - array - .downcast_ref::() - .and_then(|a| { - parse_tz(a.timezone()) - .map(|tz| a.value_as_datetime_with_tz(rec_offset, tz)) - .transpose() - }) - }) - .or_else(|| { - array - .downcast_ref::() - .and_then(|a| { - parse_tz(a.timezone()) - .map(|tz| a.value_as_datetime_with_tz(rec_offset, tz)) - .transpose() - }) - }) - { - let ts = datetime_to_tstz(dt?)?; - cell = Some(Cell::Timestamptz(ts)); - } - } else if let Type::Primitive(PrimitiveType::TimestamptzNs) = src_type { - if let Some(dt) = array - .downcast_ref::() + .downcast_ref::() .and_then(|a| { parse_tz(a.timezone()) .map(|tz| a.value_as_datetime_with_tz(rec_offset, tz)) @@ -304,7 +259,9 @@ impl Mapper { } } _ => { - return Err(IcebergFdwError::UnsupportedColumnType(col_name.into())); + return Err(IcebergFdwError::UnsupportedType(format!( + "unsupported column data type for column: {col_name}" + ))); } } @@ -312,4 +269,775 @@ impl Mapper { IcebergFdwError::IncompatibleColumnType(col_name.into(), (*src_type).to_string()) }) } + + pub(super) fn append_array_value( + &self, + builder: &mut Box, + field_type: &DataType, + cell: Option<&Cell>, + ) -> IcebergFdwResult<()> { + use rust_decimal::prelude::ToPrimitive; + use rust_decimal::Decimal; + use std::convert::TryInto; + + let unsupported = |ty: &DataType| { + IcebergFdwError::UnsupportedType(format!("unsupported column type: {ty:?}")) + }; + + match field_type { + DataType::Boolean => { + if let Some(bool_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::Bool(val)) => bool_builder.append_value(*val), + _ => bool_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Int32 => { + if let Some(int_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::I32(val)) => int_builder.append_value(*val), + Some(Cell::I16(val)) => int_builder.append_value((*val).into()), + Some(Cell::I8(val)) => int_builder.append_value((*val).into()), + _ => int_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Int64 => { + if let Some(int_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::I64(val)) => int_builder.append_value(*val), + Some(Cell::I32(val)) => int_builder.append_value((*val).into()), + Some(Cell::I16(val)) => int_builder.append_value((*val).into()), + Some(Cell::I8(val)) => int_builder.append_value((*val).into()), + _ => int_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Float32 => { + if let Some(float_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::F32(val)) => float_builder.append_value(*val), + Some(Cell::F64(val)) => float_builder.append_value(*val as f32), + _ => float_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Float64 => { + if let Some(float_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::F64(val)) => float_builder.append_value(*val), + Some(Cell::F32(val)) => float_builder.append_value((*val).into()), + _ => float_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + 
DataType::Decimal128(_, scale) => { + if let Some(dec_builder) = builder.as_any_mut().downcast_mut::() + { + if let Some(Cell::Numeric(val)) = cell { + let decimal_str = val.to_string(); + let mut appended = false; + if let Ok(mut decimal) = Decimal::from_str_exact(&decimal_str) { + if let Ok(scale_u32) = (*scale).try_into() { + decimal.rescale(scale_u32); + if let Some(mantissa) = decimal.mantissa().to_i128() { + dec_builder.append_value(mantissa); + appended = true; + } + } + } + if !appended { + dec_builder.append_null(); + } + } else { + dec_builder.append_null(); + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Utf8 => { + if let Some(str_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::String(val)) => str_builder.append_value(val), + Some(Cell::Json(val)) => str_builder.append_value(val.0.to_string()), + _ => str_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::LargeUtf8 => { + if let Some(str_builder) = builder.as_any_mut().downcast_mut::() + { + match cell { + Some(Cell::String(val)) => str_builder.append_value(val), + Some(Cell::Json(val)) => str_builder.append_value(val.0.to_string()), + _ => str_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Binary => { + if let Some(bin_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::Bytea(val)) => { + let bytes = unsafe { varlena::varlena_to_byte_slice(*val) }; + bin_builder.append_value(bytes); + } + Some(Cell::Uuid(val)) => { + bin_builder.append_value(val.as_bytes()); + } + _ => bin_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::LargeBinary => { + if let Some(bin_builder) = builder.as_any_mut().downcast_mut::() + { + match cell { + Some(Cell::Bytea(val)) => { + let bytes = unsafe { varlena::varlena_to_byte_slice(*val) }; + bin_builder.append_value(bytes); + } + Some(Cell::Uuid(val)) => { + bin_builder.append_value(val.as_bytes()); + } + _ => bin_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::FixedSizeBinary(size) => { + if let Some(bin_builder) = builder + .as_any_mut() + .downcast_mut::() + { + match cell { + Some(Cell::Bytea(val)) => { + let bytes = unsafe { varlena::varlena_to_byte_slice(*val) }; + if bytes.len() == *size as usize { + let _ = bin_builder.append_value(bytes); + } else { + bin_builder.append_null(); + } + } + Some(Cell::Uuid(val)) if *size as usize == val.as_bytes().len() => { + let _ = bin_builder.append_value(val.as_bytes()); + } + _ => bin_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Date32 => { + if let Some(date_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::Date(val)) => { + date_builder.append_value(val.to_unix_epoch_days()) + } + _ => date_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Date64 => { + if let Some(date_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::Date(val)) => { + let days = i64::from(val.to_unix_epoch_days()); + date_builder.append_value(days * 86_400_000); + } + _ => date_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Time32(unit) => match unit { + arrow_schema::TimeUnit::Second => { + if let Some(time_builder) = + builder.as_any_mut().downcast_mut::() + { + match cell { + Some(Cell::Time(val)) => { + let 
micros = i64::from(*val); + time_builder.append_value((micros / 1_000_000) as i32); + } + _ => time_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + arrow_schema::TimeUnit::Millisecond => { + if let Some(time_builder) = builder + .as_any_mut() + .downcast_mut::() + { + match cell { + Some(Cell::Time(val)) => { + let micros = i64::from(*val); + time_builder.append_value((micros / 1_000) as i32); + } + _ => time_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + _ => return Err(unsupported(field_type)), + }, + DataType::Time64(arrow_schema::TimeUnit::Microsecond) => { + if let Some(time_builder) = builder + .as_any_mut() + .downcast_mut::() + { + match cell { + Some(Cell::Time(val)) => { + let micros = i64::from(*val); + time_builder.append_value(micros); + } + _ => time_builder.append_null(), + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Timestamp(unit, tz) => match unit { + arrow_schema::TimeUnit::Second => { + if let Some(ts_builder) = builder + .as_any_mut() + .downcast_mut::() + { + let raw = match (tz.is_some(), cell) { + (true, Some(Cell::Timestamptz(val))) => { + Some(i64::from(*val) + PG_EPOCH_US) + } + (false, Some(Cell::Timestamp(val))) => { + Some(i64::from(*val) + PG_EPOCH_US) + } + _ => None, + }; + if let Some(micros) = raw { + ts_builder.append_value(micros / 1_000_000); + } else { + ts_builder.append_null(); + } + } else { + return Err(unsupported(field_type)); + } + } + arrow_schema::TimeUnit::Millisecond => { + if let Some(ts_builder) = builder + .as_any_mut() + .downcast_mut::() + { + let raw = match (tz.is_some(), cell) { + (true, Some(Cell::Timestamptz(val))) => { + Some(i64::from(*val) + PG_EPOCH_US) + } + (false, Some(Cell::Timestamp(val))) => { + Some(i64::from(*val) + PG_EPOCH_US) + } + _ => None, + }; + if let Some(micros) = raw { + ts_builder.append_value(micros / 1_000); + } else { + ts_builder.append_null(); + } + } else { + return Err(unsupported(field_type)); + } + } + arrow_schema::TimeUnit::Microsecond => { + if let Some(ts_builder) = builder + .as_any_mut() + .downcast_mut::() + { + let raw = match (tz.is_some(), cell) { + (true, Some(Cell::Timestamptz(val))) => { + Some(i64::from(*val) + PG_EPOCH_US) + } + (false, Some(Cell::Timestamp(val))) => { + Some(i64::from(*val) + PG_EPOCH_US) + } + _ => None, + }; + if let Some(micros) = raw { + ts_builder.append_value(micros); + } else { + ts_builder.append_null(); + } + } else { + return Err(unsupported(field_type)); + } + } + arrow_schema::TimeUnit::Nanosecond => { + return Err(unsupported(field_type)); + } + }, + DataType::Struct(fields) => { + if let Some(struct_builder) = builder.as_any_mut().downcast_mut::() { + match cell { + Some(Cell::Json(json_val)) => { + if let Some(json_object) = json_val.0.as_object() { + // process each field in the struct + let field_builders = struct_builder.field_builders_mut(); + + for (i, field) in fields.iter().enumerate() { + if i < field_builders.len() { + let field_name = field.name(); + let field_value = json_object.get(field_name); + + // handle field insertion based on field type + match field.data_type() { + DataType::Boolean => { + if let Some(bool_builder) = field_builders[i] + .as_any_mut() + .downcast_mut::() + { + match field_value { + Some(JsonValue::Bool(v)) => { + bool_builder.append_value(*v) + } + _ => bool_builder.append_null(), + } + } + } + DataType::Int32 => { + if let Some(int_builder) = field_builders[i] + .as_any_mut() + .downcast_mut::() + { + match 
field_value { + Some(JsonValue::Number(n)) + if n.is_i64() => + { + if let Some(val) = n.as_i64() { + int_builder + .append_value(val as i32); + } else { + int_builder.append_null(); + } + } + _ => int_builder.append_null(), + } + } + } + DataType::Int64 => { + if let Some(int_builder) = field_builders[i] + .as_any_mut() + .downcast_mut::() + { + match field_value { + Some(JsonValue::Number(n)) + if n.is_i64() => + { + if let Some(val) = n.as_i64() { + int_builder.append_value(val); + } else { + int_builder.append_null(); + } + } + _ => int_builder.append_null(), + } + } + } + DataType::Float32 => { + if let Some(float_builder) = field_builders[i] + .as_any_mut() + .downcast_mut::() + { + match field_value { + Some(JsonValue::Number(n)) + if n.is_f64() => + { + if let Some(val) = n.as_f64() { + float_builder + .append_value(val as f32); + } else { + float_builder.append_null(); + } + } + _ => float_builder.append_null(), + } + } + } + DataType::Float64 => { + if let Some(float_builder) = field_builders[i] + .as_any_mut() + .downcast_mut::() + { + match field_value { + Some(JsonValue::Number(n)) + if n.is_f64() => + { + if let Some(val) = n.as_f64() { + float_builder.append_value(val); + } else { + float_builder.append_null(); + } + } + _ => float_builder.append_null(), + } + } + } + DataType::Utf8 => { + if let Some(str_builder) = field_builders[i] + .as_any_mut() + .downcast_mut::() + { + match field_value { + Some(JsonValue::String(s)) => { + str_builder.append_value(s) + } + _ => str_builder.append_null(), + } + } + } + _ => { + return Err(unsupported(field_type)); + } + } + } + } + + struct_builder.append(true); + } else { + // not a JSON object, append null for all fields + fill_empty_struct(struct_builder); + } + } + _ => { + // no cell data, append null for all fields + fill_empty_struct(struct_builder); + } + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::List(field) => { + if let Some(list_builder) = builder + .as_any_mut() + .downcast_mut::>>() + { + // we only deal with primitive element list + match cell { + Some(Cell::Json(json_val)) => { + if let Some(json_array) = json_val.0.as_array() { + match field.data_type() { + DataType::Boolean => { + let bool_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| unsupported(field_type))?; + for item in json_array { + match item { + JsonValue::Bool(v) => bool_builder.append_value(*v), + _ => bool_builder.append_null(), + } + } + list_builder.append(true); + } + DataType::Int32 => { + let int_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| unsupported(field_type))?; + for item in json_array { + match item { + JsonValue::Number(n) if n.is_i64() => { + if let Some(val) = n.as_i64() { + int_builder.append_value(val as i32); + } else { + int_builder.append_null(); + } + } + _ => int_builder.append_null(), + } + } + list_builder.append(true); + } + DataType::Int64 => { + let int_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| unsupported(field_type))?; + for item in json_array { + match item { + JsonValue::Number(n) if n.is_i64() => { + if let Some(val) = n.as_i64() { + int_builder.append_value(val); + } else { + int_builder.append_null(); + } + } + _ => int_builder.append_null(), + } + } + list_builder.append(true); + } + DataType::Float32 => { + let float_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| unsupported(field_type))?; + for item in json_array { + match 
item { + JsonValue::Number(n) if n.is_f64() => { + if let Some(val) = n.as_f64() { + float_builder.append_value(val as f32); + } else { + float_builder.append_null(); + } + } + _ => float_builder.append_null(), + } + } + list_builder.append(true); + } + DataType::Float64 => { + let float_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| unsupported(field_type))?; + for item in json_array { + match item { + JsonValue::Number(n) if n.is_f64() => { + if let Some(val) = n.as_f64() { + float_builder.append_value(val); + } else { + float_builder.append_null(); + } + } + _ => float_builder.append_null(), + } + } + list_builder.append(true); + } + DataType::Utf8 => { + let string_builder = list_builder + .values() + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| unsupported(field_type))?; + for item in json_array { + match item { + JsonValue::String(s) => { + string_builder.append_value(s) + } + _ => string_builder.append_null(), + } + } + list_builder.append(true); + } + _ => { + return Err(unsupported(field_type)); + } + } + } else { + return Err(unsupported(field_type)); + } + } + _ => { + list_builder.append(false); + } + } + } else { + return Err(unsupported(field_type)); + } + } + DataType::Map(field, _sorted) => { + if let Some(map_builder) = builder + .as_any_mut() + .downcast_mut::, Box>>() + { + match cell { + Some(Cell::Json(json_val)) => { + if let Some(json_object) = json_val.0.as_object() { + // extract key and value types from the struct field + if let DataType::Struct(struct_fields) = field.data_type() { + if struct_fields.len() >= 2 { + let key_field = &struct_fields[0]; + let value_field = &struct_fields[1]; + + // process each key-value pair in the JSON object + // first process all keys, only accept string as key + for (key, _) in json_object { + match key_field.data_type() { + DataType::Utf8 => { + let key_builder = map_builder.keys(); + if let Some(str_builder) = key_builder + .as_any_mut() + .downcast_mut::() + { + str_builder.append_value(key); + } + } + _ => { + return Err(unsupported(field_type)); + } + } + } + + // then process all values + for (_, value) in json_object { + match value_field.data_type() { + DataType::Boolean => { + let value_builder = map_builder.values(); + if let Some(bool_builder) = value_builder + .as_any_mut() + .downcast_mut::() + { + match value { + JsonValue::Bool(v) => { + bool_builder.append_value(*v) + } + _ => bool_builder.append_null(), + } + } + } + DataType::Int32 => { + let value_builder = map_builder.values(); + if let Some(int_builder) = value_builder + .as_any_mut() + .downcast_mut::() + { + match value { + JsonValue::Number(n) if n.is_i64() => { + if let Some(val) = n.as_i64() { + int_builder + .append_value(val as i32); + } else { + int_builder.append_null(); + } + } + _ => int_builder.append_null(), + } + } + } + DataType::Int64 => { + let value_builder = map_builder.values(); + if let Some(int_builder) = value_builder + .as_any_mut() + .downcast_mut::() + { + match value { + JsonValue::Number(n) if n.is_i64() => { + if let Some(val) = n.as_i64() { + int_builder.append_value(val); + } else { + int_builder.append_null(); + } + } + _ => int_builder.append_null(), + } + } + } + DataType::Float32 => { + let value_builder = map_builder.values(); + if let Some(float_builder) = value_builder + .as_any_mut() + .downcast_mut::() + { + match value { + JsonValue::Number(n) if n.is_f64() => { + if let Some(val) = n.as_f64() { + float_builder + .append_value(val as f32); + } else { + 
float_builder.append_null(); + } + } + _ => float_builder.append_null(), + } + } + } + DataType::Float64 => { + let value_builder = map_builder.values(); + if let Some(float_builder) = value_builder + .as_any_mut() + .downcast_mut::() + { + match value { + JsonValue::Number(n) if n.is_f64() => { + if let Some(val) = n.as_f64() { + float_builder.append_value(val); + } else { + float_builder.append_null(); + } + } + _ => float_builder.append_null(), + } + } + } + DataType::Utf8 => { + let value_builder = map_builder.values(); + if let Some(str_builder) = value_builder + .as_any_mut() + .downcast_mut::() + { + match value { + JsonValue::String(s) => { + str_builder.append_value(s) + } + _ => str_builder.append_null(), + } + } + } + _ => { + return Err(unsupported(field_type)); + } + } + } + + map_builder.append(true)?; + } else { + map_builder.append(false)?; + } + } else { + map_builder.append(false)?; + } + } else { + map_builder.append(false)?; + } + } + _ => { + map_builder.append(false)?; + } + } + } else { + return Err(unsupported(field_type)); + } + } + _ => return Err(unsupported(field_type)), + } + + Ok(()) + } } diff --git a/wrappers/src/fdw/iceberg_fdw/mod.rs b/wrappers/src/fdw/iceberg_fdw/mod.rs index 3784e362..4ac39442 100644 --- a/wrappers/src/fdw/iceberg_fdw/mod.rs +++ b/wrappers/src/fdw/iceberg_fdw/mod.rs @@ -3,6 +3,8 @@ mod iceberg_fdw; mod mapper; mod pushdown; mod tests; +mod utils; +mod writer; use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgSqlErrorCode; @@ -15,8 +17,8 @@ enum IcebergFdwError { #[error("column {0} is not found in source")] ColumnNotFound(String), - #[error("column '{0}' data type is not supported")] - UnsupportedColumnType(String), + #[error("{0}")] + UnsupportedType(String), #[error("column '{0}' data type '{1}' is incompatible")] IncompatibleColumnType(String, String), diff --git a/wrappers/src/fdw/iceberg_fdw/pushdown.rs b/wrappers/src/fdw/iceberg_fdw/pushdown.rs index 0efd0f68..c018938d 100644 --- a/wrappers/src/fdw/iceberg_fdw/pushdown.rs +++ b/wrappers/src/fdw/iceberg_fdw/pushdown.rs @@ -60,7 +60,12 @@ fn cell_to_iceberg_datum(cell: &Cell, tgt_type: &Type) -> IcebergFdwResult Some(Datum::date(v.to_unix_epoch_days())), Cell::Time(v) => { let (h, m, s, micro) = v.to_hms_micro(); - Some(Datum::time_from_hms_micro(h as _, m as _, s as _, micro)?) + Some(Datum::time_from_hms_micro( + h as _, + m as _, + s as _, + micro - (s as u32) * 1_000_000, + )?) 
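// Illustrative sketch, not part of the patch: the change just above subtracts the
// whole seconds from `micro`, which suggests pgrx's to_hms_micro() folds the
// seconds into its microsecond component, while iceberg's
// Datum::time_from_hms_micro expects only the sub-second fraction. A hypothetical
// helper showing the same normalization:
fn sub_second_micros(s: u8, micro_with_seconds: u32) -> u32 {
    micro_with_seconds - (s as u32) * 1_000_000
}
// e.g. for 12:34:56.123, a micro component of 56_123_000 becomes
// sub_second_micros(56, 56_123_000) == 123_000.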
} Cell::Timestamp(v) => { let (h, m, s, micro) = v.to_hms_micro(); diff --git a/wrappers/src/fdw/iceberg_fdw/tests.rs b/wrappers/src/fdw/iceberg_fdw/tests.rs index e14e54df..423422a6 100644 --- a/wrappers/src/fdw/iceberg_fdw/tests.rs +++ b/wrappers/src/fdw/iceberg_fdw/tests.rs @@ -116,7 +116,7 @@ mod tests { let results = c .select( - "SELECT tstz FROM iceberg.bids WHERE symbol = 'APL'", + "SELECT tstz FROM iceberg.bids WHERE symbol like 'APL%'", None, &[], ) @@ -168,7 +168,15 @@ mod tests { .filter_map(|r| r.get_by_name::("details").unwrap()) .map(|v| v.0.clone()) .collect::>(); - assert_eq!(results, vec![json!({ "created_by": "alice" })]); + assert_eq!( + results, + vec![json!({ + "created_by": "alice", + "balance": 222.33, + "count": 42, + "valid": true + })] + ); let results = c .select( @@ -193,6 +201,110 @@ mod tests { .map(|v| v.0.clone()) .collect::>(); assert_eq!(results, vec![json!({"nn": "qq", "nn2": "pp"})]); + + // test pushdown + let _results = c + .select( + "SELECT * FROM iceberg.bids + WHERE symbol not like 'APL%' + AND bid = 12.34 AND ask = 56.78 + AND icol = 1234 AND lcol = 5678 + AND datetime = '2025-09-15 11:22:33' + AND tstz = '2025-09-15 11:22:33+00:00' + AND tcol = '12:34:56.123' + AND bcol + ", + None, + &[], + ) + .unwrap(); + + // test data insertion with partitioning + c.update( + r#"INSERT INTO iceberg.bids( + datetime, symbol, bid, ask, amt, dt, tstz, list, map, + details, uid, lcol, + bin, + map2, map3, map4, map5, map6, + list2, list3, list4, list5, list6, + pat_col_year, pat_col_month, pat_col_hour, + pat_bcol, pat_icol, pat_lcol, pat_tcol + ) VALUES + ('2025-09-15 11:22:33', 'GOOG', 123.45, 1.23456, 123.345, + '2025-09-15', '2025-09-15 11:22:33+00:00', + '["row1a", "row1b"]', + '{"key1a": "value1a", "key1b": "value1b"}', + '{"created_by": "foo", "balance": 888.99, "balance2": 33.44, "count": 33, "count2": 44, "valid": false}', + null, + 123456789, + E'\\xDEADBEEF', + null, null, null, null, null, + null, null, null, null, null, + '2025-09-15 11:22:33+00:00', '2025-09-15', '2025-09-15 11:22:33', + true, 123, 345, '2025-09-15 11:22:33' + ), + ('2025-09-16 11:22:33', 'META', 123.45, 1.23456, 123.345, + '2025-09-16', '2025-09-16 11:22:33+00:00', + null, + null, + null, + null, + 123456789, + E'\\xDEADBEEF', + '{"aa": 222.33}', '{"aa": null, "bb": true}', '{"aa": 123}', '{"aa": 345}', '{"aa": 12.34}', + '[12, null, 56]', '[12.34, 56.78]', '[true, false]', '[1, 2]', '[123.45, 0.0]', + '2025-09-16 11:22:33+00:00', '2025-09-16', '2025-09-16 11:22:33', + null, null, null, null + ) + "#, + None, + &[], + ) + .unwrap(); + + let results = c + .select( + "SELECT * FROM iceberg.bids WHERE symbol = 'GOOG'", + None, + &[], + ) + .unwrap() + .filter_map(|r| r.get_by_name::<&str, _>("symbol").unwrap()) + .collect::>(); + assert_eq!(results, vec!["GOOG"]); + let results = c + .select( + "SELECT details FROM iceberg.bids WHERE symbol = 'GOOG'", + None, + &[], + ) + .unwrap() + .filter_map(|r| r.get_by_name::("details").unwrap()) + .map(|v| v.0.clone()) + .collect::>(); + assert_eq!( + results, + vec![json!({ + "created_by": "foo", + "balance": 888.99, + "balance2": 33.44, + "count": 33, + "count2": 44, + "valid": false, + })] + ); + + // test data insertion without partitioning + c.update( + r#"INSERT INTO iceberg.asks( + datetime, symbol, ask + ) VALUES + ('2025-09-15 11:22:33', 'APL', 123.45) + "#, + None, + &[], + ) + .unwrap(); }); } } diff --git a/wrappers/src/fdw/iceberg_fdw/utils.rs b/wrappers/src/fdw/iceberg_fdw/utils.rs new file mode 100644 index 00000000..402aa10a 
--- /dev/null +++ b/wrappers/src/fdw/iceberg_fdw/utils.rs @@ -0,0 +1,361 @@ +use arrow_array::{ + Array, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int32Array, + Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray, +}; +use iceberg::spec::{Literal, PrimitiveLiteral, Struct, TableMetadata, Transform}; +use std::collections::HashMap; + +use super::IcebergFdwResult; +use crate::fdw::iceberg_fdw::IcebergFdwError; + +// copy an option to another in an option HashMap, if the target option +// doesn't exist +pub(super) fn copy_option(map: &mut HashMap, from_key: &str, to_key: &str) { + if !map.contains_key(to_key) { + let value = map.get(from_key).cloned().unwrap_or_default(); + map.insert(to_key.to_string(), value); + } +} + +/// Split a record batch into multiple batches by partition values +/// Assumes the input record batch is already sorted by partition key +pub(super) fn split_record_batch_by_partition( + metadata: &TableMetadata, + record_batch: RecordBatch, +) -> IcebergFdwResult> { + let partition_spec = metadata.default_partition_spec(); + + // if no partition spec, return the original batch + if partition_spec.fields().is_empty() { + return Ok(vec![record_batch]); + } + + // since data is pre-sorted by partition key, we can efficiently detect boundaries + let mut result_batches = Vec::new(); + let mut partition_start = 0; + let mut current_partition_key: Option = None; + + for row_idx in 0..record_batch.num_rows() { + let (partition_key, _) = compute_partition_info(metadata, &record_batch, row_idx)?; + + // check if we've hit a partition boundary + if current_partition_key.as_ref() != Some(&partition_key) { + // create batch for previous partition (if any) + if row_idx > 0 { + let batch_length = row_idx - partition_start; + let partition_batch = record_batch.slice(partition_start, batch_length); + result_batches.push(partition_batch); + } + + // start new partition + partition_start = row_idx; + current_partition_key = Some(partition_key); + } + } + + // handle the last partition + if partition_start < record_batch.num_rows() { + let batch_length = record_batch.num_rows() - partition_start; + let partition_batch = record_batch.slice(partition_start, batch_length); + result_batches.push(partition_batch); + } + + Ok(result_batches) +} + +/// Convert seconds since Unix epoch to years since Unix epoch +fn seconds_to_years(seconds: i64) -> i64 { + // unix epoch is 1970-01-01 + // this is a simplified calculation that treats each year as 365.25 days + // more precise implementations would account for leap years properly + seconds / (365.25 * 24.0 * 60.0 * 60.0) as i64 +} + +/// Convert seconds since Unix epoch to months since Unix epoch +fn seconds_to_months(seconds: i64) -> i64 { + // unix epoch is 1970-01-01 + // this is a simplified calculation that treats each month as 30.44 days (365.25/12) + // more precise implementations would account for varying month lengths + seconds / (30.44 * 24.0 * 60.0 * 60.0) as i64 +} + +/// Compute partition information for a specific row, returning both the string key and struct value +pub(super) fn compute_partition_info( + metadata: &TableMetadata, + record_batch: &RecordBatch, + row_idx: usize, +) -> IcebergFdwResult<(String, Option)> { + let partition_spec = metadata.default_partition_spec(); + + // if no partition spec, return empty values + if partition_spec.fields().is_empty() { + return Ok(("".to_string(), None)); + } + + let schema = metadata.current_schema(); + let mut key_parts = Vec::new(); + let mut 
partition_values = Vec::new(); + + for partition_field in partition_spec.fields() { + let source_field_id = partition_field.source_id; + let field_name = &partition_field.name; + let transform = &partition_field.transform; + + // find the column index for this field ID in the schema + let mut source_column_index = None; + for (idx, field) in schema.as_ref().as_struct().fields().iter().enumerate() { + if field.id == source_field_id { + source_column_index = Some(idx); + break; + } + } + let column_index = source_column_index.ok_or_else(|| { + IcebergFdwError::ColumnNotFound(format!( + "cannot find source column with ID {} for partition field", + source_field_id + )) + })?; + + // get the column data from record batch + let column = record_batch.column(column_index); + + // extract partition value for this specific row + match transform { + Transform::Day => { + let mut days_since_epoch = None; + + if let Some(timestamp_array) = + column.as_any().downcast_ref::() + { + if !timestamp_array.is_null(row_idx) { + let timestamp_us = timestamp_array.value(row_idx); + days_since_epoch = Some(timestamp_us / (24 * 60 * 60 * 1_000_000)); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let days = date_array.value(row_idx) as i64; + days_since_epoch = Some(days); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let timestamp_ms = date_array.value(row_idx); + days_since_epoch = Some(timestamp_ms / (24 * 60 * 60 * 1_000)); + } + } else { + return Err(IcebergFdwError::UnsupportedType( + "expected timestamp or date array type for day partition".to_string(), + )); + } + + if let Some(days) = days_since_epoch { + key_parts.push(format!("{}={}", field_name, days)); + partition_values + .push(Some(Literal::Primitive(PrimitiveLiteral::Int(days as i32)))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } + Transform::Year => { + let mut years_since_epoch = None; + + if let Some(timestamp_array) = + column.as_any().downcast_ref::() + { + if !timestamp_array.is_null(row_idx) { + let timestamp_us = timestamp_array.value(row_idx); + let seconds_since_epoch = timestamp_us / 1_000_000; + years_since_epoch = Some(seconds_to_years(seconds_since_epoch)); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let days = date_array.value(row_idx) as i64; + let seconds_since_epoch = days * 24 * 60 * 60; + years_since_epoch = Some(seconds_to_years(seconds_since_epoch)); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let timestamp_ms = date_array.value(row_idx); + let seconds_since_epoch = timestamp_ms / 1_000; + years_since_epoch = Some(seconds_to_years(seconds_since_epoch)); + } + } else { + return Err(IcebergFdwError::UnsupportedType( + "expected timestamp or date array type for year partition".to_string(), + )); + } + + if let Some(years) = years_since_epoch { + key_parts.push(format!("{}={}", field_name, years)); + partition_values.push(Some(Literal::Primitive(PrimitiveLiteral::Int( + years as i32, + )))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } + Transform::Month => { + let mut months_since_epoch = None; + + if let Some(timestamp_array) = + column.as_any().downcast_ref::() + { + if !timestamp_array.is_null(row_idx) { + let timestamp_us = 
timestamp_array.value(row_idx); + let seconds_since_epoch = timestamp_us / 1_000_000; + months_since_epoch = Some(seconds_to_months(seconds_since_epoch)); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let days = date_array.value(row_idx) as i64; + let seconds_since_epoch = days * 24 * 60 * 60; + months_since_epoch = Some(seconds_to_months(seconds_since_epoch)); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let timestamp_ms = date_array.value(row_idx); + let seconds_since_epoch = timestamp_ms / 1_000; + months_since_epoch = Some(seconds_to_months(seconds_since_epoch)); + } + } else { + return Err(IcebergFdwError::UnsupportedType( + "expected timestamp or date array type for month partition".to_string(), + )); + } + + if let Some(months) = months_since_epoch { + key_parts.push(format!("{}={}", field_name, months)); + partition_values.push(Some(Literal::Primitive(PrimitiveLiteral::Int( + months as i32, + )))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } + Transform::Hour => { + let mut hours_since_epoch = None; + + if let Some(timestamp_array) = + column.as_any().downcast_ref::() + { + if !timestamp_array.is_null(row_idx) { + let timestamp_us = timestamp_array.value(row_idx); + hours_since_epoch = Some(timestamp_us / (60 * 60 * 1_000_000)); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let days = date_array.value(row_idx) as i64; + hours_since_epoch = Some(days * 24); + } + } else if let Some(date_array) = column.as_any().downcast_ref::() { + if !date_array.is_null(row_idx) { + let timestamp_ms = date_array.value(row_idx); + hours_since_epoch = Some(timestamp_ms / (60 * 60 * 1_000)); + } + } else { + return Err(IcebergFdwError::UnsupportedType( + "expected timestamp or date array type for hour partition".to_string(), + )); + } + + if let Some(hours) = hours_since_epoch { + key_parts.push(format!("{}={}", field_name, hours)); + partition_values.push(Some(Literal::Primitive(PrimitiveLiteral::Int( + hours as i32, + )))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } + Transform::Identity => { + // for identity transform, use the raw value from the column + if let Some(boolean_array) = column.as_any().downcast_ref::() { + if !boolean_array.is_null(row_idx) { + let value = boolean_array.value(row_idx); + key_parts.push(format!("{}={}", field_name, value)); + partition_values + .push(Some(Literal::Primitive(PrimitiveLiteral::Boolean(value)))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } else if let Some(int32_array) = column.as_any().downcast_ref::() { + if !int32_array.is_null(row_idx) { + let value = int32_array.value(row_idx); + key_parts.push(format!("{}={}", field_name, value)); + partition_values + .push(Some(Literal::Primitive(PrimitiveLiteral::Int(value)))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } else if let Some(int64_array) = column.as_any().downcast_ref::() { + if !int64_array.is_null(row_idx) { + let value = int64_array.value(row_idx); + key_parts.push(format!("{}={}", field_name, value)); + partition_values + .push(Some(Literal::Primitive(PrimitiveLiteral::Long(value)))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + 
} else if let Some(float32_array) = column.as_any().downcast_ref::() { + if !float32_array.is_null(row_idx) { + let value = float32_array.value(row_idx); + key_parts.push(format!("{}={}", field_name, value)); + partition_values.push(Some(Literal::Primitive(PrimitiveLiteral::Float( + value.into(), + )))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } else if let Some(float64_array) = column.as_any().downcast_ref::() { + if !float64_array.is_null(row_idx) { + let value = float64_array.value(row_idx); + key_parts.push(format!("{}={}", field_name, value)); + partition_values.push(Some(Literal::Primitive(PrimitiveLiteral::Double( + value.into(), + )))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } else if let Some(string_array) = column.as_any().downcast_ref::() { + if !string_array.is_null(row_idx) { + let value = string_array.value(row_idx); + key_parts.push(format!("{}={}", field_name, value)); + partition_values.push(Some(Literal::Primitive(PrimitiveLiteral::String( + value.to_string(), + )))); + } else { + key_parts.push(format!("{}=null", field_name)); + partition_values.push(None); + } + } else { + return Err(IcebergFdwError::UnsupportedType( + "expected primitive array type for identity partition".to_string(), + )); + } + } + _ => { + return Err(IcebergFdwError::UnsupportedType(format!( + "unsupported partition transform: {:?}", + transform + ))); + } + } + } + + let partition_key = key_parts.join("/"); + let partition_struct = if partition_values.is_empty() { + None + } else { + Some(Struct::from_iter(partition_values)) + }; + + Ok((partition_key, partition_struct)) +} diff --git a/wrappers/src/fdw/iceberg_fdw/writer.rs b/wrappers/src/fdw/iceberg_fdw/writer.rs new file mode 100644 index 00000000..5e53056e --- /dev/null +++ b/wrappers/src/fdw/iceberg_fdw/writer.rs @@ -0,0 +1,236 @@ +use arrow_array::RecordBatch; +use chrono::{Duration, NaiveDate}; +use iceberg::{ + spec::{DataFileFormat, Literal, PrimitiveLiteral, Struct, TableMetadata, Transform}, + writer::file_writer::location_generator, +}; + +use super::utils::compute_partition_info; +use super::IcebergFdwResult; +use crate::fdw::iceberg_fdw::IcebergFdwError; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use uuid::Uuid; + +#[derive(Clone, Debug)] +pub(super) struct FileNameGenerator { + format: String, + file_count: Arc, +} + +impl FileNameGenerator { + pub fn new(format: DataFileFormat) -> Self { + Self { + format: format.to_string(), + file_count: Arc::new(AtomicU64::new(0)), + } + } +} + +impl location_generator::FileNameGenerator for FileNameGenerator { + fn generate_file_name(&self) -> String { + let file_id = self.file_count.fetch_add(1, Ordering::Relaxed); + let uuid = Uuid::new_v4(); + format!( + "{:05}-{}-{}{}", + file_id, + 0, // task_id (always 0 for single task) + uuid.as_hyphenated(), + if self.format.is_empty() { + "".to_string() + } else { + format!(".{}", self.format) + } + ) + } +} + +#[derive(Clone, Debug)] +pub(super) struct LocationGenerator { + dir_path: String, + partition_value: Option, +} + +impl LocationGenerator { + pub fn new(metadata: &TableMetadata, record_batch: &RecordBatch) -> IcebergFdwResult { + let table_location = metadata.location(); + let prop = metadata.properties(); + let data_location = prop + .get("write.data.path") + .or(prop.get("write.folder-storage.path")); + let base_path = if let Some(data_location) = data_location { + data_location.clone() + } else { + 
format!("{}/data", table_location) + }; + + // compute partition value first + // use the first row (row 0) to compute partition value for the entire batch + let (_, partition_value) = compute_partition_info(metadata, record_batch, 0)?; + + // format partition path for directory structure + let partition_path = Self::format_partition_path(metadata, &partition_value)?; + let dir_path = if partition_path.is_empty() { + base_path + } else { + format!("{}/{}", base_path, partition_path) + }; + + Ok(Self { + dir_path, + partition_value, + }) + } + + /// Get the partition value for use with DataFileWriterBuilder + pub fn partition_value(&self) -> Option { + self.partition_value.clone() + } + + fn format_partition_path( + metadata: &TableMetadata, + partition_value: &Option, + ) -> IcebergFdwResult { + let Some(partition_struct) = partition_value else { + return Ok(String::new()); + }; + + let partition_spec = metadata.default_partition_spec(); + let mut path_parts = Vec::new(); + + for (idx, partition_field) in partition_spec.fields().iter().enumerate() { + let field_name = &partition_field.name; + let transform = &partition_field.transform; + + if let Some(literal) = &partition_struct[idx] { + match literal { + Literal::Primitive(PrimitiveLiteral::Int(value)) => { + match transform { + Transform::Day => { + // convert days since epoch to ISO date format + let epoch_date = + NaiveDate::from_ymd_opt(1970, 1, 1).ok_or_else(|| { + IcebergFdwError::UnsupportedType( + "invalid epoch date".to_string(), + ) + })?; + let date = epoch_date + Duration::days(*value as i64); + path_parts.push(format!( + "{}={}", + field_name, + date.format("%Y-%m-%d") + )); + } + Transform::Year => { + // year value is years since epoch (1970) + let year = 1970 + *value; + path_parts.push(format!("{}={}", field_name, year)); + } + Transform::Month => { + // month value is months since epoch (1970-01) + let months_since_epoch = *value as i64; + let years = months_since_epoch / 12; + let months = months_since_epoch % 12; + let year = 1970 + years; + let month = 1 + months; // Months are 1-indexed + path_parts.push(format!("{}={:04}-{:02}", field_name, year, month)); + } + Transform::Hour => { + // hour value is hours since epoch (1970-01-01 00:00) + let hours_since_epoch = *value as i64; + let days = hours_since_epoch / 24; + let hours = hours_since_epoch % 24; + let epoch_date = + NaiveDate::from_ymd_opt(1970, 1, 1).ok_or_else(|| { + IcebergFdwError::UnsupportedType( + "invalid epoch date".to_string(), + ) + })?; + let date = epoch_date + Duration::days(days); + path_parts.push(format!( + "{}={}-{:02}", + field_name, + date.format("%Y-%m-%d"), + hours + )); + } + Transform::Identity => { + // for identity transform, use the value directly + path_parts.push(format!("{}={}", field_name, value)); + } + _ => { + return Err(IcebergFdwError::UnsupportedType(format!( + "unsupported partition transform: {:?}", + transform + ))); + } + } + } + Literal::Primitive(PrimitiveLiteral::Boolean(value)) => { + if matches!(transform, Transform::Identity) { + path_parts.push(format!("{}={}", field_name, value)); + } else { + return Err(IcebergFdwError::UnsupportedType(format!( + "boolean values only supported with Identity transform, got: {:?}", + transform + ))); + } + } + Literal::Primitive(PrimitiveLiteral::Long(value)) => { + if matches!(transform, Transform::Identity) { + path_parts.push(format!("{}={}", field_name, value)); + } else { + return Err(IcebergFdwError::UnsupportedType(format!( + "long values only supported with Identity 
transform, got: {:?}", + transform + ))); + } + } + Literal::Primitive(PrimitiveLiteral::Float(value)) => { + if matches!(transform, Transform::Identity) { + path_parts.push(format!("{}={}", field_name, value)); + } else { + return Err(IcebergFdwError::UnsupportedType(format!( + "float values only supported with Identity transform, got: {:?}", + transform + ))); + } + } + Literal::Primitive(PrimitiveLiteral::Double(value)) => { + if matches!(transform, Transform::Identity) { + path_parts.push(format!("{}={}", field_name, value)); + } else { + return Err(IcebergFdwError::UnsupportedType(format!( + "double values only supported with Identity transform, got: {:?}", + transform + ))); + } + } + Literal::Primitive(PrimitiveLiteral::String(value)) => { + if matches!(transform, Transform::Identity) { + path_parts.push(format!("{}={}", field_name, value)); + } else { + return Err(IcebergFdwError::UnsupportedType(format!( + "string values only supported with Identity transform, got: {:?}", + transform + ))); + } + } + _ => { + return Err(IcebergFdwError::UnsupportedType( + "unsupported partition literal type".to_string(), + )); + } + } + } + } + + Ok(path_parts.join("/")) + } +} + +impl location_generator::LocationGenerator for LocationGenerator { + fn generate_location(&self, file_name: &str) -> String { + format!("{}/{}", self.dir_path, file_name) + } +} diff --git a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs index b240f17b..ab55b3ac 100644 --- a/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs +++ b/wrappers/src/fdw/logflare_fdw/logflare_fdw.rs @@ -291,7 +291,7 @@ impl ForeignDataWrapper for LogflareFdw { Ok(self .scan_result .drain(0..1) - .last() + .next_back() .map(|src_row| row.replace_with(src_row))) } } diff --git a/wrappers/src/fdw/redis_fdw/redis_fdw.rs b/wrappers/src/fdw/redis_fdw/redis_fdw.rs index f7c27dc3..7f8ecad6 100644 --- a/wrappers/src/fdw/redis_fdw/redis_fdw.rs +++ b/wrappers/src/fdw/redis_fdw/redis_fdw.rs @@ -105,7 +105,7 @@ impl RedisFdw { } let mut tgt_row = Row::new(); - if let Some(val) = self.scan_result.drain(0..1).last() { + if let Some(val) = self.scan_result.drain(0..1).next_back() { let tgt_col = &self.tgt_cols[0]; tgt_row.push(&tgt_col.name, Some(Cell::String(val.to_owned()))); }