Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:extensions:ml")
implementation project(":sdks:java:managed")
implementation library.java.avro
implementation library.java.bigdataoss_util
implementation library.java.google_api_client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.snippets.transforms.io.iceberg;

// [START iceberg_schema_and_row]
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;

/**
 * Documentation snippet (region {@code iceberg_schema_and_row}) showing an example Beam
 * {@link Schema} together with a matching {@link Row}, covering the field types used with the
 * Iceberg connector.
 *
 * <p>NOTE(review): the Beam-field-to-Iceberg-type mapping implied by the field names
 * (e.g. {@code timestamptz_field}) is defined by the connector, not by this class — confirm
 * against the connector's schema-conversion utilities.
 */
public class IcebergBeamSchemaAndRow {
  // Schema for the nested struct stored in "struct_field" below.
  Schema nestedSchema =
      Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build();
  // Top-level schema exercising primitive, logical, array, map, and row field types.
  Schema beamSchema =
      Schema.builder()
          .addBooleanField("boolean_field")
          .addInt32Field("int_field")
          .addInt64Field("long_field")
          .addFloatField("float_field")
          .addDoubleField("double_field")
          .addDecimalField("numeric_field")
          .addByteArrayField("bytes_field")
          .addStringField("string_field")
          // SqlTypes.TIME/DATE are java.time-based logical types (LocalTime / LocalDate values).
          .addLogicalTypeField("time_field", SqlTypes.TIME)
          .addLogicalTypeField("date_field", SqlTypes.DATE)
          // Timestamp.MICROS carries a java.time.Instant value.
          .addLogicalTypeField("timestamp_field", Timestamp.MICROS)
          // DATETIME fields carry Joda-time values (see DateTime.now() below).
          .addDateTimeField("timestamptz_field")
          .addArrayField("array_field", Schema.FieldType.INT32)
          .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32)
          .addRowField("struct_field", nestedSchema)
          .build();

  // Example row: values are keyed by field name, and each value's Java type must match the
  // corresponding field declared in beamSchema above.
  Row beamRow =
      Row.withSchema(beamSchema)
          .withFieldValues(
              ImmutableMap.<String, Object>builder()
                  .put("boolean_field", true)
                  .put("int_field", 1)
                  .put("long_field", 2L)
                  .put("float_field", 3.4f)
                  .put("double_field", 4.5d)
                  .put("numeric_field", new BigDecimal(67))
                  .put("bytes_field", new byte[] {1, 2, 3})
                  .put("string_field", "value")
                  .put("time_field", LocalTime.now())
                  .put("date_field", LocalDate.now())
                  .put("timestamp_field", Instant.now())
                  .put("timestamptz_field", DateTime.now())
                  .put("array_field", Arrays.asList(1, 2, 3))
                  .put("map_field", ImmutableMap.of("a", 1, "b", 2))
                  .put(
                      "struct_field",
                      // Nested rows are built against the nested schema declared above.
                      Row.withSchema(nestedSchema).addValues("nested_value", 123).build())
                  .build())
          .build();
}
// [END iceberg_schema_and_row]
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.snippets.transforms.io.iceberg;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.AddFields;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

import java.util.Arrays;
import java.util.Map;

/**
 * Documentation snippets showing how to read and write Apache Iceberg tables with Beam's
 * {@code Managed} transforms, using a local Hadoop catalog and BigLake REST catalogs.
 *
 * <p>The {@code [START ...]}/{@code [END ...]} markers delimit regions extracted into the
 * published documentation; keep them intact when editing.
 */
public class Quickstart {
  // Snippet placeholders; declared final since they are constants that are never reassigned.
  static final String PROJECT_ID = "apache-beam-testing";
  static final String BUCKET_NAME = "my-bucket";

