From 55f0a24197221c8907636eb5a0c1bd3ed2bdebe4 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 24 Jan 2024 11:21:33 -0600 Subject: [PATCH 1/8] fix: avoid unnecessary call to jobs.get in case of retriable query failure --- google/cloud/bigquery/job/query.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index ac0c51973..dbef226fa 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1554,6 +1554,8 @@ def result( # type: ignore # (incompatible with supertype) ) first = True + if self.state is None: + self._begin(retry=retry, timeout=timeout) def do_get_result(): nonlocal first @@ -1581,14 +1583,18 @@ def do_get_result(): self._retry_do_query = retry_do_query self._job_retry = job_retry - super(QueryJob, self).result(retry=retry, timeout=timeout) - # Since the job could already be "done" (e.g. got a finished job # via client.get_job), the superclass call to done() might not # set the self._query_results cache. if self._query_results is None or not self._query_results.complete: self._reload_query_results(retry=retry, timeout=timeout) + # jobs.getQueryResults should be called before jobs.get. The + # jobs.getQueryResults request will raise an exception for + # failed jobs. This means our job retry mechanism can start + # earlier without a wasted call to jobs.get. + super(QueryJob, self).result(retry=retry, timeout=timeout) + if retry_do_query is not None and job_retry is not None: do_get_result = job_retry(do_get_result) From d612b365115e6c787082a05c75f40e5019a43048 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 24 Jan 2024 11:21:33 -0600 Subject: [PATCH 2/8] fix: avoid unnecessary call to jobs.get in case of retriable query failure --- google/cloud/bigquery/job/query.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index ac0c51973..dbef226fa 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1554,6 +1554,8 @@ def result( # type: ignore # (incompatible with supertype) ) first = True + if self.state is None: + self._begin(retry=retry, timeout=timeout) def do_get_result(): nonlocal first @@ -1581,14 +1583,18 @@ def do_get_result(): self._retry_do_query = retry_do_query self._job_retry = job_retry - super(QueryJob, self).result(retry=retry, timeout=timeout) - # Since the job could already be "done" (e.g. got a finished job # via client.get_job), the superclass call to done() might not # set the self._query_results cache. if self._query_results is None or not self._query_results.complete: self._reload_query_results(retry=retry, timeout=timeout) + # jobs.getQueryResults should be called before jobs.get. The + # jobs.getQueryResults request will raise an exception for + # failed jobs. This means our job retry mechanism can start + # earlier without a wasted call to jobs.get. + super(QueryJob, self).result(retry=retry, timeout=timeout) + if retry_do_query is not None and job_retry is not None: do_get_result = job_retry(do_get_result) From bd464a6e79c6c8e322d1f3e1162f9cf2b2ea9f85 Mon Sep 17 00:00:00 2001 From: kiraksi Date: Fri, 26 Jan 2024 04:45:41 -0800 Subject: [PATCH 3/8] fix: client retries job exceeded rate limits for DDL query jobs --- tests/unit/test_job_retry.py | 315 +++++++++++++++++++---------------- 1 file changed, 168 insertions(+), 147 deletions(-) diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index 0e984c8fc..d625c034d 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -24,7 +24,7 @@ from google.cloud.bigquery.client import Client from google.cloud.bigquery import _job_helpers -from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY +from google.cloud.bigquery.retry import DEFAULT_RETRY from .helpers import make_connection @@ -35,12 +35,19 @@ # - Pass NotFound retry to `result`. # - Pass BadRequest retry to query, with the value passed to `result` overriding. @pytest.mark.parametrize("job_retry_on_query", [None, "Query", "Result", "Both"]) -@mock.patch("time.sleep") -def test_retry_failed_jobs(sleep, client, job_retry_on_query): +def test_retry_failed_jobs(job_retry_on_query): """ Test retry of job failures, as opposed to API-invocation failures. """ + freezegun.freeze_time(auto_tick_seconds=1) # Control time for retries + + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () + retry_notfound = google.api_core.retry.Retry( predicate=google.api_core.retry.if_exception_type( google.api_core.exceptions.NotFound @@ -53,171 +60,185 @@ def test_retry_failed_jobs(sleep, client, job_retry_on_query): ) if job_retry_on_query is None: - reason = "rateLimitExceeded" - else: - reason = "notFound" - - err = dict(reason=reason) - responses = [ - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE")), - dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - - def api_request(method, path, query_params=None, data=None, **kw): - response = responses.pop(0) - if data: - response["jobReference"] = data["jobReference"] - else: - response["jobReference"] = dict( - jobId=path.split("/")[-1], projectId="PROJECT" - ) - return response - - conn = client._connection = make_connection() - conn.api_request.side_effect = api_request - - if job_retry_on_query == "Query": - job_retry = dict(job_retry=retry_notfound) - elif job_retry_on_query == "Both": - # This will be overridden in `result` - job_retry = dict(job_retry=retry_badrequest) + errors = [{"reason": "rateLimitExceeded"}] else: - job_retry = {} - job = client.query("select 1", **job_retry) - - orig_job_id = job.job_id - job_retry = ( - dict(job_retry=retry_notfound) - if job_retry_on_query in ("Result", "Both") - else {} - ) - result = job.result(**job_retry) - assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. - - # The job adjusts it's job id based on the id of the last attempt. - assert job.job_id != orig_job_id - assert job.job_id == conn.mock_calls[3][2]["data"]["jobReference"]["jobId"] - - # We had to sleep three times - assert len(sleep.mock_calls) == 3 + errors = [{"reason": "notFound"}] - # Sleeps are random, however they're more than 0 - assert min(c[1][0] for c in sleep.mock_calls) > 0 - - # They're at most 2 * (multiplier**(number of sleeps - 1)) * initial - # The default multiplier is 2 - assert max(c[1][0] for c in sleep.mock_calls) <= 8 + client._call_api.side_effect = ( + { + "jobReference": { + "jobId": "job1", + "projectId": "project", + "location": "location", + }, + "jobComplete": False, + }, + *[ + { + "status": {"state": "DONE", "errors": errors, "errorResult": errors[0]}, + } + for _ in range(3) + ], + { + "status": {"state": "DONE"}, + "jobReference": { + "jobId": "job2", # Updated job ID after retries + "projectId": "project", + "location": "location", + }, + }, + {"rows": [{"f": [{"v": "1"}]}], "totalRows": "1"}, + ) - # We can ask for the result again: - responses = [ - dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - orig_job_id = job.job_id - result = job.result() - assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. + rows = _job_helpers.query_and_wait( + client, + query="select 1", + location="location", + project="project", + job_config=None, + page_size=None, + max_results=None, + retry=DEFAULT_RETRY, + job_retry=retry_badrequest # Initial retry for "Both" case + if job_retry_on_query == "Both" + else None, + ) - # We wouldn't (and didn't) fail, because we're dealing with a successful job. - # So the job id hasn't changed. - assert job.job_id == orig_job_id + assert len(list(rows)) == 1 # With job_retry_on_query, we're testing 4 scenarios: # - Pass None retry to `query`. # - Pass None retry to `result`. @pytest.mark.parametrize("job_retry_on_query", ["Query", "Result"]) -@mock.patch("time.sleep") -def test_disable_retry_failed_jobs(sleep, client, job_retry_on_query): +def test_disable_retry_failed_jobs(job_retry_on_query): """ Test retry of job failures, as opposed to API-invocation failures. """ - err = dict(reason="rateLimitExceeded") - responses = [dict(status=dict(state="DONE", errors=[err], errorResult=err))] * 3 - def api_request(method, path, query_params=None, data=None, **kw): - response = responses.pop(0) - response["jobReference"] = data["jobReference"] - return response + freezegun.freeze_time(auto_tick_seconds=1) # Control time for retries - conn = client._connection = make_connection() - conn.api_request.side_effect = api_request + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () - if job_retry_on_query == "Query": - job_retry = dict(job_retry=None) - else: - job_retry = {} - job = client.query("select 1", **job_retry) + err = google.api_core.exceptions.InternalServerError(reason="rateLimitExceeded") + client._call_api.side_effect = ( + { + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + "jobComplete": False, + }, + *[ + { + "status": {"state": "DONE", "errors": [err], "errorResult": err}, + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + } + for _ in range(3) + ], + ) - orig_job_id = job.job_id - job_retry = dict(job_retry=None) if job_retry_on_query == "Result" else {} - with pytest.raises(google.api_core.exceptions.Forbidden): - job.result(**job_retry) + rows = _job_helpers.query_and_wait( + client, + query="select 1", + location="location", + project="project", + job_config=None, + page_size=None, + max_results=None, + retry=None, # Explicitly disable retry + job_retry=None + if job_retry_on_query == "Query" + else {}, + ) - assert job.job_id == orig_job_id - assert len(sleep.mock_calls) == 0 + with pytest.raises(google.api_core.exceptions.InternalServerError): + list(rows) # Raise the last error -@mock.patch("time.sleep") -def test_retry_failed_jobs_after_retry_failed(sleep, client): +def test_retry_failed_jobs_after_retry_failed(client): """ If at first you don't succeed, maybe you will later. :) """ - conn = client._connection = make_connection() - - with freezegun.freeze_time("2024-01-01 00:00:00") as frozen_datetime: - err = dict(reason="rateLimitExceeded") - - def api_request(method, path, query_params=None, data=None, **kw): - calls = sleep.mock_calls - if calls: - frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0])) - response = dict(status=dict(state="DONE", errors=[err], errorResult=err)) - response["jobReference"] = data["jobReference"] - return response - - conn.api_request.side_effect = api_request - - job = client.query("select 1") - orig_job_id = job.job_id - - with pytest.raises(google.api_core.exceptions.RetryError): - job.result() - - # We never got a successful job, so the job id never changed: - assert job.job_id == orig_job_id - - # We failed because we couldn't succeed after 120 seconds. - # But we can try again: - err2 = dict(reason="backendError") # We also retry on this - responses = [ - dict(status=dict(state="DONE", errors=[err2], errorResult=err2)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err2], errorResult=err2)), - dict(status=dict(state="DONE")), - dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - - def api_request(method, path, query_params=None, data=None, **kw): - calls = sleep.mock_calls - frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0])) - response = responses.pop(0) - if data: - response["jobReference"] = data["jobReference"] - else: - response["jobReference"] = dict( - jobId=path.split("/")[-1], projectId="PROJECT" - ) - return response - - conn.api_request.side_effect = api_request - result = job.result() - assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. - assert job.job_id != orig_job_id + + freezegun.freeze_time(auto_tick_seconds=1) + + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () + + err = google.api_core.exceptions.InternalServerError(reason="rateLimitExceeded") + err2 = google.api_core.exceptions.BadRequest(reason="backendError") + + client._call_api.side_effect = ( + # Initial responses for initial retry failure + { + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + "jobComplete": False, + }, + *[ + { + "status": {"state": "DONE", "errors": [err], "errorResult": err}, + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + } + for _ in range(120) # Simulate enough failures for timeout + ], + # Responses for subsequent success + { + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + "jobComplete": False, + }, + { + "status": {"state": "DONE", "errors": [err2], "errorResult": err2}, + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + }, + { + "status": {"state": "DONE", "errors": [err], "errorResult": err}, + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + }, + { + "status": {"state": "DONE", "errors": [err2], "errorResult": err2}, + "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + }, + { + "status": {"state": "DONE"}, + "jobReference": {"jobId": "job2", "projectId": "project", "location": "location"}, + }, + {"rows": [{"f": [{"v": "1"}]}], "totalRows": "1"}, + ) + + rows = _job_helpers.query_and_wait( + client, + query="select 1", + location="location", + project="project", + job_config=None, + page_size=None, + max_results=None, + retry=DEFAULT_RETRY, + job_retry=DEFAULT_RETRY, + ) + + with pytest.raises(google.api_core.exceptions.RetryError): + list(rows) # Trigger the initial retry failure + + # Second attempt with successful retries + rows = _job_helpers.query_and_wait( + client, + query="select 1", + location="location", + project="project", + job_config=None, + page_size=None, + max_results=None, + retry=DEFAULT_RETRY, + job_retry=DEFAULT_RETRY, + ) + + assert list(rows) == [{"f": [{"v": "1"}]}] def test_raises_on_job_retry_on_query_with_non_retryable_jobs(client): @@ -301,8 +322,8 @@ def test_query_and_wait_retries_job_for_DDL_queries(): job_config=None, page_size=None, max_results=None, - retry=DEFAULT_JOB_RETRY, - job_retry=DEFAULT_JOB_RETRY, + retry=DEFAULT_RETRY, + job_retry=DEFAULT_RETRY, ) assert len(list(rows)) == 4 From 321449a776d81dd6440e738b1506bfa8ad3cc96f Mon Sep 17 00:00:00 2001 From: kiraksi Date: Mon, 29 Jan 2024 04:41:15 -0800 Subject: [PATCH 4/8] add draft of edited tests - failed --- tests/unit/test_job_retry.py | 166 ++++++++++++++++++----------------- 1 file changed, 85 insertions(+), 81 deletions(-) diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index d625c034d..de8dc8d4b 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -40,14 +40,6 @@ def test_retry_failed_jobs(job_retry_on_query): Test retry of job failures, as opposed to API-invocation failures. """ - freezegun.freeze_time(auto_tick_seconds=1) # Control time for retries - - client = mock.create_autospec(Client) - client._call_api.__name__ = "_call_api" - client._call_api.__qualname__ = "Client._call_api" - client._call_api.__annotations__ = {} - client._call_api.__type_params__ = () - retry_notfound = google.api_core.retry.Retry( predicate=google.api_core.retry.if_exception_type( google.api_core.exceptions.NotFound @@ -60,36 +52,57 @@ def test_retry_failed_jobs(job_retry_on_query): ) if job_retry_on_query is None: - errors = [{"reason": "rateLimitExceeded"}] + errs = [{"reason": "rateLimitExceeded"}] else: - errors = [{"reason": "notFound"}] + errs = [{"reason": "notFound"}] + freezegun.freeze_time(auto_tick_seconds=1) + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () client._call_api.side_effect = ( { "jobReference": { - "jobId": "job1", - "projectId": "project", - "location": "location", + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", }, "jobComplete": False, }, - *[ - { - "status": {"state": "DONE", "errors": errors, "errorResult": errors[0]}, - } - for _ in range(3) - ], + google.api_core.exceptions.InternalServerError("job_retry me", errors=errs), { - "status": {"state": "DONE"}, "jobReference": { - "jobId": "job2", # Updated job ID after retries - "projectId": "project", - "location": "location", + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + "schema": { + "fields": [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ], }, + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ], }, - {"rows": [{"f": [{"v": "1"}]}], "totalRows": "1"}, ) + if job_retry_on_query == "Query": + job_retry = retry_notfound + elif job_retry_on_query == "Both": + # This will be overridden in `result` + job_retry = retry_badrequest + else: + job_retry = None + + rows = _job_helpers.query_and_wait( client, query="select 1", @@ -99,12 +112,10 @@ def test_retry_failed_jobs(job_retry_on_query): page_size=None, max_results=None, retry=DEFAULT_RETRY, - job_retry=retry_badrequest # Initial retry for "Both" case - if job_retry_on_query == "Both" - else None, + job_retry=job_retry, ) - assert len(list(rows)) == 1 + assert len(list(rows)) == 4 # With job_retry_on_query, we're testing 4 scenarios: @@ -116,41 +127,36 @@ def test_disable_retry_failed_jobs(job_retry_on_query): Test retry of job failures, as opposed to API-invocation failures. """ - freezegun.freeze_time(auto_tick_seconds=1) # Control time for retries - + freezegun.freeze_time(auto_tick_seconds=1) client = mock.create_autospec(Client) client._call_api.__name__ = "_call_api" client._call_api.__qualname__ = "Client._call_api" client._call_api.__annotations__ = {} client._call_api.__type_params__ = () - - err = google.api_core.exceptions.InternalServerError(reason="rateLimitExceeded") client._call_api.side_effect = ( { - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, "jobComplete": False, }, - *[ - { - "status": {"state": "DONE", "errors": [err], "errorResult": err}, - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, - } - for _ in range(3) - ], + google.api_core.exceptions.InternalServerError( + "job_retry me", errors=[{"reason": "rateLimitExceeded"}] + ) ) rows = _job_helpers.query_and_wait( client, - query="select 1", - location="location", - project="project", + query="SELECT 1", + location="request-location", + project="request-project", job_config=None, page_size=None, max_results=None, retry=None, # Explicitly disable retry job_retry=None - if job_retry_on_query == "Query" - else {}, ) with pytest.raises(google.api_core.exceptions.InternalServerError): @@ -163,58 +169,56 @@ def test_retry_failed_jobs_after_retry_failed(client): """ freezegun.freeze_time(auto_tick_seconds=1) - client = mock.create_autospec(Client) client._call_api.__name__ = "_call_api" client._call_api.__qualname__ = "Client._call_api" client._call_api.__annotations__ = {} client._call_api.__type_params__ = () - - err = google.api_core.exceptions.InternalServerError(reason="rateLimitExceeded") - err2 = google.api_core.exceptions.BadRequest(reason="backendError") - client._call_api.side_effect = ( - # Initial responses for initial retry failure { - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, "jobComplete": False, }, - *[ - { - "status": {"state": "DONE", "errors": [err], "errorResult": err}, - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, - } - for _ in range(120) # Simulate enough failures for timeout - ], + google.api_core.exceptions.InternalServerError("job_retry me", errors=[{"reason": "rateLimitExceeded"}]), # Responses for subsequent success { "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, "jobComplete": False, }, + google.api_core.exceptions.BadRequest("job_retry me", errors=[{"reason": "backendError"}]), + google.api_core.exceptions.InternalServerError("job_retry me", errors=[{"reason": "rateLimitExceeded"}]), + google.api_core.exceptions.BadRequest("job_retry me", errors=[{"reason": "backendError"}]), { - "status": {"state": "DONE", "errors": [err2], "errorResult": err2}, - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, - }, - { - "status": {"state": "DONE", "errors": [err], "errorResult": err}, - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, - }, - { - "status": {"state": "DONE", "errors": [err2], "errorResult": err2}, - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, - }, - { - "status": {"state": "DONE"}, - "jobReference": {"jobId": "job2", "projectId": "project", "location": "location"}, + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + "schema": { + "fields": [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ], + }, + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ], }, - {"rows": [{"f": [{"v": "1"}]}], "totalRows": "1"}, ) rows = _job_helpers.query_and_wait( client, - query="select 1", - location="location", - project="project", + query="SELECT 1", + location="request-location", + project="request-project", job_config=None, page_size=None, max_results=None, @@ -228,9 +232,9 @@ def test_retry_failed_jobs_after_retry_failed(client): # Second attempt with successful retries rows = _job_helpers.query_and_wait( client, - query="select 1", - location="location", - project="project", + query="SELECT 1", + location="request-location", + project="request-project", job_config=None, page_size=None, max_results=None, @@ -238,7 +242,7 @@ def test_retry_failed_jobs_after_retry_failed(client): job_retry=DEFAULT_RETRY, ) - assert list(rows) == [{"f": [{"v": "1"}]}] + assert len(list(rows)) == 4 def test_raises_on_job_retry_on_query_with_non_retryable_jobs(client): From fbcfe4443b7c67d1cec2ca93312330f2c99344aa Mon Sep 17 00:00:00 2001 From: kiraksi Date: Mon, 29 Jan 2024 04:43:12 -0800 Subject: [PATCH 5/8] black --- tests/unit/test_job_retry.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index de8dc8d4b..f2d777635 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -101,7 +101,6 @@ def test_retry_failed_jobs(job_retry_on_query): job_retry = retry_badrequest else: job_retry = None - rows = _job_helpers.query_and_wait( client, @@ -144,7 +143,7 @@ def test_disable_retry_failed_jobs(job_retry_on_query): }, google.api_core.exceptions.InternalServerError( "job_retry me", errors=[{"reason": "rateLimitExceeded"}] - ) + ), ) rows = _job_helpers.query_and_wait( @@ -156,7 +155,7 @@ def test_disable_retry_failed_jobs(job_retry_on_query): page_size=None, max_results=None, retry=None, # Explicitly disable retry - job_retry=None + job_retry=None, ) with pytest.raises(google.api_core.exceptions.InternalServerError): @@ -183,15 +182,27 @@ def test_retry_failed_jobs_after_retry_failed(client): }, "jobComplete": False, }, - google.api_core.exceptions.InternalServerError("job_retry me", errors=[{"reason": "rateLimitExceeded"}]), + google.api_core.exceptions.InternalServerError( + "job_retry me", errors=[{"reason": "rateLimitExceeded"}] + ), # Responses for subsequent success { - "jobReference": {"jobId": "job1", "projectId": "project", "location": "location"}, + "jobReference": { + "jobId": "job1", + "projectId": "project", + "location": "location", + }, "jobComplete": False, }, - google.api_core.exceptions.BadRequest("job_retry me", errors=[{"reason": "backendError"}]), - google.api_core.exceptions.InternalServerError("job_retry me", errors=[{"reason": "rateLimitExceeded"}]), - google.api_core.exceptions.BadRequest("job_retry me", errors=[{"reason": "backendError"}]), + google.api_core.exceptions.BadRequest( + "job_retry me", errors=[{"reason": "backendError"}] + ), + google.api_core.exceptions.InternalServerError( + "job_retry me", errors=[{"reason": "rateLimitExceeded"}] + ), + google.api_core.exceptions.BadRequest( + "job_retry me", errors=[{"reason": "backendError"}] + ), { "jobReference": { "projectId": "response-project", From 2e4a3b33202816470b0441e6dfec19656ad41f66 Mon Sep 17 00:00:00 2001 From: kiraksi Date: Mon, 29 Jan 2024 12:14:03 -0800 Subject: [PATCH 6/8] edited query select for test --- tests/unit/test_job_retry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index f2d777635..fdc0134bf 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -104,9 +104,9 @@ def test_retry_failed_jobs(job_retry_on_query): rows = _job_helpers.query_and_wait( client, - query="select 1", - location="location", - project="project", + query="SELECT 1", + location="request-location", + project="request-project", job_config=None, page_size=None, max_results=None, From 3cb6b9ba20e953c755931e0a515dfcb49a9c8e8f Mon Sep 17 00:00:00 2001 From: kiraksi Date: Mon, 29 Jan 2024 13:13:52 -0800 Subject: [PATCH 7/8] change tests --- tests/unit/test_job_retry.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index fdc0134bf..1311b4741 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -24,7 +24,7 @@ from google.cloud.bigquery.client import Client from google.cloud.bigquery import _job_helpers -from google.cloud.bigquery.retry import DEFAULT_RETRY +from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY from .helpers import make_connection @@ -234,9 +234,9 @@ def test_retry_failed_jobs_after_retry_failed(client): page_size=None, max_results=None, retry=DEFAULT_RETRY, - job_retry=DEFAULT_RETRY, + job_retry=DEFAULT_JOB_RETRY, ) - + #TODO: different test to test if it retries until it times out with pytest.raises(google.api_core.exceptions.RetryError): list(rows) # Trigger the initial retry failure @@ -338,7 +338,7 @@ def test_query_and_wait_retries_job_for_DDL_queries(): page_size=None, max_results=None, retry=DEFAULT_RETRY, - job_retry=DEFAULT_RETRY, + job_retry=DEFAULT_JOB_RETRY, ) assert len(list(rows)) == 4 From 015bb53cca2d2c14609e720366fba45f8d809739 Mon Sep 17 00:00:00 2001 From: kiraksi Date: Mon, 29 Jan 2024 15:43:05 -0800 Subject: [PATCH 8/8] fix: fix and reformat job retry tests --- google/cloud/bigquery/job/query.py | 10 ++-------- tests/unit/test_job_retry.py | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index dbef226fa..ac0c51973 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1554,8 +1554,6 @@ def result( # type: ignore # (incompatible with supertype) ) first = True - if self.state is None: - self._begin(retry=retry, timeout=timeout) def do_get_result(): nonlocal first @@ -1583,18 +1581,14 @@ def do_get_result(): self._retry_do_query = retry_do_query self._job_retry = job_retry + super(QueryJob, self).result(retry=retry, timeout=timeout) + # Since the job could already be "done" (e.g. got a finished job # via client.get_job), the superclass call to done() might not # set the self._query_results cache. if self._query_results is None or not self._query_results.complete: self._reload_query_results(retry=retry, timeout=timeout) - # jobs.getQueryResults should be called before jobs.get. The - # jobs.getQueryResults request will raise an exception for - # failed jobs. This means our job retry mechanism can start - # earlier without a wasted call to jobs.get. - super(QueryJob, self).result(retry=retry, timeout=timeout) - if retry_do_query is not None and job_retry is not None: do_get_result = job_retry(do_get_result) diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index 1311b4741..d6154cd3e 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -236,7 +236,7 @@ def test_retry_failed_jobs_after_retry_failed(client): retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY, ) - #TODO: different test to test if it retries until it times out + # TODO: different test to test if it retries until it times out with pytest.raises(google.api_core.exceptions.RetryError): list(rows) # Trigger the initial retry failure