Skip to main content

Working with ROS 2 Sensor Interfaces

Overview​

Sensors are the eyes and ears of robotic systems, providing crucial information about the environment and robot state. In ROS 2, sensor interfaces follow standardized message types and communication patterns that enable seamless integration of various sensor types. This lesson covers how to interface with common sensors like cameras, LiDAR, and IMUs, work with sensor message types, and publish sensor data in ROS 2.

Sensor Integration in ROS 2​

The sensor_msgs Package​

ROS 2 provides a standardized sensor_msgs package that defines common message types for various sensors. This standardization ensures compatibility across different sensor manufacturers and ROS packages.

Sensor Node Architecture​

A typical sensor node in ROS 2 follows this pattern:

  1. Hardware Interface: Communicates with the physical sensor
  2. Data Conversion: Converts raw sensor data to ROS message format
  3. Publishing: Publishes sensor data to ROS topics
  4. Parameter Management: Handles sensor configuration parameters

Camera Interfaces​

Camera Message Types​

The primary message type for camera data is sensor_msgs/msg/Image, which contains:

# Header
std_msgs/Header header

# Image data
uint32 height
uint32 width
string encoding
uint8 is_bigendian
uint32 step
uint8[] data

Camera Calibration​

Camera calibration data is provided through sensor_msgs/msg/CameraInfo:

# Header
std_msgs/Header header

# Image dimensions
uint32 height
uint32 width

# Distortion parameters
string distortion_model
float64[] D # Distortion coefficients
float64[9] K # Intrinsic camera matrix
float64[9] R # Rectification matrix
float64[12] P # Projection/camera matrix

Camera Publisher Example (Python)​

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from cv_bridge import CvBridge
import cv2

class CameraPublisher(Node):
def __init__(self):
super().__init__('camera_publisher')
self.publisher_ = self.create_publisher(Image, 'camera/image_raw', 10)
self.info_publisher_ = self.create_publisher(CameraInfo, 'camera/camera_info', 10)
self.bridge = CvBridge()

# Timer to publish images at a specific rate
self.timer = self.create_timer(0.1, self.timer_callback) # 10 Hz

# Open camera (0 is typically the default camera)
self.cap = cv2.VideoCapture(0)

def timer_callback(self):
ret, frame = self.cap.read()
if ret:
# Convert OpenCV image to ROS Image message
ros_image = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
ros_image.header.stamp = self.get_clock().now().to_msg()
ros_image.header.frame_id = 'camera_frame'

self.publisher_.publish(ros_image)
self.get_logger().info('Publishing camera image')

def destroy_node(self):
self.cap.release()
super().destroy_node()

def main(args=None):
rclpy.init(args=args)
camera_publisher = CameraPublisher()

try:
rclpy.spin(camera_publisher)
except KeyboardInterrupt:
pass
finally:
camera_publisher.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Camera Publisher Example (C++)​

#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/image.hpp>
#include <cv_bridge/cv_bridge.h>
#include <opencv2/opencv.hpp>

class CameraPublisher : public rclcpp::Node
{
public:
CameraPublisher() : Node("camera_publisher")
{
publisher_ = this->create_publisher<sensor_msgs::msg::Image>("camera/image_raw", 10);

// Timer for periodic publishing
timer_ = this->create_wall_timer(
std::chrono::milliseconds(100), // 10 Hz
std::bind(&CameraPublisher::timer_callback, this)
);

cap_.open(0); // Open default camera
}

private:
void timer_callback()
{
cv::Mat frame;
cap_ >> frame;

if (!frame.empty()) {
// Convert OpenCV image to ROS Image message
cv_bridge::CvImage cv_image;
cv_image.header.frame_id = "camera_frame";
cv_image.header.stamp = this->get_clock()->now();
cv_image.encoding = sensor_msgs::image_encodings::BGR8;
cv_image.image = frame;

publisher_->publish(*cv_image.toImageMsg());
RCLCPP_INFO(this->get_logger(), "Publishing camera image");
}
}

rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<sensor_msgs::msg::Image>::SharedPtr publisher_;
cv::VideoCapture cap_;
};

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<CameraPublisher>());
rclcpp::shutdown();
return 0;
}

LiDAR Interfaces​

LiDAR Message Types​

