wip: GeoVortex #3213


Draft: wants to merge 21 commits into base: develop
633 changes: 563 additions & 70 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
members = [
"bench-vortex",
"encodings/*",
"extensions/*",
"fuzz",
"pyvortex",
"vortex",
@@ -61,6 +62,7 @@ arrow-arith = "55"
arrow-array = "55"
arrow-buffer = "55"
arrow-cast = "55"
arrow-ipc = "55"
arrow-ord = "55"
arrow-schema = "55"
arrow-select = "55"
@@ -72,7 +74,7 @@ bit-vec = "0.8.0"
bytes = "1.10"
bzip2 = "0.5.0"
cfg-if = "1"
chrono = "0.4.40"
chrono = "0.4.41"
clap = "4.5"
compio = { version = "0.14", features = ["io-uring"], default-features = false }
crossterm = "0.28"
@@ -188,6 +190,7 @@ vortex-fastlanes = { version = "0.32.0", path = "./encodings/fastlanes", default
vortex-file = { version = "0.32.0", path = "./vortex-file", default-features = false }
vortex-flatbuffers = { version = "0.32.0", path = "./vortex-flatbuffers", default-features = false }
vortex-fsst = { version = "0.32.0", path = "./encodings/fsst", default-features = false }
vortex-geo = { version = "0.32.0", path = "./extensions/geo" }
vortex-io = { version = "0.32.0", path = "./vortex-io", default-features = false }
vortex-ipc = { version = "0.32.0", path = "./vortex-ipc", default-features = false }
vortex-layout = { version = "0.32.0", path = "./vortex-layout", default-features = false }
50 changes: 50 additions & 0 deletions extensions/geo/Cargo.toml
@@ -0,0 +1,50 @@
[package]
name = "vortex-geo"
description = "Geospatial extensions for Vortex"
authors.workspace = true
categories.workspace = true
edition.workspace = true
homepage.workspace = true
include.workspace = true
keywords.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

[dependencies]
arcref = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
geoarrow = { git = "https://github.com/geoarrow/geoarrow-rs.git", rev = "c241034f" }
geoarrow-schema = { git = "https://github.com/geoarrow/geoarrow-rs.git", rev = "c241034f" }
serde_json = { workspace = true }
vortex-array = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-expr = { workspace = true, optional = true }
vortex-layout = { workspace = true, optional = true }
clap = { version = "4.5.37", features = ["derive"] }

[dev-dependencies]
anyhow = { workspace = true }
arrow-ipc = { workspace = true }
clap = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["full"] }
vortex-btrblocks = { workspace = true }
vortex-dtype = { workspace = true, features = ["arrow"] }
vortex-file = { workspace = true, features = ["tokio"] }
vortex-scalar = { workspace = true }
geoarrow-geoparquet = { git = "https://github.com/geoarrow/geoarrow-rs.git", rev = "c241034f", features = [
"compression",
] }
Comment on lines +40 to +42 (reviewer): By the way, I've been considering a refactor to the geoparquet APIs so that users call upstream parquet APIs directly. geoarrow/geoarrow-rs#1089

Also FYI, the crate name changed to just geoparquet. (The geoparquet crate name had been squatted on, but crates.io removed it.)

[features]
# The `layouts` feature requires both vortex-layout and vortex-expr for the ExprEvaluator impls
layouts = ["dep:vortex-layout", "dep:vortex-expr"]

[lints]
workspace = true
9 changes: 9 additions & 0 deletions extensions/geo/README.md
@@ -0,0 +1,9 @@
# GeoVortex

This crate provides geospatial extension types on top of Vortex. The currently supported types are:

* Point for 2D or 3D point geometry
* LineString for 2D or 3D linestring geometry
* Polygon for 2D or 3D polygons
* WKB for opaque well-known binary encoded geometry
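
The WKB variant stores geometry as opaque well-known binary bytes. As a rough illustration of that byte layout (a hypothetical std-only decoder sketch, not part of this crate's API), a little-endian 2D WKB point is a one-byte byte-order flag, a `u32` geometry type tag (1 = Point), and two `f64` coordinates:

```rust
// Hypothetical sketch of decoding a little-endian 2D WKB point payload.
// vortex-geo treats WKB as opaque; this only illustrates the layout.
fn parse_wkb_point_2d(buf: &[u8]) -> Option<(f64, f64)> {
    // 1 byte-order flag + 4-byte type tag + 2 * 8-byte coordinates = 21 bytes
    if buf.len() != 21 || buf[0] != 1 {
        return None; // this sketch only handles little-endian (flag = 1)
    }
    let geom_type = u32::from_le_bytes(buf[1..5].try_into().ok()?);
    if geom_type != 1 {
        return None; // 1 is the WKB type tag for Point
    }
    let x = f64::from_le_bytes(buf[5..13].try_into().ok()?);
    let y = f64::from_le_bytes(buf[13..21].try_into().ok()?);
    Some((x, y))
}

fn main() {
    let mut buf = vec![1u8]; // little-endian flag
    buf.extend_from_slice(&1u32.to_le_bytes()); // geometry type: Point
    buf.extend_from_slice(&1.5f64.to_le_bytes());
    buf.extend_from_slice(&2.5f64.to_le_bytes());
    assert_eq!(parse_wkb_point_2d(&buf), Some((1.5, 2.5)));
}
```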

40 changes: 40 additions & 0 deletions extensions/geo/examples/load_geoparquet.rs
@@ -0,0 +1,40 @@
extern crate vortex_dtype;
extern crate vortex_geo;

use std::fs::File;

use vortex_array::variants::StructArrayTrait;
use vortex_array::{ArrayRef, ToCanonical, TryIntoArray};
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_geo::POLYGON_ID;

#[allow(clippy::unwrap_used, clippy::expect_used)]
pub fn main() {
vortex_geo::arrow::register_extension_types();

let file = File::open("/Volumes/Code/Data/afghan.parquet").unwrap();
let mut geo = geoarrow_geoparquet::GeoParquetRecordBatchReaderBuilder::try_new(file)
.unwrap()
.build()
.unwrap();

let arrow_batch = geo.next().unwrap().unwrap();

let vortex_batch: ArrayRef = arrow_batch
.try_into_array()
.expect("convert to vortex array");
let geometry = vortex_batch
.to_struct()
.unwrap()
.maybe_null_field_by_name("geometry")
.unwrap();

assert_eq!(geometry.dtype().as_extension().unwrap().id(), &*POLYGON_ID);

println!("uncompressed:\n{}", geometry.tree_display());

// Run through the compressor to see how it performs. It should just compress the individual storage
// arrays.
let compressed = BtrBlocksCompressor.compress(geometry.as_ref()).unwrap();
println!("compressed:\n{}", compressed.tree_display());
}
59 changes: 59 additions & 0 deletions extensions/geo/examples/ogc_server.rs
@@ -0,0 +1,59 @@
use std::fs::File;
use std::path::PathBuf;

use anyhow::Context;
use arrow_array::RecordBatch;
use arrow_array::cast::AsArray;
use clap::Parser;
use vortex_array::arrow::IntoArrowArray;
use vortex_file::VortexOpenOptions;

#[derive(Parser, Debug)]
pub struct Args {
/// Path to a Vortex file with geometry data
input: PathBuf,
}

#[allow(clippy::use_debug)]
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
vortex_geo::arrow::register_extension_types();

let args = Args::parse();
let reader = VortexOpenOptions::file()
.open(&args.input)
.await
.context("open input file")?;

let arrow_schema = reader
.dtype()
.to_arrow_schema()
.context("get arrow schema")?;

let out = File::create("ipc.arrow").context("open IPC file")?;
let mut writer = arrow_ipc::writer::StreamWriter::try_new(out, &arrow_schema)?;

println!("arrow schema: {:?}", arrow_schema);

// Stream batches to read the file as-is.
let batches = reader.scan().context("scan builder")?.into_array_iter()?;

let mut written = 0;
for batch in batches {
let record_batch = batch?.into_arrow_preferred()?;
let batch = RecordBatch::from(
record_batch
.as_struct_opt()
.ok_or_else(|| anyhow::anyhow!("expected struct"))?
.clone(),
);
writer.write(&batch)?;
written += batch.num_rows();
}

writer.finish()?;

eprintln!("wrote {written} rows to ipc.arrow");

Ok(())
}
161 changes: 161 additions & 0 deletions extensions/geo/src/array.rs
@@ -0,0 +1,161 @@
//! `ExtensionArray` wrapper for arrays that hold geospatial data types.

use std::sync::Arc;

use vortex_array::ArrayRef;
use vortex_array::arrays::ExtensionArray;
use vortex_array::variants::ExtensionArrayTrait;
use vortex_dtype::{DType, PType, StructDType};
use vortex_error::{VortexError, VortexResult, vortex_assert, vortex_bail};

use crate::{Dimension, GeoMetadata, GeometryType, OwnedGeoMetadata, OwnedGeometryType};

/// Holder for what is known to be one of the blessed extension array types.
pub enum GeometryArray<'a> {
Point(&'a ExtensionArray, GeoMetadata<'a>),
LineString(&'a ExtensionArray, GeoMetadata<'a>),
Polygon(&'a ExtensionArray, GeoMetadata<'a>),
#[allow(clippy::upper_case_acronyms)]
WKB(&'a ExtensionArray, GeoMetadata<'a>),
}

impl<'a> TryFrom<&'a ExtensionArray> for GeometryArray<'a> {
type Error = VortexError;

fn try_from(value: &'a ExtensionArray) -> VortexResult<Self> {
let geometry_type = GeometryType::try_from(value.ext_dtype().as_ref())?;
Ok(match geometry_type {
GeometryType::Point(meta) => Self::Point(value, meta),
GeometryType::Polygon(meta) => Self::Polygon(value, meta),
GeometryType::WKB(meta) => Self::WKB(value, meta),
GeometryType::LineString(meta) => Self::LineString(value, meta),
})
}
}

#[derive(Debug, Clone)]
pub struct PointArray {
inner: ExtensionArray,
metadata: OwnedGeoMetadata,
}

impl PointArray {
/// Wrap an existing array as a `geovortex.point` extension array.
///
/// ## Error checking
///
/// The provided `points` storage array must be a struct-typed array of `f64` columns whose names
/// are determined by the coordinate dimension (`x`, `y`, then optionally `z` and/or `m`).
pub fn try_new(points: ArrayRef, metadata: OwnedGeoMetadata) -> VortexResult<Self> {
let DType::Struct(schema, _) = points.dtype() else {
vortex_bail!("points must be Struct typed, was {}", points.dtype())
};

validate_coord_schema(schema, metadata.dimension)?;
let point_type =
OwnedGeometryType::Point(metadata.clone()).into_ext_dtype(points.dtype().nullability());
let inner = ExtensionArray::new(Arc::new(point_type), points);
Ok(Self { inner, metadata })
}

/// Deconstruct the `PointArray` wrapper into the storage array and the parsed extension metadata.
pub fn into_parts(self) -> (ExtensionArray, OwnedGeoMetadata) {
(self.inner, self.metadata)
}
}

pub fn validate_coord_schema(schema: &StructDType, dimensions: Dimension) -> VortexResult<()> {
match dimensions {
Dimension::XY => {
vortex_assert!(schema.nfields() == 2);
vortex_assert!(schema.field_name(0)?.as_ref().eq("x"));
vortex_assert!(schema.field_name(1)?.as_ref().eq("y"));
// Every coordinate column must be f64; assert the check rather than discarding the result.
vortex_assert!(schema.fields().all(|field| field.eq_ignore_nullability(PType::F64.into())));
}
Dimension::XYZ => {
vortex_assert!(schema.nfields() == 3);
vortex_assert!(schema.field_name(0)?.as_ref().eq("x"));
vortex_assert!(schema.field_name(1)?.as_ref().eq("y"));
vortex_assert!(schema.field_name(2)?.as_ref().eq("z"));
vortex_assert!(schema.fields().all(|field| field.eq_ignore_nullability(PType::F64.into())));
}
Dimension::XYM => {
vortex_assert!(schema.nfields() == 3);
vortex_assert!(schema.field_name(0)?.as_ref().eq("x"));
vortex_assert!(schema.field_name(1)?.as_ref().eq("y"));
vortex_assert!(schema.field_name(2)?.as_ref().eq("m"));
vortex_assert!(schema.fields().all(|field| field.eq_ignore_nullability(PType::F64.into())));
}
Dimension::XYZM => {
vortex_assert!(schema.nfields() == 4);

Copilot AI (May 8, 2025) flagged that the Dimension::XYZM case has four fields (x, y, z, m) and suggested changing `schema.nfields() == 3` to `schema.nfields() == 4`.

Contributor author: I think I need to make a macro or something b/c I keep making this mistake...

vortex_assert!(schema.field_name(0)?.as_ref().eq("x"));
vortex_assert!(schema.field_name(1)?.as_ref().eq("y"));
vortex_assert!(schema.field_name(2)?.as_ref().eq("z"));
vortex_assert!(schema.field_name(3)?.as_ref().eq("m"));
vortex_assert!(schema.fields().all(|field| field.eq_ignore_nullability(PType::F64.into())));
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use vortex_array::Array;
use vortex_array::arrays::{PrimitiveArray, StructArray};

use super::PointArray;
use crate::{OwnedGeoMetadata, POINT_ID};

#[test]
fn test_points() {
let values = StructArray::from_fields(&[
(
"x",
PrimitiveArray::from_iter([0f64, 0f64, 0f64]).into_array(),
),
(
"y",
PrimitiveArray::from_iter([1f64, 2f64, 3f64]).into_array(),
),
(
"z",
PrimitiveArray::from_iter([4f64, 5f64, 6f64]).into_array(),
),
])
.unwrap()
.into_array();

let points_array = PointArray::try_new(
values,
OwnedGeoMetadata {
crs: None,
dimension: crate::Dimension::XYZ,
},
)
.unwrap();

let (ext, meta) = points_array.into_parts();
assert_eq!(
meta,
OwnedGeoMetadata {
crs: None,
dimension: crate::Dimension::XYZ,
}
);
assert_eq!(ext.id(), &*POINT_ID);
}

#[test]
fn test_polygon() {
// Create two lists of points: one defining the exterior ring and one defining an interior ring.
}
}
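
The per-dimension column-naming rule that `validate_coord_schema` asserts can be sketched with std-only Rust. `Dimension` below is a stand-in for the crate's enum, and `expected_coord_names`/`names_match` are hypothetical helpers, not part of vortex-geo:

```rust
// Std-only sketch of the coordinate-column naming rule: each dimension
// maps to fixed f64 column names in a fixed order.
#[derive(Clone, Copy, Debug)]
enum Dimension {
    XY,
    XYZ,
    XYM,
    XYZM,
}

fn expected_coord_names(dim: Dimension) -> &'static [&'static str] {
    match dim {
        Dimension::XY => &["x", "y"],
        Dimension::XYZ => &["x", "y", "z"],
        Dimension::XYM => &["x", "y", "m"],
        Dimension::XYZM => &["x", "y", "z", "m"],
    }
}

// Check a struct schema's field names against a dimension, mirroring the
// nfields/field_name assertions in validate_coord_schema.
fn names_match(dim: Dimension, names: &[&str]) -> bool {
    names == expected_coord_names(dim)
}

fn main() {
    assert!(names_match(Dimension::XYZ, &["x", "y", "z"]));
    assert!(!names_match(Dimension::XYZM, &["x", "y", "z"])); // XYZM needs 4 columns
}
```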