Feature/add lidar depth #9

Open
wants to merge 5 commits into base: main
Changes from all commits
4 changes: 2 additions & 2 deletions demo/README.md
@@ -74,8 +74,8 @@ If the pose is recorded using the `/tf` topic rather than a Pose or Odometry message,
```
type: bag_tf
path: <bag file path>
parent: <parent frame_id>
child: <child frame_id>
parent_frame: <parent frame_id>
child_frame: <child frame_id>
inv: <Whether the transformation from /tf should be inverted or not>
```
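
As an illustration of the renamed keys, a filled-in `bag_tf` entry might look like the following (the bag path and frame names are placeholders, not values from this PR):

```
type: bag_tf
path: ~/data/robot_run.bag
parent_frame: odom
child_frame: base_link
inv: False
```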

119 changes: 119 additions & 0 deletions roman/map/align_pointcloud.py
@@ -0,0 +1,119 @@
###########################################################
#
# align_pointcloud.py
#
# A class to align a point cloud with a camera & project onto the camera image frame
#
# Authors: Qingyuan Li
#
# January 27, 2025
#
###########################################################


from robotdatapy.data import PointCloudData, ImgData, PoseData
import numpy as np
import cv2 as cv
from roman.utils import expandvars_recursive

class AlignPointCloud():
pointcloud_data: PointCloudData
img_data: ImgData
camera_pose_data: PoseData
T_camera_rangesens_static: np.ndarray

def __init__(self, pointcloud_data: PointCloudData, img_data: ImgData, camera_pose_data: PoseData, tf_bag_path: str):
"""
Class for aligning a point cloud with a camera and projecting the point cloud onto the camera's image frame

Args:
pointcloud_data (PointCloudData): point cloud data class
img_data (ImgData): image data class
camera_pose_data (PoseData): pose data class
tf_bag_path (str): path to bag with static tf data (needed to calculate static transform from
camera to range sensor)
"""
self.pointcloud_data = pointcloud_data
self.img_data = img_data
self.camera_pose_data = camera_pose_data

# calculate static transform from camera to range sensor
self.pointcloud_frame = pointcloud_data.pointcloud(pointcloud_data.times[0]).header.frame_id
self.camera_frame = img_data.img_header(img_data.times[0]).frame_id
self.img_shape = img_data.img(img_data.times[0]).shape[:2]
self.T_camera_rangesens_static = PoseData.any_static_tf_from_bag(expandvars_recursive(tf_bag_path), self.camera_frame, self.pointcloud_frame)

def aligned_pointcloud(self, t: float):
"""
Return the closest point cloud at time t, aligned to the camera frame

Args:
t (float): time to get closest point cloud to

Returns:
np.ndarray: (n, 3) array containing 3D point cloud in the frame of the camera (Z forward, Y down)
"""
pcl = self.pointcloud_data.pointcloud(t)

img_header = self.img_data.img_header(t)

# get exact times of point cloud and image messages
pointcloud_time = pcl.header.stamp.sec + 1e-9 * pcl.header.stamp.nanosec
img_time = img_header.stamp.sec + 1e-9 * img_header.stamp.nanosec

# calculate dynamic transform between robot at time of image and robot at time of pointcloud
T_W_camera_pointcloud_time = self.camera_pose_data.T_WB(pointcloud_time)
T_W_camera_image_time = self.camera_pose_data.T_WB(img_time)
T_W_rangesens_pointcloud_time = T_W_camera_pointcloud_time @ self.T_camera_rangesens_static
T_W_rangesens_image_time = T_W_camera_image_time @ self.T_camera_rangesens_static

T_img_cloud_dynamic = np.linalg.inv(T_W_rangesens_image_time) @ T_W_rangesens_pointcloud_time

# compose static and dynamic transforms to approximate the exact transform
# between the camera at image time and the range sensor at point cloud time
T_camera_rangesens = self.T_camera_rangesens_static @ T_img_cloud_dynamic

# transform points into image frame
points = pcl.get_xyz()
points_h = np.hstack((points, np.ones((points.shape[0], 1))))
points_h_image_frame = points_h @ T_camera_rangesens.T
points_camera_frame = points_h_image_frame[:, :3]

# mask for points in front of camera (positive Z in camera frame)
in_front_mask = points_camera_frame[:, 2] >= 0
points_camera_frame_filtered = points_camera_frame[in_front_mask]

return points_camera_frame_filtered

def projected_pointcloud(self, points_camera_frame):
"""
Projects a 3D point cloud in the camera frame (from aligned_pointcloud) to
the 2D image frame

Args:
points_camera_frame (np.ndarray): (n, 3) array containing 3D point cloud in the frame of the camera (Z forward, Y down)

Returns:
np.ndarray: (n, 2) array containing 2D projected points in (u, v) coordinates, order unchanged
"""
# project point cloud onto 2D image
points_camera_frame_cv = points_camera_frame.reshape(-1, 1, 3)
points_2d_cv, _ = cv.projectPoints(points_camera_frame_cv, np.zeros(3), np.zeros(3), self.img_data.K, self.img_data.D)
points_2d = points_2d_cv.reshape((-1, 2))

return points_2d

def filter_pointcloud_and_projection(self, points_camera_frame, points_2d):
"""
Filters the array of (u, v) image coordinates and the associated 3D point cloud by rounding the
(u, v) coordinates to int and keeping only the points that fall inside the RGB image bounds

Args:
points_camera_frame (np.ndarray): (n, 3) array containing 3D point cloud in the frame of the camera (Z forward, Y down)
points_2d (np.ndarray): (n, 2) array containing 2D projected points in (u, v) coordinates, in same order
"""
points_2d = np.round(points_2d).astype(int)
inside_frame = (points_2d[:, 0] >= 0) & (points_2d[:, 0] < self.img_shape[1]) & (points_2d[:, 1] >= 0) & (points_2d[:, 1] < self.img_shape[0])
points_camera_frame = points_camera_frame[inside_frame]
points_2d = points_2d[inside_frame]
return points_camera_frame, points_2d
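
A minimal usage sketch of the new `AlignPointCloud` class, assuming `pointcloud_data`, `img_data`, and `camera_pose_data` are already-constructed robotdatapy `PointCloudData`, `ImgData`, and `PoseData` objects (the variable names and bag path below are hypothetical):

```python
from roman.map.align_pointcloud import AlignPointCloud

# data objects are assumed to be loaded elsewhere with robotdatapy
aligner = AlignPointCloud(pointcloud_data, img_data, camera_pose_data,
                          tf_bag_path="~/data/static_tf.bag")  # hypothetical path

t = img_data.times[0]  # query at the first image timestamp

# 3D points in the camera frame (Z forward, Y down), limited to points in front of the camera
points_3d = aligner.aligned_pointcloud(t)

# project to (u, v) pixel coordinates, then keep only points that land inside the image bounds
points_2d = aligner.projected_pointcloud(points_3d)
points_3d, points_2d = aligner.filter_pointcloud_and_projection(points_3d, points_2d)

print(points_3d.shape, points_2d.shape)  # (m, 3) and (m, 2), same ordering
```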
152 changes: 106 additions & 46 deletions roman/map/fastsam_wrapper.py
Expand Up @@ -5,7 +5,7 @@
# A Python wrapper for sending RGBD images to FastSAM and using segmentation
# masks to create object observations.
#
# Authors: Jouko Kinnari, Mason Peterson, Lucas Jia, Annika Thomas
# Authors: Jouko Kinnari, Mason Peterson, Lucas Jia, Annika Thomas, Qingyuan Li
#
# Dec. 21, 2024
#
@@ -74,8 +74,9 @@ def __init__(self,
device='cuda',
mask_downsample_factor=1,
rotate_img=None,
use_pointcloud=False
):
"""Wrapper for running FastSAM on images (especially RGB/depth pairs)
"""Wrapper for running FastSAM on images (RGB/depth data)

Args:
weights (str): Path to FastSAM weights.
@@ -87,6 +88,7 @@ def __init__(self,
Defaults to 1.
rotate_img (_type_, optional): 'CW', 'CCW', or '180' for rotating image before
feeding into FastSAM. Defaults to None.
use_pointcloud (bool, optional): True if the depth data source is a point cloud rather than a depth image. Defaults to False.
"""
# parameters
self.weights = weights
@@ -96,6 +98,7 @@ def __init__(self,
self.imgsz = imgsz
self.mask_downsample_factor = mask_downsample_factor
self.rotate_img = rotate_img
self.use_pointcloud = use_pointcloud

# member variables
self.observations = []
@@ -111,13 +114,14 @@ def __init__(self,
or self.rotate_img == '180', "Invalid rotate_img option."

@classmethod
def from_params(cls, params: FastSAMParams, depth_cam_params: CameraParams):
def from_params(cls, params: FastSAMParams, depth_cam_params: CameraParams, use_pointcloud: bool):
fastsam = cls(
weights=expandvars_recursive(params.weights_path),
imgsz=params.imgsz,
device=params.device,
mask_downsample_factor=params.mask_downsample_factor,
rotate_img=params.rotate_img
rotate_img=params.rotate_img,
use_pointcloud=use_pointcloud
)
fastsam.setup_rgbd_params(
depth_cam_params=depth_cam_params,
@@ -211,14 +215,15 @@ def setup_rgbd_params(
self.max_depth = max_depth
self.within_depth_frac = within_depth_frac
self.depth_scale = depth_scale
self.depth_cam_intrinsics = o3d.camera.PinholeCameraIntrinsic(
width=int(depth_cam_params.width),
height=int(depth_cam_params.height),
fx=depth_cam_params.fx,
fy=depth_cam_params.fy,
cx=depth_cam_params.cx,
cy=depth_cam_params.cy,
)
if not self.use_pointcloud:
self.depth_cam_intrinsics = o3d.camera.PinholeCameraIntrinsic(
width=int(depth_cam_params.width),
height=int(depth_cam_params.height),
fx=depth_cam_params.fx,
fy=depth_cam_params.fy,
cx=depth_cam_params.cx,
cy=depth_cam_params.cy,
)
self.voxel_size = voxel_size
self.pcd_stride = pcd_stride
if erosion_size > 0:
@@ -230,7 +235,7 @@ def setup_rgbd_params(
self.erosion_element = None
self.plane_filter_params = plane_filter_params

def run(self, t, pose, img, img_depth=None, plot=False):
def run(self, t, pose, img, depth_data=None, plot=False):
"""
Takes an image and returns filtered FastSAM masks as Observations.

@@ -247,6 +252,9 @@
img_orig = img
img = self.apply_rotation(img)

if self.use_pointcloud:
pcl, pcl_proj = depth_data

if self.run_yolo:
ignore_mask, keep_mask = self._create_mask(img)
else:
@@ -262,40 +270,92 @@

# Extract point cloud of object from RGBD
ptcld = None
if img_depth is not None:
depth_obj = copy.deepcopy(img_depth)
if self.erosion_element is not None:
eroded_mask = cv.erode(mask, self.erosion_element)
depth_obj[eroded_mask==0] = 0
if depth_data is not None:
if self.use_pointcloud:
# if False: # visualizations for debugging
# # point_cloud = o3d.geometry.PointCloud()
# # point_cloud.points = o3d.utility.Vector3dVector(pcl)
# # point_cloud.paint_uniform_color([1, 0, 0])
# # coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=10.0)

# # o3d.visualization.draw_geometries([point_cloud, coordinate_frame])

# import matplotlib.pyplot as plt

# img_shape = img.shape[:2]
# depth_image = np.zeros(img_shape, dtype=np.float32)
# depths = pcl[:, 2]
# MAX_DEPTH=15

# for i in range(pcl_proj.shape[0]):
# u, v = pcl_proj[i]
# u, v = int(u), int(v)
# depth = depths[i]
# if 0 <= u < img_shape[1] and 0 <= v < img_shape[0] and depth <= MAX_DEPTH:
# depth = depths[i]
# depth_image[v, u] = depth

# depth_normalized = cv.normalize(depth_image, None, 0, 1, cv.NORM_MINMAX)
# depth_colored = cv.applyColorMap(((1 - depth_normalized) * 255).astype(np.uint8), cv.COLORMAP_JET)
# rgb_image_rgb = cv.cvtColor(img, cv.COLOR_BGR2RGB)

# overlay = depth_colored
# overlay[depth_image == 0] = rgb_image_rgb[depth_image == 0]
# plt.imshow(overlay)
# plt.show()

# # import ipdb
# # ipdb.set_trace()

# get 3D points that project within the mask
inside_mask = mask[pcl_proj[:, 1], pcl_proj[:, 0]] == 1
inside_mask_points = pcl[inside_mask]
pre_truncate_len = len(inside_mask_points)
ptcld_test = inside_mask_points[inside_mask_points[:, 2] < self.max_depth]

if len(ptcld_test) < self.within_depth_frac*pre_truncate_len:
continue

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(inside_mask_points)

else:
depth_obj[mask==0] = 0
logger.debug(f"img_depth type {img_depth.dtype}, shape={img_depth.shape}")

# Extract point cloud without truncation to heuristically check if enough of the object
# is within the max depth
pcd_test = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
# depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)
ptcld_test = np.asarray(pcd_test.points)
pre_truncate_len = len(ptcld_test)
ptcld_test = ptcld_test[ptcld_test[:,2] < self.max_depth]
# require some fraction of the points to be within the max depth
if len(ptcld_test) < self.within_depth_frac*pre_truncate_len:
continue

pcd = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)
depth_obj = copy.deepcopy(depth_data)
if self.erosion_element is not None:
eroded_mask = cv.erode(mask, self.erosion_element)
depth_obj[eroded_mask==0] = 0
else:
depth_obj[mask==0] = 0
logger.debug(f"img_depth type {depth_data.dtype}, shape={depth_data.shape}")

# Extract point cloud without truncation to heuristically check if enough of the object
# is within the max depth
pcd_test = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
# depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)
ptcld_test = np.asarray(pcd_test.points)
pre_truncate_len = len(ptcld_test)
ptcld_test = ptcld_test[ptcld_test[:,2] < self.max_depth]
# require some fraction of the points to be within the max depth
if len(ptcld_test) < self.within_depth_frac*pre_truncate_len:
continue

pcd = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)

# shared for depth & rangesens, once PointCloud object is created

pcd.remove_non_finite_points()
pcd_sampled = pcd.voxel_down_sample(voxel_size=self.voxel_size)
if not pcd_sampled.is_empty():
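
To show how the two new pieces are intended to fit together, here is a rough sketch of driving the LiDAR depth path through the wrapper. It relies only on the signatures visible in this diff (`from_params`, `run`, and the `AlignPointCloud` methods); the params objects, data loaders, and loop structure are hypothetical:

```python
from roman.map.align_pointcloud import AlignPointCloud
from roman.map.fastsam_wrapper import FastSAMWrapper

# hypothetical setup: params, depth_cam_params, pointcloud_data, img_data,
# camera_pose_data, and tf_bag_path are assumed to be built elsewhere
fastsam = FastSAMWrapper.from_params(params, depth_cam_params, use_pointcloud=True)
aligner = AlignPointCloud(pointcloud_data, img_data, camera_pose_data, tf_bag_path)

for t in img_data.times:
    img = img_data.img(t)
    pose = camera_pose_data.T_WB(t)

    # build the (pcl, pcl_proj) pair that run() unpacks when use_pointcloud=True;
    # filtering guarantees pcl_proj holds integer pixel coordinates inside the image
    pcl = aligner.aligned_pointcloud(t)
    pcl_proj = aligner.projected_pointcloud(pcl)
    pcl, pcl_proj = aligner.filter_pointcloud_and_projection(pcl, pcl_proj)

    observations = fastsam.run(t, pose, img, depth_data=(pcl, pcl_proj))
```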