# do-演算与因果效应估计 (do-calculus and causal effect estimation)
import numpy as np
import pandas as pd
import networkx as nx
from typing import Dict, List, Tuple, Optional, Set
from itertools import combinations
from scipy import stats
import warnings
class CausalGraph:
    """Directed acyclic causal graph with d-separation and path queries."""

    def __init__(self, nodes: List[str], edges: List[Tuple[str, str]]):
        """
        Build the causal DAG.

        Args:
            nodes: node labels.
            edges: directed edges as (parent, child) pairs.
        """
        self.nodes = nodes
        self.edges = edges
        self.graph = nx.DiGraph()
        self.graph.add_nodes_from(nodes)
        self.graph.add_edges_from(edges)

    def get_parents(self, node: str) -> Set[str]:
        """Return the direct parents of ``node``."""
        return set(self.graph.predecessors(node))

    def get_children(self, node: str) -> Set[str]:
        """Return the direct children of ``node``."""
        return set(self.graph.successors(node))

    def get_ancestors(self, node: str) -> Set[str]:
        """Return all ancestors of ``node``."""
        return nx.ancestors(self.graph, node)

    def get_descendants(self, node: str) -> Set[str]:
        """Return all descendants of ``node``."""
        return nx.descendants(self.graph, node)

    def is_d_separated(self, X: Set[str], Y: Set[str], Z: Set[str]) -> bool:
        """
        Test whether X and Y are d-separated given Z.

        d-separation is the graphical criterion for conditional independence
        in causal graphs.

        Bug fix: ``nx.d_separation`` names a *module*, so calling it raised
        TypeError. Use the real API: ``nx.is_d_separator`` (networkx >= 3.3)
        with a fallback to the older ``nx.d_separated``.
        """
        checker = getattr(nx, "is_d_separator", None)
        if checker is None:
            checker = nx.d_separated  # networkx < 3.3
        return checker(self.graph, set(X), set(Y), set(Z))

    def find_backdoor_paths(self, treatment: str, outcome: str) -> List[List[str]]:
        """
        Find all backdoor paths from ``treatment`` to ``outcome``.

        A backdoor path starts with an edge pointing INTO the treatment.

        Bug fix: backdoor paths are generally not directed paths, so they
        must be enumerated on the undirected skeleton of the graph. The
        original searched directed paths starting at ``treatment`` and then
        required the reverse edge ``path[1] -> treatment`` — impossible in a
        DAG, so it always returned an empty list.
        """
        skeleton = self.graph.to_undirected()
        backdoor_paths = []
        for path in nx.all_simple_paths(skeleton, source=treatment, target=outcome):
            # First hop must traverse an edge oriented toward the treatment.
            if len(path) >= 2 and self.graph.has_edge(path[1], treatment):
                backdoor_paths.append(path)
        return backdoor_paths

    def find_frontdoor_criteria(self, treatment: str, outcome: str) -> List[str]:
        """
        Find candidate mediators for the front-door criterion.

        Front-door conditions:
          1. M fully mediates the effect of treatment on outcome.
          2. No unblocked backdoor path from treatment to M.
          3. All backdoor paths from M to outcome are blocked by treatment.

        NOTE(review): conditions are checked heuristically, as in the
        original — condition 1 only verifies reachability, condition 2 only
        looks for a shared parent, and condition 3 is not checked at all.
        """
        candidates = []
        for node in self.nodes:
            if node in (treatment, outcome):
                continue
            # Condition 1 (necessary part): treatment ~> M and M ~> outcome.
            if not (nx.has_path(self.graph, treatment, node) and
                    nx.has_path(self.graph, node, outcome)):
                continue
            # Condition 2 (simplified): no common parent of treatment and M,
            # i.e. no obvious backdoor path treatment <- P -> M.
            if self.get_parents(treatment) & self.get_parents(node):
                continue
            # Condition 3 (simplified): accepted without further checks.
            candidates.append(node)
        return candidates
