diff --git a/.ducktape/metadata/session_id b/.ducktape/metadata/session_id new file mode 100644 index 000000000..fb689f060 --- /dev/null +++ b/.ducktape/metadata/session_id @@ -0,0 +1 @@ +2025-08-21--020 \ No newline at end of file diff --git a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py index b1e701254..b25922312 100644 --- a/src/confluent_kafka/schema_registry/_async/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_async/schema_registry_client.py @@ -699,7 +699,6 @@ async def get_schema( query['format'] = fmt if reference_format is not None: query['reference_format'] = reference_format - response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -1060,7 +1059,8 @@ async def get_version( return registered_schema async def get_referenced_by( - self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 + self, subject_name: str, version: Union[int, str] = "latest", + offset: int = 0, limit: int = -1 ) -> List[int]: """ Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. @@ -1086,7 +1086,8 @@ async def get_referenced_by( _urlencode(subject_name), version), query) async def get_versions( - self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + self, subject_name: str, deleted: bool = False, deleted_only: bool = False, + offset: int = 0, limit: int = -1 ) -> List[int]: """ Get a list of all versions registered with this subject. @@ -1204,15 +1205,18 @@ async def get_compatibility(self, subject_name: Optional[str] = None) -> str: return result['compatibilityLevel'] async def test_compatibility( - self, subject_name: str, schema: 'Schema', - version: Union[int, str] = "latest" + self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest", + normalize: bool = False, verbose: bool = False ) -> bool: - """Test the compatibility of a candidate schema for a given subject and version + """ + Test the compatibility of a candidate schema for a given subject and version Args: subject_name (str): Subject name the schema is registered under schema (Schema): Schema instance. version (int or str, optional): Version number, or the string "latest". Defaults to "latest". + normalize (bool): Whether to normalize the input schema. + verbose (bool): Whether to return detailed error messages. Returns: bool: True if the schema is compatible with the specified version @@ -1225,11 +1229,39 @@ async def test_compatibility( """ # noqa: E501 request = schema.to_dict() - response = await self._rest_client.post( - 'compatibility/subjects/{}/versions/{}'.format(_urlencode(subject_name), version), body=request + 'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format( + _urlencode(subject_name), version, normalize, verbose), + body=request ) + return response['is_compatible'] + async def test_compatibility_all_versions( + self, subject_name: str, schema: 'Schema', + normalize: bool = False, verbose: bool = False + ) -> bool: + """ + Test the input schema against all schema versions under the subject (depending on the compatibility level set). + + Args: + subject_name (str): Subject of the schema versions against which compatibility is to be tested. + schema (Schema): Schema instance. + normalize (bool): Whether to normalize the input schema. + verbose (bool): Whether to return detailed error messages. + + Returns: + bool: True if the schema is compatible with all of the subject's schemas versions. + See Also: + `POST Test Compatibility Against All API Reference `_ + """ # noqa: E501 + + request = schema.to_dict() + response = await self._rest_client.post( + 'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format( + _urlencode(subject_name), normalize, verbose + ), + body=request, + ) return response['is_compatible'] async def set_config( @@ -1268,6 +1300,30 @@ async def set_config( 'config/{}'.format(_urlencode(subject_name)), body=config.to_dict() ) + async def delete_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': + """ + Delete the specified subject-level compatibility level config and revert to the global default. + + Args: + subject_name (str, optional): Subject name. Deletes global config + if left unset. + + Returns: + ServerConfig: The old deleted config + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `DELETE Subject Config API Reference `_ + """ # noqa: E501 + if subject_name is not None: + url = 'config/{}'.format(_urlencode(subject_name)) + else: + url = 'config' + result = await self._rest_client.delete(url) + return ServerConfig.from_dict(result) + async def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': """ Get the current config. @@ -1294,6 +1350,122 @@ async def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig' result = await self._rest_client.get(url) return ServerConfig.from_dict(result) + async def get_mode(self, subject_name: str) -> str: + """ + Get the mode for a subject. + + Args: + subject_name (str): Subject name. + + Returns: + str: Mode for the subject. Returns one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `GET Subject Mode API Reference `_ + """ # noqa: E501 + result = await self._rest_client.get('mode/{}'.format(_urlencode(subject_name))) + return result['mode'] + + async def update_mode(self, subject_name: str, mode: str, force: bool = False) -> str: + """ + Update the mode for a subject. + + Args: + subject_name (str): Subject name. + mode (str): Mode to update. + force (bool): Whether to force a mode change even if the Schema Registry has existing schemas. + + Returns: + str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `PUT Subject Mode API Reference `_ + """ # noqa: E501 + result = await self._rest_client.put( + 'mode/{}?force={}'.format(_urlencode(subject_name), force), + body={'mode': mode}, + ) + return result['mode'] + + async def delete_mode(self, subject_name: str) -> str: + """ + Delete the mode for a subject and revert to the global default + + Args: + subject_name (str): Subject name. + + Returns: + str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `DELETE Subject Mode API Reference `_ + """ # noqa: E501 + result = await self._rest_client.delete('mode/{}'.format(_urlencode(subject_name))) + return result['mode'] + + async def get_global_mode(self) -> str: + """ + Get the current mode for Schema Registry at a global level. + + Returns: + str: Schema Registry mode. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `GET Global Mode API Reference `_ + """ # noqa: E501 + result = await self._rest_client.get('mode') + return result['mode'] + + async def update_global_mode(self, mode: str, force: bool = False) -> str: + """ + Update the mode for the Schema Registry at a global level. + + Args: + mode (str): Mode to update. + force (bool): Whether to force a mode change even if the Schema Registry has existing schemas. + + Returns: + str: New mode for the Schema Registry. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `PUT Global Mode API Reference `_ + """ # noqa: E501 + result = await self._rest_client.put('mode?force={}'.format(force), body={'mode': mode}) + return result['mode'] + + async def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]: + """ + Retrieves a list of contexts. + + Args: + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + List[str]: List of contexts. + + Raises: + SchemaRegistryError: if the request was unsuccessful. + """ # noqa: E501 + + result = await self._rest_client.get('contexts', query={'offset': offset, 'limit': limit}) + return result + def clear_latest_caches(self): self._latest_version_cache.clear() self._latest_with_metadata_cache.clear() diff --git a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py index 9b084c6e5..3847866f5 100644 --- a/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/_sync/schema_registry_client.py @@ -699,7 +699,6 @@ def get_schema( query['format'] = fmt if reference_format is not None: query['reference_format'] = reference_format - response = self._rest_client.get('schemas/ids/{}'.format(schema_id), query) registered_schema = RegisteredSchema.from_dict(response) @@ -1060,7 +1059,8 @@ def get_version( return registered_schema def get_referenced_by( - self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1 + self, subject_name: str, version: Union[int, str] = "latest", + offset: int = 0, limit: int = -1 ) -> List[int]: """ Get a list of IDs of schemas that reference the schema with the given `subject_name` and `version`. @@ -1086,7 +1086,8 @@ def get_referenced_by( _urlencode(subject_name), version), query) def get_versions( - self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1 + self, subject_name: str, deleted: bool = False, deleted_only: bool = False, + offset: int = 0, limit: int = -1 ) -> List[int]: """ Get a list of all versions registered with this subject. @@ -1204,15 +1205,18 @@ def get_compatibility(self, subject_name: Optional[str] = None) -> str: return result['compatibilityLevel'] def test_compatibility( - self, subject_name: str, schema: 'Schema', - version: Union[int, str] = "latest" + self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest", + normalize: bool = False, verbose: bool = False ) -> bool: - """Test the compatibility of a candidate schema for a given subject and version + """ + Test the compatibility of a candidate schema for a given subject and version Args: subject_name (str): Subject name the schema is registered under schema (Schema): Schema instance. version (int or str, optional): Version number, or the string "latest". Defaults to "latest". + normalize (bool): Whether to normalize the input schema. + verbose (bool): Whether to return detailed error messages. Returns: bool: True if the schema is compatible with the specified version @@ -1225,11 +1229,39 @@ def test_compatibility( """ # noqa: E501 request = schema.to_dict() - response = self._rest_client.post( - 'compatibility/subjects/{}/versions/{}'.format(_urlencode(subject_name), version), body=request + 'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format( + _urlencode(subject_name), version, normalize, verbose), + body=request ) + return response['is_compatible'] + def test_compatibility_all_versions( + self, subject_name: str, schema: 'Schema', + normalize: bool = False, verbose: bool = False + ) -> bool: + """ + Test the input schema against all schema versions under the subject (depending on the compatibility level set). + + Args: + subject_name (str): Subject of the schema versions against which compatibility is to be tested. + schema (Schema): Schema instance. + normalize (bool): Whether to normalize the input schema. + verbose (bool): Whether to return detailed error messages. + + Returns: + bool: True if the schema is compatible with all of the subject's schemas versions. + See Also: + `POST Test Compatibility Against All API Reference `_ + """ # noqa: E501 + + request = schema.to_dict() + response = self._rest_client.post( + 'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format( + _urlencode(subject_name), normalize, verbose + ), + body=request, + ) return response['is_compatible'] def set_config( @@ -1268,6 +1300,30 @@ def set_config( 'config/{}'.format(_urlencode(subject_name)), body=config.to_dict() ) + def delete_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': + """ + Delete the specified subject-level compatibility level config and revert to the global default. + + Args: + subject_name (str, optional): Subject name. Deletes global config + if left unset. + + Returns: + ServerConfig: The old deleted config + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `DELETE Subject Config API Reference `_ + """ # noqa: E501 + if subject_name is not None: + url = 'config/{}'.format(_urlencode(subject_name)) + else: + url = 'config' + result = self._rest_client.delete(url) + return ServerConfig.from_dict(result) + def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': """ Get the current config. @@ -1294,6 +1350,122 @@ def get_config(self, subject_name: Optional[str] = None) -> 'ServerConfig': result = self._rest_client.get(url) return ServerConfig.from_dict(result) + def get_mode(self, subject_name: str) -> str: + """ + Get the mode for a subject. + + Args: + subject_name (str): Subject name. + + Returns: + str: Mode for the subject. Returns one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `GET Subject Mode API Reference `_ + """ # noqa: E501 + result = self._rest_client.get('mode/{}'.format(_urlencode(subject_name))) + return result['mode'] + + def update_mode(self, subject_name: str, mode: str, force: bool = False) -> str: + """ + Update the mode for a subject. + + Args: + subject_name (str): Subject name. + mode (str): Mode to update. + force (bool): Whether to force a mode change even if the Schema Registry has existing schemas. + + Returns: + str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `PUT Subject Mode API Reference `_ + """ # noqa: E501 + result = self._rest_client.put( + 'mode/{}?force={}'.format(_urlencode(subject_name), force), + body={'mode': mode}, + ) + return result['mode'] + + def delete_mode(self, subject_name: str) -> str: + """ + Delete the mode for a subject and revert to the global default + + Args: + subject_name (str): Subject name. + + Returns: + str: New mode for the subject. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `DELETE Subject Mode API Reference `_ + """ # noqa: E501 + result = self._rest_client.delete('mode/{}'.format(_urlencode(subject_name))) + return result['mode'] + + def get_global_mode(self) -> str: + """ + Get the current mode for Schema Registry at a global level. + + Returns: + str: Schema Registry mode. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `GET Global Mode API Reference `_ + """ # noqa: E501 + result = self._rest_client.get('mode') + return result['mode'] + + def update_global_mode(self, mode: str, force: bool = False) -> str: + """ + Update the mode for the Schema Registry at a global level. + + Args: + mode (str): Mode to update. + force (bool): Whether to force a mode change even if the Schema Registry has existing schemas. + + Returns: + str: New mode for the Schema Registry. Must be one of IMPORT, READONLY, READWRITE (default). + + Raises: + SchemaRegistryError: if the request was unsuccessful. + + See Also: + `PUT Global Mode API Reference `_ + """ # noqa: E501 + result = self._rest_client.put('mode?force={}'.format(force), body={'mode': mode}) + return result['mode'] + + def get_contexts(self, offset: int = 0, limit: int = -1) -> List[str]: + """ + Retrieves a list of contexts. + + Args: + offset (int): Pagination offset for results. + limit (int): Pagination size for results. Ignored if negative. + + Returns: + List[str]: List of contexts. + + Raises: + SchemaRegistryError: if the request was unsuccessful. + """ # noqa: E501 + + result = self._rest_client.get('contexts', query={'offset': offset, 'limit': limit}) + return result + def clear_latest_caches(self): self._latest_version_cache.clear() self._latest_with_metadata_cache.clear() diff --git a/tests/schema_registry/_async/test_api_client.py b/tests/schema_registry/_async/test_api_client.py index fb930cddf..f3e584da4 100644 --- a/tests/schema_registry/_async/test_api_client.py +++ b/tests/schema_registry/_async/test_api_client.py @@ -398,6 +398,24 @@ async def test_set_compatibility_invalid(mock_schema_registry): e.value.error_code = 42203 +async def test_delete_config(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + result = await sr.delete_config() + assert result.compatibility == 'FULL' + + +async def test_delete_config_subject_not_found(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + with pytest.raises(SchemaRegistryError, match="Subject not found") as e: + await sr.delete_config("notfound") + assert e.value.http_status_code == 404 + assert e.value.error_code == 40401 + + async def test_get_compatibility_subject_not_found(mock_schema_registry): conf = {'url': TEST_URL} sr = AsyncSchemaRegistryClient(conf) @@ -408,6 +426,14 @@ async def test_get_compatibility_subject_not_found(mock_schema_registry): assert e.value.error_code == 40401 +async def test_get_contexts(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + result = await sr.get_contexts() + assert result == ['context1', 'context2'] + + async def test_schema_equivilence(load_avsc): schema_str1 = load_avsc('basic_schema.avsc') schema_str2 = load_avsc('basic_schema.avsc') @@ -461,3 +487,69 @@ async def test_test_compatibility_with_error( await sr.test_compatibility(subject_name, schema, version) assert e.value.http_status_code == status_code assert e.value.error_code == error_code + + +async def test_test_compatibility_all_versions_no_error(mock_schema_registry, load_avsc): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + schema = Schema(load_avsc('basic_schema.avsc'), schema_type='AVRO') + + is_compatible = await sr.test_compatibility_all_versions('subject-all-versions', schema) + assert is_compatible is True + + +async def test_get_global_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + mode = await sr.get_global_mode() + assert mode == 'READWRITE' + + +async def test_set_global_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + result = await sr.update_global_mode('READONLY') + assert result == 'READONLY' + + +async def test_get_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + result = await sr.get_mode('test-key') + assert result == 'READWRITE' + + +@pytest.mark.parametrize( + 'subject_name,match_str,status_code,error_code', + [ + ('invalid_mode', 'Invalid mode', 422, 42204), + ('operation_not_permitted', 'Operation not permitted', 422, 42205), + ] +) +async def test_update_mode_with_error(mock_schema_registry, subject_name, match_str, status_code, error_code): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + with pytest.raises(SchemaRegistryError, match=match_str) as e: + await sr.update_mode(subject_name, 'READONLY') + assert e.value.http_status_code == status_code + assert e.value.error_code == error_code + + +async def test_update_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + result = await sr.update_mode('test-key', 'READONLY') + assert result == 'READONLY' + + +async def test_delete_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = AsyncSchemaRegistryClient(conf) + + result = await sr.delete_mode('test-key') + assert result == 'READWRITE' diff --git a/tests/schema_registry/_sync/test_api_client.py b/tests/schema_registry/_sync/test_api_client.py index 2ee0b7802..9c7c3daf2 100644 --- a/tests/schema_registry/_sync/test_api_client.py +++ b/tests/schema_registry/_sync/test_api_client.py @@ -398,6 +398,24 @@ def test_set_compatibility_invalid(mock_schema_registry): e.value.error_code = 42203 +def test_delete_config(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + result = sr.delete_config() + assert result.compatibility == 'FULL' + + +def test_delete_config_subject_not_found(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + with pytest.raises(SchemaRegistryError, match="Subject not found") as e: + sr.delete_config("notfound") + assert e.value.http_status_code == 404 + assert e.value.error_code == 40401 + + def test_get_compatibility_subject_not_found(mock_schema_registry): conf = {'url': TEST_URL} sr = SchemaRegistryClient(conf) @@ -408,6 +426,14 @@ def test_get_compatibility_subject_not_found(mock_schema_registry): assert e.value.error_code == 40401 +def test_get_contexts(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + result = sr.get_contexts() + assert result == ['context1', 'context2'] + + def test_schema_equivilence(load_avsc): schema_str1 = load_avsc('basic_schema.avsc') schema_str2 = load_avsc('basic_schema.avsc') @@ -461,3 +487,69 @@ def test_test_compatibility_with_error( sr.test_compatibility(subject_name, schema, version) assert e.value.http_status_code == status_code assert e.value.error_code == error_code + + +def test_test_compatibility_all_versions_no_error(mock_schema_registry, load_avsc): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + schema = Schema(load_avsc('basic_schema.avsc'), schema_type='AVRO') + + is_compatible = sr.test_compatibility_all_versions('subject-all-versions', schema) + assert is_compatible is True + + +def test_get_global_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + mode = sr.get_global_mode() + assert mode == 'READWRITE' + + +def test_set_global_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + result = sr.update_global_mode('READONLY') + assert result == 'READONLY' + + +def test_get_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + result = sr.get_mode('test-key') + assert result == 'READWRITE' + + +@pytest.mark.parametrize( + 'subject_name,match_str,status_code,error_code', + [ + ('invalid_mode', 'Invalid mode', 422, 42204), + ('operation_not_permitted', 'Operation not permitted', 422, 42205), + ] +) +def test_update_mode_with_error(mock_schema_registry, subject_name, match_str, status_code, error_code): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + with pytest.raises(SchemaRegistryError, match=match_str) as e: + sr.update_mode(subject_name, 'READONLY') + assert e.value.http_status_code == status_code + assert e.value.error_code == error_code + + +def test_update_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + result = sr.update_mode('test-key', 'READONLY') + assert result == 'READONLY' + + +def test_delete_mode(mock_schema_registry): + conf = {'url': TEST_URL} + sr = SchemaRegistryClient(conf) + + result = sr.delete_mode('test-key') + assert result == 'READWRITE' diff --git a/tests/schema_registry/conftest.py b/tests/schema_registry/conftest.py index 3b3d9032e..653548af6 100644 --- a/tests/schema_registry/conftest.py +++ b/tests/schema_registry/conftest.py @@ -97,10 +97,12 @@ +--------+-------------------------------------------------+-------+------------------------------+ | DELETE | /subjects/notfound/versions/422 | 42202 | Invalid version | +--------+-------------------------------------------------+-------+------------------------------+ -| GET | /config/notconfig | 40401 | Subject not found | +| GET | /config/notfound | 40401 | Subject not found | +--------+-------------------------------------------------+-------+------------------------------+ | PUT | /config** | 42203 | Invalid compatibility level | +--------+-------------------------------------------------+-------+------------------------------+ +| DELETE | /config/notfound | 40401 | Subject not found | ++--------+-------------------------------------------------+-------+------------------------------+ | POST | /compatibility/subjects/notfound/versions/[0-9] | 40401 | Subject not found | +--------+-------------------------------------------------+-------+------------------------------+ | POST | /compatibility/subjects/invalid/versions/[0-9] | 42201 | Invalid Schema | @@ -109,6 +111,10 @@ +--------+-------------------------------------------------+-------+------------------------------+ | POST | /compatibility/subjects/invalid/versions/bad | 42202 | Invalid version | +--------+-------------------------------------------------+-------+------------------------------+ +| PUT | /mode/invalid_mode | 42204 | Invalid mode | ++--------+-------------------------------------------------+-------+------------------------------+ +| PUT | /mode/operation_not_permitted | 42205 | Operation not permitted | ++--------+-------------------------------------------------+-------+------------------------------+ * POST /subjects/{}/versions does not follow the documented API error. ** PUT /config reacts to a trigger in the body: - {"compatibility": "FULL"} @@ -131,9 +137,20 @@ def mock_schema_registry(): respx_mock.post(COMPATIBILITY_SUBJECTS_VERSIONS_RE).mock( side_effect=post_compatibility_subjects_versions_callback) + respx_mock.post(COMPATIBILITY_SUBJECTS_ALL_VERSIONS_RE).mock( + side_effect=post_compatibility_subjects_all_versions_callback) + + respx_mock.get(CONFIG_RE).mock(side_effect=get_config_callback) + respx_mock.put(CONFIG_RE).mock(side_effect=put_config_callback) + respx_mock.delete(CONFIG_RE).mock(side_effect=delete_config_callback) - respx_mock.get(COMPATIBILITY_RE).mock(side_effect=get_compatibility_callback) - respx_mock.put(COMPATIBILITY_RE).mock(side_effect=put_compatibility_callback) + respx_mock.get(CONTEXTS_RE).mock(side_effect=get_contexts_callback) + + respx_mock.get(MODE_GLOBAL_RE).mock(side_effect=get_global_mode_callback) + respx_mock.put(MODE_GLOBAL_RE).mock(side_effect=put_global_mode_callback) + respx_mock.get(MODE_RE).mock(side_effect=get_mode_callback) + respx_mock.put(MODE_RE).mock(side_effect=put_mode_callback) + respx_mock.delete(MODE_RE).mock(side_effect=delete_mode_callback) respx_mock.get(SCHEMAS_RE).mock(side_effect=get_schemas_callback) respx_mock.get(SCHEMAS_VERSIONS_RE).mock(side_effect=get_schema_versions_callback) @@ -163,8 +180,16 @@ def mock_schema_registry(): SUBJECTS_VERSIONS_SCHEMA_RE = re.compile(r"/subjects/(.*)/versions/(.*)/schema(\?.*)?$") SUBJECTS_VERSIONS_REFERENCED_BY_RE = re.compile(r"/subjects/(.*)/versions/(.*)/referencedby(\?.*)?$") -COMPATIBILITY_RE = re.compile("/config/?(.*)$") +CONFIG_RE = re.compile("/config/?(.*)$") + COMPATIBILITY_SUBJECTS_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions/?(.*)$") +COMPATIBILITY_SUBJECTS_ALL_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions") + +MODE_GLOBAL_RE = re.compile(r"/mode(\?.*)?$") +MODE_RE = re.compile("/mode/(.*)$") + +CONTEXTS_RE = re.compile(r"/contexts(\?.*)?$") + # constants SCHEMA_ID = 47 VERSION = 3 @@ -205,10 +230,10 @@ def _load_avsc(name) -> str: return fd.read() -def get_compatibility_callback(request, route): +def get_config_callback(request, route): COUNTER['GET'][request.url.path] += 1 - path_match = re.match(COMPATIBILITY_RE, request.url.path) + path_match = re.match(CONFIG_RE, request.url.path) subject = path_match.group(1) if subject == "notfound": @@ -218,7 +243,7 @@ def get_compatibility_callback(request, route): return Response(200, json={'compatibility': 'FULL'}) -def put_compatibility_callback(request, route): +def put_config_callback(request, route): COUNTER['PUT'][request.url.path] += 1 body = json.loads(request.content.decode('utf-8')) @@ -231,6 +256,24 @@ def put_compatibility_callback(request, route): return Response(200, json=body) +def delete_config_callback(request, route): + COUNTER['DELETE'][request.url.path] += 1 + + path_match = re.match(CONFIG_RE, request.url.path) + subject = path_match.group(1) + + if subject == "notfound": + return Response(404, json={'error_code': 40401, + 'message': "Subject not found"}) + + return Response(200, json={'compatibility': 'FULL'}) + + +def get_contexts_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json=['context1', 'context2']) + + def delete_subject_callback(request, route): COUNTER['DELETE'][request.url.path] += 1 @@ -364,11 +407,6 @@ def post_subject_version_callback(request, route): return Response(200, json={'id': SCHEMA_ID}) -def get_subject_version_schema_callback(request, route): - COUNTER['GET'][request.url.path] += 1 - return Response(200, json=json.loads(_load_avsc(SCHEMA))) - - def get_subject_version_referenced_by_callback(request, route): COUNTER['GET'][request.url.path] += 1 return Response(200, json=[1, 2]) @@ -403,6 +441,49 @@ def post_compatibility_subjects_versions_callback(request, route): return Response(200, json={'is_compatible': True}) +def post_compatibility_subjects_all_versions_callback(request, route): + COUNTER['POST'][request.url.path] += 1 + return Response(200, json={'is_compatible': True}) + + +def get_global_mode_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json={'mode': 'READWRITE'}) + + +def put_global_mode_callback(request, route): + COUNTER['PUT'][request.url.path] += 1 + body = json.loads(request.content.decode('utf-8')) + return Response(200, json=body) + + +def get_mode_callback(request, route): + COUNTER['GET'][request.url.path] += 1 + return Response(200, json={'mode': 'READWRITE'}) + + +def put_mode_callback(request, route): + COUNTER['PUT'][request.url.path] += 1 + + path_match = re.match(MODE_RE, request.url.path) + subject = path_match.group(1) + body = json.loads(request.content.decode('utf-8')) + mode = body.get('mode') + + if subject == 'invalid_mode': + return Response(422, json={'error_code': 42204, + 'message': "Invalid mode"}) + if subject == 'operation_not_permitted': + return Response(422, json={'error_code': 42205, + 'message': "Operation not permitted"}) + return Response(200, json={'mode': mode}) + + +def delete_mode_callback(request, route): + COUNTER['DELETE'][request.url.path] += 1 + return Response(200, json={'mode': 'READWRITE'}) + + @pytest.fixture(scope="package") def load_avsc(): def get_handle(name):