Skip to content

Commit 94fe7a2

Browse files
authored
HIVE-29133: Support Z-ordering for Iceberg tables via CREATE TABLE (#6138)
1 parent 94874ff commit 94fe7a2

File tree

17 files changed

+1176
-22
lines changed

17 files changed

+1176
-22
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ private InputFormatConfig() {
7878

7979
public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
8080

81+
/** Table property key whose value names the write sort-order kind (set to {@link #ZORDER} for Z-ordering). */
public static final String SORT_ORDER = "sort.order";
/** Table property key holding the comma-separated column names used for the write sort. */
public static final String SORT_COLUMNS = "sort.columns";
/** Marker value stored under {@link #SORT_ORDER} when Z-order write clustering is requested. */
public static final String ZORDER = "ZORDER";
84+
8185
public enum InMemoryDataModel {
8286
HIVE,
8387
GENERIC // Default data model is of Iceberg Generics

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iceberg.mr.hive;
2121

22+
import com.fasterxml.jackson.databind.JsonNode;
2223
import com.fasterxml.jackson.databind.ObjectMapper;
2324
import java.util.Arrays;
2425
import java.util.Collection;
@@ -41,6 +42,8 @@
4142
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
4243
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
4344
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
45+
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFieldDesc;
46+
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFields;
4447
import org.apache.hadoop.hive.ql.util.NullOrdering;
4548
import org.apache.iceberg.BaseMetastoreTableOperations;
4649
import org.apache.iceberg.BaseTable;
@@ -74,6 +77,9 @@
7477
import org.slf4j.LoggerFactory;
7578

7679
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
80+
import static org.apache.iceberg.mr.InputFormatConfig.SORT_COLUMNS;
81+
import static org.apache.iceberg.mr.InputFormatConfig.SORT_ORDER;
82+
import static org.apache.iceberg.mr.InputFormatConfig.ZORDER;
7783

7884
public class BaseHiveIcebergMetaHook implements HiveMetaHook {
7985
private static final Logger LOG = LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class);
@@ -84,6 +90,7 @@ public class BaseHiveIcebergMetaHook implements HiveMetaHook {
8490
private static final Set<String> PARAMETERS_TO_REMOVE = ImmutableSet
8591
.of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, InputFormatConfig.PARTITION_SPEC);
8692
static final String ORC_FILES_ONLY = "iceberg.orc.files.only";
93+
private static final String ZORDER_FIELDS_JSON_KEY = "zorderFields";
8794

8895
protected final Configuration conf;
8996
protected Table icebergTable = null;
@@ -217,28 +224,82 @@ private void validateCatalogConfigsDefined() {
217224
}
218225
}
219226

227+
/**
228+
* Persists the table's write sort order based on the HMS property 'default-sort-order'
229+
* that is populated by the DDL layer.
230+
* <p>
231+
* Behaviour:
232+
* - If the JSON represents Z-order, we remove DEFAULT_SORT_ORDER
233+
* as Iceberg does not have Z-order support in its spec.
234+
* So, we persist Z-order metadata in {@link org.apache.iceberg.mr.InputFormatConfig#SORT_ORDER}
235+
* and {@link org.apache.iceberg.mr.InputFormatConfig#SORT_COLUMNS} to be used by Hive Writer.
236+
* <p>
237+
* - Otherwise, the JSON is a list of SortFields; we convert it to Iceberg
238+
* SortOrder JSON and keep it in DEFAULT_SORT_ORDER for Iceberg to use it.
239+
*/
220240
private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
221241
Properties properties) {
222-
String sortOderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
223-
SortFields sortFields = null;
224-
if (!Strings.isNullOrEmpty(sortOderJSONString)) {
225-
try {
226-
sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOderJSONString, SortFields.class);
227-
} catch (Exception e) {
228-
LOG.warn("Can not read write order json: {}", sortOderJSONString, e);
229-
return;
230-
}
242+
String sortOrderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
243+
if (Strings.isNullOrEmpty(sortOrderJSONString)) {
244+
return;
245+
}
246+
247+
if (isZOrderJSON(sortOrderJSONString)) {
248+
properties.remove(TableProperties.DEFAULT_SORT_ORDER);
249+
setZOrderSortOrder(sortOrderJSONString, properties, hmsTable.getTableName());
250+
return;
251+
}
252+
253+
try {
254+
SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
231255
if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
232-
SortOrder.Builder sortOderBuilder = SortOrder.builderFor(schema);
256+
SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
233257
sortFields.getSortFields().forEach(fieldDesc -> {
234258
NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
235259
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
236260
SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
237261
SortDirection.ASC : SortDirection.DESC;
238-
sortOderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
262+
sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
239263
});
240-
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOderBuilder.build()));
264+
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
265+
}
266+
} catch (Exception e) {
267+
LOG.warn("Can not read write order json: {}", sortOrderJSONString);
268+
}
269+
}
270+
271+
/**
272+
* Configures the Z-order sort order metadata in the given properties
273+
* based on the specified Z-order fields.
274+
*
275+
* @param jsonString the JSON string representing sort orders
276+
* @param properties the Properties object to store sort order metadata
277+
* @param tableName name of the table
278+
*/
279+
private void setZOrderSortOrder(String jsonString, Properties properties, String tableName) {
280+
try {
281+
ZOrderFields zorderFields = JSON_OBJECT_MAPPER.reader().readValue(jsonString, ZOrderFields.class);
282+
if (zorderFields != null && !zorderFields.getZOrderFields().isEmpty()) {
283+
List<String> columnNames = zorderFields.getZOrderFields().stream()
284+
.map(ZOrderFieldDesc::getColumnName)
285+
.collect(Collectors.toList());
286+
287+
properties.put(SORT_ORDER, ZORDER);
288+
properties.put(SORT_COLUMNS, String.join(",", columnNames));
289+
290+
LOG.debug("Applying Z-ordering for Iceberg Table {} with Columns: {}", tableName, columnNames);
241291
}
292+
} catch (Exception e) {
293+
LOG.warn("Failed to parse Z-order sort order", e);
294+
}
295+
}
296+
297+
private boolean isZOrderJSON(String jsonString) {
298+
try {
299+
JsonNode node = JSON_OBJECT_MAPPER.readTree(jsonString);
300+
return node.has(ZORDER_FIELDS_JSON_KEY);
301+
} catch (Exception e) {
302+
return false;
242303
}
243304
}
244305

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.apache.hadoop.hive.ql.ddl.table.create.like.CreateTableLikeDesc;
8383
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
8484
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
85+
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
8586
import org.apache.hadoop.hive.ql.exec.Utilities;
8687
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
8788
import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -184,6 +185,7 @@
184185
import org.apache.iceberg.mr.InputFormatConfig;
185186
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
186187
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
188+
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
187189
import org.apache.iceberg.puffin.Blob;
188190
import org.apache.iceberg.puffin.BlobMetadata;
189191
import org.apache.iceberg.puffin.Puffin;
@@ -218,6 +220,9 @@
218220
import static org.apache.iceberg.SnapshotSummary.TOTAL_FILE_SIZE_PROP;
219221
import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP;
220222
import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
223+
import static org.apache.iceberg.mr.InputFormatConfig.SORT_COLUMNS;
224+
import static org.apache.iceberg.mr.InputFormatConfig.SORT_ORDER;
225+
import static org.apache.iceberg.mr.InputFormatConfig.ZORDER;
221226

