diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java index 0188a29637d..055ef9541d9 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.EnumSet; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; @@ -71,11 +72,11 @@ public static EncoderFactory get() { * likely to improve performance but may be useful for the * downstream OutputStream. * @return This factory, to enable method chaining: - * + * *
    *         EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
    *         
- * + * * @see #binaryEncoder(OutputStream, BinaryEncoder) */ public EncoderFactory configureBufferSize(int size) { @@ -90,7 +91,7 @@ public EncoderFactory configureBufferSize(int size) { /** * Returns this factory's configured default buffer size. Used when creating * Encoder instances that buffer writes. - * + * * @see #configureBufferSize(int) * @see #binaryEncoder(OutputStream, BinaryEncoder) * @return The preferred buffer size, in bytes. @@ -109,11 +110,11 @@ public int getBufferSize() { * outside this range are set to the nearest value in the range. The * encoder will require at least this amount of memory. * @return This factory, to enable method chaining: - * + * *
    *         EncoderFactory factory = new EncoderFactory().configureBlockSize(8000);
    *         
- * + * * @see #blockingBinaryEncoder(OutputStream, BinaryEncoder) */ public EncoderFactory configureBlockSize(int size) { @@ -131,7 +132,7 @@ public EncoderFactory configureBlockSize(int size) { * #blockingBinaryEncoder(OutputStream, BinaryEncoder) will have block buffers * of this size. *

- * + * * @see #configureBlockSize(int) * @see #blockingBinaryEncoder(OutputStream, BinaryEncoder) * @return The preferred block size, in bytes. @@ -297,6 +298,38 @@ public JsonEncoder jsonEncoder(Schema schema, OutputStream out, boolean pretty) return new JsonEncoder(schema, out, pretty); } + /** + * Creates a {@link JsonEncoder} using the OutputStream provided for writing + * data conforming to the Schema provided with optional pretty printing. + *

+ * {@link JsonEncoder} buffers its output. Data may not appear on the underlying + * OutputStream until {@link Encoder#flush()} is called. + *

+ * {@link JsonEncoder} is not thread-safe. + * + * @param schema The Schema for data written to this JsonEncoder. Cannot be + * null. + * @param out The OutputStream to write to. Cannot be null. + * @param pretty Pretty print encoding. + * @param autoflush Whether to Automatically flush the data to storage, default + * is true controls the underlying FLUSH_PASSED_TO_STREAM + * feature of JsonGenerator + * @return A JsonEncoder configured with out, schema and + * pretty + * @throws IOException + */ + public JsonEncoder jsonEncoder(Schema schema, OutputStream out, boolean pretty, boolean autoflush) + throws IOException { + EnumSet options = EnumSet.noneOf(JsonEncoder.JsonOptions.class); + if (pretty) { + options.add(JsonEncoder.JsonOptions.Pretty); + } + if (!autoflush) { + options.add(JsonEncoder.JsonOptions.NoFlushStream); + } + return new JsonEncoder(schema, out, options); + } + /** * Creates a {@link JsonEncoder} using the {@link JsonGenerator} provided for * output of data conforming to the Schema provided. diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java index 71cc690b8a4..7e3a67eb6db 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java @@ -22,7 +22,9 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.BitSet; +import java.util.EnumSet; import java.util.Objects; +import java.util.Set; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; @@ -33,6 +35,7 @@ import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.PrettyPrinter; import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; @@ -58,11 +61,15 @@ public class JsonEncoder extends ParsingEncoder implements Parser.ActionHandler protected BitSet isEmpty = new BitSet(); JsonEncoder(Schema sc, OutputStream out) throws IOException { - this(sc, getJsonGenerator(out, false)); + this(sc, getJsonGenerator(out, EnumSet.noneOf(JsonOptions.class))); } JsonEncoder(Schema sc, OutputStream out, boolean pretty) throws IOException { - this(sc, getJsonGenerator(out, pretty)); + this(sc, getJsonGenerator(out, pretty ? EnumSet.of(JsonOptions.Pretty) : EnumSet.noneOf(JsonOptions.class))); + } + + JsonEncoder(Schema sc, OutputStream out, Set options) throws IOException { + this(sc, getJsonGenerator(out, options)); } JsonEncoder(Schema sc, JsonGenerator out) throws IOException { @@ -78,24 +85,28 @@ public void flush() throws IOException { } } + enum JsonOptions { + Pretty, + + // Prevent underlying outputstream to be flush for optimisation purpose. + NoFlushStream + } + // by default, one object per line. // with pretty option use default pretty printer with root line separator. - private static JsonGenerator getJsonGenerator(OutputStream out, boolean pretty) throws IOException { + private static JsonGenerator getJsonGenerator(OutputStream out, Set options) throws IOException { Objects.requireNonNull(out, "OutputStream cannot be null"); JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); - if (pretty) { - DefaultPrettyPrinter pp = new DefaultPrettyPrinter() { - @Override - public void writeRootValueSeparator(JsonGenerator jg) throws IOException { - jg.writeRaw(LINE_SEPARATOR); - } - }; - g.setPrettyPrinter(pp); + if (options.contains(JsonOptions.NoFlushStream)) { + g = g.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false); + } + final PrettyPrinter pp; + if (options.contains(JsonOptions.Pretty)) { + pp = new DefaultPrettyPrinter(LINE_SEPARATOR); } else { - MinimalPrettyPrinter pp = new MinimalPrettyPrinter(); - pp.setRootValueSeparator(LINE_SEPARATOR); - g.setPrettyPrinter(pp); + pp = new MinimalPrettyPrinter(LINE_SEPARATOR); } + g.setPrettyPrinter(pp); return g; } @@ -122,7 +133,29 @@ public void setIncludeNamespace(final boolean includeNamespace) { * @return this JsonEncoder */ public JsonEncoder configure(OutputStream out) throws IOException { - this.configure(getJsonGenerator(out, false)); + return this.configure(out, true); + } + + /** + * Reconfigures this JsonEncoder to use the output stream provided. + *

+ * If the OutputStream provided is null, a NullPointerException is thrown. + *

+ * Otherwise, this JsonEncoder will flush its current output and then + * reconfigure its output to use a default UTF8 JsonGenerator that writes to the + * provided OutputStream. + * + * @param out The OutputStream to direct output to. Cannot be null. + * @throws IOException + * @throws NullPointerException if {@code out} is {@code null} + * @return this JsonEncoder + */ + public JsonEncoder configure(OutputStream out, boolean autoflush) throws IOException { + EnumSet jsonOptions = EnumSet.noneOf(JsonOptions.class); + if (!autoflush) { + jsonOptions.add(JsonOptions.NoFlushStream); + } + this.configure(getJsonGenerator(out, jsonOptions)); return this; } diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java index 665a0e7b6f9..0c2842ec160 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java @@ -29,6 +29,8 @@ import org.apache.avro.generic.GenericDatumWriter; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; + +import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -332,4 +334,35 @@ private String fromAvroToJson(byte[] avroBytes, Schema schema, boolean includeNa return new String(output.toByteArray(), StandardCharsets.UTF_8.name()); } + + @Test + public void testJsonEncoderInitAutoFlush() throws IOException { + Schema s = Schema.parse("\"int\""); + OutputStream baos = new ByteArrayOutputStream(); + OutputStream out = new BufferedOutputStream(baos); + JsonEncoder enc = factory.jsonEncoder(s, out, false); + enc.configure(out, false); + enc.writeInt(24); + enc.flush(); + assertEquals("", baos.toString()); + out.flush(); + assertEquals("24", baos.toString()); + } + + @Test + public void testJsonEncoderInitAutoFlushDisabled() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStream out = new BufferedOutputStream(baos); + Schema ints = Schema.create(Type.INT); + Encoder e = factory.jsonEncoder(ints, out, false, false); + String separator = System.getProperty("line.separator"); + GenericDatumWriter writer = new GenericDatumWriter(ints); + writer.write(1, e); + writer.write(2, e); + e.flush(); + assertEquals("", baos.toString()); + out.flush(); + assertEquals("1" + separator + "2", baos.toString()); + out.close(); + } }