"""仿真环境与训练平台完整实现 (complete implementation of a simulation environment and training platform)."""
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import random
from collections import defaultdict
import copy
class EnvironmentType(Enum):
    """Kinds of simulated worlds the platform can host."""

    GRID_WORLD = "grid_world"    # discrete grid world
    CONTINUOUS = "continuous"    # continuous 2-D space
    GRAPH = "graph"              # graph-structured world
    PHYSICS = "physics"          # physics-engine-backed world
class ObservationType(Enum):
    """Formats in which an agent's observation can be delivered."""

    VECTOR = "vector"    # flat numeric vector
    IMAGE = "image"      # image-shaped observation
    DICT = "dict"        # dictionary of named observation parts
@dataclass
class AgentConfig:
    """Static per-agent configuration."""
    id: str                        # unique agent identifier
    initial_position: np.ndarray   # starting position (used as 2-D elsewhere in this file)
    observation_space: Tuple       # declared observation shape, e.g. (dim,)
    action_space: Tuple            # declared action shape, e.g. (2,)
    max_speed: float = 1.0         # NOTE(review): not enforced anywhere visible in this file — confirm intent
    sensor_range: float = 10.0     # radius within which other agents appear in observations
@dataclass
class EnvironmentConfig:
    """Static environment-level configuration."""
    env_type: EnvironmentType          # world representation (grid, continuous, ...)
    observation_type: ObservationType  # observation format delivered to agents
    num_agents: int                    # expected number of agents (informational; registration drives the actual count)
    world_size: Tuple                  # world extent; (width, height) for continuous worlds
    time_limit: int = 1000             # steps before the episode is marked done
    render_mode: str = "human"         # NOTE(review): unused by the code visible here — confirm
class PhysicsEngine:
    """
    Simple point-mass physics engine.

    Supports:
    1. Rigid-body dynamics (semi-implicit Euler integration)
    2. Sphere-vs-sphere collision detection (`size` acts as a radius)
    3. Viscous friction (force proportional to and opposing velocity)
    4. Gravity along the negative second axis when the position has one
    """

    def __init__(self, gravity: float = 9.8, friction: float = 0.1):
        """
        Args:
            gravity: magnitude of the gravitational force per unit mass.
            friction: viscous damping coefficient applied against velocity.
        """
        self.gravity = gravity
        self.friction = friction
        # obj_id -> {"mass", "position", "velocity", "acceleration", "shape", "size", "forces"}
        self.objects: Dict[str, Dict[str, Any]] = {}

    def add_object(self, obj_id: str, mass: float, position: np.ndarray,
                   velocity: np.ndarray = None, shape: str = "sphere",
                   size: float = 1.0):
        """
        Register a physical object.

        Raises:
            ValueError: if mass is not positive (would otherwise produce
                inf/nan accelerations silently in step()).
        """
        if mass <= 0:
            raise ValueError(f"mass must be positive, got {mass}")
        # Coerce state to float so integer input arrays cannot break the
        # in-place float updates performed during integration.
        pos = np.array(position, dtype=float)
        vel = np.array(velocity, dtype=float) if velocity is not None else np.zeros_like(pos)
        self.objects[obj_id] = {
            "mass": mass,
            "position": pos,
            "velocity": vel,
            "acceleration": np.zeros_like(pos),
            "shape": shape,
            "size": size,
            "forces": []
        }

    def apply_force(self, obj_id: str, force: np.ndarray):
        """Queue a force to be applied on the next step(); unknown ids are ignored."""
        if obj_id in self.objects:
            self.objects[obj_id]["forces"].append(np.array(force, dtype=float))

    def detect_collision(self, obj1_id: str, obj2_id: str) -> bool:
        """Sphere-overlap collision test; False if either id is unknown."""
        obj1 = self.objects.get(obj1_id)
        obj2 = self.objects.get(obj2_id)
        if not obj1 or not obj2:
            return False
        # Overlap when center distance is below the sum of radii.
        distance = np.linalg.norm(obj1["position"] - obj2["position"])
        min_distance = obj1["size"] + obj2["size"]
        return distance < min_distance

    def step(self, dt: float = 0.01):
        """Advance all objects by dt seconds using semi-implicit Euler."""
        for obj in self.objects.values():
            total_force = np.zeros_like(obj["position"])
            # Gravity acts on the second axis (index 1) when it exists.
            # Generalized from the original hard-coded 2-vector so 1-D/3-D
            # positions do not crash; 2-D behavior is unchanged.
            if total_force.shape[0] >= 2:
                total_force[1] -= self.gravity * obj["mass"]
            # Externally applied forces queued via apply_force()
            for force in obj["forces"]:
                total_force = total_force + force
            # Viscous friction opposes the current velocity
            total_force = total_force - self.friction * obj["velocity"]
            # Newton's second law: a = F / m (mass validated > 0 in add_object)
            obj["acceleration"] = total_force / obj["mass"]
            # Semi-implicit Euler: update velocity first, then position
            obj["velocity"] = obj["velocity"] + obj["acceleration"] * dt
            obj["position"] = obj["position"] + obj["velocity"] * dt
            # Forces are impulsive per step; clear the queue
            obj["forces"] = []

    def get_state(self, obj_id: str) -> Dict[str, Any]:
        """Return copies of an object's kinematic state, or {} if unknown."""
        if obj_id not in self.objects:
            return {}
        obj = self.objects[obj_id]
        return {
            "position": obj["position"].copy(),
            "velocity": obj["velocity"].copy(),
            "acceleration": obj["acceleration"].copy()
        }
