Skip to content

Add missing endpoints (/schemas, /subjects) to SchemaRegistryClient #2017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
3 changes: 1 addition & 2 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ If librdkafka is installed in a non-standard location provide the include and li

$ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build

**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB
**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB

## Generate Documentation

Expand Down Expand Up @@ -45,4 +45,3 @@ If you make any changes to the async code (in `src/confluent_kafka/schema_regist


See [tests/README.md](tests/README.md) for instructions on how to run tests.

191 changes: 165 additions & 26 deletions src/confluent_kafka/schema_registry/_async/schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from confluent_kafka.schema_registry.error import SchemaRegistryError, OAuthTokenError
from confluent_kafka.schema_registry.common.schema_registry_client import (
RegisteredSchema,
SchemaVersion,
ServerConfig,
is_success,
is_retriable,
Expand Down Expand Up @@ -663,17 +664,19 @@ async def register_schema_full_response(
return registered_schema

async def get_schema(
self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None
self, schema_id: int, subject_name: Optional[str] = None,
fmt: Optional[str] = None, reference_format: Optional[str] = None,
) -> 'Schema':
"""
Fetches the schema associated with ``schema_id`` from the
Schema Registry. The result is cached so subsequent attempts will not
require an additional round-trip to the Schema Registry.
Args:
schema_id (int): Schema id
subject_name (str): Subject name the schema is registered under
fmt (str): Format of the schema
schema_id (int): Schema id.
subject_name (str): Subject name the schema is registered under.
fmt (str): Desired output format, dependent on schema type.
reference_format (str): Desired output format for references.
Returns:
Schema: Schema instance identified by the ``schema_id``
Expand All @@ -682,19 +685,21 @@ async def get_schema(
SchemaRegistryError: If schema can't be found.
See Also:
`GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id>`_
`GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id>`_
""" # noqa: E501

result = self._cache.get_schema_by_id(subject_name, schema_id)
if result is not None:
return result[1]

query = {'subject': subject_name} if subject_name is not None else None
query = {}
if subject_name is not None:
query['subject'] = subject_name
if fmt is not None:
if query is not None:
query['format'] = fmt
else:
query = {'format': fmt}
query['format'] = fmt
if reference_format is not None:
query['reference_format'] = reference_format

response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query)

registered_schema = RegisteredSchema.from_dict(response)
Expand Down Expand Up @@ -741,24 +746,102 @@ async def get_schema_by_guid(

return registered_schema.schema

async def get_schema_types(self) -> List[str]:
"""
Lists all supported schema types in the Schema Registry.
Returns:
list(str): List of supported schema types (e.g., ['AVRO', 'JSON', 'PROTOBUF'])
Raises:
SchemaRegistryError: if schema types can't be retrieved
See Also:
`GET Schema Types API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-types>`_
""" # noqa: E501

return await self._rest_client.get('schemas/types')

async def get_subjects_by_schema_id(
self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False,
offset: int = 0, limit: int = -1
) -> List[str]:
"""
Retrieves all the subjects associated with ``schema_id``.
Args:
schema_id (int): Schema ID.
subject_name (str): Subject name that results can be filtered by.
deleted (bool): Whether to include subjects where the schema was deleted.
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.
Returns:
list(str): List of subjects matching the specified parameters.
Raises:
SchemaRegistryError: if subjects can't be found
"""
query = {'offset': offset, 'limit': limit}
Copy link
Preview

Copilot AI Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query parameters are being added unconditionally, but the limit parameter should only be added if it's not negative according to the docstring.

Suggested change
query = {'offset': offset, 'limit': limit}
query = {'offset': offset}
if limit >= 0:
query['limit'] = limit

Copilot uses AI. Check for mistakes.

if subject_name is not None:
query['subject'] = subject_name
if deleted:
query['deleted'] = deleted
return await self._rest_client.get('schemas/ids/{}/subjects'.format(schema_id), query)

async def get_schema_versions(
self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False,
offset: int = 0, limit: int = -1
) -> List[SchemaVersion]:
"""
Gets all subject-version pairs of a schema by its ID.
Args:
schema_id (int): Schema ID.
subject_name (str): Subject name that results can be filtered by.
deleted (bool): Whether to include subject versions where the schema was deleted.
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.
Returns:
list(SchemaVersion): List of subject-version pairs. Each pair contains:
- subject (str): Subject name.
- version (int): Version number.
Raises:
SchemaRegistryError: if schema versions can't be found.
See Also:
`GET Schema Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id-versions>`_
""" # noqa: E501

query = {'offset': offset, 'limit': limit}
Copy link
Preview

Copilot AI Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as above - the limit parameter should only be added to the query if it's not negative, as stated in the docstring.

Suggested change
query = {'offset': offset, 'limit': limit}
query = {'offset': offset}
if limit >= 0:
query['limit'] = limit

Copilot uses AI. Check for mistakes.

if subject_name is not None:
query['subject'] = subject_name
if deleted:
query['deleted'] = deleted
response = await self._rest_client.get('schemas/ids/{}/versions'.format(schema_id), query)
return [SchemaVersion.from_dict(item) for item in response]

async def lookup_schema(
self, subject_name: str, schema: 'Schema',
normalize_schemas: bool = False, deleted: bool = False
normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False
) -> 'RegisteredSchema':
"""
Returns ``schema`` registration information for ``subject``.
Args:
subject_name (str): Subject name the schema is registered under
subject_name (str): Subject name the schema is registered under.
schema (Schema): Schema instance.
normalize_schemas (bool): Normalize schema before registering
normalize_schemas (bool): Normalize schema before registering.
fmt (str): Desired output format, dependent on schema type.
deleted (bool): Whether to include deleted schemas.
Returns:
RegisteredSchema: Subject registration information for this schema.
Raises:
SchemaRegistryError: If schema or subject can't be found
SchemaRegistryError: If schema or subject can't be found.
See Also:
`POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)>`_
Expand All @@ -770,9 +853,17 @@ async def lookup_schema(

request = schema.to_dict()

query_params = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename `query_params` to `query` to match the other methods.

'normalize': normalize_schemas,
'deleted': deleted
}
if fmt is not None:
query_params['format'] = fmt

query_string = '&'.join(f"{key}={value}" for key, value in query_params.items())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this passing the query params differently from the other methods?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to apply "format" query param only when it's not None. To make the method consistent with other methods code-wise, maybe I can set format="" (which will be ignored in SR: https://github.com/confluentinc/schema-registry/blob/a587d2c24bcf108ea8313ce75b7183c333e91587/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectsResource.java#L149) when fmt==None:

response = await self._rest_client.post(
    'subjects/{}?normalize={}&deleted={}&format={}'.format(
        _urlencode(subject_name), normalize_schemas, deleted, fmt or ''
    ),
    body=request
)


response = await self._rest_client.post(
'subjects/{}?normalize={}&deleted={}'.format(
_urlencode(subject_name), normalize_schemas, deleted),
'subjects/{}?{}'.format(_urlencode(subject_name), query_string),
body=request
)

Expand All @@ -791,9 +882,19 @@ async def lookup_schema(

return registered_schema

async def get_subjects(self) -> List[str]:
async def get_subjects(
self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False,
offset: int = 0, limit: int = -1
) -> List[str]:
"""
Lists all subjects registered with the Schema Registry
Lists all subjects registered with the Schema Registry.
Args:
subject_prefix (str): Subject name prefix that results can be filtered by.
deleted (bool): Whether to include deleted subjects.
deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence.
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.
Returns:
list(str): Registered subject names
Expand All @@ -805,7 +906,10 @@ async def get_subjects(self) -> List[str]:
`GET subjects API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects>`_
""" # noqa: E501

return await self._rest_client.get('subjects')
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
Copy link
Preview

Copilot AI Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit parameter should only be added to the query if it's not negative, as documented in the method's docstring.

Suggested change
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset}
if limit >= 0:
query['limit'] = limit

Copilot uses AI. Check for mistakes.

if subject_prefix is not None:
query['subject'] = subject_prefix
return await self._rest_client.get('subjects', query)

async def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]:
"""
Expand Down Expand Up @@ -899,7 +1003,9 @@ async def get_latest_with_metadata(
if registered_schema is not None:
return registered_schema

query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}
query = {'deleted': deleted}
if fmt is not None:
query['format'] = fmt
keys = metadata.keys()
if keys:
query['key'] = [_urlencode(key) for key in keys]
Expand All @@ -920,13 +1026,13 @@ async def get_version(
deleted: bool = False, fmt: Optional[str] = None
) -> 'RegisteredSchema':
"""
Retrieves a specific schema registered under ``subject_name``.
Retrieves a specific schema registered under ``subject_name`` and ``version``.
Args:
subject_name (str): Subject name.
version (int): version number. Defaults to latest version.
version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version.
deleted (bool): Whether to include deleted schemas.
fmt (str): Format of the schema
fmt (str): Format of the schema.
Returns:
RegisteredSchema: Registration information for this version.
Expand All @@ -935,7 +1041,7 @@ async def get_version(
SchemaRegistryError: if the version can't be found or is invalid.
See Also:
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions>`_
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
""" # noqa: E501

registered_schema = self._cache.get_registered_by_subject_version(subject_name, version)
Expand All @@ -953,12 +1059,44 @@ async def get_version(

return registered_schema

async def get_versions(self, subject_name: str) -> List[int]:
async def get_referenced_by(
self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1
) -> List[int]:
"""
Get a list of IDs of schemas that reference the schema with the given ``subject_name`` and ``version``.
Args:
subject_name (str): Subject name.
version (Union[int, str]): Version number or the string "latest". Defaults to "latest".
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.
Returns:
list(int): List of schema IDs that reference the specified schema.
Raises:
SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved
See Also:
`GET Referenced By API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-versionId-%20version-referencedby>`_
""" # noqa: E501

query = {'offset': offset, 'limit': limit}
return await self._rest_client.get('subjects/{}/versions/{}/referencedby'.format(
_urlencode(subject_name), version), query)

async def get_versions(
self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1
) -> List[int]:
"""
Get a list of all versions registered with this subject.
Args:
subject_name (str): Subject name.
deleted (bool): Whether to include deleted schemas.
deleted_only (bool): Whether to return deleted versions only. If both deleted and deleted_only are True, deleted_only takes precedence.
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.
Returns:
list(int): Registered versions
Expand All @@ -970,7 +1108,8 @@ async def get_versions(self, subject_name: str) -> List[int]:
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions>`_
""" # noqa: E501

return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)))
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
Copy link
Preview

Copilot AI Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit parameter should only be added to the query if it's not negative, as documented in the method's docstring.

Suggested change
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset}
if limit >= 0:
query['limit'] = limit

Copilot uses AI. Check for mistakes.

return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query)

async def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int:
"""
Expand Down
Loading