From 1e92896b732aa170889c48b980b75b176f481550 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 30 Nov 2025 18:33:57 -0500 Subject: [PATCH 1/4] stash --- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 87 +++++++++- .../io/gcp/bigquery/BigQueryUtilsTest.java | 161 ++++++++++++++++++ 2 files changed, 240 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 21f2e64cd1f8..3b8c3081b4a0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -67,6 +67,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.util.Preconditions; @@ -157,8 +158,41 @@ public abstract static class SchemaConversionOptions implements Serializable { */ public abstract boolean getInferMaps(); + /** + * Controls how BigQuery {@code TIMESTAMP(12)} columns (picosecond precision) are converted to + * Beam schema types. + * + * + */ + public enum PicosecondTimestampConversion { + MICROS, + NANOS, + PICOS + } + + /** + * Controls how BigQuery {@code TIMESTAMP(12)} (picosecond precision) columns are converted to + * Beam schema types. + * + *

Since Beam's datetime types only support up to nanosecond precision, this option specifies + * whether to truncate to microseconds, truncate to nanoseconds, or preserve full precision as a + * string. + * + *

This option has no effect on {@code TIMESTAMP(6)} (microsecond). + * + *

Defaults to {@link PicosecondTimestampConversion#NANOS}. + */ + public abstract PicosecondTimestampConversion getPicosecondTimestampConversion(); + public static Builder builder() { - return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder().setInferMaps(false); + return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder() + .setInferMaps(false) + .setPicosecondTimestampConversion(PicosecondTimestampConversion.NANOS); } /** Builder for {@link SchemaConversionOptions}. */ @@ -166,6 +200,9 @@ public static Builder builder() { public abstract static class Builder { public abstract Builder setInferMaps(boolean inferMaps); + public abstract Builder setPicosecondTimestampConversion( + PicosecondTimestampConversion conversion); + public abstract SchemaConversionOptions build(); } } @@ -350,12 +387,17 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { * *

Supports both standard and legacy SQL types. * - * @param typeName Name of the type returned by {@link TableFieldSchema#getType()} + * @param schema Schema of the type returned * @param nestedFields Nested fields for the given type (eg. RECORD type) * @return Corresponding Beam {@link FieldType} */ private static FieldType fromTableFieldSchemaType( - String typeName, List nestedFields, SchemaConversionOptions options) { + TableFieldSchema schema, + List nestedFields, + SchemaConversionOptions options) { + // see + // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- + String typeName = schema.getType(); // see // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- switch (typeName) { @@ -373,7 +415,24 @@ private static FieldType fromTableFieldSchemaType( case "BOOL": return FieldType.BOOLEAN; case "TIMESTAMP": - return FieldType.DATETIME; + System.out.println("CLAUDE BigQueryUtils fromTableFieldSchemaType TIMESTAMP" + schema); + if ((schema.getTimestampPrecision() == null) || (schema.getTimestampPrecision() == 6)) { + return FieldType.DATETIME; + } + if (options.getPicosecondTimestampConversion() + == SchemaConversionOptions.PicosecondTimestampConversion.MICROS) { + return FieldType.logicalType(Timestamp.MICROS); + } + if (options.getPicosecondTimestampConversion() + == SchemaConversionOptions.PicosecondTimestampConversion.NANOS) { + return FieldType.logicalType(Timestamp.NANOS); + } + if (options.getPicosecondTimestampConversion() + == SchemaConversionOptions.PicosecondTimestampConversion.PICOS) { + return FieldType.STRING; + } + throw new UnsupportedOperationException( + "Converting BigQuery type " + typeName + " to Beam type is unsupported"); case "DATE": return FieldType.logicalType(SqlTypes.DATE); case "TIME": @@ -395,8 +454,8 @@ private static FieldType fromTableFieldSchemaType( if (BIGQUERY_MAP_KEY_FIELD_NAME.equals(key.getName()) && BIGQUERY_MAP_VALUE_FIELD_NAME.equals(value.getName())) { return FieldType.map( - fromTableFieldSchemaType(key.getType(), key.getFields(), options), - fromTableFieldSchemaType(value.getType(), value.getFields(), options)); + fromTableFieldSchemaType(key, key.getFields(), options), + fromTableFieldSchemaType(value, value.getFields(), options)); } } Schema rowSchema = fromTableFieldSchema(nestedFields, options); @@ -413,8 +472,7 @@ private static Schema fromTableFieldSchema( Schema.Builder schemaBuilder = Schema.builder(); for (TableFieldSchema tableFieldSchema : tableFieldSchemas) { FieldType fieldType = - fromTableFieldSchemaType( - tableFieldSchema.getType(), tableFieldSchema.getFields(), options); + fromTableFieldSchemaType(tableFieldSchema, tableFieldSchema.getFields(), options); Optional fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf); if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent() @@ -803,6 +861,19 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso } catch (NumberFormatException e) { return java.time.Instant.parse(jsonBQString); } + } else if (fieldType.isLogicalType(Timestamp.IDENTIFIER)) { + System.out.println("CLAUDE fieldType.isLogicalType(Timestamp.IDENTIFIER) " + jsonBQString); + try { + if (jsonBQString.endsWith(" UTC")) { + // Remove " UTC" suffix and replace with "Z" for ISO format + jsonBQString = jsonBQString.replace(" UTC", "Z").replace(" ", "T"); + } + + return java.time.Instant.parse(jsonBQString); + } catch (Exception e) { + System.out.println("CLAUDE Exception: " + e); + throw e; + } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index eeaf00e0f282..d26e789657ef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.DateTime; @@ -1294,4 +1295,164 @@ public void testTrimSchema() { BigQueryUtils.trimSchema(BQ_ROW_TYPE, Arrays.asList("row.id", "row.value", "row.name"))); } } + + @Test + public void testFromTableSchema_timestampPrecision12_defaultToNanos() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.NANOS)).build(), + beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision12_micros() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampConversion( + BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.MICROS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.MICROS)).build(), + beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision12_nanos() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampConversion( + BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.NANOS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.NANOS)).build(), + beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision12_picos() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampConversion( + BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.PICOS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals(Schema.builder().addNullableField("ts", FieldType.STRING).build(), beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision6_ignoredOption() { + // Standard microsecond precision should ignore the picosecond conversion option + TableFieldSchema microsTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(6L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(microsTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampConversion( + BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.PICOS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals(Schema.builder().addNullableField("ts", FieldType.DATETIME).build(), beamSchema); + } + + @Test + public void testFromTableSchema_timestampNullPrecision_defaultsToDatetime() { + // Null precision should default to DATETIME (backwards compatibility) + TableFieldSchema timestamp = new TableFieldSchema().setName("ts").setType("TIMESTAMP"); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(timestamp)); + + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema); + + assertEquals(Schema.builder().addNullableField("ts", FieldType.DATETIME).build(), beamSchema); + } + + @Test + @SuppressWarnings("JavaInstantGetSecondsGetNano") + public void testToBeamRow_timestampNanos_isoFormat() { + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build(); + + String timestamp = "2024-08-10T16:52:07.123456789Z"; + java.time.Instant expected = java.time.Instant.parse(timestamp); + + Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); + + assertEquals(expected, beamRow.getValue("ts")); + } + + @Test + @SuppressWarnings("JavaInstantGetSecondsGetNano") + public void testToBeamRow_timestampNanos_utcSuffix() { + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build(); + + // BigQuery format with " UTC" suffix + String timestamp = "2024-08-10 16:52:07.123456789 UTC"; + + Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); + + java.time.Instant actual = (java.time.Instant) beamRow.getValue("ts"); + assertEquals(2024, actual.atZone(java.time.ZoneOffset.UTC).getYear()); + assertEquals(8, actual.atZone(java.time.ZoneOffset.UTC).getMonthValue()); + assertEquals(10, actual.atZone(java.time.ZoneOffset.UTC).getDayOfMonth()); + assertEquals(16, actual.atZone(java.time.ZoneOffset.UTC).getHour()); + assertEquals(52, actual.atZone(java.time.ZoneOffset.UTC).getMinute()); + assertEquals(7, actual.atZone(java.time.ZoneOffset.UTC).getSecond()); + assertEquals(123456789, actual.getNano()); + } + + @Test + @SuppressWarnings("JavaInstantGetSecondsGetNano") + public void testToBeamRow_timestampNanos_variablePrecision() { + // Test that different decimal place counts are handled + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build(); + + // 3 decimal places + Row row3 = + BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10T16:52:07.123Z")); + assertEquals(123000000, ((java.time.Instant) row3.getValue("ts")).getNano()); + + // 6 decimal places + Row row6 = + BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10T16:52:07.123456Z")); + assertEquals(123456000, ((java.time.Instant) row6.getValue("ts")).getNano()); + + // 9 decimal places + Row row9 = + BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10T16:52:07.123456789Z")); + assertEquals(123456789, ((java.time.Instant) row9.getValue("ts")).getNano()); + } + + @Test + public void testToBeamRow_timestampMicros_isoFormat() { + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.MICROS).build(); + + String timestamp = "2024-08-10T16:52:07.123456Z"; + java.time.Instant expected = java.time.Instant.parse(timestamp); + + Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); + + assertEquals(expected, beamRow.getValue("ts")); + } } From d25ea8f01831026762475a7ecde80e38b14d3b33 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Dec 2025 20:42:00 +0000 Subject: [PATCH 2/4] done. --- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 98 ++++++++----------- .../io/gcp/bigquery/TimestampPrecision.java | 26 +++++ .../io/gcp/bigquery/BigQueryUtilsTest.java | 61 +++++------- 3 files changed, 93 insertions(+), 92 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 3b8c3081b4a0..98930a1fade9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -159,40 +159,24 @@ public abstract static class SchemaConversionOptions implements Serializable { public abstract boolean getInferMaps(); /** - * Controls how BigQuery {@code TIMESTAMP(12)} columns (picosecond precision) are converted to - * Beam schema types. + * Controls how BigQuery {@code TIMESTAMP(12)} (picosecond precision) columns are mapped to Beam + * schema types. * - *

- */ - public enum PicosecondTimestampConversion { - MICROS, - NANOS, - PICOS - } - - /** - * Controls how BigQuery {@code TIMESTAMP(12)} (picosecond precision) columns are converted to - * Beam schema types. - * - *

Since Beam's datetime types only support up to nanosecond precision, this option specifies - * whether to truncate to microseconds, truncate to nanoseconds, or preserve full precision as a - * string. + *

Standard TIMESTAMP(6) columns are mapped to FieldType.DATETIME, which only support up to + * millisecond precision. This option allows mapping TIMESTAMP(12) columns to logical types + * Timestamp.MILLIS, Timestamp.MICROS, Timestamp.NANOS or preserve full picosecond precision as + * a STRING type. * - *

This option has no effect on {@code TIMESTAMP(6)} (microsecond). + *

This option has no effect on {@code TIMESTAMP(6)} (microsecond) columns. * - *

Defaults to {@link PicosecondTimestampConversion#NANOS}. + *

Defaults to {@link TimestampPrecision#NANOS}. */ - public abstract PicosecondTimestampConversion getPicosecondTimestampConversion(); + public abstract TimestampPrecision getPicosecondTimestampMapping(); public static Builder builder() { return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder() .setInferMaps(false) - .setPicosecondTimestampConversion(PicosecondTimestampConversion.NANOS); + .setPicosecondTimestampMapping(TimestampPrecision.NANOS); } /** Builder for {@link SchemaConversionOptions}. */ @@ -200,8 +184,7 @@ public static Builder builder() { public abstract static class Builder { public abstract Builder setInferMaps(boolean inferMaps); - public abstract Builder setPicosecondTimestampConversion( - PicosecondTimestampConversion conversion); + public abstract Builder setPicosecondTimestampMapping(TimestampPrecision conversion); public abstract SchemaConversionOptions build(); } @@ -293,6 +276,21 @@ public abstract Builder setPicosecondTimestampConversion( .toFormatter(); } + private static final java.time.format.DateTimeFormatter VAR_PRECISION_FORMATTER; + + static { + VAR_PRECISION_FORMATTER = + new java.time.format.DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + + // Variable Nano-of-second (0 to 9 digits) + // The 'true' argument means: "Expect a decimal point only if fractions exist" + .appendFraction(java.time.temporal.ChronoField.NANO_OF_SECOND, 0, 9, true) + .appendLiteral(" UTC") + .toFormatter() + .withZone(java.time.ZoneId.of("UTC")); + } + private static final Map BEAM_TO_BIGQUERY_TYPE_MAPPING = ImmutableMap.builder() .put(TypeName.BYTE, StandardSQLTypeName.INT64) @@ -398,8 +396,6 @@ private static FieldType fromTableFieldSchemaType( // see // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- String typeName = schema.getType(); - // see - // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- switch (typeName) { case "STRING": return FieldType.STRING; @@ -415,24 +411,23 @@ private static FieldType fromTableFieldSchemaType( case "BOOL": return FieldType.BOOLEAN; case "TIMESTAMP": - System.out.println("CLAUDE BigQueryUtils fromTableFieldSchemaType TIMESTAMP" + schema); + // Timestamp columns can only have 6 or 12 precision. if ((schema.getTimestampPrecision() == null) || (schema.getTimestampPrecision() == 6)) { return FieldType.DATETIME; } - if (options.getPicosecondTimestampConversion() - == SchemaConversionOptions.PicosecondTimestampConversion.MICROS) { - return FieldType.logicalType(Timestamp.MICROS); - } - if (options.getPicosecondTimestampConversion() - == SchemaConversionOptions.PicosecondTimestampConversion.NANOS) { - return FieldType.logicalType(Timestamp.NANOS); - } - if (options.getPicosecondTimestampConversion() - == SchemaConversionOptions.PicosecondTimestampConversion.PICOS) { - return FieldType.STRING; + switch (options.getPicosecondTimestampMapping()) { + case MILLIS: + return FieldType.logicalType(Timestamp.MILLIS); + case MICROS: + return FieldType.logicalType(Timestamp.MICROS); + case NANOS: + return FieldType.logicalType(Timestamp.NANOS); + case PICOS: + return FieldType.STRING; + default: + throw new UnsupportedOperationException( + "Converting BigQuery type " + typeName + " to Beam type is unsupported"); } - throw new UnsupportedOperationException( - "Converting BigQuery type " + typeName + " to Beam type is unsupported"); case "DATE": return FieldType.logicalType(SqlTypes.DATE); case "TIME": @@ -761,6 +756,8 @@ public static TableRow toTableRow(Row row) { java.time.format.DateTimeFormatter localDateTimeFormatter = (0 == localDateTime.getNano()) ? ISO_LOCAL_DATE_TIME : BIGQUERY_DATETIME_FORMATTER; return localDateTimeFormatter.format(localDateTime); + } else if (Timestamp.IDENTIFIER.equals(fieldType.getLogicalType().getIdentifier())) { + return BigQueryAvroUtils.formatTimestamp((java.time.Instant) fieldValue); } else if ("Enum".equals(identifier)) { return fieldType .getLogicalType(EnumerationType.class) @@ -862,18 +859,7 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso return java.time.Instant.parse(jsonBQString); } } else if (fieldType.isLogicalType(Timestamp.IDENTIFIER)) { - System.out.println("CLAUDE fieldType.isLogicalType(Timestamp.IDENTIFIER) " + jsonBQString); - try { - if (jsonBQString.endsWith(" UTC")) { - // Remove " UTC" suffix and replace with "Z" for ISO format - jsonBQString = jsonBQString.replace(" UTC", "Z").replace(" ", "T"); - } - - return java.time.Instant.parse(jsonBQString); - } catch (Exception e) { - System.out.println("CLAUDE Exception: " + e); - throw e; - } + return VAR_PRECISION_FORMATTER.parse(jsonBQString, java.time.Instant::from); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java new file mode 100644 index 000000000000..f6437aa19226 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +/** Speciefies Timestamp precision. */ +public enum TimestampPrecision { + MILLIS, + MICROS, + NANOS, + PICOS +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index d26e789657ef..192497dd5197 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -1309,6 +1309,23 @@ public void testFromTableSchema_timestampPrecision12_defaultToNanos() { beamSchema); } + @Test + public void testFromTableSchema_timestampPrecision12_millis() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampMapping(TimestampPrecision.MILLIS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.MILLIS)).build(), + beamSchema); + } + @Test public void testFromTableSchema_timestampPrecision12_micros() { TableFieldSchema picosTimestamp = @@ -1317,8 +1334,7 @@ public void testFromTableSchema_timestampPrecision12_micros() { BigQueryUtils.SchemaConversionOptions options = BigQueryUtils.SchemaConversionOptions.builder() - .setPicosecondTimestampConversion( - BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.MICROS) + .setPicosecondTimestampMapping(TimestampPrecision.MICROS) .build(); Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); @@ -1335,8 +1351,7 @@ public void testFromTableSchema_timestampPrecision12_nanos() { BigQueryUtils.SchemaConversionOptions options = BigQueryUtils.SchemaConversionOptions.builder() - .setPicosecondTimestampConversion( - BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.NANOS) + .setPicosecondTimestampMapping(TimestampPrecision.NANOS) .build(); Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); @@ -1353,8 +1368,7 @@ public void testFromTableSchema_timestampPrecision12_picos() { BigQueryUtils.SchemaConversionOptions options = BigQueryUtils.SchemaConversionOptions.builder() - .setPicosecondTimestampConversion( - BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.PICOS) + .setPicosecondTimestampMapping(TimestampPrecision.PICOS) .build(); Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); @@ -1370,8 +1384,7 @@ public void testFromTableSchema_timestampPrecision6_ignoredOption() { BigQueryUtils.SchemaConversionOptions options = BigQueryUtils.SchemaConversionOptions.builder() - .setPicosecondTimestampConversion( - BigQueryUtils.SchemaConversionOptions.PicosecondTimestampConversion.PICOS) + .setPicosecondTimestampMapping(TimestampPrecision.PICOS) .build(); Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); @@ -1389,19 +1402,6 @@ public void testFromTableSchema_timestampNullPrecision_defaultsToDatetime() { assertEquals(Schema.builder().addNullableField("ts", FieldType.DATETIME).build(), beamSchema); } - @Test - @SuppressWarnings("JavaInstantGetSecondsGetNano") - public void testToBeamRow_timestampNanos_isoFormat() { - Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build(); - - String timestamp = "2024-08-10T16:52:07.123456789Z"; - java.time.Instant expected = java.time.Instant.parse(timestamp); - - Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); - - assertEquals(expected, beamRow.getValue("ts")); - } - @Test @SuppressWarnings("JavaInstantGetSecondsGetNano") public void testToBeamRow_timestampNanos_utcSuffix() { @@ -1430,29 +1430,18 @@ public void testToBeamRow_timestampNanos_variablePrecision() { // 3 decimal places Row row3 = - BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10T16:52:07.123Z")); + BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10 16:52:07.123 UTC")); assertEquals(123000000, ((java.time.Instant) row3.getValue("ts")).getNano()); // 6 decimal places Row row6 = - BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10T16:52:07.123456Z")); + BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456 UTC")); assertEquals(123456000, ((java.time.Instant) row6.getValue("ts")).getNano()); // 9 decimal places Row row9 = - BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10T16:52:07.123456789Z")); + BigQueryUtils.toBeamRow( + schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456789 UTC")); assertEquals(123456789, ((java.time.Instant) row9.getValue("ts")).getNano()); } - - @Test - public void testToBeamRow_timestampMicros_isoFormat() { - Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.MICROS).build(); - - String timestamp = "2024-08-10T16:52:07.123456Z"; - java.time.Instant expected = java.time.Instant.parse(timestamp); - - Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); - - assertEquals(expected, beamRow.getValue("ts")); - } } From c9fdb5fc624da837bdda5c9b68e4205084fc3695 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Dec 2025 22:07:40 +0000 Subject: [PATCH 3/4] Add test. --- .../io/gcp/bigquery/BigQueryUtilsTest.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 192497dd5197..675885b4e942 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -1422,6 +1422,26 @@ public void testToBeamRow_timestampNanos_utcSuffix() { assertEquals(123456789, actual.getNano()); } + @Test + @SuppressWarnings("JavaInstantGetSecondsGetNano") + public void testToBeamRow_timestampMicros_utcSuffix() { + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.MICROS).build(); + + // BigQuery format with " UTC" suffix + String timestamp = "2024-08-10 16:52:07.123456 UTC"; + + Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); + + java.time.Instant actual = (java.time.Instant) beamRow.getValue("ts"); + assertEquals(2024, actual.atZone(java.time.ZoneOffset.UTC).getYear()); + assertEquals(8, actual.atZone(java.time.ZoneOffset.UTC).getMonthValue()); + assertEquals(10, actual.atZone(java.time.ZoneOffset.UTC).getDayOfMonth()); + assertEquals(16, actual.atZone(java.time.ZoneOffset.UTC).getHour()); + assertEquals(52, actual.atZone(java.time.ZoneOffset.UTC).getMinute()); + assertEquals(7, actual.atZone(java.time.ZoneOffset.UTC).getSecond()); + assertEquals(123456000, actual.getNano()); + } + @Test @SuppressWarnings("JavaInstantGetSecondsGetNano") public void testToBeamRow_timestampNanos_variablePrecision() { From 158404eaa202278f0217cfc1b67f5e74fc6f8897 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Dec 2025 22:24:22 +0000 Subject: [PATCH 4/4] Comments. --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 3 ++- .../apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 98930a1fade9..7141e5cb8e12 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -412,7 +412,8 @@ private static FieldType fromTableFieldSchemaType( return FieldType.BOOLEAN; case "TIMESTAMP": // Timestamp columns can only have 6 or 12 precision. - if ((schema.getTimestampPrecision() == null) || (schema.getTimestampPrecision() == 6)) { + if ((schema.getTimestampPrecision() == null) + || Long.valueOf(6L).equals(schema.getTimestampPrecision())) { return FieldType.DATETIME; } switch (options.getPicosecondTimestampMapping()) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java index f6437aa19226..7d9b4c070834 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -/** Speciefies Timestamp precision. */ +/** Specifies Timestamp precision. */ public enum TimestampPrecision { MILLIS, MICROS,