Skip to content

custom cofig rules DYNAMODB_TABLE_ENCRYPTED_KMS and SQS_ENCRYPTED_KMS #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions python/DYNAMODB_TABLE_ENCRYPTED_KMS/DYNAMODB_TABLE_ENCRYPTED_KMS.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
'''
#####################################
## Gherkin ##
#####################################
Rule Name:
DYNAMODB_TABLE_ENCRYPTED_KMS

Description:
Check whether Amazon DynamoDB Table is encrypted with KMS.

Rationale:
Encrypting on Amazon DynamoDB tables ensure that no data is written on disk in clear text.

Indicative Severity:
Medium

Trigger:
Configuration Change on AWS::DynamoDB::Table

Reports on:
AWS::DynamoDB::Table

Rule Parameters:(optional)
Provide comma seperated KMS Key ARN list.

Scenarios:

Scenario: 1
Given: Rules parameter is provided
And: Any key in "KmsKeyArns" is invalid
Then: Return ERROR
Scenario: 2
Given: Rules parameter is provided
And: All keys in "KmsKeyArns" is valid
Then: Return Success
Scenario: 3
Given: Amazon DynamoDB table is not active state
Then: Return NOT_APPLICABLE
Scenario: 4
Given: Amazon DynamoDB table is active
And: Amazon DynamoDB table is encrypted with KMS
Then: Return COMPLIANT
Scenario: 5
Given: Amazon DynamoDB table is active
And: Amazon DynamoDB table is not encrypted with KMS
Then: Return NON_COMPLIANT
Scenario: 6
Given: Amazon DynamoDB table is active
And: KmsKeyArns Rule Parameter provided

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make "Rule Parameter" to lower case

And: Amazon DynamoDB table is encrypted with provided KmsKeyArns Rule Parameter

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Then: Return COMPLIANT
Scenario: 7
Given: Amazon DynamoDB table is active
And: KmsKeyArns Rule Parameter provided

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

And: Amazon DynamoDB table is not encrypted with with provided KmsKeyArns Rule Parameter

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove one "with"

Then: Return NON_COMPLIANT
'''
import json
from rdklib import Evaluator, Evaluation, ConfigRule, ComplianceType, InvalidParametersError
PAGE_SIZE = 100
DEFAULT_RESOURCE_TYPE = 'AWS::DynamoDB::Table'

class DYNAMODB_TABLE_ENCRYPTED_KMS(ConfigRule):
def evaluate_periodic(self, event, client_factory, valid_rule_parameters):
evaluations = []
config_client = client_factory.build_client(service='config')
dynamodb_client = client_factory.build_client(service='dynamodb')
for table in describe_tables(config_client):
is_valid_enctyption = False
annotation = ''
table_data = dynamodb_client.describe_table(TableName=table)
if 'SSEDescription' in table_data['Table']:
ssetype = table_data['Table']['SSEDescription']['SSEType']
kmskey = table_data['Table']['SSEDescription']['KMSMasterKeyArn']
kms_arn_list = valid_rule_parameters.get("KmsKeyArns")
if ssetype == 'KMS':
if not kms_arn_list or kmskey in kms_arn_list:
is_valid_enctyption = True
else:
is_valid_enctyption = False
annotation = "AWS KMS key '{}' used to encrypt the Amazon DynamoDB Table is not in rule_paramter 'KmsKeyArns'".format(kmskey)
else:
annotation = "Table is not encrypted with KMS"
is_valid_enctyption = False
else:
annotation = "Table is not encrypted with KMS"
is_valid_enctyption = False
if is_valid_enctyption:
evaluations.append(Evaluation(ComplianceType.COMPLIANT, table, DEFAULT_RESOURCE_TYPE))
else:
evaluations.append(Evaluation(ComplianceType.NON_COMPLIANT,
table, DEFAULT_RESOURCE_TYPE, annotation=annotation))
return evaluations

def evaluate_change(self, event, client_factory, configuration_item, valid_rule_parameters):
if configuration_item['configuration']['tableStatus'] != 'ACTIVE':
return [Evaluation(ComplianceType.NOT_APPLICABLE)]

if 'ssedescription' in configuration_item['configuration']:
ssetype = configuration_item['configuration']['ssedescription']['ssetype']
kmskey = configuration_item['configuration']['ssedescription']['kmsmasterKeyArn']
kms_arn_list = valid_rule_parameters.get("KmsKeyArns")

