88)
99from google .api_core import retry_async as retries
1010import asyncio
11- import google .auth .transport ._aiohttp_requests
1211
1312
14- class AsyncClient ():
13+ class AsyncClient :
1514 def __init__ (self , * args , ** kwargs ):
1615 self ._client = Client (* args , ** kwargs )
1716
18-
1917 async def query_and_wait (
2018 self ,
2119 query ,
@@ -29,14 +27,14 @@ async def query_and_wait(
2927 job_retry : retries .AsyncRetry = DEFAULT_ASYNC_JOB_RETRY ,
3028 page_size : Optional [int ] = None ,
3129 max_results : Optional [int ] = None ,
32- ) -> RowIterator :
33-
30+ ) -> RowIterator :
3431 if project is None :
3532 project = self ._client .project
3633
3734 if location is None :
3835 location = self ._client .location
3936
37+ # for some reason these cannot find the function call
4038 # if job_config is not None:
4139 # self._client._verify_job_config_type(job_config, QueryJobConfig)
4240
@@ -62,7 +60,7 @@ async def query_and_wait(
6260 )
6361
6462
65- async def async_query_and_wait (
63+ async def async_query_and_wait (
6664 client : "Client" ,
6765 query : str ,
6866 * ,
@@ -76,23 +74,24 @@ async def async_query_and_wait(
7674 page_size : Optional [int ] = None ,
7775 max_results : Optional [int ] = None ,
7876) -> table .RowIterator :
79-
8077 # Some API parameters aren't supported by the jobs.query API. In these
8178 # cases, fallback to a jobs.insert call.
8279 if not _job_helpers ._supported_by_jobs_query (job_config ):
8380 return await async_wait_or_cancel (
84- asyncio .to_thread (_job_helpers .query_jobs_insert ( # throw in a background thread
85- client = client ,
86- query = query ,
87- job_id = None ,
88- job_id_prefix = None ,
89- job_config = job_config ,
90- location = location ,
91- project = project ,
92- retry = retry ,
93- timeout = api_timeout ,
94- job_retry = job_retry ,
95- )),
81+ asyncio .to_thread (
82+ _job_helpers .query_jobs_insert (
83+ client = client ,
84+ query = query ,
85+ job_id = None ,
86+ job_id_prefix = None ,
87+ job_config = job_config ,
88+ location = location ,
89+ project = project ,
90+ retry = retry ,
91+ timeout = api_timeout ,
92+ job_retry = job_retry ,
93+ )
94+ ),
9695 api_timeout = api_timeout ,
9796 wait_timeout = wait_timeout ,
9897 retry = retry ,
@@ -113,21 +112,20 @@ async def async_query_and_wait(
113112 if os .getenv ("QUERY_PREVIEW_ENABLED" , "" ).casefold () == "true" :
114113 request_body ["jobCreationMode" ] = "JOB_CREATION_OPTIONAL"
115114
116-
117115 request_body ["requestId" ] = _job_helpers .make_job_id ()
118116 span_attributes = {"path" : path }
119117
120- # For easier testing, handle the retries ourselves.
121118 if retry is not None :
122- response = retry ( client ._call_api )( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth)
123- retry = None , # We're calling the retry decorator ourselves, async_retries
119+ response = client ._call_api ( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth), add back retry( )
120+ retry = None , # We're calling the retry decorator ourselves, async_retries, need to implement after making HTTP calls async
124121 span_name = "BigQuery.query" ,
125122 span_attributes = span_attributes ,
126123 method = "POST" ,
127124 path = path ,
128125 data = request_body ,
129126 timeout = api_timeout ,
130127 )
128+
131129 else :
132130 response = client ._call_api (
133131 retry = None ,
@@ -141,9 +139,7 @@ async def async_query_and_wait(
141139
142140 # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
143141 # to fetch, there will be a job ID for jobs.getQueryResults.
144- query_results = google .cloud .bigquery .query ._QueryResults .from_api_repr (
145- await response
146- )
142+ query_results = google .cloud .bigquery .query ._QueryResults .from_api_repr (response )
147143 page_token = query_results .page_token
148144 more_pages = page_token is not None
149145
@@ -161,7 +157,7 @@ async def async_query_and_wait(
161157 max_results = max_results ,
162158 )
163159
164- result = table .RowIterator ( # async of RowIterator? async version without all the pandas stuff
160+ result = table .RowIterator ( # async of RowIterator? async version without all the pandas stuff
165161 client = client ,
166162 api_request = functools .partial (client ._call_api , retry , timeout = api_timeout ),
167163 path = None ,
@@ -177,12 +173,12 @@ async def async_query_and_wait(
177173 num_dml_affected_rows = query_results .num_dml_affected_rows ,
178174 )
179175
180-
181176 if job_retry is not None :
182- return job_retry (result ) # AsyncRetries, new default objects, default_job_retry_async, default_retry_async
177+ return job_retry (result )
183178 else :
184179 return result
185180
181+
186182async def async_wait_or_cancel (
187183 job : job .QueryJob ,
188184 api_timeout : Optional [float ],
@@ -192,17 +188,19 @@ async def async_wait_or_cancel(
192188 max_results : Optional [int ],
193189) -> table .RowIterator :
194190 try :
195- return asyncio .to_thread (job .result ( # run in a background thread
196- page_size = page_size ,
197- max_results = max_results ,
198- retry = retry ,
199- timeout = wait_timeout ,
200- ))
191+ return asyncio .to_thread (
192+ job .result ( # run in a background thread
193+ page_size = page_size ,
194+ max_results = max_results ,
195+ retry = retry ,
196+ timeout = wait_timeout ,
197+ )
198+ )
201199 except Exception :
202200 # Attempt to cancel the job since we can't return the results.
203201 try :
204202 job .cancel (retry = retry , timeout = api_timeout )
205203 except Exception :
206204 # Don't eat the original exception if cancel fails.
207205 pass
208- raise
206+ raise
0 commit comments