From b0410dd1509ff87c8f9dd49fa94faeb7331b686e Mon Sep 17 00:00:00 2001 From: Naxin Date: Fri, 8 Aug 2025 16:48:09 -0400 Subject: [PATCH 01/13] updaet --- .../_async/schema_registry_client.py | 96 +++++++++++++++++- .../_sync/schema_registry_client.py | 98 ++++++++++++++++++- .../common/schema_registry_client.py | 11 +++ 3 files changed, 198 insertions(+), 7 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 72f26a106..8da4936f2 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -33,6 +33,7 @@ from confluent_kafka.schema_registry.error import SchemaRegistryError, OAuthTokenError from confluent_kafka.schema_registry.common.schema_registry_client import ( RegisteredSchema, + SchemaVersion, ServerConfig, is_success, is_retriable, @@ -733,6 +734,42 @@ async def get_schema_by_guid( return registered_schema.schema + async def get_schema_types(self) -> List[str]: + """ + Lists all supported schema types in the Schema Registry. + + Returns: + list(str): List of supported schema types (e.g., ['AVRO', 'JSON', 'PROTOBUF']) + + Raises: + SchemaRegistryError: if schema types can't be retrieved + + See Also: + `GET Schema Types API Reference `_ + """ # noqa: E501 + return await self._rest_client.get('schemas/types') + + async def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: + """ + Gets all subject-version pairs of a schema by its ID. + + Args: + schema_id (int): Schema ID + + Returns: + list(dict): List of schema versions with their metadata. Each dict contains: + - subject (str): Subject name + - version (int): Version number + + Raises: + SchemaRegistryError: if schema versions can't be found + + See Also: + `GET Schema Versions API Reference `_ + """ # noqa: E501 + response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) + return [SchemaVersion.from_dict(item) for item in response] + async def lookup_schema( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False, deleted: bool = False @@ -912,13 +949,13 @@ async def get_version( deleted: bool = False, fmt: Optional[str] = None ) -> 'RegisteredSchema': """ - Retrieves a specific schema registered under ``subject_name``. + Retrieves a specific schema registered under `subject_name` and `version`. Args: subject_name (str): Subject name. version (int): version number. Defaults to latest version. deleted (bool): Whether to include deleted schemas. - fmt (str): Format of the schema + fmt (str): Format of the schema. Returns: RegisteredSchema: Registration information for this version. @@ -927,7 +964,7 @@ async def get_version( SchemaRegistryError: if the version can't be found or is invalid. See Also: - `GET Subject Versions API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_version(subject_name, version) @@ -945,6 +982,59 @@ async def get_version( return registered_schema + async def get_version_schema( + self, subject_name: str, version: Union[int, str] = "latest", + deleted: bool = False, fmt: Optional[str] = None + ) -> str: # TODO: revisit the function naming + """ + Retrieves a specific schema registered under `subject_name` and `version`. + Only the unescaped schema string is returned. + + Args: + subject_name (str): Subject name. + version (int): version number. Defaults to latest version. + deleted (bool): Whether to include deleted schemas. + fmt (str): Format of the schema. + + Returns: + str: Schema string for this version. + + Raises: + SchemaRegistryError: if the version can't be found or is invalid. + + See Also: + `GET Subject Versions API Reference `_ + """ # noqa: E501 + + query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} + return self._rest_client.get( + 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query + ) + + async def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[Dict[str, Any]]: + """ + Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. + + Args: + subject_name (str): Subject name + version (int or str): Version number or "latest" + + Returns: + list(dict): List of IDs of schemas that reference the specified schema. Each dict contains: + - subject (str): Subject name of the referencing schema + - version (int): Version number of the referencing schema + - id (int): Schema ID of the referencing schema + - schema (str): Schema string of the referencing schema + + Raises: + SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved + + See Also: + `GET Referenced By API Reference `_ + """ # noqa: E501 + return self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( + _urlencode(subject_name), version)) + async def get_versions(self, subject_name: str) -> List[int]: """ Get a list of all versions registered with this subject. diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 3be266538..f388fd7b8 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -33,6 +33,7 @@ from confluent_kafka.schema_registry.error import SchemaRegistryError, OAuthTokenError from confluent_kafka.schema_registry.common.schema_registry_client import ( RegisteredSchema, + SchemaVersion, ServerConfig, is_success, is_retriable, @@ -733,6 +734,42 @@ def get_schema_by_guid( return registered_schema.schema + def get_schema_types(self) -> List[str]: + """ + Lists all supported schema types in the Schema Registry. + + Returns: + list(str): Schema types currently available on Schema Registry. + + Raises: + SchemaRegistryError: if schema types can't be retrieved. + + See Also: + `GET Schema Types API Reference `_ + """ # noqa: E501 + return self._rest_client.get('schemas/types') + + def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: + """ + Gets all subject-version pairs of a schema by its ID. + + Args: + schema_id (int): Schema ID. + + Returns: + list(dict): List of schema versions with their metadata. Each dict contains: + - subject (str): Subject name. + - version (int): Version number. + + Raises: + SchemaRegistryError: if schema versions can't be found. + + See Also: + `GET Schema Versions API Reference `_ + """ # noqa: E501 + response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) + return [SchemaVersion.from_dict(item) for item in response] + def lookup_schema( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False, deleted: bool = False @@ -910,15 +947,15 @@ def get_latest_with_metadata( def get_version( self, subject_name: str, version: Union[int, str] = "latest", deleted: bool = False, fmt: Optional[str] = None - ) -> 'RegisteredSchema': + ) -> 'RegisteredSchema': # TODO: revisit the function naming """ - Retrieves a specific schema registered under ``subject_name``. + Retrieves a specific schema registered under `subject_name` and `version`. Args: subject_name (str): Subject name. version (int): version number. Defaults to latest version. deleted (bool): Whether to include deleted schemas. - fmt (str): Format of the schema + fmt (str): Format of the schema. Returns: RegisteredSchema: Registration information for this version. @@ -927,7 +964,7 @@ def get_version( SchemaRegistryError: if the version can't be found or is invalid. See Also: - `GET Subject Versions API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_version(subject_name, version) @@ -945,6 +982,59 @@ def get_version( return registered_schema + def get_version_schema( + self, subject_name: str, version: Union[int, str] = "latest", + deleted: bool = False, fmt: Optional[str] = None + ) -> str: # TODO: revisit the function naming + """ + Retrieves a specific schema registered under `subject_name` and `version`. + Only the unescaped schema string is returned. + + Args: + subject_name (str): Subject name. + version (int): version number. Defaults to latest version. + deleted (bool): Whether to include deleted schemas. + fmt (str): Format of the schema. + + Returns: + str: Schema string for this version. + + Raises: + SchemaRegistryError: if the version can't be found or is invalid. + + See Also: + `GET Subject Versions API Reference `_ + """ # noqa: E501 + + query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} + return self._rest_client.get( + 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query + ) + + def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[Dict[str, Any]]: + """ + Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. + + Args: + subject_name (str): Subject name + version (int or str): Version number or "latest" + + Returns: + list(dict): List of IDs of schemas that reference the specified schema. Each dict contains: + - subject (str): Subject name of the referencing schema + - version (int): Version number of the referencing schema + - id (int): Schema ID of the referencing schema + - schema (str): Schema string of the referencing schema + + Raises: + SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved + + See Also: + `GET Referenced By API Reference `_ + """ # noqa: E501 + return self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( + _urlencode(subject_name), version)) + def get_versions(self, subject_name: str) -> List[int]: """ Get a list of all versions registered with this subject. diff --git a/src/confluent_kafka/schema_registry/common/schema_registry_client.py b/src/confluent_kafka/schema_registry/common/schema_registry_client.py index 27f9d946a..8ec7b4f5e 100644 --- a/src/confluent_kafka/schema_registry/common/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/common/schema_registry_client.py @@ -658,6 +658,17 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: return metadata +@_attrs_define(frozen=True) +class SchemaVersion: + subject: Optional[str] + version: Optional[int] + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + d = src_dict.copy() + subject = d.pop("subject", None) + version = d.pop("version", None) + return cls(subject=subject, version=version) @_attrs_define(frozen=True) class SchemaReference: From 3212e558ab292bb694cf4d8f44c0af6eb221781f Mon Sep 17 00:00:00 2001 From: Naxin Date: Mon, 11 Aug 2025 17:39:01 -0400 Subject: [PATCH 02/13] update --- .../_async/schema_registry_client.py | 70 +++++++++++------ .../_sync/mock_schema_registry_client.py | 32 +++++++- .../_sync/schema_registry_client.py | 75 +++++++++++++------ .../common/schema_registry_client.py | 5 +- 4 files changed, 129 insertions(+), 53 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 8da4936f2..76b94305d 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -657,19 +657,19 @@ async def register_schema_full_response( async def get_schema( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None - ) -> 'Schema': + ) -> 'RegisteredSchema': """ Fetches the schema associated with ``schema_id`` from the Schema Registry. The result is cached so subsequent attempts will not require an additional round-trip to the Schema Registry. Args: - schema_id (int): Schema id - subject_name (str): Subject name the schema is registered under - fmt (str): Format of the schema + schema_id (int): Schema id. + subject_name (str): Subject name the schema is registered under. + fmt (str): Format of the schema. Returns: - Schema: Schema instance identified by the ``schema_id`` + RegisteredSchema: Registration information for this schema. Raises: SchemaRegistryError: If schema can't be found. @@ -697,9 +697,39 @@ async def get_schema( return registered_schema.schema + async def get_schema_string( + self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None + ) -> str: + """ + Fetches the schema associated with ``schema_id`` from the + Schema Registry. Only the unescaped schema string is returned. + + Args: + schema_id (int): Schema id. + subject_name (str): Subject name the schema is registered under. + fmt (str): Format of the schema. + + Returns: + str: Schema string for this version. + + Raises: + SchemaRegistryError: if the version can't be found or is invalid. + + See Also: + `GET Schema API Reference `_ + """ # noqa: E501 + + query = {'subject': subject_name} if subject_name is not None else None + if fmt is not None: + if query is not None: + query['format'] = fmt + else: + query = {'format': fmt} + return await self._rest_client.get('schemas/ids/{}/schema'.format(schema_id), query) + async def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None - ) -> 'Schema': + ) -> 'RegisteredSchema': """ Fetches the schema associated with ``guid`` from the Schema Registry. The result is cached so subsequent attempts will not @@ -710,7 +740,7 @@ async def get_schema_by_guid( fmt (str): Format of the schema Returns: - Schema: Schema instance identified by the ``guid`` + RegisteredSchema: Registration information for this schema. Raises: SchemaRegistryError: If schema can't be found. @@ -732,7 +762,7 @@ async def get_schema_by_guid( self._cache.set_schema(None, registered_schema.schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema.schema + return registered_schema async def get_schema_types(self) -> List[str]: """ @@ -757,7 +787,7 @@ async def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: schema_id (int): Schema ID Returns: - list(dict): List of schema versions with their metadata. Each dict contains: + list(SchemaVersion): List of subject-version pairs. Each pair contains: - subject (str): Subject name - version (int): Version number @@ -982,12 +1012,12 @@ async def get_version( return registered_schema - async def get_version_schema( + async def get_version_schema_string( self, subject_name: str, version: Union[int, str] = "latest", deleted: bool = False, fmt: Optional[str] = None - ) -> str: # TODO: revisit the function naming + ) -> str: """ - Retrieves a specific schema registered under `subject_name` and `version`. + Retrieves a specific schema registered under ``subject_name`` and ``version``. Only the unescaped schema string is returned. Args: @@ -1003,15 +1033,15 @@ async def get_version_schema( SchemaRegistryError: if the version can't be found or is invalid. See Also: - `GET Subject Versions API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} - return self._rest_client.get( + return await self._rest_client.get( 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query ) - async def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[Dict[str, Any]]: + async def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[int]: """ Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. @@ -1020,19 +1050,15 @@ async def get_referenced_by(self, subject_name: str, version: Union[int, str] = version (int or str): Version number or "latest" Returns: - list(dict): List of IDs of schemas that reference the specified schema. Each dict contains: - - subject (str): Subject name of the referencing schema - - version (int): Version number of the referencing schema - - id (int): Schema ID of the referencing schema - - schema (str): Schema string of the referencing schema + list(int): List of schema IDs that reference the specified schema. Raises: SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved See Also: - `GET Referenced By API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 - return self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( + return await self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( _urlencode(subject_name), version)) async def get_versions(self, subject_name: str) -> List[int]: diff --git a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py index 0ffcea34d..548103e26 100644 --- a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py @@ -21,7 +21,7 @@ from typing import List, Dict, Optional from .schema_registry_client import SchemaRegistryClient -from ..common.schema_registry_client import RegisteredSchema, Schema, ServerConfig +from ..common.schema_registry_client import RegisteredSchema, Schema, SchemaVersion, ServerConfig from ..error import SchemaRegistryError @@ -51,12 +51,15 @@ def set(self, registered_schema: RegisteredSchema) -> RegisteredSchema: self.subject_schemas[rs.subject].add(rs) return rs - def get_schema(self, schema_id: int) -> Optional[Schema]: + def get_schema(self, schema_id: int) -> Optional[RegisteredSchema]: with self.lock: rs = self.schema_id_index.get(schema_id, None) return rs.schema if rs else None - def get_schema_by_guid(self, guid: str) -> Optional[Schema]: + def get_schema_string(self, schema_id: int) -> Optional[str]: + return None + + def get_schema_by_guid(self, guid: str) -> Optional[RegisteredSchema]: with self.lock: rs = self.schema_guid_index.get(guid, None) return rs.schema if rs else None @@ -191,15 +194,27 @@ def get_schema( raise SchemaRegistryError(404, 40400, "Schema Not Found") + def get_schema_string( + self, schema_id: int, subject_name: Optional[str] = None, + fmt: Optional[str] = None + ) -> str: + raise SchemaRegistryError(404, 40400, "Schema Not Found") + def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None - ) -> 'Schema': + ) -> 'RegisteredSchema': schema = self._store.get_schema_by_guid(guid) if schema is not None: return schema raise SchemaRegistryError(404, 40400, "Schema Not Found") + def get_schema_types(self) -> List[str]: + return [] + + def get_schema_versions(self, subject_id: int) -> List[SchemaVersion]: + return [] + def lookup_schema( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False, deleted: bool = False @@ -244,6 +259,15 @@ def get_version( raise SchemaRegistryError(404, 40400, "Schema Not Found") + def get_version_schema_string( + self, subject_name: str, version: int, + deleted: bool = False, fmt: Optional[str] = None + ) -> str: + return "" + + def get_referenced_by(self, subject_name: str, version: int) -> List[int]: + return [] + def get_versions(self, subject_name: str) -> List[int]: return self._store.get_versions(subject_name) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index f388fd7b8..db226e0e7 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -657,25 +657,25 @@ def register_schema_full_response( def get_schema( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None - ) -> 'Schema': + ) -> 'RegisteredSchema': """ Fetches the schema associated with ``schema_id`` from the Schema Registry. The result is cached so subsequent attempts will not require an additional round-trip to the Schema Registry. Args: - schema_id (int): Schema id - subject_name (str): Subject name the schema is registered under - fmt (str): Format of the schema + schema_id (int): Schema id. + subject_name (str): Subject name the schema is registered under. + fmt (str): Format of the schema. Returns: - Schema: Schema instance identified by the ``schema_id`` + RegisteredSchema: Registration information for this schema. Raises: SchemaRegistryError: If schema can't be found. See Also: - `GET Schema API Reference `_ + `GET Schema API Reference `_ """ # noqa: E501 result = self._cache.get_schema_by_id(subject_name, schema_id) @@ -695,11 +695,42 @@ def get_schema( self._cache.set_schema(subject_name, schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema.schema + return registered_schema + + def get_schema_string( + self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None + ) -> str: + """ + Fetches the schema associated with ``schema_id`` from the + Schema Registry. Only the unescaped schema string is returned. + + Args: + schema_id (int): Schema id. + subject_name (str): Subject name the schema is registered under. + fmt (str): Format of the schema. + + Returns: + str: Schema string for this version. + + Raises: + SchemaRegistryError: if the version can't be found or is invalid. + + See Also: + `GET Schema API Reference `_ + """ # noqa: E501 + + query = {'subject': subject_name} if subject_name is not None else None + if fmt is not None: + if query is not None: + query['format'] = fmt + else: + query = {'format': fmt} + return self._rest_client.get('schemas/ids/{}/schema'.format(schema_id), query) + def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None - ) -> 'Schema': + ) -> 'RegisteredSchema': """ Fetches the schema associated with ``guid`` from the Schema Registry. The result is cached so subsequent attempts will not @@ -710,7 +741,7 @@ def get_schema_by_guid( fmt (str): Format of the schema Returns: - Schema: Schema instance identified by the ``guid`` + RegisteredSchema: Registration information for this schema. Raises: SchemaRegistryError: If schema can't be found. @@ -732,7 +763,7 @@ def get_schema_by_guid( self._cache.set_schema(None, registered_schema.schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema.schema + return registered_schema def get_schema_types(self) -> List[str]: """ @@ -757,7 +788,7 @@ def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: schema_id (int): Schema ID. Returns: - list(dict): List of schema versions with their metadata. Each dict contains: + list(SchemaVersion): List of subject-version pairs. Each pair contains: - subject (str): Subject name. - version (int): Version number. @@ -947,9 +978,9 @@ def get_latest_with_metadata( def get_version( self, subject_name: str, version: Union[int, str] = "latest", deleted: bool = False, fmt: Optional[str] = None - ) -> 'RegisteredSchema': # TODO: revisit the function naming + ) -> 'RegisteredSchema': """ - Retrieves a specific schema registered under `subject_name` and `version`. + Retrieves a specific schema registered under ``subject_name`` and ``version``. Args: subject_name (str): Subject name. @@ -982,12 +1013,12 @@ def get_version( return registered_schema - def get_version_schema( + def get_version_schema_string( self, subject_name: str, version: Union[int, str] = "latest", deleted: bool = False, fmt: Optional[str] = None - ) -> str: # TODO: revisit the function naming + ) -> str: """ - Retrieves a specific schema registered under `subject_name` and `version`. + Retrieves a specific schema registered under ``subject_name`` and ``version``. Only the unescaped schema string is returned. Args: @@ -1003,7 +1034,7 @@ def get_version_schema( SchemaRegistryError: if the version can't be found or is invalid. See Also: - `GET Subject Versions API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} @@ -1011,7 +1042,7 @@ def get_version_schema( 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query ) - def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[Dict[str, Any]]: + def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[int]: """ Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. @@ -1020,17 +1051,13 @@ def get_referenced_by(self, subject_name: str, version: Union[int, str] = "lates version (int or str): Version number or "latest" Returns: - list(dict): List of IDs of schemas that reference the specified schema. Each dict contains: - - subject (str): Subject name of the referencing schema - - version (int): Version number of the referencing schema - - id (int): Schema ID of the referencing schema - - schema (str): Schema string of the referencing schema + list(int): List of schema IDs that reference the specified schema. Raises: SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved See Also: - `GET Referenced By API Reference `_ + `GET Subject Versions API Reference `_ """ # noqa: E501 return self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( _urlencode(subject_name), version)) diff --git a/src/confluent_kafka/schema_registry/common/schema_registry_client.py b/src/confluent_kafka/schema_registry/common/schema_registry_client.py index 8ec7b4f5e..77e433b92 100644 --- a/src/confluent_kafka/schema_registry/common/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/common/schema_registry_client.py @@ -869,7 +869,6 @@ class Schema: def to_dict(self) -> Dict[str, Any]: schema = self.schema_str - schema_type = self.schema_type _references: Optional[List[Dict[str, Any]]] = [] @@ -950,11 +949,11 @@ class RegisteredSchema: An registered schema. """ + subject: Optional[str] + version: Optional[int] schema_id: Optional[int] guid: Optional[str] schema: Optional[Schema] - subject: Optional[str] - version: Optional[int] def to_dict(self) -> Dict[str, Any]: schema = self.schema From 356f3ca7f691dff18b251ddf6c63aebe7d3e3c34 Mon Sep 17 00:00:00 2001 From: Naxin Date: Tue, 12 Aug 2025 11:54:48 -0400 Subject: [PATCH 03/13] tests --- .../_async/mock_schema_registry_client.py | 2 +- .../_sync/mock_schema_registry_client.py | 21 -------- .../_sync/schema_registry_client.py | 1 - .../schema_registry/_async/test_api_client.py | 44 +++++++++++++++++ .../schema_registry/_sync/test_api_client.py | 49 +++++++++++++++++-- tests/schema_registry/conftest.py | 45 +++++++++++++++-- 6 files changed, 132 insertions(+), 30 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py index 13ec8db87..c4e52e6c1 100644 --- a/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py @@ -21,7 +21,7 @@ from typing import List, Dict, Optional from .schema_registry_client import AsyncSchemaRegistryClient -from ..common.schema_registry_client import RegisteredSchema, Schema, ServerConfig +from ..common.schema_registry_client import RegisteredSchema, Schema, SchemaVersion, ServerConfig from ..error import SchemaRegistryError diff --git a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py index 548103e26..8de463761 100644 --- a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py @@ -194,12 +194,6 @@ def get_schema( raise SchemaRegistryError(404, 40400, "Schema Not Found") - def get_schema_string( - self, schema_id: int, subject_name: Optional[str] = None, - fmt: Optional[str] = None - ) -> str: - raise SchemaRegistryError(404, 40400, "Schema Not Found") - def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None ) -> 'RegisteredSchema': @@ -209,12 +203,6 @@ def get_schema_by_guid( raise SchemaRegistryError(404, 40400, "Schema Not Found") - def get_schema_types(self) -> List[str]: - return [] - - def get_schema_versions(self, subject_id: int) -> List[SchemaVersion]: - return [] - def lookup_schema( self, subject_name: str, schema: 'Schema', normalize_schemas: bool = False, deleted: bool = False @@ -259,15 +247,6 @@ def get_version( raise SchemaRegistryError(404, 40400, "Schema Not Found") - def get_version_schema_string( - self, subject_name: str, version: int, - deleted: bool = False, fmt: Optional[str] = None - ) -> str: - return "" - - def get_referenced_by(self, subject_name: str, version: int) -> List[int]: - return [] - def get_versions(self, subject_name: str) -> List[int]: return self._store.get_versions(subject_name) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index db226e0e7..1bc2ae0e7 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -727,7 +727,6 @@ def get_schema_string( query = {'format': fmt} return self._rest_client.get('schemas/ids/{}/schema'.format(schema_id), query) - def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None ) -> 'RegisteredSchema': diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index 05f55a3da..1fe619728 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -15,10 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import pytest import asyncio from concurrent.futures import ThreadPoolExecutor, wait +from confluent_kafka.schema_registry.common.schema_registry_client import SchemaVersion from confluent_kafka.schema_registry.error import SchemaRegistryError from confluent_kafka.schema_registry.schema_registry_client import Schema, \ AsyncSchemaRegistryClient @@ -184,6 +186,32 @@ def get(): assert count_after - count_before == 1 +async def test_get_schema_string_success(mock_schema_registry, load_avsc): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = json.loads(load_avsc(SCHEMA)) + actual = await sr.get_schema_string(47) + assert expected == actual + + +async def test_get_schema_types(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = ['AVRO', 'JSON', 'PROTOBUF'] + actual = await sr.get_schema_types() + assert expected == actual + + +async def test_get_schema_versions(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = [SchemaVersion(subject='subject1', version=1), SchemaVersion(subject='subject2', version=2)] + actual = await sr.get_schema_versions(47) + assert expected == actual + async def test_get_registration(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} @@ -346,6 +374,22 @@ async def test_delete_version_invalid(mock_schema_registry): assert e.value.error_code == 42202 +async def test_get_version_schema_string(mock_schema_registry, load_avsc): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = json.loads(load_avsc(SCHEMA)) + actual = await sr.get_version_schema_string("get_version", 3) + assert expected == actual + + +async def test_get_referenced_by(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + assert await sr.get_referenced_by("get_version", 3) == [1, 2] + + async def test_set_compatibility(mock_schema_registry): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index 17e5ce249..76ea6d4a6 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -15,10 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import pytest from concurrent.futures import ThreadPoolExecutor, wait +from confluent_kafka.schema_registry.common.schema_registry_client import SchemaVersion from confluent_kafka.schema_registry.error import SchemaRegistryError from confluent_kafka.schema_registry.schema_registry_client import Schema, \ SchemaRegistryClient @@ -42,7 +44,7 @@ TEST_USER_PASSWORD = 'sr_user_secret' -def cmp_schema(schema1, schema2): +def cmp_schema(schema1: Schema, schema2: Schema): """ Compare to Schemas for equivalence @@ -142,10 +144,10 @@ def test_get_schema(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) - schema = Schema(load_avsc(SCHEMA), schema_type='AVRO') - schema2 = sr.get_schema(47) + expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') + actual = sr.get_schema(47) - assert cmp_schema(schema, schema2) + assert cmp_schema(expected, actual.schema) def test_get_schema_not_found(mock_schema_registry): @@ -184,6 +186,32 @@ def get(): assert count_after - count_before == 1 +def test_get_schema_string_success(mock_schema_registry, load_avsc): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = json.loads(load_avsc(SCHEMA)) + actual = sr.get_schema_string(47) + assert expected == actual + + +def test_get_schema_types(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = ['AVRO', 'JSON', 'PROTOBUF'] + actual = sr.get_schema_types() + assert expected == actual + + +def test_get_schema_versions(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = [SchemaVersion(subject='subject1', version=1), SchemaVersion(subject='subject2', version=2)] + actual = sr.get_schema_versions(47) + assert expected == actual + def test_get_registration(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} @@ -345,6 +373,19 @@ def test_delete_version_invalid(mock_schema_registry): assert e.value.http_status_code == 422 assert e.value.error_code == 42202 +def test_get_version_schema_string(mock_schema_registry, load_avsc): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = json.loads(load_avsc(SCHEMA)) + actual = sr.get_version_schema_string("get_version", 3) + assert expected == actual + +def test_get_referenced_by(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + assert sr.get_referenced_by("get_version", 3) == [1, 2] def test_set_compatibility(mock_schema_registry): conf = {'url': TEST_URL} diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index a30a28932..4f2ce8715 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -25,6 +25,8 @@ import respx from httpx import Response +from confluent_kafka.schema_registry.common.schema_registry_client import SchemaVersion + work_dir = os.path.dirname(os.path.realpath(__file__)) @@ -136,7 +138,12 @@ def mock_schema_registry(): respx_mock.put(COMPATIBILITY_RE).mock(side_effect=put_compatibility_callback) respx_mock.get(SCHEMAS_RE).mock(side_effect=get_schemas_callback) + respx_mock.get(SCHEMAS_STRING_RE).mock(side_effect=get_schema_string_callback) + respx_mock.get(SCHEMAS_TYPES_RE).mock(side_effect=get_schema_types_callback) + respx_mock.get(SCHEMAS_VERSIONS_RE).mock(side_effect=get_schema_versions_callback) + respx_mock.get(SUBJECTS_VERSIONS_SCHEMA_RE).mock(side_effect=get_subject_version_schema_callback) + respx_mock.get(SUBJECTS_VERSIONS_REFERENCED_BY_RE).mock(side_effect=get_subject_version_referenced_by_callback) respx_mock.get(SUBJECTS_VERSIONS_RE).mock(side_effect=get_subject_version_callback) respx_mock.delete(SUBJECTS_VERSIONS_RE).mock(side_effect=delete_subject_version_callback) respx_mock.post(SUBJECTS_VERSIONS_RE).mock(side_effect=post_subject_version_callback) @@ -149,9 +156,16 @@ def mock_schema_registry(): # request paths -SCHEMAS_RE = re.compile("/schemas/ids/([0-9]*)?(.*)$") +SCHEMAS_RE = re.compile("/schemas/ids/([0-9]*)$") +SCHEMAS_STRING_RE = re.compile("/schemas/ids/([0-9]*)/schema$") +SCHEMAS_TYPES_RE = re.compile("/schemas/types$") +SCHEMAS_VERSIONS_RE = re.compile("/schemas/ids/([0-9]*)/versions$") + SUBJECTS_RE = re.compile("/subjects/?(.*)$") SUBJECTS_VERSIONS_RE = re.compile("/subjects/(.*)/versions/?(.*)$") +SUBJECTS_VERSIONS_SCHEMA_RE = re.compile("/subjects/(.*)/versions/(.*)/schema/?(.*)$") +SUBJECTS_VERSIONS_REFERENCED_BY_RE = re.compile("/subjects/(.*)/versions/(.*)/referencedby$") + COMPATIBILITY_RE = re.compile("/config/?(.*)$") COMPATIBILITY_SUBJECTS_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions/?(.*)$") @@ -189,12 +203,11 @@ def _auth_matcher(request): return Response(401, json=unauthorized) -def _load_avsc(name): +def _load_avsc(name) -> str: with open(os.path.join(work_dir, '..', 'integration', 'schema_registry', 'data', name)) as fd: return fd.read() - def get_compatibility_callback(request, route): COUNTER['GET'][request.url.path] += 1 @@ -272,6 +285,25 @@ def get_schemas_callback(request, route): return Response(200, json={'schema': _load_avsc(SCHEMA)}) +def get_schema_string_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + path_match = re.match(SCHEMAS_STRING_RE, request.url.path) + schema_id = path_match.group(1) + if int(schema_id) == 404: + return Response(404, json={'error_code': 40403, + 'message': "Schema not found"}) + return Response(200, json=json.loads(_load_avsc(SCHEMA))) + +def get_schema_types_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=['AVRO', 'JSON', 'PROTOBUF']) + +def get_schema_versions_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=[ + {'subject': 'subject1', 'version': 1}, + {'subject': 'subject2', 'version': 2} + ]) def get_subject_version_callback(request, route): COUNTER['GET'][request.url.path] += 1 @@ -335,6 +367,13 @@ def post_subject_version_callback(request, route): else: return Response(200, json={'id': SCHEMA_ID}) +def get_subject_version_schema_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=json.loads(_load_avsc(SCHEMA))) + +def get_subject_version_referenced_by_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=[1, 2]) def post_compatibility_subjects_versions_callback(request, route): COUNTER['POST'][request.url.path] += 1 From 884c35e4c7108f0eed4db5f174b88b1add44e052 Mon Sep 17 00:00:00 2001 From: Naxin Date: Tue, 12 Aug 2025 16:26:33 -0400 Subject: [PATCH 04/13] fix integration tests --- .../schema_registry/_sync/mock_schema_registry_client.py | 6 +++--- .../schema_registry/_async/test_proto_serializers.py | 2 +- .../schema_registry/_sync/test_proto_serializers.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py index 8de463761..36b78207e 100644 --- a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py @@ -51,7 +51,7 @@ def set(self, registered_schema: RegisteredSchema) -> RegisteredSchema: self.subject_schemas[rs.subject].add(rs) return rs - def get_schema(self, schema_id: int) -> Optional[RegisteredSchema]: + def get_schema(self, schema_id: int) -> Optional[Schema]: with self.lock: rs = self.schema_id_index.get(schema_id, None) return rs.schema if rs else None @@ -59,7 +59,7 @@ def get_schema(self, schema_id: int) -> Optional[RegisteredSchema]: def get_schema_string(self, schema_id: int) -> Optional[str]: return None - def get_schema_by_guid(self, guid: str) -> Optional[RegisteredSchema]: + def get_schema_by_guid(self, guid: str) -> Optional[Schema]: with self.lock: rs = self.schema_guid_index.get(guid, None) return rs.schema if rs else None @@ -196,7 +196,7 @@ def get_schema( def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None - ) -> 'RegisteredSchema': + ) -> 'Schema': schema = self._store.get_schema_by_guid(guid) if schema is not None: return schema diff --git a/tests/integration/schema_registry/_async/test_proto_serializers.py b/tests/integration/schema_registry/_async/test_proto_serializers.py index 0e65686e2..c04eb480b 100644 --- a/tests/integration/schema_registry/_async/test_proto_serializers.py +++ b/tests/integration/schema_registry/_async/test_proto_serializers.py @@ -92,7 +92,7 @@ async def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs await producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = (await sr.get_schema(serializer._schema_id.id)).references + registered_refs = (await sr.get_schema(serializer._schema_id.id)).schema.references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/integration/schema_registry/_sync/test_proto_serializers.py b/tests/integration/schema_registry/_sync/test_proto_serializers.py index 9b3ca3197..bbd6bb40c 100644 --- a/tests/integration/schema_registry/_sync/test_proto_serializers.py +++ b/tests/integration/schema_registry/_sync/test_proto_serializers.py @@ -92,7 +92,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = (sr.get_schema(serializer._schema_id.id)).references + registered_refs = sr.get_schema(serializer._schema_id.id).schema.references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() From a30561b5f40cdbf12a0a46a0a560dd569520e392 Mon Sep 17 00:00:00 2001 From: Naxin Date: Tue, 12 Aug 2025 21:37:37 -0400 Subject: [PATCH 05/13] regenerate sync code + fix it --- DEVELOPER.md | 3 +-- .../_async/schema_registry_client.py | 20 +++++++++---------- .../_sync/mock_schema_registry_client.py | 3 --- .../_sync/schema_registry_client.py | 16 +++++++-------- .../_async/test_proto_serializers.py | 2 +- .../_sync/test_proto_serializers.py | 2 +- .../schema_registry/_async/test_api_client.py | 2 +- .../schema_registry/_sync/test_api_client.py | 11 ++++++---- 8 files changed, 29 insertions(+), 30 deletions(-) diff --git a/DEVELOPER.md b/DEVELOPER.md index e886fa63a..48f7e6b53 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -11,7 +11,7 @@ If librdkafka is installed in a non-standard location provide the include and li $ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build -**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB +**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB ## Generate Documentation @@ -45,4 +45,3 @@ If you make any changes to the async code (in `src/confluent_kafka/schema_regist See [tests/README.md](tests/README.md) for instructions on how to run tests. - diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 76b94305d..9f2518ee9 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -675,7 +675,7 @@ async def get_schema( SchemaRegistryError: If schema can't be found. See Also: - `GET Schema API Reference `_ + `GET Schema API Reference `_ """ # noqa: E501 result = self._cache.get_schema_by_id(subject_name, schema_id) @@ -695,7 +695,7 @@ async def get_schema( self._cache.set_schema(subject_name, schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema.schema + return registered_schema async def get_schema_string( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None @@ -784,15 +784,15 @@ async def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: Gets all subject-version pairs of a schema by its ID. Args: - schema_id (int): Schema ID + schema_id (int): Schema ID. Returns: list(SchemaVersion): List of subject-version pairs. Each pair contains: - - subject (str): Subject name - - version (int): Version number + - subject (str): Subject name. + - version (int): Version number. Raises: - SchemaRegistryError: if schema versions can't be found + SchemaRegistryError: if schema versions can't be found. See Also: `GET Schema Versions API Reference `_ @@ -808,20 +808,20 @@ async def lookup_schema( Returns ``schema`` registration information for ``subject``. Args: - subject_name (str): Subject name the schema is registered under + subject_name (str): Subject name the schema is registered under. schema (Schema): Schema instance. - normalize_schemas (bool): Normalize schema before registering + normalize_schemas (bool): Normalize schema before registering. deleted (bool): Whether to include deleted schemas. Returns: RegisteredSchema: Subject registration information for this schema. Raises: - SchemaRegistryError: If schema or subject can't be found + SchemaRegistryError: If schema or subject can't be found. See Also: `POST Subject API Reference `_ - """ # noqa: E501 + """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_schema(subject_name, schema) if registered_schema is not None: diff --git a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py index 36b78207e..2b9037bb5 100644 --- a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py @@ -56,9 +56,6 @@ def get_schema(self, schema_id: int) -> Optional[Schema]: rs = self.schema_id_index.get(schema_id, None) return rs.schema if rs else None - def get_schema_string(self, schema_id: int) -> Optional[str]: - return None - def get_schema_by_guid(self, guid: str) -> Optional[Schema]: with self.lock: rs = self.schema_guid_index.get(guid, None) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 1bc2ae0e7..85e726bb4 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -769,10 +769,10 @@ def get_schema_types(self) -> List[str]: Lists all supported schema types in the Schema Registry. Returns: - list(str): Schema types currently available on Schema Registry. + list(str): List of supported schema types (e.g., ['AVRO', 'JSON', 'PROTOBUF']) Raises: - SchemaRegistryError: if schema types can't be retrieved. + SchemaRegistryError: if schema types can't be retrieved See Also: `GET Schema Types API Reference `_ @@ -796,7 +796,7 @@ def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: See Also: `GET Schema Versions API Reference `_ - """ # noqa: E501 + """ # noqa: E501 response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) return [SchemaVersion.from_dict(item) for item in response] @@ -808,20 +808,20 @@ def lookup_schema( Returns ``schema`` registration information for ``subject``. Args: - subject_name (str): Subject name the schema is registered under + subject_name (str): Subject name the schema is registered under. schema (Schema): Schema instance. - normalize_schemas (bool): Normalize schema before registering + normalize_schemas (bool): Normalize schema before registering. deleted (bool): Whether to include deleted schemas. Returns: RegisteredSchema: Subject registration information for this schema. Raises: - SchemaRegistryError: If schema or subject can't be found + SchemaRegistryError: If schema or subject can't be found. See Also: `POST Subject API Reference `_ - """ # noqa: E501 + """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_schema(subject_name, schema) if registered_schema is not None: @@ -979,7 +979,7 @@ def get_version( deleted: bool = False, fmt: Optional[str] = None ) -> 'RegisteredSchema': """ - Retrieves a specific schema registered under ``subject_name`` and ``version``. + Retrieves a specific schema registered under `subject_name` and `version`. Args: subject_name (str): Subject name. diff --git a/tests/integration/schema_registry/_async/test_proto_serializers.py b/tests/integration/schema_registry/_async/test_proto_serializers.py index c04eb480b..0e65686e2 100644 --- a/tests/integration/schema_registry/_async/test_proto_serializers.py +++ b/tests/integration/schema_registry/_async/test_proto_serializers.py @@ -92,7 +92,7 @@ async def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs await producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = (await sr.get_schema(serializer._schema_id.id)).schema.references + registered_refs = (await sr.get_schema(serializer._schema_id.id)).references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/integration/schema_registry/_sync/test_proto_serializers.py b/tests/integration/schema_registry/_sync/test_proto_serializers.py index bbd6bb40c..9b3ca3197 100644 --- a/tests/integration/schema_registry/_sync/test_proto_serializers.py +++ b/tests/integration/schema_registry/_sync/test_proto_serializers.py @@ -92,7 +92,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = sr.get_schema(serializer._schema_id.id).schema.references + registered_refs = (sr.get_schema(serializer._schema_id.id)).references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index 1fe619728..147d055cf 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -44,7 +44,7 @@ TEST_USER_PASSWORD = 'sr_user_secret' -def cmp_schema(schema1, schema2): +def cmp_schema(schema1: Schema, schema2: Schema) -> bool: """ Compare to Schemas for equivalence diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index 76ea6d4a6..c3bd320dc 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -44,7 +44,7 @@ TEST_USER_PASSWORD = 'sr_user_secret' -def cmp_schema(schema1: Schema, schema2: Schema): +def cmp_schema(schema1: Schema, schema2: Schema) -> bool: """ Compare to Schemas for equivalence @@ -144,10 +144,10 @@ def test_get_schema(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) - expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') - actual = sr.get_schema(47) + schema = Schema(load_avsc(SCHEMA), schema_type='AVRO') + schema2 = sr.get_schema(47) - assert cmp_schema(expected, actual.schema) + assert cmp_schema(schema, schema2) def test_get_schema_not_found(mock_schema_registry): @@ -373,6 +373,7 @@ def test_delete_version_invalid(mock_schema_registry): assert e.value.http_status_code == 422 assert e.value.error_code == 42202 + def test_get_version_schema_string(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) @@ -381,12 +382,14 @@ def test_get_version_schema_string(mock_schema_registry, load_avsc): actual = sr.get_version_schema_string("get_version", 3) assert expected == actual + def test_get_referenced_by(mock_schema_registry): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) assert sr.get_referenced_by("get_version", 3) == [1, 2] + def test_set_compatibility(mock_schema_registry): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) From 30a97e5fcbe504fe60f1a66b7beeca84c637acf3 Mon Sep 17 00:00:00 2001 From: Naxin Date: Tue, 12 Aug 2025 23:59:38 -0400 Subject: [PATCH 06/13] fix some unit tests broken from unasync --- .../_async/mock_schema_registry_client.py | 2 +- .../schema_registry/_async/schema_registry_client.py | 10 ++++++---- .../_sync/mock_schema_registry_client.py | 2 +- .../schema_registry/_sync/schema_registry_client.py | 8 +++++--- .../schema_registry/common/schema_registry_client.py | 2 ++ .../schema_registry/_async/test_proto_serializers.py | 2 +- .../schema_registry/_sync/test_proto_serializers.py | 2 +- tests/schema_registry/_async/test_api_client.py | 7 ++++--- tests/schema_registry/_sync/test_api_client.py | 7 ++++--- tests/schema_registry/conftest.py | 10 ++++++++-- 10 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py index c4e52e6c1..13ec8db87 100644 --- a/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/mock_schema_registry_client.py @@ -21,7 +21,7 @@ from typing import List, Dict, Optional from .schema_registry_client import AsyncSchemaRegistryClient -from ..common.schema_registry_client import RegisteredSchema, Schema, SchemaVersion, ServerConfig +from ..common.schema_registry_client import RegisteredSchema, Schema, ServerConfig from ..error import SchemaRegistryError diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 9f2518ee9..6d5926a2e 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -776,7 +776,8 @@ async def get_schema_types(self) -> List[str]: See Also: `GET Schema Types API Reference `_ - """ # noqa: E501 + """ # noqa: E501 + return await self._rest_client.get('schemas/types') async def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: @@ -796,8 +797,9 @@ async def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: See Also: `GET Schema Versions API Reference `_ - """ # noqa: E501 - response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) + """ # noqa: E501 + + response = await self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) return [SchemaVersion.from_dict(item) for item in response] async def lookup_schema( @@ -821,7 +823,7 @@ async def lookup_schema( See Also: `POST Subject API Reference `_ - """ # noqa: E501 + """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_schema(subject_name, schema) if registered_schema is not None: diff --git a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py index 2b9037bb5..0ffcea34d 100644 --- a/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/mock_schema_registry_client.py @@ -21,7 +21,7 @@ from typing import List, Dict, Optional from .schema_registry_client import SchemaRegistryClient -from ..common.schema_registry_client import RegisteredSchema, Schema, SchemaVersion, ServerConfig +from ..common.schema_registry_client import RegisteredSchema, Schema, ServerConfig from ..error import SchemaRegistryError diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 85e726bb4..87978d165 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -776,7 +776,8 @@ def get_schema_types(self) -> List[str]: See Also: `GET Schema Types API Reference `_ - """ # noqa: E501 + """ # noqa: E501 + return self._rest_client.get('schemas/types') def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: @@ -796,7 +797,8 @@ def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: See Also: `GET Schema Versions API Reference `_ - """ # noqa: E501 + """ # noqa: E501 + response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) return [SchemaVersion.from_dict(item) for item in response] @@ -821,7 +823,7 @@ def lookup_schema( See Also: `POST Subject API Reference `_ - """ # noqa: E501 + """ # noqa: E501 registered_schema = self._cache.get_registered_by_subject_schema(subject_name, schema) if registered_schema is not None: diff --git a/src/confluent_kafka/schema_registry/common/schema_registry_client.py b/src/confluent_kafka/schema_registry/common/schema_registry_client.py index 77e433b92..34cfa9129 100644 --- a/src/confluent_kafka/schema_registry/common/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/common/schema_registry_client.py @@ -658,6 +658,7 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: return metadata + @_attrs_define(frozen=True) class SchemaVersion: subject: Optional[str] @@ -670,6 +671,7 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: version = d.pop("version", None) return cls(subject=subject, version=version) + @_attrs_define(frozen=True) class SchemaReference: name: Optional[str] diff --git a/tests/integration/schema_registry/_async/test_proto_serializers.py b/tests/integration/schema_registry/_async/test_proto_serializers.py index 0e65686e2..c04eb480b 100644 --- a/tests/integration/schema_registry/_async/test_proto_serializers.py +++ b/tests/integration/schema_registry/_async/test_proto_serializers.py @@ -92,7 +92,7 @@ async def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs await producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = (await sr.get_schema(serializer._schema_id.id)).references + registered_refs = (await sr.get_schema(serializer._schema_id.id)).schema.references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/integration/schema_registry/_sync/test_proto_serializers.py b/tests/integration/schema_registry/_sync/test_proto_serializers.py index 9b3ca3197..dd70ec4b7 100644 --- a/tests/integration/schema_registry/_sync/test_proto_serializers.py +++ b/tests/integration/schema_registry/_sync/test_proto_serializers.py @@ -92,7 +92,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = (sr.get_schema(serializer._schema_id.id)).references + registered_refs = (sr.get_schema(serializer._schema_id.id)).schema.references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index 147d055cf..63405f5fd 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -144,10 +144,10 @@ async def test_get_schema(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) - schema = Schema(load_avsc(SCHEMA), schema_type='AVRO') - schema2 = await sr.get_schema(47) + expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') + actual = await sr.get_schema(47) - assert cmp_schema(schema, schema2) + assert cmp_schema(expected, actual.schema) async def test_get_schema_not_found(mock_schema_registry): @@ -186,6 +186,7 @@ def get(): assert count_after - count_before == 1 + async def test_get_schema_string_success(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index c3bd320dc..e494b8ba7 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -144,10 +144,10 @@ def test_get_schema(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) - schema = Schema(load_avsc(SCHEMA), schema_type='AVRO') - schema2 = sr.get_schema(47) + expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') + actual = sr.get_schema(47) - assert cmp_schema(schema, schema2) + assert cmp_schema(expected, actual.schema) def test_get_schema_not_found(mock_schema_registry): @@ -186,6 +186,7 @@ def get(): assert count_after - count_before == 1 + def test_get_schema_string_success(mock_schema_registry, load_avsc): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index 4f2ce8715..5040a52b1 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -25,8 +25,6 @@ import respx from httpx import Response -from confluent_kafka.schema_registry.common.schema_registry_client import SchemaVersion - work_dir = os.path.dirname(os.path.realpath(__file__)) @@ -208,6 +206,7 @@ def _load_avsc(name) -> str: 'data', name)) as fd: return fd.read() + def get_compatibility_callback(request, route): COUNTER['GET'][request.url.path] += 1 @@ -285,6 +284,7 @@ def get_schemas_callback(request, route): return Response(200, json={'schema': _load_avsc(SCHEMA)}) + def get_schema_string_callback(request, route): COUNTER['GET'][request.url.path] += 1 path_match = re.match(SCHEMAS_STRING_RE, request.url.path) @@ -294,10 +294,12 @@ def get_schema_string_callback(request, route): 'message': "Schema not found"}) return Response(200, json=json.loads(_load_avsc(SCHEMA))) + def get_schema_types_callback(request, route): COUNTER['GET'][request.url.path] += 1 return Response(200, json=['AVRO', 'JSON', 'PROTOBUF']) + def get_schema_versions_callback(request, route): COUNTER['GET'][request.url.path] += 1 return Response(200, json=[ @@ -305,6 +307,7 @@ def get_schema_versions_callback(request, route): {'subject': 'subject2', 'version': 2} ]) + def get_subject_version_callback(request, route): COUNTER['GET'][request.url.path] += 1 @@ -367,14 +370,17 @@ def post_subject_version_callback(request, route): else: return Response(200, json={'id': SCHEMA_ID}) + def get_subject_version_schema_callback(request, route): COUNTER['GET'][request.url.path] += 1 return Response(200, json=json.loads(_load_avsc(SCHEMA))) + def get_subject_version_referenced_by_callback(request, route): COUNTER['GET'][request.url.path] += 1 return Response(200, json=[1, 2]) + def post_compatibility_subjects_versions_callback(request, route): COUNTER['POST'][request.url.path] += 1 From f1029a9845fa886c4728516bdcd46cc9daa5b51c Mon Sep 17 00:00:00 2001 From: Naxin Date: Wed, 13 Aug 2025 15:45:25 -0400 Subject: [PATCH 07/13] missing params and address feedback --- .../_async/schema_registry_client.py | 54 ++++++++++++------- .../_sync/schema_registry_client.py | 54 ++++++++++++------- .../common/schema_registry_client.py | 22 ++------ 3 files changed, 75 insertions(+), 55 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 6d5926a2e..2a7c61bc6 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -682,12 +682,11 @@ async def get_schema( if result is not None: return result[1] - query = {'subject': subject_name} if subject_name is not None else None + query = {} + if subject_name is not None: + query['subject'] = subject_name if fmt is not None: - if query is not None: - query['format'] = fmt - else: - query = {'format': fmt} + query['format'] = fmt response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -719,12 +718,11 @@ async def get_schema_string( `GET Schema API Reference `_ """ # noqa: E501 - query = {'subject': subject_name} if subject_name is not None else None + query = {} + if subject_name is not None: + query['subject'] = subject_name if fmt is not None: - if query is not None: - query['format'] = fmt - else: - query = {'format': fmt} + query['format'] = fmt return await self._rest_client.get('schemas/ids/{}/schema'.format(schema_id), query) async def get_schema_by_guid( @@ -780,12 +778,14 @@ async def get_schema_types(self) -> List[str]: return await self._rest_client.get('schemas/types') - async def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: + async def get_schema_versions(self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False) -> List[SchemaVersion]: """ Gets all subject-version pairs of a schema by its ID. Args: schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subject versions where the schema was deleted. Returns: list(SchemaVersion): List of subject-version pairs. Each pair contains: @@ -799,7 +799,12 @@ async def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: `GET Schema Versions API Reference `_ """ # noqa: E501 - response = await self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) + query = {} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + response = await self._rest_client.get('schemas/ids/{}/versions'.format(schema_id), query) return [SchemaVersion.from_dict(item) for item in response] async def lookup_schema( @@ -852,10 +857,14 @@ async def lookup_schema( return registered_schema - async def get_subjects(self) -> List[str]: + async def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool = False) -> List[str]: """ Lists all subjects registered with the Schema Registry + Args: + subject_prefix (str): Subject name prefix that results can be filtered by. + deleted (bool): Whether to include deleted subjects. + Returns: list(str): Registered subject names @@ -866,7 +875,10 @@ async def get_subjects(self) -> List[str]: `GET subjects API Reference `_ """ # noqa: E501 - return await self._rest_client.get('subjects') + query = {'deleted': deleted } + if subject_prefix is not None: + query['subject'] = subject_prefix + return await self._rest_client.get('subjects', query) async def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]: """ @@ -960,7 +972,9 @@ async def get_latest_with_metadata( if registered_schema is not None: return registered_schema - query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} + query = {'deleted': deleted} + if fmt is not None: + query['format'] = fmt keys = metadata.keys() if keys: query['key'] = [_urlencode(key) for key in keys] @@ -985,7 +999,7 @@ async def get_version( Args: subject_name (str): Subject name. - version (int): version number. Defaults to latest version. + version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. deleted (bool): Whether to include deleted schemas. fmt (str): Format of the schema. @@ -1024,7 +1038,7 @@ async def get_version_schema_string( Args: subject_name (str): Subject name. - version (int): version number. Defaults to latest version. + version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. deleted (bool): Whether to include deleted schemas. fmt (str): Format of the schema. @@ -1063,12 +1077,13 @@ async def get_referenced_by(self, subject_name: str, version: Union[int, str] = return await self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( _urlencode(subject_name), version)) - async def get_versions(self, subject_name: str) -> List[int]: + async def get_versions(self, subject_name: str, deleted: bool = False) -> List[int]: """ Get a list of all versions registered with this subject. Args: subject_name (str): Subject name. + deleted (bool): Whether to include deleted schemas. Returns: list(int): Registered versions @@ -1080,7 +1095,8 @@ async def get_versions(self, subject_name: str) -> List[int]: `GET Subject Versions API Reference `_ """ # noqa: E501 - return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name))) + query = {'deleted': deleted} + return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query) async def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: """ diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 87978d165..a29b36fce 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -682,12 +682,11 @@ def get_schema( if result is not None: return result[1] - query = {'subject': subject_name} if subject_name is not None else None + query = {} + if subject_name is not None: + query['subject'] = subject_name if fmt is not None: - if query is not None: - query['format'] = fmt - else: - query = {'format': fmt} + query['format'] = fmt response = self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -719,12 +718,11 @@ def get_schema_string( `GET Schema API Reference `_ """ # noqa: E501 - query = {'subject': subject_name} if subject_name is not None else None + query = {} + if subject_name is not None: + query['subject'] = subject_name if fmt is not None: - if query is not None: - query['format'] = fmt - else: - query = {'format': fmt} + query['format'] = fmt return self._rest_client.get('schemas/ids/{}/schema'.format(schema_id), query) def get_schema_by_guid( @@ -780,12 +778,14 @@ def get_schema_types(self) -> List[str]: return self._rest_client.get('schemas/types') - def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: + def get_schema_versions(self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False) -> List[SchemaVersion]: """ Gets all subject-version pairs of a schema by its ID. Args: schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subject versions where the schema was deleted. Returns: list(SchemaVersion): List of subject-version pairs. Each pair contains: @@ -799,7 +799,12 @@ def get_schema_versions(self, schema_id: int) -> List[SchemaVersion]: `GET Schema Versions API Reference `_ """ # noqa: E501 - response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id)) + query = {} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + response = self._rest_client.get('schemas/ids/{}/versions'.format(schema_id), query) return [SchemaVersion.from_dict(item) for item in response] def lookup_schema( @@ -852,10 +857,14 @@ def lookup_schema( return registered_schema - def get_subjects(self) -> List[str]: + def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool = False) -> List[str]: """ Lists all subjects registered with the Schema Registry + Args: + subject_prefix (str): Subject name prefix that results can be filtered by. + deleted (bool): Whether to include deleted subjects. + Returns: list(str): Registered subject names @@ -866,7 +875,10 @@ def get_subjects(self) -> List[str]: `GET subjects API Reference `_ """ # noqa: E501 - return self._rest_client.get('subjects') + query = {'deleted': deleted } + if subject_prefix is not None: + query['subject'] = subject_prefix + return self._rest_client.get('subjects', query) def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]: """ @@ -960,7 +972,9 @@ def get_latest_with_metadata( if registered_schema is not None: return registered_schema - query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} + query = {'deleted': deleted} + if fmt is not None: + query['format'] = fmt keys = metadata.keys() if keys: query['key'] = [_urlencode(key) for key in keys] @@ -985,7 +999,7 @@ def get_version( Args: subject_name (str): Subject name. - version (int): version number. Defaults to latest version. + version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. deleted (bool): Whether to include deleted schemas. fmt (str): Format of the schema. @@ -1024,7 +1038,7 @@ def get_version_schema_string( Args: subject_name (str): Subject name. - version (int): version number. Defaults to latest version. + version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. deleted (bool): Whether to include deleted schemas. fmt (str): Format of the schema. @@ -1063,12 +1077,13 @@ def get_referenced_by(self, subject_name: str, version: Union[int, str] = "lates return self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( _urlencode(subject_name), version)) - def get_versions(self, subject_name: str) -> List[int]: + def get_versions(self, subject_name: str, deleted: bool = False) -> List[int]: """ Get a list of all versions registered with this subject. Args: subject_name (str): Subject name. + deleted (bool): Whether to include deleted schemas. Returns: list(int): Registered versions @@ -1080,7 +1095,8 @@ def get_versions(self, subject_name: str) -> List[int]: `GET Subject Versions API Reference `_ """ # noqa: E501 - return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name))) + query = {'deleted': deleted} + return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query) def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: """ diff --git a/src/confluent_kafka/schema_registry/common/schema_registry_client.py b/src/confluent_kafka/schema_registry/common/schema_registry_client.py index 34cfa9129..e52c0254b 100644 --- a/src/confluent_kafka/schema_registry/common/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/common/schema_registry_client.py @@ -666,10 +666,7 @@ class SchemaVersion: @classmethod def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: - d = src_dict.copy() - subject = d.pop("subject", None) - version = d.pop("version", None) - return cls(subject=subject, version=version) + return cls(subject=src_dict.get('subject'), version=src_dict.get('version')) @_attrs_define(frozen=True) @@ -697,21 +694,12 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: - d = src_dict.copy() - name = d.pop("name", None) - - subject = d.pop("subject", None) - - version = d.pop("version", None) - - schema_reference = cls( - name=name, - subject=subject, - version=version, + return cls( + name=src_dict.get('name'), + subject=src_dict.get('subject'), + version=src_dict.get('version'), ) - return schema_reference - class ConfigCompatibilityLevel(str, Enum): BACKWARD = "BACKWARD" From c465d04718426ba39375fcb3e94606c372f51473 Mon Sep 17 00:00:00 2001 From: Naxin Date: Wed, 13 Aug 2025 15:57:16 -0400 Subject: [PATCH 08/13] lint --- .../schema_registry/_async/schema_registry_client.py | 6 ++++-- .../schema_registry/_sync/schema_registry_client.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 2a7c61bc6..8b2015f44 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -778,7 +778,9 @@ async def get_schema_types(self) -> List[str]: return await self._rest_client.get('schemas/types') - async def get_schema_versions(self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False) -> List[SchemaVersion]: + async def get_schema_versions( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False + ) -> List[SchemaVersion]: """ Gets all subject-version pairs of a schema by its ID. @@ -875,7 +877,7 @@ async def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool `GET subjects API Reference `_ """ # noqa: E501 - query = {'deleted': deleted } + query = {'deleted': deleted} if subject_prefix is not None: query['subject'] = subject_prefix return await self._rest_client.get('subjects', query) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index a29b36fce..52d3f5304 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -778,7 +778,9 @@ def get_schema_types(self) -> List[str]: return self._rest_client.get('schemas/types') - def get_schema_versions(self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False) -> List[SchemaVersion]: + def get_schema_versions( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False + ) -> List[SchemaVersion]: """ Gets all subject-version pairs of a schema by its ID. @@ -875,7 +877,7 @@ def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool = Fal `GET subjects API Reference `_ """ # noqa: E501 - query = {'deleted': deleted } + query = {'deleted': deleted} if subject_prefix is not None: query['subject'] = subject_prefix return self._rest_client.get('subjects', query) From 3115286968ac0bee50044ffada0acbfae2b57c1c Mon Sep 17 00:00:00 2001 From: Naxin Date: Fri, 15 Aug 2025 15:27:47 -0400 Subject: [PATCH 09/13] revert breaking chamges --- .../_async/schema_registry_client.py | 102 ++++++++++++++---- .../_sync/schema_registry_client.py | 102 ++++++++++++++---- .../_async/test_proto_serializers.py | 2 +- .../_sync/test_proto_serializers.py | 2 +- .../schema_registry/_async/test_api_client.py | 11 +- .../schema_registry/_sync/test_api_client.py | 11 +- tests/schema_registry/conftest.py | 18 ++-- 7 files changed, 196 insertions(+), 52 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index b1c296867..2675a35a9 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -664,8 +664,10 @@ async def register_schema_full_response( return registered_schema async def get_schema( - self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None - ) -> 'RegisteredSchema': + self, schema_id: int, subject_name: Optional[str] = None, + fmt: Optional[str] = None, reference_format: Optional[str] = None, + find_tags: Optional[List[str]] = None, fetch_max_id: bool = False + ) -> 'Schema': """ Fetches the schema associated with ``schema_id`` from the Schema Registry. The result is cached so subsequent attempts will not @@ -674,10 +676,13 @@ async def get_schema( Args: schema_id (int): Schema id. subject_name (str): Subject name the schema is registered under. - fmt (str): Format of the schema. + fmt (str): Desired output format, dependent on schema type. + reference_format (str): Desired output format for references. + find_tags (list[str]): Find tagged entities for the given tags or * for all tags. + fetch_max_id (boolean): Whether to fetch the maximum schema identifier that exists Returns: - RegisteredSchema: Registration information for this schema. + Schema: Schema instance identified by the ``schema_id`` Raises: SchemaRegistryError: If schema can't be found. @@ -695,6 +700,12 @@ async def get_schema( query['subject'] = subject_name if fmt is not None: query['format'] = fmt + if reference_format is not None: + query['reference_format'] = reference_format + if find_tags is not None: + query['find_tags'] = find_tags + if fetch_max_id: + query['fetch_max_id'] = fetch_max_id response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -702,7 +713,7 @@ async def get_schema( self._cache.set_schema(subject_name, schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema + return registered_schema.schema async def get_schema_string( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None @@ -714,7 +725,7 @@ async def get_schema_string( Args: schema_id (int): Schema id. subject_name (str): Subject name the schema is registered under. - fmt (str): Format of the schema. + fmt (str): Desired output format, dependent on schema type. Returns: str: Schema string for this version. @@ -735,7 +746,7 @@ async def get_schema_string( async def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None - ) -> 'RegisteredSchema': + ) -> 'Schema': """ Fetches the schema associated with ``guid`` from the Schema Registry. The result is cached so subsequent attempts will not @@ -746,7 +757,7 @@ async def get_schema_by_guid( fmt (str): Format of the schema Returns: - RegisteredSchema: Registration information for this schema. + Schema: Schema instance identified by the ``guid`` Raises: SchemaRegistryError: If schema can't be found. @@ -768,7 +779,7 @@ async def get_schema_by_guid( self._cache.set_schema(None, registered_schema.schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema + return registered_schema.schema async def get_schema_types(self) -> List[str]: """ @@ -786,8 +797,38 @@ async def get_schema_types(self) -> List[str]: return await self._rest_client.get('schemas/types') + async def get_subjects_by_schema_id( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[str]: + """ + Retrieves all the subjects associated with ``schema_id``. + + Args: + schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subejcts where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(str): List of suubjects matching the specified parameters. + + Raises: + SchemaRegistryError: if subjects can't be found + + TODO: add API reference + """ + query = {'offset': offset, 'limit': limit} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + return await self._rest_client.get('schemas/ids/{}/subjects'.format(schema_id), query) + async def get_schema_versions( - self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 ) -> List[SchemaVersion]: """ Gets all subject-version pairs of a schema by its ID. @@ -796,6 +837,8 @@ async def get_schema_versions( schema_id (int): Schema ID. subject_name (str): Subject name that results can be filtered by. deleted (bool): Whether to include subject versions where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(SchemaVersion): List of subject-version pairs. Each pair contains: @@ -809,7 +852,7 @@ async def get_schema_versions( `GET Schema Versions API Reference `_ """ # noqa: E501 - query = {} + query = {'offset': offset, 'limit': limit} if subject_name is not None: query['subject'] = subject_name if deleted: @@ -819,7 +862,7 @@ async def get_schema_versions( async def lookup_schema( self, subject_name: str, schema: 'Schema', - normalize_schemas: bool = False, deleted: bool = False + normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False ) -> 'RegisteredSchema': """ Returns ``schema`` registration information for ``subject``. @@ -828,6 +871,7 @@ async def lookup_schema( subject_name (str): Subject name the schema is registered under. schema (Schema): Schema instance. normalize_schemas (bool): Normalize schema before registering. + fmt (str): Desired output format, dependent on schema type. deleted (bool): Whether to include deleted schemas. Returns: @@ -847,8 +891,8 @@ async def lookup_schema( request = schema.to_dict() response = await self._rest_client.post( - 'subjects/{}?normalize={}&deleted={}'.format( - _urlencode(subject_name), normalize_schemas, deleted), + 'subjects/{}?normalize={}&format={}&deleted={}'.format( + _urlencode(subject_name), normalize_schemas, fmt, deleted), body=request ) @@ -867,13 +911,18 @@ async def lookup_schema( return registered_schema - async def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool = False) -> List[str]: + async def get_subjects( + self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + ) -> List[str]: """ - Lists all subjects registered with the Schema Registry + Lists all subjects registered with the Schema Registry. Args: subject_prefix (str): Subject name prefix that results can be filtered by. deleted (bool): Whether to include deleted subjects. + deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(str): Registered subject names @@ -885,7 +934,7 @@ async def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool `GET subjects API Reference `_ """ # noqa: E501 - query = {'deleted': deleted} + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} if subject_prefix is not None: query['subject'] = subject_prefix return await self._rest_client.get('subjects', query) @@ -1067,13 +1116,17 @@ async def get_version_schema_string( 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query ) - async def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[int]: + async def get_referenced_by( + self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 + ) -> List[int]: """ Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. Args: subject_name (str): Subject name version (int or str): Version number or "latest" + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(int): List of schema IDs that reference the specified schema. @@ -1084,16 +1137,23 @@ async def get_referenced_by(self, subject_name: str, version: Union[int, str] = See Also: `GET Subject Versions API Reference `_ """ # noqa: E501 + + query = {'offset': offset, 'limit': limit} return await self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( - _urlencode(subject_name), version)) + _urlencode(subject_name), version), query) - async def get_versions(self, subject_name: str, deleted: bool = False) -> List[int]: + async def get_versions( + self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + ) -> List[int]: """ Get a list of all versions registered with this subject. Args: subject_name (str): Subject name. deleted (bool): Whether to include deleted schemas. + deleted_only (bool): Whether to return deleted versions only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(int): Registered versions @@ -1105,7 +1165,7 @@ async def get_versions(self, subject_name: str, deleted: bool = False) -> List[i `GET Subject Versions API Reference `_ """ # noqa: E501 - query = {'deleted': deleted} + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query) async def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index fff0ddaeb..593a07ef8 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -664,8 +664,10 @@ def register_schema_full_response( return registered_schema def get_schema( - self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None - ) -> 'RegisteredSchema': + self, schema_id: int, subject_name: Optional[str] = None, + fmt: Optional[str] = None, reference_format: Optional[str] = None, + find_tags: Optional[List[str]] = None, fetch_max_id: bool = False + ) -> 'Schema': """ Fetches the schema associated with ``schema_id`` from the Schema Registry. The result is cached so subsequent attempts will not @@ -674,10 +676,13 @@ def get_schema( Args: schema_id (int): Schema id. subject_name (str): Subject name the schema is registered under. - fmt (str): Format of the schema. + fmt (str): Desired output format, dependent on schema type. + reference_format (str): Desired output format for references. + find_tags (list[str]): Find tagged entities for the given tags or * for all tags. + fetch_max_id (boolean): Whether to fetch the maximum schema identifier that exists Returns: - RegisteredSchema: Registration information for this schema. + Schema: Schema instance identified by the ``schema_id`` Raises: SchemaRegistryError: If schema can't be found. @@ -695,6 +700,12 @@ def get_schema( query['subject'] = subject_name if fmt is not None: query['format'] = fmt + if reference_format is not None: + query['reference_format'] = reference_format + if find_tags is not None: + query['find_tags'] = find_tags + if fetch_max_id: + query['fetch_max_id'] = fetch_max_id response = self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -702,7 +713,7 @@ def get_schema( self._cache.set_schema(subject_name, schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema + return registered_schema.schema def get_schema_string( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None @@ -714,7 +725,7 @@ def get_schema_string( Args: schema_id (int): Schema id. subject_name (str): Subject name the schema is registered under. - fmt (str): Format of the schema. + fmt (str): Desired output format, dependent on schema type. Returns: str: Schema string for this version. @@ -735,7 +746,7 @@ def get_schema_string( def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None - ) -> 'RegisteredSchema': + ) -> 'Schema': """ Fetches the schema associated with ``guid`` from the Schema Registry. The result is cached so subsequent attempts will not @@ -746,7 +757,7 @@ def get_schema_by_guid( fmt (str): Format of the schema Returns: - RegisteredSchema: Registration information for this schema. + Schema: Schema instance identified by the ``guid`` Raises: SchemaRegistryError: If schema can't be found. @@ -768,7 +779,7 @@ def get_schema_by_guid( self._cache.set_schema(None, registered_schema.schema_id, registered_schema.guid, registered_schema.schema) - return registered_schema + return registered_schema.schema def get_schema_types(self) -> List[str]: """ @@ -786,8 +797,38 @@ def get_schema_types(self) -> List[str]: return self._rest_client.get('schemas/types') + def get_subjects_by_schema_id( + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 + ) -> List[str]: + """ + Retrieves all the subjects associated with ``schema_id``. + + Args: + schema_id (int): Schema ID. + subject_name (str): Subject name that results can be filtered by. + deleted (bool): Whether to include subejcts where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + list(str): List of suubjects matching the specified parameters. + + Raises: + SchemaRegistryError: if subjects can't be found + + TODO: add API reference + """ + query = {'offset': offset, 'limit': limit} + if subject_name is not None: + query['subject'] = subject_name + if deleted: + query['deleted'] = deleted + return self._rest_client.get('schemas/ids/{}/subjects'.format(schema_id), query) + def get_schema_versions( - self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False + self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False, + offset: int = 0, limit: int = -1 ) -> List[SchemaVersion]: """ Gets all subject-version pairs of a schema by its ID. @@ -796,6 +837,8 @@ def get_schema_versions( schema_id (int): Schema ID. subject_name (str): Subject name that results can be filtered by. deleted (bool): Whether to include subject versions where the schema was deleted. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(SchemaVersion): List of subject-version pairs. Each pair contains: @@ -809,7 +852,7 @@ def get_schema_versions( `GET Schema Versions API Reference `_ """ # noqa: E501 - query = {} + query = {'offset': offset, 'limit': limit} if subject_name is not None: query['subject'] = subject_name if deleted: @@ -819,7 +862,7 @@ def get_schema_versions( def lookup_schema( self, subject_name: str, schema: 'Schema', - normalize_schemas: bool = False, deleted: bool = False + normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False ) -> 'RegisteredSchema': """ Returns ``schema`` registration information for ``subject``. @@ -828,6 +871,7 @@ def lookup_schema( subject_name (str): Subject name the schema is registered under. schema (Schema): Schema instance. normalize_schemas (bool): Normalize schema before registering. + fmt (str): Desired output format, dependent on schema type. deleted (bool): Whether to include deleted schemas. Returns: @@ -847,8 +891,8 @@ def lookup_schema( request = schema.to_dict() response = self._rest_client.post( - 'subjects/{}?normalize={}&deleted={}'.format( - _urlencode(subject_name), normalize_schemas, deleted), + 'subjects/{}?normalize={}&format={}&deleted={}'.format( + _urlencode(subject_name), normalize_schemas, fmt, deleted), body=request ) @@ -867,13 +911,18 @@ def lookup_schema( return registered_schema - def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool = False) -> List[str]: + def get_subjects( + self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + ) -> List[str]: """ - Lists all subjects registered with the Schema Registry + Lists all subjects registered with the Schema Registry. Args: subject_prefix (str): Subject name prefix that results can be filtered by. deleted (bool): Whether to include deleted subjects. + deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(str): Registered subject names @@ -885,7 +934,7 @@ def get_subjects(self, subject_prefix: Optional[str] = None, deleted: bool = Fal `GET subjects API Reference `_ """ # noqa: E501 - query = {'deleted': deleted} + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} if subject_prefix is not None: query['subject'] = subject_prefix return self._rest_client.get('subjects', query) @@ -1067,13 +1116,17 @@ def get_version_schema_string( 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query ) - def get_referenced_by(self, subject_name: str, version: Union[int, str] = "latest") -> List[int]: + def get_referenced_by( + self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 + ) -> List[int]: """ Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. Args: subject_name (str): Subject name version (int or str): Version number or "latest" + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(int): List of schema IDs that reference the specified schema. @@ -1084,16 +1137,23 @@ def get_referenced_by(self, subject_name: str, version: Union[int, str] = "lates See Also: `GET Subject Versions API Reference `_ """ # noqa: E501 + + query = {'offset': offset, 'limit': limit} return self._rest_client.get('subjects/{}/versions/{}/referencedby'.format( - _urlencode(subject_name), version)) + _urlencode(subject_name), version), query) - def get_versions(self, subject_name: str, deleted: bool = False) -> List[int]: + def get_versions( + self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + ) -> List[int]: """ Get a list of all versions registered with this subject. Args: subject_name (str): Subject name. deleted (bool): Whether to include deleted schemas. + deleted_only (bool): Whether to return deleted versions only. If both deleted and deleted_only are True, deleted_only takes precedence. + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. Returns: list(int): Registered versions @@ -1105,7 +1165,7 @@ def get_versions(self, subject_name: str, deleted: bool = False) -> List[int]: `GET Subject Versions API Reference `_ """ # noqa: E501 - query = {'deleted': deleted} + query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit} return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query) def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int: diff --git a/tests/integration/schema_registry/_async/test_proto_serializers.py b/tests/integration/schema_registry/_async/test_proto_serializers.py index c04eb480b..0e65686e2 100644 --- a/tests/integration/schema_registry/_async/test_proto_serializers.py +++ b/tests/integration/schema_registry/_async/test_proto_serializers.py @@ -92,7 +92,7 @@ async def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs await producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = (await sr.get_schema(serializer._schema_id.id)).schema.references + registered_refs = (await sr.get_schema(serializer._schema_id.id)).references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/integration/schema_registry/_sync/test_proto_serializers.py b/tests/integration/schema_registry/_sync/test_proto_serializers.py index dd70ec4b7..9b3ca3197 100644 --- a/tests/integration/schema_registry/_sync/test_proto_serializers.py +++ b/tests/integration/schema_registry/_sync/test_proto_serializers.py @@ -92,7 +92,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): producer.produce(topic, key=pb2(), partition=0) producer.flush() - registered_refs = (sr.get_schema(serializer._schema_id.id)).schema.references + registered_refs = (sr.get_schema(serializer._schema_id.id)).references assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index 63405f5fd..695f6e482 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -147,7 +147,7 @@ async def test_get_schema(mock_schema_registry, load_avsc): expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') actual = await sr.get_schema(47) - assert cmp_schema(expected, actual.schema) + assert cmp_schema(expected, actual) async def test_get_schema_not_found(mock_schema_registry): @@ -205,6 +205,15 @@ async def test_get_schema_types(mock_schema_registry): assert expected == actual +async def test_get_subjects_by_schema_id(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + expected = SUBJECTS + actual = await sr.get_subjects_by_schema_id(47) + assert expected == actual + + async def test_get_schema_versions(mock_schema_registry): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index e494b8ba7..c8588d12d 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -147,7 +147,7 @@ def test_get_schema(mock_schema_registry, load_avsc): expected = Schema(load_avsc(SCHEMA), schema_type='AVRO') actual = sr.get_schema(47) - assert cmp_schema(expected, actual.schema) + assert cmp_schema(expected, actual) def test_get_schema_not_found(mock_schema_registry): @@ -205,6 +205,15 @@ def test_get_schema_types(mock_schema_registry): assert expected == actual +def test_get_subjects_by_schema_id(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + expected = SUBJECTS + actual = sr.get_subjects_by_schema_id(47) + assert expected == actual + + def test_get_schema_versions(mock_schema_registry): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index 5040a52b1..3065bd6ae 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -137,8 +137,9 @@ def mock_schema_registry(): respx_mock.get(SCHEMAS_RE).mock(side_effect=get_schemas_callback) respx_mock.get(SCHEMAS_STRING_RE).mock(side_effect=get_schema_string_callback) - respx_mock.get(SCHEMAS_TYPES_RE).mock(side_effect=get_schema_types_callback) respx_mock.get(SCHEMAS_VERSIONS_RE).mock(side_effect=get_schema_versions_callback) + respx_mock.get(SCHEMAS_SUBJECTS_RE).mock(side_effect=get_schema_subjects_callback) + respx_mock.get(SCHEMAS_TYPES_RE).mock(side_effect=get_schema_types_callback) respx_mock.get(SUBJECTS_VERSIONS_SCHEMA_RE).mock(side_effect=get_subject_version_schema_callback) respx_mock.get(SUBJECTS_VERSIONS_REFERENCED_BY_RE).mock(side_effect=get_subject_version_referenced_by_callback) @@ -155,18 +156,18 @@ def mock_schema_registry(): # request paths SCHEMAS_RE = re.compile("/schemas/ids/([0-9]*)$") -SCHEMAS_STRING_RE = re.compile("/schemas/ids/([0-9]*)/schema$") +SCHEMAS_STRING_RE = re.compile("/schemas/ids/([0-9]*)/schema(\?.*)?$") +SCHEMAS_VERSIONS_RE = re.compile("/schemas/ids/([0-9]*)/versions(\?.*)?$") +SCHEMAS_SUBJECTS_RE = re.compile("/schemas/ids/([0-9]*)/subjects(\?.*)?$") SCHEMAS_TYPES_RE = re.compile("/schemas/types$") -SCHEMAS_VERSIONS_RE = re.compile("/schemas/ids/([0-9]*)/versions$") SUBJECTS_RE = re.compile("/subjects/?(.*)$") SUBJECTS_VERSIONS_RE = re.compile("/subjects/(.*)/versions/?(.*)$") -SUBJECTS_VERSIONS_SCHEMA_RE = re.compile("/subjects/(.*)/versions/(.*)/schema/?(.*)$") -SUBJECTS_VERSIONS_REFERENCED_BY_RE = re.compile("/subjects/(.*)/versions/(.*)/referencedby$") +SUBJECTS_VERSIONS_SCHEMA_RE = re.compile("/subjects/(.*)/versions/(.*)/schema(\?.*)?$") +SUBJECTS_VERSIONS_REFERENCED_BY_RE = re.compile("/subjects/(.*)/versions/(.*)/referencedby(\?.*)?$") COMPATIBILITY_RE = re.compile("/config/?(.*)$") COMPATIBILITY_SUBJECTS_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions/?(.*)$") - # constants SCHEMA_ID = 47 VERSION = 3 @@ -285,6 +286,11 @@ def get_schemas_callback(request, route): return Response(200, json={'schema': _load_avsc(SCHEMA)}) +def get_schema_subjects_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=SUBJECTS) + + def get_schema_string_callback(request, route): COUNTER['GET'][request.url.path] += 1 path_match = re.match(SCHEMAS_STRING_RE, request.url.path) From e812f1c1e805f4ad63fa992a56cb913d38c7db1e Mon Sep 17 00:00:00 2001 From: Naxin Date: Fri, 15 Aug 2025 15:40:03 -0400 Subject: [PATCH 10/13] update --- .../schema_registry/_async/schema_registry_client.py | 3 ++- .../schema_registry/_sync/schema_registry_client.py | 3 ++- tests/schema_registry/conftest.py | 10 +++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 2675a35a9..1fd3d4905 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -912,7 +912,8 @@ async def lookup_schema( return registered_schema async def get_subjects( - self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, + offset: int = 0, limit: int = -1 ) -> List[str]: """ Lists all subjects registered with the Schema Registry. diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 593a07ef8..3f2486b79 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -912,7 +912,8 @@ def lookup_schema( return registered_schema def get_subjects( - self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False, + offset: int = 0, limit: int = -1 ) -> List[str]: """ Lists all subjects registered with the Schema Registry. diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index 3065bd6ae..42db8f6af 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -156,15 +156,15 @@ def mock_schema_registry(): # request paths SCHEMAS_RE = re.compile("/schemas/ids/([0-9]*)$") -SCHEMAS_STRING_RE = re.compile("/schemas/ids/([0-9]*)/schema(\?.*)?$") -SCHEMAS_VERSIONS_RE = re.compile("/schemas/ids/([0-9]*)/versions(\?.*)?$") -SCHEMAS_SUBJECTS_RE = re.compile("/schemas/ids/([0-9]*)/subjects(\?.*)?$") +SCHEMAS_STRING_RE = re.compile(r"/schemas/ids/([0-9]*)/schema(\?.*)?$") +SCHEMAS_VERSIONS_RE = re.compile(r"/schemas/ids/([0-9]*)/versions(\?.*)?$") +SCHEMAS_SUBJECTS_RE = re.compile(r"/schemas/ids/([0-9]*)/subjects(\?.*)?$") SCHEMAS_TYPES_RE = re.compile("/schemas/types$") SUBJECTS_RE = re.compile("/subjects/?(.*)$") SUBJECTS_VERSIONS_RE = re.compile("/subjects/(.*)/versions/?(.*)$") -SUBJECTS_VERSIONS_SCHEMA_RE = re.compile("/subjects/(.*)/versions/(.*)/schema(\?.*)?$") -SUBJECTS_VERSIONS_REFERENCED_BY_RE = re.compile("/subjects/(.*)/versions/(.*)/referencedby(\?.*)?$") +SUBJECTS_VERSIONS_SCHEMA_RE = re.compile(r"/subjects/(.*)/versions/(.*)/schema(\?.*)?$") +SUBJECTS_VERSIONS_REFERENCED_BY_RE = re.compile(r"/subjects/(.*)/versions/(.*)/referencedby(\?.*)?$") COMPATIBILITY_RE = re.compile("/config/?(.*)$") COMPATIBILITY_SUBJECTS_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions/?(.*)$") From 00db3aeb0c7c1324d4aaeeefadf68100d0c622c8 Mon Sep 17 00:00:00 2001 From: Naxin Date: Mon, 18 Aug 2025 14:55:49 -0400 Subject: [PATCH 11/13] update --- .../_async/schema_registry_client.py | 16 ++++++++++++---- .../_sync/schema_registry_client.py | 16 ++++++++++++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 1fd3d4905..29b26ebf8 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -807,12 +807,12 @@ async def get_subjects_by_schema_id( Args: schema_id (int): Schema ID. subject_name (str): Subject name that results can be filtered by. - deleted (bool): Whether to include subejcts where the schema was deleted. + deleted (bool): Whether to include subjects where the schema was deleted. offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: - list(str): List of suubjects matching the specified parameters. + list(str): List of subjects matching the specified parameters. Raises: SchemaRegistryError: if subjects can't be found @@ -890,9 +890,17 @@ async def lookup_schema( request = schema.to_dict() + query_params = { + 'normalize': normalize_schemas, + 'deleted': deleted + } + if fmt is not None: + query_params['format'] = fmt + + query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) + response = await self._rest_client.post( - 'subjects/{}?normalize={}&format={}&deleted={}'.format( - _urlencode(subject_name), normalize_schemas, fmt, deleted), + 'subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request ) diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 3f2486b79..a2fc35a0e 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -807,12 +807,12 @@ def get_subjects_by_schema_id( Args: schema_id (int): Schema ID. subject_name (str): Subject name that results can be filtered by. - deleted (bool): Whether to include subejcts where the schema was deleted. + deleted (bool): Whether to include subjects where the schema was deleted. offset (int): Pagination offset for results. limit (int): Pagination size for results. Ignored if negative. Returns: - list(str): List of suubjects matching the specified parameters. + list(str): List of subjects matching the specified parameters. Raises: SchemaRegistryError: if subjects can't be found @@ -890,9 +890,17 @@ def lookup_schema( request = schema.to_dict() + query_params = { + 'normalize': normalize_schemas, + 'deleted': deleted + } + if fmt is not None: + query_params['format'] = fmt + + query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) + response = self._rest_client.post( - 'subjects/{}?normalize={}&format={}&deleted={}'.format( - _urlencode(subject_name), normalize_schemas, fmt, deleted), + 'subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request ) From aeb14fe6e4b9665935045ae001371357ca3f5a28 Mon Sep 17 00:00:00 2001 From: Naxin Date: Mon, 18 Aug 2025 18:28:11 -0400 Subject: [PATCH 12/13] address feedback --- .../_async/schema_registry_client.py | 68 +------------------ .../_sync/schema_registry_client.py | 68 +------------------ .../schema_registry/_async/test_api_client.py | 18 ----- .../schema_registry/_sync/test_api_client.py | 18 ----- tests/schema_registry/conftest.py | 13 ---- 5 files changed, 2 insertions(+), 183 deletions(-) diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index 29b26ebf8..10f126162 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -666,7 +666,6 @@ async def register_schema_full_response( async def get_schema( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None, reference_format: Optional[str] = None, - find_tags: Optional[List[str]] = None, fetch_max_id: bool = False ) -> 'Schema': """ Fetches the schema associated with ``schema_id`` from the @@ -678,8 +677,6 @@ async def get_schema( subject_name (str): Subject name the schema is registered under. fmt (str): Desired output format, dependent on schema type. reference_format (str): Desired output format for references. - find_tags (list[str]): Find tagged entities for the given tags or * for all tags. - fetch_max_id (boolean): Whether to fetch the maximum schema identifier that exists Returns: Schema: Schema instance identified by the ``schema_id`` @@ -702,10 +699,7 @@ async def get_schema( query['format'] = fmt if reference_format is not None: query['reference_format'] = reference_format - if find_tags is not None: - query['find_tags'] = find_tags - if fetch_max_id: - query['fetch_max_id'] = fetch_max_id + response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -715,35 +709,6 @@ async def get_schema( return registered_schema.schema - async def get_schema_string( - self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None - ) -> str: - """ - Fetches the schema associated with ``schema_id`` from the - Schema Registry. Only the unescaped schema string is returned. - - Args: - schema_id (int): Schema id. - subject_name (str): Subject name the schema is registered under. - fmt (str): Desired output format, dependent on schema type. - - Returns: - str: Schema string for this version. - - Raises: - SchemaRegistryError: if the version can't be found or is invalid. - - See Also: - `GET Schema API Reference `_ - """ # noqa: E501 - - query = {} - if subject_name is not None: - query['subject'] = subject_name - if fmt is not None: - query['format'] = fmt - return await self._rest_client.get('schemas/ids/{}/schema'.format(schema_id), query) - async def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None ) -> 'Schema': @@ -816,8 +781,6 @@ async def get_subjects_by_schema_id( Raises: SchemaRegistryError: if subjects can't be found - - TODO: add API reference """ query = {'offset': offset, 'limit': limit} if subject_name is not None: @@ -1096,35 +1059,6 @@ async def get_version( return registered_schema - async def get_version_schema_string( - self, subject_name: str, version: Union[int, str] = "latest", - deleted: bool = False, fmt: Optional[str] = None - ) -> str: - """ - Retrieves a specific schema registered under ``subject_name`` and ``version``. - Only the unescaped schema string is returned. - - Args: - subject_name (str): Subject name. - version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. - deleted (bool): Whether to include deleted schemas. - fmt (str): Format of the schema. - - Returns: - str: Schema string for this version. - - Raises: - SchemaRegistryError: if the version can't be found or is invalid. - - See Also: - `GET Subject Versions API Reference `_ - """ # noqa: E501 - - query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} - return await self._rest_client.get( - 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query - ) - async def get_referenced_by( self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 ) -> List[int]: diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index a2fc35a0e..d86dcdfcf 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -666,7 +666,6 @@ def register_schema_full_response( def get_schema( self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None, reference_format: Optional[str] = None, - find_tags: Optional[List[str]] = None, fetch_max_id: bool = False ) -> 'Schema': """ Fetches the schema associated with ``schema_id`` from the @@ -678,8 +677,6 @@ def get_schema( subject_name (str): Subject name the schema is registered under. fmt (str): Desired output format, dependent on schema type. reference_format (str): Desired output format for references. - find_tags (list[str]): Find tagged entities for the given tags or * for all tags. - fetch_max_id (boolean): Whether to fetch the maximum schema identifier that exists Returns: Schema: Schema instance identified by the ``schema_id`` @@ -702,10 +699,7 @@ def get_schema( query['format'] = fmt if reference_format is not None: query['reference_format'] = reference_format - if find_tags is not None: - query['find_tags'] = find_tags - if fetch_max_id: - query['fetch_max_id'] = fetch_max_id + response = self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -715,35 +709,6 @@ def get_schema( return registered_schema.schema - def get_schema_string( - self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None - ) -> str: - """ - Fetches the schema associated with ``schema_id`` from the - Schema Registry. Only the unescaped schema string is returned. - - Args: - schema_id (int): Schema id. - subject_name (str): Subject name the schema is registered under. - fmt (str): Desired output format, dependent on schema type. - - Returns: - str: Schema string for this version. - - Raises: - SchemaRegistryError: if the version can't be found or is invalid. - - See Also: - `GET Schema API Reference `_ - """ # noqa: E501 - - query = {} - if subject_name is not None: - query['subject'] = subject_name - if fmt is not None: - query['format'] = fmt - return self._rest_client.get('schemas/ids/{}/schema'.format(schema_id), query) - def get_schema_by_guid( self, guid: str, fmt: Optional[str] = None ) -> 'Schema': @@ -816,8 +781,6 @@ def get_subjects_by_schema_id( Raises: SchemaRegistryError: if subjects can't be found - - TODO: add API reference """ query = {'offset': offset, 'limit': limit} if subject_name is not None: @@ -1096,35 +1059,6 @@ def get_version( return registered_schema - def get_version_schema_string( - self, subject_name: str, version: Union[int, str] = "latest", - deleted: bool = False, fmt: Optional[str] = None - ) -> str: - """ - Retrieves a specific schema registered under ``subject_name`` and ``version``. - Only the unescaped schema string is returned. - - Args: - subject_name (str): Subject name. - version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version. - deleted (bool): Whether to include deleted schemas. - fmt (str): Format of the schema. - - Returns: - str: Schema string for this version. - - Raises: - SchemaRegistryError: if the version can't be found or is invalid. - - See Also: - `GET Subject Versions API Reference `_ - """ # noqa: E501 - - query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} - return self._rest_client.get( - 'subjects/{}/versions/{}/schema'.format(_urlencode(subject_name), version), query - ) - def get_referenced_by( self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 ) -> List[int]: diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index 695f6e482..65fbfe345 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -187,15 +187,6 @@ def get(): assert count_after - count_before == 1 -async def test_get_schema_string_success(mock_schema_registry, load_avsc): - conf = {'url': TEST_URL} - sr = AsyncSchemaRegistryClient(conf) - - expected = json.loads(load_avsc(SCHEMA)) - actual = await sr.get_schema_string(47) - assert expected == actual - - async def test_get_schema_types(mock_schema_registry): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) @@ -384,15 +375,6 @@ async def test_delete_version_invalid(mock_schema_registry): assert e.value.error_code == 42202 -async def test_get_version_schema_string(mock_schema_registry, load_avsc): - conf = {'url': TEST_URL} - sr = AsyncSchemaRegistryClient(conf) - - expected = json.loads(load_avsc(SCHEMA)) - actual = await sr.get_version_schema_string("get_version", 3) - assert expected == actual - - async def test_get_referenced_by(mock_schema_registry): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index c8588d12d..774219936 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -187,15 +187,6 @@ def get(): assert count_after - count_before == 1 -def test_get_schema_string_success(mock_schema_registry, load_avsc): - conf = {'url': TEST_URL} - sr = SchemaRegistryClient(conf) - - expected = json.loads(load_avsc(SCHEMA)) - actual = sr.get_schema_string(47) - assert expected == actual - - def test_get_schema_types(mock_schema_registry): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) @@ -384,15 +375,6 @@ def test_delete_version_invalid(mock_schema_registry): assert e.value.error_code == 42202 -def test_get_version_schema_string(mock_schema_registry, load_avsc): - conf = {'url': TEST_URL} - sr = SchemaRegistryClient(conf) - - expected = json.loads(load_avsc(SCHEMA)) - actual = sr.get_version_schema_string("get_version", 3) - assert expected == actual - - def test_get_referenced_by(mock_schema_registry): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index 42db8f6af..3b3d9032e 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -136,12 +136,10 @@ def mock_schema_registry(): respx_mock.put(COMPATIBILITY_RE).mock(side_effect=put_compatibility_callback) respx_mock.get(SCHEMAS_RE).mock(side_effect=get_schemas_callback) - respx_mock.get(SCHEMAS_STRING_RE).mock(side_effect=get_schema_string_callback) respx_mock.get(SCHEMAS_VERSIONS_RE).mock(side_effect=get_schema_versions_callback) respx_mock.get(SCHEMAS_SUBJECTS_RE).mock(side_effect=get_schema_subjects_callback) respx_mock.get(SCHEMAS_TYPES_RE).mock(side_effect=get_schema_types_callback) - respx_mock.get(SUBJECTS_VERSIONS_SCHEMA_RE).mock(side_effect=get_subject_version_schema_callback) respx_mock.get(SUBJECTS_VERSIONS_REFERENCED_BY_RE).mock(side_effect=get_subject_version_referenced_by_callback) respx_mock.get(SUBJECTS_VERSIONS_RE).mock(side_effect=get_subject_version_callback) respx_mock.delete(SUBJECTS_VERSIONS_RE).mock(side_effect=delete_subject_version_callback) @@ -156,7 +154,6 @@ def mock_schema_registry(): # request paths SCHEMAS_RE = re.compile("/schemas/ids/([0-9]*)$") -SCHEMAS_STRING_RE = re.compile(r"/schemas/ids/([0-9]*)/schema(\?.*)?$") SCHEMAS_VERSIONS_RE = re.compile(r"/schemas/ids/([0-9]*)/versions(\?.*)?$") SCHEMAS_SUBJECTS_RE = re.compile(r"/schemas/ids/([0-9]*)/subjects(\?.*)?$") SCHEMAS_TYPES_RE = re.compile("/schemas/types$") @@ -291,16 +288,6 @@ def get_schema_subjects_callback(request, route): return Response(200, json=SUBJECTS) -def get_schema_string_callback(request, route): - COUNTER['GET'][request.url.path] += 1 - path_match = re.match(SCHEMAS_STRING_RE, request.url.path) - schema_id = path_match.group(1) - if int(schema_id) == 404: - return Response(404, json={'error_code': 40403, - 'message': "Schema not found"}) - return Response(200, json=json.loads(_load_avsc(SCHEMA))) - - def get_schema_types_callback(request, route): COUNTER['GET'][request.url.path] += 1 return Response(200, json=['AVRO', 'JSON', 'PROTOBUF']) From beda1681f1527a8de2e23b89344028e0ab9ad020 Mon Sep 17 00:00:00 2001 From: Naxin Date: Tue, 19 Aug 2025 11:02:19 -0400 Subject: [PATCH 13/13] remove unused import --- tests/schema_registry/_async/test_api_client.py | 1 - tests/schema_registry/_sync/test_api_client.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index 65fbfe345..fb930cddf 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json import pytest import asyncio from concurrent.futures import ThreadPoolExecutor, wait diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index 774219936..2ee0b7802 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json import pytest from concurrent.futures import ThreadPoolExecutor, wait