diff --git a/DEVELOPER.md b/DEVELOPER.md index e886fa63a..48f7e6b53 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -11,7 +11,7 @@ If librdkafka is installed in a non-standard location provide the include and li $ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build -**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB +**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB ## Generate Documentation @@ -45,4 +45,3 @@ If you make any changes to the async code (in `src/confluent_kafka/schema_regist See [tests/README.md](tests/README.md) for instructions on how to run tests. - diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 11dd39a69..10f126162 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -33,6 +33,7 @@ from confluent_kafka.schema_registry.error import SchemaRegistryError, OAuthTokenError from confluent_kafka.schema_registry.common.schema_registry_client import ( RegisteredSchema, + SchemaVersion, ServerConfig, is_success, is_retriable, @@ -663,7 +664,8 @@ async def register_schema_full_response( return registered_schema async def get_schema( - self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None + self, schema_id: int, subject_name: Optional[str] = None, + fmt: Optional[str] = None, reference_format: Optional[str] = None, ) -> 'Schema': """ Fetches the schema associated with ``schema_id`` from the @@ -671,9 +673,10 @@ async def get_schema( require an additional round-trip to the Schema Registry. Args: - schema_id (int): Schema id - subject_name (str): Subject name the schema is registered under - fmt (str): Format of the schema + schema_id (int): Schema id. + subject_name (str): Subject name the schema is registered under. + fmt (str): Desired output format, dependent on schema type. + reference_format (str): Desired output format for references. Returns: Schema: Schema instance identified by the ``schema_id`` @@ -682,19 +685,21 @@ async def get_schema( SchemaRegistryError: If schema can't be found. See Also: - `GET Schema API Reference `_ + `GET Schema API Reference `_ """ # noqa: E501 result = self._cache.get_schema_by_id(subject_name, schema_id) if result is not None: return result[1] - query = {'subject': subject_name} if subject_name is not None else None + query = {} + if subject_name is not None: + query['subject'] = subject_name if fmt is not None: - if query is not None: - query['format'] = fmt - else: - query = {'format': fmt} + query['format'] = fmt + if reference_format is not None: + query['reference_format'] = reference_format + response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -741,24 +746,102 @@ async def get_schema_by_guid( return registered_schema.schema + async def get_schema_types(self) -> List[str]: + """ + Lists all supported schema types in the Schema Registry. + + Returns: + list(str): List of supported schema types (e.g., ['AVRO', 'JSON', 'PROTOBUF']) + + Raises: + SchemaRegistryError: if schema types can't be retrieved + + See Also: + `GET Schema Types API Reference `_ + """ # noqa: E501 + + return await self._rest_client.get('schemas/types') + + async def get_subjects_by_schema_id( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[str]: + """ + Retrieves all the subjects associated with ``schema_id``. + + Args: + schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subjects where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(str): List of subjects matching the specified parameters. + + Raises: + SchemaRegistryError: if subjects can't be found + """ + query = {'offset': offset, 'limit': limit} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + return await self._rest_client.get('schemas/ids/{}/subjects'.format(schema_id), query) + + async def get_schema_versions( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[SchemaVersion]: + """ + Gets all subject-version pairs of a schema by its ID. + + Args: + schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subject versions where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(SchemaVersion): List of subject-version pairs. Each pair contains: + - subject (str): Subject name. + - version (int): Version number. + + Raises: + SchemaRegistryError: if schema versions can't be found. + + See Also: + `GET Schema Versions API Reference `_ + """ # noqa: E501 + + query = {'offset': offset, 'limit': limit} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + response = await self._rest_client.get('schemas/ids/{}/versions'.format(schema_id), query) + return [SchemaVersion.from_dict(item) for item in response] + async def lookup_schema( self, subject_name: str, schema: 'Schema', - normalize_schemas: bool = False, deleted: bool = False + normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False ) -> 'RegisteredSchema': """ Returns ``schema`` registration information for ``subject``. Args: - subject_name (str): Subject name the schema is registered under + subject_name (str): Subject name the schema is registered under. schema (Schema): Schema instance. - normalize_schemas (bool): Normalize schema before registering + normalize_schemas (bool): Normalize schema before registering. + fmt (str): Desired output format, dependent on schema type. deleted (bool): Whether to include deleted schemas. Returns: RegisteredSchema: Subject registration information for this schema. Raises: - SchemaRegistryError: If schema or subject can't be found + SchemaRegistryError: If schema or subject can't be found. See Also: `POST Subject API Reference `_ @@ -770,9 +853,17 @@ async def lookup_schema( request = schema.to_dict() + query_params = { + 'normalize': normalize_schemas, + 'deleted': deleted + } + if fmt is not None: + query_params['format'] = fmt + + query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) + response = await self._rest_client.post( - 'subjects/{}?normalize={}&deleted={}'.format( - _urlencode(subject_name), normalize_schemas, deleted), + 'subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request ) @@ -791,9 +882,19 @@ async def lookup_schema( return registered_schema - async def get_subjects(self) -> List[str]: + async def get_subjects( + self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[str]: """ - Lists all subjects registered with the Schema Registry + Lists all subjects registered with the Schema Registry. + + Args: + subject_prefix (str): Subject name prefix that results can be filtered by. + deleted (bool): Whether to include deleted subjects. + deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(str): Registered subject names @@ -805,7 +906,10 @@ async def get_subjects(self) -> List[str]: `GET subjects API Reference `_ """ # noqa: E501 - return await self._rest_client.get('subjects') + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} + if subject_prefix is not None: + query['subject'] = subject_prefix + return await self._rest_client.get('subjects', query) async def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]: """ @@ -899,7 +1003,9 @@ async def get_latest_with_metadata( if registered_schema is not None: return registered_schema - query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} + query = {'deleted': deleted} + if fmt is not None: + query['format'] = fmt keys = metadata.keys() if keys: query['key'] = [_urlencode(key) for key in keys] @@ -920,13 +1026,13 @@ async def get_version( deleted: bool = False, fmt: Optional[str] = None ) -> 'RegisteredSchema': """ - Retrieves a specific schema registered under ``subject_name``. + Retrieves a specific schema registered under `subject_name` and `version`. Args: subject_name (str): Subject name. - version (int): version number. Defaults to latest version. + version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. deleted (bool): Whether to include deleted schemas. - fmt (str): Format of the schema + fmt (str): Format of the schema. Returns: RegisteredSchema: Registration information for this version. @@ -935,7 +1041,7 @@ async def get_version( SchemaRegistryError: if the version can't be found or is invalid. See Also: - `GET Subject Versions API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_version(subject_name, version) @@ -953,12 +1059,44 @@ async def get_version( return registered_schema - async def get_versions(self, subject_name: str) -> List[int]: + async def get_referenced_by( + self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 + ) -> List[int]: + """ + Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. + + Args: + subject_name (str): Subject name + version (int or str): Version number or "latest" + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(int): List of schema IDs that reference the specified schema. + + Raises: + SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved + + See Also: + `GET Subject Versions API Reference `_ + """ # noqa: E501 + + query = {'offset': offset, 'limit': limit} + return await self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( + _urlencode(subject_name), version), query) + + async def get_versions( + self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + ) -> List[int]: """ Get a list of all versions registered with this subject. Args: subject_name (str): Subject name. + deleted (bool): Whether to include deleted schemas. + deleted_only (bool): Whether to return deleted versions only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(int): Registered versions @@ -970,7 +1108,8 @@ async def get_versions(self, subject_name: str) -> List[int]: `GET Subject Versions API Reference `_ """ # noqa: E501 - return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name))) + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} + return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query) async def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: """ diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 512f44592..d86dcdfcf 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -33,6 +33,7 @@ from confluent_kafka.schema_registry.error import SchemaRegistryError, OAuthTokenError from confluent_kafka.schema_registry.common.schema_registry_client import ( RegisteredSchema, + SchemaVersion, ServerConfig, is_success, is_retriable, @@ -663,7 +664,8 @@ def register_schema_full_response( return registered_schema def get_schema( - self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None + self, schema_id: int, subject_name: Optional[str] = None, + fmt: Optional[str] = None, reference_format: Optional[str] = None, ) -> 'Schema': """ Fetches the schema associated with ``schema_id`` from the @@ -671,9 +673,10 @@ def get_schema( require an additional round-trip to the Schema Registry. Args: - schema_id (int): Schema id - subject_name (str): Subject name the schema is registered under - fmt (str): Format of the schema + schema_id (int): Schema id. + subject_name (str): Subject name the schema is registered under. + fmt (str): Desired output format, dependent on schema type. + reference_format (str): Desired output format for references. Returns: Schema: Schema instance identified by the ``schema_id`` @@ -682,19 +685,21 @@ def get_schema( SchemaRegistryError: If schema can't be found. See Also: - `GET Schema API Reference `_ + `GET Schema API Reference `_ """ # noqa: E501 result = self._cache.get_schema_by_id(subject_name, schema_id) if result is not None: return result[1] - query = {'subject': subject_name} if subject_name is not None else None + query = {} + if subject_name is not None: + query['subject'] = subject_name if fmt is not None: - if query is not None: - query['format'] = fmt - else: - query = {'format': fmt} + query['format'] = fmt + if reference_format is not None: + query['reference_format'] = reference_format + response = self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -741,24 +746,102 @@ def get_schema_by_guid( return registered_schema.schema + def get_schema_types(self) -> List[str]: + """ + Lists all supported schema types in the Schema Registry. + + Returns: + list(str): List of supported schema types (e.g., ['AVRO', 'JSON', 'PROTOBUF']) + + Raises: + SchemaRegistryError: if schema types can't be retrieved + + See Also: + `GET Schema Types API Reference `_ + """ # noqa: E501 + + return self._rest_client.get('schemas/types') + + def get_subjects_by_schema_id( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[str]: + """ + Retrieves all the subjects associated with ``schema_id``. + + Args: + schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subjects where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(str): List of subjects matching the specified parameters. + + Raises: + SchemaRegistryError: if subjects can't be found + """ + query = {'offset': offset, 'limit': limit} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + return self._rest_client.get('schemas/ids/{}/subjects'.format(schema_id), query) + + def get_schema_versions( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[SchemaVersion]: + """ + Gets all subject-version pairs of a schema by its ID. + + Args: + schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subject versions where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(SchemaVersion): List of subject-version pairs. Each pair contains: + - subject (str): Subject name. + - version (int): Version number. + + Raises: + SchemaRegistryError: if schema versions can't be found. + + See Also: + `GET Schema Versions API Reference `_ + """ # noqa: E501 + + query = {'offset': offset, 'limit': limit} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id), query) + return [SchemaVersion.from_dict(item) for item in response] + def lookup_schema( self, subject_name: str, schema: 'Schema', - normalize_schemas: bool = False, deleted: bool = False + normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False ) -> 'RegisteredSchema': """ Returns ``schema`` registration information for ``subject``. Args: - subject_name (str): Subject name the schema is registered under + subject_name (str): Subject name the schema is registered under. schema (Schema): Schema instance. - normalize_schemas (bool): Normalize schema before registering + normalize_schemas (bool): Normalize schema before registering. + fmt (str): Desired output format, dependent on schema type. deleted (bool): Whether to include deleted schemas. Returns: RegisteredSchema: Subject registration information for this schema. Raises: - SchemaRegistryError: If schema or subject can't be found + SchemaRegistryError: If schema or subject can't be found. See Also: `POST Subject API Reference `_ @@ -770,9 +853,17 @@ def lookup_schema( request = schema.to_dict() + query_params = { + 'normalize': normalize_schemas, + 'deleted': deleted + } + if fmt is not None: + query_params['format'] = fmt + + query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) + response = self._rest_client.post( - 'subjects/{}?normalize={}&deleted={}'.format( - _urlencode(subject_name), normalize_schemas, deleted), + 'subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request ) @@ -791,9 +882,19 @@ def lookup_schema( return registered_schema - def get_subjects(self) -> List[str]: + def get_subjects( + self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[str]: """ - Lists all subjects registered with the Schema Registry + Lists all subjects registered with the Schema Registry. + + Args: + subject_prefix (str): Subject name prefix that results can be filtered by. + deleted (bool): Whether to include deleted subjects. + deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(str): Registered subject names @@ -805,7 +906,10 @@ def get_subjects(self) -> List[str]: `GET subjects API Reference `_ """ # noqa: E501 - return self._rest_client.get('subjects') + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} + if subject_prefix is not None: + query['subject'] = subject_prefix + return self._rest_client.get('subjects', query) def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]: """ @@ -899,7 +1003,9 @@ def get_latest_with_metadata( if registered_schema is not None: return registered_schema - query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} + query = {'deleted': deleted} + if fmt is not None: + query['format'] = fmt keys = metadata.keys() if keys: query['key'] = [_urlencode(key) for key in keys] @@ -920,13 +1026,13 @@ def get_version( deleted: bool = False, fmt: Optional[str] = None ) -> 'RegisteredSchema': """ - Retrieves a specific schema registered under ``subject_name``. + Retrieves a specific schema registered under `subject_name` and `version`. Args: subject_name (str): Subject name. - version (int): version number. Defaults to latest version. + version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. deleted (bool): Whether to include deleted schemas. - fmt (str): Format of the schema + fmt (str): Format of the schema. Returns: RegisteredSchema: Registration information for this version. @@ -935,7 +1041,7 @@ def get_version( SchemaRegistryError: if the version can't be found or is invalid. See Also: - `GET Subject Versions API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_version(subject_name, version) @@ -953,12 +1059,44 @@ def get_version( return registered_schema - def get_versions(self, subject_name: str) -> List[int]: + def get_referenced_by( + self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 + ) -> List[int]: + """ + Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. + + Args: + subject_name (str): Subject name + version (int or str): Version number or "latest" + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(int): List of schema IDs that reference the specified schema. + + Raises: + SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved + + See Also: + `GET Subject Versions API Reference `_ + """ # noqa: E501 + + query = {'offset': offset, 'limit': limit} + return self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( + _urlencode(subject_name), version), query) + + def get_versions( + self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + ) -> List[int]: """ Get a list of all versions registered with this subject. Args: subject_name (str): Subject name. + deleted (bool): Whether to include deleted schemas. + deleted_only (bool): Whether to return deleted versions only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(int): Registered versions @@ -970,7 +1108,8 @@ def get_versions(self, subject_name: str) -> List[int]: `GET Subject Versions API Reference `_ """ # noqa: E501 - return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name))) + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} + return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query) def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: """ diff --git a/src/confluent_kafka/schema_registry/common/schema_registry_client.py b/src/confluent_kafka/schema_registry/common/schema_registry_client.py index 27f9d946a..e52c0254b 100644 --- a/src/confluent_kafka/schema_registry/common/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/common/schema_registry_client.py @@ -659,6 +659,16 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: return metadata +@_attrs_define(frozen=True) +class SchemaVersion: + subject: Optional[str] + version: Optional[int] + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + return cls(subject=src_dict.get('subject'), version=src_dict.get('version')) + + @_attrs_define(frozen=True) class SchemaReference: name: Optional[str] @@ -684,21 +694,12 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: - d = src_dict.copy() - name = d.pop("name", None) - - subject = d.pop("subject", None) - - version = d.pop("version", None) - - schema_reference = cls( - name=name, - subject=subject, - version=version, + return cls( + name=src_dict.get('name'), + subject=src_dict.get('subject'), + version=src_dict.get('version'), ) - return schema_reference - class ConfigCompatibilityLevel(str, Enum): BACKWARD = "BACKWARD" @@ -858,7 +859,6 @@ class Schema: def to_dict(self) -> Dict[str, Any]: schema = self.schema_str - schema_type = self.schema_type _references: Optional[List[Dict[str, Any]]] = [] @@ -939,11 +939,11 @@ class RegisteredSchema: An registered schema. """ + subject: Optional[str] + version: Optional[int] schema_id: Optional[int] guid: Optional[str] schema: Optional[Schema] - subject: Optional[str] - version: Optional[int] def to_dict(self) -> Dict[str, Any]: schema = self.schema diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index 05f55a3da..fb930cddf 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -19,6 +19,7 @@ import asyncio from concurrent.futures import ThreadPoolExecutor, wait +from confluent_kafka.schema_registry.common.schema_registry_client import SchemaVersion from confluent_kafka.schema_registry.error import SchemaRegistryError from confluent_kafka.schema_registry.schema_registry_client import Schema, \ AsyncSchemaRegistryClient @@ -42,7 +43,7 @@ TEST_USER_PASSWORD = 'sr_user_secret' -def cmp_schema(schema1, schema2): +def cmp_schema(schema1: Schema, schema2: Schema) -> bool: """ Compare to Schemas for equivalence @@ -142,10 +143,10 @@ async def test_get_schema(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) - schema = Schema(load_avsc(SCHEMA), schema_type='AVRO') - schema2 = await sr.get_schema(47) + expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') + actual = await sr.get_schema(47) - assert cmp_schema(schema, schema2) + assert cmp_schema(expected, actual) async def test_get_schema_not_found(mock_schema_registry): @@ -185,6 +186,33 @@ def get(): assert count_after - count_before == 1 +async def test_get_schema_types(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = ['AVRO', 'JSON', 'PROTOBUF'] + actual = await sr.get_schema_types() + assert expected == actual + + +async def test_get_subjects_by_schema_id(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = SUBJECTS + actual = await sr.get_subjects_by_schema_id(47) + assert expected == actual + + +async def test_get_schema_versions(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = [SchemaVersion(subject='subject1', version=1), SchemaVersion(subject='subject2', version=2)] + actual = await sr.get_schema_versions(47) + assert expected == actual + + async def test_get_registration(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) @@ -346,6 +374,13 @@ async def test_delete_version_invalid(mock_schema_registry): assert e.value.error_code == 42202 +async def test_get_referenced_by(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + assert await sr.get_referenced_by("get_version", 3) == [1, 2] + + async def test_set_compatibility(mock_schema_registry): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index 17e5ce249..2ee0b7802 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -19,6 +19,7 @@ from concurrent.futures import ThreadPoolExecutor, wait +from confluent_kafka.schema_registry.common.schema_registry_client import SchemaVersion from confluent_kafka.schema_registry.error import SchemaRegistryError from confluent_kafka.schema_registry.schema_registry_client import Schema, \ SchemaRegistryClient @@ -42,7 +43,7 @@ TEST_USER_PASSWORD = 'sr_user_secret' -def cmp_schema(schema1, schema2): +def cmp_schema(schema1: Schema, schema2: Schema) -> bool: """ Compare to Schemas for equivalence @@ -142,10 +143,10 @@ def test_get_schema(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) - schema = Schema(load_avsc(SCHEMA), schema_type='AVRO') - schema2 = sr.get_schema(47) + expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') + actual = sr.get_schema(47) - assert cmp_schema(schema, schema2) + assert cmp_schema(expected, actual) def test_get_schema_not_found(mock_schema_registry): @@ -185,6 +186,33 @@ def get(): assert count_after - count_before == 1 +def test_get_schema_types(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = ['AVRO', 'JSON', 'PROTOBUF'] + actual = sr.get_schema_types() + assert expected == actual + + +def test_get_subjects_by_schema_id(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = SUBJECTS + actual = sr.get_subjects_by_schema_id(47) + assert expected == actual + + +def test_get_schema_versions(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = [SchemaVersion(subject='subject1', version=1), SchemaVersion(subject='subject2', version=2)] + actual = sr.get_schema_versions(47) + assert expected == actual + + def test_get_registration(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) @@ -346,6 +374,13 @@ def test_delete_version_invalid(mock_schema_registry): assert e.value.error_code == 42202 +def test_get_referenced_by(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + assert sr.get_referenced_by("get_version", 3) == [1, 2] + + def test_set_compatibility(mock_schema_registry): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index a30a28932..3b3d9032e 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -136,7 +136,11 @@ def mock_schema_registry(): respx_mock.put(COMPATIBILITY_RE).mock(side_effect=put_compatibility_callback) respx_mock.get(SCHEMAS_RE).mock(side_effect=get_schemas_callback) + respx_mock.get(SCHEMAS_VERSIONS_RE).mock(side_effect=get_schema_versions_callback) + respx_mock.get(SCHEMAS_SUBJECTS_RE).mock(side_effect=get_schema_subjects_callback) + respx_mock.get(SCHEMAS_TYPES_RE).mock(side_effect=get_schema_types_callback) + respx_mock.get(SUBJECTS_VERSIONS_REFERENCED_BY_RE).mock(side_effect=get_subject_version_referenced_by_callback) respx_mock.get(SUBJECTS_VERSIONS_RE).mock(side_effect=get_subject_version_callback) respx_mock.delete(SUBJECTS_VERSIONS_RE).mock(side_effect=delete_subject_version_callback) respx_mock.post(SUBJECTS_VERSIONS_RE).mock(side_effect=post_subject_version_callback) @@ -149,12 +153,18 @@ def mock_schema_registry(): # request paths -SCHEMAS_RE = re.compile("/schemas/ids/([0-9]*)?(.*)$") +SCHEMAS_RE = re.compile("/schemas/ids/([0-9]*)$") +SCHEMAS_VERSIONS_RE = re.compile(r"/schemas/ids/([0-9]*)/versions(\?.*)?$") +SCHEMAS_SUBJECTS_RE = re.compile(r"/schemas/ids/([0-9]*)/subjects(\?.*)?$") +SCHEMAS_TYPES_RE = re.compile("/schemas/types$") + SUBJECTS_RE = re.compile("/subjects/?(.*)$") SUBJECTS_VERSIONS_RE = re.compile("/subjects/(.*)/versions/?(.*)$") +SUBJECTS_VERSIONS_SCHEMA_RE = re.compile(r"/subjects/(.*)/versions/(.*)/schema(\?.*)?$") +SUBJECTS_VERSIONS_REFERENCED_BY_RE = re.compile(r"/subjects/(.*)/versions/(.*)/referencedby(\?.*)?$") + COMPATIBILITY_RE = re.compile("/config/?(.*)$") COMPATIBILITY_SUBJECTS_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions/?(.*)$") - # constants SCHEMA_ID = 47 VERSION = 3 @@ -189,7 +199,7 @@ def _auth_matcher(request): return Response(401, json=unauthorized) -def _load_avsc(name): +def _load_avsc(name) -> str: with open(os.path.join(work_dir, '..', 'integration', 'schema_registry', 'data', name)) as fd: return fd.read() @@ -273,6 +283,24 @@ def get_schemas_callback(request, route): return Response(200, json={'schema': _load_avsc(SCHEMA)}) +def get_schema_subjects_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=SUBJECTS) + + +def get_schema_types_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=['AVRO', 'JSON', 'PROTOBUF']) + + +def get_schema_versions_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=[ + {'subject': 'subject1', 'version': 1}, + {'subject': 'subject2', 'version': 2} + ]) + + def get_subject_version_callback(request, route): COUNTER['GET'][request.url.path] += 1 @@ -336,6 +364,16 @@ def post_subject_version_callback(request, route): return Response(200, json={'id': SCHEMA_ID}) +def get_subject_version_schema_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=json.loads(_load_avsc(SCHEMA))) + + +def get_subject_version_referenced_by_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=[1, 2]) + + def post_compatibility_subjects_versions_callback(request, route): COUNTER['POST'][request.url.path] += 1