From 1d14492d6d5e350c738b145ca926c5aec3aecadc Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Tue, 17 May 2022 15:12:39 +0300 Subject: [PATCH] Remove ujson dependency --- .pylintrc | 1 - karapace/config.py | 8 +- karapace/kafka_rest_apis/__init__.py | 4 +- karapace/kafka_rest_apis/consumer_manager.py | 4 +- karapace/master_coordinator.py | 12 +- karapace/protobuf/exception.py | 4 +- karapace/rapu.py | 4 +- karapace/schema_backup.py | 18 +- karapace/schema_models.py | 9 +- karapace/schema_reader.py | 14 +- karapace/schema_registry_apis.py | 3 +- karapace/serialization.py | 4 +- karapace/utils.py | 4 +- mypy.ini | 4 - requirements.txt | 1 - setup.py | 1 - tests/conftest.py | 4 +- tests/integration/conftest.py | 12 +- .../schema_registry/test_jsonschema.py | 18 +- tests/integration/test_karapace.py | 4 +- tests/integration/test_master_coordinator.py | 4 +- tests/integration/test_rest_consumer.py | 6 +- tests/integration/test_schema.py | 287 +++++++++--------- tests/integration/test_schema_backup.py | 4 +- .../test_schema_backup_avro_export.py | 6 +- tests/unit/test_avro_compatibility.py | 138 +++++---- tests/unit/test_serialization.py | 14 +- tests/utils.py | 8 +- 28 files changed, 293 insertions(+), 307 deletions(-) diff --git a/.pylintrc b/.pylintrc index 871994295..767ffede6 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,6 +1,5 @@ [MASTER] jobs=4 -extension-pkg-allow-list=ujson [MESSAGES CONTROL] enable= diff --git a/karapace/config.py b/karapace/config.py index 2ddcbe604..5713e656b 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -10,11 +10,11 @@ from pathlib import Path from typing import Dict, IO, List, Optional, Union +import json import logging import os import socket import ssl -import ujson Config = Dict[str, Union[None, str, int, bool, List[str], AccessLogger]] LOG = logging.getLogger(__name__) @@ -154,13 +154,13 @@ def validate_config(config: Config) -> None: def write_config(config_path: Path, custom_values: Config) -> None: - config_path.write_text(ujson.dumps(custom_values)) + config_path.write_text(json.dumps(custom_values)) def read_config(config_handler: IO) -> Config: try: - config = ujson.load(config_handler) - except ValueError as ex: + config = json.load(config_handler) + except json.JSONDecodeError as ex: raise InvalidConfiguration("Configuration is not a valid JSON") from ex return set_config_defaults(config) diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index 37ccf5f84..ef4715e8e 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -18,8 +18,8 @@ import asyncio import base64 +import json import time -import ujson RECORD_KEYS = ["key", "value", "partition"] PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"} @@ -544,7 +544,7 @@ async def serialize( # not pretty if ser_format == "json": # TODO -> get encoding from headers - return ujson.dumps(obj).encode("utf8") + return json.dumps(obj).encode("utf8") if ser_format == "binary": return base64.b64decode(obj) if ser_format in {"avro", "jsonschema", "protobuf"}: diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index ac707cb88..8b62a0ce9 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -15,9 +15,9 @@ import asyncio import base64 +import json import logging import time -import ujson import uuid KNOWN_FORMATS = {"json", "avro", "binary", "jsonschema", "protobuf"} @@ -506,7 +506,7 @@ async def deserialize(self, bytes_: bytes, fmt: str): if fmt in {"avro", "jsonschema", "protobuf"}: return await self.deserializer.deserialize(bytes_) if fmt == "json": - return ujson.loads(bytes_.decode("utf-8")) + return json.loads(bytes_.decode("utf-8")) return base64.b64encode(bytes_).decode("utf-8") def close(self): diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index 77984201a..e79390c3b 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -13,9 +13,9 @@ from threading import Event, Thread from typing import Optional, Tuple +import json import logging import time -import ujson # SR group errors NO_ERROR = 0 @@ -42,7 +42,7 @@ def protocol_type(self): def get_identity(self, *, host, port, scheme, json_encode=True): res = {"version": 1, "host": host, "port": port, "scheme": scheme, "master_eligibility": self.master_eligibility} if json_encode: - return ujson.dumps(res) + return json.dumps(res) return res def group_protocols(self): @@ -55,7 +55,7 @@ def _perform_assignment(self, leader_id, protocol, members): urls = {} fallback_urls = {} for member_id, member_data in members: - member_identity = ujson.loads(member_data.decode("utf8")) + member_identity = json.loads(member_data.decode("utf8")) if member_identity["master_eligibility"] is True: urls[get_identity_url(member_identity["scheme"], member_identity["host"], member_identity["port"])] = ( member_id, @@ -72,7 +72,7 @@ def _perform_assignment(self, leader_id, protocol, members): # Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0] schema_master_id, member_data = fallback_urls[chosen_url] - member_identity = ujson.loads(member_data.decode("utf8")) + member_identity = json.loads(member_data.decode("utf8")) identity = self.get_identity( host=member_identity["host"], port=member_identity["port"], @@ -83,7 +83,7 @@ def _perform_assignment(self, leader_id, protocol, members): assignments = {} for member_id, member_data in members: - assignments[member_id] = ujson.dumps({"master": schema_master_id, "master_identity": identity, "error": error}) + assignments[member_id] = json.dumps({"master": schema_master_id, "master_identity": identity, "error": error}) return assignments def _on_join_prepare(self, generation, member_id): @@ -98,7 +98,7 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b protocol, member_assignment_bytes, ) - member_assignment = ujson.loads(member_assignment_bytes.decode("utf8")) + member_assignment = json.loads(member_assignment_bytes.decode("utf8")) member_identity = member_assignment["master_identity"] master_url = get_identity_url( diff --git a/karapace/protobuf/exception.py b/karapace/protobuf/exception.py index 402cd9fdd..b42d6e141 100644 --- a/karapace/protobuf/exception.py +++ b/karapace/protobuf/exception.py @@ -1,4 +1,4 @@ -import ujson +import json class ProtobufParserRuntimeException(Exception): @@ -30,7 +30,7 @@ class SchemaParseException(ProtobufException): def pretty_print_json(obj: str) -> str: - return ujson.dumps(ujson.loads(obj), indent=2) + return json.dumps(json.loads(obj), indent=2) class ProtobufSchemaResolutionException(ProtobufException): diff --git a/karapace/rapu.py b/karapace/rapu.py index 000e9e465..0b9238883 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -20,10 +20,10 @@ import asyncio import cgi import hashlib +import json import logging import re import time -import ujson SERVER_NAME = "Karapace/{}".format(__version__) JSON_CONTENT_TYPE = "application/json" @@ -281,7 +281,7 @@ async def _handle_request( _, options = cgi.parse_header(rapu_request.get_header("Content-Type")) charset = options.get("charset", "utf-8") body_string = body.decode(charset) - rapu_request.json = ujson.loads(body_string) + rapu_request.json = json.loads(body_string) except UnicodeDecodeError: raise HTTPResponse( # pylint: disable=raise-missing-from body=f"Request body is not valid {charset}", status=HTTPStatus.BAD_REQUEST diff --git a/karapace/schema_backup.py b/karapace/schema_backup.py index d35995376..4e212016c 100644 --- a/karapace/schema_backup.py +++ b/karapace/schema_backup.py @@ -15,11 +15,11 @@ from typing import Dict, List, Optional, Tuple import argparse +import json import logging import os import sys import time -import ujson LOG = logging.getLogger(__name__) @@ -138,7 +138,7 @@ def close(self): def request_backup(self): values = self._export() - ser = ujson.dumps(values) + ser = json.dumps(values) if self.backup_location: with open(self.backup_location, mode="w", encoding="utf8") as fp: fp.write(ser) @@ -161,7 +161,7 @@ def restore_backup(self): values = None with open(self.backup_location, mode="r", encoding="utf8") as fp: raw_msg = fp.read() - values = ujson.loads(raw_msg) + values = json.loads(raw_msg) if not values: return @@ -184,7 +184,7 @@ def export_anonymized_avro_schemas(self): # Check that the message has key `schema` and type is Avro schema. # The Avro schemas may have `schemaType` key, if not present the schema is Avro. if value[1] and "schema" in value[1] and value[1].get("schemaType", "AVRO") == "AVRO": - original_schema = ujson.loads(value[1].get("schema")) + original_schema = json.loads(value[1].get("schema")) anonymized_schema = anonymize_avro.anonymize(original_schema) if anonymized_schema: if "subject" in value[0]: @@ -193,7 +193,7 @@ def export_anonymized_avro_schemas(self): value[1]["subject"] = anonymize_avro.anonymize_name(value[1]["subject"]) value[1]["schema"] = anonymized_schema anonymized_schemas.append((value[0], value[1])) - ser = ujson.dumps(anonymized_schemas) + ser = json.dumps(anonymized_schemas) if self.backup_location: with open(self.backup_location, mode="w", encoding="utf8") as fp: fp.write(ser) @@ -220,15 +220,15 @@ def _export(self) -> List[Tuple[str, Dict[str, str]]]: for message in messages: key = message.key.decode("utf8") try: - key = ujson.loads(key) - except ValueError: + key = json.loads(key) + except json.JSONDecodeError: LOG.debug("Invalid JSON in message.key: %r, value: %r", message.key, message.value) value = None if message.value: value = message.value.decode("utf8") try: - value = ujson.loads(value) - except ValueError: + value = json.loads(value) + except json.JSONDecodeError: LOG.debug("Invalid JSON in message.value: %r, key: %r", message.value, message.key) values.append((key, value)) diff --git a/karapace/schema_models.py b/karapace/schema_models.py index d36274015..b31f08e68 100644 --- a/karapace/schema_models.py +++ b/karapace/schema_models.py @@ -16,7 +16,6 @@ from typing import Any, Dict, Union import json -import ujson def parse_avro_schema_definition(s: str) -> AvroSchema: @@ -44,7 +43,7 @@ def parse_jsonschema_definition(schema_definition: str) -> Draft7Validator: Raises: SchemaError: If `schema_definition` is not a valid Draft7 schema. """ - schema = ujson.loads(schema_definition) + schema = json.loads(schema_definition) Draft7Validator.check_schema(schema) return Draft7Validator(schema) @@ -89,7 +88,7 @@ def __init__(self, schema_type: SchemaType, schema_str: str): def to_dict(self) -> Dict[str, Any]: if self.schema_type is SchemaType.PROTOBUF: raise InvalidSchema("Protobuf do not support to_dict serialization") - return ujson.loads(self.schema_str) + return json.loads(self.schema_str) def __str__(self) -> str: if self.schema_type == SchemaType.PROTOBUF: @@ -119,14 +118,14 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema": if schema_type is SchemaType.AVRO: try: parsed_schema = parse_avro_schema_definition(schema_str) - except (SchemaParseException, ValueError, TypeError) as e: + except (SchemaParseException, json.JSONDecodeError, TypeError) as e: raise InvalidSchema from e elif schema_type is SchemaType.JSONSCHEMA: try: parsed_schema = parse_jsonschema_definition(schema_str) # TypeError - Raised when the user forgets to encode the schema as a string. - except (TypeError, ValueError, SchemaError, AssertionError) as e: + except (TypeError, json.JSONDecodeError, SchemaError, AssertionError) as e: raise InvalidSchema from e elif schema_type is SchemaType.PROTOBUF: diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 62ee97809..ab894ace3 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -17,8 +17,8 @@ from threading import Event, Lock, Thread from typing import Any, Dict, Optional +import json import logging -import ujson Offset = int Subject = str @@ -260,16 +260,16 @@ def handle_messages(self) -> None: for _, msgs in raw_msgs.items(): for msg in msgs: try: - key = ujson.loads(msg.key.decode("utf8")) - except ValueError: + key = json.loads(msg.key.decode("utf8")) + except json.JSONDecodeError: LOG.exception("Invalid JSON in msg.key") continue value = None if msg.value: try: - value = ujson.loads(msg.value.decode("utf8")) - except ValueError: + value = json.loads(msg.value.decode("utf8")) + except json.JSONDecodeError: LOG.exception("Invalid JSON in msg.value") continue @@ -348,8 +348,8 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None: # what is available in the topic. if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]: try: - schema_str = ujson.dumps(ujson.loads(schema_str), sort_keys=True) - except ValueError: + schema_str = json.dumps(json.loads(schema_str), sort_keys=True) + except json.JSONDecodeError: LOG.error("Schema is not invalid JSON") return diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index d5232ad77..59b9f3219 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -18,6 +18,7 @@ import aiohttp import async_timeout import asyncio +import json import time @@ -784,7 +785,7 @@ def write_new_schema_local( new_schema = ValidatedTypedSchema.parse(schema_type=schema_type, schema_str=body["schema"]) except (InvalidSchema, InvalidSchemaType) as e: self.log.warning("Invalid schema: %r", body["schema"], exc_info=True) - if isinstance(e.__cause__, (SchemaParseException, ValueError)): + if isinstance(e.__cause__, (SchemaParseException, json.JSONDecodeError)): human_error = f"{e.__cause__.args[0]}" # pylint: disable=no-member else: human_error = "Provided schema is not valid" diff --git a/karapace/serialization.py b/karapace/serialization.py index e983d35c1..306c3114f 100644 --- a/karapace/serialization.py +++ b/karapace/serialization.py @@ -13,8 +13,8 @@ import avro import avro.schema import io +import json import struct -import ujson START_BYTE = 0x0 HEADER_FORMAT = ">bI" @@ -247,7 +247,7 @@ def read_value(config: dict, schema: TypedSchema, bio: io.BytesIO): reader = DatumReader(writers_schema=schema.schema) return reader.read(BinaryDecoder(bio)) if schema.schema_type is SchemaType.JSONSCHEMA: - value = ujson.load(bio) + value = json.load(bio) try: schema.schema.validate(value) except ValidationError as e: diff --git a/karapace/utils.py b/karapace/utils.py index c0cd17449..95302da04 100644 --- a/karapace/utils.py +++ b/karapace/utils.py @@ -15,10 +15,10 @@ from types import MappingProxyType from typing import NoReturn, overload, Union +import json import kafka.client_async import logging import time -import ujson NS_BLACKOUT_DURATION_SECONDS = 120 LOG = logging.getLogger(__name__) @@ -71,7 +71,7 @@ def default_json_serialization( # pylint: disable=inconsistent-return-statement def json_encode(obj, *, sort_keys: bool = True, binary=False): - res = ujson.dumps( + res = json.dumps( obj, sort_keys=sort_keys, default=default_json_serialization, diff --git a/mypy.ini b/mypy.ini index 543db6e82..a0952b212 100644 --- a/mypy.ini +++ b/mypy.ini @@ -11,10 +11,6 @@ warn_no_return = True warn_unreachable = True strict_equality = True -[mypy-ujson] -ignore_errors = True -ignore_missing_imports = True - [mypy-karapace.schema_registry_apis] ignore_errors = True diff --git a/requirements.txt b/requirements.txt index 8ec6bef5c..7e1aa0f05 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,6 @@ requests==2.27.1 networkx==2.5 python-dateutil==2.8.2 protobuf==3.19.4 -ujson==5.1.0 avro==1.11.0 # Patched dependencies # diff --git a/setup.py b/setup.py index 2bf481661..5be67c065 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,6 @@ "networkx", "protobuf", "requests", - "ujson", ], extras_require={ # compression algorithms supported by AioKafka and KafkaConsumer diff --git a/tests/conftest.py b/tests/conftest.py index 1a91dd645..46b7d5906 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,9 +2,9 @@ from pathlib import Path from typing import List, Optional +import json import pytest import re -import ujson pytest_plugins = "aiohttp.pytest_plugin" KAFKA_BOOTSTRAP_SERVERS_OPT = "--kafka-bootstrap-servers" @@ -157,5 +157,5 @@ def fixture_session_logdir(request, tmp_path_factory, worker_id) -> Path: @pytest.fixture(scope="session", name="default_config_path") def fixture_default_config(session_logdir: Path) -> str: path = session_logdir / "karapace_config.json" - path.write_text(ujson.dumps({"registry_host": "localhost", "registry_port": 8081})) + path.write_text(json.dumps({"registry_host": "localhost", "registry_port": 8081})) return str(path) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b3b310ffc..885c1c4bf 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -31,13 +31,13 @@ from typing import AsyncIterator, Iterator, List, Optional import asyncio +import json import os import pathlib import pytest import re import string import time -import ujson REPOSITORY_DIR = pathlib.Path(__file__).parent.parent.parent.absolute() RUNTIME_DIR = REPOSITORY_DIR / "runtime" @@ -126,7 +126,7 @@ def fixture_kafka_server( with FileLock(str(lock_file)): if transfer_file.exists(): - config_data = ujson.loads(transfer_file.read_text()) + config_data = json.loads(transfer_file.read_text()) zk_config = ZKConfig.from_dict(config_data["zookeeper"]) kafka_config = KafkaConfig.from_dict(config_data["kafka"]) config_data[WORKER_COUNTER_KEY] += 1 # Count the new worker @@ -168,7 +168,7 @@ def fixture_kafka_server( WORKER_COUNTER_KEY: 1, } - transfer_file.write_text(ujson.dumps(config_data)) + transfer_file.write_text(json.dumps(config_data)) try: # Make sure every test worker can communicate with kafka @@ -180,16 +180,16 @@ def fixture_kafka_server( # This must be called on errors, otherwise the master node will wait forever with FileLock(str(lock_file)): assert transfer_file.exists(), "transfer_file disappeared" - config_data = ujson.loads(transfer_file.read_text()) + config_data = json.loads(transfer_file.read_text()) config_data[WORKER_COUNTER_KEY] -= 1 - transfer_file.write_text(ujson.dumps(config_data)) + transfer_file.write_text(json.dumps(config_data)) # Wait until every worker finished before stopping the servers worker_counter = float("inf") while worker_counter > 0: with FileLock(str(lock_file)): assert transfer_file.exists(), "transfer_file disappeared" - config_data = ujson.loads(transfer_file.read_text()) + config_data = json.loads(transfer_file.read_text()) worker_counter = config_data[WORKER_COUNTER_KEY] time.sleep(2) diff --git a/tests/integration/schema_registry/test_jsonschema.py b/tests/integration/schema_registry/test_jsonschema.py index 67804af35..599965b19 100644 --- a/tests/integration/schema_registry/test_jsonschema.py +++ b/tests/integration/schema_registry/test_jsonschema.py @@ -94,8 +94,8 @@ ) from tests.utils import new_random_name +import json import pytest -import ujson async def debugging_details( @@ -104,8 +104,8 @@ async def debugging_details( client: Client, subject: str, ) -> str: - newer_schema = ujson.dumps(newer.schema) - older_schema = ujson.dumps(older.schema) + newer_schema = json.dumps(newer.schema) + older_schema = json.dumps(older.schema) config_res = await client.get(f"config/{subject}?defaultToGlobal=true") config = config_res.json() return f"subject={subject} newer={newer_schema} older={older_schema} compatibility={config}" @@ -126,7 +126,7 @@ async def not_schemas_are_compatible( older_res = await client.post( f"subjects/{subject}/versions", json={ - "schema": ujson.dumps(older.schema), + "schema": json.dumps(older.schema), "schemaType": SchemaType.JSONSCHEMA.value, }, ) @@ -141,7 +141,7 @@ async def not_schemas_are_compatible( newer_res = await client.post( f"subjects/{subject}/versions", json={ - "schema": ujson.dumps(newer.schema), + "schema": json.dumps(newer.schema), "schemaType": SchemaType.JSONSCHEMA.value, }, ) @@ -169,7 +169,7 @@ async def schemas_are_compatible( older_res = await client.post( f"subjects/{subject}/versions", json={ - "schema": ujson.dumps(older.schema), + "schema": json.dumps(older.schema), "schemaType": SchemaType.JSONSCHEMA.value, }, ) @@ -184,7 +184,7 @@ async def schemas_are_compatible( newer_res = await client.post( f"subjects/{subject}/versions", json={ - "schema": ujson.dumps(newer.schema), + "schema": json.dumps(newer.schema), "schemaType": SchemaType.JSONSCHEMA.value, }, ) @@ -242,7 +242,7 @@ async def test_same_jsonschema_must_have_same_id( first_res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", json={ - "schema": ujson.dumps(schema.schema), + "schema": json.dumps(schema.schema), "schemaType": SchemaType.JSONSCHEMA.value, }, ) @@ -253,7 +253,7 @@ async def test_same_jsonschema_must_have_same_id( second_res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", json={ - "schema": ujson.dumps(schema.schema), + "schema": json.dumps(schema.schema), "schemaType": SchemaType.JSONSCHEMA.value, }, ) diff --git a/tests/integration/test_karapace.py b/tests/integration/test_karapace.py index cbe17d44f..fd6ef56eb 100644 --- a/tests/integration/test_karapace.py +++ b/tests/integration/test_karapace.py @@ -5,8 +5,8 @@ from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process +import json import socket -import ujson def test_regression_server_must_exit_on_exception( @@ -33,7 +33,7 @@ def test_regression_server_must_exit_on_exception( logfile = stack.enter_context((tmp_path / "karapace.log").open("w")) errfile = stack.enter_context((tmp_path / "karapace.err").open("w")) - config_path.write_text(ujson.dumps(config)) + config_path.write_text(json.dumps(config)) sock.bind(("127.0.0.1", port)) process = Popen( args=["python", "-m", "karapace.karapace_all", str(config_path)], diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index eb8741cc2..8f110a3ad 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -12,10 +12,10 @@ from tests.utils import new_random_name import asyncio +import json import pytest import requests import time -import ujson def init_admin(config): @@ -156,7 +156,7 @@ async def test_schema_request_forwarding(registry_async_pair): # New schema updates, last compatibility is None for s in [schema, other_schema]: - resp = requests.post(f"{slave_url}/subjects/{subject}/versions", json={"schema": ujson.dumps(s)}) + resp = requests.post(f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)}) assert resp.ok data = resp.json() assert "id" in data, data diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index 5178d3106..5992fbd35 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -10,9 +10,9 @@ import base64 import copy +import json import pytest import random -import ujson @pytest.mark.parametrize("trail", ["", "/"]) @@ -238,10 +238,10 @@ async def test_offsets(rest_async_client, admin_client, trail): async def test_consume(rest_async_client, admin_client, producer, trail): # avro to be handled in a separate testcase ?? values = { - "json": [ujson.dumps({"foo": f"bar{i}"}).encode("utf-8") for i in range(3)], + "json": [json.dumps({"foo": f"bar{i}"}).encode("utf-8") for i in range(3)], "binary": [f"val{i}".encode("utf-8") for i in range(3)], } - deserializers = {"binary": base64.b64decode, "json": lambda x: ujson.dumps(x).encode("utf-8")} + deserializers = {"binary": base64.b64decode, "json": lambda x: json.dumps(x).encode("utf-8")} group_name = "consume_group" for fmt in ["binary", "json"]: header = copy.deepcopy(REST_HEADERS[fmt]) diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 9f816500f..283825859 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -19,11 +19,10 @@ ) from typing import List, Tuple -import json as jsonlib +import json import os import pytest import requests -import ujson baseurl = "http://localhost:8081" @@ -51,15 +50,13 @@ async def test_union_to_union(registry_async_client: Client, trail: str) -> None } ], } - res = await registry_async_client.post( - f"subjects/{subject_1}/versions{trail}", json={"schema": ujson.dumps(init_schema)} - ) + res = await registry_async_client.post(f"subjects/{subject_1}/versions{trail}", json={"schema": json.dumps(init_schema)}) assert res.status_code == 200 assert "id" in res.json() - res = await registry_async_client.post(f"subjects/{subject_1}/versions{trail}", json={"schema": ujson.dumps(evolved)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions{trail}", json={"schema": json.dumps(evolved)}) assert res.status_code == 409 res = await registry_async_client.post( - f"subjects/{subject_1}/versions{trail}", json={"schema": ujson.dumps(evolved_compatible)} + f"subjects/{subject_1}/versions{trail}", json={"schema": json.dumps(evolved_compatible)} ) assert res.status_code == 200 # fw compat check @@ -67,15 +64,13 @@ async def test_union_to_union(registry_async_client: Client, trail: str) -> None res = await registry_async_client.put(f"config/{subject_2}{trail}", json={"compatibility": "FORWARD"}) assert res.status_code == 200 res = await registry_async_client.post( - f"subjects/{subject_2}/versions{trail}", json={"schema": ujson.dumps(evolved_compatible)} + f"subjects/{subject_2}/versions{trail}", json={"schema": json.dumps(evolved_compatible)} ) assert res.status_code == 200 assert "id" in res.json() - res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": ujson.dumps(evolved)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": json.dumps(evolved)}) assert res.status_code == 409 - res = await registry_async_client.post( - f"subjects/{subject_2}/versions{trail}", json={"schema": ujson.dumps(init_schema)} - ) + res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": json.dumps(init_schema)}) assert res.status_code == 200 @@ -84,7 +79,7 @@ async def test_missing_subject_compatibility(registry_async_client: Client, trai subject = create_subject_name_factory(f"test_missing_subject_compatibility-{trail}")() res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps({"type": "string"})} + f"subjects/{subject}/versions{trail}", json={"schema": json.dumps({"type": "string"})} ) assert res.status_code == 200, f"{res} {subject}" res = await registry_async_client.get(f"config/{subject}{trail}") @@ -121,7 +116,7 @@ async def test_record_union_schema_compatibility(registry_async_client: Client, ], } res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(original_schema)} + f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(original_schema)} ) assert res.status_code == 200 assert "id" in res.json() @@ -152,11 +147,11 @@ async def test_record_union_schema_compatibility(registry_async_client: Client, } res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest{trail}", - json={"schema": ujson.dumps(evolved_schema)}, + json={"schema": json.dumps(evolved_schema)}, ) assert res.status_code == 200 res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(evolved_schema)} + f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(evolved_schema)} ) assert res.status_code == 200 assert "id" in res.json() @@ -164,11 +159,11 @@ async def test_record_union_schema_compatibility(registry_async_client: Client, # Check that we can delete the field as well res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest{trail}", - json={"schema": ujson.dumps(original_schema)}, + json={"schema": json.dumps(original_schema)}, ) assert res.status_code == 200 res = await registry_async_client.post( - f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(original_schema)} + f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(original_schema)} ) assert res.status_code == 200 assert "id" in res.json() @@ -205,7 +200,7 @@ async def test_record_nested_schema_compatibility(registry_async_client: Client, } res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -214,7 +209,7 @@ async def test_record_nested_schema_compatibility(registry_async_client: Client, schema["fields"][1]["type"]["fields"][0]["type"] = "int" res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 409 @@ -243,7 +238,7 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str) res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 @@ -254,7 +249,7 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str) schema["fields"] = [{"type": "long", "name": "age"}] res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest{trail}", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert res.json() == {"is_compatible": True} @@ -263,7 +258,7 @@ async def test_compatibility_endpoint(registry_async_client: Client, trail: str) schema["fields"] = [{"type": "string", "name": "age"}] res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest{trail}", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert res.json() == {"is_compatible": False} @@ -291,14 +286,14 @@ async def test_regression_compatibility_should_not_give_internal_server_error_on res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 # replace int with long res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest{trail}", - json={"schema": ujson.dumps(schema), "schemaType": "AVROO"}, + json={"schema": json.dumps(schema), "schemaType": "AVROO"}, ) assert res.status_code == HTTPStatus.UNPROCESSABLE_ENTITY assert res.json()["error_code"] == HTTPStatus.UNPROCESSABLE_ENTITY @@ -326,7 +321,7 @@ async def test_regression_invalid_schema_type_should_not_give_internal_server_er res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema), "schemaType": "AVROO"}, + json={"schema": json.dumps(schema), "schemaType": "AVROO"}, ) assert res.status_code == HTTPStatus.UNPROCESSABLE_ENTITY assert res.json()["error_code"] == HTTPStatus.UNPROCESSABLE_ENTITY @@ -400,14 +395,14 @@ def _test_cases(): } res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 schema["fields"][0]["type"] = target_type res = await registry_async_client.post( f"compatibility/subjects/{subject}/versions/latest{trail}", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert res.json() == {"is_compatible": expected} @@ -431,7 +426,7 @@ async def test_record_schema_compatibility_forward(registry_async_client: Client } res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema_1)}, + json={"schema": json.dumps(schema_1)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -451,7 +446,7 @@ async def test_record_schema_compatibility_forward(registry_async_client: Client } res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema_2)}, + json={"schema": json.dumps(schema_2)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -469,7 +464,7 @@ async def test_record_schema_compatibility_forward(registry_async_client: Client } res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema_3a)}, + json={"schema": json.dumps(schema_3a)}, ) # Fails because field removed assert res.status_code == 409 @@ -487,7 +482,7 @@ async def test_record_schema_compatibility_forward(registry_async_client: Client } res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema_3b)}, + json={"schema": json.dumps(schema_3b)}, ) # Fails because incompatible type change assert res.status_code == 409 @@ -506,7 +501,7 @@ async def test_record_schema_compatibility_forward(registry_async_client: Client } res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", - json={"schema": ujson.dumps(schema_4)}, + json={"schema": json.dumps(schema_4)}, ) assert res.status_code == 200 @@ -529,7 +524,7 @@ async def test_record_schema_compatibility_backward(registry_async_client: Clien } res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": ujson.dumps(schema_1)}, + json={"schema": json.dumps(schema_1)}, ) assert res.status_code == 200 @@ -550,7 +545,7 @@ async def test_record_schema_compatibility_backward(registry_async_client: Clien } res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": ujson.dumps(schema_2)}, + json={"schema": json.dumps(schema_2)}, ) assert res.status_code == 409 @@ -558,7 +553,7 @@ async def test_record_schema_compatibility_backward(registry_async_client: Clien schema_2["fields"][3] = {"name": "fourth_name", "type": "string", "default": "foof"} res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": ujson.dumps(schema_2)}, + json={"schema": json.dumps(schema_2)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -567,7 +562,7 @@ async def test_record_schema_compatibility_backward(registry_async_client: Clien schema_2["fields"][3] = {"name": "fourth_name", "type": "int", "default": 2} res = await registry_async_client.post( f"subjects/{subject_1}/versions{trail}", - json={"schema": ujson.dumps(schema_2)}, + json={"schema": json.dumps(schema_2)}, ) assert res.status_code == 409 @@ -575,10 +570,10 @@ async def test_record_schema_compatibility_backward(registry_async_client: Clien res = await registry_async_client.put(f"config/{subject_2}{trail}", json={"compatibility": "BACKWARD"}) assert res.status_code == 200 schema_1 = {"type": "record", "name": schema_name, "fields": [{"name": "first_name", "type": "string"}]} - res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": ujson.dumps(schema_1)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": json.dumps(schema_1)}) assert res.status_code == 200 schema_1["fields"].append({"name": "last_name", "type": "string"}) - res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": ujson.dumps(schema_1)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions{trail}", json={"schema": json.dumps(schema_1)}) assert res.status_code == 409 @@ -591,12 +586,12 @@ async def test_enum_schema_field_add_compatibility(registry_async_client: Client res = await registry_async_client.put(f"config/{subject}{trail}", json={"compatibility": compatibility}) assert res.status_code == 200 schema = {"type": "enum", "name": "Suit", "symbols": ["SPADES", "HEARTS", "DIAMONDS"]} - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Add a field schema["symbols"].append("CLUBS") - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == status_code @@ -609,12 +604,12 @@ async def test_array_schema_field_add_compatibility(registry_async_client: Clien res = await registry_async_client.put(f"config/{subject}{trail}", json={"compatibility": compatibility}) assert res.status_code == 200 schema = {"type": "array", "items": "int"} - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Modify the items type schema["items"] = "long" - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == status_code @@ -630,12 +625,12 @@ async def test_array_nested_record_compatibility(registry_async_client: Client, "type": "array", "items": {"type": "record", "name": "object", "fields": [{"name": "first_name", "type": "string"}]}, } - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Add a second field to the record schema["items"]["fields"].append({"name": "last_name", "type": "string"}) - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == status_code @@ -652,12 +647,12 @@ async def test_record_nested_array_compatibility(registry_async_client: Client, "name": "object", "fields": [{"name": "simplearray", "type": {"type": "array", "items": "int"}}], } - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Modify the array items type schema["fields"][0]["type"]["items"] = "long" - res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions{trail}", json={"schema": json.dumps(schema)}) assert res.status_code == status_code @@ -671,12 +666,12 @@ async def test_map_schema_field_add_compatibility( res = await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility}) assert res.status_code == 200 schema = {"type": "map", "values": "int"} - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Modify the items type schema["values"] = "long" - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code @@ -687,21 +682,21 @@ async def test_enum_schema(registry_async_client: Client) -> None: res = await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility}) assert res.status_code == 200 schema = {"type": "enum", "name": "testenum", "symbols": ["first"]} - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) # Add a symbol. schema["symbols"].append("second") - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Remove a symbol schema["symbols"].pop(1) - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Change the name schema["name"] = "another" - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 409 # Inside record @@ -711,21 +706,21 @@ async def test_enum_schema(registry_async_client: Client) -> None: "name": "object", "fields": [{"name": "enumkey", "type": {"type": "enum", "name": "testenum", "symbols": ["first"]}}], } - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) # Add a symbol. schema["fields"][0]["type"]["symbols"].append("second") - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Remove a symbol schema["fields"][0]["type"]["symbols"].pop(1) - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 # Change the name schema["fields"][0]["type"]["name"] = "another" - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 409 @@ -738,22 +733,22 @@ async def test_fixed_schema(registry_async_client: Client, compatibility: str) - res = await registry_async_client.put(f"config/{subject_1}", json={"compatibility": compatibility}) assert res.status_code == 200 schema = {"type": "fixed", "size": 16, "name": "md5", "aliases": ["testalias"]} - res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema)}) # Add new alias schema["aliases"].append("anotheralias") - res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code_allowed # Try to change size schema["size"] = 32 - res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code_denied # Try to change name schema["size"] = 16 schema["name"] = "denied" - res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code_denied # In a record @@ -763,22 +758,22 @@ async def test_fixed_schema(registry_async_client: Client, compatibility: str) - "name": "object", "fields": [{"name": "fixedkey", "type": {"type": "fixed", "size": 16, "name": "md5", "aliases": ["testalias"]}}], } - res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": json.dumps(schema)}) # Add new alias schema["fields"][0]["type"]["aliases"].append("anotheralias") - res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code_allowed # Try to change size schema["fields"][0]["type"]["size"] = 32 - res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code_denied # Try to change name schema["fields"][0]["type"]["size"] = 16 schema["fields"][0]["type"]["name"] = "denied" - res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code_denied @@ -792,10 +787,10 @@ async def test_primitive_schema(registry_async_client: Client) -> None: # Transition from string to bytes schema = {"type": "string"} - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 schema["type"] = "bytes" - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == status_code expected_results = [("BACKWARD", 409), ("FORWARD", 409), ("FULL", 409)] @@ -806,10 +801,10 @@ async def test_primitive_schema(registry_async_client: Client) -> None: # Transition from string to int schema = {"type": "string"} - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 schema["type"] = "int" - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) async def test_union_comparing_to_other_types(registry_async_client: Client) -> None: @@ -822,10 +817,10 @@ async def test_union_comparing_to_other_types(registry_async_client: Client) -> # Union vs non-union with the same schema schema = [{"type": "array", "name": "listofstrings", "items": "string"}, "string"] - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 plain_schema = {"type": "string"} - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(plain_schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(plain_schema)}) assert res.status_code == status_code expected_results = [("BACKWARD", 200), ("FORWARD", 409), ("FULL", 409)] @@ -836,10 +831,10 @@ async def test_union_comparing_to_other_types(registry_async_client: Client) -> # Non-union first schema = {"type": "array", "name": "listofstrings", "items": "string"} - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 union_schema = [{"type": "array", "name": "listofstrings", "items": "string"}, "string"] - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(union_schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(union_schema)}) assert res.status_code == status_code expected_results = [("BACKWARD", 409), ("FORWARD", 409), ("FULL", 409)] @@ -850,11 +845,11 @@ async def test_union_comparing_to_other_types(registry_async_client: Client) -> # Union to a completely different schema schema = [{"type": "array", "name": "listofstrings", "items": "string"}, "string"] - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 plain_wrong_schema = {"type": "int"} res = await registry_async_client.post( - f"subjects/{subject}/versions", json={"schema": ujson.dumps(plain_wrong_schema)} + f"subjects/{subject}/versions", json={"schema": json.dumps(plain_wrong_schema)} ) assert res.status_code == status_code @@ -873,7 +868,7 @@ async def test_transitive_compatibility(registry_async_client: Client) -> None: } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema0)}, + json={"schema": json.dumps(schema0)}, ) assert res.status_code == 200 @@ -891,7 +886,7 @@ async def test_transitive_compatibility(registry_async_client: Client) -> None: } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema1)}, + json={"schema": json.dumps(schema1)}, ) assert res.status_code == 200 @@ -913,7 +908,7 @@ async def test_transitive_compatibility(registry_async_client: Client) -> None: } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema2)}, + json={"schema": json.dumps(schema2)}, ) assert res.status_code == 409 res_json = res.json() @@ -984,7 +979,7 @@ async def test_schema_versions_multiple_subjects_same_schema(registry_async_clie }, ], } - schema_str_1 = ujson.dumps(schema_1) + schema_str_1 = json.dumps(schema_1) schema_2 = { "type": "record", "name": schema_name_factory(), @@ -995,7 +990,7 @@ async def test_schema_versions_multiple_subjects_same_schema(registry_async_clie } ], } - schema_str_2 = ujson.dumps(schema_2) + schema_str_2 = json.dumps(schema_2) subject_1 = subject_name_factory() schema_id_1, version_1 = await register_schema(registry_async_client, trail, subject_1, schema_str_1) @@ -1036,7 +1031,7 @@ async def test_schema_versions_deleting(registry_async_client: Client, trail: st "name": schema_name, "fields": [{"name": "field_1", "type": "string"}, {"name": "field_2", "type": "string"}], } - schema_str_1 = ujson.dumps(schema_1) + schema_str_1 = json.dumps(schema_1) schema_2 = { "type": "record", "name": schema_name, @@ -1044,7 +1039,7 @@ async def test_schema_versions_deleting(registry_async_client: Client, trail: st {"name": "field_1", "type": "string"}, ], } - schema_str_2 = ujson.dumps(schema_2) + schema_str_2 = json.dumps(schema_2) schema_id_1, version_1 = await register_schema(registry_async_client, trail, subject, schema_str_1) schema_1_versions = [(subject, version_1)] @@ -1081,11 +1076,11 @@ async def test_schema_types(registry_async_client: Client, trail: str) -> None: """ res = await registry_async_client.get(f"/schemas/types{trail}") assert res.status_code == 200 - json = res.json() - assert len(json) == 3 - assert "AVRO" in json - assert "JSON" in json - assert "PROTOBUF" in json + json_res = res.json() + assert len(json_res) == 3 + assert "AVRO" in json_res + assert "JSON" in json_res + assert "PROTOBUF" in json_res @pytest.mark.parametrize("trail", ["", "/"]) @@ -1097,7 +1092,7 @@ async def test_schema_repost(registry_async_client: Client, trail: str) -> None: unique_field_factory = create_field_name_factory(trail) unique = unique_field_factory() - schema_str = ujson.dumps({"type": "string", "unique": unique}) + schema_str = json.dumps({"type": "string", "unique": unique}) res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", json={"schema": schema_str}, @@ -1108,7 +1103,7 @@ async def test_schema_repost(registry_async_client: Client, trail: str) -> None: res = await registry_async_client.get(f"schemas/ids/{schema_id}{trail}") assert res.status_code == 200 - assert ujson.loads(res.json()["schema"]) == ujson.loads(schema_str) + assert json.loads(res.json()["schema"]) == json.loads(schema_str) res = await registry_async_client.post( f"subjects/{subject}/versions{trail}", @@ -1188,7 +1183,7 @@ async def test_schema_subject_post_invalid(registry_async_client: Client) -> Non """ subject_name_factory = create_subject_name_factory("test_schema_subject_post_invalid") - schema_str = ujson.dumps({"type": "string"}) + schema_str = json.dumps({"type": "string"}) # Create the subject subject_1 = subject_name_factory() @@ -1200,7 +1195,7 @@ async def test_schema_subject_post_invalid(registry_async_client: Client) -> Non res = await registry_async_client.post( f"subjects/{subject_1}", - json={"schema": ujson.dumps({"type": "invalid_type"})}, + json={"schema": json.dumps({"type": "invalid_type"})}, ) assert res.status_code == 500, "Invalid schema for existing subject should return 500" assert res.json()["message"] == f"Error while looking up schema under subject {subject_1}" @@ -1249,7 +1244,7 @@ async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> No unique_1 = unique_field_factory() res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps({"type": "string", "foo": "string", unique_1: "string"})}, + json={"schema": json.dumps({"type": "string", "foo": "string", unique_1: "string"})}, ) assert res.status_code == 200 schema_id_1 = res.json()["id"] @@ -1257,7 +1252,7 @@ async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> No unique_2 = unique_field_factory() res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps({"type": "string", "foo": "string", unique_2: "string"})}, + json={"schema": json.dumps({"type": "string", "foo": "string", unique_2: "string"})}, ) schema_id_2 = res.json()["id"] assert res.status_code == 200 @@ -1267,13 +1262,13 @@ async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> No await assert_schema_versions(registry_async_client, trail, schema_id_2, [(subject, 2)]) result = await registry_async_client.get(os.path.join(f"schemas/ids/{schema_id_1}")) - schema_json_1 = ujson.loads(result.json()["schema"]) + schema_json_1 = json.loads(result.json()["schema"]) assert schema_json_1["type"] == "string" assert schema_json_1["foo"] == "string" assert schema_json_1[unique_1] == "string" result = await registry_async_client.get(os.path.join(f"schemas/ids/{schema_id_2}")) - schema_json_2 = ujson.loads(result.json()["schema"]) + schema_json_2 = json.loads(result.json()["schema"]) assert schema_json_2["type"] == "string" assert schema_json_2["foo"] == "string" assert schema_json_2[unique_2] == "string" @@ -1289,7 +1284,7 @@ async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> No res = await registry_async_client.get(f"subjects/{subject}/versions/1") assert res.status_code == 200 assert res.json()["subject"] == subject - assert ujson.loads(res.json()["schema"]) == schema_json_1 + assert json.loads(res.json()["schema"]) == schema_json_1 # Delete an actual version res = await registry_async_client.delete(f"subjects/{subject}/versions/1") @@ -1299,7 +1294,7 @@ async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> No # Get the schema by id, still there, wasn't hard-deleted res = await registry_async_client.get(f"schemas/ids/{schema_id_1}{trail}") assert res.status_code == 200 - assert ujson.loads(res.json()["schema"]) == schema_json_1 + assert json.loads(res.json()["schema"]) == schema_json_1 # Get the schema by id res = await registry_async_client.get(f"schemas/ids/{schema_id_2}{trail}") @@ -1347,7 +1342,7 @@ async def test_schema_lifecycle(registry_async_client: Client, trail: str) -> No unique_3 = unique_field_factory() res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps({"type": "string", "foo": "string", unique_3: "string"})}, + json={"schema": json.dumps({"type": "string", "foo": "string", unique_3: "string"})}, ) assert res.status_code == 200 res = await registry_async_client.get(f"subjects/{subject}/versions") @@ -1374,7 +1369,7 @@ async def test_schema_version_numbering(registry_async_client: Client, trail: st } ], } - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 assert "id" in res.json() @@ -1395,7 +1390,7 @@ async def test_schema_version_numbering(registry_async_client: Client, trail: st }, ], } - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema2)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema2)}) assert res.status_code == 200 assert "id" in res.json() res = await registry_async_client.get(f"subjects/{subject}/versions") @@ -1405,7 +1400,7 @@ async def test_schema_version_numbering(registry_async_client: Client, trail: st # Recreate subject res = await registry_async_client.delete(f"subjects/{subject}") assert res.status_code == 200 - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 res = await registry_async_client.get(f"subjects/{subject}/versions") assert res.status_code == 200 @@ -1433,14 +1428,14 @@ async def test_schema_version_numbering_complex(registry_async_client: Client, t } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) schema_id = res.json()["id"] res = await registry_async_client.get(f"subjects/{subject}/versions/1") assert res.status_code == 200 assert res.json()["subject"] == subject - assert sorted(ujson.loads(res.json()["schema"])) == sorted(schema) + assert sorted(json.loads(res.json()["schema"])) == sorted(schema) await assert_schema_versions(registry_async_client, trail, schema_id, [(subject, 1)]) @@ -1471,14 +1466,14 @@ async def test_schema_three_subjects_sharing_schema(registry_async_client: Clien }, ], } - res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 assert "id" in res.json() schema_id_1 = res.json()["id"] # New subject with the same schema subject_2 = subject_name_factory() - res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 200 assert "id" in res.json() schema_id_2 = res.json()["id"] @@ -1496,7 +1491,7 @@ async def test_schema_three_subjects_sharing_schema(registry_async_client: Clien assert res.status_code == 200 res = await registry_async_client.post( f"subjects/{subject_3}/versions", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert res.json()["id"] == schema_id_1 # Same ID as in the previous test step @@ -1523,7 +1518,7 @@ async def test_schema_subject_version_schema(registry_async_client: Client, trai } ], } - schema_str = ujson.dumps(schema) + schema_str = json.dumps(schema) res = await registry_async_client.post( f"subjects/{subject_1}/versions", @@ -1532,7 +1527,7 @@ async def test_schema_subject_version_schema(registry_async_client: Client, trai assert res.status_code == 200 res = await registry_async_client.get(f"subjects/{subject_1}/versions/1/schema") assert res.status_code == 200 - assert res.json() == ujson.loads(schema_str) + assert res.json() == json.loads(schema_str) subject_2 = subject_name_factory() res = await registry_async_client.get(f"subjects/{subject_2}/versions/1/schema") # Invalid subject @@ -1547,7 +1542,7 @@ async def test_schema_subject_version_schema(registry_async_client: Client, trai res = await registry_async_client.get(f"subjects/{subject_1}/versions/latest/schema") assert res.status_code == 200 - assert res.json() == ujson.loads(schema_str) + assert res.json() == json.loads(schema_str) @pytest.mark.parametrize("trail", ["", "/"]) @@ -1558,7 +1553,7 @@ async def test_schema_same_subject(registry_async_client: Client, trail: str) -> subject_name_factory = create_subject_name_factory(f"test_schema_same_subject_{trail}") schema_name = create_schema_name_factory(f"test_schema_same_subject_{trail}")() - schema_str = ujson.dumps( + schema_str = json.dumps( { "type": "record", "name": schema_name, @@ -1584,9 +1579,9 @@ async def test_schema_same_subject(registry_async_client: Client, trail: str) -> assert res.status_code == 200 # Switch the str schema to a dict for comparison - json = res.json() - json["schema"] = ujson.loads(json["schema"]) - assert json == {"id": schema_id, "subject": subject, "schema": ujson.loads(schema_str), "version": 1} + json_res = res.json() + json_res["schema"] = json.loads(json_res["schema"]) + assert json_res == {"id": schema_id, "subject": subject, "schema": json.loads(schema_str), "version": 1} async def test_schema_same_subject_unnamed(registry_async_client: Client) -> None: @@ -1596,7 +1591,7 @@ async def test_schema_same_subject_unnamed(registry_async_client: Client) -> Non subject_name_factory = create_subject_name_factory("test_schema_same_subject_unnamed") schema_name = create_schema_name_factory("test_schema_same_subject_unnamed")() - schema_str = ujson.dumps( + schema_str = json.dumps( { "type": "int", "name": schema_name, @@ -1610,7 +1605,7 @@ async def test_schema_same_subject_unnamed(registry_async_client: Client) -> Non assert res.status_code == 200 schema_id = res.json()["id"] - unnamed_schema_str = ujson.dumps({"type": "int"}) + unnamed_schema_str = json.dumps({"type": "int"}) res = await registry_async_client.post( f"subjects/{subject}", @@ -1619,9 +1614,9 @@ async def test_schema_same_subject_unnamed(registry_async_client: Client) -> Non assert res.status_code == 200 # Switch the str schema to a dict for comparison - json = res.json() - json["schema"] = ujson.loads(json["schema"]) - assert json == {"id": schema_id, "subject": subject, "schema": ujson.loads(schema_str), "version": 1} + json_res = res.json() + json_res["schema"] = json.loads(json_res["schema"]) + assert json_res == {"id": schema_id, "subject": subject, "schema": json.loads(schema_str), "version": 1} @pytest.mark.parametrize("trail", ["", "/"]) @@ -1679,11 +1674,11 @@ async def test_schema_version_number_existing_schema(registry_async_client: Clie }, ], } - res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": ujson.dumps(schema_1)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema_1)}) assert res.status_code == 200 schema_id_1 = res.json()["id"] - res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": ujson.dumps(schema_2)}) + res = await registry_async_client.post(f"subjects/{subject_1}/versions", json={"schema": json.dumps(schema_2)}) assert res.status_code == 200 schema_id_2 = res.json()["id"] assert schema_id_2 > schema_id_1 @@ -1693,12 +1688,12 @@ async def test_schema_version_number_existing_schema(registry_async_client: Clie res = await registry_async_client.put( f"config/{subject_2}", json={"compatibility": "NONE"} ) # We don't care about compatibility - res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": ujson.dumps(schema_1)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": json.dumps(schema_1)}) assert res.status_code == 200 assert res.json()["id"] == schema_id_1 # Create a new schema - res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": ujson.dumps(schema_3)}) + res = await registry_async_client.post(f"subjects/{subject_2}/versions", json={"schema": json.dumps(schema_3)}) assert res.status_code == 200 schema_id_3 = res.json()["id"] assert res.json()["id"] == schema_id_3 @@ -1961,7 +1956,7 @@ async def test_common_endpoints(registry_async_client: Client) -> None: async def test_invalid_namespace(registry_async_client: Client) -> None: subject = create_subject_name_factory("test_invalid_namespace")() schema = {"type": "record", "name": "foo", "namespace": "foo-bar-baz", "fields": []} - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.status_code == 422, res.json() json_res = res.json() assert json_res["error_code"] == 44201, json_res @@ -1984,12 +1979,12 @@ async def test_schema_remains_constant(registry_async_client: Client) -> None: "namespace": "foo_bar_baz", "fields": [{"type": "string", "name": "bla"}], } - schema_str = ujson.dumps(schema) + schema_str = json.dumps(schema) res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": schema_str}) assert res.ok, res.json() schema_id = res.json()["id"] res = await registry_async_client.get(f"schemas/ids/{schema_id}") - assert ujson.loads(res.json()["schema"]) == ujson.loads(schema_str) + assert json.loads(res.json()["schema"]) == json.loads(schema_str) async def test_malformed_kafka_message( @@ -2002,11 +1997,11 @@ async def test_malformed_kafka_message( import random schema_id = random.randint(20000, 30000) - payload = {"schema": jsonlib.dumps({"foo": "bar"}, indent=None, separators=(",", ":"))} + payload = {"schema": json.dumps({"foo": "bar"})} message_value = {"deleted": False, "id": schema_id, "subject": "foo", "version": 1} message_value.update(payload) producer.send( - registry_cluster.schemas_topic, key=ujson.dumps(message_key).encode(), value=ujson.dumps(message_value).encode() + registry_cluster.schemas_topic, key=json.dumps(message_key).encode(), value=json.dumps(message_value).encode() ).get() path = f"schemas/ids/{schema_id}" @@ -2052,10 +2047,10 @@ async def test_inner_type_compat_failure(registry_async_client: Client) -> None: } ], } - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(sc)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(sc)}) assert res.ok sc_id = res.json()["id"] - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(ev)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(ev)}) assert res.ok assert sc_id != res.json()["id"] @@ -2109,10 +2104,10 @@ async def test_anon_type_union_failure(registry_async_client: Client) -> None: ], } - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(schema)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(schema)}) assert res.ok sc_id = res.json()["id"] - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(evolved)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(evolved)}) assert res.ok assert sc_id != res.json()["id"] @@ -2162,9 +2157,9 @@ async def test_full_transitive_failure(registry_async_client: Client, compatibil ], } await registry_async_client.put(f"config/{subject}", json={"compatibility": compatibility}) - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(init)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(init)}) assert res.ok - res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": ujson.dumps(evolved)}) + res = await registry_async_client.post(f"subjects/{subject}/versions", json={"schema": json.dumps(evolved)}) assert not res.ok assert res.status_code == 409 @@ -2180,7 +2175,7 @@ async def test_invalid_schemas(registry_async_client: Client) -> None: res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(repated_field)}, + json={"schema": json.dumps(repated_field)}, ) assert res.status_code != 500, "an invalid schema should not cause a server crash" assert not is_success(HTTPStatus(res.status_code)), "an invalid schema must not be a success" @@ -2206,7 +2201,7 @@ async def test_schema_hard_delete_version(registry_async_client: Client) -> None } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schemav1)}, + json={"schema": json.dumps(schemav1)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -2228,7 +2223,7 @@ async def test_schema_hard_delete_version(registry_async_client: Client) -> None } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schemav2)}, + json={"schema": json.dumps(schemav2)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -2290,7 +2285,7 @@ async def test_schema_hard_delete_whole_schema(registry_async_client: Client) -> } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schemav1)}, + json={"schema": json.dumps(schemav1)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -2312,7 +2307,7 @@ async def test_schema_hard_delete_whole_schema(registry_async_client: Client) -> } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schemav2)}, + json={"schema": json.dumps(schemav2)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -2368,7 +2363,7 @@ async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> } res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -2381,7 +2376,7 @@ async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> # Recreate with same subject after soft delete res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -2402,7 +2397,7 @@ async def test_schema_hard_delete_and_recreate(registry_async_client: Client) -> # Recreate with same subject after hard delete res = await registry_async_client.post( f"subjects/{subject}/versions", - json={"schema": ujson.dumps(schema)}, + json={"schema": json.dumps(schema)}, ) assert res.status_code == 200 assert "id" in res.json() @@ -2414,15 +2409,15 @@ async def test_invalid_schema_should_provide_good_error_messages(registry_async_ subject_name_factory = create_subject_name_factory("test_schema_subject_post_invalid_data") test_subject = subject_name_factory() - schema_str = ujson.dumps({"type": "string"}) + schema_str = json.dumps({"type": "string"}) res = await registry_async_client.post( f"subjects/{test_subject}/versions", json={"schema": schema_str[:-1]}, ) - assert res.json()["message"] == "Invalid AVRO schema. Error: Expecting ',' delimiter: line 1 column 17 (char 16)" + assert res.json()["message"] == "Invalid AVRO schema. Error: Expecting ',' delimiter: line 1 column 18 (char 17)" # Unfortunately the AVRO library doesn't provide a good error message, it just raises an TypeError - schema_str = ujson.dumps({"type": "enum", "name": "error"}) + schema_str = json.dumps({"type": "enum", "name": "error"}) res = await registry_async_client.post( f"subjects/{test_subject}/versions", json={"schema": schema_str}, @@ -2433,7 +2428,7 @@ async def test_invalid_schema_should_provide_good_error_messages(registry_async_ ) # This is an upstream bug in the python AVRO library, until the bug is fixed we should at least have a nice error message - schema_str = ujson.dumps({"type": "enum", "name": "error", "symbols": {}}) + schema_str = json.dumps({"type": "enum", "name": "error", "symbols": {}}) res = await registry_async_client.post( f"subjects/{test_subject}/versions", json={"schema": schema_str}, @@ -2444,7 +2439,7 @@ async def test_invalid_schema_should_provide_good_error_messages(registry_async_ ) # This is an upstream bug in the python AVRO library, until the bug is fixed we should at least have a nice error message - schema_str = ujson.dumps({"type": "enum", "name": "error", "symbols": ["A", "B"]}) + schema_str = json.dumps({"type": "enum", "name": "error", "symbols": ["A", "B"]}) res = await registry_async_client.post( f"subjects/{test_subject}/versions", json={"schema": schema_str}, diff --git a/tests/integration/test_schema_backup.py b/tests/integration/test_schema_backup.py index 414b6c5dd..0bcf6f050 100644 --- a/tests/integration/test_schema_backup.py +++ b/tests/integration/test_schema_backup.py @@ -13,9 +13,9 @@ from tests.integration.utils.kafka_server import KafkaServers from tests.utils import new_random_name +import json import os import time -import ujson baseurl = "http://localhost:8081" @@ -54,7 +54,7 @@ async def test_backup_restore( restore_location = tmp_path / "restore.log" with restore_location.open("w") as fp: - ujson.dump( + json.dump( [ [ { diff --git a/tests/integration/test_schema_backup_avro_export.py b/tests/integration/test_schema_backup_avro_export.py index 5eb24e55b..b2ecad462 100644 --- a/tests/integration/test_schema_backup_avro_export.py +++ b/tests/integration/test_schema_backup_avro_export.py @@ -12,8 +12,8 @@ from tests.integration.utils.kafka_server import KafkaServers from typing import Any, Dict +import json import os -import ujson baseurl = "http://localhost:8081" @@ -54,7 +54,7 @@ async def insert_data(c: Client, schemaType: str, subject: str, data: Dict[str, Any]) -> None: - schema_string = ujson.dumps(data) + schema_string = json.dumps(data) res = await c.post( "subjects/{}/versions".format(subject), json={"schema": f"{schema_string}", "schemaType": schemaType}, @@ -89,7 +89,7 @@ async def test_export_anonymized_avro_schemas( expected_subject_hash_found = False json_schema_subject_hash_found = False with export_location.open("r") as fp: - exported_data = ujson.load(fp) + exported_data = json.load(fp) for msg in exported_data: assert len(msg) == 2 key = msg[0] diff --git a/tests/unit/test_avro_compatibility.py b/tests/unit/test_avro_compatibility.py index eda734509..c3bc5b8fd 100644 --- a/tests/unit/test_avro_compatibility.py +++ b/tests/unit/test_avro_compatibility.py @@ -7,8 +7,8 @@ from avro.schema import ArraySchema, Field, MapSchema, Schema, UnionSchema from karapace.schema_models import parse_avro_schema_definition +import json import pytest -import ujson # Schemas defined in AvroCompatibilityTest.java. Used here to ensure compatibility with the schema-registry schema1 = parse_avro_schema_definition('{"type":"record","name":"myrecord","fields":[{"type":"string","name":"f1"}]}') @@ -275,17 +275,17 @@ def test_basic_full_transitive_compatibility(): def test_simple_schema_promotion(): reader = parse_avro_schema_definition( - ujson.dumps({"name": "foo", "type": "record", "fields": [{"type": "int", "name": "f1"}]}) + json.dumps({"name": "foo", "type": "record", "fields": [{"type": "int", "name": "f1"}]}) ) field_alias_reader = parse_avro_schema_definition( - ujson.dumps({"name": "foo", "type": "record", "fields": [{"type": "int", "name": "bar", "aliases": ["f1"]}]}) + json.dumps({"name": "foo", "type": "record", "fields": [{"type": "int", "name": "bar", "aliases": ["f1"]}]}) ) record_alias_reader = parse_avro_schema_definition( - ujson.dumps({"name": "other", "type": "record", "fields": [{"type": "int", "name": "f1"}], "aliases": ["foo"]}) + json.dumps({"name": "other", "type": "record", "fields": [{"type": "int", "name": "f1"}], "aliases": ["foo"]}) ) writer = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "name": "foo", "type": "record", @@ -311,7 +311,7 @@ def test_simple_schema_promotion(): assert not schemas_are_compatible(res, SchemaCompatibilityResult(SchemaCompatibilityType.compatible)), res writer = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "CA", @@ -328,7 +328,7 @@ def test_simple_schema_promotion(): ) ) reader = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "CA", @@ -373,8 +373,8 @@ def test_union_to_simple_comparison(field): } ], } - reader = parse_avro_schema_definition(ujson.dumps(reader)) - writer = parse_avro_schema_definition(ujson.dumps(writer)) + reader = parse_avro_schema_definition(json.dumps(reader)) + writer = parse_avro_schema_definition(json.dumps(writer)) assert are_compatible(reader, writer) @@ -383,32 +383,32 @@ def test_union_to_simple_comparison(field): # There's one test per Java file, so expect the first one to be a mammoth # ================================================================================================ -BOOLEAN_SCHEMA = parse_avro_schema_definition(ujson.dumps("boolean")) -NULL_SCHEMA = parse_avro_schema_definition(ujson.dumps("null")) -INT_SCHEMA = parse_avro_schema_definition(ujson.dumps("int")) -LONG_SCHEMA = parse_avro_schema_definition(ujson.dumps("long")) -STRING_SCHEMA = parse_avro_schema_definition(ujson.dumps("string")) -BYTES_SCHEMA = parse_avro_schema_definition(ujson.dumps("bytes")) -FLOAT_SCHEMA = parse_avro_schema_definition(ujson.dumps("float")) -DOUBLE_SCHEMA = parse_avro_schema_definition(ujson.dumps("double")) +BOOLEAN_SCHEMA = parse_avro_schema_definition(json.dumps("boolean")) +NULL_SCHEMA = parse_avro_schema_definition(json.dumps("null")) +INT_SCHEMA = parse_avro_schema_definition(json.dumps("int")) +LONG_SCHEMA = parse_avro_schema_definition(json.dumps("long")) +STRING_SCHEMA = parse_avro_schema_definition(json.dumps("string")) +BYTES_SCHEMA = parse_avro_schema_definition(json.dumps("bytes")) +FLOAT_SCHEMA = parse_avro_schema_definition(json.dumps("float")) +DOUBLE_SCHEMA = parse_avro_schema_definition(json.dumps("double")) INT_ARRAY_SCHEMA = ArraySchema(INT_SCHEMA.to_json(), Names()) LONG_ARRAY_SCHEMA = ArraySchema(LONG_SCHEMA.to_json(), Names()) STRING_ARRAY_SCHEMA = ArraySchema(STRING_SCHEMA.to_json(), Names()) INT_MAP_SCHEMA = MapSchema(INT_SCHEMA.to_json(), Names()) LONG_MAP_SCHEMA = MapSchema(LONG_SCHEMA.to_json(), Names()) STRING_MAP_SCHEMA = MapSchema(STRING_SCHEMA.to_json(), Names()) -ENUM1_AB_SCHEMA = parse_avro_schema_definition(ujson.dumps({"type": "enum", "name": "Enum1", "symbols": ["A", "B"]})) -ENUM1_ABC_SCHEMA = parse_avro_schema_definition(ujson.dumps({"type": "enum", "name": "Enum1", "symbols": ["A", "B", "C"]})) -ENUM1_BC_SCHEMA = parse_avro_schema_definition(ujson.dumps({"type": "enum", "name": "Enum1", "symbols": ["B", "C"]})) -ENUM2_AB_SCHEMA = parse_avro_schema_definition(ujson.dumps({"type": "enum", "name": "Enum2", "symbols": ["A", "B"]})) +ENUM1_AB_SCHEMA = parse_avro_schema_definition(json.dumps({"type": "enum", "name": "Enum1", "symbols": ["A", "B"]})) +ENUM1_ABC_SCHEMA = parse_avro_schema_definition(json.dumps({"type": "enum", "name": "Enum1", "symbols": ["A", "B", "C"]})) +ENUM1_BC_SCHEMA = parse_avro_schema_definition(json.dumps({"type": "enum", "name": "Enum1", "symbols": ["B", "C"]})) +ENUM2_AB_SCHEMA = parse_avro_schema_definition(json.dumps({"type": "enum", "name": "Enum2", "symbols": ["A", "B"]})) ENUM_ABC_ENUM_DEFAULT_A_SCHEMA = parse_avro_schema_definition( - ujson.dumps({"type": "enum", "name": "Enum", "symbols": ["A", "B", "C"], "default": "A"}) + json.dumps({"type": "enum", "name": "Enum", "symbols": ["A", "B", "C"], "default": "A"}) ) ENUM_AB_ENUM_DEFAULT_A_SCHEMA = parse_avro_schema_definition( - ujson.dumps({"type": "enum", "name": "Enum", "symbols": ["A", "B"], "default": "A"}) + json.dumps({"type": "enum", "name": "Enum", "symbols": ["A", "B"], "default": "A"}) ) ENUM_ABC_ENUM_DEFAULT_A_RECORD = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -419,7 +419,7 @@ def test_union_to_simple_comparison(field): ) ) ENUM_AB_ENUM_DEFAULT_A_RECORD = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -428,7 +428,7 @@ def test_union_to_simple_comparison(field): ) ) ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -443,7 +443,7 @@ def test_union_to_simple_comparison(field): ) ) ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -474,24 +474,22 @@ def test_union_to_simple_comparison(field): ) NULL_INT_ARRAY_UNION_SCHEMA = UnionSchema([NULL_SCHEMA.to_json(), INT_ARRAY_SCHEMA.to_json()], Names()) NULL_INT_MAP_UNION_SCHEMA = UnionSchema([NULL_SCHEMA.to_json(), INT_MAP_SCHEMA.to_json()], Names()) -EMPTY_RECORD1 = parse_avro_schema_definition(ujson.dumps({"type": "record", "name": "Record1", "fields": []})) -EMPTY_RECORD2 = parse_avro_schema_definition(ujson.dumps({"type": "record", "name": "Record2", "fields": []})) +EMPTY_RECORD1 = parse_avro_schema_definition(json.dumps({"type": "record", "name": "Record1", "fields": []})) +EMPTY_RECORD2 = parse_avro_schema_definition(json.dumps({"type": "record", "name": "Record2", "fields": []})) A_INT_RECORD1 = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "int"}]}) + json.dumps({"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "int"}]}) ) A_LONG_RECORD1 = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "long"}]}) + json.dumps({"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "long"}]}) ) A_INT_B_INT_RECORD1 = parse_avro_schema_definition( - ujson.dumps( - {"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]} - ) + json.dumps({"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "int"}]}) ) A_DINT_RECORD1 = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "int", "default": 0}]}) + json.dumps({"type": "record", "name": "Record1", "fields": [{"name": "a", "type": "int", "default": 0}]}) ) A_INT_B_DINT_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -500,7 +498,7 @@ def test_union_to_simple_comparison(field): ) ) A_DINT_B_DINT_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -509,7 +507,7 @@ def test_union_to_simple_comparison(field): ) ) A_DINT_B_DFIXED_4_BYTES_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -521,7 +519,7 @@ def test_union_to_simple_comparison(field): ) ) A_DINT_B_DFIXED_8_BYTES_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -533,7 +531,7 @@ def test_union_to_simple_comparison(field): ) ) A_DINT_B_DINT_STRING_UNION_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -542,7 +540,7 @@ def test_union_to_simple_comparison(field): ) ) A_DINT_B_DINT_UNION_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -551,7 +549,7 @@ def test_union_to_simple_comparison(field): ) ) A_DINT_B_DENUM_1_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -563,7 +561,7 @@ def test_union_to_simple_comparison(field): ) ) A_DINT_B_DENUM_2_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -574,10 +572,10 @@ def test_union_to_simple_comparison(field): } ) ) -FIXED_4_BYTES = parse_avro_schema_definition(ujson.dumps({"type": "fixed", "name": "Fixed", "size": 4})) -FIXED_8_BYTES = parse_avro_schema_definition(ujson.dumps({"type": "fixed", "name": "Fixed", "size": 8})) +FIXED_4_BYTES = parse_avro_schema_definition(json.dumps({"type": "fixed", "name": "Fixed", "size": 4})) +FIXED_8_BYTES = parse_avro_schema_definition(json.dumps({"type": "fixed", "name": "Fixed", "size": 8})) NS_RECORD1 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -602,7 +600,7 @@ def test_union_to_simple_comparison(field): ) ) NS_RECORD2 = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record1", @@ -627,10 +625,10 @@ def test_union_to_simple_comparison(field): ) ) INT_LIST_RECORD = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "List", "fields": [{"name": "head", "type": "int"}]}) + json.dumps({"type": "record", "name": "List", "fields": [{"name": "head", "type": "int"}]}) ) LONG_LIST_RECORD = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "List", "fields": [{"name": "head", "type": "long"}]}) + json.dumps({"type": "record", "name": "List", "fields": [{"name": "head", "type": "long"}]}) ) int_reader_field = Field(name="tail", type_=INT_LIST_RECORD.to_json(), has_default=False) long_reader_field = Field(name="tail", type_=LONG_LIST_RECORD.to_json(), has_default=False) @@ -645,10 +643,10 @@ def test_union_to_simple_comparison(field): LONG_LIST_RECORD._props["fields"] = LONG_LIST_RECORD._fields # pylint: enable=protected-access RECORD1_WITH_INT = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "Record1", "fields": [{"name": "field1", "type": "int"}]}) + json.dumps({"type": "record", "name": "Record1", "fields": [{"name": "field1", "type": "int"}]}) ) RECORD2_WITH_INT = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "Record2", "fields": [{"name": "field1", "type": "int"}]}) + json.dumps({"type": "record", "name": "Record2", "fields": [{"name": "field1", "type": "int"}]}) ) UNION_INT_RECORD1 = UnionSchema([INT_SCHEMA.to_json(), RECORD1_WITH_INT.to_json()], Names()) UNION_INT_RECORD2 = UnionSchema([INT_SCHEMA.to_json(), RECORD2_WITH_INT.to_json()], Names()) @@ -658,14 +656,14 @@ def test_union_to_simple_comparison(field): UNION_INT_ARRAY_INT = UnionSchema([INT_SCHEMA.to_json(), INT_ARRAY_SCHEMA.to_json()], Names()) UNION_INT_MAP_INT = UnionSchema([INT_SCHEMA.to_json(), INT_MAP_SCHEMA.to_json()], Names()) UNION_INT_NULL = UnionSchema([INT_SCHEMA.to_json(), NULL_SCHEMA.to_json()], Names()) -FIXED_4_ANOTHER_NAME = parse_avro_schema_definition(ujson.dumps({"type": "fixed", "name": "AnotherName", "size": 4})) +FIXED_4_ANOTHER_NAME = parse_avro_schema_definition(json.dumps({"type": "fixed", "name": "AnotherName", "size": 4})) RECORD1_WITH_ENUM_AB = parse_avro_schema_definition( - ujson.dumps( + json.dumps( {"type": "record", "name": "Record1", "fields": [{"name": "field1", "type": dict(ENUM1_AB_SCHEMA.to_json())}]} ) ) RECORD1_WITH_ENUM_ABC = parse_avro_schema_definition( - ujson.dumps( + json.dumps( {"type": "record", "name": "Record1", "fields": [{"name": "field1", "type": dict(ENUM1_ABC_SCHEMA.to_json())}]} ) ) @@ -674,7 +672,7 @@ def test_union_to_simple_comparison(field): def test_schema_compatibility(): # testValidateSchemaPairMissingField writer = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -683,17 +681,17 @@ def test_schema_compatibility(): ) ) reader = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "Record", "fields": [{"name": "oldField1", "type": "int"}]}) + json.dumps({"type": "record", "name": "Record", "fields": [{"name": "oldField1", "type": "int"}]}) ) assert are_compatible(reader, writer) # testValidateSchemaPairMissingSecondField reader = parse_avro_schema_definition( - ujson.dumps({"type": "record", "name": "Record", "fields": [{"name": "oldField2", "type": "string"}]}) + json.dumps({"type": "record", "name": "Record", "fields": [{"name": "oldField2", "type": "string"}]}) ) assert are_compatible(reader, writer) # testValidateSchemaPairAllFields reader = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -704,7 +702,7 @@ def test_schema_compatibility(): assert are_compatible(reader, writer) # testValidateSchemaNewFieldWithDefault reader = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -715,7 +713,7 @@ def test_schema_compatibility(): assert are_compatible(reader, writer) # testValidateSchemaNewField reader = parse_avro_schema_definition( - ujson.dumps( + json.dumps( { "type": "record", "name": "Record", @@ -725,24 +723,24 @@ def test_schema_compatibility(): ) assert not are_compatible(reader, writer) # testValidateArrayWriterSchema - writer = parse_avro_schema_definition(ujson.dumps({"type": "array", "items": {"type": "string"}})) - reader = parse_avro_schema_definition(ujson.dumps({"type": "array", "items": {"type": "string"}})) + writer = parse_avro_schema_definition(json.dumps({"type": "array", "items": {"type": "string"}})) + reader = parse_avro_schema_definition(json.dumps({"type": "array", "items": {"type": "string"}})) assert are_compatible(reader, writer) - reader = parse_avro_schema_definition(ujson.dumps({"type": "map", "values": {"type": "string"}})) + reader = parse_avro_schema_definition(json.dumps({"type": "map", "values": {"type": "string"}})) assert not are_compatible(reader, writer) # testValidatePrimitiveWriterSchema - writer = parse_avro_schema_definition(ujson.dumps({"type": "string"})) - reader = parse_avro_schema_definition(ujson.dumps({"type": "string"})) + writer = parse_avro_schema_definition(json.dumps({"type": "string"})) + reader = parse_avro_schema_definition(json.dumps({"type": "string"})) assert are_compatible(reader, writer) - reader = parse_avro_schema_definition(ujson.dumps({"type": "int"})) + reader = parse_avro_schema_definition(json.dumps({"type": "int"})) assert not are_compatible(reader, writer) # testUnionReaderWriterSubsetIncompatibility # cannot have a union as a top level data type, so im cheating a bit here writer = parse_avro_schema_definition( - ujson.dumps({"name": "Record", "type": "record", "fields": [{"name": "f1", "type": ["int", "string", "long"]}]}) + json.dumps({"name": "Record", "type": "record", "fields": [{"name": "f1", "type": ["int", "string", "long"]}]}) ) reader = parse_avro_schema_definition( - ujson.dumps({"name": "Record", "type": "record", "fields": [{"name": "f1", "type": ["int", "string"]}]}) + json.dumps({"name": "Record", "type": "record", "fields": [{"name": "f1", "type": ["int", "string"]}]}) ) reader = reader.fields[0].type writer = writer.fields[0].type @@ -818,8 +816,8 @@ def test_schema_compatibility(): (A_DINT_B_DINT_RECORD1, A_INT_RECORD1), (A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1), ( - parse_avro_schema_definition(ujson.dumps({"type": "null"})), - parse_avro_schema_definition(ujson.dumps({"type": "null"})), + parse_avro_schema_definition(json.dumps({"type": "null"})), + parse_avro_schema_definition(json.dumps({"type": "null"})), ), (INT_LIST_RECORD, INT_LIST_RECORD), (LONG_LIST_RECORD, LONG_LIST_RECORD), diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index e3da46374..f13947468 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -16,10 +16,10 @@ import avro import copy import io +import json import logging import pytest import struct -import ujson log = logging.getLogger(__name__) @@ -51,7 +51,7 @@ async def test_happy_flow(default_config_path, mock_registry_client): def test_flatten_unions_record() -> None: typed_schema = ValidatedTypedSchema.parse( SchemaType.AVRO, - ujson.dumps( + json.dumps( { "namespace": "io.aiven.data", "name": "Test", @@ -80,7 +80,7 @@ def test_flatten_unions_record() -> None: def test_flatten_unions_array() -> None: typed_schema = ValidatedTypedSchema.parse( SchemaType.AVRO, - ujson.dumps( + json.dumps( { "type": "array", "items": { @@ -108,7 +108,7 @@ def test_flatten_unions_array() -> None: def test_flatten_unions_map() -> None: typed_schema = ValidatedTypedSchema.parse( SchemaType.AVRO, - ujson.dumps( + json.dumps( { "type": "map", "values": { @@ -131,7 +131,7 @@ def test_flatten_unions_map() -> None: typed_schema = ValidatedTypedSchema.parse( SchemaType.AVRO, - ujson.dumps({"type": "array", "items": ["null", "string", "int"]}), + json.dumps({"type": "array", "items": ["null", "string", "int"]}), ) record = [{"string": "foo"}, None, {"int": 1}] flatten_record = ["foo", None, 1] @@ -156,7 +156,7 @@ def test_avro_json_write_invalid() -> None: {"foo": "bar"}, ] - typed_schema = ValidatedTypedSchema.parse(SchemaType.AVRO, ujson.dumps(schema)) + typed_schema = ValidatedTypedSchema.parse(SchemaType.AVRO, json.dumps(schema)) bio = io.BytesIO() for record in records: @@ -215,7 +215,7 @@ def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions() -> No } ], } - typed_schema = ValidatedTypedSchema.parse(SchemaType.AVRO, ujson.dumps(schema)) + typed_schema = ValidatedTypedSchema.parse(SchemaType.AVRO, json.dumps(schema)) properly_tagged_encoding_a = {"outter": {duplicated_name: {duplicated_name: "data"}}} properly_tagged_encoding_b = {"outter": {"int": 1}} diff --git a/tests/utils.py b/tests/utils.py index 1d887e07d..3df1652f0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -9,8 +9,8 @@ import asyncio import copy +import json import ssl -import ujson import uuid consumer_valid_payload = { @@ -20,7 +20,7 @@ "fetch.min.bytes": 100000, "auto.commit.enable": "true", } -schema_jsonschema_json = ujson.dumps( +schema_jsonschema_json = json.dumps( { "type": "object", "properties": { @@ -29,7 +29,7 @@ } ) -schema_avro_json = ujson.dumps( +schema_avro_json = json.dumps( { "namespace": "example.avro", "type": "record", @@ -137,7 +137,7 @@ schema_data_second = {"protobuf": (schema_protobuf_second, test_objects_protobuf_second)} -second_schema_json = ujson.dumps( +second_schema_json = json.dumps( {"namespace": "example.avro.other", "type": "record", "name": "Dude", "fields": [{"name": "name", "type": "string"}]} )