Skip to content

Commit b2caee8

Browse files
committed
Add a fork of postgres-elasticsearch-fdw to the engine and make it available through sgr mount.
Basic usage: ``` sgr mount elasticsearch -c elasticsearch:9200 -o@- <<EOF { "table_spec": { "table_1": { "schema": { "id": "text", "@timestamp": "timestamp", "query": "text", "col_1": "text", "col_2": "boolean" }, "index": "index-pattern*", "rowid_column": "id", "query_column": "query" } } } EOF ```
1 parent 76d081a commit b2caee8

File tree

6 files changed

+114
-10
lines changed

6 files changed

+114
-10
lines changed

.gitmodules

+3
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44
[submodule "engine/src/cstore_fdw"]
55
path = engine/src/cstore_fdw
66
url = https://github.com/splitgraph/cstore_fdw.git
7+
[submodule "engine/src/postgres-elasticsearch-fdw"]
8+
path = engine/src/postgres-elasticsearch-fdw
9+
url = https://github.com/splitgraph/postgres-elasticsearch-fdw.git

engine/Dockerfile

+8-1
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,19 @@ ENV POSTGRES_USER sgr
175175
COPY ./engine/etc /etc/
176176
COPY ./engine/init_scripts /docker-entrypoint-initdb.d/
177177

178-
ENV PYTHONPATH "${PYTHONPATH}:/splitgraph"
179178

180179
# Copy the actual Splitgraph code over at this point.
181180
COPY ./splitgraph /splitgraph/splitgraph
182181
COPY ./bin /splitgraph/bin
182+
183+
# "Install" elasticsearch_fdw
184+
RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
185+
mkdir /pg_es_fdw && \
186+
pip install elasticsearch>=7.7.0
187+
COPY ./engine/src/postgres-elasticsearch-fdw/pg_es_fdw /pg_es_fdw/
188+
183189
ENV PATH "${PATH}:/splitgraph/bin"
190+
ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"
184191

185192
# https://github.com/postgis/docker-postgis/blob/master/12-3.0/Dockerfile
186193
ARG with_postgis

splitgraph/config/keys.py

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"mongo_fdw": "splitgraph.hooks.mount_handlers.mount_mongo",
6262
"mysql_fdw": "splitgraph.hooks.mount_handlers.mount_mysql",
6363
"socrata": "splitgraph.ingestion.socrata.mount.mount_socrata",
64+
"elasticsearch": "splitgraph.hooks.mount_handlers.mount_elasticsearch",
6465
},
6566
}
6667

splitgraph/core/output.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,17 @@ def parse_repo_tag_or_hash(value, default="latest"):
6666

6767
def conn_string_to_dict(connection: Optional[str]) -> Dict[str, Any]:
    """Parse a connection string of the form ``[username:password@]server:port``.

    The credentials prefix is optional; when it's absent (or when `connection`
    itself is falsy), the corresponding keys come back as None.

    :param connection: Connection string, e.g. "user:pw@host:5432" or "host:9200".
    :return: Dict with keys server, port (int), username, password.
    :raises ValueError: if the string doesn't look like server:port at minimum.
    """
    if not connection:
        return dict(server=None, port=None, username=None, password=None)

    match = re.match(r"((\S+):(\S+)@)?(.+):(\d+)", connection)
    if match is None:
        raise ValueError("Invalid connection string!")
    # group 1 is the whole "user:pw@" prefix; unpack the rest positionally.
    _, username, password, server, port = match.groups()
    # In the future, we could turn all of these options into actual Click options,
    # but then we'd also have to parse the docstring deeper to find out the types the function
    # requires, how to serialize them etc etc. Idea for a click-contrib addon perhaps?
    return dict(
        server=server,
        port=int(port),
        username=username,
        password=password,
    )

splitgraph/hooks/mount_handlers.py

