Skip to main content

Sensor Data Processing in ROS 2

Overview​

Raw sensor data often requires processing before it can be effectively used by robotic systems. In ROS 2, sensor data processing involves filtering, synchronization, transformation, and analysis to extract meaningful information for perception, navigation, and control tasks. This lesson covers essential techniques for processing sensor data in ROS 2, including filtering approaches, data synchronization methods, and analysis tools.

Sensor Data Processing Pipeline​

Typical Processing Steps​

A typical sensor data processing pipeline includes:

  1. Raw Data Acquisition: Receiving sensor data from hardware interfaces
  2. Preprocessing: Filtering, calibration, and noise reduction
  3. Synchronization: Aligning data from multiple sensors in time
  4. Transformation: Converting data between coordinate frames
  5. Feature Extraction: Identifying relevant patterns or objects
  6. Fusion: Combining data from multiple sensors
  7. Analysis: Extracting high-level information

Sensor Data Filtering​

Basic Filtering Concepts​

Filtering is essential for removing noise, outliers, and invalid measurements from sensor data. Common filtering approaches include:

Range-Based Filtering​

def filter_lidar_data(ranges, min_range=0.1, max_range=10.0):
"""Filter LiDAR data to remove invalid measurements"""
filtered_ranges = []
for r in ranges:
if min_range <= r <= max_range:
filtered_ranges.append(r)
else:
# Use infinity for out-of-range values
filtered_ranges.append(float('inf'))
return filtered_ranges

Statistical Filtering​

import numpy as np

def statistical_filter(data, threshold=2.0):
"""Remove outliers using statistical methods"""
mean = np.mean(data)
std = np.std(data)

filtered_data = []
for value in data:
if abs(value - mean) <= threshold * std:
filtered_data.append(value)
else:
# Replace outlier with mean or interpolate
filtered_data.append(mean)
return filtered_data

ROS 2 Content Filtering​

ROS 2 supports content-based filtering for topics using the --filter option:

# Filter messages that start with "foo"
ros2 topic echo --filter 'm.data.startswith("foo")' /chatter

# Filter based on numeric values
ros2 topic echo --filter 'm.data > 100' /sensor_data

Sensor-Specific Filtering​

Camera Data Filtering​

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np

class ImageFilterNode(Node):
def __init__(self):
super().__init__('image_filter_node')
self.subscription = self.create_subscription(
Image,
'camera/image_raw',
self.image_callback,
10
)
self.publisher = self.create_publisher(
Image,
'camera/image_filtered',
10
)
self.bridge = CvBridge()

def image_callback(self, msg):
# Convert ROS Image to OpenCV
cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')

# Apply filtering (example: Gaussian blur)
filtered_image = cv2.GaussianBlur(cv_image, (5, 5), 0)

# Convert back to ROS Image
filtered_msg = self.bridge.cv2_to_imgmsg(filtered_image, encoding='bgr8')
filtered_msg.header = msg.header # Preserve header

# Publish filtered image
self.publisher.publish(filtered_msg)

LiDAR Data Filtering​

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan

class LidarFilterNode(Node):
def __init__(self):
super().__init__('lidar_filter_node')
self.subscription = self.create_subscription(
LaserScan,
'scan',
self.scan_callback,
10
)
self.publisher = self.create_publisher(
LaserScan,
'scan_filtered',
10
)

def scan_callback(self, msg):
# Create filtered scan message
filtered_msg = LaserScan()
filtered_msg.header = msg.header
filtered_msg.angle_min = msg.angle_min
filtered_msg.angle_max = msg.angle_max
filtered_msg.angle_increment = msg.angle_increment
filtered_msg.time_increment = msg.time_increment
filtered_msg.scan_time = msg.scan_time
filtered_msg.range_min = msg.range_min
filtered_msg.range_max = msg.range_max

# Apply range filtering
filtered_msg.ranges = []
for r in msg.ranges:
if msg.range_min <= r <= msg.range_max:
filtered_msg.ranges.append(r)
else:
filtered_msg.ranges.append(float('inf'))

# Apply intensity filtering if available
if msg.intensities:
filtered_msg.intensities = []
for i, intensity in enumerate(msg.intensities):
if msg.ranges[i] != float('inf'):
filtered_msg.intensities.append(intensity)
else:
filtered_msg.intensities.append(0.0)

self.publisher.publish(filtered_msg)

Sensor Data Synchronization​

Time Synchronization Challenges​

Multiple sensors often operate at different frequencies and may have different timing characteristics. Proper synchronization is crucial for:

  • Sensor fusion algorithms
  • Simultaneous mapping and localization (SLAM)
  • Multi-modal perception systems

Message Filters (Python Implementation)​

In ROS 2, message synchronization requires custom implementation since the message_filters package from ROS 1 is not directly available:

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, LaserScan, Imu
from threading import Lock
from collections import deque
import time

