Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compat: Spatialpandas with dask-expr #1405

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 9 additions & 78 deletions datashader/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import datashader.utils as du

import pytest
from datashader.tests.utils import dask_switcher
from datashader.tests.test_pandas import _pandas

try:
Expand All @@ -34,39 +33,26 @@
pytestmark = pytest.importorskip("dask")



@dask_switcher(query=False)
def _dask():
return dd.from_pandas(_pandas(), npartitions=2)

@dask_switcher(query=True)
def _dask_expr():
return dd.from_pandas(_pandas(), npartitions=2)

@dask_switcher(query=False, extras=["dask_cudf"])
def _dask_cudf():
import dask_cudf

_dask = dd.from_pandas(_pandas(), npartitions=2)
if Version(dask_cudf.__version__) >= Version("24.06"):
return _dask.to_backend("cudf")
else:
return dask_cudf.from_dask_dataframe(_dask)

@dask_switcher(query=True, extras=["dask_cudf"])
def _dask_expr_cudf():
import dask_cudf
if Version(dask_cudf.__version__) < Version("24.06"):
pytest.skip("dask-expr requires dask-cudf 24.06 or later")
_dask = dd.from_pandas(_pandas(), npartitions=2)
return _dask.to_backend("cudf")

_backends = [
pytest.param(_dask, id="dask"),
pytest.param(_dask_expr, id="dask-expr"),
pytest.param(_dask_cudf, marks=pytest.mark.gpu, id="dask-cudf"),
pytest.param(_dask_expr_cudf, marks=pytest.mark.gpu, id="dask-expr-cudf"),
]


@pytest.fixture(params=_backends)
def ddf(request):
return request.param()
Expand All @@ -76,7 +62,7 @@ def ddf(request):
def npartitions(request):
return request.param

@dask_switcher(query=False)

def _dask_DataFrame(*args, **kwargs):
if kwargs.pop("geo", False):
df = sp.GeoDataFrame(*args, **kwargs)
Expand All @@ -85,53 +71,21 @@ def _dask_DataFrame(*args, **kwargs):
return dd.from_pandas(df, npartitions=2)


@dask_switcher(query=True)
def _dask_expr_DataFrame(*args, **kwargs):
if kwargs.pop("geo", False):
pytest.skip("dask-expr currently does not work with spatialpandas")
# df = sp.GeoDataFrame(*args, **kwargs)
else:
df = pd.DataFrame(*args, **kwargs)
return dd.from_pandas(df, npartitions=2)


@dask_switcher(query=False, extras=["dask_cudf"])
def _dask_cudf_DataFrame(*args, **kwargs):
import cudf
import dask_cudf
if kwargs.pop("geo", False):
# As of dask-cudf version 24.06, dask-cudf is not
# compatible with spatialpandas version 0.4.10
pytest.skip("dask-cudf currently does not work with spatialpandas")
cdf = cudf.DataFrame.from_pandas(
pd.DataFrame(*args, **kwargs), nan_as_null=False
)
return dask_cudf.from_cudf(cdf, npartitions=2)


@dask_switcher(query=True, extras=["dask_cudf"])
def _dask_expr_cudf_DataFrame(*args, **kwargs):
import cudf
import dask_cudf

if Version(dask_cudf.__version__) < Version("24.06"):
pytest.skip("dask-expr requires dask-cudf 24.06 or later")

if kwargs.pop("geo", False):
# As of dask-cudf version 24.06, dask-cudf is not
# compatible with spatialpandas version 0.4.10
pytest.skip("dask-cudf currently does not work with spatialpandas")
cdf = cudf.DataFrame.from_pandas(
pd.DataFrame(*args, **kwargs), nan_as_null=False
)
cdf = cudf.DataFrame.from_pandas(pd.DataFrame(*args, **kwargs), nan_as_null=False)
return dask_cudf.from_cudf(cdf, npartitions=2)


_backends = [
pytest.param(_dask_DataFrame, id="dask"),
pytest.param(_dask_expr_DataFrame, id="dask-expr"),
pytest.param(_dask_cudf_DataFrame, marks=pytest.mark.gpu, id="dask-cudf"),
pytest.param(_dask_expr_cudf_DataFrame, marks=pytest.mark.gpu, id="dask-expr-cudf"),
]