if ssetype == 'KMS':
if not kms_arn_list or kmskey in kms_arn_list:
return [Evaluation(ComplianceType.COMPLIANT)]
return [Evaluation(ComplianceType.NON_COMPLIANT, annotation="AWS KMS key '{}' used to encrypt the Amazon DynamoDB Table is not in rule_paramter 'KmsKeyArns'".format(kmskey))]
return [Evaluation(ComplianceType.NON_COMPLIANT, annotation="Amazon DynamoDB Table is not encrypted with KMS")]

def evaluate_parameters(self, rule_parameters):
valid_rule_parameters = {}
if 'KmsKeyArns' in rule_parameters:
kms_key_arns = "".join(rule_parameters['KmsKeyArns'].split())
if kms_key_arns:
kms_key_arns = kms_key_arns.split(',')
for kms_key_arn in kms_key_arns:
if not kms_key_arn.startswith('arn:aws:kms:'):
raise InvalidParametersError('Invalid AWS KMS Key Arn format for "{}". AWS KMS Key Arn starts with "arn:aws:kms:"'.format(kms_key_arn))
valid_rule_parameters['KmsKeyArns'] = kms_key_arns
return valid_rule_parameters

def describe_tables(config_client):
sql = "select * where resourceType = 'AWS::DynamoDB::Table'"
next_token = True
response = config_client.select_resource_config(Expression=sql, Limit=PAGE_SIZE)
while next_token:
for result in response['Results']:
yield json.loads(result)['resourceName']
if 'NextToken' in response:
next_token = response['NextToken']
response = config_client.select_resource_config(Expression=sql, NextToken=next_token, Limit=PAGE_SIZE)
else:
next_token = False

