Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-XXX: Support orc.compression.zstd.workers #1756

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions java/core/src/java/org/apache/orc/OrcConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public enum OrcConf {
"hive.exec.orc.compression.zstd.windowlog", 0,
"Set the maximum allowed back-reference distance for "
+ "ZStandard codec, expressed as power of 2."),
COMPRESSION_ZSTD_WORKERS("orc.compression.zstd.workers",
"hive.exec.orc.compression.zstd.workers", 0,
"Define the number of workers to use with ZStandard codec while writing data."),
BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
"hive.exec.orc.block.padding.tolerance", 0.05,
"Define the tolerance for block padding as a decimal fraction of\n" +
Expand Down
12 changes: 12 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ public static class ZstdCompressOptions {
private int compressionZstdLevel;
private int compressionZstdWindowLog;

private int compressionZstdWorkers;

public int getCompressionZstdLevel() {
return compressionZstdLevel;
}
Expand All @@ -445,6 +447,14 @@ public int getCompressionZstdWindowLog() {
public void setCompressionZstdWindowLog(int compressionZstdWindowLog) {
this.compressionZstdWindowLog = compressionZstdWindowLog;
}

public int getCompressionZstdWorkers() {
return compressionZstdWorkers;
}

public void setCompressionZstdWorkers(int compressionZstdWorkers) {
this.compressionZstdWorkers = compressionZstdWorkers;
}
}

/**
Expand Down Expand Up @@ -520,6 +530,8 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf));
zstdCompressOptions.setCompressionZstdWindowLog(
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf));
zstdCompressOptions.setCompressionZstdWorkers(
OrcConf.COMPRESSION_ZSTD_WORKERS.getInt(tableProperties, conf));

paddingTolerance =
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public PhysicalFsWriter(FSDataOutputStream outputStream,
if (zstdCompressOptions != null) {
options.setLevel(zstdCompressOptions.getCompressionZstdLevel());
options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog());
options.setWorkers(zstdCompressOptions.getCompressionZstdWorkers());
}
}
compress.withCodec(codec, tempOptions);
Expand Down
24 changes: 18 additions & 6 deletions java/core/src/java/org/apache/orc/impl/ZstdCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public class ZstdCodec implements CompressionCodec {
private ZstdOptions zstdOptions = null;
private ZstdCompressCtx zstdCompressCtx = null;

public ZstdCodec(int level, int windowLog) {
this.zstdOptions = new ZstdOptions(level, windowLog);
public ZstdCodec(int level, int windowLog, int workers) {
this.zstdOptions = new ZstdOptions(level, windowLog, workers);
}

public ZstdCodec() {
this(1, 0);
this(1, 0, 0);
}

public ZstdOptions getZstdOptions() {
Expand All @@ -58,14 +58,17 @@ static class ZstdOptions implements Options {
private int level;
private int windowLog;

ZstdOptions(int level, int windowLog) {
private int workers;

ZstdOptions(int level, int windowLog, int workers) {
this.level = level;
this.windowLog = windowLog;
this.workers = workers;
}

@Override
public ZstdOptions copy() {
return new ZstdOptions(level, windowLog);
return new ZstdOptions(level, windowLog, workers);
}

@Override
Expand Down Expand Up @@ -123,6 +126,14 @@ public ZstdOptions setLevel(int newValue) {
return this;
}

public ZstdOptions setWorkers(int newValue) {
if (newValue < 0) {
throw new IllegalArgumentException("The number of workers should be non-negative.");
}
workers = newValue;
return this;
}

@Override
public ZstdOptions setData(DataKind newValue) {
return this; // We don't support setting DataKind in ZstdCodec.
Expand All @@ -148,7 +159,7 @@ public int hashCode() {
}

private static final ZstdOptions DEFAULT_OPTIONS =
new ZstdOptions(1, 0);
new ZstdOptions(1, 0, 0);

@Override
public Options getDefaultOptions() {
Expand Down Expand Up @@ -177,6 +188,7 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
zstdCompressCtx.setLevel(zso.level);
zstdCompressCtx.setLong(zso.windowLog);
zstdCompressCtx.setChecksum(false);
zstdCompressCtx.setWorkers(zso.workers);

try {
int inBytes = in.remaining();
Expand Down
Loading