Created
January 19, 2024 14:00
-
-
Save davidliyutong/24739f220122eed1eea9aeb3d69ca7bf to your computer and use it in GitHub Desktop.
Capture PointCloud with Realsense Cameras
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from typing import List, Optional, Tuple | |
import numpy as np | |
import pyrealsense2 as rs | |
from pydantic import BaseModel | |
import open3d as o3d | |
import cv2 | |
import py_cli_interaction | |
import datetime | |
import os | |
# Color Config, must match Intel Realsense Viewer | |
class RealsenseCameraColorCfg(BaseModel): | |
width: int = 1920 | |
height: int = 1080 | |
format: str = 'rs.format.bgr8' | |
fps: int = 30 | |
exposure: int = 0 | |
use_auto_exposure: bool = True | |
# Depth Config, must match Intel Realsense Viewer | |
class RealsenseCameraDepthCfg(BaseModel): | |
width: int = 1024 | |
height: int = 768 | |
format: str = 'rs.format.z16' | |
fps: int = 30 | |
preset: str = 'Short Range' | |
# Bundled Config | |
class RealsenseCameraCfg(BaseModel): | |
sn: str | |
depth: RealsenseCameraDepthCfg = RealsenseCameraDepthCfg() | |
color: RealsenseCameraColorCfg = RealsenseCameraColorCfg() | |
def get_device_by_cfg(cfg: RealsenseCameraCfg) -> Optional[rs.device]: | |
"""Input configuration, output device handle | |
Args: | |
cfg (RealsenseCameraCfg): configuration | |
Returns: | |
Optional[rs.device]: device handle | |
""" | |
context = rs.context() | |
for d in context.devices: | |
if d.get_info(rs.camera_info.name).lower() != 'platform camera': | |
sn = d.get_info(rs.camera_info.serial_number) | |
if sn == cfg.sn: | |
return d | |
return None | |
class RealsenseCameraModel: | |
rs_config: Optional[rs.config] = None | |
pipline: Optional[rs.pipeline] = None | |
device: Optional[rs.device] = None | |
intrinsics_matrix: Optional[List[List[float]]] = None | |
align: Optional[rs.align] = None | |
def __init__(self, | |
camera_cfg: RealsenseCameraCfg, | |
): | |
self.option = camera_cfg | |
def open(self): | |
self.rs_config = rs.config() | |
# configure config | |
self.rs_config.enable_stream(rs.stream.depth, | |
self.option.depth.width, | |
self.option.depth.height, | |
eval(self.option.depth.format), | |
self.option.depth.fps) | |
self.rs_config.enable_stream(rs.stream.color, | |
self.option.color.width, | |
self.option.color.height, | |
eval(self.option.color.format), | |
self.option.color.fps) | |
self.rs_config.enable_device(self.option.sn) | |
# get device | |
self.device = get_device_by_cfg(self.option) | |
assert self.device is not None, f"Device {self.option.sn} not found" | |
# configure depth preset | |
depth_sensor = self.device.first_depth_sensor() | |
preset_range = depth_sensor.get_option_range(rs.option.visual_preset) | |
for i in range(int(preset_range.max) + 1): | |
visual_preset = depth_sensor.get_option_value_description(rs.option.visual_preset, i) | |
if visual_preset == self.option.depth.preset: | |
depth_sensor.set_option(rs.option.visual_preset, i) | |
# configure RGB exposure | |
sensor = self.device.query_sensors()[1] | |
if self.option.color.use_auto_exposure: | |
sensor.set_option(rs.option.enable_auto_exposure, 1) | |
else: | |
sensor.set_option(rs.option.exposure, self.option.color.exposure) | |
# configure alignment (depth -> color) | |
self.align = rs.align(rs.stream.color) | |
# self.align = rs.align(rs.stream.depth) | |
def start(self): | |
# create pipeline | |
self.pipeline = rs.pipeline() | |
profile = self.pipeline.start(self.rs_config) | |
# set intrinsics | |
intrinsics = profile.get_stream(rs.stream.color).as_video_stream_profile().get_intrinsics() | |
self.intrinsics_matrix = [ | |
[intrinsics.fx, 0., intrinsics.ppx], | |
[0., intrinsics.fy, intrinsics.ppy], | |
[0., 0., 1.] | |
] | |
logging.debug(f"camera {self.option.sn} intrinsics: {self.intrinsics_matrix}") | |
def get_frames(self, timeout_ms=5000) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: | |
ret, frames = self.pipeline.try_wait_for_frames(timeout_ms=timeout_ms) | |
if not ret: | |
return None, None | |
else: | |
pass | |
try: | |
aligned_frames = self.align.process(frames) | |
except RuntimeError as e: | |
return None, None | |
depth_frame = aligned_frames.get_depth_frame() | |
color_frame = aligned_frames.get_color_frame() | |
if depth_frame: | |
depth_image = np.asanyarray(depth_frame.get_data()) | |
else: | |
depth_image = None | |
color_image = np.asanyarray(color_frame.get_data()) if color_frame else None | |
return color_image, depth_image | |
def close(self): | |
logging.debug(f"closing camera {self.option.sn}") | |
if self.pipeline is not None: | |
try: | |
self.pipeline.stop() | |
except RuntimeError as e: | |
pass | |
if self.device is not None: | |
self.device.hardware_reset() | |
@staticmethod | |
def rgbd2pc( | |
color_raw, | |
depth_raw, | |
camera_intrinsic, | |
depth_scale=1000., | |
depth_trunc=10.0, | |
): | |
if isinstance(color_raw, np.ndarray): | |
color_raw = o3d.geometry.Image(cv2.cvtColor(color_raw.astype(np.uint8), cv2.COLOR_BGRA2RGB)) # Convert to RGB | |
if isinstance(depth_raw, np.ndarray): | |
depth_raw = o3d.geometry.Image(depth_raw.astype(np.uint16)) | |
if isinstance(camera_intrinsic, tuple): | |
width, height, intrinsic_matrix = camera_intrinsic | |
camera_intrinsic = o3d.camera.PinholeCameraIntrinsic(int(width), int(height), intrinsic_matrix[0][0], intrinsic_matrix[1][1], intrinsic_matrix[0][2], intrinsic_matrix[1][2]) | |
rgbd_image = o3d.geometry.RGBDImage().create_from_color_and_depth( | |
color_raw, | |
depth_raw, | |
depth_scale, | |
depth_trunc, | |
convert_rgb_to_intensity=False | |
) | |
pcd = o3d.geometry.PointCloud().create_from_rgbd_image( | |
image=rgbd_image, | |
intrinsic=camera_intrinsic, | |
extrinsic=np.eye(4), | |
) | |
return pcd | |
def visualize_pcd(pcd: o3d.geometry.PointCloud): | |
coordinate = o3d.geometry.TriangleMesh().create_coordinate_frame(size=0.1, origin=[0, 0, 0]) | |
o3d.visualization.draw_geometries([coordinate, pcd]) | |
def main(args): | |
# prepare folder | |
if not os.path.exists('pcd'): | |
os.makedirs('pcd', exist_ok=True) | |
# prepare camera | |
if args.sn != '': | |
cam = RealsenseCameraModel(RealsenseCameraCfg(sn=args.sn)) | |
else: | |
cam = RealsenseCameraModel(RealsenseCameraCfg(sn=py_cli_interaction.must_parse_cli_string('Serial Number:'))) | |
cam.open() | |
cam.start() | |
try: | |
while True: | |
if py_cli_interaction.must_parse_cli_bool('Capture ?', default_value=True): | |
pass | |
else: | |
continue | |
color, depth = cam.get_frames() | |
pc = cam.rgbd2pc( | |
color, | |
depth, | |
(cam.option.color.width, cam.option.color.height, cam.intrinsics_matrix), | |
depth_scale=4000., | |
) | |
visualize_pcd(pc) | |
if py_cli_interaction.must_parse_cli_bool('Save ?', default_value=True): | |
o3d.io.write_point_cloud(os.path.join('pcd', f'{datetime.datetime.now().strftime("%Y%m%d-%H%M%S")}.ply'), pc) | |
print("Saved") | |
except KeyboardInterrupt as e: | |
cam.close() | |
if __name__ == '__main__': | |
import argparse | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--sn', type=str, default='') | |
main(parser.parse_args()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment