diff --git a/python/LAMBDA_LAYER_CHECK/LAMBDA_LAYER_CHECK.py b/python/LAMBDA_LAYER_CHECK/LAMBDA_LAYER_CHECK.py new file mode 100644 index 00000000..e05ee7f2 --- /dev/null +++ b/python/LAMBDA_LAYER_CHECK/LAMBDA_LAYER_CHECK.py @@ -0,0 +1,464 @@ +# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for +# the specific language governing permissions and limitations under the License. + +''' +##################################### +## Gherkin ## +##################################### +Rule Name: LAMBDA_LAYER_CHECK +Description: Checks whether the AWS Lambda function is configured to use a specified layer. The rule is NON_COMPLIANT if the Lambda function is not using the layer. +Trigger: + Configuration Change +Reports on: + AWS::Lambda::Function +Rule Parameters: + LayerArn + Layer ARN (without the version) + MinLayerVersion + Minimum layer version required +Scenarios: + Scenario: 1 + Given: The LayerArn rule parameter is not configured + Then: Return Error + Scenario: 2 + Given: The MinLayerVersion rule parameter is not configured + Then: Return Error + Scenario: 3 + Given: The LayerArn rule parameter is configured but invalid + Then: Return Error + Scenario: 4 + Given: The MinLayerVersion rule parameter is configured but invalid + Then: Return Error + Scenario: 5 + Given: Lambda function has no layer configured + Then: Return NON_COMPLIANT with Annotation "No layer is configured for this Lambda function" + Scenario: 6 + Given: Lambda function has some layers configured + And: No layer matches the LayerArn rule parameter + Then: Return NON_COMPLIANT with Annotation "Layer [layer_arn] not used for this Lambda function" + Scenario: 7 + Given: Lambda function has some layers configured + And: One layer matches the LayerArn rule parameter + And: Layer version is less than the MinLayerVersion rule parameter + Then: Return NON_COMPLIANT with Annotation "Wrong layer version (was [layer_version], expected [MinLayerVersion]+)" + Scenario: 8 + Given: Lambda function has some layers configured + And: One layer matches the LayerArn rule parameter + And: Layer version is equal to the MinLayerVersion rule parameter + Then: Return COMPLIANT + Scenario: 9 + Given: Lambda function packageType is not Zip (ie. container image) + Then: Return NOT_APPLICABLE +''' + +import json +import boto3 +import botocore +import re + +############## +# Parameters # +############## + +AWS_CONFIG_CLIENT = boto3.client('config') + +# Define the default resource to report to Config Rules +DEFAULT_RESOURCE_TYPE = "AWS::Lambda::Function" + +# Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). +ASSUME_ROLE_MODE = False + +CONFIG_ROLE_TIMEOUT_SECONDS = 900 + +LAYER_REGEXP = 'arn:aws:lambda:[a-z]{2}((-gov)|(-iso(b?)))?-[a-z]+-\d{1}:\d{12}:layer:[a-zA-Z0-9-_]+' + + +############# +# Main Code # +############# + +def evaluate_compliance(event, configuration_item, valid_rule_parameters): + pkg = configuration_item['configuration']['packageType'] + if not pkg or pkg != "Zip": + return build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE', + annotation='Layers can only be used with functions using Zip package type') + + layers = configuration_item['configuration']['layers'] + if not layers: + return build_evaluation_from_config_item(configuration_item, 'NON_COMPLIANT', + annotation='No layer is configured for this Lambda function') + + regex = re.compile(LAYER_REGEXP + ':(.*)') + annotation = 'Layer ' + valid_rule_parameters['LayerArn'] + ' not used for this Lambda function' + for layer in layers: + arn = layer['arn'] + version = regex.search(arn).group(5) + arn = re.sub('\:' + version + '$', '', arn) + if arn == valid_rule_parameters['LayerArn']: + if version >= valid_rule_parameters['MinLayerVersion']: + return build_evaluation_from_config_item(configuration_item, 'COMPLIANT') + else: + annotation = 'Wrong layer version (was ' + version + ', expected ' + valid_rule_parameters[ + 'MinLayerVersion'] + '+)' + + return build_evaluation_from_config_item(configuration_item, 'NON_COMPLIANT', + annotation=annotation) + + +# Check if parameters are defined and have correct values +def evaluate_parameters(rule_parameters): + if not rule_parameters: + raise ValueError('LayerArn and MinLayerVersion parameters must be set') + + if 'MinLayerVersion' in rule_parameters: + if int(rule_parameters['MinLayerVersion']) <= 0: + raise ValueError('MinLayerVersion must be a positive integer') + else: + raise ValueError('MinLayerVersion must be provided') + + if 'LayerArn' in rule_parameters: + regex = re.compile(LAYER_REGEXP) + if not bool(regex.match(str(rule_parameters['LayerArn']))): + raise ValueError( + 'LayerArn must be valid: arn:aws:lambda:{region}:{accountid}:layer:{layername}, without version number') + else: + raise ValueError('LayerArn must be provided') + + return rule_parameters + + +#################### +# Helper Functions # +#################### + +# Build an error to be displayed in the logs when the parameter is invalid. +def build_parameters_value_error_response(ex): + """Return an error dictionary when the evaluate_parameters() raises a ValueError. + Keyword arguments: + ex -- Exception text + """ + return build_error_response(internal_error_message="Parameter value is invalid", + internal_error_details="An ValueError was raised during the validation of the Parameter value", + customer_error_code="InvalidParameterValueException", + customer_error_message=str(ex)) + + +# This generate an evaluation for config +def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): + """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. + Keyword arguments: + resource_id -- the unique id of the resource to report + compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + event -- the event variable given in the lambda handler + resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) + annotation -- an annotation to be added to the evaluation (default None). It will be truncated to 255 if longer. + """ + eval_cc = {} + if annotation: + eval_cc['Annotation'] = build_annotation(annotation) + eval_cc['ComplianceResourceType'] = resource_type + eval_cc['ComplianceResourceId'] = resource_id + eval_cc['ComplianceType'] = compliance_type + eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) + return eval_cc + + +def build_evaluation_from_config_item(configuration_item, compliance_type, annotation=None): + """Form an evaluation as a dictionary. Usually suited to report on configuration change rules. + Keyword arguments: + configuration_item -- the configurationItem dictionary in the invokingEvent + compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + annotation -- an annotation to be added to the evaluation (default None). It will be truncated to 255 if longer. + """ + # print(configuration_item['resourceId'] + ' = ' + compliance_type) + eval_ci = {} + if annotation: + eval_ci['Annotation'] = build_annotation(annotation) + eval_ci['ComplianceResourceType'] = configuration_item['resourceType'] + eval_ci['ComplianceResourceId'] = configuration_item['resourceId'] + eval_ci['ComplianceType'] = compliance_type + eval_ci['OrderingTimestamp'] = configuration_item['configurationItemCaptureTime'] + return eval_ci + + +# This gets the client after assuming the Config service role +# either in the same AWS account or cross-account. +def get_client(service, event, region=None): + """Return the service boto client. It should be used instead of directly calling the client. + Keyword arguments: + service -- the service name used for calling the boto.client() + event -- the event variable given in the lambda handler + region -- the region where the client is called (default: None) + """ + if not ASSUME_ROLE_MODE: + return boto3.client(service, region) + credentials = get_assume_role_credentials(event["executionRoleArn"], region) + return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'], + region_name=region + ) + + +#################### +# Boilerplate Code # +#################### + +# Helper function used to validate input +def check_defined(reference, reference_name): + if not reference: + raise Exception('Error: ', reference_name, 'is not defined') + return reference + + +# Build annotation within Service constraints +def build_annotation(annotation_string): + if len(annotation_string) > 256: + return annotation_string[:244] + " [truncated]" + return annotation_string + + +def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, + customer_error_message=None): + error_response = { + 'internalErrorMessage': internal_error_message, + 'internalErrorDetails': internal_error_details, + 'customerErrorMessage': customer_error_message, + 'customerErrorCode': customer_error_code + } + print(error_response) + return error_response + + +def build_internal_error_response(internal_error_message, internal_error_details=None): + return build_error_response(internal_error_message, internal_error_details, 'InternalError', 'InternalError') + + +def is_internal_error(exception): + return ((not isinstance(exception, botocore.exceptions.ClientError)) or exception.response['Error'][ + 'Code'].startswith('5') + or 'InternalError' in exception.response['Error']['Code'] or 'ServiceError' in exception.response['Error'][ + 'Code']) + + +# Check whether the message is a ScheduledNotification or not. +def is_scheduled_notification(message_type): + check_defined(message_type, 'messageType') + return message_type == 'ScheduledNotification' + + +# Check whether the message is OversizedConfigurationItemChangeNotification or not +def is_oversized_changed_notification(message_type): + check_defined(message_type, 'messageType') + return message_type == 'OversizedConfigurationItemChangeNotification' + + +def get_assume_role_credentials(role_arn, region=None): + sts_client = boto3.client('sts', region) + try: + assume_role_response = sts_client.assume_role(RoleArn=role_arn, + RoleSessionName="configLambdaExecution", + DurationSeconds=CONFIG_ROLE_TIMEOUT_SECONDS) + return assume_role_response['Credentials'] + except botocore.exceptions.ClientError as ex: + # Scrub error message for any internal account info leaks + if 'AccessDenied' in ex.response['Error']['Code']: + ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." + else: + ex.response['Error']['Message'] = "InternalError" + ex.response['Error']['Code'] = "InternalError" + raise ex + + +# Get configurationItem using getResourceConfigHistory API +# in case of OversizedConfigurationItemChangeNotification +def get_configuration(resource_type, resource_id, configuration_capture_time): + result = AWS_CONFIG_CLIENT.get_resource_config_history( + resourceType=resource_type, + resourceId=resource_id, + laterTime=configuration_capture_time, + limit=1) + configuration_item = result['configurationItems'][0] + return convert_api_configuration(configuration_item) + + +# Based on the type of message get the configuration item +# either from configurationItem in the invoking event +# or using the getResourceConfigHistiry API in getConfiguration function. +def get_configuration_item(invoking_event): + check_defined(invoking_event, 'invokingEvent') + if is_oversized_changed_notification(invoking_event['messageType']): + configuration_item_summary = check_defined(invoking_event['configuration_item_summary'], + 'configurationItemSummary') + return get_configuration(configuration_item_summary['resourceType'], configuration_item_summary['resourceId'], + configuration_item_summary['configurationItemCaptureTime']) + if is_scheduled_notification(invoking_event['messageType']): + return None + return check_defined(invoking_event['configurationItem'], 'configurationItem') + + +# Convert from the API model to the original invocation model +def convert_api_configuration(configuration_item): + for k, v in configuration_item.items(): + if isinstance(v, datetime.datetime): + configuration_item[k] = str(v) + configuration_item['awsAccountId'] = configuration_item['accountId'] + configuration_item['ARN'] = configuration_item['arn'] + configuration_item['configurationStateMd5Hash'] = configuration_item['configurationItemMD5Hash'] + configuration_item['configurationItemVersion'] = configuration_item['version'] + configuration_item['configuration'] = json.loads(configuration_item['configuration']) + if 'relationships' in configuration_item: + for i in range(len(configuration_item['relationships'])): + configuration_item['relationships'][i]['name'] = configuration_item['relationships'][i]['relationshipName'] + return configuration_item + + +# Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. +def is_applicable(configuration_item, event): + try: + check_defined(configuration_item, 'configurationItem') + check_defined(event, 'event') + except: + return True + status = configuration_item['configurationItemStatus'] + event_left_scope = event['eventLeftScope'] + if status == 'ResourceDeleted': + print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") + return status in ('OK', 'ResourceDiscovered') and not event_left_scope + + +# This removes older evaluation (usually useful for periodic rule not reporting on AWS::::Account). +def clean_up_old_evaluations(latest_evaluations, event): + cleaned_evaluations = [] + + old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule( + ConfigRuleName=event['configRuleName'], + ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'], + Limit=100) + + old_eval_list = [] + + while True: + for old_result in old_eval['EvaluationResults']: + old_eval_list.append(old_result) + if 'NextToken' in old_eval: + next_token = old_eval['NextToken'] + old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule( + ConfigRuleName=event['configRuleName'], + ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'], + Limit=100, + NextToken=next_token) + else: + break + + for old_eval in old_eval_list: + old_resource_id = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId'] + newer_founded = False + for latest_eval in latest_evaluations: + if old_resource_id == latest_eval['ComplianceResourceId']: + newer_founded = True + if not newer_founded: + cleaned_evaluations.append(build_evaluation(old_resource_id, "NOT_APPLICABLE", event)) + + return cleaned_evaluations + latest_evaluations + + +#################### +# Handler # +#################### + +def lambda_handler(event, context): + global AWS_CONFIG_CLIENT + + # print(event) + + check_defined(event, 'event') + invoking_event = json.loads(event['invokingEvent']) + rule_parameters = {} + if 'ruleParameters' in event: + rule_parameters = json.loads(event['ruleParameters']) + + try: + valid_rule_parameters = evaluate_parameters(rule_parameters) + except ValueError as ex: + return build_parameters_value_error_response(ex) + + try: + AWS_CONFIG_CLIENT = get_client('config', event) + if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification', + 'OversizedConfigurationItemChangeNotification']: + configuration_item = get_configuration_item(invoking_event) + if is_applicable(configuration_item, event): + compliance_result = evaluate_compliance(event, configuration_item, valid_rule_parameters) + else: + compliance_result = "NOT_APPLICABLE" + else: + return build_internal_error_response('Unexpected message type', str(invoking_event)) + except botocore.exceptions.ClientError as ex: + if is_internal_error(ex): + return build_internal_error_response("Unexpected error while completing API request", str(ex)) + return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], + ex.response['Error']['Message']) + except ValueError as ex: + return build_internal_error_response(str(ex), str(ex)) + + evaluations = [] + latest_evaluations = [] + + if not compliance_result: + latest_evaluations.append( + build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account')) + evaluations = clean_up_old_evaluations(latest_evaluations, event) + elif isinstance(compliance_result, str): + if configuration_item: + evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result)) + else: + evaluations.append( + build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE)) + elif isinstance(compliance_result, list): + for evaluation in compliance_result: + missing_fields = False + for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'): + if field not in evaluation: + print("Missing " + field + " from custom evaluation.") + missing_fields = True + + if not missing_fields: + latest_evaluations.append(evaluation) + evaluations = clean_up_old_evaluations(latest_evaluations, event) + elif isinstance(compliance_result, dict): + missing_fields = False + for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'): + if field not in compliance_result: + print("Missing " + field + " from custom evaluation.") + missing_fields = True + if not missing_fields: + evaluations.append(compliance_result) + else: + evaluations.append(build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE')) + + # Put together the request that reports the evaluation status + result_token = event['resultToken'] + test_mode = False + if result_token == 'TESTMODE': + # Used solely for RDK test to skip actual put_evaluation API call + test_mode = True + + # Invoke the Config API to report the result of the evaluation + evaluation_copy = [] + evaluation_copy = evaluations[:] + while evaluation_copy: + AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluation_copy[:100], ResultToken=result_token, + TestMode=test_mode) + del evaluation_copy[:100] + + # Used solely for RDK test to be able to test Lambda function + return evaluations diff --git a/python/LAMBDA_LAYER_CHECK/LAMBDA_LAYER_CHECK_test.py b/python/LAMBDA_LAYER_CHECK/LAMBDA_LAYER_CHECK_test.py new file mode 100644 index 00000000..02eb6610 --- /dev/null +++ b/python/LAMBDA_LAYER_CHECK/LAMBDA_LAYER_CHECK_test.py @@ -0,0 +1,220 @@ +import sys +import unittest + +try: + from unittest.mock import MagicMock +except ImportError: + from mock import MagicMock +import json +import botocore + +############## +# Parameters # +############## + +# Define the default resource to report to Config Rules +DEFAULT_RESOURCE_TYPE = 'AWS::Lambda::Function' + +############# +# Main Code # +############# + +CONFIG_CLIENT_MOCK = MagicMock() +STS_CLIENT_MOCK = MagicMock() + +class Boto3Mock(): + @staticmethod + def client(client_name, *args, **kwargs): + if client_name == 'config': + return CONFIG_CLIENT_MOCK + if client_name == 'sts': + return STS_CLIENT_MOCK + raise Exception("Attempting to create an unknown client") + +sys.modules['boto3'] = Boto3Mock() + +RULE = __import__('LAMBDA_LAYER_CHECK') + +class ParametersTest(unittest.TestCase): + rule_parameter_layer_not_provided = '{"MinLayerVersion" : "1"}' + rule_parameter_version_not_provided = '{"LayerArn" : "arn:aws:lambda:eu-west-1:123456789012:layer:layername"}' + rule_parameter_layer_invalid = '{"LayerArn" : "arn:aws:lambda:eu-west-1:42:layer:layername", "MinLayerVersion" : "2"}' + rule_parameter_version_invalid = '{"LayerArn" : "arn:aws:lambda:eu-west-1:123456789012:layer:layername", "MinLayerVersion" : "0"}' + + simple_function = { + "functionName": "test_function", + "functionArn": "arn:aws:lambda:us-west-2:123456789012:function:test_function" + } + + def test_scenario_1_layer_param_not_provided(self): + invoking_event = generate_invoking_event(self.simple_function) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameter_layer_not_provided), {}) + assert_customer_error_response(self, response, 'InvalidParameterValueException', 'LayerArn must be provided') + + def test_scenario_2_version_param_not_provided(self): + invoking_event = generate_invoking_event(self.simple_function) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameter_version_not_provided), {}) + assert_customer_error_response(self, response, 'InvalidParameterValueException', 'MinLayerVersion must be provided') + + def test_scenario_3_layer_invalid(self): + invoking_event = generate_invoking_event(self.simple_function) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameter_layer_invalid), {}) + assert_customer_error_response(self, response, 'InvalidParameterValueException', 'LayerArn must be valid: arn:aws:lambda:{region}:{accountid}:layer:{layername}, without version number') + + def test_scenario_4_version_invalid(self): + invoking_event = generate_invoking_event(self.simple_function) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameter_version_invalid), {}) + assert_customer_error_response(self, response, 'InvalidParameterValueException', 'MinLayerVersion must be a positive integer') + +class RuleTest(unittest.TestCase): + rule_parameters = '{"LayerArn" : "arn:aws:lambda:eu-west-1:123456789012:layer:layername", "MinLayerVersion" : "2"}' + + def test_scenario_5_no_layer_configured(self): + function_without_layer = { + "functionName": "test_function", + "functionArn": "arn:aws:lambda:us-west-2:123456789012:function:test_function", + "packageType": "Zip", + "layers": [] + } + invoking_event = generate_invoking_event(function_without_layer) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameters), {}) + + assert_successful_evaluation(self, response, [build_expected_response('NON_COMPLIANT', '123456789012', annotation='No layer is configured for this Lambda function')]) + + + def test_scenario_6_layer_not_matching(self): + function_another_layer = { + "functionName": "test_function", + "functionArn": "arn:aws:lambda:us-west-2:123456789012:function:test_function", + "packageType": "Zip", + "layers": [ + {"arn": "arn:aws:lambda:eu-west-1:123456789012:layer:superlayer:5"} + ] + } + invoking_event = generate_invoking_event(function_another_layer) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameters), {}) + + assert_successful_evaluation(self, response, [build_expected_response('NON_COMPLIANT', '123456789012', annotation='Layer arn:aws:lambda:eu-west-1:123456789012:layer:layername not used for this Lambda function')]) + + def test_scenario_7_wrong_version(self): + function_old_version = { + "functionName": "test_function", + "functionArn": "arn:aws:lambda:us-west-2:123456789012:function:test_function", + "packageType": "Zip", + "layers": [ + {"arn": "arn:aws:lambda:eu-west-1:123456789012:layer:layername:1"} + ] + } + invoking_event = generate_invoking_event(function_old_version) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameters), {}) + + assert_successful_evaluation(self, response, [build_expected_response('NON_COMPLIANT', '123456789012', annotation='Wrong layer version (was 1, expected 2+)')]) + + def test_scenario_8_everything_ok(self): + function_ok = { + "functionName": "test_function", + "functionArn": "arn:aws:lambda:us-west-2:123456789012:function:test_function", + "packageType": "Zip", + "layers": [ + {"arn": "arn:aws:lambda:eu-west-1:123456789012:layer:layername:3"} + ] + } + invoking_event = generate_invoking_event(function_ok) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameters), {}) + + assert_successful_evaluation(self, response, [build_expected_response('COMPLIANT', '123456789012')]) + + def test_scenario_9_not_zip_pkg(self): + function_ok = { + "functionName": "test_function", + "functionArn": "arn:aws:lambda:us-west-2:123456789012:function:test_function", + "packageType": "Image" + } + invoking_event = generate_invoking_event(function_ok) + response = RULE.lambda_handler( + build_lambda_configurationchange_event(invoking_event, rule_parameters=self.rule_parameters), {}) + + assert_successful_evaluation(self, response, [build_expected_response('NOT_APPLICABLE', '123456789012', annotation='Layers can only be used with functions using Zip package type')]) + +#################### +# Helper Functions # +#################### + +def generate_invoking_event(test_configuration): + invoking_event = '{"configurationItem":{"configuration":' \ + + json.dumps(test_configuration) \ + + ',"configurationItemCaptureTime":"2019-04-18T08:17:52.315Z","configurationItemStatus":"ResourceDiscovered","resourceType":"AWS::Lambda::Function","resourceId":"123456789012"},"messageType":"ConfigurationItemChangeNotification"}' + return invoking_event + + +def build_lambda_configurationchange_event(invoking_event, rule_parameters=None): + event_to_return = { + 'configRuleName': 'myrule', + 'executionRoleArn': 'roleArn', + 'eventLeftScope': False, + 'invokingEvent': invoking_event, + 'accountId': '123456789012', + 'configRuleArn': 'arn:aws:config:us-east-1:123456789012:config-rule/config-rule-8fngan', + 'resultToken': 'token' + } + if rule_parameters: + event_to_return['ruleParameters'] = rule_parameters + return event_to_return + + +def build_expected_response(compliance_type, compliance_resource_id, compliance_resource_type=DEFAULT_RESOURCE_TYPE, + annotation=None): + if not annotation: + return { + 'ComplianceType': compliance_type, + 'ComplianceResourceId': compliance_resource_id, + 'ComplianceResourceType': compliance_resource_type + } + return { + 'ComplianceType': compliance_type, + 'ComplianceResourceId': compliance_resource_id, + 'ComplianceResourceType': compliance_resource_type, + 'Annotation': annotation + } + + +def assert_successful_evaluation(test_class, response, resp_expected, evaluations_count=1): + if isinstance(response, dict): + test_class.assertEquals(resp_expected['ComplianceResourceType'], response['ComplianceResourceType']) + test_class.assertEquals(resp_expected['ComplianceResourceId'], response['ComplianceResourceId']) + test_class.assertEquals(resp_expected['ComplianceType'], response['ComplianceType']) + test_class.assertTrue(response['OrderingTimestamp']) + if 'Annotation' in resp_expected or 'Annotation' in response: + test_class.assertEquals(resp_expected['Annotation'], response['Annotation']) + elif isinstance(response, list): + test_class.assertEquals(evaluations_count, len(response)) + for i, response_expected in enumerate(resp_expected): + test_class.assertEquals(response_expected['ComplianceResourceType'], response[i]['ComplianceResourceType']) + test_class.assertEquals(response_expected['ComplianceResourceId'], response[i]['ComplianceResourceId']) + test_class.assertEquals(response_expected['ComplianceType'], response[i]['ComplianceType']) + test_class.assertEquals(response_expected['ComplianceType'], response[i]['ComplianceType']) + test_class.assertTrue(response[i]['OrderingTimestamp']) + if 'Annotation' in response_expected or 'Annotation' in response[i]: + test_class.assertEquals(response_expected['Annotation'], response[i]['Annotation']) + + +def assert_customer_error_response(test_class, response, customer_error_code=None, customer_error_message=None): + if customer_error_code: + test_class.assertEqual(customer_error_code, response['customerErrorCode']) + if customer_error_message: + test_class.assertEqual(customer_error_message, response['customerErrorMessage']) + test_class.assertTrue(response['customerErrorCode']) + test_class.assertTrue(response['customerErrorMessage']) + if "internalErrorMessage" in response: + test_class.assertTrue(response['internalErrorMessage']) + if "internalErrorDetails" in response: + test_class.assertTrue(response['internalErrorDetails']) + diff --git a/python/LAMBDA_LAYER_CHECK/parameters.json b/python/LAMBDA_LAYER_CHECK/parameters.json new file mode 100644 index 00000000..02b02349 --- /dev/null +++ b/python/LAMBDA_LAYER_CHECK/parameters.json @@ -0,0 +1,12 @@ +{ + "Version": "1.0", + "Parameters": { + "RuleName": "LAMBDA_LAYER_CHECK", + "SourceRuntime": "python3.8", + "CodeKey": "LAMBDA_LAYER_CHECK.zip", + "InputParameters": "{\"LayerArn\":\"arn:aws:lambda:region:account:layer:layer_name\", \"MinLayerVersion\":\"1\"}", + "OptionalParameters": "{}", + "SourceEvents": "AWS::Lambda::Function" + }, + "Tags": "[]" + } \ No newline at end of file