class SynchronizedSensorNode(Node):
def __init__(self):
super().__init__('synchronized_sensor_node')

# Create subscribers for different sensor types
self.image_subscription = self.create_subscription(
Image, 'camera/image_raw', self.image_callback, 10
)
self.scan_subscription = self.create_subscription(
LaserScan, 'scan', self.scan_callback, 10
)
self.imu_subscription = self.create_subscription(
Imu, 'imu/data', self.imu_callback, 10
)

# Publishers for synchronized data
self.sync_publisher = self.create_publisher(
# Custom message type for synchronized data
# In practice, you'd define a custom message
Image, # Placeholder
'synchronized_data',
10
)

# Storage for messages with timestamps
self.image_buffer = deque(maxlen=10)
self.scan_buffer = deque(maxlen=10)
self.imu_buffer = deque(maxlen=10)

# Lock for thread safety
self.buffer_lock = Lock()

# Timer for synchronization
self.sync_timer = self.create_timer(0.1, self.synchronize_callback)

def image_callback(self, msg):
with self.buffer_lock:
self.image_buffer.append(msg)

def scan_callback(self, msg):
with self.buffer_lock:
self.scan_buffer.append(msg)

def imu_callback(self, msg):
with self.buffer_lock:
self.imu_buffer.append(msg)

def synchronize_callback(self):
with self.buffer_lock:
# Find closest timestamps across all sensor types
if self.image_buffer and self.scan_buffer and self.imu_buffer:
# Simple approach: use the most recent of each type
# In practice, you'd implement more sophisticated time-based matching
latest_image = self.image_buffer[-1]
latest_scan = self.scan_buffer[-1]
latest_imu = self.imu_buffer[-1]

# Process synchronized data
self.process_synchronized_data(latest_image, latest_scan, latest_imu)

def process_synchronized_data(self, image, scan, imu):
# Process the synchronized sensor data
self.get_logger().info(f'Synchronized data: Image timestamp {image.header.stamp}, '
f'Scan timestamp {scan.header.stamp}, '
f'IMU timestamp {imu.header.stamp}')

Approximate Time Synchronization​

For approximate time synchronization (allowing small time differences):

def find_approximate_sync(self, target_time, buffer, tolerance=0.1):
"""Find message in buffer with closest timestamp to target time"""
closest_msg = None
min_diff = float('inf')

for msg in buffer:
msg_time = msg.header.stamp.sec + msg.header.stamp.nanosec / 1e9
target_time_sec = target_time.sec + target_time.nanosec / 1e9
diff = abs(msg_time - target_time_sec)

if diff < min_diff and diff <= tolerance:
min_diff = diff
closest_msg = msg

return closest_msg

Coordinate Frame Transformations​

TF2 Integration​

Transformations between coordinate frames are crucial for sensor data processing:

import rclpy
from rclpy.node import Node
from tf2_ros import TransformListener, Buffer
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import PointStamped
import tf2_ros

class TransformNode(Node):
def __init__(self):
super().__init__('transform_node')

# Initialize TF2 buffer and listener
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)

# Create publisher and subscriber
self.subscription = self.create_subscription(
PointCloud2, 'points_raw', self.pointcloud_callback, 10
)
self.publisher = self.create_publisher(
PointCloud2, 'points_transformed', 10
)

def pointcloud_callback(self, msg):
try:
# Transform from sensor frame to robot base frame
transform = self.tf_buffer.lookup_transform(
'base_link', # Target frame
msg.header.frame_id, # Source frame
rclpy.time.Time(), # Latest available time
timeout=rclpy.duration.Duration(seconds=1.0)
)

# Apply transformation to point cloud
# (In practice, you'd use tf2_sensor_msgs or similar)
transformed_msg = self.transform_pointcloud(msg, transform)
self.publisher.publish(transformed_msg)

except tf2_ros.TransformException as ex:
self.get_logger().info(f'Could not transform point cloud: {ex}')

Sensor Data Analysis Tools​

Real-time Analysis Node​

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
import statistics
import numpy as np

class SensorAnalysisNode(Node):
def __init__(self):
super().__init__('sensor_analysis_node')
self.subscription = self.create_subscription(
LaserScan, 'scan', self.scan_callback, 10
)

# Statistics storage
self.scan_history = []
self.max_history = 100

def scan_callback(self, msg):
# Calculate basic statistics
valid_ranges = [r for r in msg.ranges if not (r == float('inf') or r == float('-inf'))]

if valid_ranges:
avg_distance = statistics.mean(valid_ranges)
min_distance = min(valid_ranges)
max_distance = max(valid_ranges)
std_dev = statistics.stdev(valid_ranges) if len(valid_ranges) > 1 else 0.0

# Log statistics
self.get_logger().info(
f'Distance stats - Avg: {avg_distance:.2f}, '
f'Min: {min_distance:.2f}, Max: {max_distance:.2f}, '
f'StdDev: {std_dev:.2f}'
)

