自动驾驶场景 Agent 决策系统完整实现
import time
import json
import math
import random
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import threading
import uuid
from abc import ABC, abstractmethod
class ObjectType(Enum):
"""目标类型"""
VEHICLE = "vehicle"
PEDESTRIAN = "pedestrian"
CYCLIST = "cyclist"
TRAFFIC_LIGHT = "traffic_light"
SIGN = "sign"
LANE = "lane"
OBSTACLE = "obstacle"
class TrafficLightState(Enum):
"""交通灯状态"""
RED = "red"
YELLOW = "yellow"
GREEN = "green"
UNKNOWN = "unknown"
class DrivingBehavior(Enum):
"""驾驶行为"""
LANE_KEEPING = "lane_keeping"
LANE_CHANGE_LEFT = "lane_change_left"
LANE_CHANGE_RIGHT = "lane_change_right"
TURN_LEFT = "turn_left"
TURN_RIGHT = "turn_right"
STOP = "stop"
YIELD = "yield"
OVERTAKE = "overtake"
EMERGENCY_STOP = "emergency_stop"
class RiskLevel(Enum):
"""风险等级"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class BoundingBox:
"""边界框"""
x: float # 中心点 x
y: float # 中心点 y
z: float # 中心点 z
length: float
width: float
height: float
yaw: float # 航向角
@dataclass
class DetectedObject:
"""检测到的目标"""
object_id: str
object_type: ObjectType
bbox: BoundingBox
velocity: Tuple[float, float, float] # vx, vy, vz
acceleration: Optional[Tuple[float, float, float]]
confidence: float
timestamp: datetime
track_id: Optional[str]
@dataclass
class PredictedTrajectory:
"""预测轨迹"""
object_id: str
trajectory: List[Tuple[float, float, float]] # x, y, t
probabilities: List[float]
intention: str
confidence: float
@dataclass
class SceneContext:
"""场景上下文"""
scene_id: str
ego_state: Dict[str, Any] # 自车状态
objects: List[DetectedObject]
traffic_lights: Dict[str, TrafficLightState]
lane_info: Dict[str, Any]
weather: str
road_condition: str
timestamp: datetime
@dataclass
class DecisionResult:
"""决策结果"""
decision_id: str
behavior: DrivingBehavior
target_speed: float
target_lane: Optional[str]
trajectory: List[Tuple[float, float, float]]
risk_level: RiskLevel
confidence: float
reasoning: str
timestamp: datetime
class PerceptionFusion:
"""
感知融合模块
支持:
1. 多传感器融合
2. 目标检测
3. 跟踪
4. 置信度评估
"""
def __init__(self):
self.object_tracks: Dict[str, deque] = defaultdict(lambda: deque(maxlen=30))
self.lock = threading.Lock()
def fuse_sensors(self, lidar_data: List[Dict], camera_data: List[Dict],
radar_data: List[Dict]) -> List[DetectedObject]:
"""融合多传感器数据"""
fused_objects = []
# 简化融合逻辑:时间同步 + 空间对齐 + 关联匹配
all_detections = []
# 处理激光雷达数据
for det in lidar_data:
all_detections.append({
'source': 'lidar',
'object_id': det.get('id', str(uuid.uuid4())),
'type': det.get('type', 'unknown'),
'bbox': det.get('bbox'),
'confidence': det.get('confidence', 0.8),
'timestamp': det.get('timestamp', datetime.now())
})
# 处理摄像头数据
for det in camera_data:
all_detections.append({
'source': 'camera',
'object_id': det.get('id', str(uuid.uuid4())),
'type': det.get('type', 'unknown'),
'bbox': det.get('bbox'),
'confidence': det.get('confidence', 0.85),
'timestamp': det.get('timestamp', datetime.now())
})
# 处理毫米波雷达数据
for det in radar_data:
all_detections.append({
'source': 'radar',
'object_id': det.get('id', str(uuid.uuid4())),
'type': det.get('type', 'unknown'),
'bbox': det.get('bbox'),
'velocity': det.get('velocity'),
'confidence': det.get('confidence', 0.75),
'timestamp': det.get('timestamp', datetime.now())
})
# 关联匹配(简化:基于位置)
matched_objects = self._associate_detections(all_detections)
# 创建融合后的目标
for match in matched_objects:
fused_obj = self._create_fused_object(match)
fused_objects.append(fused_obj)
# 更新跟踪
with self.lock:
self.object_tracks[fused_obj.object_id].append(fused_obj)
return fused_objects
def _associate_detections(self, detections: List[Dict]) -> List[List[Dict]]:
"""关联检测(简化版)"""
# 按时间分组
time_groups = defaultdict(list)
for det in detections:
time_key = det['timestamp'].strftime('%Y-%m-%d %H:%M:%S')
time_groups[time_key].append(det)
# 每组内基于位置关联
matched = []
for time_key, group in time_groups.items():
if len(group) >= 2:
matched.append(group)
else:
matched.append(group)
return matched
def _create_fused_object(self, detections: List[Dict]) -> DetectedObject:
"""创建融合目标"""
# 加权平均置信度
total_conf = sum(d['confidence'] for d in detections)
weights = [d['confidence'] / total_conf for d in detections]
# 融合边界框(简化:取第一个)
primary = detections[0]
# 估计速度(如果有雷达数据)
velocity = (0.0, 0.0, 0.0)
for det in detections:
if 'velocity' in det and det['source'] == 'radar':
velocity = det['velocity']
break
return DetectedObject(
object_id=primary['object_id'],
object_type=self._map_type(primary['type']),
bbox=primary['bbox'],
velocity=velocity,
acceleration=None,
confidence=total_conf / len(detections),
timestamp=datetime.now(),
track_id=primary['object_id']
)
def _map_type(self, type_str: str) -> ObjectType:
"""映射类型"""
type_map = {
'car': ObjectType.VEHICLE,
'truck': ObjectType.VEHICLE,
'person': ObjectType.PEDESTRIAN,
'cyclist': ObjectType.CYCLIST,
'traffic_light': ObjectType.TRAFFIC_LIGHT,
'sign': ObjectType.SIGN
}
return type_map.get(type_str.lower(), ObjectType.OBSTACLE)
def get_track_history(self, object_id: str) -> List[DetectedObject]:
"""获取跟踪历史"""
return list(self.object_tracks.get(object_id, []))
class BehaviorPredictor:
"""
行为预测器
支持:
1. 轨迹预测
2. 意图识别
3. 概率评估
"""
def __init__(self):
self.prediction_models = self._load_models()
def _load_models(self) -> Dict[str, Any]:
"""加载预测模型(简化)"""
return {
'vehicle': {'lane_change_prob': 0.15, 'turn_prob': 0.08},
'pedestrian': {'cross_prob': 0.25},
'cyclist': {'lane_change_prob': 0.20}
}
def predict_trajectory(self, obj: DetectedObject,
scene_context: SceneContext,
horizon_seconds: float = 5.0) -> PredictedTrajectory:
"""预测轨迹"""
# 基于运动学模型的简单预测
trajectory = []
probabilities = []
dt = 0.5 # 时间步长
steps = int(horizon_seconds / dt)
# 当前状态
x, y, z = obj.bbox.x, obj.bbox.y, obj.bbox.z
vx, vy, vz = obj.velocity
# 生成多个假设轨迹
intentions = ['keep_lane', 'lane_change_left', 'lane_change_right', 'turn']
for intention in intentions:
traj_points = []
prob = self._calculate_intention_probability(obj, intention, scene_context)
if prob < 0.05:
continue
# 基于意图生成轨迹
for step in range(steps):
t = step * dt
if intention == 'keep_lane':
# 保持车道:匀速直线
new_x = x + vx * t
new_y = y + vy * t
elif intention == 'lane_change_left':
# 向左变道
new_x = x + vx * t
new_y = y + vy * t + 3.5 * (1 - math.exp(-t/2))
elif intention == 'lane_change_right':
# 向右变道
new_x = x + vx * t
new_y = y + vy * t - 3.5 * (1 - math.exp(-t/2))
else: # turn
# 转弯
turn_radius = 20.0
angle = vx * t / turn_radius
new_x = x + turn_radius * math.sin(angle)
new_y = y + turn_radius * (1 - math.cos(angle))
traj_points.append((new_x, new_y, t))
trajectory.append(traj_points)
probabilities.append(prob)
# 归一化概率
total_prob = sum(probabilities)
if total_prob > 0:
probabilities = [p / total_prob for p in probabilities]
# 选择最可能的意图
best_idx = probabilities.index(max(probabilities)) if probabilities else 0
best_intention = intentions[best_idx] if best_idx < len(intentions) else 'keep_lane'
return PredictedTrajectory(
object_id=obj.object_id,
trajectory=trajectory[best_idx] if trajectory else [],
probabilities=probabilities,
intention=best_intention,
confidence=max(probabilities) if probabilities else 0.0
)
def _calculate_intention_probability(self, obj: DetectedObject,
intention: str,
scene_context: SceneContext) -> float:
"""计算意图概率"""
base_prob = 0.25
# 基于类型调整
if obj.object_type == ObjectType.VEHICLE:
if intention == 'keep_lane':
base_prob = 0.6
elif intention in ['lane_change_left', 'lane_change_right']:
base_prob = 0.15
elif obj.object_type == ObjectType.PEDESTRIAN:
if intention == 'cross':
base_prob = 0.4
# 基于场景调整(简化)
if scene_context.traffic_lights:
light_state = list(scene_context.traffic_lights.values())[0]
if light_state == TrafficLightState.RED and intention == 'keep_lane':
base_prob *= 0.3 # 红灯时直行概率降低
return min(base_prob, 1.0)
class DecisionMaker:
"""
决策制定器
支持:
1. 行为决策
2. 风险评估
3. 规则引擎
4. 学习策略
"""
def __init__(self):
self.rules = self._load_rules()
self.decision_history = deque(maxlen=100)
def _load_rules(self) -> List[Dict]:
"""加载决策规则"""
return [
{
'name': 'red_light_stop',
'condition': lambda ctx: ctx.traffic_lights and
any(state == TrafficLightState.RED
for state in ctx.traffic_lights.values()),
'behavior': DrivingBehavior.STOP,
'priority': 10
},
{
'name': 'obstacle_yield',
'condition': lambda ctx: any(obj.object_type == ObjectType.OBSTACLE
and self._calculate_ttc(obj, ctx) < 3.0
for obj in ctx.objects),
'behavior': DrivingBehavior.YIELD,
'priority': 9
},
{
'name': 'lane_keep',
'condition': lambda ctx: True,
'behavior': DrivingBehavior.LANE_KEEPING,
'priority': 1
}
]
def make_decision(self, scene_context: SceneContext,
predictions: List[PredictedTrajectory]) -> DecisionResult:
"""制定决策"""
# 1. 规则匹配
matched_rules = []
for rule in self.rules:
if rule['condition'](scene_context):
matched_rules.append(rule)
# 2. 选择最高优先级规则
if matched_rules:
best_rule = max(matched_rules, key=lambda r: r['priority'])
behavior = best_rule['behavior']
reasoning = f"Rule: {best_rule['name']}"
else:
behavior = DrivingBehavior.LANE_KEEPING
reasoning = "Default: lane keeping"
# 3. 风险评估
risk_level = self._assess_risk(scene_context, predictions)
# 4. 目标速度规划
target_speed = self._plan_target_speed(behavior, risk_level, scene_context)
# 5. 轨迹生成
trajectory = self._generate_trajectory(behavior, scene_context)
# 6. 创建决策结果
decision = DecisionResult(
decision_id=str(uuid.uuid4()),
behavior=behavior,
target_speed=target_speed,
target_lane=None,
trajectory=trajectory,
risk_level=risk_level,
confidence=0.85,
reasoning=reasoning,
timestamp=datetime.now()
)
self.decision_history.append(decision)
return decision
def _calculate_ttc(self, obj: DetectedObject, ctx: SceneContext) -> float:
"""计算碰撞时间(Time To Collision)"""
# 简化计算
if obj.velocity[0] <= 0:
return float('inf')
distance = obj.bbox.x # 假设 x 是纵向距离
relative_speed = obj.velocity[0]
if relative_speed <= 0:
return float('inf')
return distance / relative_speed
def _assess_risk(self, ctx: SceneContext,
predictions: List[PredictedTrajectory]) -> RiskLevel:
"""风险评估"""
# 基于 TTC 评估风险
min_ttc = float('inf')
for obj in ctx.objects:
ttc = self._calculate_ttc(obj, ctx)
min_ttc = min(min_ttc, ttc)
if min_ttc < 2.0:
return RiskLevel.CRITICAL
elif min_ttc < 3.5:
return RiskLevel.HIGH
elif min_ttc < 5.0:
return RiskLevel.MEDIUM
else:
return RiskLevel.LOW
def _plan_target_speed(self, behavior: DrivingBehavior,
risk_level: RiskLevel,
ctx: SceneContext) -> float:
"""规划目标速度"""
base_speed = 50.0 # km/h
# 基于行为调整
if behavior == DrivingBehavior.STOP:
return 0.0
elif behavior == DrivingBehavior.YIELD:
return 15.0
elif behavior in [DrivingBehavior.TURN_LEFT, DrivingBehavior.TURN_RIGHT]:
return 20.0
# 基于风险调整
risk_factors = {
RiskLevel.LOW: 1.0,
RiskLevel.MEDIUM: 0.8,
RiskLevel.HIGH: 0.5,
RiskLevel.CRITICAL: 0.2
}
return base_speed * risk_factors[risk_level]
def _generate_trajectory(self, behavior: DrivingBehavior,
ctx: SceneContext) -> List[Tuple[float, float, float]]:
"""生成轨迹"""
trajectory = []
# 简化轨迹生成
for t in range(0, 50, 5): # 5 秒,步长 0.5 秒
if behavior == DrivingBehavior.STOP:
trajectory.append((t * 0.0, 0.0, t / 10.0))
elif behavior == DrivingBehavior.LANE_KEEPING:
trajectory.append((t * 1.5, 0.0, t / 10.0))
elif behavior == DrivingBehavior.LANE_CHANGE_LEFT:
trajectory.append((t * 1.5, t * 0.3, t / 10.0))
elif behavior == DrivingBehavior.LANE_CHANGE_RIGHT:
trajectory.append((t * 1.5, -t * 0.3, t / 10.0))
else:
trajectory.append((t * 1.0, 0.0, t / 10.0))
return trajectory
# 使用示例
if __name__ == "__main__":
print("=== 自动驾驶场景 Agent 决策系统 ===\n")
print("=== 创建感知融合模块 ===")
perception = PerceptionFusion()
# 模拟传感器数据
lidar_data = [
{'id': 'obj_001', 'type': 'car', 'bbox': BoundingBox(50.0, 0.0, 0.0, 4.5, 1.8, 1.5, 0.0),
'confidence': 0.92, 'timestamp': datetime.now()},
{'id': 'obj_002', 'type': 'person', 'bbox': BoundingBox(30.0, 3.0, 0.0, 0.5, 0.5, 1.7, 0.0),
'confidence': 0.88, 'timestamp': datetime.now()}
]
camera_data = [
{'id': 'obj_001', 'type': 'car', 'bbox': BoundingBox(50.0, 0.0, 0.0, 4.5, 1.8, 1.5, 0.0),
'confidence': 0.90, 'timestamp': datetime.now()},
{'id': 'obj_003', 'type': 'traffic_light', 'bbox': BoundingBox(100.0, 0.0, 5.0, 0.3, 0.3, 1.0, 0.0),
'confidence': 0.95, 'timestamp': datetime.now()}
]
radar_data = [
{'id': 'obj_001', 'type': 'car', 'bbox': BoundingBox(50.0, 0.0, 0.0, 4.5, 1.8, 1.5, 0.0),
'velocity': (5.0, 0.0, 0.0), 'confidence': 0.85, 'timestamp': datetime.now()}
]
print("融合传感器数据...")
fused_objects = perception.fuse_sensors(lidar_data, camera_data, radar_data)
print(f"融合后目标数:{len(fused_objects)}")
for obj in fused_objects:
print(f" - {obj.object_type.value}: 位置 ({obj.bbox.x:.1f}, {obj.bbox.y:.1f}), "
f"速度 {obj.velocity}, 置信度 {obj.confidence:.2f}")
print(f"\n=== 创建行为预测器 ===")
predictor = BehaviorPredictor()
print(f"\n=== 创建场景上下文 ===")
scene = SceneContext(
scene_id="scene_001",
ego_state={'speed': 45.0, 'lane': 'center', 'position': (0.0, 0.0)},
objects=fused_objects,
traffic_lights={'light_001': TrafficLightState.GREEN},
lane_info={'current_lane': 'center', 'lane_count': 3},
weather='clear',
road_condition='dry',
timestamp=datetime.now()
)
print(f"场景 ID: {scene.scene_id}")
print(f"自车速度:{scene.ego_state['speed']} km/h")
print(f"交通灯:{[state.value for state in scene.traffic_lights.values()]}")
print(f"\n=== 预测他车行为 ===")
predictions = []
for obj in fused_objects:
if obj.object_type in [ObjectType.VEHICLE, ObjectType.PEDESTRIAN, ObjectType.CYCLIST]:
pred = predictor.predict_trajectory(obj, scene)
predictions.append(pred)
print(f"目标 {obj.object_id}:")
print(f" 意图:{pred.intention}")
print(f" 置信度:{pred.confidence:.2f}")
print(f" 轨迹点数:{len(pred.trajectory)}")
print(f"\n=== 创建决策制定器 ===")
decision_maker = DecisionMaker()
print(f"\n=== 制定驾驶决策 ===")
decision = decision_maker.make_decision(scene, predictions)
print(f"决策 ID: {decision.decision_id}")
print(f"行为:{decision.behavior.value}")
print(f"目标速度:{decision.target_speed:.1f} km/h")
print(f"风险等级:{decision.risk_level.value}")
print(f"置信度:{decision.confidence:.2f}")
print(f"推理:{decision.reasoning}")
print(f"轨迹点数:{len(decision.trajectory)}")
print(f"\n关键观察:")
print("1. 感知融合:激光雷达 + 摄像头 + 毫米波雷达 → 融合目标")
print("2. 行为预测:轨迹预测 + 意图识别 + 概率评估")
print("3. 决策制定:规则引擎 + 风险评估 + 速度规划 + 轨迹生成")
print("4. 安全执行:风险等级 + 置信度 + 推理过程")
print("5. 智能决策:感知 + 理解 + 决策 + 规划 = 可信赖")
print("\n智能驾驶的使命:让出行更安全、更高效、更舒适")