"""Complete implementation of role division and capability collaboration."""
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import hashlib
import json
from collections import defaultdict
import heapq
import random
class RoleType(Enum):
    """Enumeration of the roles an agent can take on."""

    # Leader
    LEADER = "leader"
    # Planner
    PLANNER = "planner"
    # Executor
    EXECUTOR = "executor"
    # Evaluator
    EVALUATOR = "evaluator"
    # Coordinator
    COORDINATOR = "coordinator"
    # Specialist
    SPECIALIST = "specialist"
    # Generalist (all-rounder)
    GENERALIST = "generalist"
class CapabilityType(Enum):
    """Enumeration of the broad categories a capability can belong to."""

    # Cognitive ability
    COGNITIVE = "cognitive"
    # Technical ability
    TECHNICAL = "technical"
    # Social ability
    SOCIAL = "social"
    # Physical ability
    PHYSICAL = "physical"
    # Creative ability
    CREATIVE = "creative"
@dataclass
class Capability:
    """A single named capability held by an agent."""

    type: CapabilityType
    name: str
    level: float  # proficiency, 0-1
    experience: int  # experience points
    last_used: datetime = field(default_factory=datetime.now)

    def decay(self, decay_rate: float = 0.01) -> None:
        """Reduce ``level`` in place according to how long the capability sat idle.

        The level is multiplied by ``1 - decay_rate * idle_days`` where
        ``idle_days`` is the whole number of days between ``last_used`` and
        now. The result never drops below ``0.0``.

        Args:
            decay_rate: Fraction of the level lost per idle day.
        """
        # A `last_used` in the future (clock skew, bad data) would give a
        # negative day count and *raise* the level; treat it as zero idle time.
        idle_days = max(0, (datetime.now() - self.last_used).days)
        # Clamp the retention factor so long idle periods bottom out at zero
        # instead of flipping the sign of the level.
        retained = max(0.0, 1.0 - decay_rate * idle_days)
        self.level = max(0.0, self.level * retained)
@dataclass
class AgentProfile:
    """Profile of one agent: its capabilities, roles and track record."""

    id: str
    # Capabilities keyed by capability name.
    capabilities: Dict[str, Capability]
    roles: List[RoleType] = field(default_factory=list)
    # None until a role has been assigned, hence Optional.
    current_role: Optional[RoleType] = None
    task_history: List[Dict[str, Any]] = field(default_factory=list)
    performance_score: float = 0.0
    collaboration_score: float = 0.0
@dataclass
class Task:
    """A unit of work together with the capabilities it demands."""

    id: str
    description: str
    # Required level per capability name.
    required_capabilities: Dict[str, float]
    difficulty: float
    priority: int
    deadline: datetime
    # None while the task is unassigned, hence Optional.
    assigned_to: Optional[str] = None
    status: str = "pending"
    reward: float = 0.0
@dataclass
class Role:
    """Definition of a role and the agents currently holding it."""

    id: str
    role_type: RoleType
    description: str
    # Minimum level per capability name that a holder of this role needs.
    required_capabilities: Dict[str, float]
    responsibilities: List[str]
    # Agent ids assigned to this role; each Role gets its own fresh list.
    assigned_agents: List[str] = field(default_factory=list)
