Commit

Merge pull request #3 from arenadata/1.19.1-develop
1.19.1 develop
Asmoday authored Aug 28, 2024
2 parents 5edb5a9 + 7590f06 commit 18ece46
Showing 62 changed files with 13,724 additions and 209 deletions.
183 changes: 172 additions & 11 deletions flink-connectors/flink-connector-hive/pom.xml
@@ -44,7 +44,7 @@ under the License.
java.lang.NoClassDefFoundError: org/apache/hadoop/metrics/Updater errors
Using this dedicated property avoids CI failures with the Hadoop 3 profile
-->
-<hive.hadoop.version>2.10.2</hive.hadoop.version>
+<hive.hadoop.version>3.3.6</hive.hadoop.version>
</properties>

<dependencyManagement>
@@ -375,10 +375,179 @@ under the License.

<dependency>
<groupId>org.apache.hive</groupId>
-<artifactId>hive-metastore</artifactId>
+<artifactId>hive-standalone-metastore-common</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
</exclusion>
<exclusion>
<groupId>javolution</groupId>
<artifactId>javolution</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-api</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-core</artifactId>
</exclusion>
<exclusion>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-hbase-compat-1.0</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
</exclusion>
<exclusion>
<groupId>javax.transaction</groupId>
<artifactId>transaction-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-web</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.joshelser</groupId>
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
</exclusion>

<!-- org.apache.hive:hive-service-rpc -->
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>

<!-- org.apache.hive:hive-serde -->
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>pentaho-aggdesigner-algorithm</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-standalone-metastore-server</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
@@ -871,10 +1040,6 @@ under the License.
<artifactId>hadoop-annotations</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
-<exclusion>
-<artifactId>hadoop-hdfs</artifactId>
-<groupId>org.apache.hadoop</groupId>
-</exclusion>
<exclusion>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<groupId>org.apache.hadoop</groupId>
@@ -960,10 +1125,6 @@ under the License.
<artifactId>hadoop-annotations</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
-<exclusion>
-<artifactId>hadoop-hdfs</artifactId>
-<groupId>org.apache.hadoop</groupId>
-</exclusion>
<exclusion>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<groupId>org.apache.hadoop</groupId>
@@ -1298,7 +1459,7 @@ under the License.
<profile>
<id>hive3</id>
<properties>
-<hive.version>3.1.3</hive.version>
+<hive.version>4.0.0</hive.version>
<derby.version>10.14.1.0</derby.version>
</properties>

@@ -137,7 +137,7 @@ private static void setSplitMaxSize(
long maxSplitBytes =
calculateMaxSplitBytes(
totalByteWithOpenCost, minNumSplits, defaultMaxSplitBytes, openCost);
-jobConf.set(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, String.valueOf(maxSplitBytes));
+jobConf.set(HiveConf.ConfVars.MAPRED_MAX_SPLIT_SIZE.varname, String.valueOf(maxSplitBytes));
}

private static long calculateMaxSplitBytes(
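Hive 4.x renames a number of HiveConf.ConfVars constants, which is why MAPREDMAXSPLITSIZE above becomes MAPRED_MAX_SPLIT_SIZE. A minimal standalone sketch of the same call shape, assuming Hive 4.x and Hadoop are on the classpath; the class name and the 128 MB value are illustrative only:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class SplitSizeExample {
    public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        long maxSplitBytes = 128L * 1024 * 1024; // illustrative 128 MB cap
        // Same pattern as setSplitMaxSize above: varname is the underlying property key.
        jobConf.set(HiveConf.ConfVars.MAPRED_MAX_SPLIT_SIZE.varname, String.valueOf(maxSplitBytes));
        System.out.println(jobConf.get(HiveConf.ConfVars.MAPRED_MAX_SPLIT_SIZE.varname));
    }
}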
@@ -280,7 +280,7 @@ private DataStreamSink<?> consume(
Class hiveOutputFormatClz =
hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
boolean isCompressed =
-jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
+jobConf.getBoolean(HiveConf.ConfVars.COMPRESS_RESULT.varname, false);
HiveWriterFactory writerFactory =
new HiveWriterFactory(
jobConf,
@@ -323,7 +323,7 @@ private DataStreamSink<?> consume(
new Path(
HiveConf.getVar(
HiveConfUtils.create(jobConf),
-HiveConf.ConfVars.SCRATCHDIR));
+HiveConf.ConfVars.SCRATCH_DIR));
// TODO: may append something more meaningful than a timestamp, like query ID
Path scratchDir =
new Path(
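The sink reads two renamed settings here: COMPRESS_RESULT to decide whether output is compressed, and SCRATCH_DIR as the root for its staging directory. A hedged sketch of the same lookups, assuming Hive 4.x naming; constructing a HiveConf from the JobConf stands in for the connector's HiveConfUtils.create helper:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class SinkConfExample {
    public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        boolean isCompressed =
                jobConf.getBoolean(HiveConf.ConfVars.COMPRESS_RESULT.varname, false);
        // Copy the JobConf into a HiveConf so HiveConf.getVar can resolve defaults.
        HiveConf hiveConf = new HiveConf(jobConf, SinkConfExample.class);
        Path stagingRoot = new Path(HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCH_DIR));
        System.out.println(isCompressed + " " + stagingRoot);
    }
}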
@@ -34,8 +34,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Writable;
@@ -63,7 +62,7 @@ public class HiveMapredSplitReader implements SplitReader {
protected Writable value;
private boolean fetched = false;
private boolean hasNext;
-private final Deserializer deserializer;
+private final AbstractSerDe deserializer;

// indices of fields to be returned, with projection applied (if any)
// TODO: push projection into underlying input format that supports it
@@ -122,11 +121,10 @@ public HiveMapredSplitReader(
value = this.recordReader.createValue();
try {
deserializer =
-(Deserializer)
+(AbstractSerDe)
Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
Configuration conf = new Configuration();
-SerDeUtils.initializeSerDe(
-deserializer, conf, hiveTablePartition.getTableProps(), null);
+deserializer.initialize(conf, hiveTablePartition.getTableProps(), null);
structObjectInspector = (StructObjectInspector) deserializer.getObjectInspector();
structFields = structObjectInspector.getAllStructFieldRefs();
} catch (Exception e) {
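The split reader now targets AbstractSerDe directly: the SerDe named in the table's storage descriptor is instantiated reflectively and initialized through AbstractSerDe.initialize(conf, tableProperties, partitionProperties) instead of the removed SerDeUtils.initializeSerDe helper. A sketch under that assumption; the SerDe class and the column properties are stand-ins for what sd.getSerdeInfo() and hiveTablePartition.getTableProps() provide at runtime:

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class SerDeInitExample {
    public static void main(String[] args) throws Exception {
        // Stand-in for sd.getSerdeInfo().getSerializationLib().
        String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
        AbstractSerDe serDe = (AbstractSerDe) Class.forName(serdeClass).newInstance();

        // Stand-in for hiveTablePartition.getTableProps().
        Properties tableProps = new Properties();
        tableProps.setProperty("columns", "id,name");
        tableProps.setProperty("columns.types", "int,string");

        // Third argument is partition-level properties; null means table properties only.
        serDe.initialize(new Configuration(), tableProps, null);
        StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
        System.out.println(oi.getAllStructFieldRefs().size());
    }
}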
@@ -33,18 +33,18 @@
public class JobConfUtils {

/**
-* Gets the {@link HiveConf.ConfVars#DEFAULTPARTITIONNAME} value from the {@link
+* Gets the {@link HiveConf.ConfVars#DEFAULT_PARTITION_NAME} value from the {@link
* JobConfWrapper}.
*/
public static String getDefaultPartitionName(JobConfWrapper confWrapper) {
return getDefaultPartitionName(confWrapper.conf());
}

-/** Gets the {@link HiveConf.ConfVars#DEFAULTPARTITIONNAME} value from the {@link JobConf}. */
+/** Gets the {@link HiveConf.ConfVars#DEFAULT_PARTITION_NAME} value from the {@link JobConf}. */
public static String getDefaultPartitionName(JobConf jobConf) {
return jobConf.get(
-HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
-HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+HiveConf.ConfVars.DEFAULT_PARTITION_NAME.varname,
+HiveConf.ConfVars.DEFAULT_PARTITION_NAME.defaultStrVal);
}

private static void addCredentialsIntoJobConf(JobConf jobConf) {
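JobConfUtils simply resolves the configured name of Hive's default partition, falling back to the enum's built-in default when the JobConf does not set it. The same lookup in isolation, assuming the Hive 4 constant name used above:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class DefaultPartitionNameExample {
    public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        String defaultPartitionName =
                jobConf.get(
                        HiveConf.ConfVars.DEFAULT_PARTITION_NAME.varname,
                        HiveConf.ConfVars.DEFAULT_PARTITION_NAME.defaultStrVal);
        // Typically "__HIVE_DEFAULT_PARTITION__" unless overridden in the configuration.
        System.out.println(defaultPartitionName);
    }
}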
@@ -38,9 +38,9 @@
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -86,7 +86,7 @@ public class HiveWriterFactory implements Serializable {

// SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make sure to use a common
// base class
-private transient Serializer recordSerDe;
+private transient AbstractSerDe recordSerDe;

/** Field number excluding partition fields. */
private transient int formatFields;
@@ -135,7 +135,7 @@ public RecordWriter createRecordWriter(Path path) {
JobConf conf = new JobConf(confWrapper.conf());

if (isCompressed) {
-String codecStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC.varname);
+String codecStr = conf.get(HiveConf.ConfVars.COMPRESS_INTERMEDIATE_CODEC.varname);
if (!StringUtils.isNullOrWhitespaceOnly(codecStr)) {
//noinspection unchecked
Class<? extends CompressionCodec> codec =
@@ -146,7 +146,7 @@ public RecordWriter createRecordWriter(Path path) {
Thread.currentThread().getContextClassLoader());
FileOutputFormat.setOutputCompressorClass(conf, codec);
}
-String typeStr = conf.get(HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE.varname);
+String typeStr = conf.get(HiveConf.ConfVars.COMPRESS_INTERMEDIATE_TYPE.varname);
if (!StringUtils.isNullOrWhitespaceOnly(typeStr)) {
SequenceFile.CompressionType style =
SequenceFile.CompressionType.valueOf(typeStr);
@@ -182,11 +182,9 @@ private void checkInitialize() throws Exception {
serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+ serdeLib.getClass().getName());
-this.recordSerDe = (Serializer) serdeLib;
+this.recordSerDe = (AbstractSerDe) serdeLib;
ReflectionUtils.setConf(recordSerDe, jobConf);

-// TODO: support partition properties, for now assume they're same as table properties
-SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
+recordSerDe.initialize(jobConf, tableProperties, null);

this.formatFields = allColumns.length - partitionColumns.length;
this.hiveConversions = new HiveObjectConversion[formatFields];
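For completeness, a hedged sketch of how the intermediate-compression settings consulted by the writer path above could be supplied on a JobConf, using the Hive 4 constant names; the Gzip codec and BLOCK type are illustrative choices, not values taken from this change:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class IntermediateCompressionExample {
    public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Enables the compressed-output path that HiveWriterFactory receives as isCompressed.
        conf.setBoolean(HiveConf.ConfVars.COMPRESS_RESULT.varname, true);
        // Any Hadoop CompressionCodec class name works here.
        conf.set(
                HiveConf.ConfVars.COMPRESS_INTERMEDIATE_CODEC.varname,
                "org.apache.hadoop.io.compress.GzipCodec");
        // SequenceFile compression style: NONE, RECORD or BLOCK.
        conf.set(HiveConf.ConfVars.COMPRESS_INTERMEDIATE_TYPE.varname, "BLOCK");
        System.out.println(conf.get(HiveConf.ConfVars.COMPRESS_INTERMEDIATE_CODEC.varname));
    }
}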