222227
public class HiveIcebergStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler {
223228
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandler.class);
@@ -929,14 +934,64 @@ public DynamicPartitionCtx createDPContext(
929934
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getSortTransformSpec(table));
930935
}
931936

937+
// Even if table has no explicit sort order, honor z-order if configured
938+
Map<String, String> props = table.properties();
939+
if (ZORDER.equalsIgnoreCase(props.getOrDefault(SORT_ORDER, ""))) {
940+
addZOrderCustomExpr(props, dpCtx, table, hmsTable, writeOperation);
941+
}
942+
932943
return dpCtx;
933944
}
934945

946+
/**
 * Adds a custom sort expression to the DynamicPartitionCtx that performs local Z-ordering on write.
 *
 * Behavior:
 * - Reads Z-order properties from 'sort.order' and 'sort.columns' (comma-separated).
 * - Resolves the referenced columns to their positions in the physical row (taking into account
 *   ACID virtual columns offset for overwrite/update operations).
 *
 * @param hmsTable       Hive-side table, used to determine the ACID virtual-column offset
 * @param writeOperation current write operation (affects how many ACID columns precede data columns)
 */
private void addZOrderCustomExpr(Map<String, String> props, DynamicPartitionCtx dpCtx, Table table,
    org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation) {
  String colsProp = props.get(SORT_COLUMNS);
  if (StringUtils.isNotBlank(colsProp)) {
    // Split the comma-separated column list, dropping surrounding whitespace and empty entries.
    List<String> zCols = Arrays.stream(colsProp.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .toList();

    // Map each Iceberg schema column name to its ordinal position.
    List<Types.NestedField> fields = table.schema().columns();
    Map<String, Integer> fieldOrderMap = Maps.newHashMapWithExpectedSize(fields.size());
    for (int i = 0; i < fields.size(); ++i) {
      fieldOrderMap.put(fields.get(i).name(), i);
    }
    // Data columns are shifted right by the ACID virtual columns that precede them in the
    // physical row; the count differs between overwrite and other write operations.
    // NOTE(review): relies on shouldOverwrite/acidSelectColumns defined elsewhere in this class.
    int offset = (shouldOverwrite(hmsTable, writeOperation) ?
        ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, writeOperation)).size();

    // Translate each Z-order column name into its physical row index; unknown names are a
    // misconfiguration and fail fast.
    List<Integer> zIndices = zCols.stream().map(col -> {
      Integer base = fieldOrderMap.get(col);
      Preconditions.checkArgument(base != null, "Z-order column not found in schema: %s", col);
      return base + offset;
    }).toList();

    // Register a single custom sort expression that applies the Z-order UDF over the
    // resolved columns of the write's full column list.
    dpCtx.addCustomSortExpressions(Collections.singletonList(allCols -> {
      List<ExprNodeDesc> args = zIndices.stream()
          .map(allCols::get)
          .toList();
      try {
        return ExprNodeGenericFuncDesc.newInstance(new GenericUDFIcebergZorder(), "iceberg_zorder", args);
      } catch (UDFArgumentException e) {
        // The lambda cannot throw a checked exception; surface UDF setup failures as unchecked.
        throw new RuntimeException(e);
      }
    }));
  }
}
989+
935990
private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
936991
Operation writeOperation, DynamicPartitionCtx dpCtx,
937992
List<TransformSpec> transformSpecs) {
938-
Map<String, Integer> fieldOrderMap = Maps.newHashMap();
939993
List<Types.NestedField> fields = table.schema().columns();
994+
Map<String, Integer> fieldOrderMap = Maps.newHashMapWithExpectedSize(fields.size());
940995
for (int i = 0; i < fields.size(); ++i) {
941996
fieldOrderMap.put(fields.get(i).name(), i);
942997
}

0 commit comments

Comments
 (0)