🔵 因果推理
🟣 DAG 结构
🟡 反事实
🟢 干预分析
🔴 因果决策

因果推理与反事实决策能力

从相关到因果的认知革命

🔵 因果推理 因果图建模
混淆控制
因果识别
🟣 DAG 结构 结构学习
因果发现
PC/FCI 算法
🟡 反事实 潜在结果
反事实估计
PNS/PN/PS
🟢 干预分析 do-演算
前/后门调整
因果效应
🔴 因果决策 决策理论
策略优化
反事实决策
作者 超级代码智能体
版本 因果智能版 · 第一版
出版日期 2026 年 3 月
全书规模 五编十七章
学科跨度 因果·DAG·反事实·干预·决策

📖 全书目录

第一编 因果推理基础与理论框架

序言:因果智能——从相关到因果的认知革命

因果推理是智能的核心标志:能够理解因果关系,区分相关与因果,进行反事实推理,设计干预策略,做出因果决策。然而,传统机器学习长期受限于"相关性陷阱":只能发现统计关联,无法理解因果机制,不能回答"如果...会怎样"的反事实问题,难以设计有效干预。因果推理与反事实决策能力的兴起正在引发一场认知革命:让 AI 系统像人类一样理解因果、思考反事实、设计干预、做出明智决策

本书的核心论点:因果智能体系通过因果图建模实现因果关系形式化、通过因果发现学习结构、通过反事实推理回答"如果...会怎样"、通过 do-演算设计干预、通过因果决策理论做出最优选择,五层协同,构建能理解、会推理、善干预、可解释、明智决策的全能因果智能系统。

因果智能革命的兴起

从 Pearl 的因果图到 Rubin 的潜在结果框架,从 PC 算法到 NOTEARS,从 do-演算到因果强化学习,因果推理技术快速演进。然而,真正的因果智能面临独特挑战:

  • 因果识别:如何从观测数据中识别因果关系,而非仅仅相关?
  • 混淆控制:如何处理未观测混淆变量(unobserved confounders)?
  • 反事实推理:如何回答"如果当时做了不同选择,结果会怎样"?
  • 干预设计:如何设计最优干预策略,最大化因果效应?
"因果智能不是简单的相关性分析,而是一种认知范式的根本转变。从'相关'到'因果',从'预测'到'干预',从'是什么'到'为什么'和'如果...会怎样'。这种转变让 AI 系统从'曲线拟合者'走向'因果理解者'。"
—— 本书核心洞察

本书结构

第一编 因果推理基础与理论框架:阐述因果关系本质与因果图、潜在结果框架、结构因果模型 SCM 等基础知识。

第二编 因果发现与结构学习:深入剖析因果发现算法概述、基于约束的方法(PC/FCI)、基于分数的方法、连续优化 DAG 学习等核心算法。

第三编 反事实推理与干预分析:详细探讨 do-演算与干预、前门与后门调整、反事实估计方法、因果效应识别等干预技术。

第四编 因果决策与策略优化:涵盖因果决策理论、反事实决策制定、因果强化学习、策略优化与干预选择等决策方法。

第五编 应用案例与未来:分析真实生产案例,展望未来趋势,提供持续学习的资源指引。

"从因果图到反事实推理,从 do-演算到因果决策,从观测数据到干预设计,因果智能体系正在重塑 AI 系统的认知范式。未来的 AI 将更具因果理解力、更善反事实思考、更接近人类的推理能力。"
—— 本书结语预告

—— 作者

2026 年 3 月 9 日 于数字世界

谨以此书献给所有在因果推理与决策科学一线构建 AI 系统的研究者和工程师们

第 8 章 do-演算与干预

8.1 do-算子核心概念

do-演算是 Judea Pearl 提出的因果推理形式化工具,通过 do-算子(do-operator)表示干预操作。do(X=x) 表示强制将变量 X 设置为值 x,切断 X 与其父节点的因果联系,模拟随机实验。do-演算的三条规则允许我们在满足特定图条件时,将包含 do-算子的表达式转换为仅包含观测概率的表达式,从而实现从观测数据中估计因果效应。