  public static void main(String[] args) {
    // [START hadoop_catalog_props]
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "hadoop",
            "warehouse", "file:///tmp/beam-iceberg-local-quickstart");
    // [END hadoop_catalog_props]
  }

  /** Reads a public BigLake Iceberg dataset, aggregates per passenger count, and prints rows. */
  public static void publicDatasets() {
    // [START biglake_public_catalog_props]
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "rest",
            "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
            "warehouse", "gs://biglake-public-nyc-taxi-iceberg",
            "header.x-goog-user-project", PROJECT_ID,
            "rest.auth.type", "google",
            "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
            "header.X-Iceberg-Access-Delegation", "vended-credentials");
    // [END biglake_public_catalog_props]

    // [START biglake_public_query]
    Pipeline p = Pipeline.create();

    // Set up query properties:
    Map<String, Object> config =
        ImmutableMap.of(
            "table",
            "public_data.nyc_taxicab",
            "catalog_properties",
            catalogProps,
            "filter",
            "data_file_year = 2021 AND tip_amount > 100",
            "keep",
            Arrays.asList("passenger_count", "total_amount", "trip_distance"));

    // Read Iceberg records
    PCollection<Row> icebergRows =
        p.apply(Managed.read("iceberg").withConfig(config)).getSinglePCollection();

    // Perform further analysis on records
    PCollection<Row> result =
        icebergRows
            // Add a constant counter field so Sum.ofIntegers() yields a trip count per group.
            .apply(AddFields.<Row>create().field("num_trips", Schema.FieldType.INT32, 1))
            .apply(
                Group.<Row>byFieldNames("passenger_count")
                    .aggregateField("num_trips", Sum.ofIntegers(), "num_trips")
                    .aggregateField("total_amount", Mean.of(), "avg_fare")
                    .aggregateField("trip_distance", Mean.of(), "avg_distance"));

    // Print to console
    result.apply(
        MapElements.into(TypeDescriptors.voids())
            .via(
                row -> {
                  System.out.println(row);
                  return null;
                }));

    // Execute
    p.run().waitUntilFinish();
    // [END biglake_public_query]
  }

  /** Writes rows to a user-owned BigLake Iceberg table, then reads them back and prints them. */
  public static void other() {
    // [START biglake_catalog_props]
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "rest",
            "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
            "warehouse", "gs://" + BUCKET_NAME,
            "header.x-goog-user-project", PROJECT_ID,
            "rest.auth.type", "google",
            "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
            "header.X-Iceberg-Access-Delegation", "vended-credentials");
    // [END biglake_catalog_props]

    // [START managed_iceberg_config]
    Map<String, Object> managedConfig =
        ImmutableMap.of("table", "my_db.my_table", "catalog_properties", catalogProps);

    // Note: The table will get created when inserting data (see below)
    // [END managed_iceberg_config]

    // [START managed_iceberg_insert]
    Schema inputSchema =
        Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();

    Pipeline p = Pipeline.create();
    p.apply(
            Create.of(
                // "id" is an INT64 field, so its values must be longs (1L), not ints:
                // Row.Builder.build() verifies each value's type against the schema.
                Row.withSchema(inputSchema).addValues(1L, "Mark", 34).build(),
                Row.withSchema(inputSchema).addValues(2L, "Omar", 24).build(),
                Row.withSchema(inputSchema).addValues(3L, "Rachel", 27).build()))
        .apply(Managed.write("iceberg").withConfig(managedConfig));

    p.run();
    // [END managed_iceberg_insert]

    // [START managed_iceberg_read]
    Pipeline q = Pipeline.create();
    PCollection<Row> rows =
        q.apply(Managed.read("iceberg").withConfig(managedConfig)).getSinglePCollection();

    rows.apply(
        MapElements.into(TypeDescriptors.voids())
            .via(
                row -> {
                  System.out.println(row);
                  return null;
                }));

    q.run();
    // [END managed_iceberg_read]
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoValue
public abstract class IcebergScanConfig implements Serializable {
Expand All @@ -65,9 +67,13 @@ public enum ScanType {
@Pure
public abstract String getTableIdentifier();

private static final Logger LOG = LoggerFactory.getLogger(IcebergScanConfig.class);

@Pure
public Table getTable() {
if (cachedTable == null) {
System.out.println("xxx loading a new table");
LOG.info("xxx loading a new table");
cachedTable =
getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
Expand Down Expand Up @@ -75,6 +77,7 @@ private IcebergUtils() {}
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
.put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get())
.put(MicrosInstant.IDENTIFIER, Types.TimestampType.withZone())
.build();

private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
Expand Down Expand Up @@ -294,6 +297,13 @@ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema s

/** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) {
  int beamFieldCount = row.getSchema().getFieldCount();
  int icebergColumnCount = schema.columns().size();
  if (beamFieldCount == icebergColumnCount) {
    return copyRowIntoRecord(GenericRecord.create(schema), row);
  }
  // Fail fast on a column-count mismatch, listing both sides so the divergence is easy to spot.
  throw new IllegalStateException(
      String.format(
          "Beam Row schema and Iceberg schema have different sizes.%n\tBeam Row columns: %s%n\tIceberg schema columns: %s",
          row.getSchema().getFieldNames(),
          schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toList())));
}

Expand Down Expand Up @@ -419,7 +429,11 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) {
// timestamptz
if (shouldAdjustToUtc) {
if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME
if (beamValue instanceof java.time.Instant) { // MicrosInstant
java.time.Instant instant = (java.time.Instant) beamValue;
return DateTimeUtil.timestamptzFromNanos(
TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano());
} else if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME
return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC);
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L);
Expand All @@ -434,7 +448,11 @@ private static Object getIcebergTimestampValue(Object beamValue, boolean shouldA
}

// timestamp
if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME
if (beamValue instanceof java.time.Instant) { // MicrosInstant
java.time.Instant instant = (java.time.Instant) beamValue;
return DateTimeUtil.timestampFromNanos(
TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano());
} else if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME
return beamValue;
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L);
Expand Down
Loading
Loading