Skip to content

Commit

Permalink
Revamped whole Security Groups functionality. This also fixes the pro…
Browse files Browse the repository at this point in the history
…blem addressed in #181.
  • Loading branch information
eerkunt committed Dec 28, 2019
1 parent a513c8d commit 0c82dbf
Show file tree
Hide file tree
Showing 11 changed files with 517 additions and 241 deletions.
6 changes: 6 additions & 0 deletions .devbots/lock-issue.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
enabled: true
comment: >
This issue's conversation is now
locked. If you want to continue
this discussion please open a
[new issue](https://github.com/eerkunt/terraform-compliance/issues/new/choose).
103 changes: 10 additions & 93 deletions terraform_compliance/common/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ def flatten(items):
yield x


def check_if_cidr( value ):
regex = r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\.(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\.(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\.(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])\/(3[0-2]|2[0-9]|1[0-9]|[0-9])'
def check_if_cidr(value):
regex = r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \
r'\.' \
r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \
r'\.' \
r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \
r'\.' \
r'(1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[0-9][0-9]|[0-9])' \
r'\/' \
r'(3[0-2]|2[0-9]|1[0-9]|[0-9])'
matches = re.match(regex, value)

if matches is not None:
Expand All @@ -55,97 +63,6 @@ def are_networks_same(first_network, network_list):
return False


# A helper function that compares port related data with given dictionary
def check_sg_rules(plan_data, security_group, condition):
return validate_sg_rule(plan_data=assign_sg_params(plan_data),
params=security_group,
condition=condition)


def assign_sg_params(rule):
from_port = int(rule.get('from_port', 0))
to_port = int(rule.get('to_port', 0))

protocol = [proto for proto in [rule.get('protocol', '-1')]]

if protocol[0] == '-1' or type(protocol[0]) is int:
protocol = ['tcp', 'udp']

protocol[0] = protocol[0].lower()

cidr_blocks = rule.get('cidr_blocks', [])

if type(cidr_blocks) is not list:
cidr_blocks = [cidr_blocks]

if to_port == 0 and from_port == 0:
to_port = 65535

if from_port > to_port:
raise Failure('Invalid configuration from_port can not be bigger than to_port. '
'{} > {} {} in {}'.format(from_port,
to_port,
protocol,
cidr_blocks))

return dict(protocol=protocol, from_port=from_port, to_port=to_port, cidr_blocks=cidr_blocks)


def validate_sg_rule(plan_data, params, condition):
from_port = int(params['from_port'])
to_port = int(params['to_port'])

assert from_port <= to_port, 'Port range is defined incorrectly within the Scenario. ' \
'Define it {}-{} instead of {}-{}.'.format(from_port,
to_port,
to_port,
from_port)

defined_range = set(range(plan_data['from_port'], plan_data['to_port']+1))
defined_network_list = plan_data['cidr_blocks']
given_network = params.get('cidr', None)

# Condition: must only have
# Fail only if ;
# * the ports does not match and defined network is a subset of given network.
if condition:
given_range = set([int(port) for port in params['ports']])
from_to_port = ','.join(params['ports'])

# Set to True if ports are exactly same.
port_intersection = given_range == defined_range

# Set to True if one of the given networks is a subset.
network_check = is_ip_in_cidr(given_network, defined_network_list)

if not port_intersection and network_check:
raise Failure('{}/{} ports are defined for {} network. '
'Must be limited to {}/{} and {}'.format('/'.join(plan_data['protocol']),
'{}-{}'.format(plan_data['from_port'],
plan_data['to_port']),
plan_data['cidr_blocks'],
'/'.join(plan_data['protocol']),
from_to_port,
given_network))

# Condition: must not have
# Fail only if ;
# * the ports match and networks match
else:
given_range = set(range(from_port, to_port+1))
port_intersection = given_range & defined_range
from_to_port = str(from_port) + '-' + str(to_port)
network_intersection = is_ip_in_cidr(given_network, defined_network_list)

if port_intersection and network_intersection:
raise Failure('{}/{} ports are defined for {} network.'.format('/'.join(plan_data['protocol']),
'{}-{}'.format(plan_data['from_port'],
plan_data['to_port']),
plan_data['cidr_blocks']))

return True


def convert_resource_type(resource_type):
'''
Searchs given resource_type within resource_name array and returns the value if it is found
Expand Down
Loading

0 comments on commit 0c82dbf

Please sign in to comment.