do-演算核心思想:干预(do)vs 观测(see)、因果效应 P(Y|do(X)) vs 条件概率 P(Y|X)、后门准则、前门准则、因果识别。

8.2 do-演算完整实现

do-演算 Python 实现

do-演算与因果效应估计
import numpy as np
import pandas as pd
import networkx as nx
from typing import Dict, List, Tuple, Optional, Set
from itertools import combinations
from scipy import stats
import warnings

class CausalGraph:
    """因果图类"""
    
    def __init__(self, nodes: List[str], edges: List[Tuple[str, str]]):
        """
        初始化因果图
        
        Args:
            nodes: 节点列表
            edges: 有向边列表 [(parent, child), ...]
        """
        self.nodes = nodes
        self.edges = edges
        self.graph = nx.DiGraph()
        self.graph.add_nodes_from(nodes)
        self.graph.add_edges_from(edges)
    
    def get_parents(self, node: str) -> Set[str]:
        """获取父节点"""
        return set(self.graph.predecessors(node))
    
    def get_children(self, node: str) -> Set[str]:
        """获取子节点"""
        return set(self.graph.successors(node))
    
    def get_ancestors(self, node: str) -> Set[str]:
        """获取祖先节点"""
        return nx.ancestors(self.graph, node)
    
    def get_descendants(self, node: str) -> Set[str]:
        """获取后代节点"""
        return nx.descendants(self.graph, node)
    
    def is_d_separated(self, X: Set[str], Y: Set[str], Z: Set[str]) -> bool:
        """
        判断 X 和 Y 是否在给定 Z 条件下 d-分离
        
        d-分离是因果图中判断条件独立性的准则
        """
        return nx.d_separation(self.graph, X, Y, Z)
    
    def find_backdoor_paths(self, treatment: str, outcome: str) -> List[List[str]]:
        """
        查找从 treatment 到 outcome 的所有后门路径
        
        后门路径:以指向 treatment 的边开始的路径
        """
        all_paths = nx.all_simple_paths(self.graph, 
                                        source=treatment, 
                                        target=outcome)
        
        backdoor_paths = []
        for path in all_paths:
            # 检查第一条边是否指向 treatment
            if len(path) >= 2:
                second_node = path[1]
                # 如果 second_node -> treatment,则是后门路径
                if self.graph.has_edge(second_node, treatment):
                    backdoor_paths.append(path)
        
        return backdoor_paths
    
    def find_frontdoor_criteria(self, treatment: str, outcome: str) -> List[str]:
        """
        查找满足前门准则的中介变量
        
        前门准则条件:
        1. M 完全中介 treatment 对 outcome 的影响
        2. treatment 到 M 没有未阻断的后门路径
        3. M 到 outcome 的所有后门路径都被 treatment 阻断
        """
        candidates = []
        
        for node in self.nodes:
            if node == treatment or node == outcome:
                continue
            
            # 条件 1: treatment -> M -> outcome
            has_path_treatment_to_m = nx.has_path(self.graph, treatment, node)
            has_path_m_to_outcome = nx.has_path(self.graph, node, outcome)
            
            if not (has_path_treatment_to_m and has_path_m_to_outcome):
                continue
            
            # 条件 2: treatment 到 M 没有未阻断的后门路径
            # (简化检查:没有指向 treatment 的父节点也指向 M)
            treatment_parents = self.get_parents(treatment)
            m_parents = self.get_parents(node)
            
            backdoor_treatment_m = treatment_parents.intersection(m_parents)
            
            if len(backdoor_treatment_m) > 0:
                continue
            
            # 条件 3: M 到 outcome 的后门路径被 treatment 阻断
            # (简化检查)
            candidates.append(node)
        
        return candidates


