From 0c82dbff84c8dba2c438ddf8e2789d83f6df2304 Mon Sep 17 00:00:00 2001 From: Emre Erkunt Date: Sat, 28 Dec 2019 16:52:15 +0000 Subject: [PATCH] Revamped whole Security Groups functionality. This also fixes the problem addressed in #181. --- .devbots/lock-issue.yml | 6 + terraform_compliance/common/helper.py | 103 +------ .../extensions/security_groups.py | 215 ++++++++++++-- terraform_compliance/steps/steps.py | 68 ++--- .../.expected | 0 .../.failure | 1 + .../main.tf | 24 +- .../plan.out.json | 2 +- .../common/test_helper.py | 62 ---- .../extensions/test_security_groups.py | 265 ++++++++++++++++++ .../steps/test_main_steps.py | 12 - 11 files changed, 517 insertions(+), 241 deletions(-) create mode 100644 .devbots/lock-issue.yml create mode 100644 tests/functional/test_issue-181_sg_with_comma_separated/.expected create mode 100644 tests/functional/test_issue-181_sg_with_comma_separated/.failure diff --git a/.devbots/lock-issue.yml b/.devbots/lock-issue.yml new file mode 100644 index 00000000..53c6df03 --- /dev/null +++ b/.devbots/lock-issue.yml @@ -0,0 +1,6 @@ +enabled: true +comment: > + This issue's conversation is now + locked. If you want to continue + this discussion please open a + [new issue](https://github.com/eerkunt/terraform-compliance/issues/new/choose). diff --git a/terraform_compliance/common/helper.py b/terraform_compliance/common/helper.py index 956c7e52..387b6aed 100644 --- a/terraform_compliance/common/helper.py +++ b/terraform_compliance/common/helper.py @@ -28,8 +28,16 @@ def flatten(items): yield x -def check_if_cidr( value ): - regex = r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\.(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\.(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\.(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\/(3[0-2]|2[0-9]|1[0-9]|[0-9])' +def check_if_cidr(value): + regex = r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \ + r'\.' \ + r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \ + r'\.' \ + r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \ + r'\.' \ + r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \ + r'\/' \ + r'(3[0-2]|2[0-9]|1[0-9]|[0-9])' matches = re.match(regex, value) if matches is not None: @@ -55,97 +63,6 @@ def are_networks_same(first_network, network_list): return False -# A helper function that compares port related data with given dictionary -def check_sg_rules(plan_data, security_group, condition): - return validate_sg_rule(plan_data=assign_sg_params(plan_data), - params=security_group, - condition=condition) - - -def assign_sg_params(rule): - from_port = int(rule.get('from_port', 0)) - to_port = int(rule.get('to_port', 0)) - - protocol = [proto for proto in [rule.get('protocol', '-1')]] - - if protocol[0] == '-1' or type(protocol[0]) is int: - protocol = ['tcp', 'udp'] - - protocol[0] = protocol[0].lower() - - cidr_blocks = rule.get('cidr_blocks', []) - - if type(cidr_blocks) is not list: - cidr_blocks = [cidr_blocks] - - if to_port == 0 and from_port == 0: - to_port = 65535 - - if from_port > to_port: - raise Failure('Invalid configuration from_port can not be bigger than to_port. ' - '{} > {} {} in {}'.format(from_port, - to_port, - protocol, - cidr_blocks)) - - return dict(protocol=protocol, from_port=from_port, to_port=to_port, cidr_blocks=cidr_blocks) - - -def validate_sg_rule(plan_data, params, condition): - from_port = int(params['from_port']) - to_port = int(params['to_port']) - - assert from_port <= to_port, 'Port range is defined incorrectly within the Scenario. ' \ - 'Define it {}-{} instead of {}-{}.'.format(from_port, - to_port, - to_port, - from_port) - - defined_range = set(range(plan_data['from_port'], plan_data['to_port']+1)) - defined_network_list = plan_data['cidr_blocks'] - given_network = params.get('cidr', None) - - # Condition: must only have - # Fail only if ; - # * the ports does not match and defined network is a subset of given network. - if condition: - given_range = set([int(port) for port in params['ports']]) - from_to_port = ','.join(params['ports']) - - # Set to True if ports are exactly same. - port_intersection = given_range == defined_range - - # Set to True if one of the given networks is a subset. - network_check = is_ip_in_cidr(given_network, defined_network_list) - - if not port_intersection and network_check: - raise Failure('{}/{} ports are defined for {} network. ' - 'Must be limited to {}/{} and {}'.format('/'.join(plan_data['protocol']), - '{}-{}'.format(plan_data['from_port'], - plan_data['to_port']), - plan_data['cidr_blocks'], - '/'.join(plan_data['protocol']), - from_to_port, - given_network)) - - # Condition: must not have - # Fail only if ; - # * the ports match and networks match - else: - given_range = set(range(from_port, to_port+1)) - port_intersection = given_range & defined_range - from_to_port = str(from_port) + '-' + str(to_port) - network_intersection = is_ip_in_cidr(given_network, defined_network_list) - - if port_intersection and network_intersection: - raise Failure('{}/{} ports are defined for {} network.'.format('/'.join(plan_data['protocol']), - '{}-{}'.format(plan_data['from_port'], - plan_data['to_port']), - plan_data['cidr_blocks'])) - - return True - - def convert_resource_type(resource_type): ''' Searchs given resource_type within resource_name array and returns the value if it is found diff --git a/terraform_compliance/extensions/security_groups.py b/terraform_compliance/extensions/security_groups.py index 018e7cdf..d0ab03d9 100644 --- a/terraform_compliance/extensions/security_groups.py +++ b/terraform_compliance/extensions/security_groups.py @@ -7,7 +7,7 @@ TerraformComplianceInvalidData, TerraformComplianceInternalFailure ) - +from copy import deepcopy class SecurityGroupRule(object): @@ -20,20 +20,16 @@ def __init__(self, **kwargs): self.address = kwargs.get('values', {}).get('address') self.description = kwargs.get('description', kwargs.get('values', {}). get('description')) self.port_is_single = False - self.port_is_list = False + self._normalise_protocols() self._normalise_ports() self._check_for_invalidations() def _normalise_ports(self): - # TODO: Implement other protocols, not just TCP and UDP. - if self.protocol[0] == '-1' or type(self.protocol[0]) is int: - self.protocol = ['tcp', 'udp'] - else: - self.protocol[0] = self.protocol[0].lower() - + # Sometimes terraform reports ports like this if self.from_port == 0 and self.to_port == 0: self.to_port = 65535 + self.port_is_single = False if not self.from_port and not self.to_port: @@ -51,9 +47,20 @@ def _normalise_ports(self): self.from_port = self.from_port if self.from_port > 0 else 1 self.to_port = self.to_port if self.to_port > 0 else 1 + self.ports = self._get_port_range(self.from_port, self.to_port) + + self.port_is_single = True if self.from_port == self.to_port else False + + def _normalise_protocols(self): + # TODO: Implement other protocols, not just TCP and UDP. + # There is always one protocol in the list as an input. + if self.protocol[0] == '-1' or type(self.protocol[0]) is int: + self.protocol = ['tcp', 'udp'] + else: + self.protocol[0] = self.protocol[0].lower() def _check_for_invalidations(self): - if self.from_port > self.to_port: + if (self.from_port and self.to_port) and self.from_port > self.to_port: raise TerraformComplianceInvalidData('Invalid configuration from_port can not be bigger than to_port. ' '{} > {} {} in {}'.format(self.from_port, self.to_port, @@ -63,65 +70,223 @@ def _check_for_invalidations(self): if self.cidr_blocks is None: raise TerraformComplianceInvalidData('A cidr range must be given.') - for cidr in self.cidr_blocks: - if check_if_cidr(cidr) is False: - raise TerraformComplianceInvalidData('{} is not a valid CIDR.'.format(cidr)) + if type(self.cidr_blocks) is list: + for cidr in self.cidr_blocks: + if check_if_cidr(cidr) is False: + raise TerraformComplianceInvalidData('{} is not a valid CIDR.'.format(cidr)) + elif type(self.cidr_blocks) is str: + if check_if_cidr(self.cidr_blocks) is False: + raise TerraformComplianceInvalidData('{} is not a valid CIDR.'.format(self.cidr_blocks)) + else: + raise TerraformComplianceInvalidData('Invalid CIDR Type {} : {}'.format(type(self.cidr_blocks), + self.cidr_blocks)) if not self.protocol: raise TerraformComplianceInvalidData('A protocol must be given.') def _expand_ports(self, port): if '-' in port: - self.from_port, self.to_port = port.split('-') + self.from_port, self.to_port = [int(port_number) for port_number in port.split('-')] self.from_port = self.from_port if self.from_port > 0 else 1 self.to_port = self.to_port if self.to_port > 0 else 1 - self.ports = set(range(self.from_port, self.to_port)) - self.port_is_list = True + self.ports = self._get_port_range(self.from_port, self.to_port) + self.port_is_single = False elif ',' in port: self.from_port = None self.to_port = None - self.port_is_list = True + self.port_is_single = False self.ports = set([int(port_number) for port_number in port.split(',') if int(port_number) > 0]) - elif len(port) == 1: - self.from_port = self.to_port = int(port[0]) - self.ports = set(self.from_port) + elif port.isnumeric(): + self.from_port = self.to_port = int(port) + self.ports = self._get_port_range(self.from_port, self.to_port) self.port_is_single = True else: raise TerraformComplianceInternalFailure('Unexpected situation. ' - 'Please report this port structure: {}'.format(port)) + 'Please report this port structure: {}'.format(vars(self))) + + @staticmethod + def _get_port_range(from_port, to_port): + return set(range(from_port, to_port+1)) class SecurityGroup(object): - def __init__(self, given_reqs, security_groups_in_plan): + def __init__(self, given_reqs, security_groups_in_plan, address='test_sg'): self.given = given_reqs - self.sgs = security_groups_in_plan + self.sgs = self._clean(security_groups_in_plan) + self.address = address + self.exact_match = False self.negative_match = True + self.include_match = False + self.singular_check = True + self.multiple_check = False self.given_rule = SecurityGroupRule(**given_reqs) self.plan_rules = [SecurityGroupRule(**rule_data) for rule_data in security_groups_in_plan] - def enable_exact_match(self): + self.found_ports = set() + + def must_only_have(self): self.exact_match = True self.negative_match = False + self.include_match = False + self.singular_check = False - def enable_negative_match(self): + # This is the default for validate() method + def must_not_have(self): self.exact_match = False self.negative_match = True + self.include_match = False + self.singular_check = True + + def must_have(self): + self.exact_match = False + self.negative_match = False + self.include_match = True + self.singular_check = False def validate(self): + # + # Make it flexible enough to support other scenarios + # Currently we have ; + # + # 1. "must only" ports, which means all ports defined in ALL security group rules must have exactly the same + # ports. + # 2. "must not" ports, which means the ports defined in ANY security group, should not have this port with cidr + # , etc. + # 3. "must have" ports, which means, the ports defined in ALL security groups should include these ports. < This + # doesn't exist yet. + # + + ports_found = set() for rule in self.plan_rules: # General Controls that is applicable for every scenario network_check = self._validate_network(rule.cidr_blocks) ports_check = self._validate_ports(rule) + protocol_check = self._validate_protocol(rule) + + # Apply enforcement on every rule. Only applicable for 'must not have' + if self.singular_check: + self._run_singular_validation(rule, network_check, ports_check, protocol_check) + + # Apply enforcement after the for loop, gather information first. + elif not self.singular_check and network_check and protocol_check: + # Gather all ports defined within the Security Group rule that matches CIDR. + ports_found = ports_found | rule.ports + + # Finalise singular checks as returning True if nothing happens, + # since it must have failed already. + if self.singular_check: + return True + + # Process non singular checks + else: + # So if the scenario is 'must only have' + # Then sets must be exactly equal to each other. + # If not, first get GIVEN->FOUND, FOUND->GIVEN and no disjoint differences + # then return an error output + errors = [] + if self.exact_match: + if ports_found == self.given_rule.ports: + return True + + # Create errors + given_found_diff = self.given_rule.ports - ports_found + found_given_diff = ports_found - self.given_rule.ports + nodisjoint = self.given_rule.ports.isdisjoint(ports_found) + + if given_found_diff: + e = self._prepare_output(given_found_diff) + errors.append('{}/{} port{} not defined within {} network{} in {}.'.format(e['protocol'], + e['ports'], + e['ports_plural'], + e['cidr_blocks'], + e['cidr_blocks_plural'], + e['address'])) + + if found_given_diff: + e = self._prepare_output(found_given_diff) + errors.append('{}/{} port{} defined within {} network{} in {}.'.format(e['protocol'], + e['ports'], + e['ports_plural'], + e['cidr_blocks'], + e['cidr_blocks_plural'], + e['address'])) + + if nodisjoint: + e = self._prepare_output(set()) + errors.append('None of the ports given defined within {} network{} in ' + '{}.'.format(e['cidr_blocks'], + e['cidr_blocks_plural'], + e['address'])) + + # If the scenario is 'must have', then given ports must be a subset (or equal) of ports_found + # If not, found which ports are missing via GIVEN->FOUND diff + else: + if self.given_rule.ports <= ports_found: + return True + + given_found_diff = self.given_rule.ports - ports_found + + if given_found_diff: + e = self._prepare_output(given_found_diff) + errors.append('{}/{} port{} not defined within {} network{} in {}.'.format(e['protocol'], + e['ports'], + e['ports_plural'], + e['cidr_blocks'], + e['cidr_blocks_plural'], + e['address'])) + if errors: + raise Failure('{}'.format("\n".join(errors))) + + def _run_singular_validation(self, rule, network_check, ports_check, protocol_check): + if self.negative_match and network_check and ports_check and protocol_check: + e = self._prepare_output(rule, ports_check, protocol_check) + raise Failure('{}/{} port{} defined within {} network{} in {}.'.format(e['protocol'], + e['ports'], + e['ports_plural'], + e['cidr_blocks'], + e['cidr_blocks_plural'], + e['address'])) def _validate_network(self, cidr_in_plan): return is_ip_in_cidr(self.given_rule.cidr_blocks, cidr_in_plan) def _validate_ports(self, rule_data): - if self.port_is_single: + return self.given_rule.ports & rule_data.ports + + def _validate_protocol(self, rule_data): + return set(self.given_rule.protocol) & set(rule_data.protocol) + + def _prepare_output(self, rule, ports_check=None, protocol_check=None): + output = dict() + + if self.singular_check: + output['protocol'] = str(list(protocol_check)[0]) if len(protocol_check) == 1 else '({})'.format(','.join(str(s) for s in protocol_check)) + output['cidr_blocks'] = rule.cidr_blocks[0] if len(rule.cidr_blocks) == 1 else ', '.join(rule.cidr_blocks) + output['cidr_blocks_plural'] = '' if len(rule.cidr_blocks) == 1 else 's' + output['address'] = rule.address + output['ports'] = str(list(ports_check)[0]) if len(ports_check) == 1 else '({})'.format(','.join(str(s) for s in ports_check)) + # TODO: If the ports output is too large, make it smaller by showing a range only. + output['ports_plural'] = ' is' if len(ports_check) == 1 else 's are' + else: + output['protocol'] = str(list(self.given_rule.protocol)[0]) if len(self.given_rule.protocol) == 1 else '({})'.format(','.join(str(s) for s in self.given_rule.protocol)) + output['cidr_blocks'] = self.given_rule.cidr_blocks + output['cidr_blocks_plural'] = '' + output['address'] = self.address + output['ports'] = str(list(rule)[0]) if len(rule) == 1 else '({})'.format(','.join(str(s) for s in rule)) + # TODO: If the ports output is too large, make it smaller by showing a range only. + output['ports_plural'] = ' is' if len(rule) == 1 else 's are' + + return output + + def _clean(self, sg_array): + for sg in sg_array: + if 'self' in sg: + sg.pop('self') + return sg diff --git a/terraform_compliance/steps/steps.py b/terraform_compliance/steps/steps.py index 91252ae1..e13869d6 100644 --- a/terraform_compliance/steps/steps.py +++ b/terraform_compliance/steps/steps.py @@ -2,10 +2,11 @@ from radish import world, given, when, then, step from terraform_compliance.steps import property_match_list -from terraform_compliance.common.helper import check_sg_rules, convert_resource_type, find_root_by_key, seek_key_in_dict +from terraform_compliance.common.helper import convert_resource_type, find_root_by_key, seek_key_in_dict from terraform_compliance.common.helper import seek_regex_key_in_dict_values, jsonify, Null, EmptyStash from terraform_compliance.common.helper import get_resource_name_from_stash, get_resource_address_list_from_stash from terraform_compliance.common.helper import remove_mounted_resources +from terraform_compliance.extensions.security_groups import SecurityGroup from terraform_compliance.extensions.ext_radish_bdd import skip_step from terraform_compliance.extensions.ext_radish_bdd import custom_type_any from terraform_compliance.extensions.ext_radish_bdd import custom_type_condition @@ -339,54 +340,27 @@ def property_is_enabled(_step_obj, something): property_value)) return True -@then(u'it must {condition:ANY} have {proto:ANY} protocol and port {port} for {cidr:ANY}') +@then(u'it {condition:ANY} have {proto:ANY} protocol and port {port} for {cidr:ANY}') def it_condition_have_proto_protocol_and_port_port_for_cidr(_step_obj, condition, proto, port, cidr): - proto = str(proto) - cidr = str(cidr) - - # Set to True only if the condition is 'only' - condition = condition == 'only' - - # In case we have a range - if '-' in port: - if condition: - raise Failure('"must only" scenario cases must be used either with individual port ' - 'or multiple ports separated with comma.') - - from_port, to_port = port.split('-') - ports = [from_port, to_port] - - # In case we have comma delimited ports - elif ',' in port: - ports = [port for port in port.split(',')] - from_port = min(ports) - to_port = max(ports) - - else: - from_port = to_port = int(port) - ports = list(set([str(from_port), str(to_port)])) - - from_port = int(from_port) if int(from_port) > 0 else 1 - to_port = int(to_port) if int(to_port) > 0 else 1 - ports[0] = ports[0] if int(ports[0]) > 0 else '1' - - looking_for = dict(proto=proto, - from_port=int(from_port), - to_port=int(to_port), - ports=ports, - cidr=cidr) - - for security_group in _step_obj.context.stash: - if type(security_group['values']) is list: - for sg in security_group['values']: - check_sg_rules(plan_data=sg, security_group=looking_for, condition=condition) - - elif type(security_group['values']) is dict: - check_sg_rules(plan_data=security_group['values'], security_group=looking_for, condition=condition) + searching_for=dict(port=port, protocol=proto, cidr_blocks=cidr) + + for sg in _step_obj.context.stash: + if sg['type'] != 'aws_security_group': + raise TerraformComplianceInternalFailure('This method can only be used for aws_security_group resources ' + 'for now. You tried to used it on {}'.format(sg['type'])) + + sg_obj = SecurityGroup(searching_for, sg['values'], address=sg['address']) + if condition == 'must only': + sg_obj.must_only_have() + elif condition == 'must': + sg_obj.must_have() + elif condition == 'must not': + sg_obj.must_not_have() else: - raise TerraformComplianceInternalFailure('Unexpected Security Group, ' - 'must be either list or a dict: ' - '{}'.format(security_group['values'])) + raise TerraformComplianceInternalFailure('You can only use "must have", "must not have" and "must only have"' + 'conditions on this step for now.' + 'You tried to use "{}"'.format(condition)) + sg_obj.validate() return True @when(u'I {action_type:ANY} it') diff --git a/tests/functional/test_issue-181_sg_with_comma_separated/.expected b/tests/functional/test_issue-181_sg_with_comma_separated/.expected new file mode 100644 index 00000000..e69de29b diff --git a/tests/functional/test_issue-181_sg_with_comma_separated/.failure b/tests/functional/test_issue-181_sg_with_comma_separated/.failure new file mode 100644 index 00000000..584bca6c --- /dev/null +++ b/tests/functional/test_issue-181_sg_with_comma_separated/.failure @@ -0,0 +1 @@ +Failure: tcp/\(123,443\) ports are not defined within 0.0.0.0/0 network in aws_security_group.allow_tls_ingress_inline. \ No newline at end of file diff --git a/tests/functional/test_issue-181_sg_with_comma_separated/main.tf b/tests/functional/test_issue-181_sg_with_comma_separated/main.tf index 56bcdc3a..543b1f34 100644 --- a/tests/functional/test_issue-181_sg_with_comma_separated/main.tf +++ b/tests/functional/test_issue-181_sg_with_comma_separated/main.tf @@ -14,4 +14,26 @@ resource "aws_security_group" "allow_tls_ingress_inline" { protocol = "tcp" cidr_blocks = ["0.0.0.0/0"] } -} \ No newline at end of file +} + +resource "aws_security_group" "something_else" { + name = "Some_other_sg" +} + +resource "aws_security_group_rule" "port_80" { + type = "ingress" + protocol = "tcp" + from_port = 80 + to_port = 80 + cidr_blocks = ["0.0.0.0/0"] + security_group_id = aws_security_group.something_else.id +} + +resource "aws_security_group_rule" "port_53" { + type = "ingress" + protocol = "tcp" + from_port = 53 + to_port = 53 + cidr_blocks = ["0.0.0.0/0"] + security_group_id = aws_security_group.something_else.id +} diff --git a/tests/functional/test_issue-181_sg_with_comma_separated/plan.out.json b/tests/functional/test_issue-181_sg_with_comma_separated/plan.out.json index 7a4ad505..ed7025a3 100644 --- a/tests/functional/test_issue-181_sg_with_comma_separated/plan.out.json +++ b/tests/functional/test_issue-181_sg_with_comma_separated/plan.out.json @@ -1 +1 @@ -{"format_version":"0.1","terraform_version":"0.12.12","planned_values":{"root_module":{"resources":[{"address":"aws_security_group.allow_tls_ingress_inline","mode":"managed","type":"aws_security_group","name":"allow_tls_ingress_inline","provider_name":"aws","schema_version":1,"values":{"description":"Managed by Terraform","ingress":[{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":53,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":53},{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":80,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":80}],"name":"allow_tls_ingress_inline","name_prefix":null,"revoke_rules_on_delete":false,"tags":null,"timeouts":null}}]}},"resource_changes":[{"address":"aws_security_group.allow_tls_ingress_inline","mode":"managed","type":"aws_security_group","name":"allow_tls_ingress_inline","provider_name":"aws","change":{"actions":["create"],"before":null,"after":{"description":"Managed by Terraform","ingress":[{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":53,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":53},{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":80,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":80}],"name":"allow_tls_ingress_inline","name_prefix":null,"revoke_rules_on_delete":false,"tags":null,"timeouts":null},"after_unknown":{"arn":true,"egress":true,"id":true,"ingress":[{"cidr_blocks":[false],"ipv6_cidr_blocks":[],"prefix_list_ids":[],"security_groups":[]},{"cidr_blocks":[false],"ipv6_cidr_blocks":[],"prefix_list_ids":[],"security_groups":[]}],"owner_id":true,"vpc_id":true}}}],"configuration":{"root_module":{"resources":[{"address":"aws_security_group.allow_tls_ingress_inline","mode":"managed","type":"aws_security_group","name":"allow_tls_ingress_inline","provider_config_key":"aws","expressions":{"name":{"constant_value":"allow_tls_ingress_inline"}},"schema_version":1}]}}} +{"format_version":"0.1","terraform_version":"0.12.18","planned_values":{"root_module":{"resources":[{"address":"aws_security_group.allow_tls_ingress_inline","mode":"managed","type":"aws_security_group","name":"allow_tls_ingress_inline","provider_name":"aws","schema_version":1,"values":{"description":"Managed by Terraform","ingress":[{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":53,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":53},{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":80,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":80}],"name":"allow_tls_ingress_inline","name_prefix":null,"revoke_rules_on_delete":false,"tags":null,"timeouts":null}},{"address":"aws_security_group.something_else","mode":"managed","type":"aws_security_group","name":"something_else","provider_name":"aws","schema_version":1,"values":{"description":"Managed by Terraform","name":"Some_other_sg","name_prefix":null,"revoke_rules_on_delete":false,"tags":null,"timeouts":null}},{"address":"aws_security_group_rule.port_53","mode":"managed","type":"aws_security_group_rule","name":"port_53","provider_name":"aws","schema_version":2,"values":{"cidr_blocks":["0.0.0.0/0"],"description":null,"from_port":53,"ipv6_cidr_blocks":null,"prefix_list_ids":null,"protocol":"tcp","self":false,"to_port":53,"type":"ingress"}},{"address":"aws_security_group_rule.port_80","mode":"managed","type":"aws_security_group_rule","name":"port_80","provider_name":"aws","schema_version":2,"values":{"cidr_blocks":["0.0.0.0/0"],"description":null,"from_port":80,"ipv6_cidr_blocks":null,"prefix_list_ids":null,"protocol":"tcp","self":false,"to_port":80,"type":"ingress"}}]}},"resource_changes":[{"address":"aws_security_group.allow_tls_ingress_inline","mode":"managed","type":"aws_security_group","name":"allow_tls_ingress_inline","provider_name":"aws","change":{"actions":["create"],"before":null,"after":{"description":"Managed by Terraform","ingress":[{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":53,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":53},{"cidr_blocks":["0.0.0.0/0"],"description":"","from_port":80,"ipv6_cidr_blocks":[],"prefix_list_ids":[],"protocol":"tcp","security_groups":[],"self":false,"to_port":80}],"name":"allow_tls_ingress_inline","name_prefix":null,"revoke_rules_on_delete":false,"tags":null,"timeouts":null},"after_unknown":{"arn":true,"egress":true,"id":true,"ingress":[{"cidr_blocks":[false],"ipv6_cidr_blocks":[],"prefix_list_ids":[],"security_groups":[]},{"cidr_blocks":[false],"ipv6_cidr_blocks":[],"prefix_list_ids":[],"security_groups":[]}],"owner_id":true,"vpc_id":true}}},{"address":"aws_security_group.something_else","mode":"managed","type":"aws_security_group","name":"something_else","provider_name":"aws","change":{"actions":["create"],"before":null,"after":{"description":"Managed by Terraform","name":"Some_other_sg","name_prefix":null,"revoke_rules_on_delete":false,"tags":null,"timeouts":null},"after_unknown":{"arn":true,"egress":true,"id":true,"ingress":true,"owner_id":true,"vpc_id":true}}},{"address":"aws_security_group_rule.port_53","mode":"managed","type":"aws_security_group_rule","name":"port_53","provider_name":"aws","change":{"actions":["create"],"before":null,"after":{"cidr_blocks":["0.0.0.0/0"],"description":null,"from_port":53,"ipv6_cidr_blocks":null,"prefix_list_ids":null,"protocol":"tcp","self":false,"to_port":53,"type":"ingress"},"after_unknown":{"cidr_blocks":[false],"id":true,"security_group_id":true,"source_security_group_id":true}}},{"address":"aws_security_group_rule.port_80","mode":"managed","type":"aws_security_group_rule","name":"port_80","provider_name":"aws","change":{"actions":["create"],"before":null,"after":{"cidr_blocks":["0.0.0.0/0"],"description":null,"from_port":80,"ipv6_cidr_blocks":null,"prefix_list_ids":null,"protocol":"tcp","self":false,"to_port":80,"type":"ingress"},"after_unknown":{"cidr_blocks":[false],"id":true,"security_group_id":true,"source_security_group_id":true}}}],"configuration":{"root_module":{"resources":[{"address":"aws_security_group.allow_tls_ingress_inline","mode":"managed","type":"aws_security_group","name":"allow_tls_ingress_inline","provider_config_key":"aws","expressions":{"name":{"constant_value":"allow_tls_ingress_inline"}},"schema_version":1},{"address":"aws_security_group.something_else","mode":"managed","type":"aws_security_group","name":"something_else","provider_config_key":"aws","expressions":{"name":{"constant_value":"Some_other_sg"}},"schema_version":1},{"address":"aws_security_group_rule.port_53","mode":"managed","type":"aws_security_group_rule","name":"port_53","provider_config_key":"aws","expressions":{"cidr_blocks":{"constant_value":["0.0.0.0/0"]},"from_port":{"constant_value":53},"protocol":{"constant_value":"tcp"},"security_group_id":{"references":["aws_security_group.something_else"]},"to_port":{"constant_value":53},"type":{"constant_value":"ingress"}},"schema_version":2},{"address":"aws_security_group_rule.port_80","mode":"managed","type":"aws_security_group_rule","name":"port_80","provider_config_key":"aws","expressions":{"cidr_blocks":{"constant_value":["0.0.0.0/0"]},"from_port":{"constant_value":80},"protocol":{"constant_value":"tcp"},"security_group_id":{"references":["aws_security_group.something_else"]},"to_port":{"constant_value":80},"type":{"constant_value":"ingress"}},"schema_version":2}]}}} diff --git a/tests/terraform_compliance/common/test_helper.py b/tests/terraform_compliance/common/test_helper.py index f7fa6b0f..2c0e2c37 100644 --- a/tests/terraform_compliance/common/test_helper.py +++ b/tests/terraform_compliance/common/test_helper.py @@ -3,8 +3,6 @@ flatten_list, check_if_cidr, is_ip_in_cidr, - assign_sg_params, - validate_sg_rule, seek_key_in_dict, find_root_by_key, are_networks_same, @@ -62,66 +60,6 @@ def test_is_ip_in_cidr_failure(self): self.assertFalse(is_ip_in_cidr('10.200.0.0/24', ['10.0.0.0/16'])) self.assertFalse(is_ip_in_cidr('10.0.1.1/32', ['10.0.0.0/24'])) - def test_assign_sg_params_one_port_with_two_cidrs(self): - self.assertEqual(MockedData.sg_params_ssh_with_2_cidrs, assign_sg_params(MockedData.sg_ssh_with_2_cidrs)) - - def test_assign_sg_params_one_port_two_cidrs_any_proto(self): - self.assertEqual(MockedData.sg_params_ssh_with_2_cidrs_any_proto, assign_sg_params(MockedData.sg_ssh_with_2_cidrs_any_proto)) - - def test_assign_sg_params_all_ports_with_all_ips(self): - self.assertEqual(MockedData.sg_params_all_port_all_ip, assign_sg_params(MockedData.sg_all_port_all_ip)) - - def test_assign_sg_params_no_data_given_in_rules(self): - self.assertEqual(MockedData.sg_params_all_port_no_ip, assign_sg_params(MockedData.sg_all_port_no_ip)) - - def test_assign_sg_params_from_port_bigger_than_to_port(self): - with self.assertRaises(Failure) as context: - assign_sg_params(MockedData.sg_invalid) - - self.assertTrue('Invalid configuration from_port can not be bigger than to_port.' in context.exception) - - def test_validate_sg_rule_port_found_in_cidr(self): - with self.assertRaises(Failure) as context: - params = dict(from_port=22, to_port=22, cidr='0.0.0.0/0', ports='', proto='tcp') - validate_sg_rule(MockedData.sg_params_all_port_all_ip, params, False) - self.assertTrue('Found' in context.exception) - - def test_validate_sg_rule_invalid_port_range_within_scenario(self): - with self.assertRaises(AssertionError) as context: - params = dict(from_port=43, to_port=42, cidr=None, ports='', proto='tcp') - validate_sg_rule(None, params, False) - - self.assertTrue('Port range is defined incorrectly within the Scenario.' in context.exception) - - def test_validate_sg_rule_port_range_found_in_cidr_fail(self): - scenario_list = ['22-80', '21-22', '21-23', '70-72', '79-80', '79-81'] - for scenario in scenario_list: - with self.assertRaises(Failure) as context: - from_port, to_port = scenario.split('-') - params = dict(proto='tcp', from_port=from_port, to_port=to_port, cidr='0.0.0.0/0', ports='') - validate_sg_rule(MockedData.sg_params_list_range_public, params, False) - self.assertTrue('Found' in context.exception) - - def test_validate_sg_rule_port_range_found_in_cidr_success_due_to_cidr_mismatch(self): - scenario_list = ['22-80', '21-22', '21-23', '70-72', '79-80', '79-81'] - for scenario in scenario_list: - from_port, to_port = scenario.split('-') - params = dict(proto='tcp', from_port=from_port, to_port=to_port, ports='', cidr='0.0.0.0/0') - self.assertTrue(validate_sg_rule(MockedData.sg_params_list_range_private, params, False)) - - def test_validate_sg_rule_port_not_found_in_comma_delimited_scenario(self): - with self.assertRaises(Failure) as context: - ports = '22,443'.split(',') - params = dict(proto='tcp', from_port=0, to_port=0, ports=ports, cidr='0.0.0.0/0') - self.assertFalse(validate_sg_rule(MockedData.sg_params_list_range_public, params, True)) - - def test_validate_sg_rule_port_found_in_comma_delimited_scenario(self): - with self.assertRaises(Failure) as context: - ports = range(22,80) - ports = [str(i) for i in ports] - params = dict(proto='tcp', from_port=0, to_port=0, ports=ports, cidr='0.0.0.0/0') - self.assertFalse(validate_sg_rule(MockedData.sg_params_list_range_public, params, True)) - def test_seek_in_dict_finding_a_key_in_root(self): dictionary = dict(search_key=dict(something=[])) search_key = 'search_key' diff --git a/tests/terraform_compliance/extensions/test_security_groups.py b/tests/terraform_compliance/extensions/test_security_groups.py index e244cc20..9e1b8f28 100644 --- a/tests/terraform_compliance/extensions/test_security_groups.py +++ b/tests/terraform_compliance/extensions/test_security_groups.py @@ -36,5 +36,270 @@ def test_assign_sg_params_from_port_bigger_than_to_port(self): self.assertTrue('Invalid configuration from_port can not be bigger than to_port.' in context.exception) + # TODO: Implement failure tests here for consistency. + class TestSecurityGroup(TestCase): + + def setUp(self): + self.sg_given = dict( + port=80, + protocol='tcp', + cidr_blocks='0.0.0.0/0', + ) + self.sg_in_conf = [ + dict( + from_port=80, + to_port=80, + protocol='tcp', + cidr_blocks=['0.0.0.0/0'], + description='Test Security Group Description #1', + values=dict(address='test.security_group_rule1') + ), + dict( + from_port=81, + to_port=81, + protocol='tcp', + cidr_blocks=['10.0.0.0/8', '192.168.0.0/24'], + description='Test Security Group Description #2', + values=dict(address='test.security_group_rule2') + + ) + ] + + # Tests about `must not have` scenarios + # Checks are singular per security group rule, failures are imminent. + def test_must_not_have_port_tcp_80_with_ALL_cidr(self): + with self.assertRaises(Failure) as context: + SecurityGroup(self.sg_given, self.sg_in_conf).validate() + + self.assertEqual('tcp/80 port is defined within 0.0.0.0/0 network in test.security_group_rule1.', + str(context.exception)) + + def test_must_not_have_port_tcp_80_with_multi_cidr(self): + self.sg_given['cidr_blocks'] = '192.168.1.0/24' + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16', '0.0.0.0/0'] + with self.assertRaises(Failure) as context: + SecurityGroup(self.sg_given, self.sg_in_conf).validate() + + self.assertEqual('tcp/80 port is defined within 192.168.0.0/16, 0.0.0.0/0 networks in test.security_group_rule1.', + str(context.exception)) + + def test_must_not_have_port_tcp_80_with_ALL_cidr_success(self): + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16'] + self.assertTrue(SecurityGroup(self.sg_given, self.sg_in_conf).validate()) + + def test_must_not_have_port_tcp_80_with_ALL_multi_success(self): + self.sg_given['cidr_blocks'] = '192.168.1.0/16' + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/24', '10.0.0.0/8'] + self.assertTrue(SecurityGroup(self.sg_given, self.sg_in_conf).validate()) + + def test_must_not_have_port_tcp_22_23_with_ALL_cidr(self): + self.sg_given['port'] = '22-23' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + with self.assertRaises(Failure) as context: + SecurityGroup(self.sg_given, self.sg_in_conf).validate() + + self.assertEqual('tcp/(22,23) ports are defined within 0.0.0.0/0 network in test.security_group_rule1.', + str(context.exception)) + + def test_must_not_have_port_tcp_22_23_with_multi_cidr(self): + self.sg_given['port'] = '22-23' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_given['cidr_blocks'] = '192.168.1.0/24' + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16', '0.0.0.0/0'] + with self.assertRaises(Failure) as context: + SecurityGroup(self.sg_given, self.sg_in_conf).validate() + + self.assertEqual('tcp/(22,23) ports are defined within 192.168.0.0/16, 0.0.0.0/0 networks in ' + 'test.security_group_rule1.', + str(context.exception)) + + def test_must_not_have_port_tcp_22_23_with_ALL_cidr_success(self): + self.sg_given['port'] = '22-23' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16'] + self.assertTrue(SecurityGroup(self.sg_given, self.sg_in_conf).validate()) + + def test_must_not_have_port_tcp_22_23_with_ALL_multi_success(self): + self.sg_given['port'] = '22-23' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_given['cidr_blocks'] = '192.168.1.0/16' + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/24', '10.0.0.0/8'] + self.assertTrue(SecurityGroup(self.sg_given, self.sg_in_conf).validate()) + + def test_must_not_have_port_tcp_22_with_range_with_ALL_cidr(self): + self.sg_given['port'] = '21-22' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + with self.assertRaises(Failure) as context: + SecurityGroup(self.sg_given, self.sg_in_conf).validate() + + self.assertEqual('tcp/22 port is defined within 0.0.0.0/0 network in test.security_group_rule1.', + str(context.exception)) + + def test_must_not_have_port_tcp_22_with_range_with_multi_cidr(self): + self.sg_given['port'] = '21-22' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_given['cidr_blocks'] = '192.168.1.0/24' + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16', '0.0.0.0/0'] + with self.assertRaises(Failure) as context: + SecurityGroup(self.sg_given, self.sg_in_conf).validate() + + self.assertEqual('tcp/22 port is defined within 192.168.0.0/16, 0.0.0.0/0 networks in test.security_group_rule1.', + str(context.exception)) + + def test_must_not_have_port_tcp_22_with_range_with_ALL_cidr_success(self): + self.sg_given['port'] = '21-22' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16'] + self.assertTrue(SecurityGroup(self.sg_given, self.sg_in_conf).validate()) + + def test_must_not_have_port_tcp_22_with_range_with_ALL_multi_success(self): + self.sg_given['port'] = '21-22' + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_given['cidr_blocks'] = '192.168.1.0/16' + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/24', '10.0.0.0/8'] + self.assertTrue(SecurityGroup(self.sg_given, self.sg_in_conf).validate()) + + # Tests about `must have` scenarios + def test_must_have_port_tcp_443_with_ALL_cidr(self): + self.sg_given['port'] = 443 + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_have() + sg.validate() + + self.assertEqual('tcp/443 port is not defined within 0.0.0.0/0 network in test_sg.', + str(context.exception)) + + def test_must_have_port_tcp_443_with_multi_cidr(self): + self.sg_given['port'] = 443 + self.sg_given['cidr_blocks'] = '192.168.1.0/24' + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16', '0.0.0.0/0'] + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_have() + sg.validate() + + self.assertEqual('tcp/443 port is not defined within 192.168.1.0/24 network in ' + 'test_sg.', + str(context.exception)) + + def test_must_have_port_tcp_80_with_multi_cidr_success(self): + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/24'] + self.sg_in_conf[1]['cidr_blocks'] = ['192.168.1.0/24'] + self.sg_in_conf[1]['from_port'] = 79 + self.sg_in_conf[1]['to_port'] = 81 + self.sg_given['cidr_blocks'] = '192.168.0.0/24' + + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_have() + self.assertTrue(sg.validate()) + + def test_must_have_port_tcp_80_with_multi_cidr_success(self): + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/24'] + self.sg_in_conf[1]['cidr_blocks'] = ['192.168.0.0/16'] + self.sg_in_conf[1]['from_port'] = 79 + self.sg_in_conf[1]['to_port'] = 81 + self.sg_given['cidr_blocks'] = '192.168.0.1/32' + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_have() + self.assertTrue(sg.validate()) + + def test_must_have_port_tcp_443_444_with_ALL_cidr(self): + self.sg_given['port'] = '443-444' + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_have() + sg.validate() + + self.assertEqual('tcp/(443,444) ports are not defined within 0.0.0.0/0 network in test_sg.', + str(context.exception)) + + def test_must_have_port_tcp_80_81_with_ALL_cidr(self): + self.sg_given['port'] = '80-82' + self.sg_in_conf[1]['cidr_blocks'] = ['0.0.0.0/0'] + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_have() + sg.validate() + + self.assertEqual('tcp/82 port is not defined within 0.0.0.0/0 network in test_sg.', + str(context.exception)) + + # Tests about `must only have` scenarios + # We are just checking if tcp/80 is defined for 0.0.0.0/0 + def test_must_only_have_port_tcp_80_with_ALL_cidr_success(self): + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_only_have() + self.assertTrue(sg.validate()) + + def test_must_only_have_port_tcp_80_81_with_ALL_cidr_success(self): + self.sg_given['port'] = '80-81' + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_only_have() + sg.validate() + self.assertEqual('tcp/81 port is not defined within 0.0.0.0/0 network in test_sg.', + str(context.exception)) + + def test_must_only_have_port_some_ports_are_over_configured(self): + self.sg_in_conf[0]['from_port'] = 79 + self.sg_in_conf[0]['to_port'] = 81 + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16', '0.0.0.0/0'] + self.sg_in_conf[1]['from_port'] = 80 + self.sg_in_conf[1]['to_port'] = 80 + self.sg_in_conf[1]['cidr_blocks'] = ['0.0.0.0/0'] + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_only_have() + sg.validate() + self.assertEqual('tcp/(81,79) ports are defined within 0.0.0.0/0 network in test_sg.', + str(context.exception)) + + def test_must_only_have_port_not_match_multiple_errors_given(self): + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16', '0.0.0.0/0'] + self.sg_in_conf[1]['from_port'] = 443 + self.sg_in_conf[1]['to_port'] = 444 + self.sg_in_conf[1]['cidr_blocks'] = ['0.0.0.0/0'] + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_only_have() + sg.validate() + + self.assertTrue('tcp/80 port is not defined within 0.0.0.0/0 network in test_sg.' + in str(context.exception)) + self.assertTrue('tcp/(443,444,22,23) ports are defined within 0.0.0.0/0 network in test_sg.' + in str(context.exception)) + self.assertTrue('None of the ports given defined within 0.0.0.0/0 network in test_sg.' + in str(context.exception)) + + def test_must_only_have_port_match_multiple_ports_not_ranges(self): + self.sg_in_conf[0]['from_port'] = 22 + self.sg_in_conf[0]['to_port'] = 23 + self.sg_in_conf[0]['cidr_blocks'] = ['192.168.0.0/16', '0.0.0.0/0'] + self.sg_in_conf[1]['from_port'] = 443 + self.sg_in_conf[1]['to_port'] = 444 + self.sg_in_conf[1]['cidr_blocks'] = ['0.0.0.0/0'] + self.sg_given['ports'] = '22,23,443,444' + with self.assertRaises(Failure) as context: + sg = SecurityGroup(self.sg_given, self.sg_in_conf) + sg.must_only_have() + sg.validate() + + self.assertTrue('tcp/80 port is not defined within 0.0.0.0/0 network in test_sg.' + in str(context.exception)) + self.assertTrue('tcp/(443,444,22,23) ports are defined within 0.0.0.0/0 network in test_sg.' + in str(context.exception)) + self.assertTrue('None of the ports given defined within 0.0.0.0/0 network in test_sg.' + in str(context.exception)) \ No newline at end of file diff --git a/tests/terraform_compliance/steps/test_main_steps.py b/tests/terraform_compliance/steps/test_main_steps.py index 81c525bb..a042dce8 100644 --- a/tests/terraform_compliance/steps/test_main_steps.py +++ b/tests/terraform_compliance/steps/test_main_steps.py @@ -240,18 +240,6 @@ def test_property_is_enabled_failure(self): with self.assertRaises(Failure): property_is_enabled(step, 'storage_encrypted') - def test_it_condition_have_proto_protocol_and_port_port_for_cidr_ports_must_only_fail(self): - step = MockedStep() - condition = 'only' - proto = 'not important' - port = '1-2' - cidr = 'not important' - with self.assertRaises(Failure) as err: - it_condition_have_proto_protocol_and_port_port_for_cidr(step, condition, proto, port, cidr) - - self.assertEqual(str(err.exception), '"must only" scenario cases must be used either with individual port ' - 'or multiple ports separated with comma.') - def test_it_condition_have_proto_protocol_and_port_port_for_cidr_ports_must_only_success(self): step = MockedStep() step.context.stash = []