Skip to content

Commit

Permalink
Updated the dict_merge and required functions that is dependening on it.
Browse files Browse the repository at this point in the history
  • Loading branch information
eerkunt committed Feb 16, 2020
1 parent 3831a53 commit ddcf5a9
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 83 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.vscode
.eggs
.tox
*.code-workspace
venv
Expand All @@ -20,4 +21,4 @@ terraform-compliance.iml

**/.terraform
**/plan.out
example
example
42 changes: 22 additions & 20 deletions terraform_compliance/common/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,26 +270,28 @@ def seek_value_in_dict(needle, haystack, address=None):


def dict_merge(source, target):
update = deepcopy(target)
for key, value in source.items():
if key not in update:
update[key] = value
elif isinstance(value, dict) and isinstance(update[key], dict) and remove_empty(update[key]):
update[key] = dict_merge(value, update[key])
elif isinstance(value, list) and isinstance(update[key], list) and remove_empty(update[key]):
update[key].extend(value)
update[key] = update[key]
elif isinstance(value, list) and isinstance(update[key], (bool, str, int)) and remove_empty(update[key]):
value.append(update[key])
update[key] = value
elif isinstance(value, (bool, str, int)) and isinstance(update[key], list) and remove_empty(update[key]):
update[key].append(value)
update[key] = update[key]
else:
update[key] = value
if not isinstance(source, dict) or not isinstance(target, dict):
return source

src = deepcopy(source)
dst = deepcopy(target)
for key, val in src.items():
if key in dst and is_list_of_dict(val) and is_list_of_dict(dst[key]):
for dst_elem in dst[key]:
for each_key in dst_elem.keys():
if not is_key_exist(each_key, val):
src[key].append(dst_elem)

return src


return update
def is_list_of_dict(target_list):
return isinstance(target_list, list) and len(target_list) and isinstance(target_list[0], dict)


def remove_empty(target_list):
return [x for x in target_list if x]
def is_key_exist(key, target_list_of_dict):
for each in target_list_of_dict:
if isinstance(each, dict) and key in each.keys():
return True

return False
1 change: 0 additions & 1 deletion terraform_compliance/extensions/terraform.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ def _distribute_providers(self):
if resource_provider not in self.providers:
self.providers[resource_provider] = {}


self.providers[resource_provider][resource_name] = resource_data

