11from google .cloud .bigquery .client import *
22from google .cloud .bigquery import _job_helpers
33from google .cloud .bigquery import table
4+ from google .cloud .bigquery .retry import (
5+ DEFAULT_ASYNC_JOB_RETRY ,
6+ DEFAULT_ASYNC_RETRY ,
7+ DEFAULT_TIMEOUT ,
8+ )
9+ from google .api_core import retry_async as retries
410import asyncio
11+ import google .auth .transport ._aiohttp_requests
512
class AsyncClient:
    """Asynchronous entry point for BigQuery query operations.

    Wraps the synchronous :class:`google.cloud.bigquery.client.Client` by
    composition (rather than inheritance), so that only the explicitly
    async-adapted methods are exposed on this object while all connection
    and configuration handling stays in the underlying sync client.
    """

    def __init__(self, *args, **kwargs):
        # Forward every constructor argument unchanged to the synchronous
        # Client; this object holds no state of its own beyond the wrapper.
        self._client = Client(*args, **kwargs)
1119 async def query_and_wait (
@@ -17,30 +25,30 @@ async def query_and_wait(
1725 project : Optional [str ] = None ,
1826 api_timeout : TimeoutType = DEFAULT_TIMEOUT ,
1927 wait_timeout : TimeoutType = None ,
20- retry : retries .Retry = DEFAULT_RETRY ,
21- job_retry : retries .Retry = DEFAULT_JOB_RETRY ,
28+ retry : retries .AsyncRetry = DEFAULT_ASYNC_RETRY ,
29+ job_retry : retries .AsyncRetry = DEFAULT_ASYNC_JOB_RETRY ,
2230 page_size : Optional [int ] = None ,
2331 max_results : Optional [int ] = None ,
2432 ) -> RowIterator :
2533
2634 if project is None :
27- project = self .project
35+ project = self ._client . project
2836
2937 if location is None :
30- location = self .location
38+ location = self ._client . location
3139
3240 # if job_config is not None:
33- # self._verify_job_config_type(job_config, QueryJobConfig)
41+ # self._client. _verify_job_config_type(job_config, QueryJobConfig)
3442
3543 # if job_config is not None:
36- # self._verify_job_config_type(job_config, QueryJobConfig)
44+ # self._client. _verify_job_config_type(job_config, QueryJobConfig)
3745
3846 job_config = _job_helpers .job_config_with_defaults (
39- job_config , self ._default_query_job_config
47+ job_config , self ._client . _default_query_job_config
4048 )
4149
4250 return await async_query_and_wait (
43- self ,
51+ self . _client ,
4452 query ,
4553 job_config = job_config ,
4654 location = location ,
@@ -63,8 +71,8 @@ async def async_query_and_wait(
6371 project : str ,
6472 api_timeout : Optional [float ] = None ,
6573 wait_timeout : Optional [float ] = None ,
66- retry : Optional [retries .Retry ],
67- job_retry : Optional [retries .Retry ],
74+ retry : Optional [retries .AsyncRetry ],
75+ job_retry : Optional [retries .AsyncRetry ],
6876 page_size : Optional [int ] = None ,
6977 max_results : Optional [int ] = None ,
7078) -> table .RowIterator :
@@ -73,7 +81,7 @@ async def async_query_and_wait(
7381 # cases, fallback to a jobs.insert call.
7482 if not _job_helpers ._supported_by_jobs_query (job_config ):
7583 return await async_wait_or_cancel (
76- _job_helpers .query_jobs_insert (
84+ asyncio . to_thread ( _job_helpers .query_jobs_insert ( # throw in a background thread
7785 client = client ,
7886 query = query ,
7987 job_id = None ,
@@ -84,7 +92,7 @@ async def async_query_and_wait(
8492 retry = retry ,
8593 timeout = api_timeout ,
8694 job_retry = job_retry ,
87- ),
95+ )) ,
8896 api_timeout = api_timeout ,
8997 wait_timeout = wait_timeout ,
9098 retry = retry ,
@@ -105,90 +113,91 @@ async def async_query_and_wait(
105113 if os .getenv ("QUERY_PREVIEW_ENABLED" , "" ).casefold () == "true" :
106114 request_body ["jobCreationMode" ] = "JOB_CREATION_OPTIONAL"
107115
108- async def do_query ():
109- request_body ["requestId" ] = _job_helpers .make_job_id ()
110- span_attributes = {"path" : path }
111-
112- # For easier testing, handle the retries ourselves.
113- if retry is not None :
114- response = retry (client ._call_api )(
115- retry = None , # We're calling the retry decorator ourselves.
116- span_name = "BigQuery.query" ,
117- span_attributes = span_attributes ,
118- method = "POST" ,
119- path = path ,
120- data = request_body ,
121- timeout = api_timeout ,
122- )
123- else :
124- response = client ._call_api (
125- retry = None ,
126- span_name = "BigQuery.query" ,
127- span_attributes = span_attributes ,
128- method = "POST" ,
129- path = path ,
130- data = request_body ,
131- timeout = api_timeout ,
132- )
133116
134- # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
135- # to fetch, there will be a job ID for jobs.getQueryResults.
136- query_results = google .cloud .bigquery .query ._QueryResults .from_api_repr (
137- response
117+ request_body ["requestId" ] = _job_helpers .make_job_id ()
118+ span_attributes = {"path" : path }
119+
120+ # For easier testing, handle the retries ourselves.
121+ if retry is not None :
122+ response = retry (client ._call_api )( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth)
123+ retry = None , # We're calling the retry decorator ourselves, async_retries
124+ span_name = "BigQuery.query" ,
125+ span_attributes = span_attributes ,
126+ method = "POST" ,
127+ path = path ,
128+ data = request_body ,
129+ timeout = api_timeout ,
138130 )
139- page_token = query_results .page_token
140- more_pages = page_token is not None
141-
142- if more_pages or not query_results .complete :
143- # TODO(swast): Avoid a call to jobs.get in some cases (few
144- # remaining pages) by waiting for the query to finish and calling
145- # client._list_rows_from_query_results directly. Need to update
146- # RowIterator to fetch destination table via the job ID if needed.
147- return await async_wait_or_cancel (
148- _job_helpers ._to_query_job (client , query , job_config , response ),
149- api_timeout = api_timeout ,
150- wait_timeout = wait_timeout ,
151- retry = retry ,
152- page_size = page_size ,
153- max_results = max_results ,
154- )
155-
156- return table .RowIterator (
157- client = client ,
158- api_request = functools .partial (client ._call_api , retry , timeout = api_timeout ),
159- path = None ,
160- schema = query_results .schema ,
161- max_results = max_results ,
131+ else :
132+ response = client ._call_api (
133+ retry = None ,
134+ span_name = "BigQuery.query" ,
135+ span_attributes = span_attributes ,
136+ method = "POST" ,
137+ path = path ,
138+ data = request_body ,
139+ timeout = api_timeout ,
140+ )
141+
142+ # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
143+ # to fetch, there will be a job ID for jobs.getQueryResults.
144+ query_results = google .cloud .bigquery .query ._QueryResults .from_api_repr (
145+ await response
146+ )
147+ page_token = query_results .page_token
148+ more_pages = page_token is not None
149+
150+ if more_pages or not query_results .complete :
151+ # TODO(swast): Avoid a call to jobs.get in some cases (few
152+ # remaining pages) by waiting for the query to finish and calling
153+ # client._list_rows_from_query_results directly. Need to update
154+ # RowIterator to fetch destination table via the job ID if needed.
155+ result = await async_wait_or_cancel (
156+ _job_helpers ._to_query_job (client , query , job_config , response ),
157+ api_timeout = api_timeout ,
158+ wait_timeout = wait_timeout ,
159+ retry = retry ,
162160 page_size = page_size ,
163- total_rows = query_results .total_rows ,
164- first_page_response = response ,
165- location = query_results .location ,
166- job_id = query_results .job_id ,
167- query_id = query_results .query_id ,
168- project = query_results .project ,
169- num_dml_affected_rows = query_results .num_dml_affected_rows ,
161+ max_results = max_results ,
170162 )
171163
164+ result = table .RowIterator ( # async of RowIterator? async version without all the pandas stuff
165+ client = client ,
166+ api_request = functools .partial (client ._call_api , retry , timeout = api_timeout ),
167+ path = None ,
168+ schema = query_results .schema ,
169+ max_results = max_results ,
170+ page_size = page_size ,
171+ total_rows = query_results .total_rows ,
172+ first_page_response = response ,
173+ location = query_results .location ,
174+ job_id = query_results .job_id ,
175+ query_id = query_results .query_id ,
176+ project = query_results .project ,
177+ num_dml_affected_rows = query_results .num_dml_affected_rows ,
178+ )
179+
180+
172181 if job_retry is not None :
173- return job_retry (do_query )()
182+ return job_retry (result ) # AsyncRetries, new default objects, default_job_retry_async, default_retry_async
174183 else :
175- return await do_query ()
184+ return result
176185
177186async def async_wait_or_cancel (
178187 job : job .QueryJob ,
179188 api_timeout : Optional [float ],
180189 wait_timeout : Optional [float ],
181- retry : Optional [retries .Retry ],
190+ retry : Optional [retries .AsyncRetry ],
182191 page_size : Optional [int ],
183192 max_results : Optional [int ],
184193) -> table .RowIterator :
185194 try :
186- return await job .result (
195+ return asyncio . to_thread ( job .result ( # run in a background thread
187196 page_size = page_size ,
188197 max_results = max_results ,
189198 retry = retry ,
190199 timeout = wait_timeout ,
191- )
200+ ))
192201 except Exception :
193202 # Attempt to cancel the job since we can't return the results.
194203 try :
0 commit comments