class MultiAgentEnvironment:
    """
    Multi-agent simulation environment.

    Supports:
    1. Multi-agent management
    2. State observation
    3. Action execution
    4. Reward computation
    """
    def __init__(self, config: EnvironmentConfig):
        self.config = config
        # agent_id -> mutable runtime state ("config", "position", "velocity",
        # "observation", "reward", "done")
        self.agents: Dict[str, Dict[str, Any]] = {}
        self.physics_engine = PhysicsEngine()
        self.current_step = 0
        self.done = False
        self.info: Dict[str, Any] = {}
        # Build the world representation for the configured environment type
        self._initialize_world()
    def _initialize_world(self):
        """Initialize the world representation according to env_type."""
        if self.config.env_type == EnvironmentType.GRID_WORLD:
            # NOTE(review): np.zeros needs integer dimensions; a float
            # world_size (as the demo uses for CONTINUOUS) would raise here.
            self.world = np.zeros(self.config.world_size)
        elif self.config.env_type == EnvironmentType.CONTINUOUS:
            self.world_bounds = {
                "x": (0, self.config.world_size[0]),
                "y": (0, self.config.world_size[1])
            }
    def register_agent(self, agent_config: AgentConfig):
        """Register an agent and mirror it as an object in the physics engine."""
        agent_id = agent_config.id
        self.agents[agent_id] = {
            "config": agent_config,
            "position": agent_config.initial_position.copy(),
            "velocity": np.zeros_like(agent_config.initial_position),
            "observation": None,
            "reward": 0.0,
            "done": False
        }
        # Mirror the agent in the physics engine (unit mass, radius 0.5)
        self.physics_engine.add_object(
            agent_id,
            mass=1.0,
            position=agent_config.initial_position,
            shape="circle",
            size=0.5
        )
    def get_observation(self, agent_id: str) -> np.ndarray:
        """
        Build the observation vector for one agent.

        Layout (for 2-D positions): own position (2) + own velocity (2) +
        a fixed-size neighbor block of 10 entries x [distance, rel_pos(2),
        rel_vel(2)] = 50 values, padded with zeros or truncated — 54 total.
        Returns an empty array for unknown agent ids.
        """
        if agent_id not in self.agents:
            return np.array([])
        agent = self.agents[agent_id]
        # Simplified: own position + velocity, then nearby agents appended
        obs = np.concatenate([
            agent["position"],
            agent["velocity"]
        ])
        # Collect features of agents within sensor range
        nearby_agents = []
        for other_id, other in self.agents.items():
            if other_id != agent_id:
                distance = np.linalg.norm(agent["position"] - other["position"])
                if distance < agent["config"].sensor_range:
                    relative_pos = other["position"] - agent["position"]
                    relative_vel = other["velocity"] - agent["velocity"]
                    nearby_agents.extend([distance, *relative_pos, *relative_vel])
        # Pad or truncate to a fixed length: 10 neighbors x 5 features.
        # NOTE(review): truncation cuts at a raw-value boundary and may split
        # one neighbor's 5-tuple — confirm this is acceptable downstream.
        max_nearby = 10
        if len(nearby_agents) < max_nearby * 5:
            nearby_agents.extend([0] * (max_nearby * 5 - len(nearby_agents)))
        else:
            nearby_agents = nearby_agents[:max_nearby * 5]
        obs = np.concatenate([obs, nearby_agents])
        agent["observation"] = obs
        return obs
    def step(self, actions: Dict[str, np.ndarray]) -> Tuple[Dict[str, np.ndarray],
                                                            Dict[str, float],
                                                            bool, Dict[str, Any]]:
        """
        Advance the environment one step.

        Args:
            actions: agent_id -> action vector; the first two components are
                interpreted as a force direction, scaled by 10 before being
                handed to the physics engine.

        Returns:
            (observations, rewards, done, info) — per-agent observation and
            reward dicts, a global done flag, and an info dict.
        """
        self.current_step += 1
        rewards = {}
        observations = {}
        # Apply each agent's action as a force
        for agent_id, action in actions.items():
            if agent_id not in self.agents:
                continue
            agent = self.agents[agent_id]
            # Simplified action parsing: the action IS a force vector
            force = action[:2] if len(action) >= 2 else np.zeros(2)
            # Scale and queue the force in the physics engine
            self.physics_engine.apply_force(agent_id, force * 10.0)
        # Single physics integration step for all agents
        self.physics_engine.step(dt=0.1)
        # Sync agent state back from the physics engine
        for agent_id in self.agents:
            state = self.physics_engine.get_state(agent_id)
            if state:
                self.agents[agent_id]["position"] = state["position"]
                self.agents[agent_id]["velocity"] = state["velocity"]
            # Reward (simplified): proportional to speed, encouraging movement
            speed = np.linalg.norm(self.agents[agent_id]["velocity"])
            rewards[agent_id] = speed * 0.1
            # Fresh observation after the physics update
            observations[agent_id] = self.get_observation(agent_id)
        # Episode terminates only when the time limit is reached
        self.done = self.current_step >= self.config.time_limit
        self.info = {
            "step": self.current_step,
            "num_agents": len(self.agents),
            "done": self.done
        }
        return observations, rewards, self.done, self.info
    def reset(self) -> Dict[str, np.ndarray]:
        """Reset the step counter and all agents to their initial positions."""
        self.current_step = 0
        self.done = False
        observations = {}
        for agent_id, agent in self.agents.items():
            # Reset the agent's kinematic state
            agent["position"] = agent["config"].initial_position.copy()
            agent["velocity"] = np.zeros_like(agent["position"])
            # Keep the mirrored physics object in sync.
            # NOTE(review): the physics object's acceleration is not reset
            # here — confirm that is intentional.
            self.physics_engine.objects[agent_id]["position"] = agent["position"].copy()
            self.physics_engine.objects[agent_id]["velocity"] = np.zeros_like(agent["position"])
            # Initial observation for the new episode
            observations[agent_id] = self.get_observation(agent_id)
        return observations
    def render(self):
        """Render the environment (simplified text dump to stdout)."""
        print(f"Step: {self.current_step}")
        for agent_id, agent in self.agents.items():
            pos = agent["position"]
            vel = agent["velocity"]
            print(f" {agent_id}: pos=({pos[0]:.2f}, {pos[1]:.2f}), vel=({vel[0]:.2f}, {vel[1]:.2f})")
