
Commit b4a7e77

Merge pull request #638 from splitgraph/add-big-query-data-source-cu-26udw0h

Big Query data source

2 parents: c372912 + 0a6f297

File tree: 8 files changed, +250 −0 lines

engine/Dockerfile (+4)

```diff
@@ -195,6 +195,10 @@ RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
     pip install "PyAthena>=2.4.1" && \
     pip install "pandas>=1.0.0"
 
+# Install Google's Big Query SQLAlchemy dialect lib
+RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
+    pip install "sqlalchemy-bigquery"
+
 ENV PATH "${PATH}:/splitgraph/bin"
 ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"
```

engine/Dockerfile.debug (+4)

```diff
@@ -118,6 +118,10 @@ RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
     pip install "PyAthena>=2.4.1" && \
     pip install "pandas>=1.0.0"
 
+# Install Google's Big Query SQLAlchemy dialect lib
+RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
+    pip install "sqlalchemy-bigquery"
+
 ENV PATH "${PATH}:/splitgraph/bin"
 ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"
```
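With the dialect installed, SQLAlchemy can resolve `bigquery://` URLs inside the engine image. A minimal smoke test with a placeholder project/dataset (`create_engine` is lazy, so no credentials or network access are needed):

```python
import sqlalchemy

# If sqlalchemy-bigquery is installed, SQLAlchemy can build an engine
# for a bigquery:// URL without connecting to anything yet.
engine = sqlalchemy.create_engine("bigquery://my-project/my_dataset")
print(engine.dialect.name)  # -> "bigquery"
```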

splitgraph/config/keys.py (+1)

```diff
@@ -67,6 +67,7 @@
         "snowflake": "splitgraph.ingestion.snowflake.SnowflakeDataSource",
         "dbt": "splitgraph.ingestion.dbt.data_source.DBTDataSource",
         "athena": "splitgraph.ingestion.athena.AmazonAthenaDataSource",
+        "bigquery": "splitgraph.ingestion.bigquery.BigQueryDataSource",
     },
 }
```
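The registry maps a mount handler name to a dotted class path that is imported on demand. A hedged sketch of the dynamic import such an entry implies (`resolve_data_source` is an illustrative helper, not Splitgraph's actual lookup API):

```python
import importlib

def resolve_data_source(dotted_path: str):
    # Split "pkg.module.ClassName" into module and attribute, then import.
    module_name, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)

bigquery_cls = resolve_data_source("splitgraph.ingestion.bigquery.BigQueryDataSource")
```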

splitgraph/ingestion/bigquery/BUILD (new file, +6)

```python
python_sources(
    skip_black=True,
    dependencies=[
        "src/py/splitgraph/splitgraph/resources/icons",
    ],
)
```
splitgraph/ingestion/bigquery/__init__.py (new file, +140; path implied by the `splitgraph.ingestion.bigquery.BigQueryDataSource` registry entry above)

```python
import base64
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Dict, Optional

from splitgraph.core.types import Credentials, Params, TableInfo
from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource
from splitgraph.ingestion.common import build_commandline_help

if TYPE_CHECKING:
    from splitgraph.engine.postgres.engine import PostgresEngine


class BigQueryDataSource(ForeignDataWrapperDataSource):
    credentials_schema: Dict[str, Any] = {
        "type": "object",
        "properties": {
            "credentials": {
                "type": "string",
                "title": "GCP credentials",
                "description": "GCP credentials in JSON format",
            },
        },
    }

    params_schema = {
        "type": "object",
        "properties": {
            "project": {
                "type": "string",
                "title": "GCP project name",
                "description": "Name of the GCP project to use",
            },
            "dataset_name": {
                "type": "string",
                "title": "Big Query dataset",
                "description": "Name of the dataset in Big Query",
            },
        },
        "required": ["project", "dataset_name"],
    }
```
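Both schemas are plain JSON Schema, so they can be exercised standalone. A quick sketch with the `jsonschema` package (illustrative; not necessarily how Splitgraph validates them internally):

```python
import jsonschema

# Raises jsonschema.ValidationError if "project" or "dataset_name" is missing.
jsonschema.validate(
    instance={"project": "my-project", "dataset_name": "my_dataset"},
    schema=BigQueryDataSource.params_schema,
)
```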
````python
    supports_mount = True
    supports_load = True
    supports_sync = False

    commandline_help = """Mount a GCP Big Query project/dataset.

This will mount a Big Query dataset:

\b
```
$ sgr mount bigquery bq -o@- <<EOF
{
    "credentials": "/path/to/my/creds.json",
    "project": "my-project-name",
    "dataset_name": "my_dataset"
}
EOF
```
"""

    commandline_kwargs_help: str = (
        build_commandline_help(credentials_schema) + "\n" + build_commandline_help(params_schema)
    )

    _icon_file = "bigquery.svg"

    def __init__(
        self,
        engine: "PostgresEngine",
        credentials: Credentials,
        params: Params,
        tables: Optional[TableInfo] = None,
    ):
        super().__init__(engine, credentials, params, tables)

    def get_fdw_name(self):
        return "multicorn"

    @classmethod
    def get_name(cls) -> str:
        return "Google BigQuery"

    @classmethod
    def get_description(cls) -> str:
        return "Query data in GCP BigQuery datasets"

    @classmethod
    def from_commandline(cls, engine, commandline_kwargs) -> "BigQueryDataSource":
        params = deepcopy(commandline_kwargs)
        credentials = Credentials({})

        if "credentials" in params:
            with open(params["credentials"], "r") as credentials_file:
                credentials_str = credentials_file.read()

            params.pop("credentials")
            credentials["credentials"] = credentials_str

        return cls(engine, credentials, params)

    def get_table_options(
        self, table_name: str, tables: Optional[TableInfo] = None
    ) -> Dict[str, str]:
        result = super().get_table_options(table_name, tables)
        result["tablename"] = result.get("tablename", table_name)
        return result

    def get_server_options(self):
        options: Dict[str, Optional[str]] = {
            "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
            "db_url": self._build_db_url(),
        }

        # In SQLAlchemy, if "schema" isn't passed to the FDW options
        # (even when it's present in the DB URL), tables aren't
        # schema-qualified and server-side cursors don't work for scanning
        # (the whole table is loaded instead of being scrolled through).
        if "schema" in self.params:
            options["schema"] = self.params["schema"]

        return options

    def _build_db_url(self) -> str:
        """Construct the SQLAlchemy GCP Big Query db_url"""

        db_url = f"bigquery://{self.params['project']}/{self.params['dataset_name']}"

        if "credentials" in self.credentials:
            # base64-encode the credentials so they can be passed in the URL
            credentials_str = self.credentials["credentials"]
            credentials_base64 = base64.urlsafe_b64encode(credentials_str.encode()).decode()
            db_url += f"?credentials_base64={credentials_base64}"

        return db_url
````
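For a service-account key, the resulting URL has the form `bigquery://<project>/<dataset>?credentials_base64=<encoded key>`. A standalone sketch of the same encoding, with placeholder values:

```python
import base64

# Placeholder service-account JSON; a real key has more fields.
creds_json = '{"type": "service_account", "project_id": "my-project"}'
encoded = base64.urlsafe_b64encode(creds_json.encode()).decode()
db_url = f"bigquery://my-project/my_dataset?credentials_base64={encoded}"
```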
Finally, the dataset name doubles as the remote schema for `IMPORT FOREIGN SCHEMA`:

```python
    def get_remote_schema_name(self) -> str:
        if "dataset_name" not in self.params:
            raise ValueError("Cannot IMPORT FOREIGN SCHEMA without a dataset_name!")
        return str(self.params["dataset_name"])
```
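End to end, the data source is driven through Splitgraph's `mount` hook (as the tests below do). A hedged usage sketch, assuming a running engine and the argument handling shown in `commandline_help` above, with placeholder paths and names:

```python
from splitgraph.hooks.mount_handlers import mount

# Mount the dataset's tables as foreign tables under the "bq" schema;
# "credentials" is a path to a service-account JSON key file.
mount(
    "bq",
    "bigquery",
    {
        "credentials": "/path/to/my/creds.json",
        "project": "my-project-name",
        "dataset_name": "my_dataset",
    },
)
```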
A further file (+6) did not load in the diff view; its name and contents are not shown.

test/resources/ingestion/bigquery/dummy_credentials.json (new file, +12; path as referenced by the tests below)

```json
{
    "type": "service_account",
    "project_id": "project_id",
    "private_key_id": "private_key_id",
    "private_key": "private_key",
    "client_email": "client_email",
    "client_id": "client_id",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "client_x509_cert_url"
}
```
The new test file (+77; file name not shown in the diff view):

```python
import base64
from unittest.mock import Mock

import pytest
from psycopg2 import DatabaseError

from splitgraph.core.types import Credentials, Params
from splitgraph.hooks.mount_handlers import mount
from splitgraph.ingestion.bigquery import BigQueryDataSource


def test_bigquery_data_source_options_creds_file(local_engine_empty):
    source = BigQueryDataSource.from_commandline(
        local_engine_empty,
        {
            "credentials": "test/resources/ingestion/bigquery/dummy_credentials.json",
            "project": "bigquery-public-data",
            "dataset_name": "hacker_news",
        },
    )

    with open("test/resources/ingestion/bigquery/dummy_credentials.json", "r") as credentials_file:
        credentials_str = credentials_file.read()
    credentials_base64 = base64.urlsafe_b64encode(credentials_str.encode()).decode()

    assert source.get_server_options() == {
        "db_url": f"bigquery://bigquery-public-data/hacker_news?credentials_base64={credentials_base64}",
        "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
    }


def test_bigquery_data_source_options_creds_raw():
    source = BigQueryDataSource(
        Mock(),
        credentials=Credentials({"credentials": "test-raw-creds"}),
        params=Params(
            {
                "project": "bigquery-public-data",
                "dataset_name": "hacker_news",
            }
        ),
    )

    credentials_base64 = base64.urlsafe_b64encode("test-raw-creds".encode()).decode()

    assert source.get_server_options() == {
        "db_url": f"bigquery://bigquery-public-data/hacker_news?credentials_base64={credentials_base64}",
        "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
    }


def test_bigquery_data_source_options_no_creds_file():
    source = BigQueryDataSource(
        Mock(),
        credentials=Credentials({}),
        params=Params(
            {
                "project": "bigquery-public-data",
                "dataset_name": "hacker_news",
            }
        ),
    )

    assert source.get_server_options() == {
        "db_url": "bigquery://bigquery-public-data/hacker_news",
        "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
    }


@pytest.mark.mounting
def test_bigquery_mount_expected_error():
    with pytest.raises(DatabaseError, match="Could not automatically determine credentials"):
        mount(
            "bq",
            "bigquery",
            {"project": "bigquery-public-data", "dataset_name": "hacker_news"},
        )
```
