NSGA-II 算法完整实现
import numpy as np
from typing import List, Tuple, Callable, Dict
import matplotlib.pyplot as plt
from dataclasses import dataclass
import random
@dataclass
class Individual:
"""个体"""
x: np.ndarray # 决策变量
objectives: np.ndarray # 目标值
rank: int = 0 # 非支配等级
crowding_distance: float = 0.0 # 拥挤度距离
class NSGA2:
"""
NSGA-II 多目标优化算法
核心机制:
1. 快速非支配排序
2. 拥挤度距离计算
3. 精英保留策略
"""
def __init__(self,
n_objectives: int,
n_variables: int,
bounds: List[Tuple[float, float]],
population_size: int = 100,
n_generations: int = 250,
crossover_prob: float = 0.9,
mutation_prob: float = 0.1,
eta_c: float = 20.0,
eta_m: float = 20.0):
"""
初始化 NSGA-II
Args:
n_objectives: 目标函数数量
n_variables: 决策变量数量
bounds: 变量边界 [(min, max), ...]
population_size: 种群大小
n_generations: 迭代代数
crossover_prob: 交叉概率
mutation_prob: 变异概率
eta_c: SBX 交叉分布指数
eta_m: 多项式变异分布指数
"""
self.n_objectives = n_objectives
self.n_variables = n_variables
self.bounds = np.array(bounds)
self.pop_size = population_size
self.n_gen = n_generations
self.cx_prob = crossover_prob
self.mut_prob = mutation_prob
self.eta_c = eta_c
self.eta_m = eta_m
self.population: List[Individual] = []
self.pareto_front: List[Individual] = []
self.history = []
def _initialize_population(self) -> List[Individual]:
"""初始化种群"""
population = []
for _ in range(self.pop_size):
# 随机生成决策变量
x = np.random.uniform(
self.bounds[:, 0],
self.bounds[:, 1],
size=self.n_variables
)
population.append(Individual(x=x, objectives=np.zeros(self.n_objectives)))
return population
def _evaluate_objectives(self,
population: List[Individual],
obj_funcs: List[Callable]) -> None:
"""评估目标函数"""
for ind in population:
obj_values = [func(ind.x) for func in obj_funcs]
ind.objectives = np.array(obj_values)
def _fast_non_dominated_sort(self,
population: List[Individual]) -> List[List[int]]:
"""
快速非支配排序
Returns:
fronts: 非支配前沿列表,每个前沿是个体索引列表
"""
N = len(population)
# 初始化
domination_count = np.zeros(N, dtype=int) # 被支配次数
dominated_set = [[] for _ in range(N)] # 支配的个体集合
fronts = [[]] # 前沿列表
ranks = np.zeros(N, dtype=int) # 等级
# 计算支配关系
for i in range(N):
for j in range(i + 1, N):
if self._dominates(population[i], population[j]):
dominated_set[i].append(j)
domination_count[j] += 1
elif self._dominates(population[j], population[i]):
dominated_set[j].append(i)
domination_count[i] += 1
# 构建前沿
for i in range(N):
if domination_count[i] == 0:
ranks[i] = 0
fronts[0].append(i)
current_front = 0
while fronts[current_front]:
next_front = []
for i in fronts[current_front]:
for j in dominated_set[i]:
domination_count[j] -= 1
if domination_count[j] == 0:
ranks[j] = current_front + 1
next_front.append(j)
current_front += 1
if next_front:
fronts.append(next_front)
# 保存等级
for i, ind in enumerate(population):
ind.rank = ranks[i]
return fronts
def _dominates(self, ind1: Individual, ind2: Individual) -> bool:
"""
判断 ind1 是否支配 ind2
支配定义:ind1 在所有目标上都不差于 ind2,且至少有一个目标严格优于 ind2
"""
better_in_one = False
for i in range(self.n_objectives):
if ind1.objectives[i] > ind2.objectives[i]:
return False # ind1 在目标 i 上更差
elif ind1.objectives[i] < ind2.objectives[i]:
better_in_one = True
return better_in_one
def _calculate_crowding_distance(self,
population: List[Individual],
front: List[int]) -> None:
"""
计算拥挤度距离
Args:
population: 种群
front: 前沿个体索引列表
"""
n = len(front)
if n == 0:
return
# 初始化距离为 0
distances = np.zeros(n)
# 对每个目标计算距离
for m in range(self.n_objectives):
# 按目标 m 排序
sorted_indices = sorted(
range(n),
key=lambda i: population[front[i]].objectives[m]
)
# 边界个体距离设为无穷大
distances[sorted_indices[0]] = float('inf')
distances[sorted_indices[-1]] = float('inf')
# 计算中间个体距离
obj_range = (population[front[sorted_indices[-1]]].objectives[m] -
population[front[sorted_indices[0]]].objectives[m])
if obj_range == 0:
continue
for i in range(1, n - 1):
distances[sorted_indices[i]] += (
population[front[sorted_indices[i + 1]]].objectives[m] -
population[front[sorted_indices[i - 1]]].objectives[m]
) / obj_range
# 保存距离
for i, idx in enumerate(front):
population[idx].crowding_distance = distances[i]
def _tournament_selection(self,
population: List[Individual],
tournament_size: int = 2) -> Individual:
"""锦标赛选择"""
candidates = random.sample(population, tournament_size)
# 选择等级高的,等级相同选择拥挤度大的
best = candidates[0]
for cand in candidates[1:]:
if (cand.rank < best.rank or
(cand.rank == best.rank and
cand.crowding_distance > best.crowding_distance)):
best = cand
return best
def _sbx_crossover(self,
parent1: Individual,
parent2: Individual) -> Tuple[np.ndarray, np.ndarray]:
"""
SBX (Simulated Binary Crossover) 交叉
Returns:
child1, child2: 子代决策变量
"""
child1 = np.zeros(self.n_variables)
child2 = np.zeros(self.n_variables)
for i in range(self.n_variables):
if random.random() > self.cx_prob:
# 不交叉
child1[i] = parent1.x[i]
child2[i] = parent2.x[i]
continue
y1 = min(parent1.x[i], parent2.x[i])
y2 = max(parent1.x[i], parent2.x[i])
# 计算 beta
if abs(y2 - y1) < 1e-14:
beta = 1.0
else:
beta = 1.0 + (2.0 * (y1 - self.bounds[i, 0]) /
max(1e-14, y2 - y1))
alpha = 2.0 - beta ** (-(self.eta_c + 1))
rand = random.random()
if rand <= 1.0 / alpha:
betaq = (rand * alpha) ** (1.0 / (self.eta_c + 1))
else:
betaq = (1.0 / (2.0 - rand * alpha)) ** (1.0 / (self.eta_c + 1))
# 生成子代
child1[i] = 0.5 * ((y1 + y2) - betaq * (y2 - y1))
child2[i] = 0.5 * ((y1 + y2) + betaq * (y2 - y1))
# 边界处理
child1[i] = np.clip(child1[i], self.bounds[i, 0], self.bounds[i, 1])
child2[i] = np.clip(child2[i], self.bounds[i, 0], self.bounds[i, 1])
return child1, child2
def _polynomial_mutation(self,
x: np.ndarray) -> np.ndarray:
"""多项式变异"""
for i in range(self.n_variables):
if random.random() > self.mut_prob:
continue
delta_min = x[i] - self.bounds[i, 0]
delta_max = self.bounds[i, 1] - x[i]
rand = random.random()
if rand < 0.5:
delta = delta_min / (self.bounds[i, 1] - self.bounds[i, 0])
deltaq = (2.0 * rand + (1.0 - 2.0 * rand) *
(1.0 - delta) ** (self.eta_m + 1)) ** (1.0 / (self.eta_m + 1)) - 1.0
else:
delta = delta_max / (self.bounds[i, 1] - self.bounds[i, 0])
deltaq = 1.0 - (2.0 * (1.0 - rand) + 2.0 * (rand - 0.5) *
(1.0 - delta) ** (self.eta_m + 1)) ** (1.0 / (self.eta_m + 1))
x[i] = x[i] + deltaq * (self.bounds[i, 1] - self.bounds[i, 0])
x[i] = np.clip(x[i], self.bounds[i, 0], self.bounds[i, 1])
return x
def optimize(self,
obj_funcs: List[Callable]) -> List[Individual]:
"""
执行 NSGA-II 优化
Args:
obj_funcs: 目标函数列表
Returns:
pareto_front: Pareto 最优解集
"""
print(f"开始 NSGA-II 优化...")
print(f" 目标数:{self.n_objectives}")
print(f" 变量数:{self.n_variables}")
print(f" 种群大小:{self.pop_size}")
print(f" 迭代代数:{self.n_gen}")
print()
# 初始化种群
self.population = self._initialize_population()
self._evaluate_objectives(self.population, obj_funcs)
# 进化循环
for gen in range(self.n_gen):
# 1. 非支配排序
fronts = self._fast_non_dominated_sort(self.population)
# 2. 计算拥挤度距离
for front in fronts:
self._calculate_crowding_distance(self.population, front)
# 3. 记录历史
if gen % 10 == 0 or gen == self.n_gen - 1:
pareto_front = [ind for ind in self.population if ind.rank == 0]
self.history.append({
'generation': gen,
'n_pareto': len(pareto_front),
'pareto_solutions': pareto_front.copy()
})
print(f"Generation {gen}: {len(pareto_front)} Pareto 解")
# 4. 生成子代
offspring = []
while len(offspring) < self.pop_size:
# 选择
parent1 = self._tournament_selection(self.population)
parent2 = self._tournament_selection(self.population)
# 交叉
child1_x, child2_x = self._sbx_crossover(parent1, parent2)
# 变异
child1_x = self._polynomial_mutation(child1_x)
child2_x = self._polynomial_mutation(child2_x)
# 创建子代个体
child1 = Individual(x=child1_x, objectives=np.zeros(self.n_objectives))
child2 = Individual(x=child2_x, objectives=np.zeros(self.n_objectives))
offspring.append(child1)
if len(offspring) < self.pop_size:
offspring.append(child2)
# 评估子代
self._evaluate_objectives(offspring, obj_funcs)
# 5. 精英保留:合并父代和子代
combined = self.population + offspring
# 6. 非支配排序
fronts = self._fast_non_dominated_sort(combined)
# 7. 计算拥挤度
for front in fronts:
self._calculate_crowding_distance(combined, front)
# 8. 选择下一代种群
new_population = []
front_idx = 0
while len(new_population) + len(fronts[front_idx]) <= self.pop_size:
# 整个前沿加入
for idx in fronts[front_idx]:
new_population.append(combined[idx])
front_idx += 1
# 最后一个前沿按拥挤度排序选择
if len(new_population) < self.pop_size and front_idx < len(fronts):
last_front = fronts[front_idx]
# 按拥挤度降序排序
last_front_sorted = sorted(
last_front,
key=lambda idx: combined[idx].crowding_distance,
reverse=True
)
# 选择剩余名额
remaining = self.pop_size - len(new_population)
for idx in last_front_sorted[:remaining]:
new_population.append(combined[idx])
self.population = new_population
# 提取最终 Pareto 前沿
self.pareto_front = [ind for ind in self.population if ind.rank == 0]
print(f"\n优化完成!")
print(f" Pareto 解数量:{len(self.pareto_front)}")
return self.pareto_front
def plot_pareto_front(self,
obj1_idx: int = 0,
obj2_idx: int = 1,
title: str = "Pareto Front") -> None:
"""绘制 Pareto 前沿(2D)"""
if self.n_objectives < 2:
print("需要至少 2 个目标才能绘制 2D Pareto 前沿")
return
obj1 = [ind.objectives[obj1_idx] for ind in self.pareto_front]
obj2 = [ind.objectives[obj2_idx] for ind in self.pareto_front]
plt.figure(figsize=(10, 8))
plt.scatter(obj1, obj2, c='blue', alpha=0.6, s=50, edgecolors='black')
plt.xlabel(f'Objective {obj1_idx + 1}')
plt.ylabel(f'Objective {obj2_idx + 1}')
plt.title(title)
plt.grid(True, alpha=0.3)
plt.show()
# 使用示例:ZDT1 测试函数
if __name__ == "__main__":
# ZDT1 测试函数(2 目标)
def zdt1_f1(x):
return x[0]
def zdt1_f2(x):
g = 1 + 9 * np.sum(x[1:]) / (len(x) - 1)
h = 1 - np.sqrt(x[0] / g)
return g * h
# 初始化 NSGA-II
n_vars = 30
bounds = [(0, 1)] * n_vars
nsga2 = NSGA2(
n_objectives=2,
n_variables=n_vars,
bounds=bounds,
population_size=100,
n_generations=250
)
# 优化
pareto_solutions = nsga2.optimize([zdt1_f1, zdt1_f2])
# 绘制 Pareto 前沿
nsga2.plot_pareto_front(title="NSGA-II on ZDT1")
print("\n关键观察:")
print("1. 快速非支配排序:O(MN²) 复杂度,高效分层")
print("2. 拥挤度距离:保持 Pareto 前沿多样性")
print("3. 精英保留:父代 + 子代竞争,确保收敛")
print("4. SBX 交叉 + 多项式变异:实数编码优化")
print("5. 适用于 2-3 目标问题(Many-Objective 用 NSGA-III)")