@pytest.fixture(params=_backends)
Expand Down Expand Up @@ -163,25 +117,6 @@ def floats(n):
n = n + np.spacing(n)


@pytest.mark.gpu
def test_check_query_setting():
import os
from subprocess import check_output, SubprocessError

# dask-cudf does not support query planning as of 24.04.
# So we check that it is not set outside of Python.
assert os.environ.get('DASK_DATAFRAME__QUERY_PLANNING', 'false').lower() != 'true'

# This also have problem with the global setting so we check
try:
cmd = ['dask', 'config', 'get', 'dataframe.query-planning']
output = check_output(cmd, text=True).strip().lower()
assert output != 'true'
except SubprocessError:
# Newer version will error out if not set
pass


def test_count(ddf, npartitions):
ddf = ddf.repartition(npartitions=npartitions)
assert ddf.npartitions == npartitions
Expand Down Expand Up @@ -1236,7 +1171,6 @@ def test_log_axis_points(ddf):


@pytest.mark.skipif(not sp, reason="spatialpandas not installed")
@dask_switcher(query=False, extras=["spatialpandas.dask"])
def test_points_geometry():
axis = ds.core.LinearAxis()
lincoords = axis.compute_index(axis.compute_scale_and_translate((0., 2.), 3), 3)
Expand All @@ -1257,7 +1191,6 @@ def test_points_geometry():
assert_eq_xr(agg, out)


@dask_switcher(query=False, extras=["spatialpandas.dask"])
def test_line(DataFrame):
axis = ds.core.LinearAxis()
lincoords = axis.compute_index(axis.compute_scale_and_translate((-3., 3.), 7), 7)
Expand Down Expand Up @@ -1339,7 +1272,6 @@ def test_line(DataFrame):
}, dtype='Line[int64]'), dict(geometry='geom'))
)

@dask_switcher(query=False, extras=["spatialpandas.dask"])
@pytest.mark.parametrize('df_kwargs,cvs_kwargs', line_manual_range_params[5:7])
def test_line_manual_range(DataFrame, df_kwargs, cvs_kwargs, request):
if "cudf" in request.node.name:
Expand Down Expand Up @@ -1452,7 +1384,6 @@ def test_line_manual_range(DataFrame, df_kwargs, cvs_kwargs, request):
}, dtype='Line[int64]'), dict(geometry='geom'))
)

@dask_switcher(query=False, extras=["spatialpandas.dask"])
@pytest.mark.parametrize('df_kwargs,cvs_kwargs', line_autorange_params)
def test_line_autorange(DataFrame, df_kwargs, cvs_kwargs, request):
if "cudf" in request.node.name:
Expand Down Expand Up @@ -1621,7 +1552,7 @@ def test_auto_range_line(DataFrame):
}, dtype='Ragged[float32]'), dict(x='x', y='y', axis=1))
])
def test_area_to_zero_fixedrange(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame == _dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1713,7 +1644,7 @@ def test_area_to_zero_fixedrange(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', axis=1))
])
def test_area_to_zero_autorange(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame ==_dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1790,7 +1721,7 @@ def test_area_to_zero_autorange(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', axis=1))
])
def test_area_to_zero_autorange_gap(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame ==_dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1893,7 +1824,7 @@ def test_area_to_zero_autorange_gap(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', y_stack='y_stack', axis=1))
])
def test_area_to_line_autorange(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame == _dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1980,7 +1911,7 @@ def test_area_to_line_autorange(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', y_stack='y_stack', axis=1))
])
def test_area_to_line_autorange_gap(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame == _dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down
44 changes: 6 additions & 38 deletions datashader/tests/test_geopandas.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,16 @@
# Testing GeoPandas and SpatialPandas
import contextlib

import datashader as ds
from datashader.tests.test_pandas import assert_eq_ndarray
import numpy as np
from numpy import nan
import pytest
from datashader.tests.utils import dask_switcher
from packaging.version import Version

try:
import dask.dataframe as dd
except ImportError:
dd = None

_backends = [
pytest.param(False, id="dask"),
]

_extras = ["spatialpandas.dask", "dask_geopandas.backends", "dask_geopandas"]

with contextlib.suppress(ImportError):
import dask_geopandas

if Version(dask_geopandas.__version__) >= Version("0.4.0"):
_backends.append(pytest.param(True, id="dask-expr"))


@pytest.fixture(params=_backends)
def dask_both(request):
with dask_switcher(query=request.param, extras=_extras): ...
return request.param

@pytest.fixture
def dask_classic(request):
with dask_switcher(query=False, extras=_extras): ...

try:
import dask_geopandas
Expand Down Expand Up @@ -129,14 +105,6 @@ def dask_classic(request):
])


