具身 Agent 框架完整实现
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import math
class SensorType(Enum):
"""传感器类型"""
CAMERA = "camera"
LIDAR = "lidar"
IMU = "imu"
FORCE = "force"
TORQUE = "torque"
ENCODER = "encoder"
TACTILE = "tactile"
class ActuatorType(Enum):
"""执行器类型"""
MOTOR = "motor"
SERVO = "servo"
HYDRAULIC = "hydraulic"
PNEUMATIC = "pneumatic"
GRIPPER = "gripper"
@dataclass
class SensorData:
"""传感器数据"""
sensor_id: str
sensor_type: SensorType
timestamp: datetime
data: np.ndarray
confidence: float = 1.0
@dataclass
class ActuatorCommand:
"""执行器指令"""
actuator_id: str
actuator_type: ActuatorType
command_type: str # position, velocity, torque
target_value: float
max_force: float = None
timeout: float = 1.0
@dataclass
class JointState:
"""关节状态"""
joint_id: str
position: float # rad or m
velocity: float # rad/s or m/s
torque: float # Nm or N
temperature: float = 25.0 # Celsius
@dataclass
class EndEffectorPose:
"""末端执行器位姿"""
position: np.ndarray # [x, y, z] in meters
orientation: np.ndarray # quaternion [w, x, y, z]
@property
def x(self): return self.position[0]
@property
def y(self): return self.position[1]
@property
def z(self): return self.position[2]
class EmbodiedAgent:
"""
具身 Agent 基类
核心功能:
1. 多传感器融合
2. 状态估计
3. 动作规划
4. 运动控制
5. 感知 - 动作闭环
"""
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.sensors: Dict[str, SensorType] = {}
self.actuators: Dict[str, ActuatorType] = {}
self.joint_states: Dict[str, JointState] = {}
self.sensor_data_buffer: List[SensorData] = []
self.is_initialized = False
def register_sensor(self, sensor_id: str, sensor_type: SensorType):
"""注册传感器"""
self.sensors[sensor_id] = sensor_type
print(f"✓ 注册传感器:{sensor_id} ({sensor_type.value})")
def register_actuator(self, actuator_id: str, actuator_type: ActuatorType):
"""注册执行器"""
self.actuators[actuator_id] = actuator_type
print(f"✓ 注册执行器:{actuator_id} ({actuator_type.value})")
def receive_sensor_data(self, sensor_data: SensorData):
"""接收传感器数据"""
self.sensor_data_buffer.append(sensor_data)
# 保持缓冲区大小
if len(self.sensor_data_buffer) > 1000:
self.sensor_data_buffer = self.sensor_data_buffer[-500:]
def get_latest_sensor_data(self, sensor_id: str) -> Optional[SensorData]:
"""获取最新传感器数据"""
for data in reversed(self.sensor_data_buffer):
if data.sensor_id == sensor_id:
return data
return None
def estimate_state(self) -> Dict[str, Any]:
"""
状态估计
Returns:
系统状态字典
"""
state = {
"timestamp": datetime.now(),
"joint_states": {},
"end_effector_pose": None,
"sensor_fusion": {}
}
# 融合所有关节状态
for joint_id, joint_state in self.joint_states.items():
state["joint_states"][joint_id] = {
"position": joint_state.position,
"velocity": joint_state.velocity,
"torque": joint_state.torque
}
# 传感器融合
for sensor_id, sensor_type in self.sensors.items():
latest_data = self.get_latest_sensor_data(sensor_id)
if latest_data:
state["sensor_fusion"][sensor_id] = {
"type": sensor_type.value,
"data": latest_data.data.tolist(),
"confidence": latest_data.confidence
}
return state
def plan_action(self, goal: Dict[str, Any]) -> List[ActuatorCommand]:
"""
动作规划
Args:
goal: 目标状态
Returns:
执行器指令列表
"""
raise NotImplementedError("Subclasses must implement plan_action")
def execute_command(self, command: ActuatorCommand) -> bool:
"""
执行指令
Args:
command: 执行器指令
Returns:
执行是否成功
"""
print(f"执行指令:{command.actuator_id} -> {command.command_type}={command.target_value}")
# 实际实现会发送指令到硬件
return True
def perception_action_loop(self, goal: Dict[str, Any]) -> bool:
"""
感知 - 动作闭环
Args:
goal: 目标状态
Returns:
任务是否成功完成
"""
# 1. 感知:获取当前状态
current_state = self.estimate_state()
# 2. 规划:生成动作序列
action_sequence = self.plan_action(goal)
# 3. 执行:执行动作序列
success = True
for command in action_sequence:
if not self.execute_command(command):
success = False
break
# 4. 反馈:检查执行结果
if success:
final_state = self.estimate_state()
# 验证是否达到目标
# ... 验证逻辑
return success
def initialize(self) -> bool:
"""初始化 Agent"""
print(f"\n=== 初始化具身 Agent: {self.agent_id} ===")
print(f"传感器数量:{len(self.sensors)}")
print(f"执行器数量:{len(self.actuators)}")
self.is_initialized = True
return True
class RobotArmAgent(EmbodiedAgent):
"""
机械臂具身 Agent
6 自由度机械臂示例
"""
def __init__(self, agent_id: str = "robot_arm_001"):
super().__init__(agent_id)
self.num_joints = 6
self._setup_robot()
def _setup_robot(self):
"""配置机器人"""
# 注册关节编码器(位置传感器)
for i in range(self.num_joints):
self.register_sensor(f"encoder_{i}", SensorType.ENCODER)
self.register_actuator(f"motor_{i}", ActuatorType.MOTOR)
self.joint_states[f"joint_{i}"] = JointState(
joint_id=f"joint_{i}",
position=0.0,
velocity=0.0,
torque=0.0
)
# 注册力扭矩传感器
self.register_sensor("ft_sensor", SensorType.FORCE)
# 注册末端夹爪
self.register_actuator("gripper", ActuatorType.GRIPPER)
def forward_kinematics(self, joint_angles: np.ndarray) -> EndEffectorPose:
"""
正运动学:从关节角度计算末端位姿
Args:
joint_angles: 6 个关节角度 [rad]
Returns:
末端执行器位姿
"""
# 简化的 DH 参数模型(实际应使用真实机器人参数)
# 这里使用简化模型演示
# 假设各连杆长度(米)
L = [0.1, 0.2, 0.2, 0.15, 0.15, 0.1]
# 简化计算:只计算位置,忽略姿态
x = 0.0
y = 0.0
z = 0.0
# 累积变换(简化)
current_angle = 0.0
for i in range(self.num_joints):
current_angle += joint_angles[i]
if i % 2 == 0:
x += L[i] * np.cos(current_angle)
z += L[i] * np.sin(current_angle)
else:
y += L[i] * np.cos(current_angle)
z += L[i] * np.sin(current_angle)
# 简化:姿态设为单位四元数
orientation = np.array([1.0, 0.0, 0.0, 0.0]) # [w, x, y, z]
return EndEffectorPose(
position=np.array([x, y, z]),
orientation=orientation
)
def inverse_kinematics(self, target_pose: EndEffectorPose,
current_angles: np.ndarray,
max_iterations: int = 100,
tolerance: float = 1e-4) -> Optional[np.ndarray]:
"""
逆运动学:从末端位姿计算关节角度(使用数值方法)
Args:
target_pose: 目标末端位姿
current_angles: 当前关节角度(初始猜测)
max_iterations: 最大迭代次数
tolerance: 收敛容差
Returns:
关节角度列表,如果无解返回 None
"""
angles = current_angles.copy()
for iteration in range(max_iterations):
# 计算当前末端位置
current_pose = self.forward_kinematics(angles)
# 计算位置误差
position_error = target_pose.position - current_pose.position
error_norm = np.linalg.norm(position_error)
# 检查是否收敛
if error_norm < tolerance:
print(f"IK 收敛于迭代 {iteration}, 误差:{error_norm:.6f}")
return angles
# 使用雅可比矩阵的伪逆更新角度(简化实现)
# 实际应计算完整的雅可比矩阵
jacobian = self._compute_jacobian(angles)
# 阻尼最小二乘法 (DLS)
lambda_damping = 0.5
J_pinv = np.linalg.pinv(jacobian.T @ jacobian + lambda_damping * np.eye(6)) @ jacobian.T
# 更新关节角度
delta_angles = J_pinv @ position_error
angles += delta_angles
# 限制关节角度范围
angles = np.clip(angles, -np.pi, np.pi)
print(f"IK 未在 {max_iterations} 次迭代内收敛,最终误差:{error_norm:.6f}")
return None
def _compute_jacobian(self, joint_angles: np.ndarray) -> np.ndarray:
"""
计算雅可比矩阵(简化版本)
Returns:
6x6 雅可比矩阵
"""
# 简化:返回单位矩阵(实际应基于 DH 参数计算)
return np.eye(6) * 0.5
def plan_action(self, goal: Dict[str, Any]) -> List[ActuatorCommand]:
"""
动作规划:移动到目标位置
Args:
goal: 包含目标位置的字典 {"position": [x, y, z]}
Returns:
执行器指令列表
"""
target_position = np.array(goal["position"])
target_pose = EndEffectorPose(
position=target_position,
orientation=np.array([1.0, 0.0, 0.0, 0.0])
)
# 获取当前关节角度
current_angles = np.array([
self.joint_states[f"joint_{i}"].position
for i in range(self.num_joints)
])
# 求解逆运动学
target_angles = self.inverse_kinematics(target_pose, current_angles)
if target_angles is None:
print("逆运动学求解失败!")
return []
# 生成轨迹(简单线性插值)
num_waypoints = 10
commands = []
for i in range(num_waypoints):
t = i / (num_waypoints - 1)
interpolated_angles = current_angles * (1 - t) + target_angles * t
# 为每个关节生成指令
for j in range(self.num_joints):
command = ActuatorCommand(
actuator_id=f"motor_{j}",
actuator_type=ActuatorType.MOTOR,
command_type="position",
target_value=interpolated_angles[j],
max_force=50.0, # Nm
timeout=0.5
)
commands.append(command)
# 添加夹爪指令(如果需要)
if "gripper_action" in goal:
gripper_command = ActuatorCommand(
actuator_id="gripper",
actuator_type=ActuatorType.GRIPPER,
command_type="position",
target_value=goal["gripper_action"], # 0: close, 1: open
timeout=0.3
)
commands.append(gripper_command)
return commands
def execute_pick_and_place(self, pick_position: List[float],
place_position: List[float]) -> bool:
"""
执行抓取 - 放置任务
Args:
pick_position: 抓取位置 [x, y, z]
place_position: 放置位置 [x, y, z]
Returns:
任务是否成功
"""
print(f"\n=== 执行抓取 - 放置任务 ===")
print(f"抓取位置:{pick_position}")
print(f"放置位置:{place_position}")
# 1. 移动到抓取位置上方
approach_pick = pick_position.copy()
approach_pick[2] += 0.1 # 上方 10cm
success = self.move_to_position(approach_pick)
if not success:
return False
# 2. 下降到抓取位置
success = self.move_to_position(pick_position)
if not success:
return False
# 3. 闭合夹爪
success = self.close_gripper()
if not success:
return False
# 4. 抬起到抓取位置上方
success = self.move_to_position(approach_pick)
if not success:
return False
# 5. 移动到放置位置上方
approach_place = place_position.copy()
approach_place[2] += 0.1
success = self.move_to_position(approach_place)
if not success:
return False
# 6. 下降到放置位置
success = self.move_to_position(place_position)
if not success:
return False
# 7. 打开夹爪
success = self.open_gripper()
if not success:
return False
# 8. 抬起到安全位置
success = self.move_to_position(approach_place)
print("✓ 抓取 - 放置任务完成")
return success
def move_to_position(self, position: List[float]) -> bool:
"""移动到目标位置"""
goal = {"position": position}
commands = self.plan_action(goal)
if not commands:
return False
for command in commands:
if not self.execute_command(command):
return False
return True
def close_gripper(self) -> bool:
"""闭合夹爪"""
command = ActuatorCommand(
actuator_id="gripper",
actuator_type=ActuatorType.GRIPPER,
command_type="position",
target_value=0.0, # 闭合
timeout=0.3
)
return self.execute_command(command)
def open_gripper(self) -> bool:
"""打开夹爪"""
command = ActuatorCommand(
actuator_id="gripper",
actuator_type=ActuatorType.GRIPPER,
command_type="position",
target_value=1.0, # 打开
timeout=0.3
)
return self.execute_command(command)
# 使用示例
if __name__ == "__main__":
print("=== 具身 Agent 动作执行与运动控制 ===\n")
# 创建机械臂 Agent
robot = RobotArmAgent("robot_arm_001")
# 初始化
robot.initialize()
# 模拟传感器数据
for i in range(6):
sensor_data = SensorData(
sensor_id=f"encoder_{i}",
sensor_type=SensorType.ENCODER,
timestamp=datetime.now(),
data=np.array([0.0]),
confidence=0.99
)
robot.receive_sensor_data(sensor_data)
# 状态估计
print("\n=== 状态估计 ===")
state = robot.estimate_state()
print(f"时间戳:{state['timestamp']}")
print(f"关节数量:{len(state['joint_states'])}")
print(f"传感器数据:{len(state['sensor_fusion'])} 个")
# 正运动学示例
print("\n=== 正运动学 ===")
joint_angles = np.array([0.0, np.pi/4, -np.pi/3, np.pi/6, -np.pi/4, 0.0])
end_pose = robot.forward_kinematics(joint_angles)
print(f"关节角度:{np.degrees(joint_angles)}")
print(f"末端位置:[{end_pose.x:.3f}, {end_pose.y:.3f}, {end_pose.z:.3f}] m")
# 逆运动学示例
print("\n=== 逆运动学 ===")
target_pose = EndEffectorPose(
position=np.array([0.3, 0.2, 0.4]),
orientation=np.array([1.0, 0.0, 0.0, 0.0])
)
current_angles = np.zeros(6)
solution = robot.inverse_kinematics(target_pose, current_angles)
if solution is not None:
print(f"目标位置:{target_pose.position}")
print(f"求解结果:{np.degrees(solution)}")
# 验证:用求解的角度计算正运动学
verified_pose = robot.forward_kinematics(solution)
print(f"验证位置:[{verified_pose.x:.3f}, {verified_pose.y:.3f}, {verified_pose.z:.3f}] m")
# 抓取 - 放置任务
print("\n=== 抓取 - 放置任务演示 ===")
pick_pos = [0.3, 0.0, 0.1]
place_pos = [0.3, 0.3, 0.1]
# 注意:这是演示,实际执行需要真实硬件
print(f"任务:从 {pick_pos} 抓取,放置到 {place_pos}")
print("(实际执行需要连接真实机器人硬件)")
print("\n关键观察:")
print("1. 具身 Agent:具有物理身体,能感知和执行")
print("2. 感觉运动系统:传感器 + 执行器闭环")
print("3. 运动学:正运动学(关节→末端)、逆运动学(末端→关节)")
print("4. 动作规划:从任务目标生成执行指令")
print("5. 感知 - 动作闭环:感知→规划→执行→反馈")
print("\n具身智能的核心:身体力行,知行合一")