机器人 Agent 自主操控与执行完整实现
import time
import json
import math
import random
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import threading
import uuid
from abc import ABC, abstractmethod
class RobotType(Enum):
"""机器人类型"""
MANIPULATOR = "manipulator" # 机械臂
MOBILE = "mobile" # 移动机器人
HUMANOID = "humanoid" # 人形机器人
QUADRUPED = "quadruped" # 四足机器人
UAV = "uav" # 无人机
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class GraspQuality(Enum):
"""抓取质量"""
EXCELLENT = "excellent"
GOOD = "good"
FAIR = "fair"
POOR = "poor"
IMPOSSIBLE = "impossible"
@dataclass
class JointState:
"""关节状态"""
joint_id: str
position: float # 弧度
velocity: float
effort: float # 力矩
temperature: float
timestamp: datetime
@dataclass
class EndEffectorPose:
"""末端执行器位姿"""
x: float
y: float
z: float
roll: float
pitch: float
yaw: float
gripper_opening: float # 夹爪开度 0-1
@dataclass
class DetectedObject:
"""检测到的物体"""
object_id: str
name: str
category: str
pose: EndEffectorPose
dimensions: Tuple[float, float, float] # 长宽高
weight: Optional[float]
material: str
grasp_points: List[EndEffectorPose]
confidence: float
@dataclass
class Task:
"""任务定义"""
task_id: str
name: str
description: str
preconditions: List[str]
effects: List[str]
subtasks: List['Task']
status: TaskStatus
priority: int
deadline: Optional[datetime]
result: Optional[Dict[str, Any]]
@dataclass
class GraspPlan:
"""抓取规划"""
grasp_id: str
object_id: str
grasp_pose: EndEffectorPose
approach_vector: Tuple[float, float, float]
grasp_force: float
quality: GraspQuality
success_probability: float
execution_time_ms: int
class PerceptionSystem:
"""
感知系统
支持:
1. 视觉感知
2. 目标识别
3. 位姿估计
4. 场景理解
"""
def __init__(self):
self.object_database = self._load_object_database()
self.detection_history = deque(maxlen=100)
def _load_object_database(self) -> Dict[str, Any]:
"""加载物体数据库"""
return {
'cube': {'dimensions': (0.1, 0.1, 0.1), 'weight': 0.5, 'material': 'plastic'},
'cylinder': {'dimensions': (0.08, 0.08, 0.15), 'weight': 0.3, 'material': 'metal'},
'sphere': {'dimensions': (0.1, 0.1, 0.1), 'weight': 0.4, 'material': 'rubber'},
'box': {'dimensions': (0.2, 0.15, 0.1), 'weight': 0.8, 'material': 'cardboard'}
}
def detect_objects(self, camera_data: Dict, depth_data: Dict) -> List[DetectedObject]:
"""检测物体"""
detected_objects = []
# 简化检测逻辑:从模拟数据中生成
object_templates = ['cube', 'cylinder', 'sphere', 'box']
for i, template in enumerate(object_templates[:random.randint(1, 3)]):
obj_info = self.object_database[template]
# 随机生成位姿
pose = EndEffectorPose(
x=random.uniform(-0.5, 0.5),
y=random.uniform(-0.5, 0.5),
z=random.uniform(0.0, 0.3),
roll=random.uniform(-math.pi, math.pi),
pitch=random.uniform(-math.pi, math.pi),
yaw=random.uniform(-math.pi, math.pi),
gripper_opening=0.0
)
# 生成抓取点
grasp_points = self._generate_grasp_points(pose, obj_info['dimensions'])
obj = DetectedObject(
object_id=f"obj_{uuid.uuid4().hex[:8]}",
name=f"{template}_{i}",
category=template,
pose=pose,
dimensions=obj_info['dimensions'],
weight=obj_info['weight'],
material=obj_info['material'],
grasp_points=grasp_points,
confidence=random.uniform(0.85, 0.98)
)
detected_objects.append(obj)
self.detection_history.append(detected_objects)
return detected_objects
def _generate_grasp_points(self, pose: EndEffectorPose,
dimensions: Tuple[float, float, float]) -> List[EndEffectorPose]:
"""生成抓取点"""
grasp_points = []
length, width, height = dimensions
# 生成多个候选抓取点
grasp_configs = [
{'offset': (0, width/2 + 0.02, 0), 'approach': (0, -1, 0)},
{'offset': (0, -width/2 - 0.02, 0), 'approach': (0, 1, 0)},
{'offset': (length/2 + 0.02, 0, 0), 'approach': (-1, 0, 0)},
{'offset': (-length/2 - 0.02, 0, 0), 'approach': (1, 0, 0)},
]
for config in grasp_configs:
grasp_pose = EndEffectorPose(
x=pose.x + config['offset'][0],
y=pose.y + config['offset'][1],
z=pose.z + config['offset'][2],
roll=pose.roll,
pitch=pose.pitch,
yaw=pose.yaw,
gripper_opening=width + 0.04 # 略大于物体宽度
)
grasp_points.append(grasp_pose)
return grasp_points
def estimate_pose(self, object_id: str, camera_data: Dict) -> Optional[EndEffectorPose]:
"""估计物体位姿"""
# 简化实现:返回模拟位姿
return EndEffectorPose(
x=random.uniform(-0.5, 0.5),
y=random.uniform(-0.5, 0.5),
z=random.uniform(0.0, 0.3),
roll=0.0,
pitch=0.0,
yaw=0.0,
gripper_opening=0.0
)
class TaskPlanner:
"""
任务规划器
支持:
1. 任务分解
2. 行为树
3. 状态机
4. 异常处理
"""
def __init__(self):
self.task_library = self._load_task_library()
self.active_tasks: Dict[str, Task] = {}
def _load_task_library(self) -> Dict[str, Any]:
"""加载任务库"""
return {
'pick_and_place': {
'description': '抓取并放置物体',
'subtasks': ['navigate_to_object', 'grasp_object', 'navigate_to_target', 'place_object'],
'preconditions': ['object_detected', 'path_clear'],
'effects': ['object_moved']
},
'assembly': {
'description': '装配任务',
'subtasks': ['pick_part_a', 'pick_part_b', 'align_parts', 'insert_parts'],
'preconditions': ['parts_available', 'workspace_clear'],
'effects': ['assembly_completed']
},
'inspection': {
'description': '检测任务',
'subtasks': ['navigate_to_position', 'capture_images', 'analyze_defects'],
'preconditions': ['camera_calibrated'],
'effects': ['inspection_report']
}
}
def create_task(self, task_name: str, parameters: Dict[str, Any]) -> Task:
"""创建任务"""
if task_name not in self.task_library:
raise ValueError(f"Unknown task: {task_name}")
task_info = self.task_library[task_name]
# 创建子任务
subtasks = []
for subtask_name in task_info['subtasks']:
subtask = Task(
task_id=f"sub_{uuid.uuid4().hex[:8]}",
name=subtask_name,
description=f"Subtask: {subtask_name}",
preconditions=[],
effects=[],
subtasks=[],
status=TaskStatus.PENDING,
priority=1,
deadline=None,
result=None
)
subtasks.append(subtask)
task = Task(
task_id=f"task_{uuid.uuid4().hex[:8]}",
name=task_name,
description=task_info['description'],
preconditions=task_info['preconditions'],
effects=task_info['effects'],
subtasks=subtasks,
status=TaskStatus.PENDING,
priority=parameters.get('priority', 1),
deadline=parameters.get('deadline'),
result=None
)
self.active_tasks[task.task_id] = task
return task
def execute_task(self, task: Task) -> Dict[str, Any]:
"""执行任务"""
task.status = TaskStatus.RUNNING
# 检查前置条件
if not self._check_preconditions(task):
task.status = TaskStatus.FAILED
return {'success': False, 'reason': 'Preconditions not met'}
# 执行子任务
results = []
for subtask in task.subtasks:
result = self._execute_subtask(subtask)
results.append(result)
if not result.get('success', False):
task.status = TaskStatus.FAILED
return {'success': False, 'reason': f"Subtask {subtask.name} failed", 'results': results}
task.status = TaskStatus.COMPLETED
task.result = {'subtask_results': results}
return {'success': True, 'results': results}
def _check_preconditions(self, task: Task) -> bool:
"""检查前置条件"""
# 简化实现:假设条件都满足
return True
def _execute_subtask(self, subtask: Task) -> Dict[str, Any]:
"""执行子任务"""
subtask.status = TaskStatus.RUNNING
# 模拟执行
time.sleep(random.uniform(0.1, 0.3))
success = random.random() > 0.1 # 90% 成功率
if success:
subtask.status = TaskStatus.COMPLETED
return {'success': True, 'subtask': subtask.name}
else:
subtask.status = TaskStatus.FAILED
return {'success': False, 'subtask': subtask.name, 'error': 'Execution failed'}
class ManipulationController:
"""
操控控制器
支持:
1. 运动控制
2. 抓取规划
3. 力控执行
4. 轨迹优化
"""
def __init__(self):
self.joint_states: Dict[str, JointState] = {}
self.current_pose = EndEffectorPose(0.0, 0.0, 0.5, 0.0, 0.0, 0.0, 0.0)
self.lock = threading.Lock()
def plan_grasp(self, obj: DetectedObject) -> GraspPlan:
"""规划抓取"""
if not obj.grasp_points:
return GraspPlan(
grasp_id=f"grasp_{uuid.uuid4().hex[:8]}",
object_id=obj.object_id,
grasp_pose=obj.pose,
approach_vector=(0, 0, -1),
grasp_force=0.0,
quality=GraspQuality.IMPOSSIBLE,
success_probability=0.0,
execution_time_ms=0
)
# 选择最佳抓取点
best_grasp = obj.grasp_points[0]
quality = random.choice([GraspQuality.EXCELLENT, GraspQuality.GOOD, GraspQuality.FAIR])
# 计算抓取力
grasp_force = obj.weight * 9.8 * 1.5 # 1.5 倍安全系数
return GraspPlan(
grasp_id=f"grasp_{uuid.uuid4().hex[:8]}",
object_id=obj.object_id,
grasp_pose=best_grasp,
approach_vector=(0, 0, -1),
grasp_force=grasp_force,
quality=quality,
success_probability=random.uniform(0.85, 0.98),
execution_time_ms=random.randint(500, 1500)
)
def execute_motion(self, target_pose: EndEffectorPose,
speed: float = 0.5) -> Dict[str, Any]:
"""执行运动"""
with self.lock:
# 简化运动执行
start_pose = self.current_pose
# 计算运动时间
distance = math.sqrt(
(target_pose.x - start_pose.x)**2 +
(target_pose.y - start_pose.y)**2 +
(target_pose.z - start_pose.z)**2
)
execution_time = distance / speed
# 更新当前位姿
self.current_pose = target_pose
return {
'success': True,
'start_pose': start_pose,
'target_pose': target_pose,
'execution_time_ms': int(execution_time * 1000),
'distance': distance
}
def execute_grasp(self, grasp_plan: GraspPlan) -> Dict[str, Any]:
"""执行抓取"""
# 1. 移动到抓取位置
move_result = self.execute_motion(grasp_plan.grasp_pose)
if not move_result['success']:
return {'success': False, 'reason': 'Motion failed'}
# 2. 闭合夹爪
time.sleep(0.2)
# 3. 施加抓取力
# 4. 提升物体
lift_pose = EndEffectorPose(
x=grasp_plan.grasp_pose.x,
y=grasp_plan.grasp_pose.y,
z=grasp_plan.grasp_pose.z + 0.1,
roll=grasp_plan.grasp_pose.roll,
pitch=grasp_plan.grasp_pose.pitch,
yaw=grasp_plan.grasp_pose.yaw,
gripper_opening=0.0
)
lift_result = self.execute_motion(lift_pose, speed=0.3)
return {
'success': True,
'grasp_id': grasp_plan.grasp_id,
'quality': grasp_plan.quality.value,
'execution_time_ms': move_result['execution_time_ms'] + lift_result['execution_time_ms'] + 200
}
def get_current_state(self) -> Dict[str, Any]:
"""获取当前状态"""
return {
'pose': self.current_pose,
'joint_states': self.joint_states,
'timestamp': datetime.now()
}
# 使用示例
if __name__ == "__main__":
print("=== 机器人 Agent 自主操控与执行 ===\n")
print("=== 创建感知系统 ===")
perception = PerceptionSystem()
# 模拟传感器数据
camera_data = {'resolution': (1920, 1080), 'fps': 30}
depth_data = {'resolution': (640, 480), 'max_range': 5.0}
print("检测物体...")
objects = perception.detect_objects(camera_data, depth_data)
print(f"检测到 {len(objects)} 个物体:")
for obj in objects:
print(f" - {obj.name} ({obj.category})")
print(f" 位置:({obj.pose.x:.2f}, {obj.pose.y:.2f}, {obj.pose.z:.2f})")
print(f" 尺寸:{obj.dimensions}")
print(f" 抓取点:{len(obj.grasp_points)} 个")
print(f" 置信度:{obj.confidence:.2f}")
print(f"\n=== 创建任务规划器 ===")
planner = TaskPlanner()
print(f"\n=== 创建抓取并放置任务 ===")
task = planner.create_task('pick_and_place', {
'object_id': objects[0].object_id if objects else 'obj_001',
'target_position': (0.3, 0.2, 0.1),
'priority': 1
})
print(f"任务 ID: {task.task_id}")
print(f"任务名称:{task.name}")
print(f"描述:{task.description}")
print(f"子任务数:{len(task.subtasks)}")
for i, subtask in enumerate(task.subtasks, 1):
print(f" {i}. {subtask.name}")
print(f"\n=== 创建操控控制器 ===")
controller = ManipulationController()
print(f"\n=== 规划抓取 ===")
if objects:
grasp_plan = controller.plan_grasp(objects[0])
print(f"抓取 ID: {grasp_plan.grasp_id}")
print(f"物体 ID: {grasp_plan.object_id}")
print(f"抓取位姿:({grasp_plan.grasp_pose.x:.2f}, {grasp_plan.grasp_pose.y:.2f}, {grasp_plan.grasp_pose.z:.2f})")
print(f"抓取质量:{grasp_plan.quality.value}")
print(f"成功概率:{grasp_plan.success_probability:.2f}")
print(f"执行时间:{grasp_plan.execution_time_ms}ms")
print(f"\n=== 执行抓取 ===")
grasp_result = controller.execute_grasp(grasp_plan)
print(f"抓取结果:{'成功' if grasp_result['success'] else '失败'}")
print(f"抓取质量:{grasp_result.get('quality', 'N/A')}")
print(f"执行时间:{grasp_result.get('execution_time_ms', 0)}ms")
print(f"\n=== 执行任务 ===")
task_result = planner.execute_task(task)
print(f"任务执行结果:{'成功' if task_result.get('success') else '失败'}")
if 'reason' in task_result:
print(f"原因:{task_result['reason']}")
if 'results' in task_result:
print(f"子任务结果数:{len(task_result['results'])}")
print(f"\n=== 当前状态 ===")
state = controller.get_current_state()
print(f"末端位姿:({state['pose'].x:.2f}, {state['pose'].y:.2f}, {state['pose'].z:.2f})")
print(f"时间戳:{state['timestamp']}")
print(f"\n关键观察:")
print("1. 感知系统:视觉感知 + 目标识别 + 位姿估计 + 抓取点生成")
print("2. 任务规划:任务分解 + 行为树 + 状态机 + 异常处理")
print("3. 操控控制:运动控制 + 抓取规划 + 力控执行 + 轨迹优化")
print("4. 自主执行:感知 + 决策 + 控制 = 可信赖")
print("5. 机器人革命:遥控→自主、专用→通用、工具→伙伴")
print("\n自主机器人的使命:让工作更高效、更安全、更智能")