@pytest.mark.skipif(not dask_geopandas, reason="dask_geopandas not installed")
def test_dask_geopandas_switcher(dask_both):
import dask_geopandas
if dask_both:
assert dask_geopandas.expr.GeoDataFrame == dask_geopandas.GeoDataFrame
else:
assert dask_geopandas.core.GeoDataFrame == dask_geopandas.GeoDataFrame


@pytest.mark.skipif(not geodatasets, reason="geodatasets not installed")
@pytest.mark.skipif(not geopandas, reason="geopandas not installed")
Expand Down Expand Up @@ -177,7 +145,7 @@ def test_lines_geopandas(geom_type, explode, use_boundary):
("linestring", True, True),
],
)
def test_lines_dask_geopandas(geom_type, explode, use_boundary, npartitions, dask_both):
def test_lines_dask_geopandas(geom_type, explode, use_boundary, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df)) # Extra column for aggregation.
geometry = "boundary" if use_boundary else "geometry"
Expand Down Expand Up @@ -209,7 +177,7 @@ def test_lines_dask_geopandas(geom_type, explode, use_boundary, npartitions, das
("linestring", True, True),
],
)
def test_lines_spatialpandas(geom_type, explode, use_boundary, npartitions, dask_classic):
def test_lines_spatialpandas(geom_type, explode, use_boundary, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df)) # Extra column for aggregation.
geometry = "boundary" if use_boundary else "geometry"
Expand Down Expand Up @@ -252,7 +220,7 @@ def test_points_geopandas(geom_type):
@pytest.mark.skipif(not geopandas, reason="geopandas not installed")
@pytest.mark.parametrize('npartitions', [1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipoint", "point"])
def test_points_dask_geopandas(geom_type, npartitions, dask_both):
def test_points_dask_geopandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))

df["geometry"] = df["geometry"].sample_points(100, rng=93814) # multipoint
Expand All @@ -274,7 +242,7 @@ def test_points_dask_geopandas(geom_type, npartitions, dask_both):
@pytest.mark.skipif(not spatialpandas, reason="spatialpandas not installed")
@pytest.mark.parametrize('npartitions', [0, 1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipoint", "point"])
def test_points_spatialpandas(geom_type, npartitions, dask_classic):
def test_points_spatialpandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))

df["geometry"] = df["geometry"].sample_points(100, rng=93814) # multipoint
Expand Down Expand Up @@ -315,7 +283,7 @@ def test_polygons_geopandas(geom_type):
@pytest.mark.skipif(not geopandas, reason="geopandas not installed")
@pytest.mark.parametrize('npartitions', [1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipolygon", "polygon"])
def test_polygons_dask_geopandas(geom_type, npartitions, dask_both):
def test_polygons_dask_geopandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df))

Expand All @@ -338,7 +306,7 @@ def test_polygons_dask_geopandas(geom_type, npartitions, dask_both):
@pytest.mark.skipif(not spatialpandas, reason="spatialpandas not installed")
@pytest.mark.parametrize('npartitions', [0, 1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipolygon", "polygon"])
def test_polygons_spatialpandas(geom_type, npartitions, dask_classic):
def test_polygons_spatialpandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df))

Expand Down
4 changes: 0 additions & 4 deletions datashader/tests/test_polygons.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
import xarray as xr
import datashader as ds
from datashader.tests.test_pandas import assert_eq_ndarray, assert_eq_xr
from datashader.tests.utils import dask_switcher

try:
import dask.dataframe as dd
except ImportError:
dd = None

@pytest.fixture(autouse=True)
def _classic_dd():
with dask_switcher(query=False, extras=["spatialpandas.dask"]): ...

try:
# Import to register extension arrays
Expand Down
Loading