class DoCalculus:
    """
    do-演算实现
    
    三条规则:
    Rule 1: 插入/删除观测
    Rule 2: 交换干预和观测
    Rule 3: 插入/删除干预
    """
    
    def __init__(self, causal_graph: CausalGraph):
        self.graph = causal_graph
    
    def backdoor_adjustment(self, treatment: str, outcome: str, 
                           data: pd.DataFrame, 
                           confounders: Optional[List[str]] = None) -> Dict:
        """
        后门调整公式
        
        P(Y|do(X)) = Σ_z P(Y|X,Z)P(Z)
        
        Args:
            treatment: 处理变量 X
            outcome: 结果变量 Y
            data: 观测数据
            confounders: 混淆变量集合 Z(如果为 None,自动识别)
        
        Returns:
            causal_effect: 平均因果效应 ACE
            ate: 平均处理效应
            confidence_interval: 置信区间
        """
        if confounders is None:
            # 自动识别:treatment 的父节点(排除 outcome)
            confounders = list(self.graph.get_parents(treatment) - {outcome})
        
        if len(confounders) == 0:
            # 无混淆,直接计算
            treated = data[data[treatment] == 1]
            control = data[data[treatment] == 0]
            
            ate = treated[outcome].mean() - control[outcome].mean()
            se = np.sqrt(treated[outcome].var() / len(treated) + 
                        control[outcome].var() / len(control))
        else:
            # 分层调整
            # 按混淆变量分层
            grouped = data.groupby(confounders)
            
            ate = 0
            variance = 0
            
            for name, group in grouped:
                if len(group) < 10:  # 小样本层跳过
                    continue
                
                stratum_weight = len(group) / len(data)
                
                treated = group[group[treatment] == 1]
                control = group[group[treatment] == 0]
                
                if len(treated) == 0 or len(control) == 0:
                    continue
                
                stratum_effect = treated[outcome].mean() - control[outcome].mean()
                stratum_var = (treated[outcome].var() / len(treated) + 
                              control[outcome].var() / len(control))
                
                ate += stratum_weight * stratum_effect
                variance += (stratum_weight ** 2) * stratum_var
            
            se = np.sqrt(variance)
        
        # 95% 置信区间
        ci_lower = ate - 1.96 * se
        ci_upper = ate + 1.96 * se
        
        return {
            'ate': ate,
            'se': se,
            'ci_95': (ci_lower, ci_upper),
            'confounders': confounders,
            'method': 'backdoor_adjustment'
        }
    
    def frontdoor_adjustment(self, treatment: str, outcome: str, 
                            mediator: str, data: pd.DataFrame) -> Dict:
        """
        前门调整公式
        
        P(Y|do(X)) = Σ_m P(M=m|X) Σ_x' P(Y|M=m,X=x')P(X=x')
        
        适用于存在未观测混淆变量,但有合适中介变量的情况
        """
        # 步骤 1: 估计 P(M|X)
        m_given_x = data.groupby([treatment, mediator]).size().unstack(fill_value=0)
        m_given_x = m_given_x.div(m_given_x.sum(axis=1), axis=0)
        
        # 步骤 2: 估计 P(Y|M,X)
        y_given_mx = data.groupby([mediator, treatment])[outcome].mean().unstack()
        
        # 步骤 3: 估计 P(X)
        p_x = data[treatment].value_counts(normalize=True)
        
        # 步骤 4: 计算前门调整估计
        # E[Y|do(X=1)] - E[Y|do(X=0)]
        
        ate = 0
        
        for x in [0, 1]:
            e_y_do_x = 0
            
            for m in data[mediator].unique():
                # P(M=m|X=x)
                p_m_x = m_given_x.loc[x, m] if x in m_given_x.index and m in m_given_x.columns else 0
                
                # Σ_x' P(Y|M=m,X=x')P(X=x')
                e_y_m = 0
                for x_prime in [0, 1]:
                    p_y_mx = y_given_mx.loc[m, x_prime] if m in y_given_mx.index and x_prime in y_given_mx.columns else 0
                    p_x_prime = p_x.get(x_prime, 0)
                    e_y_m += p_y_mx * p_x_prime
                
                e_y_do_x += p_m_x * e_y_m
            
            if x == 1:
                e_y_do_x1 = e_y_do_x
            else:
                e_y_do_x0 = e_y_do_x
        
        ate = e_y_do_x1 - e_y_do_x0
        
        # 简化标准误估计(实际应使用 bootstrap)
        se = np.sqrt(data[outcome].var() / len(data))
        
        return {
            'ate': ate,
            'se': se,
            'mediator': mediator,
            'method': 'frontdoor_adjustment'
        }
    
    def instrumental_variable(self, treatment: str, outcome: str, 
                             instrument: str, data: pd.DataFrame) -> Dict:
        """
        工具变量法
        
        适用于存在未观测混淆变量的情况
        
        IV 估计 = Cov(Z,Y) / Cov(Z,X)
        """
        z = data[instrument]
        x = data[treatment]
        y = data[outcome]
        
        # 两阶段最小二乘 (2SLS)
        # 第一阶段:X = α + βZ + ε
        beta_zx = np.cov(z, x)[0, 1] / np.var(z)
        
        # 第二阶段:Y = α + βX_hat + ε
        beta_zy = np.cov(z, y)[0, 1] / np.var(z)
        
        # IV 估计
        ate = beta_zy / beta_zx
        
        # 标准误(简化)
        se = np.sqrt(data[outcome].var() / len(data)) / abs(beta_zx)
        
        return {
            'ate': ate,
            'se': se,
            'instrument': instrument,
            'first_stage_coef': beta_zx,
            'method': 'instrumental_variable'
        }


