Skip to content

Commit

Permalink
avro-2282: json flush option for optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
clesaec authored and dkulp committed Jul 31, 2023
1 parent dcbff5b commit 6be1b1f
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -71,11 +72,11 @@ public static EncoderFactory get() {
* likely to improve performance but may be useful for the
* downstream OutputStream.
* @return This factory, to enable method chaining:
*
*
* <pre>
* EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
* </pre>
*
*
* @see #binaryEncoder(OutputStream, BinaryEncoder)
*/
public EncoderFactory configureBufferSize(int size) {
Expand All @@ -90,7 +91,7 @@ public EncoderFactory configureBufferSize(int size) {
/**
* Returns this factory's configured default buffer size. Used when creating
* Encoder instances that buffer writes.
*
*
* @see #configureBufferSize(int)
* @see #binaryEncoder(OutputStream, BinaryEncoder)
* @return The preferred buffer size, in bytes.
Expand All @@ -109,11 +110,11 @@ public int getBufferSize() {
* outside this range are set to the nearest value in the range. The
* encoder will require at least this amount of memory.
* @return This factory, to enable method chaining:
*
*
* <pre>
* EncoderFactory factory = new EncoderFactory().configureBlockSize(8000);
* </pre>
*
*
* @see #blockingBinaryEncoder(OutputStream, BinaryEncoder)
*/
public EncoderFactory configureBlockSize(int size) {
Expand All @@ -131,7 +132,7 @@ public EncoderFactory configureBlockSize(int size) {
* #blockingBinaryEncoder(OutputStream, BinaryEncoder) will have block buffers
* of this size.
* <p/>
*
*
* @see #configureBlockSize(int)
* @see #blockingBinaryEncoder(OutputStream, BinaryEncoder)
* @return The preferred block size, in bytes.
Expand Down Expand Up @@ -297,6 +298,38 @@ public JsonEncoder jsonEncoder(Schema schema, OutputStream out, boolean pretty)
return new JsonEncoder(schema, out, pretty);
}

/**
* Creates a {@link JsonEncoder} using the OutputStream provided for writing
* data conforming to the Schema provided with optional pretty printing.
* <p/>
* {@link JsonEncoder} buffers its output. Data may not appear on the underlying
* OutputStream until {@link Encoder#flush()} is called.
* <p/>
* {@link JsonEncoder} is not thread-safe.
*
* @param schema The Schema for data written to this JsonEncoder. Cannot be
* null.
* @param out The OutputStream to write to. Cannot be null.
* @param pretty Pretty print encoding.
* @param autoflush Whether to Automatically flush the data to storage, default
* is true controls the underlying FLUSH_PASSED_TO_STREAM
* feature of JsonGenerator
* @return A JsonEncoder configured with <i>out</i>, <i>schema</i> and
* <i>pretty</i>
* @throws IOException
*/
public JsonEncoder jsonEncoder(Schema schema, OutputStream out, boolean pretty, boolean autoflush)
throws IOException {
EnumSet<JsonEncoder.JsonOptions> options = EnumSet.noneOf(JsonEncoder.JsonOptions.class);
if (pretty) {
options.add(JsonEncoder.JsonOptions.Pretty);
}
if (!autoflush) {
options.add(JsonEncoder.JsonOptions.NoFlushStream);
}
return new JsonEncoder(schema, out, options);
}

/**
* Creates a {@link JsonEncoder} using the {@link JsonGenerator} provided for
* output of data conforming to the Schema provided.
Expand Down
63 changes: 48 additions & 15 deletions lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
Expand All @@ -33,6 +35,7 @@
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.PrettyPrinter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;

Expand All @@ -58,11 +61,15 @@ public class JsonEncoder extends ParsingEncoder implements Parser.ActionHandler
protected BitSet isEmpty = new BitSet();

JsonEncoder(Schema sc, OutputStream out) throws IOException {
this(sc, getJsonGenerator(out, false));
this(sc, getJsonGenerator(out, EnumSet.noneOf(JsonOptions.class)));
}

JsonEncoder(Schema sc, OutputStream out, boolean pretty) throws IOException {
this(sc, getJsonGenerator(out, pretty));
this(sc, getJsonGenerator(out, pretty ? EnumSet.of(JsonOptions.Pretty) : EnumSet.noneOf(JsonOptions.class)));
}

JsonEncoder(Schema sc, OutputStream out, Set<JsonOptions> options) throws IOException {
this(sc, getJsonGenerator(out, options));
}

JsonEncoder(Schema sc, JsonGenerator out) throws IOException {
Expand All @@ -78,24 +85,28 @@ public void flush() throws IOException {
}
}

enum JsonOptions {
Pretty,

// Prevent underlying outputstream to be flush for optimisation purpose.
NoFlushStream
}

// by default, one object per line.
// with pretty option use default pretty printer with root line separator.
private static JsonGenerator getJsonGenerator(OutputStream out, boolean pretty) throws IOException {
private static JsonGenerator getJsonGenerator(OutputStream out, Set<JsonOptions> options) throws IOException {
Objects.requireNonNull(out, "OutputStream cannot be null");
JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
if (pretty) {
DefaultPrettyPrinter pp = new DefaultPrettyPrinter() {
@Override
public void writeRootValueSeparator(JsonGenerator jg) throws IOException {
jg.writeRaw(LINE_SEPARATOR);
}
};
g.setPrettyPrinter(pp);
if (options.contains(JsonOptions.NoFlushStream)) {
g = g.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
}
final PrettyPrinter pp;
if (options.contains(JsonOptions.Pretty)) {
pp = new DefaultPrettyPrinter(LINE_SEPARATOR);
} else {
MinimalPrettyPrinter pp = new MinimalPrettyPrinter();
pp.setRootValueSeparator(LINE_SEPARATOR);
g.setPrettyPrinter(pp);
pp = new MinimalPrettyPrinter(LINE_SEPARATOR);
}
g.setPrettyPrinter(pp);
return g;
}

Expand All @@ -122,7 +133,29 @@ public void setIncludeNamespace(final boolean includeNamespace) {
* @return this JsonEncoder
*/
public JsonEncoder configure(OutputStream out) throws IOException {
this.configure(getJsonGenerator(out, false));
return this.configure(out, true);
}

/**
* Reconfigures this JsonEncoder to use the output stream provided.
* <p/>
* If the OutputStream provided is null, a NullPointerException is thrown.
* <p/>
* Otherwise, this JsonEncoder will flush its current output and then
* reconfigure its output to use a default UTF8 JsonGenerator that writes to the
* provided OutputStream.
*
* @param out The OutputStream to direct output to. Cannot be null.
* @throws IOException
* @throws NullPointerException if {@code out} is {@code null}
* @return this JsonEncoder
*/
public JsonEncoder configure(OutputStream out, boolean autoflush) throws IOException {
EnumSet<JsonOptions> jsonOptions = EnumSet.noneOf(JsonOptions.class);
if (!autoflush) {
jsonOptions.add(JsonOptions.NoFlushStream);
}
this.configure(getJsonGenerator(out, jsonOptions));
return this;
}

Expand Down
33 changes: 33 additions & 0 deletions lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -332,4 +334,35 @@ private String fromAvroToJson(byte[] avroBytes, Schema schema, boolean includeNa

return new String(output.toByteArray(), StandardCharsets.UTF_8.name());
}

@Test
public void testJsonEncoderInitAutoFlush() throws IOException {
Schema s = Schema.parse("\"int\"");
OutputStream baos = new ByteArrayOutputStream();
OutputStream out = new BufferedOutputStream(baos);
JsonEncoder enc = factory.jsonEncoder(s, out, false);
enc.configure(out, false);
enc.writeInt(24);
enc.flush();
assertEquals("", baos.toString());
out.flush();
assertEquals("24", baos.toString());
}

@Test
public void testJsonEncoderInitAutoFlushDisabled() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream out = new BufferedOutputStream(baos);
Schema ints = Schema.create(Type.INT);
Encoder e = factory.jsonEncoder(ints, out, false, false);
String separator = System.getProperty("line.separator");
GenericDatumWriter<Integer> writer = new GenericDatumWriter<Integer>(ints);
writer.write(1, e);
writer.write(2, e);
e.flush();
assertEquals("", baos.toString());
out.flush();
assertEquals("1" + separator + "2", baos.toString());
out.close();
}
}

0 comments on commit 6be1b1f

Please sign in to comment.