Strange schema issue with Kafka Connect #125

Closed
averemee-si opened this issue Jan 7, 2022 · 3 comments
Labels
enhancement: New feature or request
PR welcome: Users are welcome to submit PR

Comments

@averemee-si

In my Kafka Connect connector I have 2 different schemas for key and value data with different names:

			// Schema init: keySchema is immutable, so its version is always 1
			final SchemaBuilder keySchemaBuilder = SchemaBuilder
						.struct()
						.required()
						.name(tableFqn + ".Key")
						.version(1);
			final SchemaBuilder valueSchemaBuilder = SchemaBuilder
						.struct()
						.optional()
						.name(tableFqn + ".Value")
						.version(version);

SourceRecord is created using both schemas:

				keySchema = keySchemaBuilder.build();
				valueSchema = valueSchemaBuilder.build();
				//
				// Populate the key and value Structs...
				//
				sourceRecord = new SourceRecord(
					sourcePartition,
					offset,
					kafkaTopic,
					keySchema,
					keyStruct,
					valueSchema,
					valueStruct);

My Connect standalone.properties contains:

bootstrap.servers=localhost:9092

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000
poll.interval.ms=5000

# Glue Schema Registry Specific Converters
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.schemas.enable=true
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.schemas.enable=true

# Converter-specific settings can be passed in by prefixing the Converter's setting
# with the converter we want to apply it to
key.converter.region=eu-north-1
key.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
# Default value: default-registry
key.converter.registry.name=TestSchemaRegistry001
value.converter.region=eu-north-1
value.converter.schemaAutoRegistrationEnabled=true
value.converter.avroRecordType=GENERIC_RECORD
# Default value: default-registry
value.converter.registry.name=TestSchemaRegistry001
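The prefix comment above works because the Kafka Connect worker gives each converter only the properties that start with its prefix, with the prefix removed: `key.converter.registry.name=TestSchemaRegistry001` reaches the key converter as `registry.name`. Below is a minimal, stdlib-only model of that stripping; `ConverterPrefixDemo` and `withPrefixStripped` are hypothetical names, not Kafka Connect code.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ConverterPrefixDemo {

    // Hypothetical helper: keep only entries whose key starts with `prefix`
    // and strip that prefix, similar in spirit to how the Connect worker
    // builds each converter's configuration.
    static Map<String, String> withPrefixStripped(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties worker = new Properties();
        worker.setProperty("key.converter.region", "eu-north-1");
        worker.setProperty("key.converter.registry.name", "TestSchemaRegistry001");
        worker.setProperty("value.converter.region", "eu-north-1");
        worker.setProperty("value.converter.avroRecordType", "GENERIC_RECORD");

        // Only the key.converter.* entries, without the prefix.
        System.out.println(withPrefixStripped(worker, "key.converter."));
        // prints: {region=eu-north-1, registry.name=TestSchemaRegistry001}
    }
}
```

Because the key and value converters receive separate maps, a setting such as `schemaNameGenerationClass` has to be given once per prefix.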

This produces a very strange result in the Glue Schema Registry: a single schema with two versions, even though the Kafka Connect schema names are different. Version 1 is the key schema:

{
  "type": "record",
  "name": "Key",
  "namespace": "SCOTT.SOURCE_CLONE",
  "fields": [
    {
      "name": "OWNER",
      "type": "string"
    },
    {
      "name": "NAME",
      "type": "string"
    },
    {
      "name": "TYPE",
      "type": "string"
    },
    {
      "name": "LINE",
      "type": "double"
    },
    {
      "name": "ORIGIN_CON_ID",
      "type": "double"
    }
  ],
  "connect.version": 1,
  "connect.name": "SCOTT.SOURCE_CLONE.Key"
}

Version 2 is the value schema:

{
  "type": "record",
  "name": "Value",
  "namespace": "SCOTT.SOURCE_CLONE",
  "fields": [
    {
      "name": "TEXT",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.version": 1,
  "connect.name": "SCOTT.SOURCE_CLONE.Value"
}

The schemas have different connect.name and name values; only the namespace is the same.

Why did two schemas with different connect.name and name values end up as a single schema with two versions?

Could you please help me resolve this?
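The README (quoted in the next comment) says that when no schema name is passed, the serializer uses the transport name, i.e. the Kafka topic. If both the key and value converters fall back to that default, both schemas are registered under the topic name, which is consistent with one schema holding two versions. The sketch below models this with an in-memory map; the registry model, `NamingCollisionDemo`, and the topic name `my-topic` are hypothetical, and this is not the Glue Schema Registry API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NamingCollisionDemo {

    // Hypothetical in-memory registry model: schema name -> definitions,
    // where list position + 1 is the version number. Real registries also
    // run compatibility checks; this model does not.
    static int register(Map<String, List<String>> registry, String schemaName, String definition) {
        List<String> versions = registry.computeIfAbsent(schemaName, k -> new ArrayList<>());
        versions.add(definition);
        return versions.size();
    }

    // Default naming as described in the README: the schema name is the
    // transport (topic) name, whether the data is a key or a value.
    static String defaultSchemaName(String transportName) {
        return transportName;
    }

    public static void main(String[] args) {
        Map<String, List<String>> registry = new LinkedHashMap<>();
        String topic = "my-topic"; // hypothetical topic name

        int keyVersion = register(registry, defaultSchemaName(topic), "SCOTT.SOURCE_CLONE.Key");
        int valueVersion = register(registry, defaultSchemaName(topic), "SCOTT.SOURCE_CLONE.Value");

        // Both schemas end up under one name, as versions 1 and 2.
        System.out.println(registry.keySet() + " key=v" + keyVersion + " value=v" + valueVersion);
        // prints: [my-topic] key=v1 value=v2
    }
}
```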

@averemee-si
Author

According to README.md:

Providing Schema Name
Schema Name can be provided by setting this property -

properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema"); // If not passed, uses transport name (topic name in case of Kafka)

Alternatively, a schema registry naming strategy implementation can be provided.

properties.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
                "com.amazonaws.services.schemaregistry.serializers.avro.CustomerProvidedSchemaNamingStrategy");

An example test implementation class is here.

However, the link to CustomerProvidedSchemaNamingStrategy.java is broken.

@averemee-si
Author

Fixed by creating a naming strategy that uses connect.name as the Glue schema name:

package com.amazonaws.services.schemaregistry.common;

import lombok.extern.slf4j.Slf4j;

import org.apache.avro.generic.GenericData;
import org.apache.avro.Schema;

@Slf4j
public class AWSSchemaNamingStrategyConnectName implements AWSSchemaNamingStrategy {

    @Override
    public String getSchemaName(String transportName, Object data) {
        if (data instanceof GenericData.Record) {
            return getConnectName((GenericData.Record) data, transportName);
        } else {
            return getSchemaName(transportName);
        }
    }

    @Override
    public String getSchemaName(String transportName, Object data, boolean isKey) {
        if (data instanceof GenericData.Record) {
            return getConnectName((GenericData.Record) data, transportName);
        } else {
            return isKey ? getSchemaName(transportName) : getSchemaName(transportName, data);
        }
    }

    // The Avro full name (namespace + "." + name) matches connect.name for the
    // schemas above, e.g. SCOTT.SOURCE_CLONE.Key and SCOTT.SOURCE_CLONE.Value.
    private String getConnectName(final GenericData.Record record, final String transportName) {
        final Schema schema = record.getSchema();
        log.debug("connect.name='{}' will be used as SchemaName for data in topic {}",
                schema.getFullName(), transportName);
        return schema.getFullName();
    }

    /**
     * Returns the schemaName.
     *
     * @param transportName topic Name or stream name etc.
     * @return schema name.
     */
    @Override
    public String getSchemaName(String transportName) {
        return transportName;
    }

}

and setting:

key.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.AWSSchemaNamingStrategyConnectName
value.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.AWSSchemaNamingStrategyConnectName
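To see why this strategy separates the two schemas, the stdlib-only sketch below mirrors the branching of `getSchemaName(transportName, data, isKey)` in `AWSSchemaNamingStrategyConnectName`. `RecordSchema` is a hypothetical stand-in for an Avro record schema (not the Avro API), the topic name `my-topic` is hypothetical, and the namespace/name pairs are the ones shown in the registered schemas in the question.

```java
public class ConnectNameStrategyDemo {

    // Hypothetical stand-in for an Avro record schema: only the two parts
    // that make up the Avro full name.
    static final class RecordSchema {
        final String namespace;
        final String name;

        RecordSchema(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }

        // Avro full name: "namespace.name", or just "name" without a namespace.
        String fullName() {
            return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
        }
    }

    // Mirrors the posted strategy: records are named by their full schema
    // name; anything else falls back to the transport (topic) name.
    // isKey does not change the result, as in the posted fallback branch.
    static String schemaName(String transportName, Object data, boolean isKey) {
        if (data instanceof RecordSchema) {
            return ((RecordSchema) data).fullName();
        }
        return transportName;
    }

    public static void main(String[] args) {
        String topic = "my-topic"; // hypothetical topic name
        RecordSchema key = new RecordSchema("SCOTT.SOURCE_CLONE", "Key");
        RecordSchema value = new RecordSchema("SCOTT.SOURCE_CLONE", "Value");

        System.out.println(schemaName(topic, key, true));      // SCOTT.SOURCE_CLONE.Key
        System.out.println(schemaName(topic, value, false));   // SCOTT.SOURCE_CLONE.Value
        System.out.println(schemaName(topic, "plain", false)); // my-topic
    }
}
```

In the posted class, `isKey` also makes no difference in the fallback branch: both paths end at `getSchemaName(transportName)`.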

Is it possible to add this naming strategy implementation to the code baseline?
Thanks!

@blacktooth added the enhancement (New feature or request) and PR welcome (Users are welcome to submit PR) labels on Jan 11, 2022
@mohitpali
Contributor

Closing as a duplicate of #126.

Projects
None yet
Development

No branches or pull requests

3 participants