class CapabilityMatcher:
    """Score agents against capability requirements and pick agents or teams.

    Provides:
    1. Scoring one agent against a set of required capabilities.
    2. Choosing the single best agent for a task.
    3. Greedily assembling a team whose members cover each other's gaps.
    """

    def __init__(self):
        # Nothing in this class appends to the list; it is left for callers.
        self.matching_history: List[Dict[str, Any]] = []

    def calculate_match_score(self, agent: AgentProfile,
                              required_caps: Dict[str, float]) -> float:
        """Return a weighted match score of ``agent`` against ``required_caps``.

        Each requirement is weighted by its required level. A capability the
        agent holds contributes ``min(level / required, 1.0)``; a capability
        the agent lacks is penalised with twice its weight, so the result can
        be negative.

        Args:
            agent: Agent whose capabilities are evaluated.
            required_caps: Required level per capability name.

        Returns:
            The weighted score, or ``0.0`` when there are no requirements or
            their total weight is not positive.
        """
        if not required_caps:
            return 0.0
        weighted_score = 0.0
        total_weight = 0.0
        for cap_name, required_level in required_caps.items():
            held = agent.capabilities.get(cap_name)
            if held is None:
                # Missing capability: heavy penalty.
                weighted_score -= required_level * 2
            else:
                # A zero requirement is trivially satisfied (and must not
                # be used as a divisor).
                match = (min(held.level / required_level, 1.0)
                         if required_level > 0 else 1.0)
                weighted_score += match * required_level
            total_weight += required_level
        return weighted_score / total_weight if total_weight > 0 else 0.0

    def find_best_agent(self, task: Task,
                        agents: Dict[str, AgentProfile]) -> Optional[str]:
        """Return the id of the highest-scoring agent for ``task``.

        The capability match score is boosted by 30% of the agent's
        performance score and reduced by 0.05 per entry in its task history
        (a simplified stand-in for current load).

        Args:
            task: Task whose ``required_capabilities`` drive the match.
            agents: Candidate agents keyed by id.

        Returns:
            The best agent id, or ``None`` when ``agents`` is empty. Ties go
            to the agent seen first.
        """
        best_agent = None
        best_score = -float('inf')
        for agent_id, agent in agents.items():
            score = self.calculate_match_score(agent, task.required_capabilities)
            # Performance bonus. Scaling by abs(score) keeps the bonus a
            # bonus when the base score is negative; plain multiplication
            # would push strong performers further down. For non-negative
            # scores this equals score * (1 + performance * 0.3).
            score += abs(score) * agent.performance_score * 0.3
            # Load penalty (simplified: number of tasks in the history).
            score -= len(agent.task_history) * 0.05
            if score > best_score:
                best_score = score
                best_agent = agent_id
        return best_agent

    def build_complementary_team(self, task: Task,
                                 agents: Dict[str, AgentProfile],
                                 team_size: int = 3) -> List[str]:
        """Greedily pick up to ``team_size`` agents that cover the task's needs.

        In each round the agent that fills the largest part of the still
        uncovered requirements is added, and the outstanding requirements are
        reduced by that agent's levels. Once everything is covered, remaining
        slots are filled with the first unselected agents in iteration order.

        Args:
            task: Task whose ``required_capabilities`` must be covered.
            agents: Candidate agents keyed by id.
            team_size: Maximum number of agents to select.

        Returns:
            Selected agent ids in selection order; shorter than ``team_size``
            when fewer agents are available.
        """
        selected: List[str] = []
        # Outstanding shortfall per capability, after everyone selected so far.
        remaining = dict(task.required_capabilities)
        for _ in range(team_size):
            best_agent = None
            best_gain = -float('inf')
            for agent_id, agent in agents.items():
                if agent_id in selected:
                    continue
                gain = 0.0
                for cap_name, shortfall in remaining.items():
                    held = agent.capabilities.get(cap_name)
                    if held is not None and held.level > 0:
                        # `remaining` is already net of the selected members,
                        # so it is the gap itself; subtracting their levels
                        # again would count their coverage twice.
                        gain += min(held.level, max(0, shortfall))
                if gain > best_gain:
                    best_gain = gain
                    best_agent = agent_id
            if best_agent is None:
                # Every agent is already on the team.
                break
            selected.append(best_agent)
            # Reduce the outstanding requirements by the new member's levels.
            for cap_name in list(remaining):
                held = agents[best_agent].capabilities.get(cap_name)
                if held is None:
                    continue
                shortfall = max(0, remaining[cap_name] - held.level)
                if shortfall == 0:
                    del remaining[cap_name]
                else:
                    remaining[cap_name] = shortfall
        return selected
