Author: Hao Li (@lihaosky) | Release Target: 0.24.0; 7.2.0 | Status: Merged | Discussion: Issue, GitHub PR1
tl;dr: This KLIP introduces key_schema_id and value_schema_id in C*AS statements and details the expected behaviors in various commands.
As discussed in this issue, the main motivation is to let users specify key_schema_id or value_schema_id in CAS and CS/CT statements so that existing schemas in schema registry can be reused for logical schema creation and data serialization for the newly defined data source.
Major benefits are:
- Users can easily reuse existing schemas in schema registry without rewriting them in the CREATE command, which could be error-prone.
- By using schema IDs, users can enforce conventions such as field-name casing. For example, they can ensure that ksqlDB doesn't capitalize field names, which may otherwise create incompatibilities between schemas later.
- Users can serialize data with a predefined schema from schema registry, so downstream consumers can read the data using the same schema.
This KLIP covers the following:
- Fix ksqlDB so that it does not always uppercase field names in schema conversions between schema registry schemas and ksqlDB schemas.
- Support the KEY_SCHEMA_FULL_NAME and VALUE_SCHEMA_FULL_NAME properties to control the full schema name for the AVRO and PROTOBUF formats.
- Support key_schema_id and value_schema_id in CAS and CS/CT statements with proper validation.
- Add support in ksqlDB for serializing data with an existing schema from schema registry, specified by its schema ID.
The following are out of scope:
- Schema inference. ksqlDB supports schema inference if neither table elements nor a schema ID is provided in a CS/CT statement. The behavior of this schema inference will not change.
- Alerting users to potential incompatibility between the logical and physical schemas at stream/table creation time. All ksqlDB logical schema fields are nullable, whereas the schema provided by schema ID may have non-nullable fields. This check will not be done at creation time. Instead, an error will occur at value insertion time if the schemas are not compatible.
Examples using the new properties:
CREATE STREAM stream_name WITH (key_schema_id=1, value_schema_id=2, key_format='avro', value_format='avro') AS
SELECT key, field1, field_2 FROM source_stream
EMIT CHANGES;
CREATE TABLE table_name WITH (key_schema_id=1, value_schema_id=2, key_format='avro', value_format='avro') AS
SELECT key, COUNT(field1) AS count_field FROM source_stream
GROUP BY key;
CREATE STREAM stream_name WITH (kafka_topic='topic_name', key_schema_id=1, value_schema_id=2,
partitions=1, key_format='avro', value_format='avro');
CREATE TABLE table_name WITH (kafka_topic='table_name', key_schema_id=1, value_schema_id=2,
partitions=1, key_format='avro', value_format='avro');
The new properties are validated as follows:
- CS/CT command
  - The corresponding key_format/value_format or format property must exist, and the format must support SCHEMA_INFERENCE (currently protobuf, avro and json_sr).
  - The format of the schema fetched from schema registry must match the format specified in the WITH clause. For example, if the schema for the schema ID is avro in schema registry but the WITH clause specifies protobuf, an exception will be thrown.
  - A schema with the specified ID MUST exist in schema registry; otherwise, an exception will be thrown.
  - The corresponding key/value table elements must be empty if key_schema_id or value_schema_id is provided.
- CAS command
  - For the key_format and value_format properties, if *_schema_id is provided:
    - If a format property is provided, it must match the format fetched using the schema ID.
    - If a format property is not provided, it will be deduced from the query source's format and must then match the format fetched using the schema ID.
  - A schema with the specified ID MUST exist in schema registry; otherwise, an exception will be thrown.
  - Compatibility check. Users can use *_schema_id with a select projection in a CAS command, for example: CREATE STREAM stream_name WITH (kafka_topic='topic_name', key_schema_id=1, value_schema_id=2, partitions=1, key_format='avro', value_format='avro') AS SELECT * FROM source_stream; In this case, the schema from schema registry should be a superset of the select projection, and the field order must match. When compatibility is checked, it does not matter whether a field is optional or required in the physical schema. This lets users reuse more schemas; otherwise, field schemas could only be optional, because all fields in ksqlDB are optional. The schema from schema registry may also contain extra fields, and these extra fields are currently not required to be optional. If an extra field is required, a serialization error may occur at value insertion time, because the value schema will be based on the query projection schema. We expect users who use schema IDs to be advanced users who are responsible for making sure serialization works.
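The validation rules above can be sketched in Python. This is a hypothetical, simplified model and not ksqlDB's implementation. Formats are lowercase strings, and a fetched format of None stands for "no schema with this ID in schema registry". Fields are simple name/type records. The names validate_cs_ct, resolve_cas_format, check_cas_compatibility and KsqlValidationError are made up for illustration. The compatibility check interprets "the field order must match" as "projected columns appear in the physical schema in the same relative order". That interpretation is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional

# Formats that currently support SCHEMA_INFERENCE, per the rules above.
SCHEMA_INFERENCE_FORMATS = {"avro", "protobuf", "json_sr"}


class KsqlValidationError(Exception):
    """Hypothetical error raised when a statement fails validation."""


@dataclass
class Field:
    name: str
    type: str
    required: bool = False  # only physical schemas can mark a field as required


def validate_cs_ct(fmt: Optional[str], fetched_fmt: Optional[str],
                   table_elements: List[str]) -> None:
    """Validate one side (key or value) of a CS/CT statement that sets *_schema_id."""
    if fmt is None or fmt not in SCHEMA_INFERENCE_FORMATS:
        raise KsqlValidationError("format must be set and support SCHEMA_INFERENCE")
    if fetched_fmt is None:
        raise KsqlValidationError("no schema with this ID in schema registry")
    if fetched_fmt != fmt:
        raise KsqlValidationError(f"fetched format {fetched_fmt} != {fmt}")
    if table_elements:
        raise KsqlValidationError("table elements must be empty with *_schema_id")


def resolve_cas_format(fmt: Optional[str], source_fmt: str,
                       fetched_fmt: Optional[str]) -> str:
    """Return the effective format for one side of a CAS statement with *_schema_id."""
    if fetched_fmt is None:
        raise KsqlValidationError("no schema with this ID in schema registry")
    # A missing format property is deduced from the query source's format.
    effective = fmt if fmt is not None else source_fmt
    if effective != fetched_fmt:
        raise KsqlValidationError(f"fetched format {fetched_fmt} != {effective}")
    return effective


def check_cas_compatibility(physical: List[Field], projection: List[Field]) -> None:
    """The physical schema must contain every projected column, in the same order.

    Extra physical fields are allowed, and the `required` flag is ignored."""
    remaining = iter(physical)
    for col in projection:
        # Consume physical fields until one with the same name is found.
        match = next((f for f in remaining if f.name == col.name), None)
        if match is None:
            raise KsqlValidationError(f"{col.name} is missing or out of order")
        if match.type != col.type:
            raise KsqlValidationError(f"type mismatch for {col.name}")


if __name__ == "__main__":
    physical = [Field("ID", "INT", required=True), Field("NAME", "STRING"),
                Field("EXTRA", "STRING")]
    projection = [Field("ID", "INT"), Field("NAME", "STRING")]
    check_cas_compatibility(physical, projection)  # superset in the same order: OK
    print(resolve_cas_format(None, "avro", "avro"))  # prints "avro"
```

Scanning the physical schema with a single shared iterator is what enforces ordering. Once a field has been matched, earlier fields can no longer satisfy later projected columns.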
The end-to-end behavior is as follows:
- The schema specified by the schema ID will be looked up and fetched from schema registry.
- It will then be translated to ksqlDB columns:
  - If the UNWRAP serde feature is specified in the creation statement, a single key or value column named ROWKEY or ROWVALUE will be created, and the fetched schema will be translated to that column's type.
  - Otherwise, the fetched schema is expected to be of STRUCT type. Field names become ksqlDB column names, and each field's schema becomes the corresponding column's type.
  - If there are unsupported types or other translation errors, the statement will fail.
- If the column translation is successful:
  - For a CS/CT statement, the translated columns will be injected into the original statement's table elements and registered when the DDL command is executed.
  - For a CAS statement, a compatibility check will be done against the schema of the query projection. The logical schema of the created stream/table will still be the schema of the query projection, which differs from the behavior of CS/CT statements.
- The physical schema will then be registered in schema registry under the correct ksqlDB subject name. For example, in a CS/CT statement, the fetched physical schema will be registered under the [topic]-key or [topic]-value subject. The fetched physical schema is registered again so that schema compatibility checks can be done if other sources are later created on the same topic.
- The schema ID defined in the source creation statement will be stored in ksqlDB.
- At serialization time, data will first be created with the source's logical schema and then rewritten to follow the physical schema fetched using the schema ID.
  - The rewrite could fail at this point if the logical and physical schemas are not compatible. For example, a field may be required in the physical schema but optional in ksqlDB. If the field's value is null, the rewrite using the physical schema will fail.
  - After the rewrite, the data can be serialized using the corresponding format's serializer.
  - Note that the underlying serializer will be configured to look up the schema by the provided schema ID and to serialize with the physical schema [1].
- Data will be deserialized using the physical schema that was used for serialization.
  - When the data is converted to ksqlDB data, it will be converted using the source's logical schema.
  - Since the physical schema is a superset of the logical schema and all of the logical schema's fields are optional, we expect this conversion to always succeed.
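The translation, subject-naming, rewrite and read-back steps above can be modeled with a small Python sketch. It is illustrative only and rests on assumptions that are not part of ksqlDB:

- a fetched schema is a plain dict such as {"type": "STRUCT", "fields": [...]}, and each field is a dict with name, type and required keys;
- rows are dicts;
- only a handful of primitive types are supported, and the UNWRAP case handles primitive types only.

The function and error names (translate_schema, subject_name, rewrite_row, to_logical, TranslationError, RewriteError) are hypothetical.

```python
from typing import Any, Dict, List, Tuple

Column = Tuple[str, str]  # (column name, ksqlDB type)
Schema = Dict[str, Any]

# Deliberately small set of supported types for this sketch.
SUPPORTED_TYPES = {"INT", "BIGINT", "DOUBLE", "BOOLEAN", "STRING"}


class TranslationError(Exception):
    """Hypothetical error: the fetched schema cannot become ksqlDB columns."""


class RewriteError(Exception):
    """Hypothetical error: a row does not fit the physical schema."""


def _check_type(type_name: str) -> str:
    if type_name not in SUPPORTED_TYPES:
        raise TranslationError(f"unsupported type {type_name}")
    return type_name


def translate_schema(schema: Schema, is_key: bool, unwrap: bool) -> List[Column]:
    """Translate a fetched schema into ksqlDB columns."""
    if unwrap:
        # Single ROWKEY/ROWVALUE column typed by the whole fetched schema.
        return [("ROWKEY" if is_key else "ROWVALUE", _check_type(schema["type"]))]
    if schema["type"] != "STRUCT":
        raise TranslationError("expected a STRUCT schema")
    # Field names are kept exactly as registered (no upper-casing).
    return [(f["name"], _check_type(f["type"])) for f in schema["fields"]]


def subject_name(topic: str, is_key: bool) -> str:
    """Subject under which the fetched physical schema is registered again."""
    return f"{topic}-key" if is_key else f"{topic}-value"


def rewrite_row(row: Dict[str, Any], physical: Schema) -> Dict[str, Any]:
    """Rewrite a row built with the logical schema to follow the physical schema."""
    out = {}
    for f in physical["fields"]:
        value = row.get(f["name"])  # fields absent from the logical row become None
        if value is None and f["required"]:
            raise RewriteError(f"required field {f['name']} is null")
        out[f["name"]] = value
    return out


def to_logical(record: Dict[str, Any], logical: List[Column]) -> Dict[str, Any]:
    """Project a deserialized record onto the source's logical columns."""
    return {name: record.get(name) for name, _ in logical}
```

For example, consider a physical schema whose id field is required. Calling rewrite_row on a row where id is missing or None raises RewriteError, which mirrors the insertion-time failure described above. Extra physical fields that are optional are simply filled with None.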
Tests will cover the following:
- The schema translator doesn't capitalize field names if a schema ID is set in source creation statements.
- Schemas are correctly validated when *_schema_id is present.
- Schemas of different formats can be registered in both CAS and CS/CT statements.
- Data can be serialized/deserialized in both CAS and CS/CT statements if the physical schema is compatible with the logical schema.
- Data cannot be serialized if the physical schema contains a required field whose value is provided as null.
Add the newly supported properties to:
- docs/developer-guide/ksqldb-reference/create-stream.md
- docs/developer-guide/ksqldb-reference/create-table.md
- docs/developer-guide/ksqldb-reference/create-stream-as-select.md
- docs/developer-guide/ksqldb-reference/create-table-as-select.md
The key_schema_id and value_schema_id properties already exist in the ksqlDB codebase, and it is possible to issue CREATE statements with them. There may be existing commands that already use them, but such commands should have been augmented and written to the command topic without these properties. As a result, adding more validation checks or changing how the properties are handled causes no compatibility issues.
Security implications: none.