def parse(self):
Expand Down
1 change: 0 additions & 1 deletion terraform_compliance/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def i_have_name_section_configured(_step_obj, name, type_name='resource', _terra
_step_obj.context.property_name = type_name
return True


elif type_name == 'output':
found_output = _terraform_config.config.terraform.outputs.get(name, None)

Expand Down
52 changes: 52 additions & 0 deletions tests/functional/test_dynamodb_kms_key_problem/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
resource "aws_dynamodb_table" "test" {
name = "test-table"
billing_mode = "PROVISIONED"
read_capacity = 20
write_capacity = 20
hash_key = "UserId"
range_key = "GameTitle"

attribute {
name = "UserId"
type = "S"
}

attribute {
name = "GameTitle"
type = "S"
}

attribute {
name = "TopScore"
type = "N"
}

ttl {
attribute_name = "TimeToExist"
enabled = false
}

global_secondary_index {
name = "GameTitleIndex"
hash_key = "GameTitle"
range_key = "TopScore"
write_capacity = 10
read_capacity = 10
projection_type = "INCLUDE"
non_key_attributes = ["UserId"]
}

server_side_encryption {
enabled = true
kms_key_arn = aws_kms_key.test.arn
}

tags = {
Name = "dynamodb-table-1"
Environment = "production"
}
}

resource "aws_kms_key" "test" {
description = "Test"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"format_version":"0.1","terraform_version":"0.12.20","planned_values":{"root_module":{"resources":[{"address":"aws_dynamodb_table.test","mode":"managed","type":"aws_dynamodb_table","name":"test","provider_name":"aws","schema_version":1,"values":{"attribute":[{"name":"GameTitle","type":"S"},{"name":"TopScore","type":"N"},{"name":"UserId","type":"S"}],"billing_mode":"PROVISIONED","global_secondary_index":[{"hash_key":"GameTitle","name":"GameTitleIndex","non_key_attributes":["UserId"],"projection_type":"INCLUDE","range_key":"TopScore","read_capacity":10,"write_capacity":10}],"hash_key":"UserId","local_secondary_index":[],"name":"test-table","range_key":"GameTitle","read_capacity":20,"server_side_encryption":[{"enabled":true}],"stream_enabled":null,"tags":{"Environment":"production","Name":"dynamodb-table-1"},"timeouts":null,"ttl":[{"attribute_name":"TimeToExist","enabled":false}],"write_capacity":20}},{"address":"aws_kms_key.test","mode":"managed","type":"aws_kms_key","name":"test","provider_name":"aws","schema_version":0,"values":{"customer_master_key_spec":"SYMMETRIC_DEFAULT","deletion_window_in_days":null,"description":"Test","enable_key_rotation":false,"is_enabled":true,"key_usage":"ENCRYPT_DECRYPT","tags":null}}]}},"resource_changes":[{"address":"aws_dynamodb_table.test","mode":"managed","type":"aws_dynamodb_table","name":"test","provider_name":"aws","change":{"actions":["create"],"before":null,"after":{"attribute":[{"name":"GameTitle","type":"S"},{"name":"TopScore","type":"N"},{"name":"UserId","type":"S"}],"billing_mode":"PROVISIONED","global_secondary_index":[{"hash_key":"GameTitle","name":"GameTitleIndex","non_key_attributes":["UserId"],"projection_type":"INCLUDE","range_key":"TopScore","read_capacity":10,"write_capacity":10}],"hash_key":"UserId","local_secondary_index":[],"name":"test-table","range_key":"GameTitle","read_capacity":20,"server_side_encryption":[{"enabled":true}],"stream_enabled":null,"tags":{"Environment":"production","Name":"dynamodb-table-1"},"timeouts":null,"ttl":[{"attribute_name":"TimeToExist","enabled":false}],"write_capacity":20},"after_unknown":{"arn":true,"attribute":[{},{},{}],"global_secondary_index":[{"non_key_attributes":[false]}],"id":true,"local_secondary_index":[],"point_in_time_recovery":true,"server_side_encryption":[{"kms_key_arn":true}],"stream_arn":true,"stream_label":true,"stream_view_type":true,"tags":{},"ttl":[{}]}}},{"address":"aws_kms_key.test","mode":"managed","type":"aws_kms_key","name":"test","provider_name":"aws","change":{"actions":["create"],"before":null,"after":{"customer_master_key_spec":"SYMMETRIC_DEFAULT","deletion_window_in_days":null,"description":"Test","enable_key_rotation":false,"is_enabled":true,"key_usage":"ENCRYPT_DECRYPT","tags":null},"after_unknown":{"arn":true,"id":true,"key_id":true,"policy":true}}}],"configuration":{"root_module":{"resources":[{"address":"aws_dynamodb_table.test","mode":"managed","type":"aws_dynamodb_table","name":"test","provider_config_key":"aws","expressions":{"attribute":[{"name":{"constant_value":"UserId"},"type":{"constant_value":"S"}},{"name":{"constant_value":"GameTitle"},"type":{"constant_value":"S"}},{"name":{"constant_value":"TopScore"},"type":{"constant_value":"N"}}],"billing_mode":{"constant_value":"PROVISIONED"},"global_secondary_index":[{"hash_key":{"constant_value":"GameTitle"},"name":{"constant_value":"GameTitleIndex"},"non_key_attributes":{"constant_value":["UserId"]},"projection_type":{"constant_value":"INCLUDE"},"range_key":{"constant_value":"TopScore"},"read_capacity":{"constant_value":10},"write_capacity":{"constant_value":10}}],"hash_key":{"constant_value":"UserId"},"name":{"constant_value":"test-table"},"range_key":{"constant_value":"GameTitle"},"read_capacity":{"constant_value":20},"server_side_encryption":[{"enabled":{"constant_value":true},"kms_key_arn":{"references":["aws_kms_key.test"]}}],"tags":{"constant_value":{"Environment":"production","Name":"dynamodb-table-1"}},"ttl":[{"attribute_name":{"constant_value":"TimeToExist"},"enabled":{"constant_value":false}}],"write_capacity":{"constant_value":20}},"schema_version":1},{"address":"aws_kms_key.test","mode":"managed","type":"aws_kms_key","name":"test","provider_config_key":"aws","expressions":{"description":{"constant_value":"Test"}},"schema_version":0}]}}}
7 changes: 7 additions & 0 deletions tests/functional/test_dynamodb_kms_key_problem/test.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Feature: Image scan to be enabled on push to ECR
This feature will enforce vulnerabilty scan on images pushed to ECR
Scenario: Image scan to be enabled on push.
Given I have aws_dynamodb_table defined
Then it must contain server_side_encryption
And it must contain kms_key_arn
20 changes: 20 additions & 0 deletions tests/terraform_compliance/common/test_bdd_tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from unittest import TestCase
from terraform_compliance.common.bdd_tags import look_for_bdd_tags
from terraform_compliance.common.exceptions import Failure
from tests.mocks import MockedStep, MockedTags


class TestBddTags(TestCase):

def test_unchanged_step_object(self):
step = MockedStep()
look_for_bdd_tags(step)
self.assertFalse(step.context.no_failure)
self.assertIsNone(step.context.failure_class)

def test_warning_case(self):
step = MockedStep()
step.all_tags = [MockedTags(name='warning')]
look_for_bdd_tags(step)
self.assertTrue(step.context.no_failure)
self.assertEqual(step.context.failure_class, 'warning')
108 changes: 48 additions & 60 deletions tests/terraform_compliance/common/test_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
jsonify,
remove_mounted_resources,
get_resource_name_from_stash,
dict_merge
dict_merge,
is_list_of_dict,
is_key_exist
)