The primary message type for LiDAR data is sensor_msgs/msg/LaserScan, which contains:

# Header
std_msgs/Header header

# Time increment between measurements
float32 angle_min
float32 angle_max
float32 angle_increment
float32 time_increment
float32 scan_time

# Range data
float32 range_min
float32 range_max
float32[] ranges
float32[] intensities

LiDAR Publisher Example​

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
import math

class LidarPublisher(Node):
def __init__(self):
super().__init__('lidar_publisher')
self.publisher_ = self.create_publisher(LaserScan, 'scan', 10)
self.timer = self.create_timer(0.1, self.timer_callback) # 10 Hz

# LiDAR parameters
self.angle_min = -math.pi
self.angle_max = math.pi
self.angle_increment = 0.01745 # ~1 degree
self.scan_time = 0.1
self.range_min = 0.1
self.range_max = 10.0

def timer_callback(self):
msg = LaserScan()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'lidar_frame'

# Set scan parameters
msg.angle_min = self.angle_min
msg.angle_max = self.angle_max
msg.angle_increment = self.angle_increment
msg.time_increment = 0.0
msg.scan_time = self.scan_time
msg.range_min = self.range_min
msg.range_max = self.range_max

# Calculate number of points
num_points = int((self.angle_max - self.angle_min) / self.angle_increment) + 1

# Generate sample range data (in a real implementation, this would come from the sensor)
msg.ranges = [2.0 + 0.5 * math.sin(i * 0.1) for i in range(num_points)]
msg.intensities = [100.0 for _ in range(num_points)]

self.publisher_.publish(msg)
self.get_logger().info(f'Publishing LiDAR scan with {len(msg.ranges)} points')

def main(args=None):
rclpy.init(args=args)
lidar_publisher = LidarPublisher()

try:
rclpy.spin(lidar_publisher)
except KeyboardInterrupt:
pass
finally:
lidar_publisher.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

IMU Interfaces​

IMU Message Types​

The primary message type for IMU data is sensor_msgs/msg/Imu, which contains:

# Header
std_msgs/Header header

# Orientation (quaternion)
geometry_msgs/Quaternion orientation
float64[9] orientation_covariance

# Angular velocity
geometry_msgs/Vector3 angular_velocity
float64[9] angular_velocity_covariance

# Linear acceleration
geometry_msgs/Vector3 linear_acceleration
float64[9] linear_acceleration_covariance

Magnetic Field Data​

For magnetic field measurements, use sensor_msgs/msg/MagneticField:

std_msgs/Header header
geometry_msgs/Vector3 magnetic_field
float64[9] magnetic_field_covariance

IMU Publisher Example​

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
from geometry_msgs.msg import Vector3, Quaternion
import math

class ImuPublisher(Node):
def __init__(self):
super().__init__('imu_publisher')
self.publisher_ = self.create_publisher(Imu, 'imu/data', 10)
self.timer = self.create_timer(0.02, self.timer_callback) # 50 Hz
self.angle = 0.0

def timer_callback(self):
msg = Imu()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'imu_frame'

# Simulate orientation (in a real implementation, this would come from the sensor)
self.angle += 0.01
msg.orientation = Quaternion(
x=0.0,
y=0.0,
z=math.sin(self.angle / 2.0),
w=math.cos(self.angle / 2.0)
)

# Orientation covariance (set to -1 if not available)
msg.orientation_covariance = [-1.0] + [0.0] * 8

# Angular velocity (simulate rotation around Z-axis)
msg.angular_velocity = Vector3(x=0.0, y=0.0, z=0.1)
msg.angular_velocity_covariance = [0.01, 0.0, 0.0, 0.0, 0.01, 0.0, 0.0, 0.0, 0.01]

# Linear acceleration (simulate gravity)
msg.linear_acceleration = Vector3(x=0.0, y=0.0, z=9.81)
msg.linear_acceleration_covariance = [0.01, 0.0, 0.0, 0.0, 0.01, 0.0, 0.0, 0.0, 0.01]

self.publisher_.publish(msg)
self.get_logger().info('Publishing IMU data')

def main(args=None):
rclpy.init(args=args)
imu_publisher = ImuPublisher()

try:
rclpy.spin(imu_publisher)
except KeyboardInterrupt:
pass
finally:
imu_publisher.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Other Common Sensor Types​

