Skip to content

Add missing endpoints (/compatibility, /mode) to SchemaRegistryClient #2024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
197 changes: 188 additions & 9 deletions src/confluent_kafka/schema_registry/_async/schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,6 @@
query['format'] = fmt
if reference_format is not None:
query['reference_format'] = reference_format

response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query)

registered_schema = RegisteredSchema.from_dict(response)
Expand Down Expand Up @@ -889,6 +888,13 @@
"""
Lists all subjects registered with the Schema Registry.
Args:
subject_prefix (str): Subject name prefix that results can be filtered by.
deleted (bool): Whether to include deleted subjects.
deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence.
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.
Args:
subject_prefix (str): Subject name prefix that results can be filtered by.
deleted (bool): Whether to include deleted subjects.
Expand Down Expand Up @@ -1060,7 +1066,8 @@
return registered_schema

async def get_referenced_by(
self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1
self, subject_name: str, version: Union[int, str] = "latest",
offset: int = 0, limit: int = -1
) -> List[int]:
"""
Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`.
Expand All @@ -1086,7 +1093,8 @@
_urlencode(subject_name), version), query)

async def get_versions(
self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1
self, subject_name: str, deleted: bool = False, deleted_only: bool = False,
offset: int = 0, limit: int = -1
) -> List[int]:
"""
Get a list of all versions registered with this subject.
Expand Down Expand Up @@ -1174,7 +1182,7 @@
)

return await self._rest_client.put(
'config/{}'.format(_urlencode(subject_name)), body={'compatibility': level.upper()}

Check failure on line 1185 in src/confluent_kafka/schema_registry/_async/schema_registry_client.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

src/confluent_kafka/schema_registry/_async/schema_registry_client.py#L1185

Define a constant instead of duplicating this literal 'config/{}' 5 times.
)

async def get_compatibility(self, subject_name: Optional[str] = None) -> str:
Expand Down Expand Up @@ -1204,15 +1212,18 @@
return result['compatibilityLevel']

async def test_compatibility(
self, subject_name: str, schema: 'Schema',
version: Union[int, str] = "latest"
self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest",
normalize: bool = False, verbose: bool = False
) -> bool:
"""Test the compatibility of a candidate schema for a given subject and version
"""
Test the compatibility of a candidate schema for a given subject and version
Args:
subject_name (str): Subject name the schema is registered under
schema (Schema): Schema instance.
version (int or str, optional): Version number, or the string "latest". Defaults to "latest".
normalize (bool): Whether to normalize the input schema.
verbose (bool): Whether to return detailed error messages.
Returns:
bool: True if the schema is compatible with the specified version
Expand All @@ -1225,12 +1236,40 @@
""" # noqa: E501

request = schema.to_dict()

response = await self._rest_client.post(
'compatibility/subjects/{}/versions/{}'.format(_urlencode(subject_name), version), body=request
'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format(
_urlencode(subject_name), version, normalize, verbose),
body=request
)
return response['is_compatible'] # TODO: should it return entire response (including error messages)?

Check notice on line 1244 in src/confluent_kafka/schema_registry/_async/schema_registry_client.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

src/confluent_kafka/schema_registry/_async/schema_registry_client.py#L1244

Complete the task associated to this "TODO" comment.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe return a tuple [bool, Response] so it's easy to have a simple response vs a parsing the complex object? But I think this should probably return the full response by itself with a reference in the docstring to the is_compatible as a likely attribute for consumption.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is this would be a breaking change, although in this case I think it might be necessary to just make the change (or add a separate method and mark this one as @deprecated): not sure how customers usually interact with test_compatibility, but intuitively I think the "messages" field for test_compatibility is important to include
cc @rayokota if you have any thoughts on this :)


return response['is_compatible']
async def test_compatibility_all_versions(
self, subject_name: str, schema: 'Schema',
normalize: bool = False, verbose: bool = False
) -> bool:
"""
Test the input schema against all schema versions under the subject (depending on the compatibility level set).
Args:
subject_name (str): Subject of the schema versions against which compatibility is to be tested.
schema (Schema): Schema instance.
normalize (bool): Whether to normalize the input schema.
verbose (bool): Wehther to return detailed error messages.
Copy link
Preview

Copilot AI Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling error: 'Wehther' should be 'Whether'.

Suggested change
verbose (bool): Wehther to return detailed error messages.
verbose (bool): Whether to return detailed error messages.

Copilot uses AI. Check for mistakes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to have copilot find typos to save reading for them :D

Returns:
bool: True if the schema is compatible with all of the subject's schemas versions.
See Also:
`POST Test Compatibility Against All API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions>`_
""" # noqa: E501

request = schema.to_dict()
response = await self._rest_client.post(
'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format(
_urlencode(subject_name), normalize, verbose
),
body=request,
)
return response['is_compatible'] # TODO: should it return entire response (including error messages)?

Check notice on line 1272 in src/confluent_kafka/schema_registry/_async/schema_registry_client.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

src/confluent_kafka/schema_registry/_async/schema_registry_client.py#L1272

Complete the task associated to this "TODO" comment.

async def set_config(
self, subject_name: Optional[str] = None,
Expand Down Expand Up @@ -1268,6 +1307,30 @@
'config/{}'.format(_urlencode(subject_name)), body=config.to_dict()
)

async def delete_config(self, subject_name: Optional[str] = None) -> 'ServerConfig':
"""
Delete the specified subject-level compatibility level config and revert to the global default.
Args:
subject_name (str, optional): Subject name. Deletes global config
if left unset.
Returns:
ServerConfig: The old deleted config
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`DELETE Subject Config API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--config-(string- subject)>`_
""" # noqa: E501
if subject_name is not None:
url = 'config/{}'.format(_urlencode(subject_name))
else:
url = 'config'
result = await self._rest_client.delete(url)
return ServerConfig.from_dict(result)

async def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig':
"""
Get the current config.
Expand All @@ -1294,6 +1357,122 @@
result = await self._rest_client.get(url)
return ServerConfig.from_dict(result)

async def get_mode(self, subject_name: str) -> str:
"""
Get the mode for a subject.
Args:
subject_name (str): Subject name.
Returns:
str: Mode for the subject. Returns one of IMPORT, READONLY, READWRITE (default).
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`GET Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode-(string-%20subject)>`_
""" # noqa: E501
result = await self._rest_client.get('mode/{}'.format(_urlencode(subject_name)))
return result['mode']

async def update_mode(self, subject_name: str, mode: str, force: bool = False) -> str:
"""
Update the mode for a subject.
Args:
subject_name (str): Subject name.
mode (str): Mode to update.
force (bool): Whether to force a mode change even if the Schema Registry has existing schemas.
Returns:
str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default).
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`PUT Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode-(string-%20subject)>`_
""" # noqa: E501
result = await self._rest_client.put(
'mode/{}?force={}'.format(_urlencode(subject_name), force),
body={'mode': mode},
)
return result['mode']

async def delete_mode(self, subject_name: str) -> str:
"""
Delete the mode for a subject and revert to the global default
Args:
subject_name (str): Subject name.
Returns:
str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default).
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`DELETE Subject Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--mode-(string-%20subject)>`_
""" # noqa: E501
result = await self._rest_client.delete('mode/{}'.format(_urlencode(subject_name)))
return result['mode']

async def get_global_mode(self) -> str:
"""
Get the current mode for Schema Reigstry at a global level.
Returns:
str: Schema Registry mode. Must be one of IMPORT, READONLY, READWRITE (default).
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`GET Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--mode>`_
""" # noqa: E501
result = await self._rest_client.get('mode')
return result['mode']

async def update_global_mode(self, mode: str, force: bool = False) -> str:
"""
Update the mode for the Schema Registry at a global level.
Args:
mode (str): Mode to update.
force (bool): Whether to force a mode change even if the Schema Registry has existing schemas.
Returns:
str: New mode for the Schema Registry. Must be one of IMPORT, READONLY, READWRITE (default).
Raises:
SchemaRegistryError: if the request was unsuccessful.
See Also:
`PUT Global Mode API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#put--mode>`_
""" # noqa: E501
result = await self._rest_client.put('mode?force={}'.format(force), body={'mode': mode})
return result['mode']

async def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]:
"""
Retrieves a list of contexts.
Args:
offset (int): Pagination offset for results.
limit (int): Pagination size for results. Ignored if negative.
Returns:
List[str]: List of contexts.
Raises:
SchemaRegistryError: if the request was unsuccessful.
""" # noqa: E501

result = await self._rest_client.get('contexts', query={'offset': offset, 'limit': limit})
return result

def clear_latest_caches(self):
self._latest_version_cache.clear()
self._latest_with_metadata_cache.clear()
Expand Down
Loading