1+ from google .cloud .bigquery .client import *
2+ from google .cloud .bigquery import _job_helpers
3+ from google .cloud .bigquery import table
4+ import asyncio
5+ from google .api_core import gapic_v1 , retry_async
6+
class AsyncClient(Client):
    """Asynchronous client for Google BigQuery.

    A thin wrapper around the synchronous :class:`Client` that exposes
    coroutine variants of query APIs.  Construction is identical to
    ``Client`` (the inherited ``__init__`` is used unchanged).
    """

    async def query_and_wait(
        self,
        query,
        *,
        job_config: Optional[QueryJobConfig] = None,
        location: Optional[str] = None,
        project: Optional[str] = None,
        api_timeout: TimeoutType = DEFAULT_TIMEOUT,
        wait_timeout: TimeoutType = None,
        retry: retries.Retry = DEFAULT_RETRY,
        job_retry: retries.Retry = DEFAULT_JOB_RETRY,
        page_size: Optional[int] = None,
        max_results: Optional[int] = None,
    ) -> RowIterator:
        """Run the query and wait for it to finish, returning its rows.

        Args:
            query: SQL text to execute.
            job_config: Optional job configuration; merged with the client's
                default query job config before sending.
            location: Job location; defaults to the client's location.
            project: Billing project ID; defaults to the client's project.
            api_timeout: Timeout for each individual API request.
            wait_timeout: Total time to wait for the query to finish.
            retry: Retry policy for individual API requests.
            job_retry: Retry policy for the query job as a whole.
            page_size: Maximum number of rows per result page.
            max_results: Maximum total number of rows to return.

        Returns:
            RowIterator over the query results.
        """
        if project is None:
            project = self.project

        if location is None:
            location = self.location

        # Merge the caller's config with the client-level defaults so the
        # request carries a single, fully-resolved configuration.
        job_config = _job_helpers.job_config_with_defaults(
            job_config, self._default_query_job_config
        )

        return await async_query_and_wait(
            self,
            query,
            job_config=job_config,
            location=location,
            project=project,
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            job_retry=job_retry,
            page_size=page_size,
            max_results=max_results,
        )
56+
57+
async def async_query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[float] = None,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> table.RowIterator:
    """Run ``query`` via the jobs.query API and await its results.

    Falls back to a jobs.insert call when ``job_config`` uses features the
    jobs.query API does not support.

    Args:
        client: Client used to issue API requests.
        query: SQL text to execute.
        job_config: Fully-resolved job configuration, or None.
        location: Location in which to run the job.
        project: Project ID billed for the job.
        api_timeout: Timeout for each individual API request.
        wait_timeout: Total time to wait for the query to finish.
        retry: Retry policy for individual API requests.
        job_retry: Retry policy for the query job as a whole.
        page_size: Maximum number of rows per result page.
        max_results: Maximum total number of rows to return.

    Returns:
        Iterator over the query's result rows.
    """
    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fall back to a jobs.insert call.
    if not _job_helpers._supported_by_jobs_query(job_config):
        return await async_wait_or_cancel(
            _job_helpers.query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    path = _job_helpers._to_query_path(project)
    request_body = _job_helpers._to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # jobs.query takes a single maxResults knob; honor the stricter of
    # page_size and max_results when both are provided.
    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results

    if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
        request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

    async def do_query():
        # Fresh request ID per attempt so retried attempts are distinguishable.
        request_body["requestId"] = _job_helpers.make_job_id()
        span_attributes = {"path": path}

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            client._call_api,
            default_retry=retry_async.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=60.0,
            ),
            default_timeout=60.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        response = await rpc(
            # NOTE(review): an explicit ``retry=None`` overrides the wrapped
            # method's default AsyncRetry — confirm that disabling per-call
            # retries here is intended.
            retry=None,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=api_timeout,
        )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return await async_wait_or_cancel(
                _job_helpers._to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
            )

        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
            num_dml_affected_rows=query_results.num_dml_affected_rows,
        )

    # Awaiting here is required: without it the caller receives a bare
    # coroutine instead of the RowIterator this function is annotated to
    # return.
    if job_retry is not None:
        # NOTE(review): job_retry is a synchronous Retry wrapping an async
        # callable, so it cannot observe exceptions raised while awaiting the
        # coroutine; consider retry_async.AsyncRetry here — confirm intent.
        return await job_retry(do_query)()
    else:
        return await do_query()
195+
async def async_wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[float],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
) -> table.RowIterator:
    """Await the job's results; on failure, best-effort cancel and re-raise.

    Args:
        job: Query job whose results are awaited.
        api_timeout: Timeout for the cancel request, if one is needed.
        wait_timeout: Total time to wait for the job to finish.
        retry: Retry policy passed through to result/cancel calls.
        page_size: Maximum number of rows per result page.
        max_results: Maximum total number of rows to return.

    Returns:
        Iterator over the job's result rows.
    """
    try:
        rows = await job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
    except Exception:
        # No results can be returned, so try to stop the job; swallow any
        # cancel failure so the original exception is what propagates.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            pass
        raise
    return rows
219+
220+
# Client info attached to outgoing requests (user-agent metadata).
# ClientInfo's first positional parameter is ``python_version``, so the
# library version must be passed by keyword or it silently mislabels the
# Python runtime version instead.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    client_library_version="3.17.2"
)

# Explicit public API of this module.
__all__ = ("AsyncClient",)
0 commit comments