class TrainingPlatform:
    """
    Training platform.

    Supports:
    1. Parallel simulation
    2. Distributed training
    3. Model management
    4. Training monitoring
    """

    def __init__(self, num_envs: int = 4):
        self.num_envs = num_envs
        self.environments: List[MultiAgentEnvironment] = []
        self.training_history: List[Dict[str, Any]] = []
        self.models: Dict[str, Any] = {}

    def create_environment(self, config: EnvironmentConfig) -> MultiAgentEnvironment:
        """Create a new simulation environment, register it, and return it."""
        environment = MultiAgentEnvironment(config)
        self.environments.append(environment)
        return environment

    def parallel_rollout(self, actions_list: List[Dict[str, np.ndarray]]) -> List[Tuple]:
        """Roll out one step per environment ("parallel" — actually sequential)."""
        return [env.step(acts) for env, acts in zip(self.environments, actions_list)]

    def train_episode(self, policy_fn, max_steps: int = 100) -> Dict[str, Any]:
        """
        Run one episode on the first environment (simplified).

        policy_fn(agent_id, observation) -> action is queried every step.
        Returns an episode summary dict, or {} when no environment exists.
        """
        if not self.environments:
            return {}
        environment = self.environments[0]
        obs_by_agent = environment.reset()
        cumulative_rewards = defaultdict(float)
        rollout = []
        for step_idx in range(max_steps):
            # Query the policy for every agent's action
            chosen_actions = {aid: policy_fn(aid, ob) for aid, ob in obs_by_agent.items()}
            # Advance the environment
            obs_by_agent, step_rewards, finished, _info = environment.step(chosen_actions)
            # Accumulate per-agent returns and record the transition
            for aid, r in step_rewards.items():
                cumulative_rewards[aid] += r
            rollout.append({
                "step": step_idx,
                "observations": copy.deepcopy(obs_by_agent),
                "actions": copy.deepcopy(chosen_actions),
                "rewards": step_rewards
            })
            if finished:
                break
        summary = {
            "timestamp": datetime.now().isoformat(),
            "total_steps": len(rollout),
            "rewards": dict(cumulative_rewards),
            "trajectory_length": len(rollout)
        }
        self.training_history.append(summary)
        return summary

    def save_model(self, model_name: str, model_data: Dict[str, Any]):
        """Store a model snapshot under the given name, stamped with save time."""
        self.models[model_name] = {
            "data": model_data,
            "saved_at": datetime.now().isoformat()
        }

    def load_model(self, model_name: str) -> Optional[Dict[str, Any]]:
        """Return the stored model data, or None when the name is unknown."""
        entry = self.models.get(model_name)
        return entry["data"] if entry is not None else None

    def get_training_stats(self) -> Dict[str, Any]:
        """Aggregate per-agent average rewards over all recorded episodes."""
        if not self.training_history:
            return {}
        reward_lists = defaultdict(list)
        for episode in self.training_history:
            for aid, r in episode["rewards"].items():
                reward_lists[aid].append(r)
        return {
            "total_episodes": len(self.training_history),
            "average_rewards": {aid: np.mean(vals) for aid, vals in reward_lists.items()},
            "latest_episode": self.training_history[-1] if self.training_history else None
        }
