Conversation
// Perform index lookup in metadataTable
// TODO: document here what this map is keyed by
- Map<String, HoodieRecordGlobalLocation> recordIndex = lazyTableMetadata.get().readRecordIndex(recordKeys);
+ Map<String, HoodieRecordGlobalLocation> recordIndex = HoodieDataUtils.dedupeAndCollectAsMap(lazyTableMetadata.get().readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys)));
Is dedup needed here, given that RLI does not have duplicate keys, or can it simply be collected into a map?
No. I used this since it was the method available for import. I will create a new method without the deduplication logic.
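A minimal sketch of what such a method could look like, using plain `java.util.Map.Entry` pairs in place of Hudi's pair data abstraction (names here are illustrative, not Hudi's actual API). `Collectors.toMap` without a merge function throws on a duplicate key, so a violated "RLI has no duplicates" assumption surfaces loudly instead of being silently merged away:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: collect (recordKey, location) pairs into a map with no
// dedupe step, assuming the record-level index never emits duplicate keys.
public class CollectAsMapSketch {
    public static <K, V> Map<K, V> collectAsMap(List<? extends Map.Entry<K, V>> pairs) {
        // No merge function: a duplicate key throws IllegalStateException,
        // making the no-duplicates assumption explicit.
        return pairs.stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> index = collectAsMap(List.of(
                new SimpleEntry<>("key1", "partition0/file1"),
                new SimpleEntry<>("key2", "partition1/file7")));
        System.out.println(index.get("key1")); // partition0/file1
    }
}
```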
HoodieTableMetadata tableMetadata = lazyTableMetadata.get();
HoodieTableFileSystemView fileSystemView = getFileSystemView(tableMetadata, metaClient);
import java.util.Map;
import java.util.function.UnaryOperator;

public class TrinoReaderContext
Suggested change:
- public class TrinoReaderContext
+ public class TrinoRecordContext
import java.util.Map;
import java.util.function.UnaryOperator;

public class TrinoReaderContext
Is this class mostly similar to AvroRecordContext?
Could we directly use AvroRecordContext? Once the merging logic is based on Page, we can reimplement a new reader/record context.
if (bufferedRecord.isDelete()) {
    return new HoodieEmptyRecord<>(
            new HoodieKey(bufferedRecord.getRecordKey(), partitionPath),
            HoodieRecord.HoodieRecordType.AVRO);
}
What about the ordering value and payload class handling? Do we have test coverage around updates and deletes with lower and higher ordering values? The current logic can lead to data loss under the EVENT_TIME_ORDERING merge mode: a delete carrying an ordering value of 0 is treated as a commit-time-ordered delete and takes effect regardless of the ordering value, even when the delete's ordering value is lower than the existing record's.
Test cases to cover:
- MOR table v6, base + log files, DefaultHoodiePayload (payload class), timestamp
- MOR table v8, base + log files, EVENT_TIME_ORDERING (merge mode), timestamp
- MOR table v9, base + log files, EVENT_TIME_ORDERING (merge mode), timestamp
- MOR table v6, base + log files, OverwriteWithLatest (payload class)
- MOR table v8, base + log files, COMMIT_TIME_ORDERING (merge mode)
- MOR table v9, base + log files, COMMIT_TIME_ORDERING (merge mode)
Prepare the table in this sequence:
- first batch: inserts (20 keys)
- second batch: updates, with higher ordering values (5 keys), lower ordering values (other 5 keys)
- third batch: deletes, with higher ordering values (3 keys), lower ordering values (other 3 keys)
Use Trino to read the tables and validate the result records.
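The concern behind these test cases can be sketched in isolation. The decision logic below is illustrative (the enum and method names are not Hudi's actual API): under COMMIT_TIME_ORDERING the later write always wins, so a delete always takes effect, while under EVENT_TIME_ORDERING a delete with a lower ordering value than the stored record must not win, which is exactly the case the comment above says can lose data:

```java
// Hypothetical sketch of the delete-vs-stored-record decision under the two
// merge modes discussed in this review thread.
public class OrderingSketch {
    public enum MergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING }

    public static boolean shouldDeleteWin(MergeMode mode, long storedOrderingValue, long deleteOrderingValue) {
        if (mode == MergeMode.COMMIT_TIME_ORDERING) {
            // Arrival order decides: the delete arrived later, so it wins.
            return true;
        }
        // EVENT_TIME_ORDERING: compare ordering values; ties go to the later write.
        return deleteOrderingValue >= storedOrderingValue;
    }

    public static void main(String[] args) {
        // A delete with ordering value 5 against a record with ordering value 10:
        System.out.println(shouldDeleteWin(MergeMode.EVENT_TIME_ORDERING, 10L, 5L));  // false
        System.out.println(shouldDeleteWin(MergeMode.COMMIT_TIME_ORDERING, 10L, 5L)); // true
    }
}
```

A bug of the kind described would be equivalent to always returning true here, which the EVENT_TIME_ORDERING lower-ordering-value test cases above would catch.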
        HoodieRecord.HoodieRecordType.AVRO);
}

return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
Similar here around payload class handling. We should add a test case on a custom payload class.
return null;
}

@Override
For the methods that should not be called (i.e., from the write path), should they throw UnsupportedOperationException?
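A minimal sketch of that suggestion, with an illustrative class and method name (not the actual Trino/Hudi interfaces): a read-only context can make write-path methods fail fast instead of silently returning null, so an unexpected call surfaces immediately:

```java
// Hypothetical sketch: a reader-side context where write-path methods are
// explicitly unsupported rather than returning null.
public class ReadOnlyContextSketch {
    // Illustrative write-path method that the Trino read path never invokes.
    public Object constructEngineRecord(Object schema, Object fieldValues) {
        throw new UnsupportedOperationException(
                "constructEngineRecord is only used on the write path and is not supported by this reader context");
    }
}
```

Failing fast here turns a latent NullPointerException somewhere downstream into an immediate, self-describing error at the call site.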
GenericRecord genericRecord = new GenericData.Record(schema);
for (Schema.Field field : schema.getFields()) {
    genericRecord.put(field.name(), record.get(field.pos()));
}
return genericRecord;
IndexedRecord can be cast to GenericRecord, so there is no need to reconstruct the record, which introduces overhead?
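The shape of that cast-first approach can be sketched without Avro on the classpath. The interfaces below are hypothetical stand-ins for Avro's `IndexedRecord`/`GenericRecord` hierarchy; the point is that when the concrete record already implements the wider interface (as Avro's `GenericData.Record` does), a cast returns the same instance and the field-by-field copy is skipped entirely:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins modeled on Avro's record interfaces.
public class CastSketch {
    public interface IndexedRecord { Object get(int pos); }
    public interface GenericRecord extends IndexedRecord { Object get(String name); }

    // Stand-in for GenericData.Record: implements both access styles.
    public static class ConcreteRecord implements GenericRecord {
        private final Map<String, Object> byName = new LinkedHashMap<>();
        private final List<Object> byPos = new ArrayList<>();
        public void put(String name, Object value) { byName.put(name, value); byPos.add(value); }
        public Object get(int pos) { return byPos.get(pos); }
        public Object get(String name) { return byName.get(name); }
    }

    // Cast when possible; only fall back to a copy for exotic implementations
    // (the copy path is elided in this sketch).
    public static GenericRecord asGeneric(IndexedRecord record) {
        if (record instanceof GenericRecord) {
            return (GenericRecord) record; // zero-cost: no reconstruction
        }
        throw new UnsupportedOperationException("fallback copy elided in this sketch");
    }
}
```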
// TODO: this can rely on colToPos map directly instead of schema
Schema schema = record.getSchema();
IndexedRecord newRecord = new GenericData.Record(schema);
List<Schema.Field> fields = schema.getFields();
for (Schema.Field field : fields) {
    int pos = schema.getField(field.name()).pos();
    newRecord.put(pos, record.get(pos));
}
return newRecord;
Why not returning the record directly?
public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns)
{
    List<Schema.Field> toFields = to.getFields();
    int[] projection = new int[toFields.size()];
    for (int i = 0; i < projection.length; i++) {
        projection[i] = from.getField(toFields.get(i).name()).pos();
    }

    return fromRecord -> {
        IndexedRecord toRecord = new GenericData.Record(to);
        for (int i = 0; i < projection.length; i++) {
            toRecord.put(i, fromRecord.get(projection[i]));
        }
        return toRecord;
    };
}
Does this support nested fields and renames?
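For the rename half of that question, a rename-aware variant of the same position-mapping idea can be sketched over flat schemas, using field-name lists as stand-ins for Avro Schemas and assuming `renamedColumns` maps a target field name to its name in the source schema (both assumptions, not confirmed by the PR). Nested fields would need recursion into record-typed fields, which a single top-level index array like this does not cover:

```java
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch: position-based projection that consults the rename map
// when resolving each target field's position in the source schema.
public class ProjectionSketch {
    public static UnaryOperator<Object[]> projectRecord(List<String> from, List<String> to,
                                                        Map<String, String> renamedColumns) {
        int[] projection = new int[to.size()];
        for (int i = 0; i < projection.length; i++) {
            // Look the field up under its source name if it was renamed.
            String sourceName = renamedColumns.getOrDefault(to.get(i), to.get(i));
            projection[i] = from.indexOf(sourceName);
        }
        return fromRecord -> {
            Object[] toRecord = new Object[projection.length];
            for (int i = 0; i < projection.length; i++) {
                toRecord[i] = fromRecord[projection[i]];
            }
            return toRecord;
        };
    }
}
```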
if (bufferedRecord.isDelete()) {
    return new HoodieEmptyRecord<>(
            new HoodieKey(bufferedRecord.getRecordKey(), null),
            HoodieRecord.HoodieRecordType.AVRO);
}

return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
It looks like the logic in TrinoReaderContext was migrated from here, so we should use this opportunity to make sure the implementation is solid.
@Override
public Object getColumnValueAsJava(Schema recordSchema, String column, Properties props)
{
    return null;
This needs to be implemented correctly for column stats to work.
This class is not used; I am going to delete it.
Description
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: