2626import java .util .Collections ;
2727import java .util .List ;
2828
29+ import org .apache .commons .lang3 .tuple .Pair ;
30+ import org .apache .hadoop .conf .Configuration ;
2931import org .apache .hudi .common .table .timeline .HoodieInstant ;
3032
33+ import org .apache .iceberg .BaseTable ;
3134import org .apache .iceberg .Snapshot ;
3235
36+ import org .apache .iceberg .Table ;
37+ import org .apache .iceberg .TableMetadata ;
38+ import org .apache .iceberg .TableOperations ;
39+ import org .apache .iceberg .hadoop .HadoopTables ;
3340import org .apache .xtable .conversion .ConversionConfig ;
3441import org .apache .xtable .conversion .ConversionController ;
3542import org .apache .xtable .conversion .ConversionSourceProvider ;
@@ -79,9 +86,13 @@ public ConvertTableResponse runSync(ConvertTableRequest request) {
7986 getConversionSourceProvider (request .getSourceFormat ());
8087 conversionController .sync (conversionConfig , conversionSourceProvider );
8188
82- InternalTable internalTable = new InternalTable ("ICEBERG" , request .getSourceTablePath () + "/metadata" , null );
83- ConvertTableResponse response = new ConvertTableResponse (Collections .singletonList (internalTable ));
84- return response ;
89+ Pair <String , String > responseFields = getIcebergSchemaAndMetadataPath (request .getSourceTablePath (), sparkHolder .jsc ().hadoopConfiguration ());
90+
91+ InternalTable internalTable =
92+ new InternalTable (
93+ "ICEBERG" ,
94+ responseFields .getLeft (), responseFields .getRight ());
95+ return new ConvertTableResponse (Collections .singletonList (internalTable ));
8596 }
8697
8798 private ConversionSourceProvider <?> getConversionSourceProvider (String sourceTableFormat ) {
@@ -104,4 +115,13 @@ private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTab
104115 throw new IllegalArgumentException ("Unsupported source format: " + sourceTableFormat );
105116 }
106117 }
118+
119+ public static Pair <String , String > getIcebergSchemaAndMetadataPath (String tableLocation , Configuration conf ) {
120+ HadoopTables tables = new HadoopTables (conf );
121+ Table table = tables .load (tableLocation );
122+ TableOperations ops = ((BaseTable ) table ).operations ();
123+ TableMetadata current = ops .current ();
124+ return Pair .of (current .metadataFileLocation (), current .schema ().toString ());
125+ }
126+
107127}
0 commit comments