Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ dependencies:
- nitime=0.10
- scikit-image=0.22
- scikit-learn=1.4
# SimpleITK, so build doesn't complain about building scikit from sources
- simpleitk=2.4
# Utilities
- graphviz=9.0
- pandoc=3.1
Expand Down
68 changes: 48 additions & 20 deletions nitransforms/io/itk.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,15 @@ def from_h5obj(cls, fileobj, check=True):


class ITKDisplacementsField(DisplacementsField):
"""A data structure representing displacements fields."""
"""
A data structure representing displacements fields.

Note that ITK's world coordinates are LPS:
http://sourceforge.net/p/advants/discussion/840261/thread/2a1e9307/.
This translates into the fact that `antsApplyTransformsToPoints` expects LPS coordinates,
and therefore, points must correct for the x and y directions before feeding into the tool.

"""

@classmethod
def from_image(cls, imgobj):
Expand All @@ -347,22 +355,47 @@ def from_image(cls, imgobj):
warnings.warn("Incorrect intent identified.")
hdr.set_intent("vector")

field = np.squeeze(np.asanyarray(imgobj.dataobj))
field[..., (0, 1)] *= -1.0
affine, qcode = hdr.get_qform(coded=True)

if qcode != 1:
warnings.warn(
"Displacements field has qform code={qcode}, which is not ITK-like. "
"Setting it to 1 (aligned with the image)."
)
affine = imgobj.affine
hdr.set_qform(imgobj.affine, code=1)
hdr.set_sform(imgobj.affine, code=0)

# ITK uses LPS coordinates, so first we patch the affine's offset
# This patch was developed under gh-266, by adapting
# nitransforms/tests/test_nonlinear.py::test_densefield_map_vs_ants[True]
# until the same delta maps were obtained with `antsApplyTransformsToPoints`
# and our NiTransforms transform's `.map()`.
mid_ijk = 0.5 * (np.array(imgobj.shape[:3]) - 1)
offset = (affine - LPS @ affine) @ (*mid_ijk, 1.0)
affine[:3, 3] -= offset[:3]

# Create a NIfTI image with the patched affine
data = np.squeeze(imgobj.get_fdata(dtype="float32"))
refmap = imgobj.__class__(np.flip(data, axis=(0, 1)), affine, hdr)

return imgobj.__class__(field, imgobj.affine, hdr)
return refmap

@classmethod
def to_image(cls, imgobj):
"""Export a displacements field from a nibabel object."""

hdr = imgobj.header.copy()
hdr.set_intent("vector")

warp_data = imgobj.get_fdata().reshape(imgobj.shape[:3] + (1, imgobj.shape[-1]))
warp_data[..., (0, 1)] *= -1

return imgobj.__class__(warp_data, imgobj.affine, hdr)
hdr.set_data_dtype("float32")
affine = LPS @ imgobj.affine @ LPS
hdr.set_qform(affine, code=1)
hdr.set_sform(affine, code=0)
hdr.set_xyzt_units("mm", None)

return imgobj.__class__(
imgobj.get_fdata(dtype="float32")[..., None, :], affine, hdr
)


class ITKCompositeH5:
Expand Down Expand Up @@ -410,21 +443,16 @@ def from_h5obj(cls, fileobj, check=True, only_linear=False):
directions = np.reshape(_fixed[9:], (3, 3))
affine = from_matvec(directions * zooms, offset)
# ITK uses Fortran ordering, like NIfTI, but with the vector dimension first
field = np.moveaxis(
np.reshape(
xfm[f"{typo_fallback}Parameters"],
(3, *shape.astype(int)),
order="F",
),
0,
-1,
)
field[..., (0, 1)] *= -1.0
# In practice, this seems to work (see issue #171)
field = np.reshape(
xfm[f"{typo_fallback}Parameters"], (*shape.astype(int), 3)
).transpose(2, 1, 0, 3)

hdr = Nifti1Header()
hdr.set_intent("vector")
hdr.set_data_dtype("float")

xfm_list.append(Nifti1Image(field.astype("float"), LPS @ affine, hdr))
xfm_list.append(Nifti1Image(field.astype("float"), affine, hdr))
continue

