Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 183 additions & 38 deletions aiml-security-assessment/functions/security/agentcore_assessments/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,70 @@ def check_timeout() -> bool:
return elapsed < 540 # 9 minutes hard stop


def _agentcore_list_all(
list_method_name: str, result_keys: List[str]
) -> List[Dict[str, Any]]:
"""Collect all items from an AgentCore list API, following nextToken."""
if agentcore_client is None:
return []

items: List[Dict[str, Any]] = []
next_token = None
list_method = getattr(agentcore_client, list_method_name)

while True:
kwargs = {}
if next_token:
kwargs["nextToken"] = next_token

response = list_method(**kwargs)

for result_key in result_keys:
page_items = response.get(result_key)
if isinstance(page_items, list):
items.extend(page_items)
break

next_token = response.get("nextToken")
if not next_token:
break

return items


def _unwrap_agentcore_detail(
response: Dict[str, Any], wrapper_key: str
) -> Dict[str, Any]:
"""Handle detail APIs that wrap the resource under a top-level key."""
if not isinstance(response, dict):
return {}

wrapped = response.get(wrapper_key)
if isinstance(wrapped, dict):
return wrapped

return response


def _get_agentcore_resource_policy(resource_arn: str) -> str:
"""Retrieve the generic AgentCore resource policy for a resource ARN."""
response = agentcore_client.get_resource_policy(resourceArn=resource_arn)
return response.get("policy") or response.get("resourcePolicy") or ""


def _is_access_denied_client_error(error: ClientError) -> bool:
"""Normalize access-denied checks across AgentCore control plane APIs."""
if not isinstance(error, ClientError):
return False

error_code = error.response.get("Error", {}).get("Code")
return error_code in {
"AccessDenied",
"AccessDeniedException",
"UnauthorizedOperation",
}


