diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 21f2e64cd1f8..7141e5cb8e12 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -67,6 +67,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.util.Preconditions; @@ -157,8 +158,25 @@ public abstract static class SchemaConversionOptions implements Serializable { */ public abstract boolean getInferMaps(); + /** + * Controls how BigQuery {@code TIMESTAMP(12)} (picosecond precision) columns are mapped to Beam + * schema types. + * + *

Standard TIMESTAMP(6) columns are mapped to FieldType.DATETIME, which only support up to + * millisecond precision. This option allows mapping TIMESTAMP(12) columns to logical types + * Timestamp.MILLIS, Timestamp.MICROS, Timestamp.NANOS or preserve full picosecond precision as + * a STRING type. + * + *

This option has no effect on {@code TIMESTAMP(6)} (microsecond) columns. + * + *

Defaults to {@link TimestampPrecision#NANOS}. + */ + public abstract TimestampPrecision getPicosecondTimestampMapping(); + public static Builder builder() { - return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder().setInferMaps(false); + return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder() + .setInferMaps(false) + .setPicosecondTimestampMapping(TimestampPrecision.NANOS); } /** Builder for {@link SchemaConversionOptions}. */ @@ -166,6 +184,8 @@ public static Builder builder() { public abstract static class Builder { public abstract Builder setInferMaps(boolean inferMaps); + public abstract Builder setPicosecondTimestampMapping(TimestampPrecision conversion); + public abstract SchemaConversionOptions build(); } } @@ -256,6 +276,21 @@ public abstract static class Builder { .toFormatter(); } + private static final java.time.format.DateTimeFormatter VAR_PRECISION_FORMATTER; + + static { + VAR_PRECISION_FORMATTER = + new java.time.format.DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + + // Variable Nano-of-second (0 to 9 digits) + // The 'true' argument means: "Expect a decimal point only if fractions exist" + .appendFraction(java.time.temporal.ChronoField.NANO_OF_SECOND, 0, 9, true) + .appendLiteral(" UTC") + .toFormatter() + .withZone(java.time.ZoneId.of("UTC")); + } + private static final Map BEAM_TO_BIGQUERY_TYPE_MAPPING = ImmutableMap.builder() .put(TypeName.BYTE, StandardSQLTypeName.INT64) @@ -350,14 +385,17 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { * *

Supports both standard and legacy SQL types. * - * @param typeName Name of the type returned by {@link TableFieldSchema#getType()} + * @param schema Schema of the type returned * @param nestedFields Nested fields for the given type (eg. RECORD type) * @return Corresponding Beam {@link FieldType} */ private static FieldType fromTableFieldSchemaType( - String typeName, List nestedFields, SchemaConversionOptions options) { + TableFieldSchema schema, + List nestedFields, + SchemaConversionOptions options) { // see // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- + String typeName = schema.getType(); switch (typeName) { case "STRING": return FieldType.STRING; @@ -373,7 +411,24 @@ private static FieldType fromTableFieldSchemaType( case "BOOL": return FieldType.BOOLEAN; case "TIMESTAMP": - return FieldType.DATETIME; + // Timestamp columns can only have 6 or 12 precision. + if ((schema.getTimestampPrecision() == null) + || Long.valueOf(6L).equals(schema.getTimestampPrecision())) { + return FieldType.DATETIME; + } + switch (options.getPicosecondTimestampMapping()) { + case MILLIS: + return FieldType.logicalType(Timestamp.MILLIS); + case MICROS: + return FieldType.logicalType(Timestamp.MICROS); + case NANOS: + return FieldType.logicalType(Timestamp.NANOS); + case PICOS: + return FieldType.STRING; + default: + throw new UnsupportedOperationException( + "Converting BigQuery type " + typeName + " to Beam type is unsupported"); + } case "DATE": return FieldType.logicalType(SqlTypes.DATE); case "TIME": @@ -395,8 +450,8 @@ private static FieldType fromTableFieldSchemaType( if (BIGQUERY_MAP_KEY_FIELD_NAME.equals(key.getName()) && BIGQUERY_MAP_VALUE_FIELD_NAME.equals(value.getName())) { return FieldType.map( - fromTableFieldSchemaType(key.getType(), key.getFields(), options), - fromTableFieldSchemaType(value.getType(), value.getFields(), options)); + fromTableFieldSchemaType(key, key.getFields(), options), + fromTableFieldSchemaType(value, value.getFields(), options)); } } Schema rowSchema = fromTableFieldSchema(nestedFields, options); @@ -413,8 +468,7 @@ private static Schema fromTableFieldSchema( Schema.Builder schemaBuilder = Schema.builder(); for (TableFieldSchema tableFieldSchema : tableFieldSchemas) { FieldType fieldType = - fromTableFieldSchemaType( - tableFieldSchema.getType(), tableFieldSchema.getFields(), options); + fromTableFieldSchemaType(tableFieldSchema, tableFieldSchema.getFields(), options); Optional fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf); if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent() @@ -703,6 +757,8 @@ public static TableRow toTableRow(Row row) { java.time.format.DateTimeFormatter localDateTimeFormatter = (0 == localDateTime.getNano()) ? ISO_LOCAL_DATE_TIME : BIGQUERY_DATETIME_FORMATTER; return localDateTimeFormatter.format(localDateTime); + } else if (Timestamp.IDENTIFIER.equals(fieldType.getLogicalType().getIdentifier())) { + return BigQueryAvroUtils.formatTimestamp((java.time.Instant) fieldValue); } else if ("Enum".equals(identifier)) { return fieldType .getLogicalType(EnumerationType.class) @@ -803,6 +859,8 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso } catch (NumberFormatException e) { return java.time.Instant.parse(jsonBQString); } + } else if (fieldType.isLogicalType(Timestamp.IDENTIFIER)) { + return VAR_PRECISION_FORMATTER.parse(jsonBQString, java.time.Instant::from); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java new file mode 100644 index 000000000000..7d9b4c070834 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +/** Specifies Timestamp precision. */ +public enum TimestampPrecision { + MILLIS, + MICROS, + NANOS, + PICOS +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index eeaf00e0f282..675885b4e942 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.DateTime; @@ -1294,4 +1295,173 @@ public void testTrimSchema() { BigQueryUtils.trimSchema(BQ_ROW_TYPE, Arrays.asList("row.id", "row.value", "row.name"))); } } + + @Test + public void testFromTableSchema_timestampPrecision12_defaultToNanos() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.NANOS)).build(), + beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision12_millis() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampMapping(TimestampPrecision.MILLIS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.MILLIS)).build(), + beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision12_micros() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampMapping(TimestampPrecision.MICROS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.MICROS)).build(), + beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision12_nanos() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampMapping(TimestampPrecision.NANOS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals( + Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.NANOS)).build(), + beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision12_picos() { + TableFieldSchema picosTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampMapping(TimestampPrecision.PICOS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals(Schema.builder().addNullableField("ts", FieldType.STRING).build(), beamSchema); + } + + @Test + public void testFromTableSchema_timestampPrecision6_ignoredOption() { + // Standard microsecond precision should ignore the picosecond conversion option + TableFieldSchema microsTimestamp = + new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(6L); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(microsTimestamp)); + + BigQueryUtils.SchemaConversionOptions options = + BigQueryUtils.SchemaConversionOptions.builder() + .setPicosecondTimestampMapping(TimestampPrecision.PICOS) + .build(); + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options); + + assertEquals(Schema.builder().addNullableField("ts", FieldType.DATETIME).build(), beamSchema); + } + + @Test + public void testFromTableSchema_timestampNullPrecision_defaultsToDatetime() { + // Null precision should default to DATETIME (backwards compatibility) + TableFieldSchema timestamp = new TableFieldSchema().setName("ts").setType("TIMESTAMP"); + TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(timestamp)); + + Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema); + + assertEquals(Schema.builder().addNullableField("ts", FieldType.DATETIME).build(), beamSchema); + } + + @Test + @SuppressWarnings("JavaInstantGetSecondsGetNano") + public void testToBeamRow_timestampNanos_utcSuffix() { + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build(); + + // BigQuery format with " UTC" suffix + String timestamp = "2024-08-10 16:52:07.123456789 UTC"; + + Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); + + java.time.Instant actual = (java.time.Instant) beamRow.getValue("ts"); + assertEquals(2024, actual.atZone(java.time.ZoneOffset.UTC).getYear()); + assertEquals(8, actual.atZone(java.time.ZoneOffset.UTC).getMonthValue()); + assertEquals(10, actual.atZone(java.time.ZoneOffset.UTC).getDayOfMonth()); + assertEquals(16, actual.atZone(java.time.ZoneOffset.UTC).getHour()); + assertEquals(52, actual.atZone(java.time.ZoneOffset.UTC).getMinute()); + assertEquals(7, actual.atZone(java.time.ZoneOffset.UTC).getSecond()); + assertEquals(123456789, actual.getNano()); + } + + @Test + @SuppressWarnings("JavaInstantGetSecondsGetNano") + public void testToBeamRow_timestampMicros_utcSuffix() { + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.MICROS).build(); + + // BigQuery format with " UTC" suffix + String timestamp = "2024-08-10 16:52:07.123456 UTC"; + + Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp)); + + java.time.Instant actual = (java.time.Instant) beamRow.getValue("ts"); + assertEquals(2024, actual.atZone(java.time.ZoneOffset.UTC).getYear()); + assertEquals(8, actual.atZone(java.time.ZoneOffset.UTC).getMonthValue()); + assertEquals(10, actual.atZone(java.time.ZoneOffset.UTC).getDayOfMonth()); + assertEquals(16, actual.atZone(java.time.ZoneOffset.UTC).getHour()); + assertEquals(52, actual.atZone(java.time.ZoneOffset.UTC).getMinute()); + assertEquals(7, actual.atZone(java.time.ZoneOffset.UTC).getSecond()); + assertEquals(123456000, actual.getNano()); + } + + @Test + @SuppressWarnings("JavaInstantGetSecondsGetNano") + public void testToBeamRow_timestampNanos_variablePrecision() { + // Test that different decimal place counts are handled + Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build(); + + // 3 decimal places + Row row3 = + BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10 16:52:07.123 UTC")); + assertEquals(123000000, ((java.time.Instant) row3.getValue("ts")).getNano()); + + // 6 decimal places + Row row6 = + BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456 UTC")); + assertEquals(123456000, ((java.time.Instant) row6.getValue("ts")).getNano()); + + // 9 decimal places + Row row9 = + BigQueryUtils.toBeamRow( + schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456789 UTC")); + assertEquals(123456789, ((java.time.Instant) row9.getValue("ts")).getNano()); + } }