Skip to content

Commit dc8d820

Browse files
committed
updaste
1 parent ac1ac51 commit dc8d820

File tree

5 files changed

+143
-22
lines changed

5 files changed

+143
-22
lines changed

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,8 +1270,8 @@ async def get_compatibility(self, subject_name: Optional[str] = None) -> str:
12701270
return result['compatibilityLevel']
12711271

12721272
async def test_compatibility(
1273-
self, subject_name: str, schema: 'Schema',
1274-
version: Union[int, str] = "latest"
1273+
self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest",
1274+
normalize: bool = False, verbose: bool = False
12751275
) -> bool:
12761276
"""
12771277
Test the compatibility of a candidate schema for a given subject and version
@@ -1280,6 +1280,8 @@ async def test_compatibility(
12801280
subject_name (str): Subject name the schema is registered under
12811281
schema (Schema): Schema instance.
12821282
version (int or str, optional): Version number, or the string "latest". Defaults to "latest".
1283+
normalize (bool): Whether to normalize the input schema.
1284+
verbose (bool): Wehther to return detailed error messages.
12831285
12841286
Returns:
12851287
bool: True if the schema is compatible with the specified version
@@ -1293,31 +1295,33 @@ async def test_compatibility(
12931295

12941296
request = schema.to_dict()
12951297
response = await self._rest_client.post(
1296-
'compatibility/subjects/{}/versions/{}'.format(_urlencode(subject_name), version), body=request
1298+
'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format(_urlencode(subject_name), version, normalize, verbose),
1299+
body=request
12971300
)
12981301
return response['is_compatible'] # TODO: should it return entire response (including error messages)?
12991302

13001303
async def test_compatibility_all_versions(
1301-
self, subject_name: str, schema: 'Schema', normalize: bool = False, verbose: bool = False
1304+
self, subject_name: str, schema: 'Schema',
1305+
normalize: bool = False, verbose: bool = False
13021306
) -> bool:
13031307
"""
1304-
Test the input schema against one of more versions in the subject (depending on the compatibility level set).
1308+
Test the input schema against all schema versions under the subject (depending on the compatibility level set).
13051309
13061310
Args:
13071311
subject_name (str): Subject of the schema versions against which compatibility is to be tested.
13081312
schema (Schema): Schema instance.
1309-
normalize (bool): Whether to normalize the input schema. # TODO: missing in cp + cc docs
1313+
normalize (bool): Whether to normalize the input schema.
13101314
verbose (bool): Wehther to return detailed error messages.
13111315
13121316
Returns:
13131317
bool: True if the schema is compatible with all of the subject's schemas versions.
13141318
See Also:
13151319
`POST Test Compatibility Against All API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions>`_
13161320
"""
1321+
13171322
request = schema.to_dict()
13181323
response = await self._rest_client.post(
1319-
'compatibility/subjects/{}/versions'.format(_urlencode(subject_name)),
1320-
query={'normalize': normalize, 'verbose': verbose},
1324+
'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format(_urlencode(subject_name), normalize, verbose),
13211325
body=request,
13221326
)
13231327
return response['is_compatible'] # TODO: should it return entire response (including error messages)?

src/confluent_kafka/schema_registry/_sync/schema_registry_client.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,8 +1270,8 @@ def get_compatibility(self, subject_name: Optional[str] = None) -> str:
12701270
return result['compatibilityLevel']
12711271

12721272
def test_compatibility(
1273-
self, subject_name: str, schema: 'Schema',
1274-
version: Union[int, str] = "latest"
1273+
self, subject_name: str, schema: 'Schema', version: Union[int, str] = "latest",
1274+
normalize: bool = False, verbose: bool = False
12751275
) -> bool:
12761276
"""
12771277
Test the compatibility of a candidate schema for a given subject and version
@@ -1280,6 +1280,8 @@ def test_compatibility(
12801280
subject_name (str): Subject name the schema is registered under
12811281
schema (Schema): Schema instance.
12821282
version (int or str, optional): Version number, or the string "latest". Defaults to "latest".
1283+
normalize (bool): Whether to normalize the input schema.
1284+
verbose (bool): Wehther to return detailed error messages.
12831285
12841286
Returns:
12851287
bool: True if the schema is compatible with the specified version
@@ -1293,31 +1295,33 @@ def test_compatibility(
12931295

12941296
request = schema.to_dict()
12951297
response = self._rest_client.post(
1296-
'compatibility/subjects/{}/versions/{}'.format(_urlencode(subject_name), version), body=request
1298+
'compatibility/subjects/{}/versions/{}?normalize={}&verbose={}'.format(_urlencode(subject_name), version, normalize, verbose),
1299+
body=request
12971300
)
12981301
return response['is_compatible'] # TODO: should it return entire response (including error messages)?
12991302

13001303
def test_compatibility_all_versions(
1301-
self, subject_name: str, schema: 'Schema', normalize: bool = False, verbose: bool = False
1304+
self, subject_name: str, schema: 'Schema',
1305+
normalize: bool = False, verbose: bool = False
13021306
) -> bool:
13031307
"""
1304-
Test the input schema against one of more versions in the subject (depending on the compatibility level set).
1308+
Test the input schema against all schema versions under the subject (depending on the compatibility level set).
13051309
13061310
Args:
13071311
subject_name (str): Subject of the schema versions against which compatibility is to be tested.
13081312
schema (Schema): Schema instance.
1309-
normalize (bool): Whether to normalize the input schema. # TODO: missing in cp + cc docs
1313+
normalize (bool): Whether to normalize the input schema.
13101314
verbose (bool): Wehther to return detailed error messages.
13111315
13121316
Returns:
13131317
bool: True if the schema is compatible with all of the subject's schemas versions.
13141318
See Also:
13151319
`POST Test Compatibility Against All API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions>`_
13161320
"""
1321+
13171322
request = schema.to_dict()
13181323
response = self._rest_client.post(
1319-
'compatibility/subjects/{}/versions'.format(_urlencode(subject_name)),
1320-
query={'normalize': normalize, 'verbose': verbose},
1324+
'compatibility/subjects/{}/versions?normalize={}&verbose={}'.format(_urlencode(subject_name), normalize, verbose),
13211325
body=request,
13221326
)
13231327
return response['is_compatible'] # TODO: should it return entire response (including error messages)?

tests/schema_registry/_async/test_api_client.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ async def test_test_compatibility_with_error(
482482
assert e.value.error_code == error_code
483483

484484

485-
async def test_test_compatibility_all_versions(mock_schema_registry, load_avsc):
485+
async def test_test_compatibility_all_versions_no_error(mock_schema_registry, load_avsc):
486486
conf = {'url': TEST_URL}
487487
sr = AsyncSchemaRegistryClient(conf)
488488
schema = Schema(load_avsc('basic_schema.avsc'), schema_type='AVRO')
@@ -505,3 +505,44 @@ async def test_set_global_mode(mock_schema_registry):
505505

506506
result = await sr.update_global_mode('READONLY')
507507
assert result == 'READONLY'
508+
509+
510+
async def test_get_mode(mock_schema_registry):
511+
conf = {'url': TEST_URL}
512+
sr = AsyncSchemaRegistryClient(conf)
513+
514+
result = await sr.get_mode('test-key')
515+
assert result == 'READWRITE'
516+
517+
518+
@pytest.mark.parametrize(
519+
'subject_name,match_str,status_code,error_code',
520+
[
521+
('invalid_mode', 'Invalid mode', 422, 42204),
522+
('operation_not_permitted', 'Operation not permitted', 422, 42205),
523+
]
524+
)
525+
async def test_update_mode_with_error(mock_schema_registry, subject_name, match_str, status_code, error_code):
526+
conf = {'url': TEST_URL}
527+
sr = AsyncSchemaRegistryClient(conf)
528+
529+
with pytest.raises(SchemaRegistryError, match=match_str) as e:
530+
await sr.update_mode(subject_name, 'READONLY')
531+
assert e.value.http_status_code == status_code
532+
assert e.value.error_code == error_code
533+
534+
535+
async def test_update_mode(mock_schema_registry):
536+
conf = {'url': TEST_URL}
537+
sr = AsyncSchemaRegistryClient(conf)
538+
539+
result = await sr.update_mode('test-key', 'READONLY')
540+
assert result == 'READONLY'
541+
542+
543+
async def test_delete_mode(mock_schema_registry):
544+
conf = {'url': TEST_URL}
545+
sr = AsyncSchemaRegistryClient(conf)
546+
547+
result = await sr.delete_mode('test-key')
548+
assert result == 'READWRITE'

tests/schema_registry/_sync/test_api_client.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ def test_test_compatibility_with_error(
482482
assert e.value.error_code == error_code
483483

484484

485-
def test_test_compatibility_all_versions(mock_schema_registry, load_avsc):
485+
def test_test_compatibility_all_versions_no_error(mock_schema_registry, load_avsc):
486486
conf = {'url': TEST_URL}
487487
sr = SchemaRegistryClient(conf)
488488
schema = Schema(load_avsc('basic_schema.avsc'), schema_type='AVRO')
@@ -505,3 +505,44 @@ def test_set_global_mode(mock_schema_registry):
505505

506506
result = sr.update_global_mode('READONLY')
507507
assert result == 'READONLY'
508+
509+
510+
def test_get_mode(mock_schema_registry):
511+
conf = {'url': TEST_URL}
512+
sr = SchemaRegistryClient(conf)
513+
514+
result = sr.get_mode('test-key')
515+
assert result == 'READWRITE'
516+
517+
518+
@pytest.mark.parametrize(
519+
'subject_name,match_str,status_code,error_code',
520+
[
521+
('invalid_mode', 'Invalid mode', 422, 42204),
522+
('operation_not_permitted', 'Operation not permitted', 422, 42205),
523+
]
524+
)
525+
def test_update_mode_with_error(mock_schema_registry, subject_name, match_str, status_code, error_code):
526+
conf = {'url': TEST_URL}
527+
sr = SchemaRegistryClient(conf)
528+
529+
with pytest.raises(SchemaRegistryError, match=match_str) as e:
530+
sr.update_mode(subject_name, 'READONLY')
531+
assert e.value.http_status_code == status_code
532+
assert e.value.error_code == error_code
533+
534+
535+
def test_update_mode(mock_schema_registry):
536+
conf = {'url': TEST_URL}
537+
sr = SchemaRegistryClient(conf)
538+
539+
result = sr.update_mode('test-key', 'READONLY')
540+
assert result == 'READONLY'
541+
542+
543+
def test_delete_mode(mock_schema_registry):
544+
conf = {'url': TEST_URL}
545+
sr = SchemaRegistryClient(conf)
546+
547+
result = sr.delete_mode('test-key')
548+
assert result == 'READWRITE'

tests/schema_registry/conftest.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@
109109
+--------+-------------------------------------------------+-------+------------------------------+
110110
| POST | /compatibility/subjects/invalid/versions/bad | 42202 | Invalid version |
111111
+--------+-------------------------------------------------+-------+------------------------------+
112+
| PUT | /mode/invalid_mode | 42204 | Invalid mode |
113+
+--------+-------------------------------------------------+-------+------------------------------+
114+
| PUT | /mode/operation_not_permitted | 42205 | Operation not permitted |
115+
+--------+-------------------------------------------------+-------+------------------------------+
112116
* POST /subjects/{}/versions does not follow the documented API error.
113117
** PUT /config reacts to a trigger in the body: - {"compatibility": "FULL"}
114118
@@ -139,9 +143,9 @@ def mock_schema_registry():
139143

140144
respx_mock.get(MODE_GLOBAL_RE).mock(side_effect=get_global_mode_callback)
141145
respx_mock.put(MODE_GLOBAL_RE).mock(side_effect=put_global_mode_callback)
142-
# respx_mock.get(MODE_RE).mock(side_effect=get_mode_callback)
143-
# respx_mock.put(MODE_RE).mock(side_effect=put_mode_callback)
144-
# respx_mock.delete(MODE_RE).mock(side_effect=delete_mode_callback)
146+
respx_mock.get(MODE_RE).mock(side_effect=get_mode_callback)
147+
respx_mock.put(MODE_RE).mock(side_effect=put_mode_callback)
148+
respx_mock.delete(MODE_RE).mock(side_effect=delete_mode_callback)
145149

146150
respx_mock.get(SCHEMAS_RE).mock(side_effect=get_schemas_callback)
147151
respx_mock.get(SCHEMAS_STRING_RE).mock(side_effect=get_schema_string_callback)
@@ -178,7 +182,7 @@ def mock_schema_registry():
178182
COMPATIBILITY_SUBJECTS_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions/?(.*)$")
179183
COMPATIBILITY_SUBJECTS_ALL_VERSIONS_RE = re.compile("/compatibility/subjects/(.*)/versions")
180184

181-
MODE_GLOBAL_RE = re.compile("/mode(\?.*)?$")
185+
MODE_GLOBAL_RE = re.compile(r"/mode(\?.*)?$")
182186
MODE_RE = re.compile("/mode/(.*)$")
183187

184188
# constants
@@ -446,6 +450,33 @@ def put_global_mode_callback(request, route):
446450
return Response(200, json=body)
447451

448452

453+
def get_mode_callback(request, route):
454+
COUNTER['GET'][request.url.path] += 1
455+
return Response(200, json={'mode': 'READWRITE'})
456+
457+
458+
def put_mode_callback(request, route):
459+
COUNTER['PUT'][request.url.path] += 1
460+
461+
path_match = re.match(MODE_RE, request.url.path)
462+
subject = path_match.group(1)
463+
body = json.loads(request.content.decode('utf-8'))
464+
mode = body.get('mode')
465+
466+
if subject == 'invalid_mode':
467+
return Response(422, json={'error_code': 42204,
468+
'message': "Invalid mode"})
469+
if subject == 'operation_not_permitted':
470+
return Response(422, json={'error_code': 42205,
471+
'message': "Operation not permitted"})
472+
return Response(200, json={'mode': mode})
473+
474+
475+
def delete_mode_callback(request, route):
476+
COUNTER['DELETE'][request.url.path] += 1
477+
return Response(200, json={'mode': 'READWRITE'})
478+
479+
449480
@pytest.fixture(scope="package")
450481
def load_avsc():
451482
def get_handle(name):

0 commit comments

Comments
 (0)