class DoCalculus:
    """
    Causal effect estimators grounded in Pearl's do-calculus.

    The three rules of do-calculus:
      Rule 1: insertion/deletion of observations
      Rule 2: exchange of interventions and observations
      Rule 3: insertion/deletion of interventions
    """

    def __init__(self, causal_graph: "CausalGraph"):
        # Graph is only consulted for automatic confounder identification.
        self.graph = causal_graph

    def backdoor_adjustment(self, treatment: str, outcome: str,
                            data: pd.DataFrame,
                            confounders: Optional[List[str]] = None) -> Dict:
        """
        Backdoor adjustment formula:

            P(Y | do(X)) = sum_z P(Y | X, Z) P(Z)

        Args:
            treatment: treatment variable X (assumed binary 0/1).
            outcome: outcome variable Y.
            data: observational data.
            confounders: confounder set Z; if None, the parents of the
                treatment in the causal graph (minus the outcome) are used.

        Returns:
            dict with 'ate', 'se', 'ci_95', 'confounders', 'method'.
        """
        if confounders is None:
            # Auto-identify: parents of the treatment, excluding the outcome.
            confounders = list(self.graph.get_parents(treatment) - {outcome})

        if not confounders:
            # No confounding: plain difference in group means.
            treated = data[data[treatment] == 1]
            control = data[data[treatment] == 0]
            ate = treated[outcome].mean() - control[outcome].mean()
            se = np.sqrt(treated[outcome].var() / len(treated) +
                         control[outcome].var() / len(control))
        else:
            # Stratify on the confounders; weight each stratum by its share
            # of the full sample.
            ate = 0.0
            variance = 0.0
            for _, group in data.groupby(confounders):
                if len(group) < 10:  # skip tiny strata
                    continue
                stratum_weight = len(group) / len(data)
                treated = group[group[treatment] == 1]
                control = group[group[treatment] == 0]
                if len(treated) == 0 or len(control) == 0:
                    continue  # no overlap: stratum is uninformative
                stratum_effect = treated[outcome].mean() - control[outcome].mean()
                stratum_var = (treated[outcome].var() / len(treated) +
                               control[outcome].var() / len(control))
                ate += stratum_weight * stratum_effect
                variance += (stratum_weight ** 2) * stratum_var
            se = np.sqrt(variance)

        # Normal-approximation 95% confidence interval.
        ci_lower = ate - 1.96 * se
        ci_upper = ate + 1.96 * se
        return {
            'ate': ate,
            'se': se,
            'ci_95': (ci_lower, ci_upper),
            'confounders': confounders,
            'method': 'backdoor_adjustment'
        }

    def frontdoor_adjustment(self, treatment: str, outcome: str,
                             mediator: str, data: pd.DataFrame) -> Dict:
        """
        Front-door adjustment formula:

            P(Y | do(X)) = sum_m P(M=m | X) sum_x' P(Y | M=m, X=x') P(X=x')

        Applicable when confounding is unobserved but a suitable mediator
        exists. Treatment is assumed binary 0/1.

        Returns:
            dict with 'ate', 'se', 'mediator', 'method'.
        """
        # Step 1: estimate P(M | X) (rows: treatment, columns: mediator).
        m_given_x = data.groupby([treatment, mediator]).size().unstack(fill_value=0)
        m_given_x = m_given_x.div(m_given_x.sum(axis=1), axis=0)
        # Step 2: estimate E[Y | M, X] (rows: mediator, columns: treatment).
        y_given_mx = data.groupby([mediator, treatment])[outcome].mean().unstack()
        # Step 3: estimate P(X).
        p_x = data[treatment].value_counts(normalize=True)
        # Step 4: compute E[Y|do(X=1)] - E[Y|do(X=0)] via the formula above.
        e_y_do = {}
        for x in (0, 1):
            total = 0.0
            for m in data[mediator].unique():
                # P(M=m | X=x); treat never-observed cells as probability 0.
                if x in m_given_x.index and m in m_given_x.columns:
                    p_m_x = m_given_x.loc[x, m]
                else:
                    p_m_x = 0
                # sum_x' E[Y | M=m, X=x'] P(X=x')
                e_y_m = 0.0
                for x_prime in (0, 1):
                    if m in y_given_mx.index and x_prime in y_given_mx.columns:
                        p_y_mx = y_given_mx.loc[m, x_prime]
                    else:
                        p_y_mx = 0
                    e_y_m += p_y_mx * p_x.get(x_prime, 0)
                total += p_m_x * e_y_m
            e_y_do[x] = total
        ate = e_y_do[1] - e_y_do[0]
        # Crude standard error; a bootstrap would be appropriate in practice.
        se = np.sqrt(data[outcome].var() / len(data))
        return {
            'ate': ate,
            'se': se,
            'mediator': mediator,
            'method': 'frontdoor_adjustment'
        }

    def instrumental_variable(self, treatment: str, outcome: str,
                              instrument: str, data: pd.DataFrame) -> Dict:
        """
        Instrumental-variable (Wald / 2SLS) estimator:

            ATE = Cov(Z, Y) / Cov(Z, X)

        Applicable when confounding is unobserved but a valid instrument Z
        exists (relevant to X, affecting Y only through X).

        Returns:
            dict with 'ate', 'se', 'instrument', 'first_stage_coef', 'method'.
        """
        z = data[instrument]
        x = data[treatment]
        y = data[outcome]
        # Bug fix: np.cov uses ddof=1 while np.var defaults to ddof=0, which
        # inflated both regression slopes by n/(n-1); use ddof=1 throughout
        # so each slope is the standard OLS coefficient.
        var_z = np.var(z, ddof=1)
        # First stage: X = a + b*Z + e.
        beta_zx = np.cov(z, x)[0, 1] / var_z
        # Reduced form: Y = a + b*Z + e.
        beta_zy = np.cov(z, y)[0, 1] / var_z
        # Wald ratio (the ddof bias cancelled here, but the reported
        # first-stage coefficient and the standard error were off).
        ate = beta_zy / beta_zx
        # Simplified standard error (a bootstrap would be preferable).
        se = np.sqrt(data[outcome].var() / len(data)) / abs(beta_zx)
        return {
            'ate': ate,
            'se': se,
            'instrument': instrument,
            'first_stage_coef': beta_zx,
            'method': 'instrumental_variable'
        }
