
feat(ibis): PostgreSQL, PostGIS support for IBIS Server. #1188

Open · wants to merge 5 commits into main
5 changes: 5 additions & 0 deletions ibis-server/app/model/metadata/dto.py
@@ -51,6 +51,11 @@ class RustWrenEngineColumnType(Enum):
    TIME = "TIME"
    NULL = "NULL"

    # Extension types
    ## PostGIS
    GEOMETRY = "GEOMETRY"
    GEOGRAPHY = "GEOGRAPHY"


class Column(BaseModel):
    name: str
88 changes: 88 additions & 0 deletions ibis-server/app/model/metadata/postgres.py
@@ -11,6 +11,92 @@
from app.model.metadata.metadata import Metadata


class ExtensionHandler:
    def __init__(self, connection):
        self.connection = connection

        self.handlers = {
            "postgis": self.postgis_handler,
        }

    def augment(self, tables: list[Table]) -> list[Table]:
        # Get the list of extensions from the database
        extensions = self.get_extensions()

        # Iterate through the extensions and call the appropriate handler
        for ext in extensions:
            ext_name = ext["extension_name"]
            schema_name = ext["schema_name"]

            if ext_name in self.handlers:
                handler = self.handlers[ext_name]
                tables = handler(tables, schema_name)

        return tables

    def get_extensions(self) -> list[dict]:
        sql = """
            SELECT
                e.extname AS extension_name,
                n.nspname AS schema_name
            FROM
                pg_extension e
            JOIN
                pg_namespace n ON n.oid = e.extnamespace;
        """
        df = self.connection.sql(sql).to_pandas()
        if df.empty:
            return []
        response = df.to_dict(orient="records")
        return response

    def postgis_handler(self, tables: list[Table], schema_name: str) -> list[Table]:
        # Get the list of geometry and geography columns
        sql = f"""
            SELECT
                f_table_schema,
                f_table_name,
                f_geometry_column AS column_name,
                'geometry' AS column_type
            FROM
                {schema_name}.geometry_columns
            UNION ALL
            SELECT
                f_table_schema,
                f_table_name,
                f_geography_column AS column_name,
                'geography' AS column_type
            FROM
                {schema_name}.geography_columns;
        """
        response = self.connection.sql(sql).to_pandas().to_dict(orient="records")

        # Update tables
        for row in response:
            # TODO: Might want to use a global `_format_postgres_compact_table_name` function.
            table_name = f"{row['f_table_schema']}.{row['f_table_name']}"
            table = tables[table_name]
            for column in table.columns:
                # Only retag the column that actually holds the spatial data.
                if column.name == row["column_name"]:
                    column.type = str(
                        self._transform_postgres_column_type(row["column_type"])
                    )
                    break

        return tables

    def _transform_postgres_column_type(self, data_type):
        # lower case the data_type
        data_type = data_type.lower()

        # Extension types
        switcher = {
            "geometry": RustWrenEngineColumnType.GEOMETRY,
            "geography": RustWrenEngineColumnType.GEOGRAPHY,
        }

        return switcher.get(data_type, RustWrenEngineColumnType.UNKNOWN)


class PostgresMetadata(Metadata):
    def __init__(self, connection_info: PostgresConnectionInfo):
        super().__init__(connection_info)
@@ -80,6 +166,8 @@ def get_table_list(self) -> list[Table]:
                    properties=None,
                )
            )
        extension_handler = ExtensionHandler(self.connection)
        unique_tables = extension_handler.augment(unique_tables)
        return list(unique_tables.values())
Comment on lines +169 to 171

💡 Verification agent

🧩 Analysis chain

Verify extension handler augmentation behavior

The augment method signature returns a list of tables, but it's being used here to modify the unique_tables dictionary. Since Python dictionaries are passed by reference, this likely works as intended, but it would be clearer if the method returned the modified dictionary or the variable assignment was removed.


🏁 Script executed:

#!/bin/bash
# Check if the augment method modifies the tables in place

echo "Examining ExtensionHandler.augment method:"
rg -A 5 "def augment" --type py
echo "------------------------------------------------"
echo "Checking reference to unique_tables before and after augment call:"
rg -A 5 -B 2 "extension_handler.augment" --type py

Length of output: 1566


🏁 Script executed:

#!/bin/bash
# Show full implementation of the augment method including its return behavior
rg -n "def augment" -A 50 ibis-server/app/model/metadata/postgres.py

# Show context around unique_tables before and after its definition and usage
rg -n "unique_tables" -C 5 ibis-server/app/model/metadata/postgres.py

Length of output: 3870


🏁 Script executed:

#!/bin/bash
# Show complete ExtensionHandler class, including handler registrations
rg -n "class ExtensionHandler" -A 200 ibis-server/app/model/metadata/postgres.py

echo "---- handlers mapping ----"
rg -n "self.handlers" -A 20 ibis-server/app/model/metadata/postgres.py

echo "---- postgis_handler implementation ----"
rg -n "def postgis_handler" -A 200 ibis-server/app/model/metadata/postgres.py

Length of output: 19056


Fix type annotations for ExtensionHandler.augment and handlers

The augment and related handler methods actually accept and return a mapping of table names to Table objects (a dict[str, Table]), not a list[Table]. Update the type signatures to reflect that, or else change the implementation to work with lists:

• In ibis-server/app/model/metadata/postgres.py, adjust:

-    class ExtensionHandler:
-        def augment(self, tables: list[Table]) -> list[Table]:
+    class ExtensionHandler:
+        def augment(self, tables: dict[str, Table]) -> dict[str, Table]:
             # …
-    def postgis_handler(self, tables: list[Table], schema_name: str) -> list[Table]:
+    def postgis_handler(self, tables: dict[str, Table], schema_name: str) -> dict[str, Table]:
             # …

• Optionally remove the assignment
Since the handlers mutate the dict in place and then return it, you could also call:

extension_handler.augment(unique_tables)
return list(unique_tables.values())

without re-assigning unique_tables.

These changes will make the code’s behavior and its annotations consistent.
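As a standalone illustration of the in-place behaviour this relies on, the following toy example (not project code) shows why discarding the return value or dropping the re-assignment both work:

# Toy demonstration (not project code): a dict passed into a function is the
# same object inside and outside, so mutations are visible to the caller even
# if the return value is discarded.
def tag_geometry(tables: dict[str, list[str]]) -> dict[str, list[str]]:
    for columns in tables.values():
        columns.append("geometry")
    return tables


unique_tables = {"public.cities_geometry": ["City"]}
returned = tag_geometry(unique_tables)

assert returned is unique_tables  # same object, not a copy
assert unique_tables["public.cities_geometry"] == ["City", "geometry"]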


    def get_constraints(self) -> list[Constraint]:
Binary file not shown.
33 changes: 33 additions & 0 deletions ibis-server/tests/routers/v2/connector/test_postgres.py
@@ -1,6 +1,7 @@
import base64
from urllib.parse import quote_plus, urlparse

import geopandas as gpd
import orjson
import pandas as pd
import psycopg
@@ -149,6 +150,19 @@ def postgres(request) -> PostgresContainer:
    return pg


@pytest.fixture(scope="module")
def postgis(request) -> PostgresContainer:
    pg = PostgresContainer("postgis/postgis:16-3.5-alpine").start()
Contributor

Because postgis only provides amd64 images, I ran into an "image not found" error on my M4 Mac. We can work around it by pulling the image manually:

docker pull --platform linux/amd64 postgis/postgis:16-3.5-alpine

I think it's better to add a comment mentioning this (a possible wording is sketched after the fixture below).

    engine = sqlalchemy.create_engine(pg.get_connection_url())
    with engine.begin() as conn:
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS postgis"))
    gpd.read_parquet(
        file_path("resource/tpch/data/cities_geometry.parquet")
    ).to_postgis("cities_geometry", engine, index=False)
    request.addfinalizer(pg.stop)
    return pg


async def test_query(client, manifest_str, postgres: PostgresContainer):
    connection_info = _to_connection_info(postgres)
    response = await client.post(
@@ -1050,6 +1064,25 @@ async def test_model_substitute_non_existent_column(
    assert 'column "x" does not exist' in response.text


async def test_postgis_geometry(client, manifest_str, postgis: PostgresContainer):
    connection_info = _to_connection_info(postgis)
    response = await client.post(
        url=f"{base_url}/query",
        json={
            "connectionInfo": connection_info,
            "manifestStr": manifest_str,
            "sql": (
                "SELECT ST_Distance(a.geometry, b.geometry) AS distance "
                "FROM cities_geometry a, cities_geometry b "
                "WHERE a.\"City\" = 'London' AND b.\"City\" = 'New York'"
            ),
        },
    )
    assert response.status_code == 200
    result = response.json()
    assert result["data"][0] == ["74.6626535"]
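The cities_geometry.parquet fixture is a binary file and does not appear in the diff. For reference, a hypothetical script along these lines could produce a comparable file with geopandas; the "City" column name follows the test SQL above, while the coordinates, and therefore the exact value the test asserts (ST_Distance on EPSG:4326 geometry columns returns a planar distance in degrees), depend on the actual data shipped with the PR:

# Hypothetical fixture generator; the real parquet file is not shown in the
# diff, so the coordinates below are commonly quoted approximations only.
import geopandas as gpd
from shapely.geometry import Point

cities = gpd.GeoDataFrame(
    {"City": ["London", "New York"]},
    geometry=[Point(-0.1276, 51.5072), Point(-74.0060, 40.7128)],
    crs="EPSG:4326",
)
cities.to_parquet("cities_geometry.parquet")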


def _to_connection_info(pg: PostgresContainer):
    return {
        "host": pg.get_container_host_ip(),