class RoleManager:
    """Registry of role definitions and of which agent holds which role.

    Provides:
    1. Defining roles.
    2. Assigning a role to an agent, gated on a capability match score.
    3. A history of successful assignments.
    4. Summary statistics.
    """

    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.role_assignments: Dict[str, str] = {}  # agent_id -> role_id
        self.role_history: List[Dict[str, Any]] = []

    def define_role(self, role: Role):
        """Register ``role`` under its id, replacing any role with that id."""
        self.roles[role.id] = role

    def assign_role(self, agent_id: str, role_id: str,
                    agents: Dict[str, AgentProfile],
                    min_match_score: float = 0.5) -> bool:
        """Assign ``role_id`` to ``agent_id`` if the agent is a good enough match.

        An agent holds at most one role: a successful assignment removes the
        agent from the role it held before.

        Args:
            agent_id: Id of the agent to assign.
            role_id: Id of a previously defined role.
            agents: Known agents keyed by id.
            min_match_score: Minimum capability match score required.

        Returns:
            ``True`` when the role was assigned; ``False`` when the role or
            agent is unknown or the match score is below ``min_match_score``.
        """
        role = self.roles.get(role_id)
        if role is None:
            return False
        agent = agents.get(agent_id)
        if agent is None:
            return False

        # Capability gate.
        match_score = CapabilityMatcher().calculate_match_score(
            agent, role.required_capabilities)
        if match_score < min_match_score:
            return False

        # Release the previously held role so the agent does not stay listed
        # under two roles at once.
        previous_role_id = self.role_assignments.get(agent_id)
        if previous_role_id is not None and previous_role_id != role_id:
            previous_role = self.roles.get(previous_role_id)
            if previous_role is not None and agent_id in previous_role.assigned_agents:
                previous_role.assigned_agents.remove(agent_id)

        self.role_assignments[agent_id] = role_id
        agent.current_role = role.role_type
        # Repeating the same assignment must not list the agent twice.
        if agent_id not in role.assigned_agents:
            role.assigned_agents.append(agent_id)

        # Record the assignment.
        self.role_history.append({
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "role_id": role_id,
            "match_score": match_score
        })
        return True

    def get_role_stats(self) -> Dict[str, Any]:
        """Return summary counts about roles and assignments.

        Returns:
            A dict with ``total_roles`` (defined roles), ``assigned_roles``
            (agents that currently hold a role), ``unassigned_agents`` and
            ``role_distribution`` (agent count per role id).
        """
        return {
            "total_roles": len(self.roles),
            "assigned_roles": len(self.role_assignments),
            # NOTE: despite the key name this counts roles that have no agent
            # assigned; the key is kept so existing callers keep working.
            "unassigned_agents": sum(
                1 for r in self.roles.values() if not r.assigned_agents),
            "role_distribution": {
                role_id: len(role.assigned_agents)
                for role_id, role in self.roles.items()
            }
        }
class TeamBuilder:
    """Form teams for tasks and improve them by swapping members.

    Provides:
    1. Forming a team and scoring how well it covers a task.
    2. Optimising a team by trying single-member replacements.
    """

    def __init__(self):
        self.teams: Dict[str, List[str]] = {}
        self.team_performance: Dict[str, float] = {}
        # team_id -> capability requirements of the task the team was formed
        # for; needed so optimize_team can re-score candidate teams.
        self.team_requirements: Dict[str, Dict[str, float]] = {}

    @staticmethod
    def _score_team(members: List[str],
                    required_caps: Dict[str, float],
                    profiles: Dict[str, AgentProfile]) -> float:
        """Return how well ``members`` jointly cover ``required_caps``.

        The team's level for a capability is the highest level among its
        members. Each requirement is weighted by its required level.
        """
        total_required = sum(required_caps.values())
        if total_required <= 0:
            return 0.0

        team_level = defaultdict(float)
        for agent_id in members:
            profile = profiles.get(agent_id)
            if profile is None:
                continue
            for cap_name, cap in profile.capabilities.items():
                team_level[cap_name] = max(team_level[cap_name], cap.level)

        covered = 0.0
        for cap_name, required_level in required_caps.items():
            if required_level <= 0:
                # Carries no weight, and must not be used as a divisor.
                continue
            level = team_level.get(cap_name, 0.0)
            covered += min(level / required_level, 1.0) * required_level
        return covered / total_required

    def form_team(self, team_id: str, agents: List[str],
                  task: Task, agents_profile: Dict[str, AgentProfile]):
        """Register a team and record how well it matches ``task``.

        Args:
            team_id: Id under which the team is stored.
            agents: Ids of the team members.
            task: Task whose ``required_capabilities`` the team is scored on.
            agents_profile: Known agents keyed by id; unknown ids are ignored
                when scoring.
        """
        # Store copies so later changes by the caller do not alter the team.
        self.teams[team_id] = list(agents)
        self.team_requirements[team_id] = dict(task.required_capabilities)
        self.team_performance[team_id] = self._score_team(
            agents, task.required_capabilities, agents_profile)

    def optimize_team(self, team_id: str, agents_profile: Dict[str, AgentProfile],
                      available_agents: Dict[str, AgentProfile]) -> List[str]:
        """Try every single-member replacement and keep the best-scoring team.

        Candidates are scored against the requirements recorded by
        ``form_team``. The stored team and its performance are updated.

        Args:
            team_id: Id of the team to optimise.
            agents_profile: Profiles of the current members, keyed by id.
            available_agents: Agents that may be swapped in, keyed by id.

        Returns:
            The best team found; ``[]`` when ``team_id`` is unknown. When no
            requirements were recorded for the team it cannot be scored and
            is returned unchanged.
        """
        if team_id not in self.teams:
            return []
        current_team = self.teams[team_id]

        required_caps = self.team_requirements.get(team_id)
        if required_caps is None:
            return current_team.copy()

        # One lookup table for both current members and candidates.
        profiles = {**available_agents, **agents_profile}
        best_team = current_team.copy()
        best_score = self._score_team(current_team, required_caps, profiles)

        # Try replacing each member in turn.
        for position in range(len(current_team)):
            for new_agent_id in available_agents:
                if new_agent_id in current_team:
                    continue
                candidate = current_team.copy()
                candidate[position] = new_agent_id
                candidate_score = self._score_team(
                    candidate, required_caps, profiles)
                if candidate_score > best_score:
                    best_score = candidate_score
                    best_team = candidate

        self.teams[team_id] = best_team
        self.team_performance[team_id] = best_score
        return best_team