# Store for historical analysis
self.scan_history.append({
'timestamp': msg.header.stamp,
'avg_distance': avg_distance,
'min_distance': min_distance,
'max_distance': max_distance,
'std_dev': std_dev
})

# Maintain history size
if len(self.scan_history) > self.max_history:
self.scan_history.pop(0)

def get_historical_stats(self):
"""Calculate statistics over historical data"""
if not self.scan_history:
return None

avg_distances = [s['avg_distance'] for s in self.scan_history]
min_distances = [s['min_distance'] for s in self.scan_history]
max_distances = [s['max_distance'] for s in self.scan_history]

return {
'avg_over_time': statistics.mean(avg_distances),
'min_over_time': min(min_distances),
'max_over_time': max(max_distances),
'trend': 'increasing' if avg_distances[-1] > avg_distances[0] else 'decreasing'
}

Data Quality Assessment​

def assess_data_quality(self, sensor_data):
"""Assess the quality of sensor data"""
quality_metrics = {}

# Completeness (percentage of valid measurements)
if hasattr(sensor_data, 'ranges'): # LiDAR data
valid_count = sum(1 for r in sensor_data.ranges if r != float('inf'))
total_count = len(sensor_data.ranges)
quality_metrics['completeness'] = valid_count / total_count if total_count > 0 else 0.0

# Consistency (temporal consistency check)
# This would require comparing with previous measurements

# Range validity
if hasattr(sensor_data, 'ranges'):
in_range_count = sum(1 for r in sensor_data.ranges
if sensor_data.range_min <= r <= sensor_data.range_max)
quality_metrics['valid_range_ratio'] = in_range_count / total_count if total_count > 0 else 0.0

return quality_metrics

Performance Considerations​

Processing Rate Management​

class RateLimitedProcessor(Node):
def __init__(self):
super().__init__('rate_limited_processor')

# Create timer to control processing rate
self.processing_rate = 10 # Hz
self.timer = self.create_timer(1.0/self.processing_rate, self.process_data)

# Store latest data
self.latest_data = None
self.data_lock = Lock()

# Subscribe to sensor data
self.subscription = self.create_subscription(
LaserScan, 'scan', self.data_callback, 10
)

def data_callback(self, msg):
with self.data_lock:
self.latest_data = msg

def process_data(self):
with self.data_lock:
if self.latest_data is not None:
# Process the latest data
processed_data = self.process_sensor_data(self.latest_data)
# Publish results
self.publish_results(processed_data)

Memory Management​

def manage_sensor_buffer(self, data_buffer, max_size=1000):
"""Manage memory usage for sensor data buffers"""
if len(data_buffer) > max_size:
# Remove oldest entries
excess = len(data_buffer) - max_size
for _ in range(excess):
data_buffer.popleft()

return data_buffer

Quality of Service (QoS) for Sensor Processing​

Appropriate QoS Settings​

from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy

# For real-time sensor processing
sensor_qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
durability=QoSDurabilityPolicy.VOLATILE
)

# For critical sensor data that must not be lost
critical_qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_ALL,
durability=QoSDurabilityPolicy.VOLATILE
)

Best Practices for Sensor Data Processing​

1. Data Validation​

  • Always validate sensor data before processing
  • Check for NaN, infinity, and out-of-range values
  • Implement range checks based on sensor specifications

2. Time Synchronization​

  • Use hardware timestamps when available
  • Implement appropriate buffering strategies
  • Consider sensor-specific timing characteristics

3. Performance Optimization​

  • Use efficient algorithms for real-time processing
  • Consider offloading to GPU when possible
  • Implement appropriate downsampling for high-rate sensors

4. Error Handling​

  • Implement graceful degradation when sensors fail
  • Provide diagnostic information for debugging
  • Log sensor data quality metrics

5. Modularity​

  • Separate acquisition from processing
  • Use configurable parameters for different use cases
  • Design reusable processing components

Common Issues and Solutions​

Issue: Sensor data timing problems​

Solution: Implement proper buffering and synchronization mechanisms.

Issue: Memory consumption with high-rate sensors​

Solution: Use appropriate buffer sizes and implement memory management.

Issue: Processing lag affecting real-time performance​

Solution: Optimize algorithms and consider parallel processing.

Issue: Inconsistent coordinate frame transformations​

Solution: Ensure TF tree is properly maintained and transformations are accurate.

Learning Objectives​

By the end of this lesson, you should be able to:

  • Implement various filtering techniques for sensor data
  • Synchronize data from multiple sensors in time
  • Apply coordinate frame transformations to sensor data
  • Analyze sensor data quality and performance metrics
  • Optimize sensor processing for real-time applications
  • Apply best practices for sensor data processing in ROS 2
Physical AI assistant
Hello! I am your Physical AI assistant. Ask me anything about the course!