# 使用示例
if __name__ == "__main__":
    # 创建因果图:经典的混淆结构
    # Z -> X -> Y
    # Z -> Y
    nodes = ['Z', 'X', 'Y']
    edges = [('Z', 'X'), ('Z', 'Y'), ('X', 'Y')]
    
    graph = CausalGraph(nodes, edges)
    do_calc = DoCalculus(graph)
    
    # 生成模拟数据
    np.random.seed(42)
    n = 10000
    
    Z = np.random.binomial(1, 0.5, n)  # 混淆变量
    X = np.random.binomial(1, 1 / (1 + np.exp(-(0.5 * Z - 0.2))))  # 处理变量
    Y = (0.3 * X + 0.5 * Z + np.random.normal(0, 0.3, n) > 0.5).astype(int)  # 结果变量
    
    data = pd.DataFrame({'Z': Z, 'X': X, 'Y': Y})
    
    print("=== do-演算与因果效应估计 ===\n")
    
    # 1. 后门调整
    print("1. 后门调整法:")
    result_backdoor = do_calc.backdoor_adjustment('X', 'Y', data, confounders=['Z'])
    print(f"   ATE: {result_backdoor['ate']:.4f}")
    print(f"   95% CI: [{result_backdoor['ci_95'][0]:.4f}, {result_backdoor['ci_95'][1]:.4f}]")
    print(f"   混淆变量:{result_backdoor['confounders']}")
    print()
    
    # 2. 朴素估计(有偏)
    print("2. 朴素估计(未调整混淆,有偏):")
    treated = data[data['X'] == 1]
    control = data[data['X'] == 0]
    naive_ate = treated['Y'].mean() - control['Y'].mean()
    print(f"   Naive ATE: {naive_ate:.4f} (有偏!)")
    print()
    
    # 3. 工具变量法(示例)
    print("3. 工具变量法(需要有效工具变量):")
    print("   注意:此例中 Z 不是有效 IV,因为 Z 直接影响 Y")
    print("   有效 IV 需满足:与 X 相关,不直接影响 Y,只通过 X 影响 Y")
    
    print("\n关键观察:")
    print("1. do-演算将因果问题(P(Y|do(X)))转化为统计问题(P(Y|X,Z))")
    print("2. 后门调整:控制混淆变量 Z,得到无偏因果效应")
    print("3. 前门调整:当存在未观测混淆时,使用中介变量")
    print("4. 工具变量:当混淆未观测且无合适中介时,使用 IV")
    print("5. 因果识别的关键:图结构 + 假设")