class CollaborationOptimizer:
    """Evaluate how well a team works together and detect role conflicts.

    Provides:
    1. A collaboration score for a team.
    2. Detection of duplicated roles within a team.
    3. A placeholder for conflict resolution.
    """

    def __init__(self):
        self.collaboration_history: List[Dict[str, Any]] = []
        self.conflicts: List[Dict[str, Any]] = []

    def evaluate_collaboration(self, team: List[str],
                               agents_profile: Dict[str, AgentProfile]) -> float:
        """Return a collaboration score for ``team``.

        The score is the average of two parts: complementarity (simplified
        to the mean, over capabilities, of the variance of the members'
        levels) and the members' mean ``collaboration_score``.

        Args:
            team: Ids of the team members.
            agents_profile: Known agents keyed by id; unknown ids are ignored.

        Returns:
            The score, or ``0.0`` when no member of ``team`` is known.
        """
        if not team:
            return 0.0
        members = [agents_profile[aid] for aid in team if aid in agents_profile]
        if not members:
            return 0.0

        # Align every member's levels on the same capability names (a missing
        # capability counts as 0.0). Taking the dict values positionally
        # would compare unrelated capabilities column by column and produce
        # ragged rows when members hold different numbers of capabilities.
        cap_names = sorted({name for m in members for name in m.capabilities})
        if cap_names:
            levels = np.array(
                [[m.capabilities[name].level if name in m.capabilities else 0.0
                  for name in cap_names]
                 for m in members],
                dtype=float,
            )
            complementarity = float(np.var(levels, axis=0).mean())
        else:
            # No capabilities at all: nothing to be complementary about.
            complementarity = 0.0

        avg_collab = float(np.mean([m.collaboration_score for m in members]))
        return (complementarity + avg_collab) / 2

    def detect_conflicts(self, team: List[str],
                         agents_profile: Dict[str, AgentProfile]) -> List[Dict[str, Any]]:
        """Return role conflicts within ``team`` and log them on the instance.

        A conflict is reported for every role held by more than one member;
        its severity is the number of surplus holders.

        Args:
            team: Ids of the team members.
            agents_profile: Known agents keyed by id; unknown ids are ignored.

        Returns:
            The conflicts found in this call. They are also appended to
            ``self.conflicts``.
        """
        role_counts = defaultdict(int)
        for aid in team:
            if aid not in agents_profile:
                continue
            role = agents_profile[aid].current_role
            if role:
                role_counts[role] += 1

        conflicts = [
            {
                "type": "role_conflict",
                "description": f"Multiple agents with role {role.value}",
                "severity": count - 1
            }
            for role, count in role_counts.items()
            if count > 1
        ]
        # Capability-overlap detection is not implemented (simplified).
        self.conflicts.extend(conflicts)
        return conflicts

    def resolve_conflicts(self, conflicts: List[Dict[str, Any]],
                          team: List[str],
                          agents_profile: Dict[str, AgentProfile]) -> bool:
        """Placeholder for conflict resolution; currently changes nothing.

        Args:
            conflicts: Conflicts as returned by ``detect_conflicts``.
            team: Ids of the team members (unused).
            agents_profile: Known agents keyed by id (unused).

        Returns:
            ``True`` only when ``conflicts`` is empty, since no conflict is
            actually resolved.
        """
        for conflict in conflicts:
            if conflict["type"] == "role_conflict":
                # Role reassignment is not implemented (simplified).
                pass
        return len(conflicts) == 0