Expand Down Expand Up @@ -189,73 +191,59 @@ def test_get_resource_name_from_stash(self):
]
self.assertEqual({'address': 'test'}, get_resource_name_from_stash(stash=stash))

def test_dict_merge_dict_dict(self):
def test_dict_merge_no_change(self):
source = {
'a': 'something',
'b': 'something else'
'a': [{'a': False}],
'b': True
}
target = {
'c': 'completely different something'
target ={
'a': [{'a': True}]
}
self.assertEqual(dict_merge(source, target), source)

self.assertEqual(dict_merge(source, target), {
'a': 'something',
'b': 'something else',
'c': 'completely different something'
})

def test_dict_merge_list_list(self):
def test_dict_merge_no_change_1(self):
source = {
'a': 'something',
'b': ['something else']
}
target = {
'b': ['completely different something']
}

self.assertEqual(dict_merge(source, target), {
'a': 'something',
'b': ['completely different something', 'something else']
})

def test_dict_merge_list_str(self):
source = {
'a': 'something',
'b': ['something else']
}
target = {
'b': 'completely different something'
'a': [{'a': False}],
'b': True
}
target = []
self.assertEqual(dict_merge(source, target), source)

self.assertEqual(dict_merge(source, target), {
'a': 'something',
'b': ['something else', 'completely different something']
})
def test_dict_merge_no_change_2(self):
self.assertEqual(dict_merge([], []), [])

def test_dict_merge_str_list(self):
def test_dict_merge_success(self):
source = {
'a': 'something',
'b': 'something else'
'a': [{'a': False}],
'b': True
}
target = {
'b': ['completely different something']
target ={
'a': [{'b': 0}]
}

self.assertEqual(dict_merge(source, target), {
'a': 'something',
'b': ['completely different something', 'something else']
})

def test_dict_merge_dict_list_failure(self):
source = {
'a': 'something',
'b': {'something': False}
}
target = {
'b': ['completely different something']
}

self.assertEqual(dict_merge(source, target), {
'a': 'something',
'b': {'something': False}
})
self.assertEqual(dict_merge(source, target), {'a': [{'a': False}, {'b': 0}], 'b': True})

def test_is_list_of_dict_failures(self):
self.assertFalse(is_list_of_dict(['a', 'b', 'c']))
self.assertFalse(is_list_of_dict([1, 2, 3]))
self.assertFalse(is_list_of_dict(['a', {'b':'c'}]))
self.assertFalse(is_list_of_dict([False]))
self.assertFalse(is_list_of_dict([]))

def test_is_list_of_dict_successes(self):
self.assertTrue(is_list_of_dict([{}]))
self.assertTrue(is_list_of_dict([{'a:': [1]}]))
self.assertTrue(is_list_of_dict([{'a:': 'b'}]))

def test_is_key_exist_failures(self):
self.assertFalse(is_key_exist('key', []))
self.assertFalse(is_key_exist('key', [{}]))
self.assertFalse(is_key_exist('key', [{'something': True}]))
self.assertFalse(is_key_exist('key', [1, 2]))
self.assertFalse(is_key_exist('key', ['key', 'b']))
self.assertFalse(is_key_exist('key', [False]))
self.assertFalse(is_key_exist('key', {'key': []}))

def test_is_key_exist_successes(self):
self.assertTrue(is_key_exist('key', [{'key': 'something'}]))
self.assertTrue(is_key_exist('key', [{'something': True}, 2, 3, 4, {'key': True}]))
self.assertTrue(is_key_exist('key', [{'something': True}, {'key': True}]))

0 comments on commit ddcf5a9

Please sign in to comment.