def lambda_handler(event, context):
my_rule = DYNAMODB_TABLE_ENCRYPTED_KMS()
evaluator = Evaluator(my_rule)
return evaluator.handle(event, context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import unittest
from mock import patch, MagicMock
from rdklib import Evaluation, ComplianceType, InvalidParametersError
import rdklibtest

# Define the default resource to report to Config Rules
RESOURCE_TYPE = 'AWS::DynamoDB::Table'
MODULE = __import__('DYNAMODB_TABLE_ENCRYPTED_KMS')
RULE = MODULE.DYNAMODB_TABLE_ENCRYPTED_KMS()
CLIENT_FACTORY = MagicMock()
DB_CLIENT_MOCK = MagicMock()
CONFIG_CLIENT = MagicMock()

def mock_get_client(service, *args, **kwargs):
if service == 'dynamodb':
return DB_CLIENT_MOCK
if service == 'config':
return CONFIG_CLIENT
raise Exception("Attempting to create an unknown client")

@patch.object(CLIENT_FACTORY, 'build_client', MagicMock(side_effect=mock_get_client))
class ComplianceTest(unittest.TestCase):
MOCK_CONF_ITEM = {"configuration": {"tableName": "testNonEncrypt", "tableStatus": "ACTIVE", "ssedescription": {"status": "ENABLED", "ssetype": "KMS", "kmsmasterKeyArn": "arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6"}}}
MOCK_CONF_ITEM_NON = {"configuration": {"tableName": "table123", "tableStatus": "ACTIVE"}}
MOCK_CONF_ITEM_NA = {"configuration": {"tableName": "testNA", "tableStatus": "DELETED"}}
MOCK_NON_COMP = {"Table": {"TableArn": "arn:aws:dynamodb:ap-southeast-2:437313072050:table/testNonEncrypt", "ItemCount": 0, "CreationDateTime": 1583296041.861}}
MOCK_COMP = {"Table": {"TableArn": "arn:aws:dynamodb:ap-southeast-2:437313072050:table/test123", "SSEDescription": {"Status": "UPDATING", "KMSMasterKeyArn": "arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6", "SSEType": "KMS"}}}

def test_emptyruleparameter_returnsuccess(self):
rule_invalid_parameter = {
"KmsKeyArns": ""
}
response = RULE.evaluate_parameters(rule_invalid_parameter)
self.assertEqual(response, {})

def test_scenario1_invalidruleparameter_returnserror(self):
rule_invalid_parameter = {
"KmsKeyArns": "invalid-arn,arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6"
}
with self.assertRaises(InvalidParametersError) as context:
RULE.evaluate_parameters(rule_invalid_parameter)
self.assertIn('Invalid AWS KMS Key Arn format for "invalid-arn". AWS KMS Key Arn starts with "arn:aws:kms:"', str(context.exception))

def test_scenario2_evaluateparameters_validruleparameter_returnsuccess(self):
rule_valid_parameter = {
"KmsKeyArns": "arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6,arn:aws:kms:ap-southeast-2:437313072050:key/ff61230d-5aa3-4ece-9532-a97fecb51f36"
}

resp_expected = {
"KmsKeyArns": [
"arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6",
"arn:aws:kms:ap-southeast-2:437313072050:key/ff61230d-5aa3-4ece-9532-a97fecb51f36"
]
}
response = RULE.evaluate_parameters(rule_valid_parameter)
self.assertEqual(response, resp_expected)

def test_scenario3_config_not_applicable_table(self):
response = RULE.evaluate_change("", CLIENT_FACTORY, self.MOCK_CONF_ITEM_NA, {})
resp_expected = [
Evaluation(ComplianceType.NOT_APPLICABLE)
]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario4_non_compliant_table(self):
response = RULE.evaluate_change("", CLIENT_FACTORY, self.MOCK_CONF_ITEM_NON, {})
resp_expected = [
Evaluation(ComplianceType.NON_COMPLIANT, annotation="Amazon DynamoDB Table is not encrypted with KMS")
]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario5_compliant_table(self):
response = RULE.evaluate_change("", CLIENT_FACTORY, self.MOCK_CONF_ITEM, {})
resp_expected = [
Evaluation(ComplianceType.COMPLIANT)
]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario6_compliant_table_with_params(self):
valid_rule_parameter = {
"KmsKeyArns": [
"arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6",
"arn:aws:kms:ap-southeast-2:437313072050:key/ff61230d-5aa3-4ece-9532-a97fecb51f36"
]
}
response = RULE.evaluate_change("", CLIENT_FACTORY, self.MOCK_CONF_ITEM, valid_rule_parameter)
resp_expected = [
Evaluation(ComplianceType.COMPLIANT)
]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario7_non_compliant_table_with_params(self):
valid_rule_parameter = {
"KmsKeyArns": [
"arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6",
"arn:aws:kms:ap-southeast-2:437313072050:key/ff61230d-5aa3-4ece-9532-a97fecb51f36"
]
}
response = RULE.evaluate_change("", CLIENT_FACTORY, self.MOCK_CONF_ITEM_NON, valid_rule_parameter)
resp_expected = [
Evaluation(ComplianceType.NON_COMPLIANT, annotation="Amazon DynamoDB Table is not encrypted with KMS")
]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario4_periodic_non_compliant_table(self):
CONFIG_CLIENT.select_resource_config.return_value = {"Results":['{"resourceName":"test123"}']}
DB_CLIENT_MOCK.describe_table.return_value = self.MOCK_NON_COMP
response = RULE.evaluate_periodic("", CLIENT_FACTORY, {})
resp_expected = [Evaluation(ComplianceType.NON_COMPLIANT, 'test123', RESOURCE_TYPE, annotation="Table is not encrypted with KMS")]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario5_periodic_compliant_table(self):
CONFIG_CLIENT.select_resource_config.return_value = {"Results":['{"resourceName":"testNonEncrypt"}']}
DB_CLIENT_MOCK.describe_table.return_value = self.MOCK_COMP
response = RULE.evaluate_periodic("", CLIENT_FACTORY, {})
resp_expected = [Evaluation(ComplianceType.COMPLIANT, 'testNonEncrypt', RESOURCE_TYPE)]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario6_periodic_non_compliant_table_with_params(self):
valid_rule_parameter = {
"KmsKeyArns": [
"arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6",
"arn:aws:kms:ap-southeast-2:437313072050:key/ff61230d-5aa3-4ece-9532-a97fecb51f36"
]
}
CONFIG_CLIENT.select_resource_config.return_value = {"Results":['{"resourceName":"test123"}']}
DB_CLIENT_MOCK.describe_table.return_value = self.MOCK_NON_COMP
response = RULE.evaluate_periodic("", CLIENT_FACTORY, valid_rule_parameter)
resp_expected = [Evaluation(ComplianceType.NON_COMPLIANT, 'test123', RESOURCE_TYPE, annotation="Table is not encrypted with KMS")]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)