# Usage example
if __name__ == "__main__":
    print("=== 多智能体角色分工与能力协同 ===\n")

    # Instantiate the four cooperating components.
    matcher = CapabilityMatcher()
    role_manager = RoleManager()
    team_builder = TeamBuilder()
    collab_optimizer = CollaborationOptimizer()

    # --- Define roles -----------------------------------------------------
    print("=== 定义角色 ===")
    role_specs = (
        ("leader", RoleType.LEADER, "团队领导",
         {"leadership": 0.9, "communication": 0.8}),
        ("planner", RoleType.PLANNER, "任务规划",
         {"planning": 0.9, "analysis": 0.8}),
        ("executor", RoleType.EXECUTOR, "任务执行",
         {"execution": 0.9, "efficiency": 0.8}),
    )
    for spec_id, spec_type, spec_desc, spec_caps in role_specs:
        role_manager.define_role(Role(
            id=spec_id,
            role_type=spec_type,
            description=spec_desc,
            required_capabilities=spec_caps,
            responsibilities=["Execute tasks"],
        ))
        print(f"定义角色:{spec_id} - {spec_desc}")

    # --- Create agents ----------------------------------------------------
    print("\n=== 创建智能体 ===")
    agent_levels = {
        "agent1": {"leadership": 0.9, "communication": 0.85, "planning": 0.7},
        "agent2": {"planning": 0.95, "analysis": 0.9, "execution": 0.6},
        "agent3": {"execution": 0.95, "efficiency": 0.9, "technical": 0.8},
        "agent4": {"communication": 0.9, "social": 0.85, "coordination": 0.8},
    }
    agents = {}
    for name, levels in agent_levels.items():
        caps = {}
        for cap_name, cap_level in levels.items():
            caps[cap_name] = Capability(
                CapabilityType.COGNITIVE, cap_name, cap_level, 10)
        agents[name] = AgentProfile(id=name, capabilities=caps)
        print(f"创建智能体:{name} - 能力:{list(caps)}")

    # --- Assign roles -----------------------------------------------------
    print("\n=== 分配角色 ===")
    for agent_id, role_id in (("agent1", "leader"),
                              ("agent2", "planner"),
                              ("agent3", "executor")):
        assigned = role_manager.assign_role(agent_id, role_id, agents)
        print(f"{'✓' if assigned else '✗'} {agent_id} -> {role_id}")

    # --- Create a task ----------------------------------------------------
    print("\n=== 创建任务 ===")
    task = Task(
        id="task1",
        description="Complex project requiring multiple skills",
        required_capabilities={"planning": 0.8, "execution": 0.8, "leadership": 0.7},
        difficulty=0.8,
        priority=1,
        deadline=datetime.now(),
        reward=100.0,
    )
    print(f"创建任务:{task.id} - {task.description[:40]}...")

    # --- Match a single agent to the task ---------------------------------
    print("\n=== 能力匹配 ===")
    best_agent = matcher.find_best_agent(task, agents)
    print(f"最佳匹配智能体:{best_agent}")

    # --- Build and register a complementary team --------------------------
    print("\n=== 构建团队 ===")
    team = matcher.build_complementary_team(task, agents, team_size=3)
    print(f"互补团队:{team}")
    team_builder.form_team("team1", team, task, agents)
    print(f"团队形成:team1 - 成员:{team}")

    # --- Evaluate collaboration and look for conflicts --------------------
    print("\n=== 协同评估 ===")
    collab_score = collab_optimizer.evaluate_collaboration(team, agents)
    print(f"协同效能分数:{collab_score:.2f}")
    conflicts = collab_optimizer.detect_conflicts(team, agents)
    if not conflicts:
        print("无冲突")
    else:
        print(f"检测到冲突:{len(conflicts)}")
        for conflict in conflicts:
            print(f" - {conflict['type']}: {conflict['description']}")

    # --- Role statistics --------------------------------------------------
    print("\n=== 角色统计 ===")
    stats = role_manager.get_role_stats()
    print(f"总角色数:{stats['total_roles']}")
    print(f"已分配角色:{stats['assigned_roles']}")
    print(f"角色分布:{stats['role_distribution']}")

    # --- Closing summary --------------------------------------------------
    closing_lines = (
        "\n关键观察:",
        "1. 角色分工:专业化提升效率(leader, planner, executor)",
        "2. 能力匹配:量化评估实现精准匹配",
        "3. 团队构建:能力互补实现 1+1>2",
        "4. 协同优化:检测冲突、提升效能",
        "5. 动态调整:根据情况优化团队",
        "\n协同的核心:角色分工 + 能力匹配 + 团队构建 + 协同优化 = 群体智能",
    )
    for line in closing_lines:
        print(line)