Add missing endpoints (/schemas, /subjects) to SchemaRegistryClient #2017
base: master
🎉 All Contributor License Agreements have been signed. Ready to merge.
Minor suggestions; looks clean and ready. You can choose which of those comments to apply.
`GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id-schema>`_
""" # noqa: E501

query = {'subject': subject_name} if subject_name is not None else None

It can be slightly cleaner to do `else {}`, or two lines with `query = {}` and `if subject_name:`, so you don't have to do the `if query is not None` check later on.
Good suggestion! Refactored
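The refactor agreed on above might look like the following sketch. `build_schema_query` is a hypothetical helper, not a function from the PR; it starts from an empty dict and adds only the parameters that were supplied, so callers never need an `if query is not None` check. It tests `is not None` rather than truthiness, which is an assumption about the intended behavior.

```python
from typing import Any, Dict, Optional


def build_schema_query(subject_name: Optional[str] = None,
                       fmt: Optional[str] = None) -> Dict[str, Any]:
    """Hypothetical helper: collect only the query parameters that were set."""
    query: Dict[str, Any] = {}  # always a dict, never None
    if subject_name is not None:
        query['subject'] = subject_name
    if fmt is not None:
        query['format'] = fmt
    return query
```

With no arguments the helper returns an empty dict, which a REST client can pass through unchanged.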
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)-schema>`_
""" # noqa: E501

query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}

Interesting that `deleted` isn't in the doc parameters. Do we need to update the docs page later on, or is this intentionally a hidden field?
Yes, I will make sure the docs are updated
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)-schema>`_
""" # noqa: E501

query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}

Same here on `deleted` not being in the docs reference. Are there other routes we need to add `deleted` to that have it missing?
@classmethod
def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
    d = src_dict.copy()

Kinda unnecessary, this is a shallow copy action that we immediately throw away. Could just make this a one-liner of return cls(subject=src_dict.get('subject'), version=src_dict.get('version'))
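A minimal sketch of that one-liner, using a standard-library dataclass as a stand-in for the PR's `SchemaVersion` class. The real class may be declared differently, and the `subject` and `version` field types here are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type, TypeVar

T = TypeVar('T', bound='SchemaVersion')


@dataclass
class SchemaVersion:
    # Field types are assumed for illustration.
    subject: Optional[str] = None
    version: Optional[int] = None

    @classmethod
    def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:
        # Read straight from the source dict; .get() does not mutate it,
        # so no defensive copy is needed.
        return cls(subject=src_dict.get('subject'), version=src_dict.get('version'))
```

Because `dict.get` only reads, the caller's dictionary is left untouched and missing keys become `None`.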
Pull Request Overview
This PR adds missing endpoints to the SchemaRegistryClient for the `/schemas` and `/subjects` prefixes, expanding the available functionality for interacting with the Schema Registry. The implementation also corrects the return type of the `get_schema` method from `Schema` to `RegisteredSchema`, which is a breaking change that requires updates to existing client code.
- Added new endpoints: `get_schema_string`, `get_schema_types`, `get_schema_versions`, `get_version_schema_string`, and `get_referenced_by`
- Enhanced existing endpoints with additional optional parameters (`deleted`, `subject_prefix`)
- Updated return type of `get_schema` from `Schema` to `RegisteredSchema`
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
tests/schema_registry/conftest.py | Added mock implementations and regex patterns for new endpoints |
tests/schema_registry/_sync/test_api_client.py | Added unit tests for new endpoints and updated existing tests for return type changes |
tests/schema_registry/_async/test_api_client.py | Added async unit tests for new endpoints and updated existing tests for return type changes |
tests/integration/schema_registry/_sync/test_proto_serializers.py | Updated test to access schema through .schema property due to return type change |
tests/integration/schema_registry/_async/test_proto_serializers.py | Updated async test to access schema through .schema property due to return type change |
src/confluent_kafka/schema_registry/common/schema_registry_client.py | Added SchemaVersion class and refactored RegisteredSchema field ordering |
src/confluent_kafka/schema_registry/_sync/schema_registry_client.py | Implemented new endpoints and updated existing methods with additional parameters |
src/confluent_kafka/schema_registry/_async/schema_registry_client.py | Implemented async versions of new endpoints and updated existing methods |
DEVELOPER.md | Removed trailing whitespace |
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)-schema>`_
""" # noqa: E501

query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}

[nitpick] This conditional query construction pattern is inconsistent with the cleaner approach used in other methods. Consider using the same pattern as in `get_schema_string`, where the query is built incrementally.
Suggested change:
- query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}
+ query = {'deleted': deleted}
+ if fmt is not None:
+     query['format'] = fmt
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)-schema>`_
""" # noqa: E501

query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}

[nitpick] This conditional query construction pattern is inconsistent with the cleaner approach used in other methods. Consider using the same pattern as in `get_schema_string`, where the query is built incrementally.
Suggested change:
- query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}
+ query = {'deleted': deleted}
+ if fmt is not None:
+     query['format'] = fmt
Thanks @fangnx. Actually, let's revert changing the `Schema` return type to `RegisteredSchema`, as that's a breaking API change.
Pull Request Overview
This PR adds missing endpoints to the SchemaRegistryClient for both schemas and subjects APIs, expanding the functionality to support additional Schema Registry operations. The implementation includes new endpoints for retrieving schema metadata, versions, types, and references.
- Add new `/schemas` endpoints: `/schemas/ids/{id}/schema`, `/schemas/ids/{id}/versions`, `/schemas/ids/{id}/subjects`, and `/schemas/types`
- Add new `/subjects` endpoints: `/subjects/{subject}/versions/{version}/schema` and `/subjects/{subject}/versions/{version}/referencedby`
- Update existing endpoints with additional parameters and improved query parameter handling
- Add comprehensive test coverage for all new endpoints in both sync and async clients
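As a rough illustration of how two of the endpoint paths listed above could be assembled, here is a sketch. The function names are hypothetical, and `_encode` is a stand-in for the client's `_urlencode` helper, which is assumed here to percent-encode a whole path segment.

```python
from typing import Union
from urllib.parse import quote


def _encode(value: str) -> str:
    """Percent-encode one path segment, including '/' (assumed behavior)."""
    return quote(value, safe='')


def schema_versions_path(schema_id: int) -> str:
    """Relative path for /schemas/ids/{id}/versions."""
    return 'schemas/ids/{}/versions'.format(schema_id)


def referenced_by_path(subject_name: str, version: Union[int, str]) -> str:
    """Relative path for /subjects/{subject}/versions/{version}/referencedby."""
    return 'subjects/{}/versions/{}/referencedby'.format(_encode(subject_name), version)
```

Encoding the subject matters because subject names may contain characters such as `/` that would otherwise change the route.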
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
File | Description |
---|---|
tests/schema_registry/conftest.py | Add mock handlers and regex patterns for new schema and subject endpoints |
tests/schema_registry/_sync/test_api_client.py | Add test cases for new sync client endpoints with proper assertions |
tests/schema_registry/_async/test_api_client.py | Add test cases for new async client endpoints mirroring sync tests |
src/confluent_kafka/schema_registry/common/schema_registry_client.py | Add SchemaVersion data class and simplify SchemaReference.from_dict implementation |
src/confluent_kafka/schema_registry/_sync/schema_registry_client.py | Implement new endpoints and update existing ones with enhanced parameter support |
src/confluent_kafka/schema_registry/_async/schema_registry_client.py | Mirror sync implementation for async client with proper await handling |
DEVELOPER.md | Remove trailing whitespace |
TODO: add API reference
"""
query = {'offset': offset, 'limit': limit}

The query parameters are being added unconditionally, but the `limit` parameter should only be added if it's not negative, according to the docstring. This could result in unexpected behavior when `limit` is -1.
Suggested change:
- query = {'offset': offset, 'limit': limit}
+ query = {'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
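Since the same suggestion applies to several paginated methods, the rule could be factored into one place. The sketch below is only an illustration: `build_page_query` is a hypothetical helper, and the defaults `offset=0` and `limit=-1` are assumptions based on the "-1 means no limit" convention described in the comment.

```python
from typing import Any, Dict


def build_page_query(offset: int = 0, limit: int = -1, **flags: Any) -> Dict[str, Any]:
    """Hypothetical helper: build pagination params, omitting a negative limit."""
    query: Dict[str, Any] = dict(flags)  # e.g. deleted=..., deleted_only=...
    query['offset'] = offset
    if limit >= 0:
        # Only send 'limit' when the caller asked for a real cap.
        query['limit'] = limit
    return query
```

A call such as `build_page_query(deleted=True)` would then send `deleted` and `offset` but no `limit`.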
`GET Schema Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id-versions>`_
""" # noqa: E501

query = {'offset': offset, 'limit': limit}

Same issue as above: the `limit` parameter should only be added to the query if it's not negative, as stated in the docstring.
Suggested change:
- query = {'offset': offset, 'limit': limit}
+ query = {'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
@@ -805,7 +943,10 @@ def get_subjects(self) -> List[str]:
`GET subjects API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects>`_
""" # noqa: E501

return self._rest_client.get('subjects')
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}

The `limit` parameter should only be added to the query if it's not negative, as documented in the method's docstring.
Suggested change:
- query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
+ query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
@@ -970,7 +1174,8 @@ def get_versions(self, subject_name: str) -> List[int]:
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
""" # noqa: E501

return self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)))
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}

The `limit` parameter should only be added to the query if it's not negative, as documented in the method's docstring.
Suggested change:
- query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
+ query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
TODO: add API reference
"""
query = {'offset': offset, 'limit': limit}

The query parameters are being added unconditionally, but the `limit` parameter should only be added if it's not negative, according to the docstring.
Suggested change:
- query = {'offset': offset, 'limit': limit}
+ query = {'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
`GET Schema Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id-versions>`_
""" # noqa: E501

query = {'offset': offset, 'limit': limit}

Same issue as above: the `limit` parameter should only be added to the query if it's not negative, as stated in the docstring.
Suggested change:
- query = {'offset': offset, 'limit': limit}
+ query = {'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
@@ -805,7 +943,10 @@ async def get_subjects(self) -> List[str]:
`GET subjects API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects>`_
""" # noqa: E501

return await self._rest_client.get('subjects')
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}

The `limit` parameter should only be added to the query if it's not negative, as documented in the method's docstring.
Suggested change:
- query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
+ query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
@@ -970,7 +1174,8 @@ async def get_versions(self, subject_name: str) -> List[int]:
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
""" # noqa: E501

return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)))
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}

The `limit` parameter should only be added to the query if it's not negative, as documented in the method's docstring.
Suggested change:
- query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
+ query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset}
+ if limit >= 0:
+     query['limit'] = limit
Raises:
    SchemaRegistryError: if subjects can't be found

TODO: add API reference

Leftover TODO
Thanks! Will remove it, and add the API reference link once the technical writers have added it to the https://docs.confluent.io/platform/current/schema-registry/develop/api.html#sr-api-compatibility page (separate task for the SR team).
Raises:
    SchemaRegistryError: if subjects can't be found

TODO: add API reference

Ditto on reference TODO
Thanks @fangnx, left some comments
@@ -704,6 +715,35 @@ async def get_schema(

return registered_schema.schema

async def get_schema_string(

Let's remove this method. It's a convenience method mainly for use with curl or Postman and is otherwise redundant.
@@ -953,12 +1096,73 @@ async def get_version(

return registered_schema

async def get_versions(self, subject_name: str) -> List[int]:
async def get_version_schema_string(

Let's remove this method as well, as it's another convenience method.
self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None
self, schema_id: int, subject_name: Optional[str] = None,
fmt: Optional[str] = None, reference_format: Optional[str] = None,
find_tags: Optional[List[str]] = None, fetch_max_id: bool = False

Let's remove `find_tags` and `fetch_max_id` for now, as those are for internal use.
Will remove them. Out of curiosity, it looks like those query params (and their usage) are accessible to external customers as well. Will that become an issue at some point?
subject_name (str): Subject name the schema is registered under.
fmt (str): Desired output format, dependent on schema type.
reference_format (str): Desired output format for references.
find_tags (list[str]): Find tagged entities for the given tags or * for all tags.

Remove `find_tags` and `fetch_max_id`
    query['format'] = fmt
if reference_format is not None:
    query['reference_format'] = reference_format
if find_tags is not None:

Remove
What
Add the missing endpoints to the SR client and update the existing ones (some have missing or incorrect fields). This PR only addresses endpoints with the `/schemas` and `/subjects` prefixes. I will start a follow-up PR to add the rest.
Checklist
References
JIRA: https://confluentinc.atlassian.net/browse/DGS-21591
Issues: #1429
Test & Review
Open questions / Follow-ups