def test_scenario7_periodic_compliant_table_with_params(self):
valid_rule_parameter = {
"KmsKeyArns": [
"arn:aws:kms:ap-southeast-2:437313072050:key/f4fb52b5-03a0-4397-a2db-cb5b94abb0a6",
"arn:aws:kms:ap-southeast-2:437313072050:key/ff61230d-5aa3-4ece-9532-a97fecb51f36"
]
}
CONFIG_CLIENT.select_resource_config.return_value = {"Results":['{"resourceName":"testNonEncrypt"}']}
DB_CLIENT_MOCK.describe_table.return_value = self.MOCK_COMP
response = RULE.evaluate_periodic("", CLIENT_FACTORY, valid_rule_parameter)
resp_expected = [Evaluation(ComplianceType.COMPLIANT, 'testNonEncrypt', RESOURCE_TYPE)]
rdklibtest.assert_successful_evaluation(self, response, resp_expected, 1)
12 changes: 12 additions & 0 deletions python/DYNAMODB_TABLE_ENCRYPTED_KMS/parameters.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"Version": "1.0",
"Parameters": {
"CodeKey": "DYNAMODB_TABLE_ENCRYPTED_KMS.zip",
"SourceRuntime": "python3.6-lib",
"RuleName": "DYNAMODB_TABLE_ENCRYPTED_KMS",
"SourceEvents": "AWS::DynamoDB::Table",
"OptionalParameters": "{}",
"InputParameters": "{}"
},
"Tags": "[]"
}
84 changes: 84 additions & 0 deletions python/SQS_ENCRYPTED_KMS/SQS_ENCRYPTED_KMS.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""
#####################################
## Gherkin ##
#####################################
Rule Name:
SQS_ENCRYPTED_KMS

Description:
Check whether Amazon Simple Queue Service (Amazon SQS) is encrypted with AWS Key Management Service (AWS KMS).

Rationale:
Encryption using AWS KMS provides protection at rest for the data stored in Amazon SQS queue.

Indicative Severity:
Medium

Trigger:
Configuration Change on AWS::SQS::Queue

Reports on:
AWS::SQS::Queue
Rule Parameters:(optional)
Provide comma seperated AWS KMS Key ARN list.

Scenarios:
Scenario: 1
Given: Rules parameter is provided
And: Any key in "KmsKeyArns" is invalid
Then: Return ERROR
Scenario: 2
Given: Rules parameter is provided

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is returning Success valid scenario ?

And: All keys in "KmsKeyArns" is valid
Then: Return Success
Scenario: 3
Given: Amazon SQS Queue is active
And: Amazon SQS Queue is encrypted with KMS key
Then: Return COMPLIANT
Scenario: 4
Given: Amazon SQS Queue is active
And: Amazon SQS Queue is not encrypted with KMS key
Then: Return NON_COMPLIANT
Scenario: 5
Given: Amazon SQS Queue is active
And: Amazon SQS Queue is encrypted with KMS key
And: KmsKeyArns Rule Parameter provided
Then: Return COMPLIANT
Scenario: 6
Given: Amazon SQS Queue is active
And: Amazon SQS Queue is not encrypted with KMS key
And: KmsKeyArns Rule Parameter provided
Then: Return NON_COMPLIANT
"""

from rdklib import Evaluator, Evaluation, ConfigRule, ComplianceType, InvalidParametersError

class SQS_ENCRYPTED_KMS(ConfigRule):
def evaluate_change(self, event, client_factory, configuration_item, valid_rule_parameters):
print(configuration_item)
kms_key = configuration_item.get('configuration').get('KmsMasterKeyId')
kms_arn_list = valid_rule_parameters.get("KmsKeyArns")
if kms_key:
if not kms_arn_list or kms_key in kms_arn_list:
return [Evaluation(ComplianceType.COMPLIANT)]
return [Evaluation(ComplianceType.NON_COMPLIANT, annotation="AWS KMS key '{}' used to encrypt the Amazon SQS Queue is not in rule_paramter 'KmsKeyArns'".format(kms_key))]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Annotation length has limit make sure it should not exceed 255 characters as kms_key is dynamic value

return [Evaluation(ComplianceType.NON_COMPLIANT,
annotation="Amazon SQS queue is not encrypted with KMS")]

def evaluate_parameters(self, rule_parameters):
valid_rule_parameters = {}
if 'KmsKeyArns' in rule_parameters:
kms_key_arns = "".join(rule_parameters['KmsKeyArns'].split())
if kms_key_arns:
kms_key_arns = kms_key_arns.split(',')
for kms_key_arn in kms_key_arns:
if not kms_key_arn.startswith('arn:aws:kms:'):
raise InvalidParametersError('Invalid AWS KMS Key Arn format for "{}". AWS KMS Key Arn starts with "arn:aws:kms:"'.format(kms_key_arn))
valid_rule_parameters['KmsKeyArns'] = kms_key_arns
return valid_rule_parameters


def lambda_handler(event, context):
my_rule = SQS_ENCRYPTED_KMS()
evaluator = Evaluator(my_rule)
return evaluator.handle(event, context)
Loading