raise TransformIOError(
Expand Down
165 changes: 164 additions & 1 deletion nitransforms/tests/test_nonlinear.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"""Tests of nonlinear transforms."""

import os
from subprocess import check_call
import shutil

import pytest

import numpy as np
Expand All @@ -13,6 +16,7 @@
DenseFieldTransform,
)
from nitransforms.tests.utils import get_points
from nitransforms.io.itk import ITKDisplacementsField

rng = np.random.default_rng()

Expand Down Expand Up @@ -66,7 +70,6 @@ def test_bsplines_references(testdata_path):
reference=testdata_path / "someones_anatomy.nii.gz",
)


@pytest.mark.parametrize("image_orientation", ["RAS", "LAS", "LPS", "oblique"])
@pytest.mark.parametrize("ongrid", [True, False])
def test_densefield_map(get_testdata, image_orientation, ongrid):
Expand Down Expand Up @@ -159,6 +162,166 @@ def test_densefield_map_vs_bspline(tmp_path, testdata_path):
np.testing.assert_allclose(dispxfm._field, bsplxfm._field, atol=1e-1, rtol=1e-4)


@pytest.mark.parametrize("ongrid", [True, False])
def test_densefield_map_vs_ants(testdata_path, tmp_path, ongrid):
"""Map points with DenseFieldTransform and compare to ANTs."""
warpfile = (
testdata_path
/ "regressions"
/ ("01_ants_t1_to_mniComposite_DisplacementFieldTransform.nii.gz")
)
if not warpfile.exists():
pytest.skip("Composite transform test data not available")

nii = ITKDisplacementsField.from_filename(warpfile)

# Get sampling indices
coords_xyz, points_ijk, grid_xyz, shape, ref_affine, reference, subsample = (
get_points(nii, ongrid, npoints=5, rng=rng)
)
coords_map = grid_xyz.reshape(*shape, 3)

csvin = tmp_path / "fixed_coords.csv"
csvout = tmp_path / "moving_coords.csv"
np.savetxt(csvin, coords_xyz, delimiter=",", header="x,y,z", comments="")

cmd = f"antsApplyTransformsToPoints -d 3 -i {csvin} -o {csvout} -t {warpfile}"
exe = cmd.split()[0]
if not shutil.which(exe):
pytest.skip(f"Command {exe} not found on host")
check_call(cmd, shell=True)

ants_res = np.genfromtxt(csvout, delimiter=",", names=True)
ants_pts = np.vstack([ants_res[n] for n in ("x", "y", "z")]).T

xfm = DenseFieldTransform(nii, reference=reference)
mapped = xfm.map(coords_xyz)

if ongrid:
ants_mapped_xyz = ants_pts.reshape(*shape, 3)
nit_mapped_xyz = mapped.reshape(*shape, 3)

nb.load(warpfile).to_filename(tmp_path / "original_ants_deltas.nii.gz")

nb.Nifti1Image(coords_map, ref_affine, None).to_filename(
tmp_path / "baseline_positions.nii.gz"
)

nii.to_filename(tmp_path / "original_interpreted_deltas.nii.gz")

nb.Nifti1Image(nit_mapped_xyz, ref_affine, None).to_filename(
tmp_path / "nit_deformation_xyz.nii.gz"
)
nb.Nifti1Image(ants_mapped_xyz - coords_map, ref_affine, None).to_filename(
tmp_path / "ants_deltas_xyz.nii.gz"
)
nb.Nifti1Image(nit_mapped_xyz - coords_map, ref_affine, None).to_filename(
tmp_path / "nit_deltas_xyz.nii.gz"
)

atol = 0 if ongrid else 1e-2
rtol = 1e-4 if ongrid else 1e-6
assert np.allclose(mapped, ants_pts, atol=atol, rtol=rtol)


@pytest.mark.parametrize("image_orientation", ["RAS", "LAS", "LPS", "oblique"])
@pytest.mark.parametrize("ongrid", [True, False])
def test_constant_field_vs_ants(tmp_path, get_testdata, image_orientation, ongrid):
"""Create a constant displacement field and compare mappings."""

nii = get_testdata[image_orientation]

# Get sampling indices
coords_xyz, points_ijk, grid_xyz, shape, ref_affine, reference, subsample = (
get_points(nii, ongrid, npoints=5, rng=rng)
)

coords_map = grid_xyz.reshape(*shape, 3)

deltas = np.hstack(
(
np.zeros(np.prod(shape)),
np.linspace(-80, 80, num=np.prod(shape)),
np.linspace(-50, 50, num=np.prod(shape)),
)
).reshape(shape + (3,))
gold_mapped_xyz = coords_map + deltas

