From 3fd405c2ee24729b878e711b25023485651c4792 Mon Sep 17 00:00:00 2001 From: Oscar Westra van Holthe - Kind Date: Thu, 4 Apr 2024 10:59:47 +0200 Subject: [PATCH] AVRO-3666 [Java] Use the new schema parser (#2642) * AVRO-3666: Redo schema parsing code This undoes the split schema parsing to allow forward references, which is to be handled by the SchemaParser & ParseContext classes. It uses the new ParseContext for the classic schema parser to accommodate this. Next step: use the new SchemaParser and resolve unresolved / forward references after parsing. This will also resolve "forward" references that were parsed in subsequent files. * AVRO-3666: Resolve references after parsing By resolving references after parsing, we allow both forward references within a file and between subsequent files. This change also includes using the new SchemaParser everywhere, as using it is the best way to flush out bugs. * AVRO-3666: Remove wrong test * AVRO-1535: Fix aliases as well * AVRO-3666: Re-enable disabled test Also includes changes necessary to debug. * AVRO-3666: Fix RAT exclusion The wrong exclusion was removed. * AVRO-3666: Remove unused field * AVRO-3666: Introduce SchemaParser.ParseResult This ensures the SchemaParser never returns unresolved schemata. * AVRO-3666: Use SchemaParser for documentation * AVRO-3666: Refactor after review * AVRO-3666: Fix javadoc * AVRO-3666: Fix merge bug * AVRO-3666: Fix CodeQL warnings * AVRO-3666: Increase test coverage * AVRO-3666: Fix tests * AVRO-3666: Refactor schema parsing for readability The JSON schema parser is quite complex (it is a large method). This change splits it in multiple methods, naming the various stages. * AVRO-3666: rename method to avoid confusion * AVRO-3666: Reduce PR size This change reduces the PR size, but does require some extra work after merging: the new SchemaParser class is hardly used, and the (now) obsolete Schema.Parser class is used heavily. 
* AVRO-3666: Reduce PR size more * AVRO-3666: Reduce PR size again * AVRO-3666: Spotless * Update lang/java/avro/src/main/java/org/apache/avro/Schema.java Co-authored-by: Fokko Driesprong * AVRO-3666: Spotless --------- Co-authored-by: Fokko Driesprong --- .../Getting started (Java)/_index.md | 2 +- .../apache/avro/FormattedSchemaParser.java | 8 +- .../org/apache/avro/JsonSchemaParser.java | 28 +- .../java/org/apache/avro/ParseContext.java | 198 ++++- .../main/java/org/apache/avro/Protocol.java | 129 +-- .../src/main/java/org/apache/avro/Schema.java | 759 +++++++----------- .../java/org/apache/avro/SchemaParser.java | 69 +- .../org/apache/avro/util/SchemaResolver.java | 166 +--- .../org/apache/avro/DummySchemaParser.java | 2 +- .../org/apache/avro/ParseContextTest.java | 19 +- .../test/java/org/apache/avro/TestSchema.java | 30 +- .../org/apache/avro/TestSchemaParser.java | 32 +- .../avro/compiler/idl/SchemaResolver.java | 14 +- .../compiler/specific/SpecificCompiler.java | 13 +- .../org/apache/avro/compiler/idl/idl.jj | 2 + .../java/org/apache/avro/idl/IdlFile.java | 68 +- .../java/org/apache/avro/idl/IdlReader.java | 35 +- .../test/idl/input/schema_syntax_schema.avdl | 2 +- .../idl/src/test/idl/input/status_schema.avdl | 2 + .../java/org/apache/avro/idl/TestCycle.java | 2 +- .../java/org/apache/avro/mojo/IDLMojo.java | 1 - .../java/org/apache/avro/mojo/SchemaMojo.java | 11 +- .../java/org/apache/avro/tool/IdlTool.java | 1 - 23 files changed, 809 insertions(+), 784 deletions(-) diff --git a/doc/content/en/docs/++version++/Getting started (Java)/_index.md b/doc/content/en/docs/++version++/Getting started (Java)/_index.md index 2d964c9c16c..429e9837641 100644 --- a/doc/content/en/docs/++version++/Getting started (Java)/_index.md +++ b/doc/content/en/docs/++version++/Getting started (Java)/_index.md @@ -212,7 +212,7 @@ Let's go over the same example as in the previous section, but without using cod First, we use a SchemaParser to read our schema definition and create 
a Schema object. ```java -Schema schema = new SchemaParser().parse(new File("user.avsc")); +Schema schema = new SchemaParser().parse(new File("user.avsc")).mainSchema(); ``` Using this schema, let's create some users. diff --git a/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java b/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java index cd67788fa9e..c37eca15dc6 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java +++ b/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java @@ -50,7 +50,7 @@ public interface FormattedSchemaParser { * when expecting JSON), it is a good idea not to do anything (especially * calling methods on the @code ParseContext}). *
  • The parameter {@code parseContext} is not thread-safe.
  • - *
  • When parsing, all parsed schema definitions should be added to the + *
  • All named schema definitions that are parsed should be added to the * provided {@link ParseContext}.
  • *
  • Optionally, you may return a "main" schema. Some schema definitions have * one, for example the schema defined by the root of the JSON document in a @@ -62,9 +62,9 @@ public interface FormattedSchemaParser { * the parsing process, so reserve that for rethrowing exceptions.
  • * * - * @param parseContext the current parse context: all parsed schemata should - * be added here to resolve names with; contains all - * previously known types + * @param parseContext the current parse context: all named schemata that are + * parsed should be added here, otherwise resolving + * schemata can fail; contains all previously known types * @param baseUri the base location of the schema, or {@code null} if * not known * @param formattedSchema the text of the schema definition(s) to parse diff --git a/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java b/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java index c7d91878627..5dd532444a3 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java +++ b/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java @@ -57,26 +57,32 @@ public static Schema parseInternal(String... fragments) { for (String fragment : fragments) { buffer.append(fragment); } - return new JsonSchemaParser().parse(new ParseContext(NameValidator.NO_VALIDATION), buffer, null); + + boolean saved = Schema.getValidateDefaults(); + try { + Schema.setValidateDefaults(false); + ParseContext context = new ParseContext(NameValidator.NO_VALIDATION); + Schema schema = new JsonSchemaParser().parse(context, buffer, true); + context.commit(); + context.resolveAllSchemas(); + return context.resolve(schema); + } finally { + Schema.setValidateDefaults(saved); + } } @Override public Schema parse(ParseContext parseContext, URI baseUri, CharSequence formattedSchema) throws IOException, SchemaParseException { - return parse(parseContext, formattedSchema, parseContext.nameValidator); + return parse(parseContext, formattedSchema, false); } - private Schema parse(ParseContext parseContext, CharSequence formattedSchema, NameValidator nameValidator) + private Schema parse(ParseContext parseContext, CharSequence formattedSchema, boolean allowInvalidDefaults) throws SchemaParseException { - 
Schema.Parser parser = new Schema.Parser(nameValidator); - if (nameValidator == NameValidator.NO_VALIDATION) { + Schema.Parser parser = new Schema.Parser(parseContext); + if (allowInvalidDefaults) { parser.setValidateDefaults(false); - } else { - parser = new Schema.Parser(nameValidator); } - parser.addTypes(parseContext.typesByName().values()); - Schema schema = parser.parse(formattedSchema.toString()); - parser.getTypes().values().forEach(parseContext::put); - return schema; + return parser.parseInternal(formattedSchema.toString()); } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java b/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java index 401c93e50c5..b7bc42b9787 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java +++ b/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java @@ -18,25 +18,36 @@ package org.apache.avro; import org.apache.avro.util.SchemaResolver; +import org.apache.avro.util.Schemas; +import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; /** * Class to define a name context, useful to reference schemata with. This * allows for the following: * *
      - *
    • Provide a default namespace for nested contexts, as found for example in - * JSON based schema definitions.
    • - *
    • Find schemata by name, including primitives.
    • *
    • Collect new named schemata.
    • + *
    • Find schemata by name, including primitives.
    • + *
    • Find schemas that do not exist yet.
    • + *
    • Resolve references to schemas that didn't exist yet when first used.
    • *
    * *

    + * This class is NOT thread-safe. + *

    + * + *

    * Note: this class has no use for most Avro users, but is a key component when * implementing a schema parser. *

    @@ -60,10 +71,27 @@ public class ParseContext { private static final Set NAMED_SCHEMA_TYPES = EnumSet.of(Schema.Type.RECORD, Schema.Type.ENUM, Schema.Type.FIXED); + /** + * Collection of old schemata. Can contain unresolved references if !isResolved. + */ private final Map oldSchemas; + /** + * Collection of new schemata. Can contain unresolved references. + */ private final Map newSchemas; + /** + * The name validator to use. + */ // Visible for use in JsonSchemaParser final NameValidator nameValidator; + /** + * Visitor that was used to resolve schemata with. If not available, some + * schemata in {@code oldSchemas} may not be fully resolved. If available, all + * schemata in {@code oldSchemas} are resolved, and {@code newSchemas} is empty. + * After visiting a schema, it can return the corresponding resolved schema for + * a schema that possibly contains unresolved references. + */ + private SchemaResolver.ResolvingVisitor resolvingVisitor; /** * Create a {@code ParseContext} for the default/{@code null} namespace, using @@ -78,22 +106,14 @@ public ParseContext() { * schemata. */ public ParseContext(NameValidator nameValidator) { - this(nameValidator, new LinkedHashMap<>(), new LinkedHashMap<>()); + this(requireNonNull(nameValidator), new LinkedHashMap<>(), new LinkedHashMap<>()); } private ParseContext(NameValidator nameValidator, Map oldSchemas, Map newSchemas) { this.nameValidator = nameValidator; this.oldSchemas = oldSchemas; this.newSchemas = newSchemas; - } - - /** - * Create a derived context using a different fallback namespace. - * - * @return a new context - */ - public ParseContext namespace() { - return new ParseContext(nameValidator, oldSchemas, newSchemas); + resolvingVisitor = null; } /** @@ -109,45 +129,64 @@ public boolean contains(String name) { /** *

    - * Resolve a schema by name. + * Find a schema by name and namespace. *

    * *

    * That is: *

    * - *
      - *
    • If {@code fullName} is a primitive name, return a (new) schema for - * it
    • - *
    • Otherwise: resolve the schema in its own namespace and in the null - * namespace (the former takes precedence)
    • - *
    + *
      + *
    1. If {@code name} is a primitive name, return a (new) schema for it
    2. + *
    3. Otherwise, determine the full schema name (using the given + * {@code namespace} if necessary), and find it
    4. + *
    5. If no schema was found and {@code name} is a simple name, find the schema + * in the default (null) namespace
    6. + *
    7. If still no schema was found, return an unresolved reference for the full + * schema name (see step 2)
    8. + *
    * - * Resolving means that the schema is returned if known, and otherwise an - * unresolved schema (a reference) is returned. + *

    + * Note: as an unresolved reference might be returned, the schema is not + * directly usable. Please {@link #put(Schema)} the schema using it in the + * context. The {@link SchemaParser} and protocol parsers will ensure you'll + * only get a resolved schema that is usable. + *

    * - * @param fullName the full schema name to resolve - * @return the schema - * @throws SchemaParseException when the schema does not exist + * @param name the schema name to find + * @param namespace the namespace to find the schema against + * @return the schema, or an unresolved reference */ - public Schema resolve(String fullName) { - Schema.Type type = PRIMITIVES.get(fullName); + public Schema find(String name, String namespace) { + Schema.Type type = PRIMITIVES.get(name); if (type != null) { return Schema.create(type); } - Schema schema = getSchema(fullName); + String fullName = fullName(name, namespace); + Schema schema = getNamedSchema(fullName); if (schema == null) { - // Not found; attempt to resolve in the default namespace - int lastDot = fullName.lastIndexOf('.'); - String name = fullName.substring(lastDot + 1); - schema = getSchema(name); + schema = getNamedSchema(name); } return schema != null ? schema : SchemaResolver.unresolvedSchema(fullName); } - private Schema getSchema(String fullName) { + private String fullName(String name, String namespace) { + if (namespace != null && name.lastIndexOf('.') < 0) { + return namespace + "." + name; + } + return name; + } + + /** + * Get a schema by name. Note that the schema might not (yet) be resolved/usable + * until {@link #resolveAllSchemas()} has been called. + * + * @param fullName a full schema name + * @return the schema, if known + */ + public Schema getNamedSchema(String fullName) { Schema schema = oldSchemas.get(fullName); if (schema == null) { schema = newSchemas.get(fullName); @@ -155,10 +194,6 @@ private Schema getSchema(String fullName) { return schema; } - private boolean notEmpty(String str) { - return str != null && !str.isEmpty(); - } - /** * Put the schema into this context. This is an idempotent operation: it only * fails if this context already has a different schema with the same name. 
@@ -184,6 +219,7 @@ public void put(Schema schema) { throw new SchemaParseException("Can't redefine: " + fullName); } } else { + resolvingVisitor = null; Schema previouslyAddedSchema = newSchemas.putIfAbsent(fullName, schema); if (previouslyAddedSchema != null && !previouslyAddedSchema.equals(schema)) { throw new SchemaParseException("Can't redefine: " + fullName); @@ -200,10 +236,10 @@ private String requireValidFullName(String fullName) { return fullName; } - private void validateName(String name, String what) { + private void validateName(String name, String typeOfName) { NameValidator.Result result = nameValidator.validate(name); if (!result.isOK()) { - throw new SchemaParseException(what + " \"" + name + "\" is invalid: " + result.getErrors()); + throw new SchemaParseException(typeOfName + " \"" + name + "\" is invalid: " + result.getErrors()); } } @@ -216,12 +252,94 @@ public void commit() { newSchemas.clear(); } + public SchemaParser.ParseResult commit(Schema mainSchema) { + Collection parsedNamedSchemas = newSchemas.values(); + SchemaParser.ParseResult parseResult = new SchemaParser.ParseResult() { + @Override + public Schema mainSchema() { + return mainSchema == null ? null : resolve(mainSchema); + } + + @Override + public List parsedNamedSchemas() { + return parsedNamedSchemas.stream().map(ParseContext.this::resolve).collect(Collectors.toList()); + } + }; + commit(); + return parseResult; + } + public void rollback() { newSchemas.clear(); } /** - * Return all known types by their fullname. + * Resolve all (named) schemas that were parsed. This resolves all forward + * references, even if parsed from different files. Note: the context must be + * committed for this method to work. 
+ * + * @return all parsed schemas, in the order they were parsed + * @throws AvroTypeException if a schema reference cannot be resolved + */ + public List resolveAllSchemas() { + ensureSchemasAreResolved(); + + return new ArrayList<>(oldSchemas.values()); + } + + private void ensureSchemasAreResolved() { + if (hasNewSchemas()) { + throw new IllegalStateException("Schemas cannot be resolved unless the ParseContext is committed."); + } + if (resolvingVisitor == null) { + NameValidator saved = Schema.getNameValidator(); + try { + // Ensure we use the same validation when copying schemas as when they were + // defined. + Schema.setNameValidator(nameValidator); + SchemaResolver.ResolvingVisitor visitor = new SchemaResolver.ResolvingVisitor(oldSchemas::get); + oldSchemas.values().forEach(schema -> Schemas.visit(schema, visitor)); + // Before this point is where we can get exceptions due to resolving failures. + for (Map.Entry entry : oldSchemas.entrySet()) { + entry.setValue(visitor.getResolved(entry.getValue())); + } + resolvingVisitor = visitor; + } finally { + Schema.setNameValidator(saved); + } + } + } + + /** + * Resolve unresolved references in a schema that was parsed for this + * context using the types known to this context. Note: this method will + * ensure all known schemas are resolved, or throw, and thus requires the + * context to be committed. + * + * @param schema the schema resolve + * @return the fully resolved schema + * @throws AvroTypeException if a schema reference cannot be resolved + */ + public Schema resolve(Schema schema) { + ensureSchemasAreResolved(); + + // As all (named) schemas are resolved now, we know: + // — All named types are either in oldSchemas or unknown. + // — All unnamed types can be visited&resolved without validation. 
+ + if (NAMED_SCHEMA_TYPES.contains(schema.getType()) && schema.getFullName() != null) { + return requireNonNull(oldSchemas.get(schema.getFullName()), () -> "Unknown schema: " + schema.getFullName()); + } else { + // Unnamed or anonymous schema + // (protocol message request parameters are anonymous records) + Schemas.visit(schema, resolvingVisitor); // This field is set, as ensureSchemasAreResolved(); was called. + return resolvingVisitor.getResolved(schema); + } + } + + /** + * Return all known types by their fullname. Warning: this returns all types, + * even uncommitted ones, including unresolved references! * * @return a map of all types by their name */ diff --git a/lang/java/avro/src/main/java/org/apache/avro/Protocol.java b/lang/java/avro/src/main/java/org/apache/avro/Protocol.java index e837e6008a4..905f2778c6b 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Protocol.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Protocol.java @@ -17,12 +17,6 @@ */ package org.apache.avro; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Field.Order; - import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -42,6 +36,12 @@ import java.util.Objects; import java.util.Set; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Field.Order; + /** * A set of messages forming an application protocol. *

    @@ -79,9 +79,9 @@ public class Protocol extends JsonProperties { /** A protocol message. */ public class Message extends JsonProperties { - private String name; - private String doc; - private Schema request; + private final String name; + private final String doc; + private final Schema request; /** Construct a message. */ private Message(String name, String doc, JsonProperties propMap, Schema request) { @@ -132,7 +132,7 @@ public String toString() { try { StringWriter writer = new StringWriter(); JsonGenerator gen = Schema.FACTORY.createGenerator(writer); - toJson(gen); + toJson(new HashSet<>(), gen); gen.flush(); return writer.toString(); } catch (IOException e) { @@ -140,19 +140,19 @@ public String toString() { } } - void toJson(JsonGenerator gen) throws IOException { + void toJson(Set knownNames, JsonGenerator gen) throws IOException { gen.writeStartObject(); if (doc != null) gen.writeStringField("doc", doc); writeProps(gen); // write out properties gen.writeFieldName("request"); - request.fieldsToJson(types, gen); + request.fieldsToJson(knownNames, namespace, gen); - toJson1(gen); + toJson1(knownNames, gen); gen.writeEndObject(); } - void toJson1(JsonGenerator gen) throws IOException { + void toJson1(Set knownNames, JsonGenerator gen) throws IOException { gen.writeStringField("response", "null"); gen.writeBooleanField("one-way", true); } @@ -177,9 +177,9 @@ public String getDoc() { } } - private class TwoWayMessage extends Message { - private Schema response; - private Schema errors; + private final class TwoWayMessage extends Message { + private final Schema response; + private final Schema errors; /** Construct a message. 
*/ private TwoWayMessage(String name, String doc, Map propMap, Schema request, Schema response, @@ -227,15 +227,15 @@ public int hashCode() { } @Override - void toJson1(JsonGenerator gen) throws IOException { + void toJson1(Set knownNames, JsonGenerator gen) throws IOException { gen.writeFieldName("response"); - response.toJson(types, gen); + response.toJson(knownNames, namespace, gen); List errs = errors.getTypes(); // elide system error if (errs.size() > 1) { Schema union = Schema.createUnion(errs.subList(1, errs.size())); gen.writeFieldName("errors"); - union.toJson(types, gen); + union.toJson(knownNames, namespace, gen); } } @@ -245,7 +245,7 @@ void toJson1(JsonGenerator gen) throws IOException { private String namespace; private String doc; - private Schema.Names types = new Schema.Names(); + private ParseContext context = new ParseContext(); private final Map messages = new LinkedHashMap<>(); private byte[] md5; @@ -267,6 +267,7 @@ private Protocol() { * {@code doc}, and {@code namespace} as {code p} has. It also copies all the * {@code props}. */ + @SuppressWarnings("CopyConstructorMissesField") public Protocol(Protocol p) { this(p.getName(), p.getDoc(), p.getNamespace()); putAll(p); @@ -294,7 +295,6 @@ private void setName(String name, String namespace) { if (this.namespace != null && this.namespace.isEmpty()) { this.namespace = null; } - types.space(this.namespace); } /** The name of this protocol. */ @@ -314,19 +314,30 @@ public String getDoc() { /** The types of this protocol. */ public Collection getTypes() { - return types.values(); + return context.resolveAllSchemas(); + } + + /** @deprecated can return invalid schemata: do NOT use! */ + @Deprecated + public Collection getUnresolvedTypes() { + return context.typesByName().values(); } /** Returns the named type. */ public Schema getType(String name) { - return types.get(name); + Schema namedSchema = null; + if (!name.contains(".")) { + namedSchema = context.getNamedSchema(namespace + "." 
+ name); + } + return namedSchema != null ? namedSchema : context.getNamedSchema(name); } /** Set the types of this protocol. */ public void setTypes(Collection newTypes) { - types = new Schema.Names(); + context = new ParseContext(); for (Schema s : newTypes) - types.add(s); + context.put(s); + context.commit(); } /** The messages of this protocol. */ @@ -349,12 +360,12 @@ public Message createMessage(Message m, Schema request) { } /** Create a one-way message. */ - public Message createMessage(String name, String doc, JsonProperties propMap, Schema request) { + public Message createMessage(String name, String doc, JsonProperties propMap, Schema request) { return new Message(name, doc, propMap, request); } /** Create a one-way message. */ - public Message createMessage(String name, String doc, Map propMap, Schema request) { + public Message createMessage(String name, String doc, Map propMap, Schema request) { return new Message(name, doc, propMap, request); } @@ -373,13 +384,13 @@ public Message createMessage(Message m, Schema request, Schema response, Schema } /** Create a two-way message. */ - public Message createMessage(String name, String doc, JsonProperties propMap, Schema request, Schema response, + public Message createMessage(String name, String doc, JsonProperties propMap, Schema request, Schema response, Schema errors) { return new TwoWayMessage(name, doc, propMap, request, response, errors); } /** Create a two-way message. 
*/ - public Message createMessage(String name, String doc, Map propMap, Schema request, Schema response, + public Message createMessage(String name, String doc, Map propMap, Schema request, Schema response, Schema errors) { return new TwoWayMessage(name, doc, propMap, request, response, errors); } @@ -392,13 +403,13 @@ public boolean equals(Object o) { return false; Protocol that = (Protocol) o; return Objects.equals(this.name, that.name) && Objects.equals(this.namespace, that.namespace) - && Objects.equals(this.types, that.types) && Objects.equals(this.messages, that.messages) - && this.propsEqual(that); + && Objects.equals(this.context.resolveAllSchemas(), that.context.resolveAllSchemas()) + && Objects.equals(this.messages, that.messages) && this.propsEqual(that); } @Override public int hashCode() { - return 31 * Objects.hash(name, namespace, types, messages) + propsHashCode(); + return 31 * Objects.hash(name, namespace, context, messages) + propsHashCode(); } /** Render this as JSON. 
*/ @@ -427,8 +438,6 @@ public String toString(boolean pretty) { } void toJson(JsonGenerator gen) throws IOException { - types.space(namespace); - gen.writeStartObject(); gen.writeStringField("protocol", name); if (namespace != null) { @@ -439,16 +448,16 @@ void toJson(JsonGenerator gen) throws IOException { gen.writeStringField("doc", doc); writeProps(gen); gen.writeArrayFieldStart("types"); - Schema.Names resolved = new Schema.Names(namespace); - for (Schema type : types.values()) - if (!resolved.contains(type)) - type.toJson(resolved, gen); + Set knownNames = new HashSet<>(); + for (Schema type : context.resolveAllSchemas()) + if (!knownNames.contains(type.getFullName())) + type.toJson(knownNames, namespace, gen); gen.writeEndArray(); gen.writeObjectFieldStart("messages"); for (Map.Entry e : messages.entrySet()) { gen.writeFieldName(e.getKey()); - e.getValue().toJson(gen); + e.getValue().toJson(knownNames, gen); } gen.writeEndObject(); gen.writeEndObject(); @@ -510,6 +519,27 @@ private void parse(JsonNode json) { parseMessages(json); parseDoc(json); parseProps(json); + + context.commit(); + context.resolveAllSchemas(); + resolveMessageSchemata(); + } + + private void resolveMessageSchemata() { + for (Map.Entry entry : messages.entrySet()) { + Message oldValue = entry.getValue(); + Message newValue; + if (oldValue.isOneWay()) { + newValue = createMessage(oldValue.getName(), oldValue.getDoc(), oldValue, + context.resolve(oldValue.getRequest())); + } else { + Schema request = context.resolve(oldValue.getRequest()); + Schema response = context.resolve(oldValue.getResponse()); + Schema errors = context.resolve(oldValue.getErrors()); + newValue = createMessage(oldValue.getName(), oldValue.getDoc(), oldValue, request, response, errors); + } + entry.setValue(newValue); + } } private void parseNameAndNamespace(JsonNode json) { @@ -544,11 +574,7 @@ private void parseTypes(JsonNode json) { for (JsonNode type : defs) { if (!type.isObject()) throw new 
SchemaParseException("Type not an object: " + type); - Schema.parseNamesDeclared(type, types, types.space()); - - } - for (JsonNode type : defs) { - Schema.parseCompleteSchema(type, types, types.space()); + Schema.parse(type, context, namespace); } } @@ -596,8 +622,8 @@ private Message parseMessage(String messageName, JsonNode json) { JsonNode fieldDocNode = field.get("doc"); if (fieldDocNode != null) fieldDoc = fieldDocNode.textValue(); - Field newField = new Field(name, Schema.parse(fieldTypeNode, types), fieldDoc, field.get("default"), true, - Order.ASCENDING); + Field newField = new Field(name, Schema.parse(fieldTypeNode, context, namespace), fieldDoc, field.get("default"), + true, Order.ASCENDING); Set aliases = Schema.parseAliases(field); if (aliases != null) { // add aliases for (String alias : aliases) @@ -612,7 +638,7 @@ private Message parseMessage(String messageName, JsonNode json) { } fields.add(newField); } - Schema request = Schema.createRecord(fields); + Schema request = Schema.createRecord(null, null, null, false, fields); boolean oneWay = false; JsonNode oneWayNode = json.get("one-way"); @@ -631,12 +657,12 @@ private Message parseMessage(String messageName, JsonNode json) { if (oneWay) { if (decls != null) throw new SchemaParseException("one-way can't have errors: " + json); - if (responseNode != null && Schema.parse(responseNode, types).getType() != Schema.Type.NULL) + if (responseNode != null && Schema.parse(responseNode, context, namespace).getType() != Schema.Type.NULL) throw new SchemaParseException("One way response must be null: " + json); return new Message(messageName, doc, mProps, request); } - Schema response = Schema.parse(responseNode, types); + Schema response = Schema.parse(responseNode, context, namespace); List errs = new ArrayList<>(); errs.add(SYSTEM_ERROR); // every method can throw @@ -645,7 +671,7 @@ private Message parseMessage(String messageName, JsonNode json) { throw new SchemaParseException("Errors not an array: " + 
json); for (JsonNode decl : decls) { String name = decl.textValue(); - Schema schema = this.types.get(name); + Schema schema = this.context.find(name, namespace); if (schema == null) throw new SchemaParseException("Undefined error: " + name); if (!schema.isError()) @@ -660,5 +686,4 @@ private Message parseMessage(String messageName, JsonNode json) { public static void main(String[] args) throws Exception { System.out.println(Protocol.parse(new File(args[0]))); } - } diff --git a/lang/java/avro/src/main/java/org/apache/avro/Schema.java b/lang/java/avro/src/main/java/org/apache/avro/Schema.java index 5e62bd1100b..e38e31d4072 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Schema.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Schema.java @@ -46,7 +46,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -55,7 +54,6 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.apache.avro.LogicalType.LOGICAL_TYPE_PROP; @@ -80,9 +78,9 @@ *

  • null. * * - * A schema can be constructed using one of its static createXXX - * methods, or more conveniently using {@link SchemaBuilder}. The schema objects - * are logically immutable. There are only two mutating methods - + * Construct a schema using one of its static createXXX methods, or + * more conveniently using {@link SchemaBuilder}. The schema objects are + * logically immutable. There are only two mutating methods - * {@link #setFields(List)} and {@link #addProp(String, String)}. The following * restrictions apply on these two methods. *
      @@ -93,6 +91,7 @@ * property. *
    */ +@SuppressWarnings("unused") public abstract class Schema extends JsonProperties implements Serializable { private static final long serialVersionUID = 1L; @@ -125,20 +124,20 @@ private Object readResolve() { FACTORY.setCodec(MAPPER); } - /** The type of a schema. */ + /** The type of schema. */ public enum Type { RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL; private final String name; - private Type() { + Type() { this.name = this.name().toLowerCase(Locale.ENGLISH); } public String getName() { return name; } - }; + } private final Type type; private LogicalType logicalType = null; @@ -206,9 +205,9 @@ void setLogicalType(LogicalType logicalType) { * Create an anonymous record schema. * * @deprecated This method allows to create Schema objects that cannot be parsed - * by {@link Schema.Parser#parse(String)}. It will be removed in a - * future version of Avro. Better use - * i{@link #createRecord(String, String, String, boolean, List)} to + * by {@link SchemaParser#parse(CharSequence)}. It will be removed + * in a future version of Avro. Better use + * {@link #createRecord(String, String, String, boolean, List)} to * produce a fully qualified Schema. */ @Deprecated @@ -273,7 +272,7 @@ public Type getType() { * fieldName. If there is no field by that name, a null is * returned. */ - public Field getField(String fieldname) { + public Field getField(String fieldName) { throw new AvroRuntimeException("Not a record: " + this); } @@ -406,7 +405,7 @@ public String toString() { * @param pretty if true, pretty-print JSON. */ public String toString(boolean pretty) { - return toString(new Names(), pretty); + return toString(new HashSet(), pretty); } /** @@ -419,22 +418,22 @@ public String toString(boolean pretty) { // Use at your own risk. This method should be removed with AVRO-2832. 
@Deprecated public String toString(Collection referencedSchemas, boolean pretty) { - Schema.Names names = new Schema.Names(); + Set knownNames = new HashSet<>(); if (referencedSchemas != null) { for (Schema s : referencedSchemas) { - names.add(s); + knownNames.add(s.getFullName()); } } - return toString(names, pretty); + return toString(knownNames, pretty); } - String toString(Names names, boolean pretty) { + String toString(Set knownNames, boolean pretty) { try { StringWriter writer = new StringWriter(); JsonGenerator gen = FACTORY.createGenerator(writer); if (pretty) gen.useDefaultPrettyPrinter(); - toJson(names, gen); + toJson(knownNames, null, gen); gen.flush(); return writer.toString(); } catch (IOException e) { @@ -442,7 +441,7 @@ String toString(Names names, boolean pretty) { } } - void toJson(Names names, JsonGenerator gen) throws IOException { + void toJson(Set knownNames, String namespace, JsonGenerator gen) throws IOException { if (!hasProps()) { // no props defined gen.writeString(getName()); // just write name } else { @@ -453,7 +452,7 @@ void toJson(Names names, JsonGenerator gen) throws IOException { } } - void fieldsToJson(Names names, JsonGenerator gen) throws IOException { + void fieldsToJson(Set knownNames, String namespace, JsonGenerator gen) throws IOException { throw new AvroRuntimeException("Not a record: " + this); } @@ -487,12 +486,12 @@ final boolean equalCachedHash(Schema other) { private static final Set FIELD_RESERVED = Collections .unmodifiableSet(new HashSet<>(Arrays.asList("default", "doc", "name", "order", "type", "aliases"))); - /** Returns true if this record is an union type. */ + /** Returns true if this record is a union type. */ public boolean isUnion() { return this instanceof UnionSchema; } - /** Returns true if this record is an union type containing null. */ + /** Returns true if this record is a union type containing null. 
*/ public boolean isNullable() { if (!isUnion()) { return getType().equals(Schema.Type.NULL); @@ -581,14 +580,14 @@ public Field(Field field, Schema schema) { * */ public Field(String name, Schema schema) { - this(name, schema, (String) null, (JsonNode) null, true, Order.ASCENDING); + this(name, schema, null, null, true, Order.ASCENDING); } /** * */ public Field(String name, Schema schema, String doc) { - this(name, schema, doc, (JsonNode) null, true, Order.ASCENDING); + this(name, schema, doc, null, true, Order.ASCENDING); } /** @@ -613,7 +612,7 @@ public Field(String name, Schema schema, String doc, Object defaultValue, Order public String name() { return name; - }; + } /** The position of this field within the record. */ public int pos() { @@ -698,95 +697,6 @@ private boolean defaultValueEquals(JsonNode thatDefaultValue) { public String toString() { return name + " type:" + schema.type + " pos:" + position; } - - /** - * Parse field. - * - * @param field : json field definition. - * @param names : names map. - * @param namespace : current working namespace. - * @return field. - */ - static Field parse(JsonNode field, Names names, String namespace) { - String fieldName = getRequiredText(field, "name", "No field name"); - String fieldDoc = getOptionalText(field, "doc"); - JsonNode fieldTypeNode = field.get("type"); - if (fieldTypeNode == null) { - throw new SchemaParseException("No field type: " + field); - } - - Schema fieldSchema = null; - if (fieldTypeNode.isTextual()) { - Schema schemaField = names.get(fieldTypeNode.textValue()); - if (schemaField == null) { - schemaField = names.get(namespace + "." + fieldTypeNode.textValue()); - } - if (schemaField == null) { - throw new SchemaParseException(fieldTypeNode + " is not a defined name." 
+ " The type of the \"" + fieldName - + "\" field must be a defined name or a {\"type\": ...} expression."); - } - fieldSchema = schemaField; - } else if (fieldTypeNode.isObject()) { - fieldSchema = resolveSchema(fieldTypeNode, names, namespace); - if (fieldSchema == null) { - fieldSchema = Schema.parseCompleteSchema(fieldTypeNode, names, namespace); - } - } else if (fieldTypeNode.isArray()) { - List unionTypes = new ArrayList<>(); - - fieldTypeNode.forEach((JsonNode node) -> { - Schema subSchema = null; - if (node.isTextual()) { - subSchema = names.get(node.asText()); - if (subSchema == null) { - subSchema = names.get(namespace + "." + node.asText()); - } - } else if (node.isObject()) { - subSchema = Schema.parseCompleteSchema(node, names, namespace); - } else { - throw new SchemaParseException("Illegal type in union : " + node); - } - if (subSchema == null) { - throw new SchemaParseException("Null element in union : " + node); - } - unionTypes.add(subSchema); - }); - - fieldSchema = Schema.createUnion(unionTypes); - } - - if (fieldSchema == null) { - throw new SchemaParseException("Can't find type for field " + fieldName); - } - Field.Order order = Field.Order.ASCENDING; - JsonNode orderNode = field.get("order"); - if (orderNode != null) - order = Field.Order.valueOf(orderNode.textValue().toUpperCase(Locale.ENGLISH)); - JsonNode defaultValue = field.get("default"); - - if (defaultValue != null - && (Type.FLOAT.equals(fieldSchema.getType()) || Type.DOUBLE.equals(fieldSchema.getType())) - && defaultValue.isTextual()) { - try { - defaultValue = new DoubleNode(Double.valueOf(defaultValue.textValue())); - } catch (NumberFormatException ex) { - throw new SchemaParseException( - "Can't parse number '" + defaultValue.textValue() + "' for field '" + fieldName); - } - } - - Field f = new Field(fieldName, fieldSchema, fieldDoc, defaultValue, true, order); - Iterator i = field.fieldNames(); - while (i.hasNext()) { // add field props - String prop = i.next(); - if 
(!FIELD_RESERVED.contains(prop)) - f.addProp(prop, field.get(prop)); - } - f.aliases = parseAliases(field); - - return f; - } - } static class Name { @@ -832,13 +742,13 @@ public String toString() { return full; } - public void writeName(Names names, JsonGenerator gen) throws IOException { + public void writeName(String currentNamespace, JsonGenerator gen) throws IOException { if (name != null) gen.writeStringField("name", name); if (space != null) { - if (!space.equals(names.space())) + if (!space.equals(currentNamespace)) gen.writeStringField("namespace", space); - } else if (names.space() != null) { // null within non-null + } else if (currentNamespace != null) { // null within non-null gen.writeStringField("namespace", ""); } } @@ -849,8 +759,8 @@ public String getQualified(String defaultSpace) { /** * Determine if full name must be written. There are 2 cases for true : - * defaultSpace != from this.space or name is already a Schema.Type (int, array - * ...) + * {@code defaultSpace} != from {@code this.space} or name is already a + * {@code Schema.Type} (int, array, ...) * * @param defaultSpace : default name space. * @return true if full name must be written. @@ -925,22 +835,25 @@ public Set getAliases() { Set result = new LinkedHashSet<>(); if (aliases != null) for (Name alias : aliases) - result.add(alias.full); + if (alias.space == null && name.space != null) + result.add("." 
+ alias.name); + else + result.add(alias.full); return result; } - public boolean writeNameRef(Names names, JsonGenerator gen) throws IOException { - if (this.equals(names.get(name))) { - gen.writeString(name.getQualified(names.space())); - return true; - } else if (name.name != null) { - names.put(name, this); + public boolean writeNameRef(Set knownNames, String currentNamespace, JsonGenerator gen) throws IOException { + if (name.name != null) { + if (!knownNames.add(name.full)) { + gen.writeString(name.getQualified(currentNamespace)); + return true; + } } return false; } - public void writeName(Names names, JsonGenerator gen) throws IOException { - name.writeName(names, gen); + public void writeName(String currentNamespace, JsonGenerator gen) throws IOException { + name.writeName(currentNamespace, gen); } public boolean equalNames(NamedSchema that) { @@ -969,8 +882,8 @@ public void aliasesToJson(JsonGenerator gen) throws IOException { * and need to watch for recursion. */ public static class SeenPair { - private Object s1; - private Object s2; + private final Object s1; + private final Object s2; public SeenPair(Object s1, Object s2) { this.s1 = s1; @@ -992,7 +905,6 @@ public int hashCode() { private static final ThreadLocal> SEEN_EQUALS = ThreadLocalWithInitial.of(HashSet::new); private static final ThreadLocal> SEEN_HASHCODE = ThreadLocalWithInitial.of(IdentityHashMap::new); - @SuppressWarnings(value = "unchecked") private static class RecordSchema extends NamedSchema { private List fields; private Map fieldMap; @@ -1015,10 +927,10 @@ public boolean isError() { } @Override - public Field getField(String fieldname) { + public Field getField(String fieldName) { if (fieldMap == null) throw new AvroRuntimeException("Schema fields not set yet"); - return fieldMap.get(fieldname); + return fieldMap.get(fieldName); } @Override @@ -1070,7 +982,7 @@ public boolean equals(Object o) { return false; if (!propsEqual(that)) return false; - Set seen = SEEN_EQUALS.get(); + Set 
seen = SEEN_EQUALS.get(); SeenPair here = new SeenPair(this, o); if (seen.contains(here)) return true; // prevent stack overflow @@ -1100,36 +1012,34 @@ int computeHash() { } @Override - void toJson(Names names, JsonGenerator gen) throws IOException { - if (writeNameRef(names, gen)) + void toJson(Set knownNames, String currentNamespace, JsonGenerator gen) throws IOException { + if (writeNameRef(knownNames, currentNamespace, gen)) return; - String savedSpace = names.space; // save namespace gen.writeStartObject(); gen.writeStringField("type", isError ? "error" : "record"); - writeName(names, gen); - names.space = name.space; // set default namespace - if (this.getDoc() != null) + writeName(currentNamespace, gen); + if (this.getDoc() != null) { gen.writeStringField("doc", this.getDoc()); + } if (fields != null) { gen.writeFieldName("fields"); - fieldsToJson(names, gen); + fieldsToJson(knownNames, name.space, gen); } writeProps(gen); aliasesToJson(gen); gen.writeEndObject(); - names.space = savedSpace; // restore namespace } @Override - void fieldsToJson(Names names, JsonGenerator gen) throws IOException { + void fieldsToJson(Set knownNames, String namespace, JsonGenerator gen) throws IOException { gen.writeStartArray(); for (Field f : fields) { gen.writeStartObject(); gen.writeStringField("name", f.name()); gen.writeFieldName("type"); - f.schema().toJson(names, gen); + f.schema().toJson(knownNames, namespace, gen); if (f.doc() != null) gen.writeStringField("doc", f.doc()); if (f.hasDefaultValue()) { @@ -1138,7 +1048,7 @@ void fieldsToJson(Names names, JsonGenerator gen) throws IOException { } if (f.order() != Field.Order.ASCENDING) gen.writeStringField("order", f.order().name); - if (f.aliases != null && f.aliases.size() != 0) { + if (f.aliases != null && !f.aliases.isEmpty()) { gen.writeFieldName("aliases"); gen.writeStartArray(); for (String alias : f.aliases) @@ -1210,12 +1120,12 @@ int computeHash() { } @Override - void toJson(Names names, JsonGenerator gen) 
throws IOException { - if (writeNameRef(names, gen)) + void toJson(Set knownNames, String currentNamespace, JsonGenerator gen) throws IOException { + if (writeNameRef(knownNames, currentNamespace, gen)) return; gen.writeStartObject(); gen.writeStringField("type", "enum"); - writeName(names, gen); + writeName(currentNamespace, gen); if (getDoc() != null) gen.writeStringField("doc", getDoc()); gen.writeArrayFieldStart("symbols"); @@ -1259,11 +1169,11 @@ int computeHash() { } @Override - void toJson(Names names, JsonGenerator gen) throws IOException { + void toJson(Set knownNames, String namespace, JsonGenerator gen) throws IOException { gen.writeStartObject(); gen.writeStringField("type", "array"); gen.writeFieldName("items"); - elementType.toJson(names, gen); + elementType.toJson(knownNames, namespace, gen); writeProps(gen); gen.writeEndObject(); } @@ -1298,11 +1208,11 @@ int computeHash() { } @Override - void toJson(Names names, JsonGenerator gen) throws IOException { + void toJson(Set knownNames, String currentNamespace, JsonGenerator gen) throws IOException { gen.writeStartObject(); gen.writeStringField("type", "map"); gen.writeFieldName("values"); - valueType.toJson(names, gen); + valueType.toJson(knownNames, currentNamespace, gen); writeProps(gen); gen.writeEndObject(); } @@ -1375,10 +1285,10 @@ public void addProp(String name, String value) { } @Override - void toJson(Names names, JsonGenerator gen) throws IOException { + void toJson(Set knownNames, String currentNamespace, JsonGenerator gen) throws IOException { gen.writeStartArray(); for (Schema type : types) - type.toJson(names, gen); + type.toJson(knownNames, currentNamespace, gen); gen.writeEndArray(); } @@ -1419,12 +1329,12 @@ int computeHash() { } @Override - void toJson(Names names, JsonGenerator gen) throws IOException { - if (writeNameRef(names, gen)) + void toJson(Set knownNames, String currentNamespace, JsonGenerator gen) throws IOException { + if (writeNameRef(knownNames, currentNamespace, gen)) 
return; gen.writeStartObject(); gen.writeStringField("type", "fixed"); - writeName(names, gen); + writeName(currentNamespace, gen); if (getDoc() != null) gen.writeStringField("doc", getDoc()); gen.writeNumberField("size", size); @@ -1488,7 +1398,7 @@ public NullSchema() { * may refer to it by name. */ public static class Parser { - private Names names = new Names(); + final ParseContext context; private final NameValidator validate; private boolean validateDefaults = true; @@ -1498,11 +1408,19 @@ public Parser() { public Parser(final NameValidator validate) { this.validate = validate != null ? validate : NameValidator.NO_VALIDATION; + context = new ParseContext(this.validate); + } + + public Parser(final ParseContext context) { + this.validate = context.nameValidator; + this.context = context; } /** * Adds the provided types to the set of defined, named types known to this - * parser. deprecated: use addTypes(Iterable types) + * parser. + * + * @deprecated use addTypes(Iterable types) */ @Deprecated public Parser addTypes(Map types) { @@ -1515,16 +1433,13 @@ public Parser addTypes(Map types) { */ public Parser addTypes(Iterable types) { for (Schema s : types) - names.add(s); + context.put(s); return this; } /** Returns the set of defined, named types known to this parser. */ public Map getTypes() { - Map result = new LinkedHashMap<>(); - for (Schema s : names.values()) - result.put(s.getFullName(), s); - return result; + return context.typesByName(); } /** Enable or disable default value validation. */ @@ -1543,21 +1458,7 @@ public boolean getValidateDefaults() { * names known to this parser. 
*/ public Schema parse(File file) throws IOException { - return parse(FACTORY.createParser(file), false); - } - - public List parse(Iterable sources) throws IOException { - final List schemas = new ArrayList<>(); - for (File source : sources) { - final Schema emptySchema = parseNamesDeclared(FACTORY.createParser(source)); - schemas.add(emptySchema); - } - - for (File source : sources) { - parseFieldsOnly(FACTORY.createParser(source)); - } - - return schemas; + return parse(FACTORY.createParser(file), false, true); } /** @@ -1565,7 +1466,8 @@ public List parse(Iterable sources) throws IOException { * names known to this parser. The input stream stays open after the parsing. */ public Schema parse(InputStream in) throws IOException { - return parse(FACTORY.createParser(in).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE), true); + JsonParser parser = FACTORY.createParser(in).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE); + return parse(parser, true, true); } /** Read a schema from one or more json strings */ @@ -1582,43 +1484,41 @@ public Schema parse(String s, String... 
more) { */ public Schema parse(String s) { try { - return parse(FACTORY.createParser(s), false); + return parse(FACTORY.createParser(s), false, true); } catch (IOException e) { throw new SchemaParseException(e); } } - private static interface ParseFunction { - Schema parse(JsonNode node) throws IOException; + public Schema parseInternal(String s) { + try { + return parse(FACTORY.createParser(s), false, false); + } catch (IOException e) { + throw new SchemaParseException(e); + } } - private Schema runParser(JsonParser parser, ParseFunction f) throws IOException { - NameValidator saved = validateNames.get(); + private Schema parse(JsonParser parser, boolean allowDanglingContent, boolean resolveSchema) throws IOException { + NameValidator saved = VALIDATE_NAMES.get(); boolean savedValidateDefaults = VALIDATE_DEFAULTS.get(); try { - validateNames.set(validate); + // This ensured we're using the same validation as the ParseContext. + // This is most relevant for field names. + VALIDATE_NAMES.set(validate); VALIDATE_DEFAULTS.set(validateDefaults); JsonNode jsonNode = MAPPER.readTree(parser); - return f.parse(jsonNode); - } catch (JsonParseException e) { - throw new SchemaParseException(e); - } finally { - parser.close(); - validateNames.set(saved); - VALIDATE_DEFAULTS.set(savedValidateDefaults); - } - } - - private Schema parse(JsonParser parser, final boolean allowDanglingContent) throws IOException { - return this.runParser(parser, (JsonNode jsonNode) -> { - Schema schema = Schema.parse(jsonNode, names); + Schema schema = Schema.parse(jsonNode, context, null); + if (resolveSchema) { + context.commit(); + schema = context.resolve(schema); + } if (!allowDanglingContent) { String dangling; StringWriter danglingWriter = new StringWriter(); int numCharsReleased = parser.releaseBuffered(danglingWriter); if (numCharsReleased == -1) { ByteArrayOutputStream danglingOutputStream = new ByteArrayOutputStream(); - parser.releaseBuffered(danglingOutputStream); // if input isnt chars 
above it must be bytes + parser.releaseBuffered(danglingOutputStream); // if input isn't chars above it must be bytes dangling = new String(danglingOutputStream.toByteArray(), StandardCharsets.UTF_8).trim(); } else { dangling = danglingWriter.toString().trim(); @@ -1628,17 +1528,14 @@ private Schema parse(JsonParser parser, final boolean allowDanglingContent) thro } } return schema; - }); - } - - private Schema parseNamesDeclared(JsonParser parser) throws IOException { - return this.runParser(parser, (JsonNode jsonNode) -> Schema.parseNamesDeclared(jsonNode, names, names.space)); - } - - private Schema parseFieldsOnly(JsonParser parser) throws IOException { - return this.runParser(parser, (JsonNode jsonNode) -> Schema.parseCompleteSchema(jsonNode, names, names.space)); + } catch (JsonParseException e) { + throw new SchemaParseException(e); + } finally { + parser.close(); + VALIDATE_NAMES.set(saved); + VALIDATE_DEFAULTS.set(savedValidateDefaults); + } } - } /** @@ -1647,9 +1544,9 @@ private Schema parseFieldsOnly(JsonParser parser) throws IOException { * * @param file The file to read the schema from. * @return The freshly built Schema. - * @throws IOException if there was trouble reading the contents or they are + * @throws IOException if there was trouble reading the contents, or they are * invalid - * @deprecated use {@link Schema.Parser} instead. + * @deprecated use {@link SchemaParser} instead. */ @Deprecated public static Schema parse(File file) throws IOException { @@ -1662,9 +1559,9 @@ public static Schema parse(File file) throws IOException { * * @param in The input stream to read the schema from. * @return The freshly built Schema. - * @throws IOException if there was trouble reading the contents or they are + * @throws IOException if there was trouble reading the contents, or they are * invalid - * @deprecated use {@link Schema.Parser} instead. + * @deprecated use {@link SchemaParser} instead. 
*/ @Deprecated public static Schema parse(InputStream in) throws IOException { @@ -1674,7 +1571,7 @@ public static Schema parse(InputStream in) throws IOException { /** * Construct a schema from JSON text. * - * @deprecated use {@link Schema.Parser} instead. + * @deprecated use {@link SchemaParser} instead. */ @Deprecated public static Schema parse(String jsonSchema) { @@ -1685,7 +1582,7 @@ public static Schema parse(String jsonSchema) { * Construct a schema from JSON text. * * @param validate true if names should be validated, false if not. - * @deprecated use {@link Schema.Parser} instead. + * @deprecated use {@link SchemaParser} instead. */ @Deprecated public static Schema parse(String jsonSchema, boolean validate) { @@ -1759,19 +1656,31 @@ public Schema put(Name name, Schema schema) { } } - private static ThreadLocal validateNames = ThreadLocalWithInitial + private static final ThreadLocal VALIDATE_NAMES = ThreadLocalWithInitial .of(() -> NameValidator.UTF_VALIDATOR); private static String validateName(String name) { - NameValidator.Result result = validateNames.get().validate(name); + NameValidator.Result result = VALIDATE_NAMES.get().validate(name); if (!result.isOK()) { throw new SchemaParseException(result.getErrors()); } return name; } + /* + * @deprecated Scheduled for removal. Do Not Use! + */ + @Deprecated public static void setNameValidator(final NameValidator validator) { - Schema.validateNames.set(validator); + Schema.VALIDATE_NAMES.set(validator); + } + + /* + * @deprecated Scheduled for removal. Do Not Use! + */ + @Deprecated + public static NameValidator getNameValidator() { + return Schema.VALIDATE_NAMES.get(); } private static final ThreadLocal VALIDATE_DEFAULTS = ThreadLocalWithInitial.of(() -> true); @@ -1784,6 +1693,22 @@ private static JsonNode validateDefault(String fieldName, Schema schema, JsonNod return defaultValue; } + /* + * @deprecated Scheduled for removal. Do Not Use! 
+ */ + @Deprecated + public static void setValidateDefaults(boolean validateDefaults) { + Schema.VALIDATE_DEFAULTS.set(validateDefaults); + } + + /* + * @deprecated Scheduled for removal. Do Not Use! + */ + @Deprecated + public static boolean getValidateDefaults() { + return Schema.VALIDATE_DEFAULTS.get(); + } + /** * Checks if a JSON value matches the schema. * @@ -1867,267 +1792,187 @@ private static boolean isValidValue(Schema schema, JsonNode value) { } } - /** - * Parse named schema in order to fill names map. This method does not parse - * field of record/error schema. - * - * @param schema : json schema representation. - * @param names : map of named schema. - * @param currentNameSpace : current working name space. - * @return schema. - */ - static Schema parseNamesDeclared(JsonNode schema, Names names, String currentNameSpace) { + /** @see #parse(String) */ + static Schema parse(JsonNode schema, ParseContext context, String currentNameSpace) { if (schema == null) { - return null; - } - if (schema.isObject()) { - - String type = Schema.getOptionalText(schema, "type"); - Name name = null; - - String doc = null; - Schema result = null; + throw new SchemaParseException("Cannot parse schema"); + } else if (schema.isTextual()) { // name + return context.find(schema.textValue(), currentNameSpace); + } else if (schema.isObject()) { + String type = getRequiredText(schema, "type", "No type"); final boolean isTypeError = "error".equals(type); - final boolean isTypeRecord = "record".equals(type); - final boolean isTypeEnum = "enum".equals(type); - final boolean isTypeFixed = "fixed".equals(type); - - if (isTypeRecord || isTypeError || isTypeEnum || isTypeFixed) { - String space = getOptionalText(schema, "namespace"); - doc = getOptionalText(schema, "doc"); - if (space == null) - space = currentNameSpace; - name = new Name(getRequiredText(schema, "name", "No name in schema"), space); - } - if (isTypeRecord || isTypeError) { // record - result = new RecordSchema(name, 
doc, isTypeError); - names.add(result); - JsonNode fieldsNode = schema.get("fields"); - - if (fieldsNode == null || !fieldsNode.isArray()) - throw new SchemaParseException("Record has no fields: " + schema); - exploreFields(fieldsNode, names, name != null ? name.space : null); - - } else if (isTypeEnum) { // enum - JsonNode symbolsNode = schema.get("symbols"); - if (symbolsNode == null || !symbolsNode.isArray()) - throw new SchemaParseException("Enum has no symbols: " + schema); - LockableArrayList symbols = new LockableArrayList<>(symbolsNode.size()); - for (JsonNode n : symbolsNode) - symbols.add(n.textValue()); - JsonNode enumDefault = schema.get("default"); - String defaultSymbol = null; - if (enumDefault != null) - defaultSymbol = enumDefault.textValue(); - result = new EnumSchema(name, doc, symbols, defaultSymbol); - names.add(result); + if (PRIMITIVES.containsKey(type)) { // primitive + return parsePrimitive(schema, type); + } else if ("record".equals(type) || isTypeError) { // record + return parseRecord(schema, context, currentNameSpace, isTypeError); + } else if ("enum".equals(type)) { // enum + return parseEnum(schema, context, currentNameSpace); } else if (type.equals("array")) { // array - JsonNode itemsNode = schema.get("items"); - if (itemsNode == null) - throw new SchemaParseException("Array has no items type: " + schema); - final Schema items = Schema.parseNamesDeclared(itemsNode, names, currentNameSpace); - result = Schema.createArray(items); + return parseArray(schema, context, currentNameSpace); } else if (type.equals("map")) { // map - JsonNode valuesNode = schema.get("values"); - if (valuesNode == null) - throw new SchemaParseException("Map has no values type: " + schema); - final Schema values = Schema.parseNamesDeclared(valuesNode, names, currentNameSpace); - result = Schema.createMap(values); - } else if (isTypeFixed) { // fixed - JsonNode sizeNode = schema.get("size"); - if (sizeNode == null || !sizeNode.isInt()) - throw new 
SchemaParseException("Invalid or no size: " + schema); - result = new FixedSchema(name, doc, sizeNode.intValue()); - if (name != null) - names.add(result); - } else if (PRIMITIVES.containsKey(type)) { - result = Schema.create(PRIMITIVES.get(type)); - } - if (result != null) { - Set reserved = SCHEMA_RESERVED; - if (isTypeEnum) { - reserved = ENUM_RESERVED; - } - Schema.addProperties(schema, reserved, result); + return parseMap(schema, context, currentNameSpace); + } else if ("fixed".equals(type)) { // fixed + return parseFixed(schema, context, currentNameSpace); + } else { // For unions with self reference + return context.find(type, currentNameSpace); } - return result; - } else if (schema.isArray()) { - List subs = new ArrayList<>(schema.size()); - schema.forEach((JsonNode item) -> { - Schema sub = Schema.parseNamesDeclared(item, names, currentNameSpace); - if (sub != null) { - subs.add(sub); - } - }); - return Schema.createUnion(subs); - } else if (schema.isTextual()) { - String value = schema.asText(); - return names.get(value); + } else if (schema.isArray()) { // union + return parseUnion(schema, context, currentNameSpace); + } else { + throw new SchemaParseException("Schema not yet supported: " + schema); } - return null; } - private static void addProperties(JsonNode schema, Set reserved, Schema avroSchema) { - Iterator i = schema.fieldNames(); - while (i.hasNext()) { // add properties - String prop = i.next(); - if (!reserved.contains(prop)) // ignore reserved - avroSchema.addProp(prop, schema.get(prop)); - } - // parse logical type if present - avroSchema.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(avroSchema); - // names.space(savedSpace); // restore space - if (avroSchema instanceof NamedSchema) { - Set aliases = parseAliases(schema); - if (aliases != null) // add aliases - for (String alias : aliases) - avroSchema.addAlias(alias); - } + private static Schema parsePrimitive(JsonNode schema, String type) { + Schema result = 
create(PRIMITIVES.get(type)); + parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED); + return result; } - /** - * Explore record fields in order to fill names map with inner defined named - * types. - * - * @param fieldsNode : json node for field. - * @param names : names map. - * @param nameSpace : current working namespace. - */ - private static void exploreFields(JsonNode fieldsNode, Names names, String nameSpace) { + private static Schema parseRecord(JsonNode schema, ParseContext context, String currentNameSpace, + boolean isTypeError) { + Name name = parseName(schema, currentNameSpace); + String doc = parseDoc(schema); + Schema result = new RecordSchema(name, doc, isTypeError); + context.put(result); + + JsonNode fieldsNode = schema.get("fields"); + if (fieldsNode == null || !fieldsNode.isArray()) + throw new SchemaParseException("Record has no fields: " + schema); + List fields = new ArrayList<>(); for (JsonNode field : fieldsNode) { - final JsonNode fieldType = field.get("type"); - if (fieldType != null) { - if (fieldType.isObject()) { - parseNamesDeclared(fieldType, names, nameSpace); - } else if (fieldType.isArray()) { - exploreFields(fieldType, names, nameSpace); - } else if (fieldType.isTextual() && field.isObject()) { - parseNamesDeclared(field, names, nameSpace); - } - } + Field f = parseField(field, context, name.space); + fields.add(f); + if (f.schema().getLogicalType() == null && getOptionalText(field, LOGICAL_TYPE_PROP) != null) + LOG.warn( + "Ignored the {}.{}.logicalType property (\"{}\"). It should probably be nested inside the \"type\" for the field.", + name, f.name(), getOptionalText(field, "logicalType")); } + result.setFields(fields); + parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED); + parseAliases(schema, result); + return result; } - /** - * in complement of parseNamesDeclared, this method parse schema in details. - * - * @param schema : json schema. - * @param names : names map. 
- * @param currentSpace : current working name space. - * @return complete schema. - */ - static Schema parseCompleteSchema(JsonNode schema, Names names, String currentSpace) { - if (schema == null) { - throw new SchemaParseException("Cannot parse schema"); - } - if (schema.isTextual()) { - String type = schema.asText(); - Schema avroSchema = names.get(type); - if (avroSchema == null) { - avroSchema = names.get(currentSpace + "." + type); - } - return avroSchema; - } - if (schema.isArray()) { - List schemas = StreamSupport.stream(schema.spliterator(), false) - .map((JsonNode sub) -> parseCompleteSchema(sub, names, currentSpace)).collect(Collectors.toList()); - return Schema.createUnion(schemas); - } - if (schema.isObject()) { - Schema result = null; - String type = getRequiredText(schema, "type", "No type"); - Name name = null; + private static Field parseField(JsonNode field, ParseContext context, String namespace) { + String fieldName = getRequiredText(field, "name", "No field name"); + String fieldDoc = parseDoc(field); + JsonNode fieldTypeNode = field.get("type"); + if (fieldTypeNode == null) + throw new SchemaParseException("No field type: " + field); + Schema fieldSchema = parse(fieldTypeNode, context, namespace); - final boolean isTypeError = "error".equals(type); - final boolean isTypeRecord = "record".equals(type); - final boolean isTypeArray = "array".equals(type); + Field.Order order = Field.Order.ASCENDING; + JsonNode orderNode = field.get("order"); + if (orderNode != null) + order = Field.Order.valueOf(orderNode.textValue().toUpperCase(Locale.ENGLISH)); - if (isTypeRecord || isTypeError || "enum".equals(type) || "fixed".equals(type)) { - // named schema - String space = getOptionalText(schema, "namespace"); + JsonNode defaultValue = field.get("default"); + if (defaultValue != null && (Type.FLOAT.equals(fieldSchema.getType()) || Type.DOUBLE.equals(fieldSchema.getType())) + && defaultValue.isTextual()) + defaultValue = new 
DoubleNode(Double.parseDouble(defaultValue.textValue())); - if (space == null) - space = currentSpace; - name = new Name(getRequiredText(schema, "name", "No name in schema"), space); + Field f = new Field(fieldName, fieldSchema, fieldDoc, defaultValue, true, order); + parseProperties(field, f, FIELD_RESERVED); + f.aliases = parseAliases(field); + return f; + } - result = names.get(name); - if (result == null) { - throw new SchemaParseException("Unparsed field type " + name); - } - } - if (isTypeRecord || isTypeError) { - if (result != null && !result.hasFields()) { - final List fields = new ArrayList<>(); - JsonNode fieldsNode = schema.get("fields"); - if (fieldsNode == null || !fieldsNode.isArray()) - throw new SchemaParseException("Record has no fields: " + schema); - - for (JsonNode field : fieldsNode) { - Field f = Field.parse(field, names, name.space); - - fields.add(f); - if (f.schema.getLogicalType() == null && getOptionalText(field, LOGICAL_TYPE_PROP) != null) - LOG.warn( - "Ignored the {}.{}.logicalType property (\"{}\"). It should probably be nested inside the \"type\" for the field.", - name, f.name, getOptionalText(field, "logicalType")); - } - result.setFields(fields); - } - } else if (isTypeArray) { - JsonNode items = schema.get("items"); - Schema schemaItems = parseCompleteSchema(items, names, currentSpace); - result = Schema.createArray(schemaItems); - } else if ("map".equals(type)) { - JsonNode values = schema.get("values"); - Schema mapItems = parseCompleteSchema(values, names, currentSpace); - result = Schema.createMap(mapItems); - } else if (result == null) { - result = names.get(currentSpace + "." 
+ type); - if (result == null) { - result = names.get(type); - } - } + private static Schema parseEnum(JsonNode schema, ParseContext context, String currentNameSpace) { + Name name = parseName(schema, currentNameSpace); + String doc = parseDoc(schema); - Set reserved = SCHEMA_RESERVED; - if ("enum".equals(type)) { - reserved = ENUM_RESERVED; - } - Schema.addProperties(schema, reserved, result); - return result; + JsonNode symbolsNode = schema.get("symbols"); + if (symbolsNode == null || !symbolsNode.isArray()) { + throw new SchemaParseException("Enum has no symbols: " + schema); } - return null; + LockableArrayList symbols = new LockableArrayList<>(symbolsNode.size()); + for (JsonNode n : symbolsNode) + symbols.add(n.textValue()); + JsonNode enumDefault = schema.get("default"); + String defaultSymbol = null; + if (enumDefault != null) { + defaultSymbol = enumDefault.textValue(); + } + + Schema result = new EnumSchema(name, doc, symbols, defaultSymbol); + context.put(result); + parsePropertiesAndLogicalType(schema, result, ENUM_RESERVED); + parseAliases(schema, result); + return result; } - static Schema parse(JsonNode schema, Names names) { - if (schema == null) { - throw new SchemaParseException("Cannot parse schema"); - } + private static Schema parseArray(JsonNode schema, ParseContext context, String currentNameSpace) { + Schema result; + JsonNode itemsNode = schema.get("items"); + if (itemsNode == null) + throw new SchemaParseException("Array has no items type: " + schema); + result = new ArraySchema(parse(itemsNode, context, currentNameSpace)); + parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED); + return result; + } + + private static Schema parseMap(JsonNode schema, ParseContext context, String currentNameSpace) { + Schema result; + JsonNode valuesNode = schema.get("values"); + if (valuesNode == null) + throw new SchemaParseException("Map has no values type: " + schema); + result = new MapSchema(parse(valuesNode, context, currentNameSpace)); + 
parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED); + return result; + } + + private static Schema parseFixed(JsonNode schema, ParseContext context, String currentNameSpace) { + Name name = parseName(schema, currentNameSpace); + String doc = parseDoc(schema); - Schema result = Schema.parseNamesDeclared(schema, names, names.space); - Schema.parseCompleteSchema(schema, names, names.space); + JsonNode sizeNode = schema.get("size"); + if (sizeNode == null || !sizeNode.isInt()) + throw new SchemaParseException("Invalid or no size: " + schema); + Schema result = new FixedSchema(name, doc, sizeNode.intValue()); + context.put(result); + parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED); + parseAliases(schema, result); return result; } - static Schema resolveSchema(JsonNode schema, Names names, String currentNameSpace) { - String np = currentNameSpace; - String nodeName = getOptionalText(schema, "name"); - if (nodeName != null) { - final JsonNode nameSpace = schema.get("namespace"); - StringBuilder fullName = new StringBuilder(); - if (nameSpace != null && nameSpace.isTextual()) { - fullName.append(nameSpace.asText()).append("."); - np = nameSpace.asText(); - } - fullName.append(nodeName); - Schema schema1 = names.get(fullName.toString()); + private static UnionSchema parseUnion(JsonNode schema, ParseContext context, String currentNameSpace) { + LockableArrayList types = new LockableArrayList<>(schema.size()); + for (JsonNode typeNode : schema) + types.add(parse(typeNode, context, currentNameSpace)); + return new UnionSchema(types); + } - if (schema1 != null && schema1.getType() == Type.RECORD && !schema1.hasFields()) { - Schema.parseCompleteSchema(schema, names, np); - } - return schema1; - } - return null; + private static void parsePropertiesAndLogicalType(JsonNode jsonNode, Schema result, Set propertiesToSkip) { + parseProperties(jsonNode, result, propertiesToSkip); + // parse logical type if present + result.logicalType = 
LogicalTypes.fromSchemaIgnoreInvalid(result); + } + + private static void parseProperties(JsonNode schema, JsonProperties result, Set propertiesToSkip) { + schema.fieldNames().forEachRemaining(prop -> { // add properties + if (!propertiesToSkip.contains(prop)) // ignore reserved + result.addProp(prop, schema.get(prop)); + }); + } + + private static Name parseName(JsonNode schema, String currentNameSpace) { + String space = getOptionalText(schema, "namespace"); + if (space == null) + space = currentNameSpace; + return new Name(getRequiredText(schema, "name", "No name in schema"), space); + } + + private static String parseDoc(JsonNode schema) { + return getOptionalText(schema, "doc"); + } + + private static void parseAliases(JsonNode schema, Schema result) { + Set aliases = parseAliases(schema); + if (aliases != null) // add aliases + for (String alias : aliases) + result.addAlias(alias); } static Set parseAliases(JsonNode node) { @@ -2199,13 +2044,14 @@ public static Schema applyAliases(Schema writer, Schema reader) { Map> fieldAliases = new HashMap<>(1); getAliases(reader, seen, aliases, fieldAliases); - if (aliases.size() == 0 && fieldAliases.size() == 0) + if (aliases.isEmpty() && fieldAliases.isEmpty()) return writer; // no aliases seen.clear(); return applyAliases(writer, seen, aliases, fieldAliases); } + @SuppressWarnings("DataFlowIssue") private static Schema applyAliases(Schema s, Map seen, Map aliases, Map> fieldAliases) { @@ -2261,6 +2107,7 @@ private static Schema applyAliases(Schema s, Map seen, Map seen, Map aliases, Map> fieldAliases) { if (schema instanceof NamedSchema) { @@ -2322,10 +2169,11 @@ private static String getFieldAlias(Name record, String field, Maptrue in the lock() method. It's legal to call lock() any number of * times. Any lock() other than the first one is a no-op. * - * This class throws IllegalStateException if a mutating operation is - * performed after being locked. 
Since modifications through iterator also use + * If a mutating operation is performed after being locked, it throws an + * IllegalStateException. Since modifications through iterator also use * the list's mutating operations, this effectively blocks all modifications. */ + @SuppressWarnings("unused") static class LockableArrayList extends ArrayList { private static final long serialVersionUID = 1L; private boolean locked = false; @@ -2341,6 +2189,7 @@ public LockableArrayList(List types) { super(types); } + @SafeVarargs public LockableArrayList(E... types) { super(types.length); Collections.addAll(this, types); diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java b/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java index dfb3c01f353..e3eb2d9ab69 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java +++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java @@ -84,7 +84,7 @@ public SchemaParser() { * available * @see UtfTextUtils */ - public Schema parse(File file) throws IOException, SchemaParseException { + public ParseResult parse(File file) throws IOException, SchemaParseException { return parse(file, null); } @@ -99,7 +99,7 @@ public Schema parse(File file) throws IOException, SchemaParseException { * suppressed underlying parse exceptions if * available */ - public Schema parse(File file, Charset charset) throws IOException, SchemaParseException { + public ParseResult parse(File file, Charset charset) throws IOException, SchemaParseException { return parse(file.toPath(), charset); } @@ -115,7 +115,7 @@ public Schema parse(File file, Charset charset) throws IOException, SchemaParseE * available * @see UtfTextUtils */ - public Schema parse(Path file) throws IOException, SchemaParseException { + public ParseResult parse(Path file) throws IOException, SchemaParseException { return parse(file, null); } @@ -130,7 +130,7 @@ public Schema parse(Path file) throws IOException, SchemaParseException { * 
suppressed underlying parse exceptions if * available */ - public Schema parse(Path file, Charset charset) throws IOException, SchemaParseException { + public ParseResult parse(Path file, Charset charset) throws IOException, SchemaParseException { URI inputDir = file.getParent().toUri(); try (InputStream stream = Files.newInputStream(file)) { String formattedSchema = UtfTextUtils.readAllBytes(stream, charset); @@ -138,6 +138,24 @@ public Schema parse(Path file, Charset charset) throws IOException, SchemaParseE } } + /** + * Parse an Avro schema from a file written with a specific character set. + * + * @param location the location of the schema resource + * @param charset the character set of the schema resource + * @return the schema + * @throws IOException when the schema cannot be read + * @throws SchemaParseException if parsing the schema failed; contains + * suppressed underlying parse exceptions if + * available + */ + public ParseResult parse(URI location, Charset charset) throws IOException, SchemaParseException { + try (InputStream stream = location.toURL().openStream()) { + String formattedSchema = UtfTextUtils.readAllBytes(stream, charset); + return parse(location, formattedSchema); + } + } + /** * Parse an Avro schema from an input stream. The stream content is assumed to * be UTF-8 text. Note that the stream stays open after reading. 
@@ -150,7 +168,7 @@ public Schema parse(Path file, Charset charset) throws IOException, SchemaParseE * available * @see UtfTextUtils */ - public Schema parse(InputStream in) throws IOException, SchemaParseException { + public ParseResult parse(InputStream in) throws IOException, SchemaParseException { return parse(in, null); } @@ -166,7 +184,7 @@ public Schema parse(InputStream in) throws IOException, SchemaParseException { * suppressed underlying parse exceptions if * available */ - public Schema parse(InputStream in, Charset charset) throws IOException, SchemaParseException { + public ParseResult parse(InputStream in, Charset charset) throws IOException, SchemaParseException { return parse(UtfTextUtils.readAllBytes(in, charset)); } @@ -180,7 +198,7 @@ public Schema parse(InputStream in, Charset charset) throws IOException, SchemaP * suppressed underlying parse exceptions if * available */ - public Schema parse(Reader in) throws IOException, SchemaParseException { + public ParseResult parse(Reader in) throws IOException, SchemaParseException { return parse(UtfTextUtils.readAllChars(in)); } @@ -193,7 +211,7 @@ public Schema parse(Reader in) throws IOException, SchemaParseException { * suppressed underlying parse exceptions if * available */ - public Schema parse(CharSequence text) throws SchemaParseException { + public ParseResult parse(CharSequence text) throws SchemaParseException { try { return parse(null, text); } catch (IOException e) { @@ -220,15 +238,14 @@ public Schema parse(CharSequence text) throws SchemaParseException { * @throws RuntimeException if thrown by one of the parsers * @throws SchemaParseException when all parsers fail */ - private Schema parse(URI baseUri, CharSequence formattedSchema) throws IOException, SchemaParseException { + private ParseResult parse(URI baseUri, CharSequence formattedSchema) throws IOException, SchemaParseException { List parseExceptions = new ArrayList<>(); for (FormattedSchemaParser formattedSchemaParser : 
formattedSchemaParsers) { try { Schema schema = formattedSchemaParser.parse(parseContext, baseUri, formattedSchema); - if (parseContext.hasNewSchemas()) { + if (parseContext.hasNewSchemas() || schema != null) { // Parsing succeeded: return the result. - parseContext.commit(); - return schema; + return parseContext.commit(schema); } } catch (SchemaParseException e) { parseContext.rollback(); @@ -246,4 +263,32 @@ private Schema parse(URI baseUri, CharSequence formattedSchema) throws IOExcepti parseExceptions.forEach(parseException::addSuppressed); throw parseException; } + + /** + * Get all parsed schemata. + * + * @return all parsed schemas, in the order they were parsed + */ + public List getParsedNamedSchemas() { + return parseContext.resolveAllSchemas(); + } + + // Temporary method to reduce PR size + @Deprecated + public Schema resolve(ParseResult result) { + return result.mainSchema(); + } + + public interface ParseResult { + /** + * The main schema parsed from a file. Can be any schema, or {@code null} if the + * parsed file has no "main" schema. + */ + Schema mainSchema(); + + /** + * The list of named schemata that were parsed. 
+ */ + List parsedNamedSchemas(); + } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java index c3a25a5e577..83285d371ae 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java +++ b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java @@ -18,26 +18,19 @@ package org.apache.avro.util; import org.apache.avro.AvroTypeException; -import org.apache.avro.JsonProperties; -import org.apache.avro.ParseContext; -import org.apache.avro.Protocol; import org.apache.avro.Schema; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.stream.Collectors; +import static java.util.Objects.requireNonNull; import static org.apache.avro.Schema.Type.ARRAY; import static org.apache.avro.Schema.Type.ENUM; import static org.apache.avro.Schema.Type.FIXED; @@ -47,7 +40,8 @@ /** * Utility class to resolve schemas that are unavailable at the point they are - * referenced in the IDL. + * referenced in a schema file. This class is meant for internal use: use at + * your own risk! */ public final class SchemaResolver { @@ -111,67 +105,6 @@ public static boolean isFullyResolvedSchema(final Schema schema) { } } - /** - * Clone the provided schema while resolving all unreferenced schemas. 
- * - * @param parseContext the parse context with known names - * @param schema the schema to resolve - * @return a copy of the schema with all schemas resolved - */ - public static Schema resolve(final ParseContext parseContext, Schema schema) { - if (schema == null) { - return null; - } - ResolvingVisitor visitor = new ResolvingVisitor(schema, parseContext::resolve); - return Schemas.visit(schema, visitor); - } - - /** - * Clone all provided schemas while resolving all unreferenced schemas. - * - * @param parseContext the parse context with known names - * @param schemas the schemas to resolve - * @return a copy of all schemas with all schemas resolved - */ - public static Collection resolve(final ParseContext parseContext, Collection schemas) { - ResolvingVisitor visitor = new ResolvingVisitor(null, parseContext::resolve); - return schemas.stream().map(schema -> Schemas.visit(schema, visitor.withRoot(schema))).collect(Collectors.toList()); - } - - /** - * Will clone the provided protocol while resolving all unreferenced schemas - * - * @param parseContext the parse context with known names - * @param protocol the protocol to resolve - * @return a copy of the protocol with all schemas resolved - */ - public static Protocol resolve(ParseContext parseContext, final Protocol protocol) { - // Create an empty copy of the protocol - Protocol result = new Protocol(protocol.getName(), protocol.getDoc(), protocol.getNamespace()); - protocol.getObjectProps().forEach(((JsonProperties) result)::addProp); - - ResolvingVisitor visitor = new ResolvingVisitor(null, parseContext::resolve); - Function resolver = schema -> Schemas.visit(schema, visitor.withRoot(schema)); - - // Resolve all schemata in the protocol. 
- result.setTypes(protocol.getTypes().stream().map(resolver).collect(Collectors.toList())); - Map resultMessages = result.getMessages(); - protocol.getMessages().forEach((name, oldValue) -> { - Protocol.Message newValue; - if (oldValue.isOneWay()) { - newValue = result.createMessage(oldValue.getName(), oldValue.getDoc(), oldValue, - resolver.apply(oldValue.getRequest())); - } else { - Schema request = resolver.apply(oldValue.getRequest()); - Schema response = resolver.apply(oldValue.getResponse()); - Schema errors = resolver.apply(oldValue.getErrors()); - newValue = result.createMessage(oldValue.getName(), oldValue.getDoc(), oldValue, request, response, errors); - } - resultMessages.put(name, newValue); - }); - return result; - } - /** * This visitor checks if the current schema is fully resolved. */ @@ -216,78 +149,35 @@ public Boolean get() { * This visitor creates clone of the visited Schemata, minus the specified * schema properties, and resolves all unresolved schemas. */ - public static final class ResolvingVisitor implements SchemaVisitor { + public static final class ResolvingVisitor implements SchemaVisitor { private static final Set CONTAINER_SCHEMA_TYPES = EnumSet.of(RECORD, ARRAY, MAP, UNION); private static final Set NAMED_SCHEMA_TYPES = EnumSet.of(RECORD, ENUM, FIXED); private final Function symbolTable; - private final Set schemaPropertiesToRemove; private final IdentityHashMap replace; - private final Schema root; - - public ResolvingVisitor(final Schema root, final Function symbolTable, - String... 
schemaPropertiesToRemove) { - this(root, symbolTable, new HashSet<>(Arrays.asList(schemaPropertiesToRemove))); - } - - public ResolvingVisitor(final Schema root, final Function symbolTable, - Set schemaPropertiesToRemove) { + public ResolvingVisitor(final Function symbolTable) { this.replace = new IdentityHashMap<>(); this.symbolTable = symbolTable; - this.schemaPropertiesToRemove = schemaPropertiesToRemove; - - this.root = root; - } - - public ResolvingVisitor withRoot(Schema root) { - return new ResolvingVisitor(root, symbolTable, schemaPropertiesToRemove); } @Override public SchemaVisitorAction visitTerminal(final Schema terminal) { Schema.Type type = terminal.getType(); - Schema newSchema; if (CONTAINER_SCHEMA_TYPES.contains(type)) { if (!replace.containsKey(terminal)) { throw new IllegalStateException("Schema " + terminal + " must be already processed"); } - return SchemaVisitorAction.CONTINUE; - } else if (type == ENUM) { - newSchema = Schema.createEnum(terminal.getName(), terminal.getDoc(), terminal.getNamespace(), - terminal.getEnumSymbols(), terminal.getEnumDefault()); - } else if (type == FIXED) { - newSchema = Schema.createFixed(terminal.getName(), terminal.getDoc(), terminal.getNamespace(), - terminal.getFixedSize()); } else { - newSchema = Schema.create(type); + replace.put(terminal, terminal); } - copyProperties(terminal, newSchema); - replace.put(terminal, newSchema); return SchemaVisitorAction.CONTINUE; } - public void copyProperties(final Schema first, final Schema second) { - // Logical type - Optional.ofNullable(first.getLogicalType()).ifPresent(logicalType -> logicalType.addToSchema(second)); - - // Aliases (if applicable) - if (NAMED_SCHEMA_TYPES.contains(first.getType())) { - first.getAliases().forEach(second::addAlias); - } - - // Other properties - first.getObjectProps().forEach((name, value) -> { - if (!schemaPropertiesToRemove.contains(name)) { - second.addProp(name, value); - } - }); - } - @Override public SchemaVisitorAction 
visitNonTerminal(final Schema nt) { Schema.Type type = nt.getType(); - if (type == RECORD) { + if (type == RECORD && !replace.containsKey(nt)) { if (isUnresolvedSchema(nt)) { // unresolved schema will get a replacement that we already encountered, // or we will attempt to resolve. @@ -298,19 +188,32 @@ public SchemaVisitorAction visitNonTerminal(final Schema nt) { } Schema replacement = replace.computeIfAbsent(resSchema, schema -> { Schemas.visit(schema, this); - return replace.get(schema); + return replace.get(schema); // This is not what the visitor returns! }); replace.put(nt, replacement); } else { - // Create a clone without fields. Fields will be added in afterVisitNonTerminal. - Schema newSchema = Schema.createRecord(nt.getName(), nt.getDoc(), nt.getNamespace(), nt.isError()); - copyProperties(nt, newSchema); - replace.put(nt, newSchema); + // Create a clone without fields or properties. They will be added in + // afterVisitNonTerminal, as they can both create circular references. 
+ // (see org.apache.avro.TestCircularReferences as an example) + replace.put(nt, Schema.createRecord(nt.getName(), nt.getDoc(), nt.getNamespace(), nt.isError())); } } return SchemaVisitorAction.CONTINUE; } + public void copyProperties(final Schema first, final Schema second) { + // Logical type + Optional.ofNullable(first.getLogicalType()).ifPresent(logicalType -> logicalType.addToSchema(second)); + + // Aliases (if applicable) + if (NAMED_SCHEMA_TYPES.contains(first.getType())) { + first.getAliases().forEach(second::addAlias); + } + + // Other properties + first.getObjectProps().forEach(second::addProp); + } + @Override public SchemaVisitorAction afterVisitNonTerminal(final Schema nt) { Schema.Type type = nt.getType(); @@ -328,6 +231,7 @@ public SchemaVisitorAction afterVisitNonTerminal(final Schema nt) { newFields.add(new Schema.Field(field, replace.get(field.schema()))); } newSchema.setFields(newFields); + copyProperties(nt, newSchema); } } return SchemaVisitorAction.CONTINUE; @@ -335,15 +239,15 @@ public SchemaVisitorAction afterVisitNonTerminal(final Schema nt) { List types = nt.getTypes(); List newTypes = new ArrayList<>(types.size()); for (Schema sch : types) { - newTypes.add(replace.get(sch)); + newTypes.add(requireNonNull(replace.get(sch))); } newSchema = Schema.createUnion(newTypes); break; case ARRAY: - newSchema = Schema.createArray(replace.get(nt.getElementType())); + newSchema = Schema.createArray(requireNonNull(replace.get(nt.getElementType()))); break; case MAP: - newSchema = Schema.createMap(replace.get(nt.getValueType())); + newSchema = Schema.createMap(requireNonNull(replace.get(nt.getValueType()))); break; default: throw new IllegalStateException("Illegal type " + type + ", schema " + nt); @@ -354,14 +258,18 @@ public SchemaVisitorAction afterVisitNonTerminal(final Schema nt) { } @Override - public Schema get() { - return replace.get(root); + public Void get() { + return null; + } + + public Schema getResolved(Schema schema) { + return 
requireNonNull(replace.get(schema), + () -> "Unknown schema: " + schema.getFullName() + ". Was it resolved before?"); } @Override public String toString() { - return "ResolvingVisitor{symbolTable=" + symbolTable + ", schemaPropertiesToRemove=" + schemaPropertiesToRemove - + ", replace=" + replace + '}'; + return "ResolvingVisitor{symbolTable=" + symbolTable + ", replace=" + replace + '}'; } } } diff --git a/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java b/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java index 4802aea0747..db7dc640521 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java +++ b/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java @@ -38,7 +38,7 @@ public class DummySchemaParser implements FormattedSchemaParser { @Override public Schema parse(ParseContext parseContext, URI baseUri, CharSequence formattedSchema) throws IOException, SchemaParseException { - LOGGER.info("Using DummySchemaParser for {}", formattedSchema); + LOGGER.debug("Using DummySchemaParser for {}", formattedSchema); if (SCHEMA_TEXT_ONE.contentEquals(formattedSchema)) { parseContext.put(FIXED_SCHEMA); return FIXED_SCHEMA; diff --git a/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java b/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java index afd3a643570..d40a6cc9d83 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java +++ b/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java @@ -71,8 +71,8 @@ public void primitivesAreNotCached() { ParseContext context = new ParseContext(); for (Schema.Type type : primitives) { - Schema first = context.resolve(type.getName()); - Schema second = context.resolve(type.getName()); + Schema first = context.find(type.getName(), null); + Schema second = context.find(type.getName(), null); assertEquals(first, second); assertNotSame(first, second); @@ -85,26 +85,31 @@ public void primitivesAreNotCached() { 
public void validateSchemaRetrievalFailure() { Schema unknown = Schema.createFixed("unknown", null, null, 0); - Schema unresolved = fooBarBaz.resolve("unknown"); + Schema unresolved = fooBarBaz.find("unknown", null); assertTrue(SchemaResolver.isUnresolvedSchema(unresolved)); assertEquals(unknown.getFullName(), SchemaResolver.getUnresolvedSchemaName(unresolved)); } @Test public void validateSchemaRetrievalByFullName() { - assertSame(fooRecord, fooBarBaz.resolve(fooRecord.getFullName())); + assertSame(fooRecord, fooBarBaz.find(fooRecord.getFullName(), null)); + } + + @Test + public void validateSchemaRetrievalBySimpleName() { + assertSame(fooRecord, fooBarBaz.find(fooRecord.getName(), fooRecord.getNamespace())); } @Test public void verifyPutIsIdempotent() { ParseContext context = new ParseContext(); - assertNotEquals(fooRecord, context.resolve(fooRecord.getFullName())); + assertNotEquals(fooRecord, context.find(fooRecord.getFullName(), null)); context.put(fooRecord); - assertEquals(fooRecord, context.resolve(fooRecord.getFullName())); + assertEquals(fooRecord, context.find(fooRecord.getFullName(), null)); context.put(fooRecord); - assertEquals(fooRecord, context.resolve(fooRecord.getFullName())); + assertEquals(fooRecord, context.find(fooRecord.getFullName(), null)); } @Test diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java index 41f0474c9da..6c4e35df97e 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java @@ -17,8 +17,6 @@ */ package org.apache.avro; -import static org.junit.jupiter.api.Assertions.*; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -44,14 +42,21 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.TextNode; - import 
org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static java.util.Objects.requireNonNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class TestSchema { @Test void splitSchemaBuild() { @@ -583,26 +588,27 @@ void testContentAfterAvscInFile() throws Exception { @Test void testParseMultipleFile() throws IOException { - URL directory = Thread.currentThread().getContextClassLoader().getResource("multipleFile"); + URL directory = requireNonNull(Thread.currentThread().getContextClassLoader().getResource("multipleFile")); File f1 = new File(directory.getPath(), "ApplicationEvent.avsc"); File f2 = new File(directory.getPath(), "DocumentInfo.avsc"); File f3 = new File(directory.getPath(), "MyResponse.avsc"); Assertions.assertTrue(f1.exists(), "File not exist for test " + f1.getPath()); Assertions.assertTrue(f2.exists(), "File not exist for test " + f2.getPath()); Assertions.assertTrue(f3.exists(), "File not exist for test " + f3.getPath()); - - final List schemas = new Schema.Parser().parse(Arrays.asList(f1, f2, f3)); + SchemaParser parser = new SchemaParser(); + parser.parse(f1); + parser.parse(f2); + parser.parse(f3); + final List schemas = parser.getParsedNamedSchemas(); Assertions.assertEquals(3, schemas.size()); Schema schemaAppEvent = schemas.get(0); Schema schemaDocInfo = schemas.get(1); Schema schemaResponse = schemas.get(2); - Assertions.assertNotNull(schemaAppEvent); Assertions.assertEquals(3, schemaAppEvent.getFields().size()); Field 
documents = schemaAppEvent.getField("documents"); Schema docSchema = documents.schema().getTypes().get(1).getElementType(); Assertions.assertEquals(docSchema, schemaDocInfo); - Assertions.assertNotNull(schemaDocInfo); Assertions.assertNotNull(schemaResponse); } @@ -610,9 +616,9 @@ void testParseMultipleFile() throws IOException { @Test void add_types() { String schemaRecord2 = "{\"type\":\"record\", \"name\":\"record2\", \"fields\": [" - + " {\"name\":\"f1\", \"type\":\"record1\" }" + "]}"; - // register schema1 in schema. + + " {\"name\":\"f1\", \"type\":\"record1\" }" + "]}"; // register schema1 in schema. Schema schemaRecord1 = Schema.createRecord("record1", "doc", "", false); + schemaRecord1.setFields(Collections.singletonList(new Field("name", Schema.create(Type.STRING)))); Schema.Parser parser = new Schema.Parser().addTypes(Collections.singleton(schemaRecord1)); // parse schema for record2 that contains field for schema1. @@ -628,6 +634,6 @@ void add_types() { */ @Test void testParserNullValidate() { - new Schema.Parser(null).parse("{\"type\":\"record\",\"name\":\"\",\"fields\":[]}"); // Empty name + new Schema.Parser((NameValidator) null).parse("{\"type\":\"record\",\"name\":\"\",\"fields\":[]}"); // Empty name } } diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java index dc0c77431fe..29c8f65be66 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java @@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.StringReader; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -35,13 +36,15 @@ class TestSchemaParser { private static final Schema SCHEMA_REAL = Schema.createFixed("Real", null, "tests", 42); private static final String SCHEMA_JSON = 
SCHEMA_REAL.toString(false); + private static final Charset[] UTF_CHARSETS = { StandardCharsets.UTF_8, StandardCharsets.UTF_16LE, + StandardCharsets.UTF_16BE }; @Test void testParseFile() throws IOException { Path tempFile = Files.createTempFile("TestSchemaParser", null); Files.write(tempFile, singletonList(SCHEMA_JSON)); - Schema schema = new SchemaParser().parse(tempFile.toFile()); + Schema schema = new SchemaParser().parse(tempFile.toFile()).mainSchema(); assertEquals(SCHEMA_REAL, schema); } @@ -50,38 +53,49 @@ void testParsePath() throws IOException { Path tempFile = Files.createTempFile("TestSchemaParser", null); Files.write(tempFile, singletonList(SCHEMA_JSON)); - Schema schema = new SchemaParser().parse(tempFile); + Schema schema = new SchemaParser().parse(tempFile).mainSchema(); + assertEquals(SCHEMA_REAL, schema); + } + + @Test + void testParseURI() throws IOException { + Path tempFile = Files.createTempFile("TestSchemaParser", null); + Charset charset = UTF_CHARSETS[(int) Math.floor(UTF_CHARSETS.length * Math.random())]; + Files.write(tempFile, singletonList(SCHEMA_JSON), charset); + + Schema schema = new SchemaParser().parse(tempFile.toUri(), null).mainSchema(); assertEquals(SCHEMA_REAL, schema); } @Test void testParseReader() throws IOException { - Schema schema = new SchemaParser().parse(new StringReader(SCHEMA_JSON)); + Schema schema = new SchemaParser().parse(new StringReader(SCHEMA_JSON)).mainSchema(); assertEquals(SCHEMA_REAL, schema); } @Test void testParseStream() throws IOException { - Schema schema = new SchemaParser().parse(new ByteArrayInputStream(SCHEMA_JSON.getBytes(StandardCharsets.UTF_16))); + Schema schema = new SchemaParser().parse(new ByteArrayInputStream(SCHEMA_JSON.getBytes(StandardCharsets.UTF_16))) + .mainSchema(); assertEquals(SCHEMA_REAL, schema); } @Test void testParseTextWithFallbackJsonParser() { - Schema schema = new SchemaParser().parse(SCHEMA_JSON); + Schema schema = new SchemaParser().parse(SCHEMA_JSON).mainSchema(); 
assertEquals(SCHEMA_REAL, schema); } @Test void testParseByCustomParser() { - Schema schema = new SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ONE); + Schema schema = new SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ONE).mainSchema(); assertEquals(DummySchemaParser.FIXED_SCHEMA, schema); } @Test void testSingleParseError() { SchemaParseException parseException = assertThrows(SchemaParseException.class, - () -> new SchemaParser().parse("foo")); + () -> new SchemaParser().parse("foo").mainSchema()); assertEquals(JsonParseException.class, parseException.getCause().getClass()); assertEquals(0, parseException.getSuppressed().length); } @@ -89,7 +103,7 @@ void testSingleParseError() { @Test void testMultipleParseErrors() { SchemaParseException parseException = assertThrows(SchemaParseException.class, - () -> new SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ERROR)); + () -> new SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ERROR).mainSchema()); assertTrue(parseException.getMessage().startsWith("Could not parse the schema")); Throwable[] suppressed = parseException.getSuppressed(); assertEquals(2, suppressed.length); @@ -100,7 +114,7 @@ void testMultipleParseErrors() { @Test void testIOFailureWhileParsingText() { AvroRuntimeException exception = assertThrows(AvroRuntimeException.class, - () -> new SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_IO_ERROR)); + () -> new SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_IO_ERROR).mainSchema()); assertEquals(IOException.class, exception.getCause().getClass()); assertEquals(DummySchemaParser.IO_ERROR_MESSAGE, exception.getCause().getMessage()); } diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java index 6a1a137898d..e3e1a2ddb76 100644 --- a/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java +++ 
b/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java @@ -17,6 +17,10 @@ */ package org.apache.avro.compiler.idl; +import org.apache.avro.Protocol; +import org.apache.avro.Schema; +import org.apache.avro.compiler.schema.Schemas; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -26,10 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import org.apache.avro.Protocol; -import org.apache.avro.Schema; -import org.apache.avro.compiler.schema.Schemas; - /** * Utility class to resolve schemas that are unavailable at the time they are * referenced in the IDL. @@ -103,12 +103,12 @@ static boolean isFullyResolvedSchema(final Schema schema) { /** * Will clone the provided protocol while resolving all unreferenced schemas * - * @param protocol - * @return + * @param protocol a protocol with possibly unresolved schema references + * @return a protocol without unresolved schema references */ static Protocol resolve(final Protocol protocol) { Protocol result = new Protocol(protocol.getName(), protocol.getDoc(), protocol.getNamespace()); - final Collection types = protocol.getTypes(); + final Collection types = protocol.getUnresolvedTypes(); // replace unresolved schemas. 
List newSchemas = new ArrayList<>(types.size()); IdentityHashMap replacements = new IdentityHashMap<>(); diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java index e7c854c57f7..53675f4a01b 100644 --- a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java +++ b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -36,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.avro.Conversion; import org.apache.avro.Conversions; @@ -47,6 +49,7 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.SchemaNormalization; +import org.apache.avro.SchemaParser; import org.apache.avro.data.TimeConversions; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.StringType; @@ -468,12 +471,16 @@ public static void compileSchema(File src, File dest) throws IOException { * Generates Java classes for a number of schema files. 
*/ public static void compileSchema(File[] srcFiles, File dest) throws IOException { - Schema.Parser parser = new Schema.Parser(); + SchemaParser parser = new SchemaParser(); for (File src : srcFiles) { - Schema schema = parser.parse(src); + parser.parse(src); + } + // FIXME: use lastModified() without causing a NoSuchMethodError in the build + File lastModifiedSourceFile = Stream.of(srcFiles).max(Comparator.comparing(File::lastModified)).orElse(null); + for (Schema schema : parser.getParsedNamedSchemas()) { SpecificCompiler compiler = new SpecificCompiler(schema); - compiler.compileToDestination(src, dest); + compiler.compileToDestination(lastModifiedSourceFile, dest); } } diff --git a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj index af2480ce992..2d312794c3e 100644 --- a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj +++ b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj @@ -1287,6 +1287,8 @@ Schema ImportSchema() : { importFile = JsonString() ";" { try (InputStream stream=findFile(importFile).openStream()){ + // This usage of Schema.Parser should not be changed. + // Remove this whole (old) IDL parser instead. Parser parser = new Schema.Parser(); parser.addTypes(names.values()); // inherit names Schema value = parser.parse(stream); diff --git a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java index 75d0f73faaf..3b3f6b97c56 100644 --- a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java +++ b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java @@ -17,6 +17,7 @@ */ package org.apache.avro.idl; +import org.apache.avro.ParseContext; import org.apache.avro.Protocol; import org.apache.avro.Schema; @@ -32,24 +33,24 @@ * the protocol containing the schemas. 
*/ public class IdlFile { - private final Schema mainSchema; - private final Protocol protocol; - private final Map namedSchemas; + private final Object resolveLock = new Object(); + private volatile ParseContext parseContext; + private Schema mainSchema; + private Protocol protocol; + private Map namedSchemas; private final List warnings; - IdlFile(Protocol protocol, List warnings) { - this(protocol.getTypes(), null, protocol, warnings); + IdlFile(Protocol protocol, ParseContext context, List warnings) { + this(context, null, protocol, warnings); } - IdlFile(Schema mainSchema, Iterable schemas, List warnings) { - this(schemas, mainSchema, null, warnings); + IdlFile(Schema mainSchema, ParseContext context, List warnings) { + this(context, mainSchema, null, warnings); } - private IdlFile(Iterable schemas, Schema mainSchema, Protocol protocol, List warnings) { + private IdlFile(ParseContext context, Schema mainSchema, Protocol protocol, List warnings) { + this.parseContext = context; this.namedSchemas = new LinkedHashMap<>(); - for (Schema namedSchema : schemas) { - this.namedSchemas.put(namedSchema.getFullName(), namedSchema); - } this.mainSchema = mainSchema; this.protocol = protocol; this.warnings = Collections.unmodifiableList(new ArrayList<>(warnings)); @@ -59,13 +60,55 @@ private IdlFile(Iterable schemas, Schema mainSchema, Protocol protocol, * The (main) schema defined by the IDL file. 
*/ public Schema getMainSchema() { + if (mainSchema == null) { + return null; + } + ensureSchemasAreResolved(); return mainSchema; } + private void ensureSchemasAreResolved() { + if (parseContext != null) { + synchronized (resolveLock) { + if (parseContext != null) { + parseContext.commit(); + List schemas = parseContext.resolveAllSchemas(); + schemas.forEach(schema -> namedSchemas.put(schema.getFullName(), schema)); + if (mainSchema != null) { + mainSchema = parseContext.resolve(mainSchema); + } + if (protocol != null) { + protocol.setTypes(schemas); + Map messages = protocol.getMessages(); + for (Map.Entry entry : messages.entrySet()) { + Protocol.Message oldValue = entry.getValue(); + Protocol.Message newValue; + if (oldValue.isOneWay()) { + newValue = protocol.createMessage(oldValue.getName(), oldValue.getDoc(), oldValue, + parseContext.resolve(oldValue.getRequest())); + } else { + Schema request = parseContext.resolve(oldValue.getRequest()); + Schema response = parseContext.resolve(oldValue.getResponse()); + Schema errors = parseContext.resolve(oldValue.getErrors()); + newValue = protocol.createMessage(oldValue.getName(), oldValue.getDoc(), oldValue, request, response, + errors); + } + entry.setValue(newValue); + } + } + } + } + } + } + /** * The protocol defined by the IDL file. */ public Protocol getProtocol() { + if (protocol == null) { + return null; + } + ensureSchemasAreResolved(); return protocol; } @@ -83,6 +126,7 @@ public List getWarnings(String importFile) { * The named schemas defined by the IDL file, mapped by their full name. 
*/ public Map getNamedSchemas() { + ensureSchemasAreResolved(); return Collections.unmodifiableMap(namedSchemas); } @@ -95,11 +139,13 @@ public Map getNamedSchemas() { * @return the schema, or {@code null} if it does not exist */ public Schema getNamedSchema(String name) { + ensureSchemasAreResolved(); return namedSchemas.get(name); } // Visible for testing String outputString() { + ensureSchemasAreResolved(); if (protocol != null) { return protocol.toString(); } diff --git a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java index 047d162879b..96d45ab6dc2 100644 --- a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java +++ b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java @@ -38,7 +38,6 @@ import org.apache.avro.JsonSchemaParser; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; -import org.apache.avro.NameValidator; import org.apache.avro.ParseContext; import org.apache.avro.Protocol; import org.apache.avro.Schema; @@ -90,6 +89,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; +import java.util.EnumSet; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -145,6 +145,8 @@ public void syntaxError(Recognizer recognizer, Object offendingSymbol, int private static final Set INVALID_TYPE_NAMES = new HashSet<>(Arrays.asList("boolean", "int", "long", "float", "double", "bytes", "string", "null", "date", "time_ms", "timestamp_ms", "localtimestamp_ms", "uuid")); private static final String CLASSPATH_SCHEME = "classpath"; + private static final Set NAMED_SCHEMA_TYPES = EnumSet.of(Schema.Type.RECORD, Schema.Type.ENUM, + Schema.Type.FIXED); private final Set readLocations; private final ParseContext parseContext; @@ -153,34 +155,19 @@ public IdlReader() { this(new ParseContext()); } - public IdlReader(NameValidator nameValidator) { - this(new ParseContext(nameValidator)); - } - 
public IdlReader(ParseContext parseContext) { readLocations = new HashSet<>(); this.parseContext = parseContext; } private Schema namedSchemaOrUnresolved(String fullName) { - return parseContext.resolve(fullName); + return parseContext.find(fullName, null); } private void addSchema(Schema schema) { parseContext.put(schema); } - public IdlFile resolve(IdlFile unresolved) { - Protocol protocol = unresolved.getProtocol(); - if (protocol == null) { - Schema mainSchema = SchemaResolver.resolve(parseContext, unresolved.getMainSchema()); - Iterable namedSchemas = SchemaResolver.resolve(parseContext, unresolved.getNamedSchemas().values()); - return new IdlFile(mainSchema, namedSchemas, unresolved.getWarnings()); - } else { - return new IdlFile(SchemaResolver.resolve(parseContext, protocol), unresolved.getWarnings()); - } - } - public IdlFile parse(Path location) throws IOException { return parse(location.toUri()); } @@ -364,9 +351,9 @@ private void popNamespace() { @Override public void exitIdlFile(IdlFileContext ctx) { if (protocol == null) { - result = new IdlFile(mainSchema, parseContext.typesByName().values(), warnings); + result = new IdlFile(mainSchema, parseContext, warnings); } else { - result = new IdlFile(protocol, warnings); + result = new IdlFile(protocol, parseContext, warnings); } } @@ -390,8 +377,10 @@ public void enterProtocolDeclarationBody(ProtocolDeclarationBodyContext ctx) { @Override public void exitProtocolDeclaration(ProtocolDeclarationContext ctx) { - if (protocol != null) - protocol.setTypes(parseContext.typesByName().values()); + if (protocol != null) { + parseContext.commit(); + protocol.setTypes(parseContext.resolveAllSchemas()); + } if (!namespaces.isEmpty()) popNamespace(); } @@ -404,6 +393,10 @@ public void exitNamespaceDeclaration(NamespaceDeclarationContext ctx) { @Override public void exitMainSchemaDeclaration(IdlParser.MainSchemaDeclarationContext ctx) { mainSchema = typeStack.pop(); + + if 
(NAMED_SCHEMA_TYPES.contains(mainSchema.getType()) && mainSchema.getFullName() != null) { + parseContext.put(mainSchema); + } assert typeStack.isEmpty(); } diff --git a/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl b/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl index 1df43f7a656..6a2d19a0e19 100644 --- a/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl +++ b/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl @@ -36,7 +36,7 @@ record StatusUpdate { /** * The new status of the process. */ - Status newStatus; + system.Status newStatus; /** * A description why this status change occurred (optional). */ diff --git a/lang/java/idl/src/test/idl/input/status_schema.avdl b/lang/java/idl/src/test/idl/input/status_schema.avdl index 504218a4fcb..e4fda1d331a 100644 --- a/lang/java/idl/src/test/idl/input/status_schema.avdl +++ b/lang/java/idl/src/test/idl/input/status_schema.avdl @@ -1,3 +1,5 @@ +namespace system; + enum Status { UNKNOWN, NEW, STARTUP, RUNNING, TERMINATING, SHUTDOWN, CRASHED } = UNKNOWN; diff --git a/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java b/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java index a3c94ff025d..f9cb54f34ba 100644 --- a/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java +++ b/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java @@ -45,7 +45,7 @@ public class TestCycle { public void testCycleGeneration() throws IOException, URISyntaxException { final ClassLoader cl = Thread.currentThread().getContextClassLoader(); IdlReader parser = new IdlReader(); - IdlFile idlFile = parser.resolve(parser.parse(requireNonNull(cl.getResource("input/cycle.avdl")).toURI())); + IdlFile idlFile = parser.parse(requireNonNull(cl.getResource("input/cycle.avdl")).toURI()); String json = idlFile.outputString(); LOG.info(json); diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java 
b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java index 54f49b04fb5..632ce7f955c 100644 --- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java +++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java @@ -90,7 +90,6 @@ protected void doCompile(String filename, File sourceDirectory, File outputDirec for (String warning : idlFile.getWarnings()) { getLog().warn(warning); } - idlFile = parser.resolve(idlFile); final SpecificCompiler compiler; final Protocol protocol = idlFile.getProtocol(); if (protocol != null) { diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java index 36a4fc4a53c..9d1a91426ff 100644 --- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java +++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java @@ -20,6 +20,7 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; +import org.apache.avro.SchemaParser; import org.apache.maven.plugin.MojoExecutionException; import java.io.File; @@ -42,7 +43,7 @@ public class SchemaMojo extends AbstractAvroMojo { * A parser used to parse all schema files. Using a common parser will * facilitate the import of external schemas. */ - private Schema.Parser schemaParser = new Schema.Parser(); + private SchemaParser schemaParser = new SchemaParser(); /** * A set of Ant-like inclusion patterns used to select files from the source @@ -76,11 +77,11 @@ protected void doCompile(String[] fileNames, File sourceDirectory, File outputDi // no imported files then isolate the schemas from each other, otherwise // allow them to share a single schema so reuse and sharing of schema // is possible. - if (imports == null) { - schemas = new Schema.Parser().parse(sourceFiles); - } else { - schemas = schemaParser.parse(sourceFiles); + SchemaParser parser = imports == null ? 
new SchemaParser() : schemaParser; + for (File sourceFile : sourceFiles) { + parser.parse(sourceFile); } + schemas = parser.getParsedNamedSchemas(); doCompile(sourceFileForModificationDetection, schemas, outputDirectory); } catch (IOException | SchemaParseException ex) { diff --git a/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java b/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java index 6ef82714678..7f7179dad3a 100644 --- a/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java +++ b/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java @@ -71,7 +71,6 @@ public int run(InputStream in, PrintStream out, PrintStream err, List ar for (String warning : idlFile.getWarnings()) { err.println("Warning: " + warning); } - idlFile = parser.resolve(idlFile); p = idlFile.getProtocol(); m = idlFile.getMainSchema(); }