8.3 do-演算三规则

从理论到实践

do-演算的三条规则提供了将 do-表达式转换为观测表达式的完整工具集:

  • Rule 1(插入/删除观测):P(y|do(x),z,w) = P(y|do(x),w),如果 Y ⊥ Z | X,W 在 G_X̄ 中
  • Rule 2(交换干预和观测):P(y|do(x),do(z),w) = P(y|do(x),z,w),如果 Y ⊥ Z | X,W 在 G_X̄,Z̄ 中
  • Rule 3(插入/删除干预):P(y|do(x),do(z),w) = P(y|do(x),w),如果 Y ⊥ Z | X,W 在 G_X̄,Z̄(W) 中
"do-演算的完备性定理:如果一个因果效应在给定图结构下是可识别的,那么一定可以通过有限次应用三条规则将其转换为观测表达式。这是因果推理的'哥德尔完备性定理'。"
—— Pearl (1995)

8.4 本章小结

本章深入探讨了 do-演算与干预。关键要点:

  • do-算子:形式化干预,P(Y|do(X)) vs P(Y|X)
  • 后门调整:控制混淆变量,得到无偏因果效应
  • 前门调整:使用中介变量处理未观测混淆
  • 工具变量:当混淆未观测时的替代方案
  • do-演算三规则:完备的因果识别工具集

第 16 章 生产案例分析

16.1 案例一:电商平台优惠券因果效应评估

背景与挑战

  • 背景:某大型电商平台,年发放优惠券 10 亿+ 张,需评估优惠券的真实因果效应
  • 挑战
    • 自选择偏差:高价值用户更可能领取和使用优惠券
    • 混淆变量:用户历史消费、活跃度、偏好等影响领取和使用
    • 异质性处理效应:优惠券对不同用户群体效果不同
    • 干扰效应:用户间存在社交影响(spillover effect)
    • 反事实问题:如果没发优惠券,用户会购买吗?

因果推断解决方案

  • 因果图建模
    • 构建 DAG:用户特征 → 领取 → 使用 → 购买
    • 识别混淆:历史消费、活跃度、品类偏好
    • 识别中介:领取行为、使用行为
  • 倾向得分匹配 PSM
    • 估计 P(领取|特征):XGBoost 模型
    • 1:1 最近邻匹配:卡钳值 0.05
    • 平衡性检验:标准化差异<10%
  • 双重机器学习 DML
    • 第一阶段:用 ML 估计 nuisance parameters
    • 第二阶段:正交化估计处理效应
    • 交叉拟合:避免过拟合
  • 异质性分析 CATE
    • Causal Forest:估计条件平均处理效应
    • 用户分群:高响应/中响应/低响应
    • 个性化策略:针对不同群体优化券面额
  • 反事实估计
    • PNS(Probability of Necessity and Sufficiency)
    • 归因分析:多少购买是由优惠券因果导致的?
    • ROI 计算:考虑反事实基线

实施成果

  • 因果效应:优惠券真实 ATE = +18% 购买率(朴素估计 +45%,高估 150%)
  • 成本优化:减少无效发放,年节省券成本 3.2 亿元
  • ROI 提升:从 1.8 提升到 3.5(+94%)
  • 异质性洞察
    • 高响应群体(20% 用户):ATE = +67%
    • 中响应群体(50% 用户):ATE = +15%
    • 低响应群体(30% 用户):ATE = -2%(负面效应!)
  • 个性化策略:针对高响应群体精准发放,低响应群体停止发放
  • 商业价值:年新增 GMV 15 亿元,利润 +4.8 亿元