# Usage example
if __name__ == "__main__":
    # Classic confounding structure:
    #   Z -> X -> Y, with Z -> Y as the confounding edge.
    node_list = ['Z', 'X', 'Y']
    edge_list = [('Z', 'X'), ('Z', 'Y'), ('X', 'Y')]
    do_calc = DoCalculus(CausalGraph(node_list, edge_list))

    # Simulated observational data.
    np.random.seed(42)
    n = 10000
    Z = np.random.binomial(1, 0.5, n)  # confounder
    X = np.random.binomial(1, 1 / (1 + np.exp(-(0.5 * Z - 0.2))))  # treatment
    Y = (0.3 * X + 0.5 * Z + np.random.normal(0, 0.3, n) > 0.5).astype(int)  # outcome
    data = pd.DataFrame({'Z': Z, 'X': X, 'Y': Y})

    print("=== do-演算与因果效应估计 ===\n")

    # 1. Backdoor adjustment.
    print("1. 后门调整法:")
    bd = do_calc.backdoor_adjustment('X', 'Y', data, confounders=['Z'])
    ci_lo, ci_hi = bd['ci_95']
    print(f" ATE: {bd['ate']:.4f}")
    print(f" 95% CI: [{ci_lo:.4f}, {ci_hi:.4f}]")
    print(f" 混淆变量:{bd['confounders']}")
    print()

    # 2. Naive (biased) estimate: no adjustment for the confounder.
    print("2. 朴素估计(未调整混淆,有偏):")
    naive_ate = (data.loc[data['X'] == 1, 'Y'].mean()
                 - data.loc[data['X'] == 0, 'Y'].mean())
    print(f" Naive ATE: {naive_ate:.4f} (有偏!)")
    print()

    # 3. Instrumental variables (illustrative note only; Z is not a valid IV
    # here because it affects Y directly).
    print("3. 工具变量法(需要有效工具变量):")
    print(" 注意:此例中 Z 不是有效 IV,因为 Z 直接影响 Y")
    print(" 有效 IV 需满足:与 X 相关,不直接影响 Y,只通过 X 影响 Y")

    # Closing remarks.
    print("\n关键观察:")
    for remark in (
        "1. do-演算将因果问题(P(Y|do(X)))转化为统计问题(P(Y|X,Z))",
        "2. 后门调整:控制混淆变量 Z,得到无偏因果效应",
        "3. 前门调整:当存在未观测混淆时,使用中介变量",
        "4. 工具变量:当混淆未观测且无合适中介时,使用 IV",
        "5. 因果识别的关键:图结构 + 假设",
    ):
        print(remark)