def generate_csv_report(findings: List[Dict[str, Any]]) -> str:
"""
Generate CSV report from findings.
Expand Down Expand Up @@ -237,8 +301,7 @@ def check_agentcore_vpc_configuration() -> List[Dict[str, Any]]:

# Check Runtimes
try:
runtimes_response = agentcore_client.list_agent_runtimes()
runtimes = runtimes_response.get("agentRuntimes", [])
runtimes = _agentcore_list_all("list_agent_runtimes", ["agentRuntimes"])

if not runtimes:
logger.info("No AgentCore Runtimes found")
Expand Down Expand Up @@ -955,8 +1018,7 @@ def check_agentcore_observability() -> List[Dict[str, Any]]:

# Check Runtimes for logging and tracing
try:
runtimes_response = agentcore_client.list_agent_runtimes()
runtimes = runtimes_response.get("agentRuntimes", [])
runtimes = _agentcore_list_all("list_agent_runtimes", ["agentRuntimes"])

if not runtimes:
logger.info("No AgentCore Runtimes found")
Expand Down Expand Up @@ -1237,8 +1299,7 @@ def check_browser_tool_recording() -> List[Dict[str, Any]]:

# Browser Tools are part of Runtime configuration
# Check if Runtimes have appropriate storage configured
runtimes_response = agentcore_client.list_agent_runtimes()
runtimes = runtimes_response.get("agentRuntimes", [])
runtimes = _agentcore_list_all("list_agent_runtimes", ["agentRuntimes"])

if not runtimes:
logger.info("No AgentCore Runtimes found")
Expand Down Expand Up @@ -1348,8 +1409,7 @@ def check_agentcore_memory_configuration() -> List[Dict[str, Any]]:
try:
logger.info("Checking AgentCore Memory configuration")

memories_response = agentcore_client.list_memories()
memories = memories_response.get("memories", [])
memories = _agentcore_list_all("list_memories", ["memories"])

if not memories:
logger.info("No Memory resources found")
Expand All @@ -1375,10 +1435,14 @@ def check_agentcore_memory_configuration() -> List[Dict[str, Any]]:
)

try:
memory_details = agentcore_client.get_memory(memoryId=memory_id)
memory_details = _unwrap_agentcore_detail(
agentcore_client.get_memory(memoryId=memory_id), "memory"
)

# Check encryption configuration
encryption_key_arn = memory_details.get("encryptionKeyArn")
encryption_key_arn = memory_details.get(
"encryptionKeyArn"
) or memory_details.get("kmsKeyArn")

if not encryption_key_arn:
findings.append(
Expand Down Expand Up @@ -1652,7 +1716,6 @@ def check_agentcore_resource_based_policies() -> List[Dict[str, Any]]:
Validates:
- Agent Runtime resource policies
- Gateway resource policies
- Memory resource policies

Returns:
List of findings
Expand All @@ -1678,24 +1741,26 @@ def check_agentcore_resource_based_policies() -> List[Dict[str, Any]]:

resources_without_rbp = []
resources_with_rbp = []
policy_access_denied = []
policy_check_errors = []

# Check Agent Runtimes
try:
runtimes_response = agentcore_client.list_agent_runtimes()
runtimes = runtimes_response.get("agentRuntimes", [])
runtimes = _agentcore_list_all("list_agent_runtimes", ["agentRuntimes"])

for runtime in runtimes:
runtime_id = runtime.get("agentRuntimeId", "unknown")
runtime_name = runtime.get("agentRuntimeName", runtime_id)
runtime_arn = runtime.get("agentRuntimeArn")

try:
# Try to get resource policy
policy_response = (
agentcore_client.get_agent_runtime_resource_policy(
agentRuntimeId=runtime_id
if not runtime_arn:
resources_without_rbp.append(
{"type": "Runtime", "name": runtime_name, "id": runtime_id}
)
)
policy = policy_response.get("resourcePolicy")
continue

policy = _get_agentcore_resource_policy(runtime_arn)

if policy:
resources_with_rbp.append(f"Runtime: {runtime_name}")
Expand All @@ -1709,33 +1774,48 @@ def check_agentcore_resource_based_policies() -> List[Dict[str, Any]]:
resources_without_rbp.append(
{"type": "Runtime", "name": runtime_name, "id": runtime_id}
)
elif _is_access_denied_client_error(e):
policy_access_denied.append(
{"type": "Runtime", "name": runtime_name, "id": runtime_id}
)
else:
policy_check_errors.append(
{
"type": "Runtime",
"name": runtime_name,
"id": runtime_id,
"error_code": e.response.get("Error", {}).get(
"Code", "Unknown"
),
}
)
logger.warning(
f"Error checking policy for runtime {runtime_id}: {e}"
)
except AttributeError:
# API method doesn't exist
logger.info("get_agent_runtime_resource_policy API not available")
break

except ClientError as e:
if e.response["Error"]["Code"] != "ResourceNotFoundException":
logger.warning(f"Error listing runtimes: {e}")

# Check Gateways
try:
gateways_response = agentcore_client.list_gateways()
gateways = gateways_response.get("gateways", [])
gateways = _agentcore_list_all("list_gateways", ["items", "gateways"])

for gateway in gateways:
gateway_id = gateway.get("gatewayId", "unknown")
gateway_name = gateway.get("name", gateway_id)

try:
policy_response = agentcore_client.get_gateway_resource_policy(
gatewayId=gateway_id
)
policy = policy_response.get("resourcePolicy")
gateway_details = agentcore_client.get_gateway(gatewayId=gateway_id)
gateway_arn = gateway_details.get("gatewayArn")

if not gateway_arn:
resources_without_rbp.append(
{"type": "Gateway", "name": gateway_name, "id": gateway_id}
)
continue

policy = _get_agentcore_resource_policy(gateway_arn)

if policy:
resources_with_rbp.append(f"Gateway: {gateway_name}")
Expand All @@ -1749,9 +1829,24 @@ def check_agentcore_resource_based_policies() -> List[Dict[str, Any]]:
resources_without_rbp.append(
{"type": "Gateway", "name": gateway_name, "id": gateway_id}
)
except AttributeError:
logger.info("get_gateway_resource_policy API not available")
break
elif _is_access_denied_client_error(e):
policy_access_denied.append(
{"type": "Gateway", "name": gateway_name, "id": gateway_id}
)
else:
policy_check_errors.append(
{
"type": "Gateway",
"name": gateway_name,
"id": gateway_id,
"error_code": e.response.get("Error", {}).get(
"Code", "Unknown"
),
}
)
logger.warning(
f"Error checking policy for gateway {gateway_id}: {e}"
)

except (ClientError, AttributeError) as e:
logger.info(f"Gateway APIs not available: {e}")
Expand Down Expand Up @@ -1780,6 +1875,57 @@ def check_agentcore_resource_based_policies() -> List[Dict[str, Any]]:
)
)

if policy_access_denied:
resource_list = ", ".join(
[f"{r['type']} '{r['name']}'" for r in policy_access_denied[:5]]
)
if len(policy_access_denied) > 5:
resource_list += f" and {len(policy_access_denied) - 5} more"

findings.append(
create_finding(
check_id="AC-10",
finding_name="AgentCore Resource-Based Policy Assessment Access Denied",
finding_details=(
f"Unable to assess resource-based policies for {resource_list} "
"because access to AgentCore resource policy metadata was denied."
),
resolution=(
"Ensure the assessment role can call "
"bedrock-agentcore:GetResourcePolicy for AgentCore resources."
),
reference="https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/security_iam_service-with-iam.html",
severity=SeverityEnum.INFORMATIONAL,
status=StatusEnum.NA,
)
)

if policy_check_errors:
resource_list = ", ".join(
[f"{r['type']} '{r['name']}'" for r in policy_check_errors[:5]]
)
if len(policy_check_errors) > 5:
resource_list += f" and {len(policy_check_errors) - 5} more"

error_codes = sorted({r["error_code"] for r in policy_check_errors})
findings.append(
create_finding(
check_id="AC-10",
finding_name="AgentCore Resource-Based Policy Assessment Incomplete",
finding_details=(
f"Unable to fully assess resource-based policies for {resource_list} "
f"due to AgentCore API errors: {', '.join(error_codes)}."
),
resolution=(
"Re-run the assessment. If the issue persists, review AgentCore "
"service health and the assessment role's control plane permissions."
),
reference="https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/security_iam_service-with-iam.html",
severity=SeverityEnum.LOW,
status=StatusEnum.NA,
)
)

if not findings:
if resources_with_rbp:
findings.append(
Expand Down Expand Up @@ -1854,8 +2000,9 @@ def check_agentcore_policy_engine_encryption() -> List[Dict[str, Any]]:

try:
# List policy engines
policy_engines_response = agentcore_client.list_policy_engines()
policy_engines = policy_engines_response.get("policyEngines", [])
policy_engines = _agentcore_list_all(
"list_policy_engines", ["policyEngines"]
)

if not policy_engines:
findings.append(
Expand Down Expand Up @@ -2000,8 +2147,7 @@ def check_agentcore_gateway_encryption() -> List[Dict[str, Any]]:
logger.info("Checking AgentCore Gateway encryption")

try:
gateways_response = agentcore_client.list_gateways()
gateways = gateways_response.get("gateways", [])
gateways = _agentcore_list_all("list_gateways", ["items", "gateways"])

if not gateways:
findings.append(
Expand Down Expand Up @@ -2148,8 +2294,7 @@ def check_agentcore_gateway_configuration() -> List[Dict[str, Any]]:

# Try to list gateways - this API may not exist yet
try:
gateways_response = agentcore_client.list_gateways()
gateways = gateways_response.get("gateways", [])
gateways = _agentcore_list_all("list_gateways", ["items", "gateways"])

if not gateways:
logger.info("No Gateway resources found")
Expand Down
Loading
Loading