AI Perception Techniques for Robots
Overview
AI perception is the foundation of autonomous robotics, enabling robots to understand and interpret their environment through sensor data. This lesson explores the fundamental AI perception techniques used in robotics, including deep learning for recognition, object detection, sensor fusion, and 3D understanding. We'll examine how these techniques work together to provide robots with situational awareness and the ability to make intelligent decisions based on sensory input.
The Perception Pipeline in Robotics
Components of Robot Perception
Robot perception typically involves multiple interconnected components that process sensor data to extract meaningful information:
- Sensor Acquisition: Raw data collection from cameras, LiDAR, radar, IMUs, etc.
- Preprocessing: Data cleaning, calibration, and normalization
- Feature Extraction: Identification of relevant patterns and features
- Recognition: Classification and identification of objects/entities
- Localization: Determining position and orientation in the environment
- Mapping: Creating representations of the environment
- Understanding: High-level interpretation of the scene
Perception Architecture
Modern robot perception systems often follow a hierarchical architecture:
import numpy as np
import cv2
import torch
import torch.nn as nn
class RobotPerceptionSystem:
def __init__(self):
# Initialize perception components
self.sensor_fusion = SensorFusionModule()
self.object_detector = ObjectDetectionModule()
self.semantic_segmenter = SemanticSegmentationModule()
self.depth_estimator = DepthEstimationModule()
self.scene_understanding = SceneUnderstandingModule()
def process_sensor_data(self, sensor_inputs):
"""
Process multi-modal sensor data through the perception pipeline
"""
# 1. Sensor fusion
fused_data = self.sensor_fusion.fuse_sensors(sensor_inputs)
# 2. Object detection
objects = self.object_detector.detect(fused_data)
# 3. Semantic segmentation
semantic_map = self.semantic_segmenter.segment(fused_data)
# 4. Depth estimation
depth_map = self.depth_estimator.estimate(fused_data)
# 5. Scene understanding
scene_graph = self.scene_understanding.understand(
objects, semantic_map, depth_map
)
return {
'objects': objects,
'semantic_map': semantic_map,
'depth_map': depth_map,
'scene_graph': scene_graph
}
class SensorFusionModule:
    def __init__(self):
        # Placeholder for a Kalman filter; a full EKF implementation
        # appears later in the sensor fusion section
        self.kalman_filter = None
    def fuse_sensors(self, sensor_inputs):
        """
        Fuse data from multiple sensors (simplified weighted combination)
        """
        # Example: fusing camera, LiDAR, and IMU estimates
        camera_data = sensor_inputs.get('camera', {})
        lidar_data = sensor_inputs.get('lidar', {})
        imu_data = sensor_inputs.get('imu', {})
        # Combine the sensor readings with fixed weights
        # (a Kalman filter would instead weight each sensor by its uncertainty)
fused_output = {
'position': self.combine_positions(
camera_data.get('position', np.zeros(3)),
lidar_data.get('position', np.zeros(3)),
imu_data.get('position', np.zeros(3))
),
'orientation': self.combine_orientations(
camera_data.get('orientation', np.zeros(4)),
lidar_data.get('orientation', np.zeros(4)),
imu_data.get('orientation', np.zeros(4))
)
}
return fused_output
def combine_positions(self, cam_pos, lidar_pos, imu_pos):
"""Combine position estimates from different sensors"""
# Weighted average based on sensor uncertainties
weights = np.array([0.3, 0.5, 0.2]) # Camera, LiDAR, IMU weights
positions = np.vstack([cam_pos, lidar_pos, imu_pos])
combined_pos = np.average(positions, axis=0, weights=weights)
return combined_pos
def combine_orientations(self, cam_ori, lidar_ori, imu_ori):
"""Combine orientation estimates from different sensors"""
        # Naive quaternion averaging (simplified); this assumes all inputs share
        # the same sign convention, since q and -q encode the same rotation
        quaternions = np.vstack([cam_ori, lidar_ori, imu_ori])
        avg_quat = np.mean(quaternions, axis=0)
        # Normalize back to a unit quaternion
        return avg_quat / np.linalg.norm(avg_quat)
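The naive average above breaks down when the input quaternions differ in sign, because q and -q represent the same rotation. A more robust alternative, sketched below under the assumption that each sensor supplies a unit quaternion, averages via the dominant eigenvector of the weighted outer-product matrix (the eigenvector method of Markley et al.); the helper name average_quaternions is illustrative.

import numpy as np

def average_quaternions(quaternions, weights=None):
    """Average unit quaternions via the dominant eigenvector of the
    weighted outer-product matrix (robust to the q / -q sign ambiguity)."""
    q = np.asarray(quaternions, dtype=float)  # shape (N, 4)
    if weights is None:
        weights = np.ones(len(q))
    w = np.asarray(weights, dtype=float) / np.sum(weights)
    # Accumulate the weighted outer products; outer(q, q) is sign-invariant
    M = np.zeros((4, 4))
    for qi, wi in zip(q, w):
        qi = qi / np.linalg.norm(qi)
        M += wi * np.outer(qi, qi)
    # The average is the eigenvector associated with the largest eigenvalue
    eigenvalues, eigenvectors = np.linalg.eigh(M)
    avg = eigenvectors[:, -1]
    return avg / np.linalg.norm(avg)

# Example: three orientation estimates, one with a flipped sign
estimates = [np.array([1.0, 0.0, 0.0, 0.0]),
             np.array([0.999, 0.045, 0.0, 0.0]),
             np.array([-1.0, 0.0, 0.0, 0.0])]
print(average_quaternions(estimates, weights=[0.3, 0.5, 0.2]))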
Deep Learning for Recognition
Convolutional Neural Networks (CNNs)
CNNs are the backbone of modern computer vision in robotics:
class RecognitionCNN(nn.Module):
def __init__(self, num_classes=1000, input_channels=3):
super(RecognitionCNN, self).__init__()
# Feature extraction layers
self.features = nn.Sequential(
# First convolutional block
nn.Conv2d(input_channels, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
# Second convolutional block
nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
# Third convolutional block
nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(256),
nn.ReLU(inplace=True),
# Fourth convolutional block
nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
nn.BatchNorm2d(512),
nn.ReLU(inplace=True),
)
# Classification head
self.classifier = nn.Sequential(
nn.AdaptiveAvgPool2d((1, 1)),
nn.Flatten(),
nn.Linear(512, 1024),
nn.ReLU(inplace=True),
nn.Dropout(0.5),
nn.Linear(1024, num_classes)
)
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x
class FeatureExtractor(nn.Module):
def __init__(self, backbone='resnet50'):
super(FeatureExtractor, self).__init__()
if backbone == 'resnet50':
import torchvision.models as models
self.backbone = models.resnet50(pretrained=True)
# Remove the final classification layer
self.backbone = nn.Sequential(*list(self.backbone.children())[:-1])
else:
# Use custom CNN backbone
self.backbone = RecognitionCNN(input_channels=3)
# Remove classifier for feature extraction
self.backbone = self.backbone.features
def forward(self, x):
features = self.backbone(x)
# Flatten features for downstream tasks
if len(features.shape) > 2:
features = features.view(features.size(0), -1)
return features
# Example usage for robot perception
def extract_visual_features(image_tensor):
"""Extract features from robot camera images"""
feature_extractor = FeatureExtractor(backbone='resnet50')
with torch.no_grad():
features = feature_extractor(image_tensor)
return features
Recurrent Neural Networks for Temporal Perception
For tasks requiring temporal understanding:
class TemporalPerceptionRNN(nn.Module):
def __init__(self, input_size, hidden_size=256, num_layers=2, output_size=100):
super(TemporalPerceptionRNN, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
# LSTM for temporal feature extraction
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=0.1 if num_layers > 1 else 0
)
# Output layer
self.output_layer = nn.Linear(hidden_size, output_size)
def forward(self, x, hidden=None):
# x shape: (batch_size, sequence_length, input_size)
lstm_out, hidden = self.lstm(x, hidden)
# Use the last output for classification
output = self.output_layer(lstm_out[:, -1, :])
return output, hidden
class TemporalSceneUnderstanding:
def __init__(self, feature_dim=2048):
self.temporal_rnn = TemporalPerceptionRNN(
input_size=feature_dim,
hidden_size=512,
num_layers=2,
output_size=100 # Number of activity classes
)
# Feature buffer for temporal processing
self.feature_buffer = []
self.buffer_size = 10 # Process sequences of 10 frames
def update_with_frame(self, frame_features):
"""Add new frame features and process if buffer is full"""
self.feature_buffer.append(frame_features)
if len(self.feature_buffer) >= self.buffer_size:
# Process the sequence
sequence = torch.stack(self.feature_buffer, dim=1) # (batch, seq_len, features)
activity_prediction, _ = self.temporal_rnn(sequence)
# Clear buffer and keep last frame for continuity
self.feature_buffer = [self.feature_buffer[-1]]
return activity_prediction
return None
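A minimal sketch of how the modules above could be wired together, assuming a pretrained ResNet-50 feature extractor (2048-dimensional output, weights downloaded on first use) and dummy tensors standing in for real camera frames:

def temporal_perception_demo():
    """Feed per-frame CNN features into the temporal model (illustrative)."""
    extractor = FeatureExtractor(backbone='resnet50')
    extractor.eval()
    scene = TemporalSceneUnderstanding(feature_dim=2048)
    for t in range(12):  # simulate a short stream of frames
        frame = torch.randn(1, 3, 224, 224)  # dummy RGB frame
        with torch.no_grad():
            features = extractor(frame)  # shape (1, 2048)
        prediction = scene.update_with_frame(features)
        if prediction is not None:
            print(f"frame {t}: activity logits with shape {tuple(prediction.shape)}")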
Object Detection in Robotics
Modern Object Detection Architectures
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, ssd300_vgg16, retinanet_resnet50_fpn
from PIL import Image
class RobotObjectDetector:
def __init__(self, model_type='faster_rcnn'):
self.model_type = model_type
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Initialize detection model
if model_type == 'faster_rcnn':
self.model = fasterrcnn_resnet50_fpn(pretrained=True)
elif model_type == 'ssd':
self.model = ssd300_vgg16(pretrained=True)
elif model_type == 'retinanet':
self.model = retinanet_resnet50_fpn(pretrained=True)
else:
raise ValueError(f"Unsupported model type: {model_type}")
self.model.to(self.device)
self.model.eval()
# COCO class names for object detection
self.coco_names = [
'__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane',
'bus', 'train', 'truck', 'boat', 'traffic light', 'fire hydrant',
'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse',
'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack',
'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard',
'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard',
'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork',
'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair',
'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv',
'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave',
'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase',
'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]
def detect_objects(self, image):
"""
Detect objects in an image
Args:
            image: PIL Image (RGB) or numpy array (H, W, C) in BGR (OpenCV) format
Returns:
Dictionary with detection results
"""
# Preprocess image
if isinstance(image, np.ndarray):
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = Image.fromarray(image)
transform = torchvision.transforms.Compose([
torchvision.transforms.ToTensor()
])
input_tensor = transform(image).unsqueeze(0).to(self.device)
# Perform detection
with torch.no_grad():
predictions = self.model(input_tensor)
# Process predictions
prediction = predictions[0]
# Filter detections by confidence threshold
confidence_threshold = 0.5
keep = prediction['scores'] > confidence_threshold
detections = {
'boxes': prediction['boxes'][keep].cpu().numpy(),
'labels': prediction['labels'][keep].cpu().numpy(),
'scores': prediction['scores'][keep].cpu().numpy(),
'class_names': [self.coco_names[label] for label in prediction['labels'][keep].cpu().numpy()]
}
return detections
def draw_detections(self, image, detections, colors=None):
"""Draw bounding boxes on image"""
if isinstance(image, Image.Image):
image = np.array(image)
image = image.copy()
boxes = detections['boxes']
labels = detections['class_names']
scores = detections['scores']
if colors is None:
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255)]
for i, (box, label, score) in enumerate(zip(boxes, labels, scores)):
color = colors[i % len(colors)]
# Draw bounding box
x1, y1, x2, y2 = map(int, box)
cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
# Draw label and confidence
text = f'{label}: {score:.2f}'
cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
return image
# Example usage
def robot_vision_pipeline(image):
"""Complete robot vision pipeline"""
detector = RobotObjectDetector(model_type='faster_rcnn')
detections = detector.detect_objects(image)
# Draw results
result_image = detector.draw_detections(image, detections)
return detections, result_image
Custom Object Detection for Robotics
For robot-specific objects not covered by standard datasets (e.g., tools, fixtures, or workpieces), a lightweight custom detector can be trained:
class CustomObjectDetector(nn.Module):
def __init__(self, num_robot_objects=20):
super(CustomObjectDetector, self).__init__()
# Feature extractor
self.backbone = torchvision.models.mobilenet_v2(pretrained=True)
self.backbone = nn.Sequential(*list(self.backbone.children())[:-1])
# Custom detection head
self.detection_head = nn.Sequential(
nn.AdaptiveAvgPool2d((1, 1)),
nn.Flatten(),
nn.Linear(1280, 512), # MobileNetV2 feature size
nn.ReLU(),
nn.Dropout(0.3),
            nn.Linear(512, num_robot_objects * 6)  # per candidate: 4 bbox coords + 1 confidence + 1 class score
        )
        self.num_objects = num_robot_objects
def forward(self, x):
# Extract features
features = self.backbone(x)
# Detection predictions
detections = self.detection_head(features)
        # Reshape to (batch, num_objects, 6): one row per candidate detection
        batch_size = detections.size(0)
        detections = detections.view(batch_size, self.num_objects, 6)
        return detections
def predict(self, image_tensor):
"""Make predictions on input image"""
with torch.no_grad():
detections = self.forward(image_tensor)
# Process detections (decode bounding boxes, apply NMS, etc.)
processed_detections = self.decode_detections(detections)
return processed_detections
    def decode_detections(self, raw_detections):
        """Decode raw detections to bounding boxes"""
        # A full detector would decode box coordinates (e.g., relative to anchors),
        # apply sigmoid/softmax to confidence and class scores,
        # and run non-maximum suppression
        # Simplified version
        batch_size, num_objects, num_attrs = raw_detections.shape
        # Flatten candidates and decode the score channels
        detections = raw_detections.view(batch_size, -1, 6)
# Apply sigmoid to confidence and class scores
detections[..., 4] = torch.sigmoid(detections[..., 4]) # Confidence
detections[..., 5] = torch.sigmoid(detections[..., 5]) # Class probability
return detections
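decode_detections above mentions non-maximum suppression but does not apply it. A hedged sketch of that step, assuming boxes have already been decoded to absolute (x1, y1, x2, y2) pixel coordinates, can use torchvision.ops.nms; the helper apply_nms and its thresholds are illustrative.

from torchvision.ops import nms

def apply_nms(boxes, scores, iou_threshold=0.5, score_threshold=0.3):
    """Suppress overlapping detections for a single image.

    boxes:  float tensor of shape (N, 4) in (x1, y1, x2, y2) format
    scores: float tensor of shape (N,) with confidence values
    """
    # Drop low-confidence candidates first
    keep_conf = scores > score_threshold
    boxes, scores = boxes[keep_conf], scores[keep_conf]
    if boxes.numel() == 0:
        return boxes, scores
    # Keep only the highest-scoring box in each group of overlapping boxes
    keep_idx = nms(boxes, scores, iou_threshold)
    return boxes[keep_idx], scores[keep_idx]

# Example with synthetic detections: the second box nearly duplicates the first
boxes = torch.tensor([[10.0, 10.0, 60.0, 60.0],
                      [12.0, 12.0, 62.0, 62.0],
                      [100.0, 100.0, 150.0, 150.0]])
scores = torch.tensor([0.9, 0.8, 0.75])
kept_boxes, kept_scores = apply_nms(boxes, scores)
print(kept_boxes)  # the near-duplicate box is suppressed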
Sensor Fusion Techniques
Kalman Filtering for Sensor Fusion
import numpy as np
from scipy.linalg import block_diag
class ExtendedKalmanFilter:
def __init__(self, state_dim, measurement_dim):
self.state_dim = state_dim
self.measurement_dim = measurement_dim
        # State vector: [x, y, z, vx, vy, vz, qw, qx, qy, qz]
        # (position, velocity, orientation quaternion in scalar-first order)
self.state = np.zeros(state_dim)
self.covariance = np.eye(state_dim) * 1000 # Initial uncertainty
# Process noise
self.Q = np.eye(state_dim) * 0.1
# Measurement noise
self.R = np.eye(measurement_dim) * 1.0
# Identity matrix
self.I = np.eye(state_dim)
def predict(self, dt, control_input=None):
"""Predict next state using motion model"""
# State transition model (simplified constant velocity model)
F = self.compute_jacobian_F(dt)
# Predict state
self.state = self.motion_model(self.state, dt, control_input)
# Predict covariance
self.covariance = F @ self.covariance @ F.T + self.Q
def update(self, measurement):
"""Update state estimate with measurement"""
# Measurement model Jacobian
H = self.compute_jacobian_H()
# Innovation (measurement residual)
innovation = measurement - self.measurement_model(self.state)
# Innovation covariance
S = H @ self.covariance @ H.T + self.R
# Kalman gain
K = self.covariance @ H.T @ np.linalg.inv(S)
# Update state
self.state = self.state + K @ innovation
# Update covariance
self.covariance = (self.I - K @ H) @ self.covariance
def motion_model(self, state, dt, control=None):
"""Constant velocity motion model"""
x, y, z, vx, vy, vz, qw, qx, qy, qz = state
# Update position based on velocity
new_x = x + vx * dt
new_y = y + vy * dt
new_z = z + vz * dt
# Velocities remain constant (or affected by control)
if control is not None:
new_vx = vx + control[0] * dt
new_vy = vy + control[1] * dt
new_vz = vz + control[2] * dt
else:
new_vx, new_vy, new_vz = vx, vy, vz
# Quaternion integration (simplified)
# In practice, you'd use proper quaternion integration
new_qw, new_qx, new_qy, new_qz = qw, qx, qy, qz
return np.array([new_x, new_y, new_z, new_vx, new_vy, new_vz,
new_qw, new_qx, new_qy, new_qz])
def measurement_model(self, state):
"""Measurement model - return position and orientation"""
x, y, z, _, _, _, qw, qx, qy, qz = state
return np.array([x, y, z, qw, qx, qy, qz])
def compute_jacobian_F(self, dt):
"""Compute state transition Jacobian"""
F = np.eye(self.state_dim)
# Position-velocity relationships
F[0, 3] = dt # dx/dvx
F[1, 4] = dt # dy/dvy
F[2, 5] = dt # dz/dvz
return F
def compute_jacobian_H(self):
"""Compute measurement model Jacobian"""
H = np.zeros((self.measurement_dim, self.state_dim))
# Position measurements
H[0, 0] = 1 # dx/dx
H[1, 1] = 1 # dy/dy
H[2, 2] = 1 # dz/dz
# Orientation measurements
H[3, 6] = 1 # dqw/dqw
H[4, 7] = 1 # dqx/dqx
H[5, 8] = 1 # dqy/dqy
H[6, 9] = 1 # dqz/dqz
return H
class MultiSensorFusion:
def __init__(self):
# Initialize EKF for state estimation
self.ekf = ExtendedKalmanFilter(state_dim=10, measurement_dim=7) # pos + orient
# Sensor weights for data association
self.sensor_weights = {
'camera': 0.3,
'lidar': 0.4,
'imu': 0.3
}
def fuse_sensor_data(self, camera_data, lidar_data, imu_data, dt):
"""Fuse data from multiple sensors"""
# Predict step
self.ekf.predict(dt)
# Fuse camera and LiDAR position measurements
if camera_data and lidar_data:
# Triangulate position from camera and LiDAR
fused_position = self.triangulate_position(camera_data, lidar_data)
elif camera_data:
fused_position = camera_data['position']
elif lidar_data:
fused_position = lidar_data['position']
else:
fused_position = self.ekf.state[:3] # Keep previous estimate
# Fuse IMU orientation
if imu_data:
fused_orientation = imu_data['orientation']
else:
fused_orientation = self.ekf.state[6:10] # Keep previous estimate
# Create measurement vector
measurement = np.concatenate([fused_position, fused_orientation])
# Update step
self.ekf.update(measurement)
return self.ekf.state.copy()
def triangulate_position(self, camera_data, lidar_data):
"""Triangulate position from camera and LiDAR data"""
# This is a simplified approach
# In practice, you'd use geometric triangulation or optimization
cam_pos = np.array(camera_data['position'])
lidar_pos = np.array(lidar_data['position'])
# Weighted average based on sensor accuracies
weight_cam = self.sensor_weights['camera']
weight_lidar = self.sensor_weights['lidar']
fused_pos = (weight_cam * cam_pos + weight_lidar * lidar_pos) / (weight_cam + weight_lidar)
return fused_pos
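A short usage sketch of the fusion classes above with synthetic sensor dictionaries; the measurement values, the 10 Hz rate, and the identity quaternion (scalar-first) are illustrative assumptions:

def ekf_fusion_demo():
    """Run a few EKF fusion steps on synthetic measurements (illustrative)."""
    fusion = MultiSensorFusion()
    dt = 0.1  # 10 Hz update rate
    for step in range(5):
        # Synthetic measurements: the robot drifts slowly along x
        camera_data = {'position': np.array([0.10 * step, 0.00, 0.0])}
        lidar_data = {'position': np.array([0.11 * step, 0.01, 0.0])}
        imu_data = {'orientation': np.array([1.0, 0.0, 0.0, 0.0])}  # identity quaternion
        state = fusion.fuse_sensor_data(camera_data, lidar_data, imu_data, dt)
        print(f"step {step}: fused position estimate {state[:3]}")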
Particle Filtering for Non-Linear Systems
class ParticleFilter:
def __init__(self, num_particles=1000, state_dim=6):
self.num_particles = num_particles
self.state_dim = state_dim
# Initialize particles randomly around initial state
self.particles = np.random.normal(0, 1, (num_particles, state_dim))
self.weights = np.ones(num_particles) / num_particles
def predict(self, control, noise_std=0.1):
"""Predict particle states using motion model"""
# Add control input with noise
noise = np.random.normal(0, noise_std, (self.num_particles, self.state_dim))
self.particles += control + noise
def update(self, measurement, measurement_std=0.1):
"""Update particle weights based on measurement"""
# Calculate likelihood of each particle given measurement
diffs = self.particles - measurement
distances = np.sum(diffs**2, axis=1)
# Calculate weights using Gaussian likelihood
likelihoods = np.exp(-0.5 * distances / (measurement_std**2))
self.weights *= likelihoods
# Normalize weights
self.weights += 1e-300 # Avoid division by zero
self.weights /= np.sum(self.weights)
def resample(self):
"""Resample particles based on weights"""
# Systematic resampling
indices = self.systematic_resample()
self.particles = self.particles[indices]
self.weights.fill(1.0 / self.num_particles)
def systematic_resample(self):
"""Systematic resampling algorithm"""
cumulative_sum = np.cumsum(self.weights)
start = np.random.uniform(0, 1/self.num_particles)
indices = []
        for i in range(self.num_particles):
            u = start + i / self.num_particles
            idx = np.searchsorted(cumulative_sum, u, side='right')
            # Guard against floating-point round-off pushing idx past the last particle
            idx = min(idx, self.num_particles - 1)
            indices.append(idx)
return np.array(indices)
def estimate(self):
"""Get state estimate from particles"""
# Weighted average of particles
estimate = np.average(self.particles, axis=0, weights=self.weights)
return estimate
def get_particles(self):
"""Get current particles and weights"""
return self.particles.copy(), self.weights.copy()
class MultiModalParticleFilter:
def __init__(self, state_dim=6):
self.camera_pf = ParticleFilter(num_particles=500, state_dim=state_dim)
self.lidar_pf = ParticleFilter(num_particles=500, state_dim=state_dim)
self.imu_pf = ParticleFilter(num_particles=500, state_dim=state_dim)
# Fusion weights
self.fusion_weights = {'camera': 0.4, 'lidar': 0.4, 'imu': 0.2}
def update_multi_modal(self, camera_meas, lidar_meas, imu_meas, control):
"""Update multiple particle filters with different modalities"""
# Update each modality-specific filter
if camera_meas is not None:
self.camera_pf.predict(control)
self.camera_pf.update(camera_meas)
self.camera_pf.resample()
if lidar_meas is not None:
self.lidar_pf.predict(control)
self.lidar_pf.update(lidar_meas)
self.lidar_pf.resample()
if imu_meas is not None:
self.imu_pf.predict(control)
self.imu_pf.update(imu_meas)
self.imu_pf.resample()
def get_fused_estimate(self):
"""Get fused estimate from all particle filters"""
estimates = []
weights = []
if np.any(self.camera_pf.weights > 0):
estimates.append(self.camera_pf.estimate())
weights.append(self.fusion_weights['camera'])
if np.any(self.lidar_pf.weights > 0):
estimates.append(self.lidar_pf.estimate())
weights.append(self.fusion_weights['lidar'])
if np.any(self.imu_pf.weights > 0):
estimates.append(self.imu_pf.estimate())
weights.append(self.fusion_weights['imu'])
if estimates:
# Weighted average of estimates
weights = np.array(weights) / sum(weights) # Normalize
fused_estimate = np.average(estimates, axis=0, weights=weights)
return fused_estimate
else:
return np.zeros(self.camera_pf.state_dim)
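A minimal sketch of the basic ParticleFilter tracking a slowly drifting 6-dimensional state; the motion vector and noise levels are arbitrary illustration values:

def particle_filter_demo():
    """Track a drifting 6D state with the basic particle filter (illustrative)."""
    pf = ParticleFilter(num_particles=1000, state_dim=6)
    true_state = np.zeros(6)
    motion = np.array([0.1, 0.05, 0.0, 0.0, 0.0, 0.0])  # per-step displacement
    for step in range(20):
        true_state += motion
        # Noisy observation of the full state
        measurement = true_state + np.random.normal(0, 0.1, size=6)
        pf.predict(control=motion, noise_std=0.1)
        pf.update(measurement, measurement_std=0.1)
        pf.resample()
    estimate = pf.estimate()
    print("true state:", true_state[:2], "estimate:", estimate[:2])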
3D Environment Perception
Point Cloud Processing
import open3d as o3d
from sklearn.cluster import DBSCAN
class PointCloudProcessor:
def __init__(self):
self.voxel_size = 0.05 # 5cm voxels for downsampling
self.distance_threshold = 0.02 # 2cm for plane segmentation
self.max_iterations = 1000
def preprocess_point_cloud(self, point_cloud):
"""Preprocess point cloud for perception tasks"""
# Downsample point cloud using voxel grid filter
downsampled = point_cloud.voxel_down_sample(voxel_size=self.voxel_size)
# Estimate normals
downsampled.estimate_normals(
search_param=o3d.geometry.KDTreeSearchParamHybrid(
radius=0.1, max_nn=30
)
)
# Orient normals consistently
downsampled.orient_normals_consistent_tangent_plane(10)
return downsampled
def segment_planes(self, point_cloud):
"""Segment planar surfaces (e.g., floors, walls, tables)"""
# Plane segmentation using RANSAC
plane_model, inliers = point_cloud.segment_plane(
distance_threshold=self.distance_threshold,
ransac_n=3,
num_iterations=self.max_iterations
)
# Extract inlier (plane) points
plane_cloud = point_cloud.select_by_index(inliers)
remaining_cloud = point_cloud.select_by_index(inliers, invert=True)
return plane_model, plane_cloud, remaining_cloud
def cluster_objects(self, point_cloud, eps=0.05, min_points=10):
"""Cluster points into separate objects using DBSCAN"""
# Convert to numpy array for clustering
points = np.asarray(point_cloud.points)
# Perform DBSCAN clustering
clustering = DBSCAN(eps=eps, min_samples=min_points).fit(points)
labels = clustering.labels_
# Extract clusters
unique_labels = set(labels)
clusters = []
for label in unique_labels:
if label == -1: # Noise points
continue
# Extract points belonging to this cluster
cluster_points = points[labels == label]
cluster_cloud = o3d.geometry.PointCloud()
cluster_cloud.points = o3d.utility.Vector3dVector(cluster_points)
clusters.append(cluster_cloud)
return clusters
def extract_features(self, point_cloud):
"""Extract geometric features from point cloud"""
points = np.asarray(point_cloud.points)
# Calculate basic statistics
centroid = np.mean(points, axis=0)
bbox = np.ptp(points, axis=0) # Bounding box dimensions
volume = np.prod(bbox)
# Calculate eigenvalues for shape analysis
cov_matrix = np.cov(points.T)
eigenvalues, _ = np.linalg.eigh(cov_matrix)
# Shape descriptors based on eigenvalues
linearity = (eigenvalues[2] - eigenvalues[1]) / eigenvalues[2]
planarity = (eigenvalues[1] - eigenvalues[0]) / eigenvalues[2]
sphericity = eigenvalues[0] / eigenvalues[2]
features = {
'centroid': centroid,
'bbox': bbox,
'volume': volume,
'linearity': linearity,
'planarity': planarity,
'sphericity': sphericity,
'point_count': len(points)
}
return features
def register_point_clouds(self, source_cloud, target_cloud):
"""Register two point clouds using ICP"""
# Initial alignment using FPFH features
source_fpfh = self.compute_fpfh_features(source_cloud)
target_fpfh = self.compute_fpfh_features(target_cloud)
        # Initial (global) registration using RANSAC over feature correspondences
        distance_threshold = self.voxel_size * 1.5
        result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
            source_cloud, target_cloud, source_fpfh, target_fpfh,
            True,  # mutual_filter
            distance_threshold,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(),
            4,  # ransac_n
            [o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
             o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)],
            o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999)
        )
        # Refine with point-to-plane ICP (requires normals on the target cloud)
refined_result = o3d.pipelines.registration.registration_icp(
source_cloud, target_cloud,
max_correspondence_distance=0.02,
init=result.transformation,
estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane()
)
return refined_result
def compute_fpfh_features(self, point_cloud):
"""Compute Fast Point Feature Histograms"""
radius_normal = self.voxel_size * 2
point_cloud.estimate_normals(
o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30)
)
radius_feature = self.voxel_size * 5
fpfh = o3d.pipelines.registration.compute_fpfh_feature(
point_cloud,
o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100)
)
return fpfh
class DepthImageProcessor:
def __init__(self):
self.focal_length_x = 525.0 # Assumed focal length
self.focal_length_y = 525.0
self.center_x = 319.5 # Principal point
self.center_y = 239.5
def depth_to_point_cloud(self, depth_image, color_image=None):
"""Convert depth image to point cloud"""
height, width = depth_image.shape
# Create coordinate grids
y_coords, x_coords = np.mgrid[0:height, 0:width]
# Convert to 3D coordinates
x_coords = x_coords.astype(np.float32)
y_coords = y_coords.astype(np.float32)
# Calculate 3D positions
z_coords = depth_image.astype(np.float32)
x_coords = (x_coords - self.center_x) * z_coords / self.focal_length_x
y_coords = (y_coords - self.center_y) * z_coords / self.focal_length_y
# Stack to get 3D points
points = np.stack([x_coords, y_coords, z_coords], axis=-1)
# Reshape to (N, 3)
points = points.reshape(-1, 3)
# Remove invalid points (zeros or infinities)
valid_mask = np.isfinite(points).all(axis=1) & (points[:, 2] > 0)
points = points[valid_mask]
# Create Open3D point cloud
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)
if color_image is not None:
# Add colors to valid points
colors = color_image.reshape(-1, 3)[valid_mask] / 255.0 # Normalize to [0,1]
pcd.colors = o3d.utility.Vector3dVector(colors)
return pcd
def filter_depth_outliers(self, depth_image, kernel_size=5, threshold=3):
"""Filter outliers in depth image using median filtering"""
# Apply median filter
median_filtered = cv2.medianBlur(depth_image.astype(np.float32), kernel_size)
# Calculate difference from median
diff = np.abs(depth_image.astype(np.float32) - median_filtered)
# Calculate local standard deviation
local_mean = cv2.blur(depth_image.astype(np.float32), (kernel_size, kernel_size))
local_var = cv2.blur((depth_image.astype(np.float32) - local_mean)**2, (kernel_size, kernel_size))
local_std = np.sqrt(local_var)
# Create mask for outliers
outlier_mask = diff > threshold * (local_std + 1e-6) # Add small epsilon to avoid division by zero
# Replace outliers with median values
filtered_depth = depth_image.copy().astype(np.float32)
filtered_depth[outlier_mask] = median_filtered[outlier_mask]
return filtered_depth
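A short end-to-end sketch connecting the two processors above; the synthetic depth image (a flat surface roughly two meters in front of the camera) and the default intrinsics are illustrative, and Open3D must be installed:

def depth_pipeline_demo():
    """Convert a synthetic depth image to a point cloud and find the dominant plane."""
    depth_processor = DepthImageProcessor()
    cloud_processor = PointCloudProcessor()
    # Synthetic depth image: a flat surface about 2 m away, with mild sensor noise
    depth = np.full((480, 640), 2.0, dtype=np.float32)
    depth += np.random.normal(0, 0.005, size=depth.shape).astype(np.float32)
    cloud = depth_processor.depth_to_point_cloud(depth)
    # Downsample before segmentation to keep RANSAC fast
    downsampled = cloud.voxel_down_sample(voxel_size=cloud_processor.voxel_size)
    plane_model, plane_cloud, remaining = cloud_processor.segment_planes(downsampled)
    print(f"{len(downsampled.points)} points after downsampling, "
          f"{len(plane_cloud.points)} on the dominant plane")
    print("plane model [a, b, c, d]:", plane_model)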
SLAM Fundamentals
class SimpleSLAM:
def __init__(self):
# Robot pose (x, y, theta)
self.robot_pose = np.array([0.0, 0.0, 0.0])
# Landmarks in the environment (x, y)
self.landmarks = {}
self.landmark_observations = {}
# Covariance matrices
self.pose_covariance = np.eye(3) * 0.1
self.landmark_covariances = {}
# Motion and measurement noise
self.motion_noise = np.diag([0.01, 0.01, 0.001]) # x, y, theta
self.measurement_noise = np.diag([0.1, 0.05]) # range, bearing
def predict_motion(self, control_input, dt):
"""Predict robot motion based on control input"""
# Simple differential drive model
v, omega = control_input # Linear and angular velocities
# Update pose using motion model
new_theta = self.robot_pose[2] + omega * dt
new_x = self.robot_pose[0] + v * np.cos(new_theta) * dt
new_y = self.robot_pose[1] + v * np.sin(new_theta) * dt
self.robot_pose = np.array([new_x, new_y, new_theta])
# Propagate uncertainty
G = self.compute_motion_jacobian(control_input, dt)
self.pose_covariance = G @ self.pose_covariance @ G.T + self.motion_noise
def compute_motion_jacobian(self, control_input, dt):
"""Compute Jacobian of motion model"""
v, omega = control_input
theta = self.robot_pose[2]
G = np.eye(3)
if abs(omega) > 1e-6: # Non-zero angular velocity
G[0, 2] = -v/omega * np.cos(theta) + v/omega * np.cos(theta + omega*dt)
G[1, 2] = -v/omega * np.sin(theta) + v/omega * np.sin(theta + omega*dt)
else: # Pure translation
G[0, 2] = -v * np.sin(theta) * dt
G[1, 2] = v * np.cos(theta) * dt
return G
def update_landmark(self, landmark_id, range_measurement, bearing_measurement):
"""Update landmark position based on observation"""
# Convert polar to Cartesian coordinates relative to robot
x_rel = range_measurement * np.cos(bearing_measurement)
y_rel = range_measurement * np.sin(bearing_measurement)
# Transform to global coordinates
cos_th, sin_th = np.cos(self.robot_pose[2]), np.sin(self.robot_pose[2])
x_glob = self.robot_pose[0] + cos_th * x_rel - sin_th * y_rel
y_glob = self.robot_pose[1] + sin_th * x_rel + cos_th * y_rel
observed_landmark = np.array([x_glob, y_glob])
if landmark_id not in self.landmarks:
# New landmark - initialize
self.landmarks[landmark_id] = observed_landmark.copy()
self.landmark_covariances[landmark_id] = np.eye(2) * 100.0
self.landmark_observations[landmark_id] = [observed_landmark]
else:
# Existing landmark - update estimate
self.update_existing_landmark(landmark_id, observed_landmark)
def update_existing_landmark(self, landmark_id, observed_landmark):
"""Update existing landmark using EKF update"""
predicted_landmark = self.landmarks[landmark_id]
# Measurement Jacobian
H = self.compute_measurement_jacobian(landmark_id)
# Innovation
innovation = observed_landmark - predicted_landmark
# Innovation covariance
landmark_cov = self.landmark_covariances[landmark_id]
S = H @ landmark_cov @ H.T + self.measurement_noise
# Kalman gain
K = landmark_cov @ H.T @ np.linalg.inv(S)
# Update landmark estimate
self.landmarks[landmark_id] = predicted_landmark + K @ innovation
self.landmark_covariances[landmark_id] = (np.eye(2) - K @ H) @ landmark_cov
def compute_measurement_jacobian(self, landmark_id):
"""Compute Jacobian of measurement function"""
# For landmark observation, this is typically the identity matrix
# since we're directly observing the landmark position
return np.eye(2)
def get_map(self):
"""Return current map of landmarks"""
return self.landmarks.copy()
class VisualSLAM:
def __init__(self):
# Feature tracking
self.feature_detector = cv2.ORB_create(nfeatures=1000)
self.descriptor_matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
# Pose tracking
self.current_pose = np.eye(4) # 4x4 transformation matrix
self.keyframes = []
        # Map points
        self.map_points = []
        self.tracked_features = {}
        # Frame counter used for simple keyframe selection
        self.frame_count = 0
def process_frame(self, image, camera_intrinsics):
"""Process a new camera frame for SLAM"""
        self.frame_count += 1
        # Detect and extract features
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
keypoints, descriptors = self.feature_detector.detectAndCompute(gray, None)
if len(keypoints) < 10:
return self.current_pose # Not enough features to process
# If this is the first frame, initialize
if not self.keyframes:
self.initialize_first_frame(keypoints, descriptors, image)
return self.current_pose
# Track features from previous frame
prev_keypoints = self.keyframes[-1]['keypoints']
prev_descriptors = self.keyframes[-1]['descriptors']
# Match features
matches = self.descriptor_matcher.match(descriptors, prev_descriptors)
matches = sorted(matches, key=lambda x: x.distance)
# Keep only good matches
good_matches = matches[:min(50, len(matches))]
if len(good_matches) < 10:
return self.current_pose # Not enough matches to estimate motion
# Extract matched points
curr_points = np.float32([keypoints[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
prev_points = np.float32([prev_keypoints[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
# Estimate motion using Essential Matrix
E, mask = cv2.findEssentialMat(
curr_points, prev_points,
camera_intrinsics,
method=cv2.RANSAC,
prob=0.999,
threshold=1.0
)
if E is not None:
# Decompose Essential Matrix to get rotation and translation
_, R, t, _ = cv2.recoverPose(E, curr_points, prev_points, camera_intrinsics)
# Create transformation matrix
T = np.eye(4)
T[:3, :3] = R
T[:3, 3] = t.flatten()
# Update current pose
self.current_pose = self.current_pose @ np.linalg.inv(T)
# Add current frame as keyframe if significant motion occurred
if self.should_add_keyframe():
self.add_keyframe(keypoints, descriptors, image, self.current_pose)
return self.current_pose
def initialize_first_frame(self, keypoints, descriptors, image):
"""Initialize SLAM with first frame"""
keyframe = {
'keypoints': keypoints,
'descriptors': descriptors,
'image': image,
'pose': self.current_pose.copy()
}
self.keyframes.append(keyframe)
def should_add_keyframe(self):
"""Determine if current frame should be added as a keyframe"""
if not self.keyframes:
return True
        # Add a keyframe periodically (simplified); a real system would use
        # parallax, tracked-feature count, or relative-motion thresholds
        return self.frame_count % 10 == 0  # Every 10th processed frame
def add_keyframe(self, keypoints, descriptors, image, pose):
"""Add current frame as a keyframe"""
keyframe = {
'keypoints': keypoints,
'descriptors': descriptors,
'image': image,
'pose': pose.copy()
}
self.keyframes.append(keyframe)
def get_trajectory(self):
"""Return the robot trajectory"""
return [kf['pose'] for kf in self.keyframes]
Implementation Example: Complete Perception System
Here's a complete example showing how all these techniques work together:
class IntegratedPerceptionSystem:
def __init__(self):
# Initialize perception modules
self.object_detector = RobotObjectDetector(model_type='faster_rcnn')
self.point_cloud_processor = PointCloudProcessor()
self.depth_processor = DepthImageProcessor()
self.slam_system = VisualSLAM()
self.sensor_fusion = MultiSensorFusion()
# Camera intrinsics (would be calibrated)
self.camera_intrinsics = np.array([
[525.0, 0.0, 319.5],
[0.0, 525.0, 239.5],
[0.0, 0.0, 1.0]
])
def process_robot_perception(self, rgb_image, depth_image, imu_data, control_input, dt):
"""
Process all sensor inputs for robot perception
"""
results = {}
# 1. Object detection in RGB image
detections = self.object_detector.detect_objects(rgb_image)
results['detections'] = detections
# 2. Process depth image to point cloud
color_array = np.array(rgb_image) if hasattr(rgb_image, 'size') else rgb_image
point_cloud = self.depth_processor.depth_to_point_cloud(depth_image, color_array)
# 3. Process point cloud
processed_cloud = self.point_cloud_processor.preprocess_point_cloud(point_cloud)
plane_model, plane_cloud, object_cloud = self.point_cloud_processor.segment_planes(processed_cloud)
# 4. Cluster objects in point cloud
object_clusters = self.point_cloud_processor.cluster_objects(object_cloud)
results['object_clusters'] = len(object_clusters)
# 5. Perform SLAM to estimate pose
current_pose = self.slam_system.process_frame(rgb_image, self.camera_intrinsics)
results['robot_pose'] = current_pose
# 6. Fuse sensor data
camera_pos = self.extract_camera_position(detections)
lidar_pos = self.extract_lidar_position(object_clusters)
fused_state = self.sensor_fusion.fuse_sensor_data(
camera_pos, lidar_pos, imu_data, dt
)
results['fused_state'] = fused_state
# 7. Extract semantic information
semantic_map = self.create_semantic_map(detections, object_clusters)
results['semantic_map'] = semantic_map
# 8. Generate scene understanding
scene_graph = self.understand_scene(detections, semantic_map, current_pose)
results['scene_graph'] = scene_graph
return results
    def extract_camera_position(self, detections):
        """Extract a rough position estimate from camera detections"""
        if len(detections['boxes']) == 0:
            return None
        # Use the mean bounding-box center in pixel coordinates
        # This is simplified - in practice, you'd back-project through the camera
        # model to obtain a metric 3D position
        centers = [((box[0] + box[2]) / 2, (box[1] + box[3]) / 2) for box in detections['boxes']]
        avg_center = np.mean(centers, axis=0)
        # Pad with zero depth so the estimate matches the EKF's 3D position state
        position = np.array([avg_center[0], avg_center[1], 0.0])
        return {'position': position, 'orientation': np.array([1.0, 0.0, 0.0, 0.0])}
def extract_lidar_position(self, clusters):
"""Extract position estimate from LiDAR clusters"""
if not clusters:
return None
# Use centroids of clusters
centroids = []
for cluster in clusters:
centroid = np.mean(np.asarray(cluster.points), axis=0)
centroids.append(centroid)
avg_centroid = np.mean(centroids, axis=0) if centroids else np.zeros(3)
        # Identity orientation (scalar-first quaternion, matching the EKF state)
        return {'position': avg_centroid, 'orientation': np.array([1.0, 0.0, 0.0, 0.0])}
def create_semantic_map(self, detections, clusters):
"""Create semantic map from detections and clusters"""
semantic_map = {
'objects': [],
'surfaces': [],
'relationships': []
}
# Add detected objects
for i, (box, label, score) in enumerate(zip(
detections['boxes'],
detections['class_names'],
detections['scores']
)):
obj_info = {
'label': label,
'confidence': score,
'bbox': box.tolist(),
'cluster_association': self.associate_cluster(i, clusters)
}
semantic_map['objects'].append(obj_info)
# Add surface information from plane segmentation
# This would be populated from point cloud processing
return semantic_map
def associate_cluster(self, detection_idx, clusters):
"""Associate detection with point cloud cluster"""
# This would implement geometric association
# between 2D detections and 3D clusters
return detection_idx if detection_idx < len(clusters) else -1
def understand_scene(self, detections, semantic_map, pose):
"""Generate high-level scene understanding"""
scene_graph = {
'entities': [],
'relations': [],
'activities': [],
'intentions': []
}
# Identify entities
for obj in detections['class_names']:
if obj not in scene_graph['entities']:
scene_graph['entities'].append(obj)
# Identify spatial relations
# This would analyze relative positions of objects
if len(detections['boxes']) >= 2:
# Example: analyze if objects are near each other
for i in range(len(detections['boxes'])):
for j in range(i+1, len(detections['boxes'])):
box1 = detections['boxes'][i]
box2 = detections['boxes'][j]
# Calculate distance between object centers
center1 = [(box1[0] + box1[2])/2, (box1[1] + box1[3])/2]
center2 = [(box2[0] + box2[2])/2, (box2[1] + box2[3])/2]
dist = np.linalg.norm(np.array(center1) - np.array(center2))
if dist < 100: # Pixels - adjust threshold as needed
relation = {
'subject': detections['class_names'][i],
'predicate': 'near',
'object': detections['class_names'][j]
}
scene_graph['relations'].append(relation)
return scene_graph
# Example usage of the complete perception system
def run_perception_demo():
"""Run a complete perception system demo"""
perception_system = IntegratedPerceptionSystem()
# Simulate sensor inputs (in practice, these would come from real sensors)
# For demo purposes, we'll create dummy data
dummy_rgb = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
dummy_depth = np.random.rand(480, 640).astype(np.float32) * 10.0 # 0-10m
    dummy_imu = {'position': np.zeros(3), 'orientation': np.array([1.0, 0.0, 0.0, 0.0])}  # identity quaternion (scalar-first)
dummy_control = np.array([0.1, 0.01]) # v, omega
dt = 0.1 # 10Hz
# Process perception pipeline
results = perception_system.process_robot_perception(
dummy_rgb, dummy_depth, dummy_imu, dummy_control, dt
)
print("Perception Results:")
print(f"- Detected {len(results['detections']['boxes'])} objects")
print(f"- Found {results['object_clusters']} object clusters")
print(f"- Robot pose: {results['robot_pose'][:3, 3]}") # Position only
print(f"- Scene contains: {results['scene_graph']['entities']}")
return results
# Run the demo
if __name__ == "__main__":
results = run_perception_demo()
Best Practices for AI Perception in Robotics
1. Multi-Modal Sensing
- Combine different sensor modalities for robust perception
- Use sensor fusion to improve accuracy and reliability
- Implement redundancy for safety-critical applications
2. Real-Time Performance
- Optimize algorithms for real-time execution and profile per-stage latency (see the timing sketch after this list)
- Use efficient data structures and algorithms
- Consider hardware acceleration (GPUs, TPUs, FPGAs)
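One lightweight way to check a real-time budget, sketched with Python's standard time module; the helper names and the 100 ms (10 Hz) budget are illustrative:

import time

def timed_stage(name, fn, *args, **kwargs):
    """Run one pipeline stage and report its latency in milliseconds."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    print(f"{name}: {elapsed_ms:.1f} ms")
    return result, elapsed_ms

def check_cycle_budget(stage_timings_ms, budget_ms=100.0):
    """Warn if the summed per-stage latencies exceed the cycle budget."""
    total = sum(stage_timings_ms)
    if total > budget_ms:
        print(f"WARNING: cycle took {total:.1f} ms, over the {budget_ms:.0f} ms budget")
    return total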
3. Robustness and Adaptation
- Handle sensor failures gracefully
- Adapt to changing environmental conditions
- Implement online learning for continuous improvement
4. Evaluation and Validation
- Use appropriate metrics for perception quality, e.g., detection IoU and precision/recall (see the sketch after this list)
- Test in diverse environments and conditions
- Validate safety and reliability for deployment
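As one concrete metric, intersection-over-union (IoU) between predicted and ground-truth bounding boxes is a common starting point for evaluating detection quality; a minimal sketch with boxes in (x1, y1, x2, y2) format:

def box_iou(box_a, box_b):
    """Intersection-over-union of two axis-aligned boxes given as (x1, y1, x2, y2)."""
    # Intersection rectangle
    x1 = max(box_a[0], box_b[0])
    y1 = max(box_a[1], box_b[1])
    x2 = min(box_a[2], box_b[2])
    y2 = min(box_a[3], box_b[3])
    intersection = max(0.0, x2 - x1) * max(0.0, y2 - y1)
    # Union = sum of the two areas minus the intersection
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union = area_a + area_b - intersection
    return intersection / union if union > 0 else 0.0

# Example: a prediction that mostly overlaps the ground-truth box
print(box_iou((10, 10, 60, 60), (15, 15, 65, 65)))  # roughly 0.68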
Learning Objectives
By the end of this lesson, you should be able to:
- Understand the components of a robot perception system and their interactions
- Implement deep learning techniques for object recognition and classification
- Apply sensor fusion methods to combine data from multiple sensors
- Process 3D data including point clouds and depth images
- Understand SLAM fundamentals and their role in perception
- Design integrated perception pipelines for robotics applications
- Evaluate perception system performance and robustness
- Apply best practices for real-time perception in robotics