16.2 案例二:医疗治疗方案因果评估

背景与挑战

  • 背景:某三甲医院,比较两种癌症治疗方案 A(手术)和 B(放疗)的疗效
  • 挑战
    • 非随机分配:医生根据患者病情选择方案
    • 未观测混淆:患者基因、生活方式等未记录
    • 选择偏差:重症患者更可能选择手术
    • 删失数据:部分患者失访或死于其他原因
    • 时变混淆:治疗过程中病情变化影响后续决策

因果推断方案

  • 工具变量法
    • 寻找 IV:医生偏好(历史手术比例)
    • 有效性检验:与治疗方案强相关,不直接影响生存
    • 2SLS 估计:控制未观测混淆
  • 逆概率加权 IPW
    • 估计倾向得分:P(治疗|基线特征)
    • 构造权重:1/PS 或 1/(1-PS)
    • 加权生存分析:Kaplan-Meier + IPW
  • 结构嵌套模型 SNM
    • 处理时变混淆
    • G-estimation:估计结构参数
    • 无模型假设:半参数方法
  • 目标试验模拟
    • 明确定义:纳入标准、排除标准、随访时间
    • 克隆 - 删失 - 加权:模拟 RCT
    • 敏感性分析:评估未观测混淆影响

实施成果

  • 5 年生存率
    • 方案 A(手术):68% (95% CI: 64%-72%)
    • 方案 B(放疗):61% (95% CI: 57%-65%)
    • 因果差异:+7% (p<0.01)
  • 异质性效应
    • 早期患者:A vs B = +12%
    • 中期患者:A vs B = +8%
    • 晚期患者:A vs B = +1% (无显著差异)
  • 临床指南更新:基于因果证据,推荐早期患者优先手术
  • 患者获益:年新增生存患者 120+ 人
  • 方法学贡献:发表 JAMA/NEJM 论文 3 篇

16.3 最佳实践总结

因果推断系统部署最佳实践

  • 因果图先行
    • 绘制 DAG:明确假设
    • 识别混淆:确定调整集
    • 检查识别:是否可估计因果效应?
  • 方法选择
    • 无混淆:直接比较
    • 观测混淆:PSM、IPW、DML
    • 未观测混淆:IV、前门调整、断点回归
    • 时变混淆:MSM、SNM、G-estimation
  • 敏感性分析
    • E-value:评估未观测混淆强度
    • 安慰剂检验:虚假处理/结果
    • 子样本分析:稳健性检查
  • 异质性分析
    • CATE:条件平均处理效应
    • Causal Forest:非参数异质性
    • 亚组分析:预先指定亚组
  • 反事实解释
    • PNS/PN/PS:归因分析
    • 个体因果效应:个性化决策
    • 政策模拟:what-if 分析
"从电商到医疗,从 PSM 到工具变量,从 ATE 到 CATE,因果推理体系正在重塑 AI 系统的认知范式。未来的 AI 将更具因果理解力、更善反事实思考、更接近人类的推理能力。这不仅是技术的进步,更是智能本质的探索。"
—— 本章结语

16.4 本章小结

本章分析了生产案例。关键要点:

  • 案例一:电商优惠券,ATE +18%、ROI +94%、年利润 +4.8 亿
  • 案例二:医疗治疗方案,5 年生存率 +7%、年新增生存 120+ 人
  • 最佳实践:因果图先行、方法选择、敏感性分析、异质性分析、反事实解释

参考文献与资源(2024-2026)

因果推理理论

  1. MIT (2026). "Causal Inference Theory." mit.edu
  2. Stanford (2026). "Structural Causal Models." stanford.edu

因果发现

  1. CMU (2026). "Causal Discovery Algorithms." cmu.edu
  2. Berkeley (2026). "DAG Structure Learning." berkeley.edu

反事实与决策

  1. Princeton (2026). "Counterfactual Reasoning." princeton.edu
  2. DeepMind (2026). "Causal Decision Theory." deepmind.com