|
| 1 | +from pathlib import Path |
| 2 | + |
| 3 | +import yaml |
| 4 | +from marshmallow import fields |
| 5 | +from marshmallow import pre_load |
| 6 | +from marshmallow import Schema |
| 7 | +from marshmallow import validate |
| 8 | +from marshmallow import ValidationError |
| 9 | + |
| 10 | +OPENAPI_VERSION = '3.1.0' |
| 11 | +TYPES = ['array', 'boolean', 'integer', 'number', 'object', 'string'] |
| 12 | +RE_VERSION = r'^[0-9]+.[0-9]+.[0-9]+$' |
| 13 | + |
| 14 | +ENDPOINT_FIELD = fields.String(required=True, validate=validate.Regexp(r'^[/][0-9a-z-{}/]*[^/]$')) |
| 15 | +HTTP_CODE_FIELD = fields.String(required=True, validate=validate.Regexp(r'^[1-5]{1}\d{2}|default$')) |
| 16 | +DESCRIPTION_FIELD = fields.String(required=True) |
| 17 | +MEDIA_TYPE_FIELD = fields.String(required=True) |
| 18 | + |
| 19 | + |
| 20 | +class OpenAPI310ValidationError(Exception): |
| 21 | + """OpenAPI specification validation error.""" |
| 22 | + |
| 23 | + |
| 24 | +class InfoSchema(Schema): |
| 25 | + title = fields.String(required=True) |
| 26 | + version = fields.String(required=True, validate=validate.Regexp(RE_VERSION)) |
| 27 | + |
| 28 | + |
| 29 | +class SchemaObjectSchema(Schema): |
| 30 | + type = fields.String(required=True, validate=validate.OneOf(TYPES)) |
| 31 | + properties = fields.Dict( |
| 32 | + keys=fields.String(required=True), values=fields.Nested('SchemaObjectSchema') |
| 33 | + ) |
| 34 | + |
| 35 | + |
| 36 | +class MediaTypeObjectSchema(Schema): |
| 37 | + schema = fields.Nested(SchemaObjectSchema) |
| 38 | + |
| 39 | + |
| 40 | +class ResponsesObjectSchema(Schema): |
| 41 | + description = DESCRIPTION_FIELD |
| 42 | + content = fields.Dict(keys=MEDIA_TYPE_FIELD, values=fields.Nested(MediaTypeObjectSchema)) |
| 43 | + |
| 44 | + |
| 45 | +class OperationObjectSchema(Schema): |
| 46 | + operation_id = fields.String(data_key='operationId') |
| 47 | + responses = fields.Dict(keys=HTTP_CODE_FIELD, values=fields.Nested(ResponsesObjectSchema)) |
| 48 | + |
| 49 | + @pre_load |
| 50 | + def normalize_nested_keys(self, data, **kwargs): |
| 51 | + responses_with_key_as_str = {str(k): v for k, v in data['responses'].items()} |
| 52 | + data['responses'] = responses_with_key_as_str |
| 53 | + return data |
| 54 | + |
| 55 | + |
| 56 | +class PathItemObjectSchema(Schema): |
| 57 | + get = fields.Nested(OperationObjectSchema) |
| 58 | + |
| 59 | + |
| 60 | +class RootSchema(Schema): |
| 61 | + openapi = fields.String( |
| 62 | + required=True, |
| 63 | + validate=validate.And( |
| 64 | + validate.Equal(OPENAPI_VERSION), validate.Regexp(r'^[/][0-9a-z-{}/]*[^/]$') |
| 65 | + ), |
| 66 | + ) |
| 67 | + info = fields.Nested(InfoSchema, required=True) |
| 68 | + paths = fields.Dict( |
| 69 | + required=True, |
| 70 | + keys=ENDPOINT_FIELD, |
| 71 | + values=fields.Nested(PathItemObjectSchema, required=True), |
| 72 | + ) |
| 73 | + |
| 74 | + |
| 75 | +class Validator: |
| 76 | + def __init__(self, path: Path | str): |
| 77 | + self.path = path |
| 78 | + self.raw_spec = self._yaml_to_dict(self.path) |
| 79 | + |
| 80 | + @staticmethod |
| 81 | + def _yaml_to_dict(path: Path) -> dict: |
| 82 | + with open(path) as f: |
| 83 | + s = yaml.safe_load(f) |
| 84 | + return s |
| 85 | + |
| 86 | + def validate(self) -> None or OpenAPI310ValidationError: |
| 87 | + try: |
| 88 | + RootSchema().validate(self.raw_spec) |
| 89 | + except ValidationError as e: |
| 90 | + raise OpenAPI310ValidationError(e) |
0 commit comments