88)
99from  google .api_core  import  retry_async  as  retries 
1010import  asyncio 
11- import  google .auth .transport ._aiohttp_requests 
1211
1312
14- class  AsyncClient ():   
13+ class  AsyncClient : 
1514    def  __init__ (self , * args , ** kwargs ):
1615        self ._client  =  Client (* args , ** kwargs )
1716
18- 
1917    async  def  query_and_wait (
2018        self ,
2119        query ,
@@ -29,14 +27,14 @@ async def query_and_wait(
2927        job_retry : retries .AsyncRetry  =  DEFAULT_ASYNC_JOB_RETRY ,
3028        page_size : Optional [int ] =  None ,
3129        max_results : Optional [int ] =  None ,
32-         ) ->  RowIterator :
33- 
30+     ) ->  RowIterator :
3431        if  project  is  None :
3532            project  =  self ._client .project 
3633
3734        if  location  is  None :
3835            location  =  self ._client .location 
3936
37+         # for some reason these cannot find the function call 
4038        # if job_config is not None: 
4139        #     self._client._verify_job_config_type(job_config, QueryJobConfig) 
4240
@@ -62,7 +60,7 @@ async def query_and_wait(
6260        )
6361
6462
65- async  def  async_query_and_wait (     
63+ async  def  async_query_and_wait (
6664    client : "Client" ,
6765    query : str ,
6866    * ,
@@ -76,23 +74,24 @@ async def async_query_and_wait(
7674    page_size : Optional [int ] =  None ,
7775    max_results : Optional [int ] =  None ,
7876) ->  table .RowIterator :
79-             
8077    # Some API parameters aren't supported by the jobs.query API. In these 
8178    # cases, fallback to a jobs.insert call. 
8279    if  not  _job_helpers ._supported_by_jobs_query (job_config ):
8380        return  await  async_wait_or_cancel (
84-             asyncio .to_thread (_job_helpers .query_jobs_insert ( # throw in a background thread 
85-                 client = client ,
86-                 query = query ,
87-                 job_id = None ,
88-                 job_id_prefix = None ,
89-                 job_config = job_config ,
90-                 location = location ,
91-                 project = project ,
92-                 retry = retry ,
93-                 timeout = api_timeout ,
94-                 job_retry = job_retry ,
95-             )),
81+             asyncio .to_thread (
82+                 _job_helpers .query_jobs_insert (
83+                     client = client ,
84+                     query = query ,
85+                     job_id = None ,
86+                     job_id_prefix = None ,
87+                     job_config = job_config ,
88+                     location = location ,
89+                     project = project ,
90+                     retry = retry ,
91+                     timeout = api_timeout ,
92+                     job_retry = job_retry ,
93+                 )
94+             ),
9695            api_timeout = api_timeout ,
9796            wait_timeout = wait_timeout ,
9897            retry = retry ,
@@ -113,21 +112,20 @@ async def async_query_and_wait(
113112    if  os .getenv ("QUERY_PREVIEW_ENABLED" , "" ).casefold () ==  "true" :
114113        request_body ["jobCreationMode" ] =  "JOB_CREATION_OPTIONAL" 
115114
116- 
117115    request_body ["requestId" ] =  _job_helpers .make_job_id ()
118116    span_attributes  =  {"path" : path }
119117
120-     # For easier testing, handle the retries ourselves. 
121118    if  retry  is  not   None :
122-         response  =  retry ( client ._call_api )(  # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth) 
123-             retry = None ,  # We're calling the retry decorator ourselves, async_retries 
119+         response  =  client ._call_api (   # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth), add back retry( ) 
120+             retry = None ,  # We're calling the retry decorator ourselves, async_retries, need to implement after making HTTP calls async  
124121            span_name = "BigQuery.query" ,
125122            span_attributes = span_attributes ,
126123            method = "POST" ,
127124            path = path ,
128125            data = request_body ,
129126            timeout = api_timeout ,
130127        )
128+ 
131129    else :
132130        response  =  client ._call_api (
133131            retry = None ,
@@ -141,9 +139,7 @@ async def async_query_and_wait(
141139
142140    # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages 
143141    # to fetch, there will be a job ID for jobs.getQueryResults. 
144-     query_results  =  google .cloud .bigquery .query ._QueryResults .from_api_repr (
145-         await  response 
146-     )
142+     query_results  =  google .cloud .bigquery .query ._QueryResults .from_api_repr (response )
147143    page_token  =  query_results .page_token 
148144    more_pages  =  page_token  is  not   None 
149145
@@ -161,7 +157,7 @@ async def async_query_and_wait(
161157            max_results = max_results ,
162158        )
163159
164-     result  =  table .RowIterator ( # async of RowIterator? async version without all the pandas stuff 
160+     result  =  table .RowIterator (   # async of RowIterator? async version without all the pandas stuff 
165161        client = client ,
166162        api_request = functools .partial (client ._call_api , retry , timeout = api_timeout ),
167163        path = None ,
@@ -177,12 +173,12 @@ async def async_query_and_wait(
177173        num_dml_affected_rows = query_results .num_dml_affected_rows ,
178174    )
179175
180- 
181176    if  job_retry  is  not   None :
182-         return  job_retry (result )  # AsyncRetries, new default objects, default_job_retry_async, default_retry_async 
177+         return  job_retry (result )
183178    else :
184179        return  result 
185180
181+ 
186182async  def  async_wait_or_cancel (
187183    job : job .QueryJob ,
188184    api_timeout : Optional [float ],
@@ -192,17 +188,19 @@ async def async_wait_or_cancel(
192188    max_results : Optional [int ],
193189) ->  table .RowIterator :
194190    try :
195-         return  asyncio .to_thread (job .result ( # run in a background thread 
196-             page_size = page_size ,
197-             max_results = max_results ,
198-             retry = retry ,
199-             timeout = wait_timeout ,
200-         ))
191+         return  asyncio .to_thread (
192+             job .result (  # run in a background thread 
193+                 page_size = page_size ,
194+                 max_results = max_results ,
195+                 retry = retry ,
196+                 timeout = wait_timeout ,
197+             )
198+         )
201199    except  Exception :
202200        # Attempt to cancel the job since we can't return the results. 
203201        try :
204202            job .cancel (retry = retry , timeout = api_timeout )
205203        except  Exception :
206204            # Don't eat the original exception if cancel fails. 
207205            pass 
208-         raise 
206+         raise 
0 commit comments