diff --git a/gcp/integration_quickstart/dist/gcp_integration_quickstart.pyz b/gcp/integration_quickstart/dist/gcp_integration_quickstart.pyz index 2c042ad..468002a 100755 Binary files a/gcp/integration_quickstart/dist/gcp_integration_quickstart.pyz and b/gcp/integration_quickstart/dist/gcp_integration_quickstart.pyz differ diff --git a/gcp/integration_quickstart/src/gcp_integration_quickstart/integration_configuration.py b/gcp/integration_quickstart/src/gcp_integration_quickstart/integration_configuration.py index 07d9498..7698459 100644 --- a/gcp/integration_quickstart/src/gcp_integration_quickstart/integration_configuration.py +++ b/gcp/integration_quickstart/src/gcp_integration_quickstart/integration_configuration.py @@ -19,7 +19,7 @@ "cloudresourcemanager.googleapis.com", ] -ROLES_TO_ADD: list[str] = [ +REQUIRED_ROLES: list[str] = [ "roles/cloudasset.viewer", "roles/browser", "roles/compute.viewer", @@ -78,13 +78,13 @@ def create_integration_with_permissions( ] ) - required_roles = ROLES_TO_ADD.copy() + required_roles = REQUIRED_ROLES.copy() if product_requirements: required_roles.extend( [ role for role in product_requirements.required_roles - if role not in ROLES_TO_ADD + if role not in REQUIRED_ROLES ] ) diff --git a/gcp/integration_quickstart/src/gcp_integration_quickstart/main.py b/gcp/integration_quickstart/src/gcp_integration_quickstart/main.py index e6d8f54..a7a1620 100644 --- a/gcp/integration_quickstart/src/gcp_integration_quickstart/main.py +++ b/gcp/integration_quickstart/src/gcp_integration_quickstart/main.py @@ -46,7 +46,7 @@ def main(): print( f"Missing required environment variables: {', '.join(missing_environment_vars)}" ) - exit(1) + sys.exit(1) workflow_id = os.environ["WORKFLOW_ID"] @@ -58,7 +58,7 @@ def main(): print( f"Workflow ID {workflow_id} has already been used. Please start a new workflow." ) - exit(1) + sys.exit(1) workflow_reporter.handle_login_step() diff --git a/gcp/integration_quickstart/tests/test_integration_configuration.py b/gcp/integration_quickstart/tests/test_integration_configuration.py index 0923345..bacb8fc 100644 --- a/gcp/integration_quickstart/tests/test_integration_configuration.py +++ b/gcp/integration_quickstart/tests/test_integration_configuration.py @@ -8,7 +8,7 @@ from gcp_integration_quickstart.integration_configuration import ( REQUIRED_APIS, - ROLES_TO_ADD, + REQUIRED_ROLES, assign_delegate_permissions, create_integration_with_permissions, ) @@ -203,7 +203,7 @@ def test_create_integration_with_permissions_success( f"services enable {services_str} --project child-project123 --quiet" ) - for role in ROLES_TO_ADD: + for role in REQUIRED_ROLES: expected_commands.append( f"resource-manager folders add-iam-policy-binding folder123 --member serviceAccount:{self.service_account} --role {role} --condition None --quiet" ) @@ -212,7 +212,7 @@ def test_create_integration_with_permissions_success( f"services enable {services_str} --project project123 --quiet" ) - for role in ROLES_TO_ADD: + for role in REQUIRED_ROLES: expected_commands.append( f"projects add-iam-policy-binding project123 --member serviceAccount:{self.service_account} --role {role} --condition None --quiet" ) @@ -289,7 +289,7 @@ def test_create_integration_with_permissions_with_product_requirements( f"services enable {services_str} --project project123 --quiet" ) - all_roles = ROLES_TO_ADD + additional_required_roles + all_roles = REQUIRED_ROLES + additional_required_roles for role in all_roles: expected_commands.append( f"projects add-iam-policy-binding project123 --member serviceAccount:{self.service_account} --role {role} --condition None --quiet" diff --git a/gcp/issue_resolver/README.md b/gcp/issue_resolver/README.md new file mode 100644 index 0000000..2b6d350 --- /dev/null +++ b/gcp/issue_resolver/README.md @@ -0,0 +1,48 @@ +# Overview + +This project provides **Datadog's GCP issue resolver** functionality for customers. It supports: + +- Automated issue detection and resolution +- GCP resource configuration checks +- Integration health monitoring +- Diagnostic reporting + +The produced executable is intended to run in a [Google Cloud Shell](https://cloud.google.com/shell/docs/using-cloud-shell) environment. + +For development purposes, the script can also be run locally (assuming you have the [gcloud CLI](https://cloud.google.com/sdk/docs/install) set up). + +During final testing, upload the executable to Google Cloud Shell and run it there. + +--- + +# Development + +### Dev Setup + +See instructions in the main `gcp/` folder. + +### Testing + +Run all tests from the `issue_resolver` folder: + +```bash +python -m pytest tests/ --tb=short +``` + +### Build / Ship + +From the `gcp/` folder, run: + +```bash +bash issue_resolver/build.sh +``` + +# Execution + +To run issue resolver, you must first open the Quickstart onboarding UI, which can be found [here](https://app.datadoghq.com/integrations/google-cloud-platform/add) under "Issue Resolver". + +At the top of this page, you will see a "setup script" snippet. Copy that snippet into Google Cloud Shell and run it. + +Once this command is run, the setup script will connect to Datadog and begin reporting back to the onboarding UI, where you will continue the issue resolution process. + + diff --git a/gcp/issue_resolver/build.sh b/gcp/issue_resolver/build.sh new file mode 100755 index 0000000..f71e113 --- /dev/null +++ b/gcp/issue_resolver/build.sh @@ -0,0 +1,16 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +rm -rf issue_resolver/dist/tmp +mkdir -p issue_resolver/dist/tmp +cp -r shared/src/. issue_resolver/dist/tmp +cp -r issue_resolver/src/. issue_resolver/dist/tmp +python -m zipapp issue_resolver/dist/tmp \ + -o issue_resolver/dist/gcp_issue_resolver.pyz \ + -p "/usr/bin/env python3" \ + -m "gcp_issue_resolver.main:main" +chmod +x issue_resolver/dist/gcp_issue_resolver.pyz +rm -r issue_resolver/dist/tmp + + diff --git a/gcp/issue_resolver/pytest.ini b/gcp/issue_resolver/pytest.ini new file mode 100644 index 0000000..9c193e2 --- /dev/null +++ b/gcp/issue_resolver/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +pythonpath=src:../shared/src + + diff --git a/gcp/issue_resolver/src/gcp_issue_resolver/__init__.py b/gcp/issue_resolver/src/gcp_issue_resolver/__init__.py new file mode 100644 index 0000000..9e3575c --- /dev/null +++ b/gcp/issue_resolver/src/gcp_issue_resolver/__init__.py @@ -0,0 +1,5 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + + diff --git a/gcp/issue_resolver/src/gcp_issue_resolver/issue_resolution.py b/gcp/issue_resolver/src/gcp_issue_resolver/issue_resolution.py new file mode 100644 index 0000000..a084ad5 --- /dev/null +++ b/gcp/issue_resolver/src/gcp_issue_resolver/issue_resolution.py @@ -0,0 +1,109 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +from typing import Any + +from gcp_shared.gcloud import gcloud +from gcp_shared.models import ConfigurationScope +from gcp_shared.reporter import StepStatusReporter +from gcp_shared.requests import dd_request + +from .models import IssueResolverConfiguration + + +def diagnose_issues( + step_reporter: StepStatusReporter, + service_account_email: str, + issue_resolver_configuration: IssueResolverConfiguration, + configuration_scope: ConfigurationScope, +) -> list[dict[str, Any]]: + """Diagnose issues in the GCP integration.""" + + step_reporter.report( + message=( + f"Diagnosing issues for service account '{service_account_email}'..." + ) + ) + + issues: list[dict[str, Any]] = [] + + + # Check permissions for folders + for folder in configuration_scope.folders: + step_reporter.report( + message=f"Checking permissions for folder '{folder.name}'" + ) + # TODO: Add actual issue detection logic here + # This is a placeholder for the actual implementation + pass + + # Check permissions for projects + for project in configuration_scope.projects: + step_reporter.report( + message=f"Checking permissions for project '{project.name}'" + ) + # TODO: Add actual issue detection logic here + # This is a placeholder for the actual implementation + pass + + step_reporter.report( + message=f"Found {len(issues)} issue(s)", + metadata={"issues": issues} + ) + + return issues + + +def resolve_issues( + step_reporter: StepStatusReporter, + service_account_email: str, + issues: list[dict[str, Any]], + issue_resolver_configuration: IssueResolverConfiguration, +) -> None: + """Resolve identified issues in the GCP integration.""" + + if not issues: + step_reporter.report(message="No issues to resolve") + return + + step_reporter.report( + message=f"Resolving {len(issues)} issue(s)..." + ) + + resolved_count = 0 + failed_count = 0 + + for issue in issues: + issue_type = issue.get("type", "unknown") + issue_id = issue.get("id", "unknown") + + try: + if issue_resolver_configuration.dry_run: + step_reporter.report( + message=f"[DRY RUN] Would resolve issue: {issue_type} ({issue_id})" + ) + else: + step_reporter.report( + message=f"Resolving issue: {issue_type} ({issue_id})" + ) + # TODO: Add actual issue resolution logic here + # This is a placeholder for the actual implementation + pass + + resolved_count += 1 + + except Exception as e: + step_reporter.report( + message=f"Failed to resolve issue {issue_id}: {str(e)}" + ) + failed_count += 1 + + step_reporter.report( + message=f"Resolution complete: {resolved_count} resolved, {failed_count} failed", + metadata={ + "resolved_count": resolved_count, + "failed_count": failed_count + } + ) + diff --git a/gcp/issue_resolver/src/gcp_issue_resolver/main.py b/gcp/issue_resolver/src/gcp_issue_resolver/main.py new file mode 100644 index 0000000..ba9d4a6 --- /dev/null +++ b/gcp/issue_resolver/src/gcp_issue_resolver/main.py @@ -0,0 +1,103 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +import os +import signal +import sys + +from gcp_shared.models import ( + ConfigurationScope, + Project, + from_dict_recursive, +) +from gcp_shared.gcloud import gcloud +from gcp_shared.service_accounts import find_service_account + +from .issue_resolution import ( + diagnose_issues, + resolve_issues, +) +from .models import IssueResolverConfiguration + +REQUIRED_ENVIRONMENT_VARS: set[str] = { + "DD_API_KEY", + "DD_APP_KEY", + "DD_SITE", + "SERVICE_ACCOUNT_ID", + "DEFAULT_PROJECT_ID", +} + + +def main(): + signal.signal(signal.SIGINT, sigint_handler) + missing_environment_vars = REQUIRED_ENVIRONMENT_VARS - os.environ.keys() + if missing_environment_vars: + print( + f"Missing required environment variables: {', '.join(missing_environment_vars)}" + ) + sys.exit(1) + + if not gcloud.is_logged_in(): + print("You must be logged in to GCloud CLI to run this script.") + sys.exit(1) + + service_account_email = find_service_account(os.environ["SERVICE_ACCOUNT_ID"], os.environ["DEFAULT_PROJECT_ID"]) + if not service_account_email: + print(f"Service account '{os.environ['SERVICE_ACCOUNT_ID']}' not found in project '{os.environ['DEFAULT_PROJECT_ID']}'") + sys.exit(1) + + # workflow_reporter.handle_login_step() + + # with workflow_reporter.report_step(OnboardingStep.SCOPES) as step_reporter: + # if not workflow_reporter.is_scopes_step_already_completed(): + # collect_configuration_scopes(step_reporter) + + # with workflow_reporter.report_step(OnboardingStep.SELECTIONS): + # user_selections = workflow_reporter.receive_user_selections() + # with workflow_reporter.report_step( + # OnboardingStep.CREATE_SERVICE_ACCOUNT + # ) as step_reporter: + # service_account_email = find_or_create_service_account( + # step_reporter, + # user_selections["service_account_id"], + # user_selections["default_project_id"], + # ) + # with workflow_reporter.report_step( + # OnboardingStep.DIAGNOSE_ISSUES + # ) as step_reporter: + # issues = diagnose_issues( + # step_reporter, + # service_account_email, + # IssueResolverConfiguration(**user_selections["issue_resolver_configuration"]), + # ConfigurationScope( + # projects=[ + # Project(**project) + # for project in user_selections.get("projects", []) + # ], + # folders=[ + # from_dict_recursive(folder) + # for folder in user_selections.get("folders", []) + # ], + # ), + # ) + # with workflow_reporter.report_step( + # OnboardingStep.RESOLVE_ISSUES + # ) as step_reporter: + # resolve_issues( + # step_reporter, + # service_account_email, + # issues, + # IssueResolverConfiguration(**user_selections["issue_resolver_configuration"]), + # ) + + print("Script succeeded. You may exit this shell.") + + +def sigint_handler(_, __): + print("Script terminating.") + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/gcp/issue_resolver/src/gcp_issue_resolver/models.py b/gcp/issue_resolver/src/gcp_issue_resolver/models.py new file mode 100644 index 0000000..0d496f2 --- /dev/null +++ b/gcp/issue_resolver/src/gcp_issue_resolver/models.py @@ -0,0 +1,18 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +from dataclasses import dataclass +from typing import Any + + +@dataclass +class IssueResolverConfiguration: + """Holds configuration details for the GCP issue resolver.""" + + issue_types: list[str] + auto_fix_enabled: bool + dry_run: bool + notification_preferences: dict[str, Any] + + diff --git a/gcp/issue_resolver/tests/__init__.py b/gcp/issue_resolver/tests/__init__.py new file mode 100644 index 0000000..9e3575c --- /dev/null +++ b/gcp/issue_resolver/tests/__init__.py @@ -0,0 +1,5 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + + diff --git a/gcp/issue_resolver/tests/test_issue_resolution.py b/gcp/issue_resolver/tests/test_issue_resolution.py new file mode 100644 index 0000000..e23d485 --- /dev/null +++ b/gcp/issue_resolver/tests/test_issue_resolution.py @@ -0,0 +1,70 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +from unittest.mock import MagicMock + +from gcp_issue_resolver.issue_resolution import diagnose_issues, resolve_issues +from gcp_issue_resolver.models import IssueResolverConfiguration +from gcp_shared.models import ConfigurationScope, Project + + +class TestIssueResolution: + def test_diagnose_issues_no_issues(self): + """Test diagnose_issues when no issues are found.""" + step_reporter = MagicMock() + service_account_email = "test@example.com" + config = IssueResolverConfiguration( + issue_types=["permissions"], + auto_fix_enabled=True, + dry_run=False, + notification_preferences={} + ) + scope = ConfigurationScope( + projects=[Project(id="test-project", name="Test Project")], + folders=[] + ) + + issues = diagnose_issues(step_reporter, service_account_email, config, scope) + + assert isinstance(issues, list) + assert len(issues) == 0 + + def test_resolve_issues_no_issues(self): + """Test resolve_issues when there are no issues.""" + step_reporter = MagicMock() + service_account_email = "test@example.com" + config = IssueResolverConfiguration( + issue_types=["permissions"], + auto_fix_enabled=True, + dry_run=False, + notification_preferences={} + ) + issues = [] + + resolve_issues(step_reporter, service_account_email, issues, config) + + # Verify that the "No issues to resolve" message was reported + step_reporter.report.assert_called() + + def test_resolve_issues_dry_run(self): + """Test resolve_issues in dry run mode.""" + step_reporter = MagicMock() + service_account_email = "test@example.com" + config = IssueResolverConfiguration( + issue_types=["permissions"], + auto_fix_enabled=True, + dry_run=True, + notification_preferences={} + ) + issues = [ + {"type": "permission_missing", "id": "issue-1"}, + {"type": "api_disabled", "id": "issue-2"} + ] + + resolve_issues(step_reporter, service_account_email, issues, config) + + # Verify that dry run messages were reported + assert step_reporter.report.call_count > 0 + + diff --git a/gcp/log_forwarding_quickstart/dist/gcp_log_forwarding_quickstart.pyz b/gcp/log_forwarding_quickstart/dist/gcp_log_forwarding_quickstart.pyz index 65a1daf..ba26db2 100755 Binary files a/gcp/log_forwarding_quickstart/dist/gcp_log_forwarding_quickstart.pyz and b/gcp/log_forwarding_quickstart/dist/gcp_log_forwarding_quickstart.pyz differ diff --git a/gcp/log_forwarding_quickstart/src/gcp_log_forwarding_quickstart/main.py b/gcp/log_forwarding_quickstart/src/gcp_log_forwarding_quickstart/main.py index 120b49b..932f956 100644 --- a/gcp/log_forwarding_quickstart/src/gcp_log_forwarding_quickstart/main.py +++ b/gcp/log_forwarding_quickstart/src/gcp_log_forwarding_quickstart/main.py @@ -54,7 +54,7 @@ def main(): print( f"Missing required environment variables: {', '.join(missing_environment_vars)}" ) - exit(1) + sys.exit(1) workflow_id = os.environ["WORKFLOW_ID"] @@ -64,7 +64,7 @@ def main(): print( f"Workflow ID {workflow_id} has already been used. Please start a new workflow." ) - exit(1) + sys.exit(1) workflow_reporter.handle_login_step() diff --git a/gcp/shared/src/gcp_shared/gcloud.py b/gcp/shared/src/gcp_shared/gcloud.py index dde3160..2406435 100644 --- a/gcp/shared/src/gcp_shared/gcloud.py +++ b/gcp/shared/src/gcp_shared/gcloud.py @@ -3,6 +3,7 @@ # This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. import json +import sys import shlex import subprocess from typing import Any, Union @@ -13,7 +14,7 @@ class GcloudCmd: def __init__(self, service: str, action: str): """Initialize with service and action (e.g., 'pubsub topics', 'create').""" - self.cmd = service.split() + action.split() + self.cmd: list[str] = service.split() + action.split() def __str__(self) -> str: """Overload string representation to return the full command string with proper shell quoting.""" @@ -40,6 +41,22 @@ def flag(self, flag: str) -> "GcloudCmd": return self +def is_logged_in() -> bool: + """Check if the user is logged in to GCloud CLI.""" + try: + return gcloud("auth print-access-token") is not None + except Exception as e: + if "gcloud: command not found" in str(e): + print( + "You must install the GCloud CLI and log in to run this script.\n" + "https://cloud.google.com/sdk/docs/install" + ) + else: + print("You must be logged in to GCloud CLI to run this script.") + sys.exit(1) + return False + + def gcloud(cmd: Union[str, GcloudCmd], *keys: str) -> Any: """Run gcloud CLI command and produce its output. Raise an exception if it fails. diff --git a/gcp/shared/src/gcp_shared/reporter.py b/gcp/shared/src/gcp_shared/reporter.py index f97ce6d..2b8752b 100644 --- a/gcp/shared/src/gcp_shared/reporter.py +++ b/gcp/shared/src/gcp_shared/reporter.py @@ -8,7 +8,7 @@ from enum import Enum from typing import Any, Generator, Optional -from gcp_shared.gcloud import GcloudCmd, gcloud +from gcp_shared.gcloud import GcloudCmd, is_logged_in from gcp_shared.requests import dd_request @@ -150,24 +150,12 @@ def handle_login_step(self) -> None: """ Ensure that the user is logged into the GCloud Shell. """ - - try: - with self.report_step("login"): - if not gcloud(GcloudCmd("auth", "print-access-token")): - raise RuntimeError("not logged in to GCloud Shell") - except Exception as e: - if "gcloud: command not found" in str(e): - print( - "You must install the GCloud CLI and log in to run this script.\n" - "https://cloud.google.com/sdk/docs/install" - ) - else: - print("You must be logged in to GCloud CLI to run this script.") - exit(1) - else: - print( - "Connected! Leave this shell running and go back to the Datadog UI to continue." - ) + with self.report_step("login"): + if not is_logged_in(): + raise RuntimeError("not logged in to GCloud Shell") + print( + "Connected! Leave this shell running and go back to the Datadog UI to continue." + ) def is_scopes_step_already_completed(self) -> bool: """ diff --git a/gcp/shared/src/gcp_shared/service_accounts.py b/gcp/shared/src/gcp_shared/service_accounts.py index c087593..f0bff24 100644 --- a/gcp/shared/src/gcp_shared/service_accounts.py +++ b/gcp/shared/src/gcp_shared/service_accounts.py @@ -6,17 +6,11 @@ from gcp_shared.reporter import StepStatusReporter -def find_or_create_service_account( - step_reporter: StepStatusReporter, +def find_service_account( name: str, project_id: str, - display_name: str = "Datadog Service Account", ) -> str: - """Create a service account with the given name in the specified project.""" - step_reporter.report( - message=f"Looking for service account '{name}' in project '{project_id}'..." - ) - + """Find a service account with the given name in the specified project.""" service_account_search = gcloud( GcloudCmd("iam service-accounts", "list") .param("--project", project_id) @@ -24,7 +18,22 @@ def find_or_create_service_account( "email", ) if service_account_search and len(service_account_search) > 0: - email = service_account_search[0]["email"] + return service_account_search[0]["email"] + return None + + +def find_or_create_service_account( + step_reporter: StepStatusReporter, + name: str, + project_id: str, + display_name: str = "Datadog Service Account", +) -> str: + """Create a service account with the given name in the specified project.""" + step_reporter.report( + message=f"Looking for service account '{name}' in project '{project_id}'..." + ) + email = find_service_account(name, project_id) + if email: step_reporter.report(message=f"Found existing service account '{email}'") return email diff --git a/gcp/shared/tests/test_gcloud.py b/gcp/shared/tests/test_gcloud.py index 56f8e2c..35343d1 100644 --- a/gcp/shared/tests/test_gcloud.py +++ b/gcp/shared/tests/test_gcloud.py @@ -6,7 +6,7 @@ import unittest from unittest.mock import Mock, patch -from gcp_shared.gcloud import gcloud +from gcp_shared.gcloud import gcloud, is_logged_in class TestGCloudFunction(unittest.TestCase): @@ -63,5 +63,76 @@ def test_gcloud_failure(self, mock_run): self.assertIn("could not execute gcloud command", str(context.exception)) +class TestIsLoggedIn(unittest.TestCase): + """Test the is_logged_in function.""" + + @patch("gcp_shared.gcloud.gcloud") + def test_is_logged_in_success(self, mock_gcloud): + """Test is_logged_in when user is logged in.""" + mock_gcloud.return_value = "ya29.a0AfH6SMBx..." + + result = is_logged_in() + + self.assertTrue(result) + mock_gcloud.assert_called_once_with("auth print-access-token") + + @patch("gcp_shared.gcloud.gcloud") + def test_is_logged_in_not_logged_in(self, mock_gcloud): + """Test is_logged_in when user is not logged in.""" + mock_gcloud.side_effect = Exception( + "You do not currently have an active account selected." + ) + + with self.assertRaises(SystemExit) as context: + is_logged_in() + + self.assertEqual(context.exception.code, 1) + + @patch("gcp_shared.gcloud.gcloud") + @patch("builtins.print") + def test_is_logged_in_gcloud_not_found( + self, mock_print, mock_gcloud + ): + """Test is_logged_in when gcloud command is not found.""" + mock_gcloud.side_effect = Exception("gcloud: command not found") + + with self.assertRaises(SystemExit) as context: + is_logged_in() + + self.assertEqual(context.exception.code, 1) + mock_print.assert_called_once() + self.assertIn( + "You must install the GCloud CLI", + mock_print.call_args[0][0] + ) + + @patch("gcp_shared.gcloud.gcloud") + @patch("builtins.print") + def test_is_logged_in_other_exception( + self, mock_print, mock_gcloud + ): + """Test is_logged_in with other exceptions.""" + mock_gcloud.side_effect = Exception("Some other error") + + with self.assertRaises(SystemExit) as context: + is_logged_in() + + self.assertEqual(context.exception.code, 1) + mock_print.assert_called_once() + self.assertIn( + "You must be logged in to GCloud CLI", + mock_print.call_args[0][0] + ) + + @patch("gcp_shared.gcloud.gcloud") + def test_is_logged_in_returns_none(self, mock_gcloud): + """Test is_logged_in when gcloud returns None.""" + mock_gcloud.return_value = None + + result = is_logged_in() + + self.assertFalse(result) + + if __name__ == "__main__": unittest.main() diff --git a/gcp/shared/tests/test_reporter.py b/gcp/shared/tests/test_reporter.py index 599c992..2f23d2f 100644 --- a/gcp/shared/tests/test_reporter.py +++ b/gcp/shared/tests/test_reporter.py @@ -201,39 +201,45 @@ def test_is_valid_workflow_id_api_error(self, mock_dd_request): ) @patch("gcp_shared.reporter.dd_request") - @patch("gcp_shared.reporter.gcloud") - def test_handle_login_step_success(self, mock_gcloud, mock_dd_request): + @patch("gcp_shared.reporter.is_logged_in") + def test_handle_login_step_success(self, mock_is_logged_in, mock_dd_request): """Test handle_login_step when user is logged in.""" - mock_gcloud.return_value = [{"token": "dummy-token"}] + mock_is_logged_in.return_value = True mock_dd_request.return_value = ('{"status": "ok"}', 201) self.workflow_reporter.handle_login_step() - actual_commands = [str(call[0][0]) for call in mock_gcloud.call_args_list] - self.assertEqual(len(actual_commands), 1) - self.assertEqual(actual_commands[0], "auth print-access-token") + mock_is_logged_in.assert_called_once() @patch("gcp_shared.reporter.dd_request") - @patch("gcp_shared.reporter.gcloud") - def test_handle_login_step_failure(self, mock_gcloud, mock_dd_request): + @patch("gcp_shared.reporter.is_logged_in") + def test_handle_login_step_failure( + self, mock_is_logged_in, mock_dd_request + ): """Test handle_login_step when user is not logged in.""" - mock_gcloud.return_value = [] + mock_is_logged_in.return_value = False mock_dd_request.return_value = ('{"status": "ok"}', 201) - with self.assertRaises(SystemExit) as context: + with self.assertRaises(RuntimeError) as context: self.workflow_reporter.handle_login_step() - self.assertEqual(context.exception.code, 1) + self.assertEqual( + str(context.exception), "not logged in to GCloud Shell" + ) - @patch("gcp_shared.reporter.gcloud") - def test_handle_login_step_gcloud_not_found(self, mock_gcloud): + @patch("gcp_shared.reporter.dd_request") + @patch("gcp_shared.reporter.is_logged_in") + def test_handle_login_step_gcloud_not_found( + self, mock_is_logged_in, mock_dd_request + ): """Test handle_login_step when gcloud command is not found.""" - mock_gcloud.side_effect = Exception("gcloud: command not found") + mock_is_logged_in.side_effect = Exception("gcloud: command not found") + mock_dd_request.return_value = ('{"status": "ok"}', 201) - with self.assertRaises(SystemExit) as context: + with self.assertRaises(Exception) as context: self.workflow_reporter.handle_login_step() - self.assertEqual(context.exception.code, 1) + self.assertEqual(str(context.exception), "gcloud: command not found") @patch("gcp_shared.reporter.dd_request") def test_is_scopes_step_already_completed_success(self, mock_dd_request): diff --git a/gcp/shared/tests/test_scopes.py b/gcp/shared/tests/test_scopes.py index 014e3f1..3b54d13 100644 --- a/gcp/shared/tests/test_scopes.py +++ b/gcp/shared/tests/test_scopes.py @@ -228,8 +228,10 @@ def gcloud_side_effect(cmd, *_): actual_commands = [str(call[0][0]) for call in mock_gcloud.call_args_list] self.assertEqual(len(actual_commands), 2) - self.assertIn("projects list", actual_commands[0]) - self.assertIn("--filter", actual_commands[0]) + self.assertEqual( + actual_commands[0], + "projects list '--filter=lifecycleState=ACTIVE AND NOT projectId:sys*'", + ) self.assertEqual(actual_commands[1], "auth print-access-token") mock_fetch_folders.assert_called_once_with("test-token") @@ -296,8 +298,10 @@ def gcloud_side_effect(cmd, *_): actual_commands = [str(call[0][0]) for call in mock_gcloud.call_args_list] self.assertEqual(len(actual_commands), 2) - self.assertIn("projects list", actual_commands[0]) - self.assertIn("--filter", actual_commands[0]) + self.assertEqual( + actual_commands[0], + "projects list '--filter=lifecycleState=ACTIVE AND NOT projectId:sys*'", + ) self.assertEqual(actual_commands[1], "auth print-access-token") mock_fetch_folders.assert_called_once_with("test-token") diff --git a/gcp/shared/tests/test_service_accounts.py b/gcp/shared/tests/test_service_accounts.py new file mode 100644 index 0000000..7df493e --- /dev/null +++ b/gcp/shared/tests/test_service_accounts.py @@ -0,0 +1,210 @@ +# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2 License. + +# This product includes software developed at Datadog (https://www.datadoghq.com/) Copyright 2025 Datadog, Inc. + +import unittest +from unittest.mock import MagicMock, patch + +from gcp_shared.service_accounts import ( + find_or_create_service_account, + find_service_account, +) + + +class TestFindServiceAccount(unittest.TestCase): + """Test the find_service_account function.""" + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_service_account_found(self, mock_gcloud): + """Test finding an existing service account.""" + mock_gcloud.return_value = [ + {"email": "test-account@project-id.iam.gserviceaccount.com"} + ] + + result = find_service_account("test-account", "project-id") + + self.assertEqual( + result, "test-account@project-id.iam.gserviceaccount.com" + ) + mock_gcloud.assert_called_once() + # Check the GcloudCmd object by converting to string + call_args = mock_gcloud.call_args[0] + cmd_str = str(call_args[0]) + self.assertIn("iam", cmd_str) + self.assertIn("service-accounts", cmd_str) + self.assertIn("list", cmd_str) + self.assertIn("--project", cmd_str) + self.assertIn("project-id", cmd_str) + self.assertIn("--filter", cmd_str) + self.assertIn("test-account", cmd_str) + self.assertEqual(call_args[1], "email") + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_service_account_not_found(self, mock_gcloud): + """Test when service account is not found.""" + mock_gcloud.return_value = [] + + result = find_service_account("nonexistent", "project-id") + + self.assertIsNone(result) + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_service_account_returns_none(self, mock_gcloud): + """Test when gcloud returns None.""" + mock_gcloud.return_value = None + + result = find_service_account("test-account", "project-id") + + self.assertIsNone(result) + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_service_account_multiple_results(self, mock_gcloud): + """Test when multiple service accounts match (returns first).""" + mock_gcloud.return_value = [ + {"email": "test-account-1@project.iam.gserviceaccount.com"}, + {"email": "test-account-2@project.iam.gserviceaccount.com"} + ] + + result = find_service_account("test-account", "project-id") + + self.assertEqual( + result, "test-account-1@project.iam.gserviceaccount.com" + ) + + +class TestFindOrCreateServiceAccount(unittest.TestCase): + """Test the find_or_create_service_account function.""" + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_or_create_finds_existing(self, mock_gcloud): + """Test when service account already exists.""" + step_reporter = MagicMock() + mock_gcloud.return_value = [ + {"email": "existing@project-id.iam.gserviceaccount.com"} + ] + + result = find_or_create_service_account( + step_reporter, "existing", "project-id" + ) + + self.assertEqual( + result, "existing@project-id.iam.gserviceaccount.com" + ) + # Should be called once for the search + self.assertEqual(mock_gcloud.call_count, 1) + # Should report looking for and finding the account + self.assertEqual(step_reporter.report.call_count, 2) + self.assertIn( + "Looking for", step_reporter.report.call_args_list[0][1]["message"] + ) + self.assertIn( + "Found existing", + step_reporter.report.call_args_list[1][1]["message"] + ) + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_or_create_creates_new(self, mock_gcloud): + """Test creating a new service account.""" + step_reporter = MagicMock() + # First call (search) returns empty, second call (create) returns new + mock_gcloud.side_effect = [ + [], # find returns nothing + {"email": "new-account@project-id.iam.gserviceaccount.com"} + ] + + result = find_or_create_service_account( + step_reporter, "new-account", "project-id" + ) + + self.assertEqual( + result, "new-account@project-id.iam.gserviceaccount.com" + ) + # Should be called twice: once for search, once for create + self.assertEqual(mock_gcloud.call_count, 2) + # Should report looking, and creating + self.assertEqual(step_reporter.report.call_count, 2) + self.assertIn( + "Looking for", step_reporter.report.call_args_list[0][1]["message"] + ) + self.assertIn( + "Creating new", + step_reporter.report.call_args_list[1][1]["message"] + ) + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_or_create_custom_display_name(self, mock_gcloud): + """Test creating with custom display name.""" + step_reporter = MagicMock() + mock_gcloud.side_effect = [ + [], # find returns nothing + {"email": "custom@project-id.iam.gserviceaccount.com"} + ] + + result = find_or_create_service_account( + step_reporter, + "custom", + "project-id", + "Custom Display Name" + ) + + self.assertEqual( + result, "custom@project-id.iam.gserviceaccount.com" + ) + # Check that the create call includes the custom display name + create_call = mock_gcloud.call_args_list[1] + cmd_str = str(create_call[0][0]) + self.assertIn("Custom Display Name", cmd_str) + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_or_create_default_display_name(self, mock_gcloud): + """Test creating with default display name.""" + step_reporter = MagicMock() + mock_gcloud.side_effect = [ + [], # find returns nothing + {"email": "default@project-id.iam.gserviceaccount.com"} + ] + + result = find_or_create_service_account( + step_reporter, "default", "project-id" + ) + + self.assertEqual( + result, "default@project-id.iam.gserviceaccount.com" + ) + # Check that the create call includes the default display name + create_call = mock_gcloud.call_args_list[1] + cmd_str = str(create_call[0][0]) + self.assertIn("Datadog Service Account", cmd_str) + + @patch("gcp_shared.service_accounts.gcloud") + def test_find_or_create_reports_correctly(self, mock_gcloud): + """Test that status reports are sent correctly.""" + step_reporter = MagicMock() + mock_gcloud.side_effect = [ + None, # find returns None + {"email": "new@project-id.iam.gserviceaccount.com"} + ] + + find_or_create_service_account( + step_reporter, "new", "my-project" + ) + + # Verify the correct messages were reported + calls = step_reporter.report.call_args_list + self.assertEqual(len(calls), 2) + + # First message should be about looking for the account + first_message = calls[0][1]["message"] + self.assertIn("Looking for", first_message) + self.assertIn("new", first_message) + self.assertIn("my-project", first_message) + + # Second message should be about creating the account + second_message = calls[1][1]["message"] + self.assertIn("Creating new", second_message) + self.assertIn("new", second_message) + self.assertIn("my-project", second_message) + + +if __name__ == "__main__": + unittest.main()