2727
2828from google import auth
2929import google .api_core .exceptions
30- from google .cloud .bigquery import dbapi
30+ from google .cloud .bigquery import dbapi , ConnectionProperty
3131from google .cloud .bigquery .table import (
3232 RangePartitioning ,
3333 TableReference ,
6161from .parse_url import parse_url
6262from . import _helpers , _struct , _types
6363import sqlalchemy_bigquery_vendored .sqlalchemy .postgresql .base as vendored_postgresql
64+ from google .cloud .bigquery import QueryJobConfig
6465
6566# Illegal characters is intended to be all characters that are not explicitly
6667# allowed as part of the flexible column names.
@@ -1080,6 +1081,7 @@ def __init__(
10801081 self ,
10811082 arraysize = 5000 ,
10821083 credentials_path = None ,
1084+ billing_project_id = None ,
10831085 location = None ,
10841086 credentials_info = None ,
10851087 credentials_base64 = None ,
@@ -1092,6 +1094,8 @@ def __init__(
10921094 self .credentials_path = credentials_path
10931095 self .credentials_info = credentials_info
10941096 self .credentials_base64 = credentials_base64
1097+ self .project_id = None
1098+ self .billing_project_id = billing_project_id
10951099 self .location = location
10961100 self .identifier_preparer = self .preparer (self )
10971101 self .dataset_id = None
@@ -1114,15 +1118,20 @@ def _build_formatted_table_id(table):
11141118 """Build '<dataset_id>.<table_id>' string using given table."""
11151119 return "{}.{}" .format (table .reference .dataset_id , table .table_id )
11161120
1117- @staticmethod
1118- def _add_default_dataset_to_job_config (job_config , project_id , dataset_id ):
1119- # If dataset_id is set, then we know the job_config isn't None
1120- if dataset_id :
1121- # If project_id is missing, use default project_id for the current environment
1121+ def create_job_config (self , provided_config : QueryJobConfig ):
1122+ project_id = self .project_id
1123+ if self .dataset_id is None and project_id == self .billing_project_id :
1124+ return provided_config
1125+ job_config = provided_config or QueryJobConfig ()
1126+ if project_id != self .billing_project_id :
1127+ job_config .connection_properties = [
1128+ ConnectionProperty (key = "dataset_project_id" , value = project_id )
1129+ ]
1130+ if self .dataset_id :
11221131 if not project_id :
11231132 _ , project_id = auth .default ()
1124-
1125- job_config . default_dataset = "{}.{}" . format ( project_id , dataset_id )
1133+ job_config . default_dataset = "{}.{}" . format ( project_id , self . dataset_id )
1134+ return job_config
11261135
11271136 def do_execute (self , cursor , statement , parameters , context = None ):
11281137 kwargs = {}
@@ -1132,13 +1141,13 @@ def do_execute(self, cursor, statement, parameters, context=None):
11321141
11331142 def create_connect_args (self , url ):
11341143 (
1135- project_id ,
1144+ self . project_id ,
11361145 location ,
11371146 dataset_id ,
11381147 arraysize ,
11391148 credentials_path ,
11401149 credentials_base64 ,
1141- default_query_job_config ,
1150+ provided_job_config ,
11421151 list_tables_page_size ,
11431152 user_supplied_client ,
11441153 ) = parse_url (url )
@@ -1149,9 +1158,9 @@ def create_connect_args(self, url):
11491158 self .credentials_path = credentials_path or self .credentials_path
11501159 self .credentials_base64 = credentials_base64 or self .credentials_base64
11511160 self .dataset_id = dataset_id
1152- self ._add_default_dataset_to_job_config (
1153- default_query_job_config , project_id , dataset_id
1154- )
1161+ self .billing_project_id = self . billing_project_id or self . project_id
1162+
1163+ default_query_job_config = self . create_job_config ( provided_job_config )
11551164
11561165 if user_supplied_client :
11571166 # The user is expected to supply a client with
@@ -1162,10 +1171,14 @@ def create_connect_args(self, url):
11621171 credentials_path = self .credentials_path ,
11631172 credentials_info = self .credentials_info ,
11641173 credentials_base64 = self .credentials_base64 ,
1165- project_id = project_id ,
1174+ project_id = self . billing_project_id ,
11661175 location = self .location ,
11671176 default_query_job_config = default_query_job_config ,
11681177 )
1178+ # If the user specified `bigquery://` we need to set the project_id
1179+ # from the client
1180+ self .project_id = self .project_id or client .project
1181+ self .billing_project_id = self .billing_project_id or client .project
11691182 return ([], {"client" : client })
11701183
11711184 def _get_table_or_view_names (self , connection , item_types , schema = None ):
@@ -1177,7 +1190,7 @@ def _get_table_or_view_names(self, connection, item_types, schema=None):
11771190 )
11781191
11791192 client = connection .connection ._client
1180- datasets = client .list_datasets ()
1193+ datasets = client .list_datasets (self . project_id )
11811194
11821195 result = []
11831196 for dataset in datasets :
@@ -1278,7 +1291,8 @@ def _get_table(self, connection, table_name, schema=None):
12781291
12791292 client = connection .connection ._client
12801293
1281- table_ref = self ._table_reference (schema , table_name , client .project )
1294+ # table_ref = self._table_reference(schema, table_name, client.project)
1295+ table_ref = self ._table_reference (schema , table_name , self .project_id )
12821296 try :
12831297 table = client .get_table (table_ref )
12841298 except NotFound :
@@ -1332,7 +1346,7 @@ def get_schema_names(self, connection, **kw):
13321346 if isinstance (connection , Engine ):
13331347 connection = connection .connect ()
13341348
1335- datasets = connection .connection ._client .list_datasets ()
1349+ datasets = connection .connection ._client .list_datasets (self . project_id )
13361350 return [d .dataset_id for d in datasets ]
13371351
13381352 def get_table_names (self , connection , schema = None , ** kw ):
0 commit comments