diff --git a/api.py b/api.py index 5bee89c1..cc291328 100644 --- a/api.py +++ b/api.py @@ -34,6 +34,8 @@ f"{AJV_VALIDATOR_SCHEME}://{AJV_VALIDATOR_HOST}:{AJV_VALIDATOR_PORT}/validate", ) +DEFAULT_BODY = Body(None) + app = FastAPI() logger = structlog.get_logger() @@ -72,7 +74,7 @@ async def status(): @app.post("/validate") -async def validate_schema_request_body(payload=Body(None)): +async def validate_schema_request_body(payload=DEFAULT_BODY): logger.info("Schema validation request received") return await validate_schema(payload) @@ -97,7 +99,7 @@ async def validate_schema_from_url(url=None): try: # Opens the URL and validates the schema # Mitigation for opening ftp:// and file:// URLs with urllib.request is implemented in lines 91-95 - with request.urlopen(parsed_url.geturl()) as opened_url: # nosec B310 + with request.urlopen(parsed_url.geturl()) as opened_url: # nosec B310 # noqa: S310 return await validate_schema(data=opened_url.read().decode()) except error.URLError: logger.warning( @@ -108,20 +110,22 @@ async def validate_schema_from_url(url=None): status_code=404, content=f"Could not load schema from allowed domain - URL not found [{url}]", ) + return None -async def validate_schema(data): +async def validate_schema(data): # pylint: disable=R0911 logger.debug("Attempting to validate schema from JSON data...") if data: if isinstance(data, dict): logger.info("JSON data received as dictionary - parsing not required") json_to_validate = data - # Sets `json_to_validate` to the parsed data if it is a string elif isinstance(data, str): logger.info("JSON data received as string - parsing required") logger.debug("Attempting to parse JSON data...") json_to_validate = parse_json(data) - # Returns an error response if the data received is not a string or dictionary + # If parse_json returns a Response (error), return it immediately + if isinstance(json_to_validate, Response): + return json_to_validate else: logger.error( "Invalid data type received for validation (expected string or dictionary)", @@ -185,15 +189,12 @@ async def validate_schema(data): status=400, errors=response["errors"], ) - response = Response(content=json.dumps(response), status_code=400) - return response + return Response(content=json.dumps(response), status_code=400) logger.info("Schema validation successfully completed with no errors", status=200) - response = Response(content=json.dumps(response), status_code=200) - - return response + return Response(content=json.dumps(response), status_code=200) def is_url_allowed(parsed_url, domain): @@ -241,10 +242,10 @@ def parse_json(data): try: processed_data = json.loads(data) logger.info("JSON data parsed successfully") - return processed_data except JSONDecodeError: logger.exception("Failed to parse JSON data", status=400) return Response(status_code=400, content="Failed to parse JSON") + return processed_data if __name__ == "__main__": diff --git a/app/validators/answers/date_answer_validator.py b/app/validators/answers/date_answer_validator.py index 622fee99..33b00bea 100644 --- a/app/validators/answers/date_answer_validator.py +++ b/app/validators/answers/date_answer_validator.py @@ -1,5 +1,5 @@ import re -from datetime import datetime +from datetime import datetime, timezone from dateutil.relativedelta import relativedelta @@ -17,21 +17,28 @@ def validate(self): return self.errors def is_offset_date_valid(self): - if "minimum" in self.answer and "maximum" in self.answer: - if ( - "value" in self.answer["minimum"] - and "value" in self.answer["maximum"] - and not isinstance(self.answer["minimum"]["value"], dict) - and not isinstance(self.answer["maximum"]["value"], dict) - ): - minimum_date = self._get_offset_date(self.answer["minimum"]) - maximum_date = self._get_offset_date(self.answer["maximum"]) - return minimum_date < maximum_date if minimum_date and maximum_date else False - return True + minimum = self.answer.get("minimum") + maximum = self.answer.get("maximum") + if not (minimum and maximum): + return True + + if ( + "value" not in minimum + or "value" not in maximum + or isinstance(minimum["value"], dict) + or isinstance(maximum["value"], dict) + ): + return True + + minimum_date = self._get_offset_date(minimum) + maximum_date = self._get_offset_date(maximum) + if minimum_date and maximum_date: + return minimum_date < maximum_date + return False def _get_offset_date(self, answer_min_or_max): if answer_min_or_max["value"] == "now": - value = datetime.utcnow().strftime("%Y-%m-%d") + value = datetime.now(timezone.utc).strftime("%Y-%m-%d") else: value = answer_min_or_max["value"] @@ -56,4 +63,4 @@ def _convert_to_datetime(value): if value and re.match(r"\d{4}-\d{2}-\d{2}", value): date_format = "%Y-%m-%d" - return datetime.strptime(value, date_format) if value else None + return datetime.strptime(value, date_format).replace(tzinfo=timezone.utc) if value else None diff --git a/app/validators/blocks/block_validator.py b/app/validators/blocks/block_validator.py index f3535e70..e89cecaf 100644 --- a/app/validators/blocks/block_validator.py +++ b/app/validators/blocks/block_validator.py @@ -1,4 +1,4 @@ -from typing import Mapping +from collections.abc import Mapping from app.validators.questionnaire_schema import ( QuestionnaireSchema, diff --git a/app/validators/blocks/grand_calculated_summary_block_validator.py b/app/validators/blocks/grand_calculated_summary_block_validator.py index f220d43e..56eeea96 100644 --- a/app/validators/blocks/grand_calculated_summary_block_validator.py +++ b/app/validators/blocks/grand_calculated_summary_block_validator.py @@ -1,4 +1,4 @@ -from typing import Mapping +from collections.abc import Mapping from app.validators.blocks.calculation_block_validator import CalculationBlockValidator from app.validators.questionnaire_schema import ( @@ -77,6 +77,7 @@ def validate_calculated_summary_ids_to_calculate(self): ) self.answers_to_calculate.extend(answers) self.calculated_summary_answers[calculated_summary_id] = tuple(answers) + return None def validate_calculated_summary_is_before_grand_calculated_summary_block( self, diff --git a/app/validators/questionnaire_schema.py b/app/validators/questionnaire_schema.py index 80540f4a..5c2eb3d4 100644 --- a/app/validators/questionnaire_schema.py +++ b/app/validators/questionnaire_schema.py @@ -2,8 +2,9 @@ import collections import re from collections import defaultdict -from functools import cached_property, lru_cache -from typing import Iterable, Mapping, TypeVar +from collections.abc import Iterable, Mapping +from functools import cached_property +from typing import TypeVar from jsonpath_ng import parse from jsonpath_ng.ext import parse as ext_parse @@ -56,9 +57,7 @@ def get_parent_block_from_match(match) -> dict | None: if walked_contexts[-1] is None: return None - block = walked_contexts[-3].value - - return block + return walked_contexts[-3].value def get_element_value(key, match): @@ -98,6 +97,7 @@ def get_context_from_match(match): class QuestionnaireSchema: def __init__(self, schema): self.schema = schema + self._cache = {} self.matches = [ *parse("$..blocks[*]").find(self.schema), *parse("$..[add_block, edit_block, add_or_edit_block, remove_block]").find( @@ -148,9 +148,13 @@ def __init__(self, schema): self._answers_with_context = {} self._lists_with_context = {} - @lru_cache def get_block_ids_for_block_type(self, block_type: str) -> list[str]: - return [block["id"] for block in self.blocks if block["type"] == block_type] + cache_key = ("get_block_ids_for_block_type", block_type) + if cache_key in self._cache: + return self._cache[cache_key] + result = [block["id"] for block in self.blocks if block["type"] == block_type] + self._cache[cache_key] = result + return result @cached_property def list_names_by_dynamic_answer_id(self) -> dict[str, str]: @@ -341,62 +345,94 @@ def answers(self): for question, _ in self.questions_with_context: yield from self.get_answers_from_question(question) - @lru_cache def get_answer(self, answer_id): - return self.answers_with_context[answer_id]["answer"] + cache_key = ("get_answer", answer_id) + if cache_key in self._cache: + return self._cache[cache_key] + result = self.answers_with_context[answer_id]["answer"] + self._cache[cache_key] = result + return result - @lru_cache def get_answer_type(self, answer_id): + cache_key = ("get_answer_type", answer_id) + if cache_key in self._cache: + return self._cache[cache_key] answer = self.get_answer(answer_id) - return AnswerType(answer["type"]) + result = AnswerType(answer["type"]) + self._cache[cache_key] = result + return result - @lru_cache def get_group(self, group_id): - return self.groups_by_id[group_id] + cache_key = ("get_group", group_id) + if cache_key in self._cache: + return self._cache[cache_key] + result = self.groups_by_id[group_id] + self._cache[cache_key] = result + return result - @lru_cache def get_section(self, section_id): - return self.sections_by_id[section_id] + cache_key = ("get_section", section_id) + if cache_key in self._cache: + return self._cache[cache_key] + result = self.sections_by_id[section_id] + self._cache[cache_key] = result + return result - @lru_cache def get_block(self, block_id): - return self.blocks_by_id.get(block_id, None) + cache_key = ("get_block", block_id) + if cache_key in self._cache: + return self._cache[cache_key] + result = self.blocks_by_id.get(block_id, None) + self._cache[cache_key] = result + return result - @lru_cache def get_blocks(self, **filters): + cache_key = ("get_blocks", tuple(sorted(filters.items()))) + if cache_key in self._cache: + return self._cache[cache_key] conditions = [] for key, value in filters.items(): conditions.append(f'@.{key}=="{value}"') if conditions: final_condition = " & ".join(conditions) - return [ + result = [ match.value for match in ext_parse(f"$..blocks[?({final_condition})]").find( self.schema, ) ] + self._cache[cache_key] = result + return result + self._cache[cache_key] = self.blocks return self.blocks - @lru_cache def get_other_blocks(self, block_id_to_filter, **filters): + cache_key = ("get_other_blocks", block_id_to_filter, tuple(sorted(filters.items()))) + if cache_key in self._cache: + return self._cache[cache_key] conditions = [] for key, value in filters.items(): conditions.append(f'@.{key}=="{value}"') if conditions: final_condition = " & ".join(conditions) - return [ + result = [ match.value for match in ext_parse( f'$..blocks[?(@.id != "{block_id_to_filter}" & {final_condition})]', ).find(self.schema) ] + self._cache[cache_key] = result + return result + self._cache[cache_key] = self.blocks return self.blocks - @lru_cache def has_single_driving_question(self, list_name): - return ( + cache_key = ("has_single_driving_question", list_name) + if cache_key in self._cache: + return self._cache[cache_key] + result = ( len( self.get_blocks( type="ListCollectorDrivingQuestion", @@ -405,52 +441,65 @@ def has_single_driving_question(self, list_name): ) == 1 ) + self._cache[cache_key] = result + return result @staticmethod def get_all_questions_for_block(block): """Get all questions on a block including variants.""" - questions = [] - - for variant in block.get("question_variants", []): - questions.append(variant["question"]) - + questions = [variant["question"] for variant in block.get("question_variants", [])] single_question = block.get("question") if single_question: questions.append(single_question) - return questions - @lru_cache def get_list_collector_answer_ids(self, block_id): - """Get all answer IDs for a list collector block.""" + cache_key = ("get_list_collector_answer_ids", block_id) + if cache_key in self._cache: + return self._cache[cache_key] block = self.blocks_by_id[block_id] if "add_or_edit_block" in block: - return self.get_all_answer_ids(block["add_or_edit_block"]["id"]) + result = self.get_all_answer_ids(block["add_or_edit_block"]["id"]) + self._cache[cache_key] = result + return result add_answer_ids = self.get_all_answer_ids(block["add_block"]["id"]) - edit_answer_ids = self.get_all_answer_ids(block["edit_block"]["id"]) - return add_answer_ids | edit_answer_ids + result = add_answer_ids | edit_answer_ids + self._cache[cache_key] = result + return result - @lru_cache def get_list_collector_answer_ids_by_child_block(self, block_id: str): + cache_key = ("get_list_collector_answer_ids_by_child_block", block_id) + if cache_key in self._cache: + return self._cache[cache_key] block = self.blocks_by_id[block_id] - return { + result = { child_block: self.get_all_answer_ids(block[child_block]["id"]) for child_block in ["add_block", "edit_block", "remove_block"] } + self._cache[cache_key] = result + return result - @lru_cache def get_all_answer_ids(self, block_id): + cache_key = ("get_all_answer_ids", block_id) + if cache_key in self._cache: + return self._cache[cache_key] questions = self.get_all_questions_for_block(self.blocks_by_id[block_id]) - return {answer["id"] for question in questions for answer in self.get_answers_from_question(question)} + result = {answer["id"] for question in questions for answer in self.get_answers_from_question(question)} + self._cache[cache_key] = result + return result - @lru_cache def get_all_dynamic_answer_ids(self, block_id): + cache_key = ("get_all_dynamic_answer_ids", block_id) + if cache_key in self._cache: + return self._cache[cache_key] questions = self.get_all_questions_for_block(self.blocks_by_id[block_id]) - return { + result = { answer["id"] for question in questions for answer in question.get("dynamic_answers", {}).get("answers", []) } + self._cache[cache_key] = result + return result def get_list_name_for_answer_id(self, answer_id: str) -> str | None: """Get list name for answer id. @@ -470,20 +519,29 @@ def get_list_name_for_answer_id(self, answer_id: str) -> str | None: return section["repeat"]["for_list"] return None - @lru_cache def get_first_answer_in_block(self, block_id): + cache_key = ("get_first_answer_in_block", block_id) + if cache_key in self._cache: + return self._cache[cache_key] questions = self.get_all_questions_for_block(self.blocks_by_id[block_id]) - return self.get_answers_from_question(questions[0])[0] + result = self.get_answers_from_question(questions[0])[0] + self._cache[cache_key] = result + return result - @lru_cache def get_block_id_by_answer_id(self, answer_id): + cache_key = ("get_block_id_by_answer_id", answer_id) + if cache_key in self._cache: + return self._cache[cache_key] for question, context in self.questions_with_context: if block_id := self.get_block_id_for_answer( answer_id=answer_id, answers=self.get_answers_from_question(question), context=context, ): + self._cache[cache_key] = block_id return block_id + self._cache[cache_key] = None + return None @staticmethod def get_block_id_for_answer(*, answer_id, answers, context): @@ -494,12 +552,16 @@ def get_block_id_for_answer(*, answer_id, answers, context): detail_answer = option.get("detail_answer") if detail_answer and answer_id == detail_answer["id"]: return context["block"] + return None - @lru_cache def get_block_by_answer_id(self, answer_id): + cache_key = ("get_block_by_answer_id", answer_id) + if cache_key in self._cache: + return self._cache[cache_key] block_id = self.get_block_id_by_answer_id(answer_id) - - return self.get_block(block_id) + result = self.get_block(block_id) + self._cache[cache_key] = result + return result def _get_numeric_range_values(self, answer, answer_ranges): min_value = answer.get("minimum", {}).get("value", {}) diff --git a/app/validators/questionnaire_validator.py b/app/validators/questionnaire_validator.py index 8ca77ba3..ff7a2cde 100644 --- a/app/validators/questionnaire_validator.py +++ b/app/validators/questionnaire_validator.py @@ -1,5 +1,5 @@ import re -from typing import Mapping +from collections.abc import Mapping from eq_translations.survey_schema import SurveySchema @@ -110,7 +110,7 @@ def validate_smart_quotes(self): values_to_check = schema_text.values() for schema_text in values_to_check: - if schema_text and quote_regex.search(schema_text): + if isinstance(schema_text, str) and schema_text and quote_regex.search(schema_text): self.add_error( error_messages.DUMB_QUOTES_FOUND, pointer=translatable_item.pointer, @@ -127,7 +127,7 @@ def validate_white_spaces(self): values_to_check = schema_text.values() for text in values_to_check: - if text and (text.startswith(" ") or text.endswith(" ") or " " in text): + if isinstance(text, str) and (text.startswith(" ") or text.endswith(" ") or " " in text): self.add_error( error_messages.INVALID_WHITESPACE_FOUND, pointer=translatable_item.pointer, @@ -198,6 +198,7 @@ def validate_answer_source_group(self, group): ), group_id=group["id"], ) + return None def validate_answer_source_section(self, section, section_index): identifier_references = get_object_containing_key(section, "source") @@ -206,39 +207,52 @@ def validate_answer_source_section(self, section, section_index): source_block = self.questionnaire_schema.get_block_by_answer_id( identifier_reference["identifier"], ) - if isinstance(source_block, dict) and (source_block_id := self.resolve_source_block_id(source_block)): - if source_block_section_id := self.questionnaire_schema.get_section_id_for_block_id( - source_block_id, - ): - source_block_section_index = self.questionnaire_schema.get_section_index_for_section_id( + if ( + isinstance(source_block, dict) + and (source_block_id := self.resolve_source_block_id(source_block)) + and ( + source_block_section_id := self.questionnaire_schema.get_section_id_for_block_id( + source_block_id, + ) + ) + and ( + source_block_section_index := self.questionnaire_schema.get_section_index_for_section_id( source_block_section_id, ) - if source_block_section_index and section_index < source_block_section_index: - self.add_error( - error_messages.ANSWER_REFERENCED_BEFORE_EXISTS.format( - answer_id=identifier_reference["identifier"], - ), - section_id=section["id"], - ) + ) + and section_index < source_block_section_index + ): + self.add_error( + error_messages.ANSWER_REFERENCED_BEFORE_EXISTS.format( + answer_id=identifier_reference["identifier"], + ), + section_id=section["id"], + ) def resolve_source_block_id(self, source_block: Mapping) -> str: # Handling of source block nested (list collector's add-block) - if source_block["type"] == "ListAddQuestion": - if isinstance(source_block, dict) and ( + if ( + source_block["type"] == "ListAddQuestion" + and isinstance(source_block, dict) + and ( block_id := self.questionnaire_schema.get_parent_list_collector_for_add_block( source_block["id"], ) - ): - return block_id + ) + ): + return block_id # Handling of source block nested (list collector's repeating block) - if source_block["type"] == "ListRepeatingQuestion": - if isinstance(source_block, dict) and ( + if ( + source_block["type"] == "ListRepeatingQuestion" + and isinstance(source_block, dict) + and ( block_id := self.questionnaire_schema.get_parent_list_collector_for_repeating_block( source_block["id"], ) - ): - return block_id + ) + ): + return block_id # Handling of standard source block return source_block["id"] diff --git a/app/validators/questions/mutually_exclusive_validator.py b/app/validators/questions/mutually_exclusive_validator.py index 1c6da420..0ca8e8c3 100644 --- a/app/validators/questions/mutually_exclusive_validator.py +++ b/app/validators/questions/mutually_exclusive_validator.py @@ -3,7 +3,6 @@ class MutuallyExclusiveQuestionValidator(QuestionValidator): - question: dict = {} MUTUALLY_EXCLUSIVE_CONTAINS_MANDATORY = "MutuallyExclusive question type cannot contain mandatory answers." INVALID_EXCLUSIVE_ANSWER = "Mutually exclusive answer is not of type Checkbox or Radio." NON_EXCLUSIVE_RADIO_ANSWER = "Mutually exclusive questions cannot contain non exclusive Radio answers." diff --git a/app/validators/questions/question_validator.py b/app/validators/questions/question_validator.py index 1664f9e6..d6091075 100644 --- a/app/validators/questions/question_validator.py +++ b/app/validators/questions/question_validator.py @@ -4,7 +4,6 @@ class QuestionValidator(Validator): ANSWER_LABEL_MISSING_MULTIPLE_ANSWERS = "Answer label must be provided for questions with multiple answers" - question: dict = {} def __init__(self, schema_element, schema=None): super().__init__(schema_element) diff --git a/app/validators/routing/types.py b/app/validators/routing/types.py index 3cef4d4c..41b45049 100644 --- a/app/validators/routing/types.py +++ b/app/validators/routing/types.py @@ -1,4 +1,4 @@ -from typing import Mapping +from collections.abc import Mapping from app.validators.questionnaire_schema import QuestionnaireSchema diff --git a/app/validators/rules/rule_validator.py b/app/validators/rules/rule_validator.py index 836f7c63..28e4f5b9 100644 --- a/app/validators/rules/rule_validator.py +++ b/app/validators/rules/rule_validator.py @@ -139,7 +139,7 @@ def _validate_map_operator(self, operator): The second argument is currently not validated here as it can currently only be `date-range` """ - function_to_map_over_arguments = list(operator["map"][0].values())[0] + function_to_map_over_arguments = next(iter(operator["map"][0].values())) if SELF_REFERENCE_KEY in function_to_map_over_arguments: return @@ -242,7 +242,7 @@ def _validate_operator_arguments(self, rule): operator_name = next(iter(rule)) argument_types = self._get_argument_types_for_operator(rule[operator_name]) - if operator_name in COMPARISON_OPERATORS + ARRAY_OPERATORS + NUMERIC_OPERATORS: + if operator_name in [*COMPARISON_OPERATORS, *ARRAY_OPERATORS, *NUMERIC_OPERATORS]: self._validate_comparison_operator_argument_types( rule, operator_name, @@ -250,7 +250,7 @@ def _validate_operator_arguments(self, rule): ) if ( - operator_name in COMPARISON_OPERATORS + [Operator.ALL_IN, Operator.ANY_IN] + operator_name in [*COMPARISON_OPERATORS, Operator.ALL_IN, Operator.ANY_IN] and TYPE_NULL not in argument_types ): self._validate_argument_types_match(rule, argument_types) @@ -312,8 +312,9 @@ def _validate_comparison_operator_argument_types( @staticmethod def _get_valid_types_for_operator(operator_name, argument_position): + result = None if operator_name in [Operator.EQUAL, Operator.NOT_EQUAL]: - return [ + result = [ TYPE_DATE, TYPE_NUMBER, TYPE_STRING, @@ -321,22 +322,19 @@ def _get_valid_types_for_operator(operator_name, argument_position): TYPE_ARRAY, TYPE_BOOLEAN, ] - - if operator_name in [ + elif operator_name in [ Operator.LESS_THAN, Operator.LESS_THAN_OR_EQUAL, Operator.GREATER_THAN, Operator.GREATER_THAN_OR_EQUAL, ]: - return [TYPE_DATE, TYPE_NUMBER] - - if operator_name in [Operator.ANY_IN, Operator.ALL_IN]: - return [TYPE_ARRAY] - - if operator_name == Operator.IN: - return [TYPE_NUMBER, TYPE_STRING] if argument_position == 0 else [TYPE_ARRAY] - if operator_name == Operator.COUNT: - return [TYPE_ARRAY] - - if operator_name == Operator.SUM: - return [TYPE_NUMBER] + result = [TYPE_DATE, TYPE_NUMBER] + elif operator_name in [Operator.ANY_IN, Operator.ALL_IN]: + result = [TYPE_ARRAY] + elif operator_name == Operator.IN: + result = [TYPE_NUMBER, TYPE_STRING] if argument_position == 0 else [TYPE_ARRAY] + elif operator_name == Operator.COUNT: + result = [TYPE_ARRAY] + elif operator_name == Operator.SUM: + result = [TYPE_NUMBER] + return result diff --git a/app/validators/sections/section_validator.py b/app/validators/sections/section_validator.py index cd934628..d9b6dbd2 100644 --- a/app/validators/sections/section_validator.py +++ b/app/validators/sections/section_validator.py @@ -282,6 +282,7 @@ def has_list_summary_with_non_item_answers(self): if summary := self.schema_element.get("summary"): show_non_item_answers = summary.get("show_non_item_answers") return summary.get("items") and show_non_item_answers + return None def has_multiple_list_collectors(self): list_collectors = [] diff --git a/app/validators/validator.py b/app/validators/validator.py index 892ada13..9db3a899 100644 --- a/app/validators/validator.py +++ b/app/validators/validator.py @@ -1,8 +1,7 @@ -from abc import ABC -from typing import Mapping +from collections.abc import Mapping -class Validator(ABC): +class Validator: def __init__(self, schema_element: Mapping | None = None): self.errors: list[dict] = [] self.context: dict[str, str] = {} diff --git a/app/validators/value_source_validator.py b/app/validators/value_source_validator.py index 706103df..17f7e369 100644 --- a/app/validators/value_source_validator.py +++ b/app/validators/value_source_validator.py @@ -1,4 +1,5 @@ from functools import cached_property +from typing import ClassVar from app.validators.validator import Validator @@ -35,10 +36,10 @@ class ValueSourceValidator(Validator): "in the progress source cannot be a repeating section" ) - COMPOSITE_ANSWERS_TO_SELECTORS_MAP = { + COMPOSITE_ANSWERS_TO_SELECTORS_MAP: ClassVar[dict[str, list[str]]] = { "Address": ["line1", "line2", "town", "postcode"], } - RESPONSE_METADATA_IDENTIFIERS = ["started_at"] + RESPONSE_METADATA_IDENTIFIERS: ClassVar[list[str]] = ["started_at"] def __init__( # pylint: disable=too-many-positional-arguments self, @@ -71,10 +72,10 @@ def __init__( # pylint: disable=too-many-positional-arguments } def _get_valid_progress_value_source_block_identifiers(self): - return self.past_block_ids - set([self.current_block_id]) - self.block_ids_in_past_repeating_sections + return self.past_block_ids - {self.current_block_id} - self.block_ids_in_past_repeating_sections def _get_valid_progress_value_source_section_identifiers(self): - return self.past_section_ids - set([self.current_section_id]) - self.past_repeating_section_ids + return self.past_section_ids - {self.current_section_id} - self.past_repeating_section_ids @cached_property def block_ids_in_past_repeating_sections(self) -> set[str]: @@ -103,9 +104,7 @@ def current_block_id(self) -> str | None: @cached_property def future_block_ids(self) -> set[str]: return ( - set(self.questionnaire_schema.block_ids_without_sub_blocks) - - self.past_block_ids - - set([self.current_block_id]) + set(self.questionnaire_schema.block_ids_without_sub_blocks) - self.past_block_ids - {self.current_block_id} ) @cached_property @@ -131,7 +130,7 @@ def past_block_ids(self) -> set[str]: @cached_property def future_section_ids(self) -> set[str]: - return set(self.questionnaire_schema.section_ids) - self.past_section_ids - set([self.current_section_id]) + return set(self.questionnaire_schema.section_ids) - self.past_section_ids - {self.current_section_id} @cached_property def current_section_id(self) -> str | None: @@ -208,16 +207,14 @@ def _validate_source_identifier_progress_source( if identifier not in valid_identifiers: error_mapping = { "block": { - tuple([self.current_block_id]): self.SOURCE_REFERENCE_CURRENT_BLOCK, + (self.current_block_id,): self.SOURCE_REFERENCE_CURRENT_BLOCK, tuple(self.future_block_ids): self.SOURCE_REFERENCE_FUTURE_BLOCK, tuple( self.block_ids_in_past_repeating_sections, ): self.SOURCE_REFERENCE_BLOCK_IN_REPEATING_SECTION, }, "section": { - tuple( - [self.current_section_id], - ): self.SOURCE_REFERENCE_CURRENT_SECTION, + (self.current_section_id,): self.SOURCE_REFERENCE_CURRENT_SECTION, tuple( self.future_section_ids, ): self.SOURCE_REFERENCE_FUTURE_SECTION, diff --git a/pyproject.toml b/pyproject.toml index f07be499..b06246a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,42 +97,7 @@ ignore = [ "TD002", # Allow TODO comments without a linked issue "TD003", - # Hide 'B019 @lru_cache warning' - to be reviewed - "B019", - # Hide 'FA102 Missing `from __future__ import annotations`, but uses PEP 604 union' - to be reviewed - "FA102", - # Hide 'RET503 Missing explicit `return` at the end of function able to return non-`None` value' - to be reviewed - "RET503", - # Hide 'RET504 Unnecessary assignment to `ids` before `return` statement' - to be reviewed - "RET504", - # Hide 'RUF012 Mutable class attributes should be annotated with `typing.ClassVar`' - to be reviewed - "RUF012", - # Hide 'UP035 Import from `collections.abc` instead: `Mapping`' - to be reviewed - "UP035", - # Hide 'B024 `Validator` is an abstract base class, but it has no abstract methods or properties' - to be reviewed - "B024", - # Hide 'TRY300 Consider moving this statement to an `else` block' - to be reviewed - "TRY300", - # Hide 'RUF005 Consider iterable unpacking instead of concatenation' - to be reviewed - "RUF005", - # Hide 'RUF015 Prefer `next(iter(operator["map"][0].values()))` over single element slice' - to be reviewed - "RUF015", - # Hide 'PERF401 Use a list comprehension to create a transformed list' - to be reviewed - "PERF401", - # Hide 'DTZ007 Naive datetime constructed using `datetime.datetime.strptime()` without %z' - to be reviewed - "DTZ007", - # Hide 'DTZ003 `datetime.datetime.utcnow()` used' - to be reviewed - "DTZ003", - # Hide 'SIM102 Use a single `if` statement instead of nested `if` statements' - to be reviewed - "SIM102", - # Hide 'S310 Audit URL open for permitted schemes. Allowing use of `file:` or custom schemes is often unexpected.' - to be reviewed - "S310", - # Hide 'B008 Do not perform function call `Body` in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable' - to be reviewed - "B008", - # Hide 'C409 Unnecessary list literal passed to `tuple()` (rewrite as a tuple literal)' - to be revisited - "C409", - # Hide 'C405 Unnecessary list literal (rewrite as a set literal)' - to be revisited - "C405", + # Hide 'D100 Missing docstring in public module' - to be revisited "D100", # Hide 'D101 Missing docstring in public class' - to be revisited diff --git a/tests/helpers/schema_validator.py b/tests/helpers/schema_validator.py index 2fc47ea5..4c3e8709 100644 --- a/tests/helpers/schema_validator.py +++ b/tests/helpers/schema_validator.py @@ -1,7 +1,8 @@ import itertools +from collections.abc import Callable, Mapping from json import load from pathlib import Path -from typing import Any, Callable, Mapping +from typing import Any from jsonschema import Draft202012Validator as DraftValidator from jsonschema import ValidationError @@ -85,7 +86,6 @@ def validate(self): """ try: self.schema_validator.validate(self.schema_element) - return [] except ValidationError as e: match = best_match([e]) path = "/".join(str(path_element) for path_element in e.path) @@ -97,6 +97,8 @@ def validate(self): self.add_error(match.message, verbose=error, pointer=f"/{path}") except SchemaError as e: self.add_error(e) + else: + return [] return self.errors