Point Cloud Data​

For 3D sensors like stereo cameras or 3D LiDAR, use sensor_msgs/msg/PointCloud2:

std_msgs/Header header
uint32 height
uint32 width
sensor_msgs/PointField[] fields
bool is_bigendian
uint32 point_step
uint32 row_step
uint8[] data
bool is_dense

Joint State Data​

For robot joint positions, velocities, and efforts, use sensor_msgs/msg/JointState:

std_msgs/Header header
string[] name
float64[] position
float64[] velocity
float64[] effort

Sensor Data Publishing Patterns​

Publisher-Subscriber Pattern​

The standard ROS 2 pattern for sensor data:

# Publisher node
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu

class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')
self.publisher_ = self.create_publisher(Imu, 'sensor_data', 10)
self.timer = self.create_timer(0.02, self.timer_callback) # 50 Hz

def timer_callback(self):
# Get sensor data (from actual hardware or simulation)
sensor_data = self.get_sensor_reading()

# Convert to ROS message
msg = self.convert_to_ros_msg(sensor_data)

# Publish the message
self.publisher_.publish(msg)

# Subscriber node
class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')
self.subscription = self.create_subscription(
Imu, 'sensor_data', self.sensor_callback, 10)

def sensor_callback(self, msg):
# Process sensor data
self.process_sensor_data(msg)

Quality of Service (QoS) Considerations​

For sensor data, consider using appropriate QoS settings:

from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy

# For real-time sensor data (LiDAR, IMU)
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)

self.publisher_ = self.create_publisher(LaserScan, 'scan', qos_profile)

Sensor Data Processing​

Synchronization​

For multi-sensor systems, you may need to synchronize data from different sensors:

from rclpy.qos import QoSProfile
from message_filters import ApproximateTimeSynchronizer, Subscriber

# Example of synchronizing camera and LiDAR data
def sync_callback(image_msg, lidar_msg):
# Process synchronized data
pass

# This would typically be done with message_filters in ROS 1,
# but in ROS 2, synchronization requires custom implementation

Filtering and Preprocessing​

Common preprocessing steps for sensor data:

def filter_lidar_data(ranges, min_range=0.1, max_range=10.0):
"""Filter LiDAR data to remove invalid measurements"""
filtered_ranges = []
for r in ranges:
if min_range <= r <= max_range:
filtered_ranges.append(r)
else:
filtered_ranges.append(float('inf')) # or use NaN
return filtered_ranges

def process_imu_data(imu_msg):
"""Process IMU data to remove noise or calibrate"""
# Apply calibration, filtering, etc.
return imu_msg

Sensor Integration Best Practices​

1. Use Standard Message Types​

  • Always use standard sensor_msgs types when available
  • This ensures compatibility with existing ROS tools and algorithms

2. Frame Conventions​

  • Follow REP-103 for coordinate frame conventions
  • Use right-handed coordinate systems
  • Specify frame_id in headers

3. Timing Considerations​

  • Publish sensor data at the appropriate rate
  • Use hardware timestamps when available
  • Consider network latency for real-time applications

4. Error Handling​

  • Handle sensor failures gracefully
  • Provide diagnostic information
  • Implement fallback behaviors when possible

5. Parameter Configuration​

  • Make sensor parameters configurable
  • Use ROS parameters for calibration values
  • Provide reasonable defaults

Common Sensor Integration Issues​

Issue: Sensor data not publishing​

Solution: Check hardware connections, permissions, and node configuration.

Issue: High latency in sensor data​

Solution: Optimize QoS settings, reduce message size, or use faster network.

Issue: Inconsistent timestamps​

Solution: Use hardware timestamps or proper ROS time synchronization.

Issue: Sensor data corruption​

Solution: Check data types, encoding, and transmission protocols.

Learning Objectives​

By the end of this lesson, you should be able to:

  • Interface with common sensors (cameras, LiDAR, IMUs) in ROS 2
  • Understand and use standard sensor message types
  • Publish sensor data using appropriate ROS 2 patterns
  • Configure sensor nodes with proper QoS settings
  • Apply best practices for sensor integration
  • Troubleshoot common sensor interface issues
Physical AI assistant
Hello! I am your Physical AI assistant. Ask me anything about the course!