+96-4
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def init_fdw(
4141
engine: "PostgresEngine",
4242
server_id: str,
4343
wrapper: str,
44-
server_options: Optional[Dict[str, Union[str, None]]] = None,
44+
server_options: Optional[Dict[str, Union[str, int, None]]] = None,
4545
user_options: Optional[Dict[str, str]] = None,
4646
overwrite: bool = True,
4747
) -> None:
@@ -67,7 +67,7 @@ def init_fdw(
6767
if server_options:
6868
server_keys, server_vals = zip(*server_options.items())
6969
create_server += _format_options(server_keys)
70-
engine.run_sql(create_server, server_vals)
70+
engine.run_sql(create_server, [str(v) for v in server_vals])
7171
else:
7272
engine.run_sql(create_server)
7373

@@ -77,7 +77,7 @@ def init_fdw(
7777
)
7878
user_keys, user_vals = zip(*user_options.items())
7979
create_mapping += _format_options(user_keys)
80-
engine.run_sql(create_mapping, user_vals)
80+
engine.run_sql(create_mapping, [str(v) for v in user_vals])
8181

8282

8383
def _format_options(option_names):
@@ -178,7 +178,7 @@ def _create_foreign_table(engine, local_schema, table_name, schema_spec, server_
178178
if server_options:
179179
server_keys, server_vals = zip(*server_options.items())
180180
query += _format_options(server_keys)
181-
engine.run_sql(query, server_vals)
181+
engine.run_sql(query, [str(v) for v in server_vals])
182182
else:
183183
engine.run_sql(query)
184184

@@ -296,6 +296,98 @@ def mount_mysql(
296296
)
297297

298298

299+
def mount_elasticsearch(
    mountpoint: str,
    server: str,
    port: int,
    username: str,
    password: str,
    table_spec: Dict[str, Dict[str, Any]],
):
    """
    Mount an ElasticSearch instance.

    Mount a set of tables proxying to a remote ElasticSearch index.

    This uses a fork of postgres-elasticsearch-fdw behind the scenes. You can add a column
    `query` to your table and set it as `query_column` to pass advanced ES queries and aggregations.
    For example:

    ```
    sgr mount elasticsearch -c elasticsearch:9200 -o@- <<EOF
    {
        "table_spec": {
            "table_1": {
                "schema": {
                    "id": "text",
                    "@timestamp": "timestamp",
                    "query": "text",
                    "col_1": "text",
                    "col_2": "boolean"
                },
                "index": "index-pattern*",
                "rowid_column": "id",
                "query_column": "query"
            }
        }
    }
    EOF
    ```
    \b

    :param mountpoint: Schema to mount the remote into.
    :param server: Database hostname.
    :param port: Database port.
    :param username: A read-only user that the database will be accessed as.
    :param password: Password for the read-only user.
    :param table_spec: A dictionary of form
        ```
        {"table_name":
            {"schema": {"col1": "type1"...},
             "index": <es index>,
             "type": <es doc_type, optional in ES7 and later>,
             "query_column": <column to pass ES query in>,
             "score_column": <column to return document score>,
             "scroll_size": <fetch size, default 1000>,
             "scroll_duration": <how long to hold the scroll context open for, default 10m>},
        ...}
        ```
    """
    # Imported locally: psycopg2/engine machinery isn't needed at module import time.
    from splitgraph.engine import get_engine
    from psycopg2.sql import Identifier, SQL

    engine = get_engine()
    logging.info("Mounting ElasticSearch instance...")
    server_id = mountpoint + "_server"

    init_fdw(
        engine,
        server_id,
        "multicorn",
        {
            # Multicorn dispatches to the Python FDW class named here.
            "wrapper": "pg_es_fdw.ElasticsearchFDW",
            "host": server,
            "port": port,
            "username": username,
            "password": password,
        },
        None,
    )

    engine.run_sql(SQL("CREATE SCHEMA IF NOT EXISTS {}").format(Identifier(mountpoint)))

    for table_name, table_options in table_spec.items():
        logging.info("Mounting table %s", table_name)
        # Work on a copy so popping "schema" doesn't mutate the caller's table_spec.
        foreign_options = dict(table_options)
        schema = foreign_options.pop("schema")
        # All remaining per-table options (index, query_column, ...) are passed
        # through to the foreign table as FDW options.
        _create_foreign_table(
            engine,
            local_schema=mountpoint,
            table_name=table_name,
            schema_spec=schema,
            server_id=server_id,
            server_options=foreign_options,
        )
389+
390+
299391
def mount(
300392
mountpoint: str,
301393
mount_handler: str,

0 commit comments

Comments
 (0)