Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
f"{AJV_VALIDATOR_SCHEME}://{AJV_VALIDATOR_HOST}:{AJV_VALIDATOR_PORT}/validate",
)

DEFAULT_BODY = Body(None)

app = FastAPI()

logger = structlog.get_logger()
Expand Down Expand Up @@ -72,7 +74,7 @@ async def status():


@app.post("/validate")
async def validate_schema_request_body(payload=Body(None)):
async def validate_schema_request_body(payload=DEFAULT_BODY):
logger.info("Schema validation request received")
return await validate_schema(payload)

Expand All @@ -97,7 +99,7 @@ async def validate_schema_from_url(url=None):
try:
# Opens the URL and validates the schema
# Mitigation for opening ftp:// and file:// URLs with urllib.request is implemented in lines 91-95
with request.urlopen(parsed_url.geturl()) as opened_url: # nosec B310
with request.urlopen(parsed_url.geturl()) as opened_url: # nosec B310 # noqa: S310
return await validate_schema(data=opened_url.read().decode())
except error.URLError:
logger.warning(
Expand All @@ -108,20 +110,22 @@ async def validate_schema_from_url(url=None):
status_code=404,
content=f"Could not load schema from allowed domain - URL not found [{url}]",
)
return None


async def validate_schema(data):
async def validate_schema(data): # pylint: disable=R0911
logger.debug("Attempting to validate schema from JSON data...")
if data:
if isinstance(data, dict):
logger.info("JSON data received as dictionary - parsing not required")
json_to_validate = data
# Sets `json_to_validate` to the parsed data if it is a string
elif isinstance(data, str):
logger.info("JSON data received as string - parsing required")
logger.debug("Attempting to parse JSON data...")
json_to_validate = parse_json(data)
# Returns an error response if the data received is not a string or dictionary
# If parse_json returns a Response (error), return it immediately
if isinstance(json_to_validate, Response):
return json_to_validate
else:
logger.error(
"Invalid data type received for validation (expected string or dictionary)",
Expand Down Expand Up @@ -185,15 +189,12 @@ async def validate_schema(data):
status=400,
errors=response["errors"],
)
response = Response(content=json.dumps(response), status_code=400)

return response
return Response(content=json.dumps(response), status_code=400)

logger.info("Schema validation successfully completed with no errors", status=200)

response = Response(content=json.dumps(response), status_code=200)

return response
return Response(content=json.dumps(response), status_code=200)


def is_url_allowed(parsed_url, domain):
Expand Down Expand Up @@ -241,10 +242,10 @@ def parse_json(data):
try:
processed_data = json.loads(data)
logger.info("JSON data parsed successfully")
return processed_data
except JSONDecodeError:
logger.exception("Failed to parse JSON data", status=400)
return Response(status_code=400, content="Failed to parse JSON")
return processed_data


if __name__ == "__main__":
Expand Down
35 changes: 21 additions & 14 deletions app/validators/answers/date_answer_validator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import re
from datetime import datetime
from datetime import datetime, timezone

from dateutil.relativedelta import relativedelta

Expand All @@ -17,21 +17,28 @@ def validate(self):
return self.errors

def is_offset_date_valid(self):
if "minimum" in self.answer and "maximum" in self.answer:
if (
"value" in self.answer["minimum"]
and "value" in self.answer["maximum"]
and not isinstance(self.answer["minimum"]["value"], dict)
and not isinstance(self.answer["maximum"]["value"], dict)
):
minimum_date = self._get_offset_date(self.answer["minimum"])
maximum_date = self._get_offset_date(self.answer["maximum"])
return minimum_date < maximum_date if minimum_date and maximum_date else False
return True
minimum = self.answer.get("minimum")
maximum = self.answer.get("maximum")
if not (minimum and maximum):
return True

if (
"value" not in minimum
or "value" not in maximum
or isinstance(minimum["value"], dict)
or isinstance(maximum["value"], dict)
):
return True

minimum_date = self._get_offset_date(minimum)
maximum_date = self._get_offset_date(maximum)
if minimum_date and maximum_date:
return minimum_date < maximum_date
return False

def _get_offset_date(self, answer_min_or_max):
if answer_min_or_max["value"] == "now":
value = datetime.utcnow().strftime("%Y-%m-%d")
value = datetime.now(timezone.utc).strftime("%Y-%m-%d")
else:
value = answer_min_or_max["value"]

Expand All @@ -56,4 +63,4 @@ def _convert_to_datetime(value):
if value and re.match(r"\d{4}-\d{2}-\d{2}", value):
date_format = "%Y-%m-%d"

return datetime.strptime(value, date_format) if value else None
return datetime.strptime(value, date_format).replace(tzinfo=timezone.utc) if value else None
2 changes: 1 addition & 1 deletion app/validators/blocks/block_validator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Mapping
from collections.abc import Mapping

from app.validators.questionnaire_schema import (
QuestionnaireSchema,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Mapping
from collections.abc import Mapping

from app.validators.blocks.calculation_block_validator import CalculationBlockValidator
from app.validators.questionnaire_schema import (
Expand Down Expand Up @@ -77,6 +77,7 @@ def validate_calculated_summary_ids_to_calculate(self):
)
self.answers_to_calculate.extend(answers)
self.calculated_summary_answers[calculated_summary_id] = tuple(answers)
return None

def validate_calculated_summary_is_before_grand_calculated_summary_block(
self,
Expand Down
Loading
Loading