fieldnii = nb.Nifti1Image(deltas, ref_affine, None)
warpfile = tmp_path / "itk_transform.nii.gz"
ITKDisplacementsField.to_filename(fieldnii, warpfile)

# Ensure direct (xfm) and ITK roundtrip (itk_xfm) are equivalent
xfm = DenseFieldTransform(fieldnii)
itk_xfm = DenseFieldTransform(ITKDisplacementsField.from_filename(warpfile))

assert xfm == itk_xfm
np.testing.assert_allclose(xfm.reference.affine, itk_xfm.reference.affine)
np.testing.assert_allclose(ref_affine, itk_xfm.reference.affine)
np.testing.assert_allclose(xfm.reference.shape, itk_xfm.reference.shape)
np.testing.assert_allclose(xfm._field, itk_xfm._field)

# Ensure transform (xfm_orig) and ITK roundtrip (itk_xfm) are equivalent
xfm_orig = DenseFieldTransform(deltas, reference=reference)
np.testing.assert_allclose(xfm_orig.reference.shape, itk_xfm.reference.shape)
np.testing.assert_allclose(ref_affine, xfm_orig.reference.affine)
np.testing.assert_allclose(xfm_orig.reference.affine, itk_xfm.reference.affine)
np.testing.assert_allclose(xfm_orig._field, itk_xfm._field)

# Ensure deltas and mapped grid are equivalent
grid_mapped_xyz = itk_xfm.map(grid_xyz).reshape(*shape, -1)
orig_grid_mapped_xyz = xfm_orig.map(grid_xyz).reshape(*shape, -1)

# Check apparent healthiness of mapping
np.testing.assert_array_equal(orig_grid_mapped_xyz, grid_mapped_xyz)
np.testing.assert_array_equal(gold_mapped_xyz, orig_grid_mapped_xyz)
np.testing.assert_array_equal(gold_mapped_xyz, grid_mapped_xyz)

csvout = tmp_path / "mapped_xyz.csv"
csvin = tmp_path / "coords_xyz.csv"
np.savetxt(csvin, coords_xyz, delimiter=",", header="x,y,z", comments="")

cmd = f"antsApplyTransformsToPoints -d 3 -i {csvin} -o {csvout} -t {warpfile}"
exe = cmd.split()[0]
if not shutil.which(exe):
pytest.skip(f"Command {exe} not found on host")
check_call(cmd, shell=True)

ants_res = np.genfromtxt(csvout, delimiter=",", names=True)
ants_pts = np.vstack([ants_res[n] for n in ("x", "y", "z")]).T

nb.Nifti1Image(grid_mapped_xyz, ref_affine, None).to_filename(
tmp_path / "grid_mapped.nii.gz"
)
nb.Nifti1Image(coords_map, ref_affine, None).to_filename(
tmp_path / "baseline_field.nii.gz"
)
nb.Nifti1Image(gold_mapped_xyz, ref_affine, None).to_filename(
tmp_path / "gold_mapped_xyz.nii.gz"
)

if ongrid:
ants_pts = ants_pts.reshape(*shape, 3)

nb.Nifti1Image(ants_pts, ref_affine, None).to_filename(
tmp_path / "ants_mapped_xyz.nii.gz"
)
np.testing.assert_array_equal(gold_mapped_xyz, ants_pts)
np.testing.assert_array_equal(deltas, ants_pts - coords_map)
else:
ants_deltas = ants_pts - coords_xyz
deltas_xyz = deltas.reshape(-1, 3)[subsample]
gold_xyz = coords_xyz + deltas_xyz
np.testing.assert_array_equal(gold_xyz, ants_pts)
np.testing.assert_array_equal(deltas_xyz, ants_deltas)

# np.testing.assert_array_equal(mapped, ants_pts)
# diff = mapped - ants_pts
# mask = np.argwhere(np.abs(diff) > 1e-2)[:, 0]

# assert len(mask) == 0, f"A total of {len(mask)}/{ants_pts.shape[0]} contained errors:\n{diff[mask]}"


@pytest.mark.parametrize("is_deltas", [True, False])
def test_densefield_oob_resampling(is_deltas):
"""Ensure mapping outside the field returns input coordinates."""
Expand Down
Loading
Loading