# Usage example
if __name__ == "__main__":
    print("=== 多智能体仿真环境与训练平台 ===\n")
    print("=== 创建仿真环境 ===")
    # Environment configuration shared by the demo env and the platform env
    env_config = EnvironmentConfig(
        env_type=EnvironmentType.CONTINUOUS,
        observation_type=ObservationType.VECTOR,
        num_agents=4,
        world_size=(50.0, 50.0),
        time_limit=200
    )
    # Standalone environment for the step-by-step demo
    env = MultiAgentEnvironment(env_config)
    # Build the agent configs once so both environments register identical agents
    num_agents = 4
    agent_configs = [
        AgentConfig(
            id=f"agent_{i}",
            initial_position=np.array([10.0 + i * 5, 25.0]),
            # FIX: observation is position (2) + velocity (2) + padded neighbor
            # block (10 neighbors x 5 features = 50) -> 54 dims, not 16.
            observation_space=(54,),
            action_space=(2,),  # 2-D force vector
            max_speed=5.0,
            sensor_range=15.0
        )
        for i in range(num_agents)
    ]
    for agent_config in agent_configs:
        env.register_agent(agent_config)
    print(f"注册{num_agents}个智能体")
    print(f"\n=== 创建训练平台 ===")
    # Create the training platform and its own environment.
    # FIX: previously the platform environment was created with NO registered
    # agents, so train_episode trained over an empty agent set; register the
    # same agents into the platform's environment.
    platform = TrainingPlatform(num_envs=1)
    train_env = platform.create_environment(env_config)
    for agent_config in agent_configs:
        train_env.register_agent(agent_config)
    print(f"训练平台创建完成,环境数:{platform.num_envs}")
    print(f"\n=== 初始观测 ===")
    # Initial observations from the demo environment
    observations = env.reset()
    for agent_id, obs in observations.items():
        print(f"{agent_id}: 观测维度={obs.shape}, 前 5 个值={obs[:5]}")
    print(f"\n=== 随机策略测试 ===")
    # Random policy: small Gaussian force, independent of the observation
    def random_policy(agent_id, obs):
        return np.random.randn(2) * 0.5
    # Run a few environment steps under the random policy
    for step in range(5):
        actions = {agent_id: random_policy(agent_id, obs)
                   for agent_id, obs in observations.items()}
        observations, rewards, done, info = env.step(actions)
        print(f"\n步骤 {step+1}:")
        for agent_id, reward in rewards.items():
            print(f" {agent_id}: 奖励={reward:.3f}")
        if done:
            break
    print(f"\n=== 训练演示 ===")
    # Train a few episodes on the platform's (now agent-populated) environment
    num_episodes = 5
    for episode in range(num_episodes):
        episode_info = platform.train_episode(random_policy, max_steps=50)
        if episode_info:
            print(f"集数 {episode+1}:")
            print(f" 步数:{episode_info['total_steps']}")
            print(f" 总奖励:{episode_info['rewards']}")
    print(f"\n=== 训练统计 ===")
    stats = platform.get_training_stats()
    print(f"训练统计:")
    print(f" 总集数:{stats['total_episodes']}")
    print(f" 平均奖励:{stats['average_rewards']}")
    print(f"\n关键观察:")
    print("1. 仿真环境:零成本试错,无限数据生成")
    print("2. 物理引擎:刚体动力学,碰撞检测")
    print("3. 多智能体:独立观测,独立动作,共享环境")
    print("4. 训练平台:并行仿真,模型管理")
    print("5. 数字孪生:虚实融合,加速训练")
    print("\n仿真的力量:环境 + 物理 + 场景 + 训练 = 智能培养")