Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(postgres): improve to_pyarrow_batches by using server-side cursors #10954

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions ibis/backends/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import importlib.metadata
import itertools
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any

import _pytest
Expand Down Expand Up @@ -148,20 +147,6 @@
ALL_BACKENDS = set(_get_backend_names())


@pytest.fixture(scope="session")
def data_dir() -> Path:
"""Return the test data directory.

Returns
-------
Path
Test data directory
"""
root = Path(__file__).absolute().parents[2]

return root / "ci" / "ibis-testing-data"


def _get_backend_conf(backend_str: str):
"""Convert a backend string to the test class for the backend."""
conftest = importlib.import_module(f"ibis.backends.{backend_str}.tests.conftest")
Expand Down
52 changes: 51 additions & 1 deletion ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from ibis.backends.sql.compilers.base import TRUE, C, ColGen

if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Callable, Mapping
from urllib.parse import ParseResult

import pandas as pd
Expand Down Expand Up @@ -740,3 +740,53 @@
else:
con.commit()
return cursor

@util.experimental
def to_pyarrow_batches(
self,
expr: ir.Expr,
/,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**_: Any,
) -> pa.ipc.RecordBatchReader:
import pandas as pd
import pyarrow as pa

self._run_pre_execute_hooks(expr)

schema = expr.as_table().schema()

query = self.compile(expr, limit=limit, params=params)
with contextlib.suppress(AttributeError):
query = query.sql(dialect=self.dialect)

con = self.con
# server-side cursors need to be uniquely named
cursor = con.cursor(name=util.gen_name("postgres_cursor"))

try:
cursor.execute(query)
except Exception:
con.rollback()
cursor.close()
raise

Check warning on line 775 in ibis/backends/postgres/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/postgres/__init__.py#L772-L775

Added lines #L772 - L775 were not covered by tests

def _batches(schema: pa.Schema):
columns = schema.names
try:
while batch := cursor.fetchmany(chunk_size):
yield pa.RecordBatch.from_pandas(
pd.DataFrame(batch, columns=columns), schema=schema
)
except Exception:
con.rollback()
cursor.close()
raise

Check warning on line 787 in ibis/backends/postgres/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/postgres/__init__.py#L784-L787

Added lines #L784 - L787 were not covered by tests
else:
con.commit()

pa_schema = schema.to_pyarrow()
return pa.RecordBatchReader.from_batches(pa_schema, _batches(pa_schema))
15 changes: 15 additions & 0 deletions ibis/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import builtins
import os
import platform
from pathlib import Path

import pytest

Expand Down Expand Up @@ -44,3 +45,17 @@ def add_ibis(monkeypatch, doctest_namespace):
condition=WINDOWS,
reason="windows prevents two connections to the same file even in the same process",
)


@pytest.fixture(scope="session")
def data_dir() -> Path:
"""Return the test data directory.

Returns
-------
Path
Test data directory
"""
root = Path(__file__).absolute().parents[1]

return root / "ci" / "ibis-testing-data"
31 changes: 31 additions & 0 deletions ibis/tests/benchmarks/test_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,3 +1019,34 @@
itertools.cycle(("int", "string", "array<int>", "float")),
),
)


@pytest.fixture(scope="session")
def pgtable(data_dir):
pd = pytest.importorskip("pandas")
pytest.importorskip("psycopg")

Check warning on line 1027 in ibis/tests/benchmarks/test_benchmarks.py

View check run for this annotation

Codecov / codecov/patch

ibis/tests/benchmarks/test_benchmarks.py#L1027

Added line #L1027 was not covered by tests

from ibis.backends.postgres.tests.conftest import (

Check warning on line 1029 in ibis/tests/benchmarks/test_benchmarks.py

View check run for this annotation

Codecov / codecov/patch

ibis/tests/benchmarks/test_benchmarks.py#L1029

Added line #L1029 was not covered by tests
IBIS_TEST_POSTGRES_DB,
PG_HOST,
PG_PASS,
PG_PORT,
PG_USER,
)

con = ibis.postgres.connect(

Check warning on line 1037 in ibis/tests/benchmarks/test_benchmarks.py

View check run for this annotation

Codecov / codecov/patch

ibis/tests/benchmarks/test_benchmarks.py#L1037

Added line #L1037 was not covered by tests
user=PG_USER,
password=PG_PASS,
host=PG_HOST,
port=PG_PORT,
database=IBIS_TEST_POSTGRES_DB,
)
name = ibis.util.gen_name("functional_alltypes_bench")
yield con.create_table(

Check warning on line 1045 in ibis/tests/benchmarks/test_benchmarks.py

View check run for this annotation

Codecov / codecov/patch

ibis/tests/benchmarks/test_benchmarks.py#L1044-L1045

Added lines #L1044 - L1045 were not covered by tests
name, obj=pd.read_csv(data_dir / "csv" / "functional_alltypes.csv"), temp=True
)
con.disconnect()

Check warning on line 1048 in ibis/tests/benchmarks/test_benchmarks.py

View check run for this annotation

Codecov / codecov/patch

ibis/tests/benchmarks/test_benchmarks.py#L1048

Added line #L1048 was not covered by tests


def test_postgres_record_batches(pgtable, benchmark):
benchmark(pgtable.to_pyarrow)

Check warning on line 1052 in ibis/tests/benchmarks/test_benchmarks.py

View check run for this annotation

Codecov / codecov/patch

ibis/tests/benchmarks/test_benchmarks.py#L1052

Added line #L1052 was not covered by tests