Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync parquet to other file formats #592

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

sudhar91
Copy link

@sudhar91 sudhar91 commented Dec 7, 2024

Issue : #553

What is the purpose of the pull request

This PR is for changes made to sync parquet to delta ,hudi & Iceberg

Brief change log

  • Changes to sync from parquet to delta ,hudi & iceberg
  • Handle incremental sync

Co Authored by : @sundarshankar89

@@ -27,8 +27,9 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
public static final String PARQUET="PARQUET";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run mvn spotless:apply to clean up some of the formatting issues in the draft

package org.apache.xtable.parquet;

import java.io.IOException;
import java.util.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: we're avoiding the use of * imports in this repo

Optional<LocatedFileStatus> latestFile =
fsHelper
.getParquetFiles(hadoopConf, basePath)
.max(Comparator.comparing(FileStatus::getModificationTime));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to push down this filter so we don't need to iterate through all files under the base path? Maybe we can even limit the file listing to return files created after the modificationTime?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont think we can push down with the api , even to filter files greater than modification we have to first list and then filter out, do u have any other idea on your mind for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, just wanted to see if it was possible to help with large tables

SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
SchemaBuilder.record(internalSchema.getName()).fields();
for (Schema.Field field : internalSchema.getFields()) {
fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the internal schema have defaults? Can it also have docs on fields? those would be dropped with this code

@Builder.Default private static final FileSystemHelper fsHelper = FileSystemHelper.getInstance();

@Builder.Default
private static final ParquetMetadataExtractor parquetMetadataExtractor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an implementation for this class missing?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i added initially but not using it will remove it

InternalSchema schema,
Map<String, List<String>> partitionInfo) {
List<PartitionValue> partitionValues = new ArrayList<>();
java.nio.file.Path base = Paths.get(basePath).normalize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're always going to convert the basePath, you should try to find a way to convert it once in the caller and pass it in.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted , overall i need to simplify the logic as it is bit complicated

latestFile.stream()
.map(
file ->
InternalDataFile.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the logic for this conversion move to a common method that can also be called from the getTableChangeForCommit?

import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
public class ParquetConversionSource implements ConversionSource<Long> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may be a bit more robust if we use a time interval instead of a single long here. Then you will be able to draw a clear boundary for each run of the conversion source, what are you thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can u explain a bit on the interval and how are u envisioning ? this long is the last synced modification time of file so in next run list files greater mod time so new files are synced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of an interval since it can also easily show where the start time was for the sync. This could be useful when the targets fall out of sync with each other. Currently if there are commits 1, 2, and 3 in the source and Target1 only is synced to 1 but Target2 is synced to 2, the incremental sync can sync 2 and 3 to Target1 and only 3 to Target2 as part of the same sync. I am not sure what that will look like for this source so I was thinking intervals can help us define these "commits" but I need to think through it some more.

return ParquetConversionSource.builder()
.tableName(sourceTable.getName())
.basePath(sourceTable.getBasePath())
.hadoopConf(new Configuration())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is an init method called with the hadoop configuration, you should be able to simply use hadoopConf here

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

public class FileSystemHelper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be a good idea to define an interface for getting the parquet files for the table and changes since the last run. Right now this is all being done through file listing but we should consider a case where someone implements a way to poll the changes through s3 events.

@the-other-tim-brown
Copy link
Contributor

@sudhar91 this is a great step forward on this feature! My main request is to pull some of these classes that are focused on conversion such as the partition and data file conversion into their own PR with unit tests written. It will be easier and quicker to get those straightforward changes reviewed while the rest of the details are figured out.

Map<String, List<String>> partitionInfo = initPartitionInfo();
for (FileStatus tableStatus : tableChanges) {
internalDataFiles.add(
InternalDataFile.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need recordCount which we should be able to get from the column stats

@sudhar91
Copy link
Author

@sudhar91 this is a great step forward on this feature! My main request is to pull some of these classes that are focused on conversion such as the partition and data file conversion into their own PR with unit tests written. It will be easier and quicker to get those straightforward changes reviewed while the rest of the details are figured out.

Thanks for taking ur time to review , i couldnt push my unit test because i was having a weird jar conflicts on my test will fix them next week, i raised this PR because wanted to validate whether my approach is right in terms of this feature :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants