diff --git a/framework/test_cases/csm_observability_mixin.py b/framework/test_cases/csm_observability_mixin.py new file mode 100644 index 00000000..853cd22a --- /dev/null +++ b/framework/test_cases/csm_observability_mixin.py @@ -0,0 +1,81 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +A mixin class for CSM Observability tests. + +This mixin is shared between test environments that exports CSM +Observability metrics. +""" +from framework.test_cases import testcase_mixins + +class CsmObservabilityMixin(testcase_mixins.CsmObservabilityTestCaseMixin): + + def query_metrics( + self, + metric_names: Iterable[str], + build_query_fn: BuildQueryFn, + namespace: str, + remote_namespace: str, + interval: monitoring_v3.TimeInterval, + ) -> dict[str, MetricTimeSeries]: + """ + A helper function to make the cloud monitoring API call to query + metrics created by this test run. + """ + # Based on default retry settings for list_time_series method: + # https://github.com/googleapis/google-cloud-python/blob/google-cloud-monitoring-v2.18.0/packages/google-cloud-monitoring/google/cloud/monitoring_v3/services/metric_service/transports/base.py#L210-L218 + # Modified: predicate extended to retry on a wider range of error types. + retry_settings = gapi_retries.Retry( + initial=0.1, + maximum=30.0, + multiplier=1.3, + predicate=gapi_retries.if_exception_type( + # Retry on 5xx, not just 503 ServiceUnavailable. This also + # covers gRPC Unknown, DataLoss, and DeadlineExceeded statuses. + # 501 MethodNotImplemented not excluded because most likely + # reason we'd see this error is server misconfiguration, so we + # want to give it a chance to recovering this situation too. + gapi_errors.ServerError, + # Retry on 429/ResourceExhausted: recoverable rate limiting. + gapi_errors.TooManyRequests, + ), + deadline=90.0, + ) + results = {} + for metric in metric_names: + logger.info("Requesting list_time_series for metric %s", metric) + response = self.metric_client.list_time_series( + name=f"projects/{self.project}", + filter=build_query_fn(metric, namespace, remote_namespace), + interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + retry=retry_settings, + ) + time_series = list(response) + + self.assertLen( + time_series, + 1, + msg=f"Query for {metric} should return exactly 1 time series." + f" Found {len(time_series)}.", + ) + + metric_time_series = MetricTimeSeries.from_response( + metric, time_series[0] + ) + logger.info( + "Metric %s:\n%s", metric, metric_time_series.pretty_print() + ) + results[metric] = metric_time_series + return results diff --git a/framework/test_cases/testcase_mixins.py b/framework/test_cases/testcase_mixins.py index 16102ba9..4df90a43 100644 --- a/framework/test_cases/testcase_mixins.py +++ b/framework/test_cases/testcase_mixins.py @@ -20,6 +20,10 @@ from typing import Protocol +class CsmObservabilityTestCaseMixin(Protocol): + pass + + class XdsKubernetesBaseTestCaseMixin(Protocol): pass diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py index ea98003a..cfe0f195 100644 --- a/tests/app_net_csm_observability_test.py +++ b/tests/app_net_csm_observability_test.py @@ -31,6 +31,7 @@ from framework.test_app.runners.k8s import k8s_base_runner from framework.test_app.runners.k8s import k8s_xds_client_runner from framework.test_app.runners.k8s import k8s_xds_server_runner +from framework.test_cases import csm_observability_mixin logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -169,7 +170,10 @@ def close(self): self.log_stream.close() -class AppNetCsmObservabilityTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase): +class AppNetCsmObservabilityTest( + xds_k8s_testcase.AppNetXdsKubernetesTestCase, + csm_observability_mixin.CsmObservabilityMixin +): metric_client: monitoring_v3.MetricServiceClient @staticmethod @@ -514,65 +518,6 @@ def filter_label_matcher_based_on_lang( label_matcher.pop("otel_scope_version", None) label_matcher.pop("otel_scope_name", None) - def query_metrics( - self, - metric_names: Iterable[str], - build_query_fn: BuildQueryFn, - namespace: str, - remote_namespace: str, - interval: monitoring_v3.TimeInterval, - ) -> dict[str, MetricTimeSeries]: - """ - A helper function to make the cloud monitoring API call to query - metrics created by this test run. - """ - # Based on default retry settings for list_time_series method: - # https://github.com/googleapis/google-cloud-python/blob/google-cloud-monitoring-v2.18.0/packages/google-cloud-monitoring/google/cloud/monitoring_v3/services/metric_service/transports/base.py#L210-L218 - # Modified: predicate extended to retry on a wider range of error types. - retry_settings = gapi_retries.Retry( - initial=0.1, - maximum=30.0, - multiplier=1.3, - predicate=gapi_retries.if_exception_type( - # Retry on 5xx, not just 503 ServiceUnavailable. This also - # covers gRPC Unknown, DataLoss, and DeadlineExceeded statuses. - # 501 MethodNotImplemented not excluded because most likely - # reason we'd see this error is server misconfiguration, so we - # want to give it a chance to recovering this situation too. - gapi_errors.ServerError, - # Retry on 429/ResourceExhausted: recoverable rate limiting. - gapi_errors.TooManyRequests, - ), - deadline=90.0, - ) - results = {} - for metric in metric_names: - logger.info("Requesting list_time_series for metric %s", metric) - response = self.metric_client.list_time_series( - name=f"projects/{self.project}", - filter=build_query_fn(metric, namespace, remote_namespace), - interval=interval, - view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - retry=retry_settings, - ) - time_series = list(response) - - self.assertLen( - time_series, - 1, - msg=f"Query for {metric} should return exactly 1 time series." - f" Found {len(time_series)}.", - ) - - metric_time_series = MetricTimeSeries.from_response( - metric, time_series[0] - ) - logger.info( - "Metric %s:\n%s", metric, metric_time_series.pretty_print() - ) - results[metric] = metric_time_series - return results - def assertAtLeastOnePointWithinRange( self, points: list[monitoring_v3.types.Point], diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index cad08ec5..edfb365e 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -32,6 +32,7 @@ from framework.test_app.runners.k8s import gamma_server_runner from framework.test_app.runners.k8s import k8s_base_runner from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_cases import csm_observability_mixin logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -169,7 +170,10 @@ def close(self): self.log_stream.close() -class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): +class CsmObservabilityTest( + xds_gamma_testcase.GammaXdsKubernetesTestCase, + csm_observability_mixin.CsmObservabilityMixin +): metric_client: monitoring_v3.MetricServiceClient @staticmethod @@ -493,65 +497,6 @@ def filter_label_matcher_based_on_lang( label_matcher.pop("otel_scope_version", None) label_matcher.pop("otel_scope_name", None) - def query_metrics( - self, - metric_names: Iterable[str], - build_query_fn: BuildQueryFn, - namespace: str, - remote_namespace: str, - interval: monitoring_v3.TimeInterval, - ) -> dict[str, MetricTimeSeries]: - """ - A helper function to make the cloud monitoring API call to query - metrics created by this test run. - """ - # Based on default retry settings for list_time_series method: - # https://github.com/googleapis/google-cloud-python/blob/google-cloud-monitoring-v2.18.0/packages/google-cloud-monitoring/google/cloud/monitoring_v3/services/metric_service/transports/base.py#L210-L218 - # Modified: predicate extended to retry on a wider range of error types. - retry_settings = gapi_retries.Retry( - initial=0.1, - maximum=30.0, - multiplier=1.3, - predicate=gapi_retries.if_exception_type( - # Retry on 5xx, not just 503 ServiceUnavailable. This also - # covers gRPC Unknown, DataLoss, and DeadlineExceeded statuses. - # 501 MethodNotImplemented not excluded because most likely - # reason we'd see this error is server misconfiguration, so we - # want to give it a chance to recovering this situation too. - gapi_errors.ServerError, - # Retry on 429/ResourceExhausted: recoverable rate limiting. - gapi_errors.TooManyRequests, - ), - deadline=90.0, - ) - results = {} - for metric in metric_names: - logger.info("Requesting list_time_series for metric %s", metric) - response = self.metric_client.list_time_series( - name=f"projects/{self.project}", - filter=build_query_fn(metric, namespace, remote_namespace), - interval=interval, - view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - retry=retry_settings, - ) - time_series = list(response) - - self.assertLen( - time_series, - 1, - msg=f"Query for {metric} should return exactly 1 time series." - f" Found {len(time_series)}.", - ) - - metric_time_series = MetricTimeSeries.from_response( - metric, time_series[0] - ) - logger.info( - "Metric %s:\n%s", metric, metric_time_series.pretty_print() - ) - results[metric] = metric_time_series - return results - def assertAtLeastOnePointWithinRange( self, points: list[monitoring_v3.types.Point], diff --git a/tests/gamma/csm_observability_with_injection_test.py b/tests/gamma/csm_observability_with_injection_test.py index edce55e2..93e34a90 100644 --- a/tests/gamma/csm_observability_with_injection_test.py +++ b/tests/gamma/csm_observability_with_injection_test.py @@ -32,6 +32,7 @@ from framework.test_app.runners.k8s import gamma_server_runner from framework.test_app.runners.k8s import k8s_base_runner from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_cases import csm_observability_mixin logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) @@ -170,7 +171,8 @@ def close(self): class CsmObservabilityTestWithInjection( - xds_gamma_testcase.GammaXdsKubernetesTestCase + xds_gamma_testcase.GammaXdsKubernetesTestCase, + csm_observability_mixin.CsmObservabilityMixin ): metric_client: monitoring_v3.MetricServiceClient @@ -495,65 +497,6 @@ def filter_label_matcher_based_on_lang( label_matcher.pop("otel_scope_version", None) label_matcher.pop("otel_scope_name", None) - def query_metrics( - self, - metric_names: Iterable[str], - build_query_fn: BuildQueryFn, - namespace: str, - remote_namespace: str, - interval: monitoring_v3.TimeInterval, - ) -> dict[str, MetricTimeSeries]: - """ - A helper function to make the cloud monitoring API call to query - metrics created by this test run. - """ - # Based on default retry settings for list_time_series method: - # https://github.com/googleapis/google-cloud-python/blob/google-cloud-monitoring-v2.18.0/packages/google-cloud-monitoring/google/cloud/monitoring_v3/services/metric_service/transports/base.py#L210-L218 - # Modified: predicate extended to retry on a wider range of error types. - retry_settings = gapi_retries.Retry( - initial=0.1, - maximum=30.0, - multiplier=1.3, - predicate=gapi_retries.if_exception_type( - # Retry on 5xx, not just 503 ServiceUnavailable. This also - # covers gRPC Unknown, DataLoss, and DeadlineExceeded statuses. - # 501 MethodNotImplemented not excluded because most likely - # reason we'd see this error is server misconfiguration, so we - # want to give it a chance to recovering this situation too. - gapi_errors.ServerError, - # Retry on 429/ResourceExhausted: recoverable rate limiting. - gapi_errors.TooManyRequests, - ), - deadline=90.0, - ) - results = {} - for metric in metric_names: - logger.info("Requesting list_time_series for metric %s", metric) - response = self.metric_client.list_time_series( - name=f"projects/{self.project}", - filter=build_query_fn(metric, namespace, remote_namespace), - interval=interval, - view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - retry=retry_settings, - ) - time_series = list(response) - - self.assertLen( - time_series, - 1, - msg=f"Query for {metric} should return exactly 1 time series." - f" Found {len(time_series)}.", - ) - - metric_time_series = MetricTimeSeries.from_response( - metric, time_series[0] - ) - logger.info( - "Metric %s:\n%s", metric, metric_time_series.pretty_print() - ) - results[metric] = metric_time_series - return results - def assertAtLeastOnePointWithinRange( self, points: list[monitoring_v3.types.Point],