"""Medical agent for assisted diagnosis and workflow coordination: complete implementation (医疗 Agent 辅助诊断与流程协同完整实现)."""
import time
import json
import math
import random
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import threading
import uuid
from abc import ABC, abstractmethod
class ImagingModality(Enum):
    """Imaging modality that produced a study."""

    XRAY = "xray"
    CT = "ct"
    MRI = "mri"
    ULTRASOUND = "ultrasound"
    PET = "pet"
    PATHOLOGY = "pathology"
class DiagnosisConfidence(Enum):
    """Confidence band attached to a diagnosis."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    VERY_HIGH = "very_high"
class UrgencyLevel(Enum):
    """Urgency level used for findings, alerts and workflow priority."""

    ROUTINE = "routine"
    URGENT = "urgent"
    EMERGENCY = "emergency"
    CRITICAL = "critical"
class WorkflowStatus(Enum):
    """Lifecycle state of a workflow."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
@dataclass
class PatientInfo:
    """Patient record consumed by the decision-support components."""

    patient_id: str  # identifier copied onto diagnoses, plans and alerts
    name: str
    age: int
    gender: str
    medical_history: List[str]  # scanned as text when scoring risk factors
    allergies: List[str]
    current_medications: List[str]
    vital_signs: Dict[str, float]  # measurement name -> value (e.g. 'temperature', 'spo2')
@dataclass
class ImagingStudy:
    """A single imaging examination and its report."""

    study_id: str
    patient_id: str
    modality: ImagingModality
    body_part: str
    timestamp: datetime
    image_url: str
    findings: List[str]
    impression: str
    radiologist: Optional[str]  # may be None; no default, so it must still be passed
@dataclass
class LabResult:
    """One laboratory test result."""

    test_id: str
    patient_id: str
    test_name: str
    value: float
    unit: str
    reference_range: Tuple[float, float]  # (low, high) limits
    flag: str  # N=Normal, H=High, L=Low, C=Critical
    timestamp: datetime
@dataclass
class Diagnosis:
    """A candidate diagnosis for a patient."""

    diagnosis_id: str
    patient_id: str
    condition: str  # display name of the condition
    icd10_code: str
    confidence: DiagnosisConfidence
    supporting_evidence: List[str]
    differential_diagnoses: List[str]
    recommended_tests: List[str]
    timestamp: datetime
@dataclass
class TreatmentPlan:
    """Treatment plan attached to a diagnosis."""

    plan_id: str
    patient_id: str
    diagnosis_id: str
    medications: List[Dict[str, Any]]  # one dict per drug (name, dosage, ...)
    procedures: List[str]
    lifestyle_recommendations: List[str]
    follow_up_schedule: List[datetime]
    risk_factors: List[str]
    expected_outcomes: List[str]
@dataclass
class ClinicalAlert:
    """Clinical alert raised for a patient."""

    alert_id: str
    patient_id: str
    alert_type: str
    severity: UrgencyLevel
    message: str
    recommended_action: str
    timestamp: datetime
    acknowledged: bool
class MedicalKnowledgeBase:
    """In-memory medical knowledge base.

    Holds three hard-coded lookup tables built at construction time:
      1. ``disease_database`` - disease knowledge
      2. ``drug_database``    - drug information
      3. ``guidelines``       - clinical guidelines
    """

    def __init__(self):
        self.disease_database = self._load_disease_database()
        self.drug_database = self._load_drug_database()
        self.guidelines = self._load_guidelines()

    def _load_disease_database(self) -> Dict[str, Any]:
        """Return the built-in disease table, keyed by lowercase disease id."""
        pneumonia = {
            'icd10': 'J18.9',
            'symptoms': ['fever', 'cough', 'shortness of breath', 'chest pain'],
            'risk_factors': ['age > 65', 'smoking', 'chronic diseases', 'immunocompromised'],
            'diagnostic_tests': ['chest x-ray', 'blood culture', 'sputum culture'],
            'treatments': ['antibiotics', 'oxygen therapy', 'fluids'],
            'complications': ['sepsis', 'respiratory failure', 'pleural effusion'],
        }
        diabetes_type2 = {
            'icd10': 'E11.9',
            'symptoms': ['polyuria', 'polydipsia', 'fatigue', 'blurred vision'],
            'risk_factors': ['obesity', 'family history', 'sedentary lifestyle', 'age > 45'],
            'diagnostic_tests': ['fasting glucose', 'HbA1c', 'oral glucose tolerance'],
            'treatments': ['metformin', 'lifestyle modification', 'insulin'],
            'complications': ['cardiovascular disease', 'nephropathy', 'retinopathy'],
        }
        myocardial_infarction = {
            'icd10': 'I21.9',
            'symptoms': ['chest pain', 'shortness of breath', 'nausea', 'sweating'],
            'risk_factors': ['hypertension', 'smoking', 'diabetes', 'high cholesterol'],
            'diagnostic_tests': ['ECG', 'troponin', 'echocardiogram', 'coronary angiography'],
            'treatments': ['aspirin', 'PCI', 'thrombolytics', 'beta-blockers'],
            'complications': ['heart failure', 'arrhythmia', 'cardiogenic shock'],
        }
        return {
            'pneumonia': pneumonia,
            'diabetes_type2': diabetes_type2,
            'myocardial_infarction': myocardial_infarction,
        }

    def _load_drug_database(self) -> Dict[str, Any]:
        """Return the built-in drug table, keyed by lowercase drug name."""
        metformin = {
            'class': 'biguanide',
            'indications': ['type 2 diabetes'],
            'dosage': '500-2000mg daily',
            'contraindications': ['renal impairment', 'liver disease'],
            'side_effects': ['nausea', 'diarrhea', 'lactic acidosis (rare)'],
        }
        amoxicillin = {
            'class': 'penicillin antibiotic',
            'indications': ['bacterial infections', 'pneumonia'],
            'dosage': '500mg every 8 hours',
            'contraindications': ['penicillin allergy'],
            'side_effects': ['rash', 'diarrhea', 'nausea'],
        }
        return {'metformin': metformin, 'amoxicillin': amoxicillin}

    def _load_guidelines(self) -> Dict[str, Any]:
        """Return the built-in clinical guideline table."""
        pneumonia_management = {
            'title': 'Community-Acquired Pneumonia Guidelines',
            'organization': 'IDSA/ATS',
            'year': 2019,
            'recommendations': [
                'Obtain chest x-ray for suspected pneumonia',
                'Assess severity using CURB-65 score',
                'Initiate empiric antibiotics within 4 hours',
                'Consider hospitalization for high-risk patients',
            ],
        }
        return {'pneumonia_management': pneumonia_management}

    def get_disease_info(self, condition: str) -> Optional[Dict[str, Any]]:
        """Look up a disease entry by name (case-insensitive).

        Returns the stored entry, or None when the disease is unknown.
        """
        lookup_key = condition.lower()
        return self.disease_database.get(lookup_key)

    def check_drug_interactions(self, medications: List[str]) -> List[Dict[str, str]]:
        """Report known interactions among ``medications``.

        Simplified implementation: only the metformin / contrast dye pair is
        recognised, and names are matched exactly as given.
        """
        if 'metformin' not in medications or 'contrast_dye' not in medications:
            return []
        return [{
            'drugs': ['metformin', 'contrast_dye'],
            'severity': 'high',
            'message': 'Risk of lactic acidosis. Hold metformin before contrast procedures.',
        }]
class MedicalImagingAnalyzer:
    """Simulated medical image analyzer.

    Offers canned analyses for chest X-rays and pathology slides; no real
    image processing takes place.
    """

    def __init__(self):
        # Bounded log of chest X-ray results (oldest entries are dropped).
        self.analysis_history = deque(maxlen=1000)

    def analyze_chest_xray(self, image_data: Dict) -> Dict[str, Any]:
        """Return a simulated chest X-ray report and record it in the history.

        ``image_data`` is accepted but not inspected; the detections below are
        fixed. The returned dict is the same object stored in
        ``analysis_history``.
        """
        # Simulated detector output: (finding, probability).
        simulated_detections = (
            ('pulmonary infiltrate', 0.85),
            ('pleural effusion', 0.12),
            ('cardiomegaly', 0.23),
            ('pneumothorax', 0.05),
        )
        # Only detections above the 0.5 threshold are reported.
        confidence_scores = {
            label: probability
            for label, probability in simulated_detections
            if probability > 0.5
        }
        findings = list(confidence_scores)
        infiltrate_seen = 'pulmonary infiltrate' in findings

        if infiltrate_seen:
            impression = "Findings consistent with pneumonia. Clinical correlation recommended."
        elif findings:
            impression = f"Abnormal findings: {', '.join(findings)}. Further evaluation may be needed."
        else:
            impression = "No acute cardiopulmonary abnormality."

        if infiltrate_seen:
            urgency = UrgencyLevel.URGENT
        else:
            urgency = UrgencyLevel.ROUTINE

        report = {
            'findings': findings,
            'confidence_scores': confidence_scores,
            'impression': impression,
            'urgency': urgency,
            'recommended_followup': ['clinical correlation', 'consider CT if uncertain'],
        }
        self.analysis_history.append(report)
        return report

    def analyze_pathology_slide(self, image_data: Dict) -> Dict[str, Any]:
        """Return a fixed, simulated pathology report (``image_data`` is unused)."""
        biomarkers = {
            'ER': 'positive',
            'PR': 'positive',
            'HER2': 'negative',
            'Ki67': '25%',
        }
        return {
            'diagnosis': 'adenocarcinoma',
            'grade': 'moderately differentiated',
            'stage': 'T2N1M0',
            'biomarkers': biomarkers,
            'confidence': 0.92,
            'recommendations': ['oncology consultation', 'molecular testing'],
        }
class ClinicalDecisionSupport:
    """Rule-based clinical decision support on top of a MedicalKnowledgeBase.

    Provides:
      1. Symptom-driven differential diagnosis
      2. Treatment plan drafting
      3. Lab-result driven clinical alerts
    """

    def __init__(self, knowledge_base: MedicalKnowledgeBase):
        self.knowledge_base = knowledge_base
        # Bounded log: one list of diagnoses per differential-diagnosis call.
        self.diagnosis_history = deque(maxlen=1000)

    @staticmethod
    def _confidence_for(score: float) -> DiagnosisConfidence:
        """Map a combined score in [0, 1] to a confidence band."""
        if score > 0.8:
            return DiagnosisConfidence.VERY_HIGH
        if score > 0.6:
            return DiagnosisConfidence.HIGH
        if score > 0.4:
            return DiagnosisConfidence.MEDIUM
        return DiagnosisConfidence.LOW

    def generate_differential_diagnosis(self,
                                        symptoms: List[str],
                                        patient_info: PatientInfo) -> List[Diagnosis]:
        """Rank diseases from the knowledge base against the reported symptoms.

        A disease is kept when more than 30% of its listed symptoms were
        reported. Each risk factor whose first word appears in the textual
        form of the patient's medical history adds 0.1 to the score, which is
        capped at 1.0.

        Args:
            symptoms: Reported symptoms, matched exactly against the
                knowledge-base symptom strings.
            patient_info: Patient whose id and medical history are used.

        Returns:
            Diagnoses ordered from highest to lowest score (and therefore from
            highest to lowest confidence). The same list is appended to
            ``diagnosis_history``.
        """
        reported = set(symptoms)
        history_text = str(patient_info.medical_history)
        scored: List[Tuple[float, Diagnosis]] = []

        for disease_name, disease_info in self.knowledge_base.disease_database.items():
            known_symptoms = disease_info['symptoms']
            if not known_symptoms:
                # Nothing to match against; also avoids dividing by zero.
                continue

            # Keep the knowledge-base order so the evidence list is
            # deterministic (iterating a set of strings is not).
            matching_symptoms = list(dict.fromkeys(
                s for s in known_symptoms if s in reported
            ))
            match_score = len(matching_symptoms) / len(known_symptoms)
            if match_score <= 0.3:
                continue

            # Crude risk-factor heuristic: first word of the risk factor
            # found anywhere in the stringified medical history.
            risk_factor_score = 0.0
            for risk_factor in disease_info['risk_factors']:
                words = risk_factor.split()
                if words and words[0] in history_text:
                    risk_factor_score += 0.1

            total_score = min(1.0, match_score + risk_factor_score)
            diagnosis = Diagnosis(
                diagnosis_id=f"dx_{uuid.uuid4().hex[:8]}",
                patient_id=patient_info.patient_id,
                condition=disease_name.replace('_', ' ').title(),
                icd10_code=disease_info['icd10'],
                confidence=self._confidence_for(total_score),
                supporting_evidence=matching_symptoms,
                differential_diagnoses=[],
                # Copy so callers cannot mutate the knowledge base.
                recommended_tests=list(disease_info['diagnostic_tests']),
                timestamp=datetime.now()
            )
            scored.append((total_score, diagnosis))

        # Confidence is monotone in the score, so ordering by score also
        # orders by confidence. The previous lookup of the lowercase enum
        # value in a list of uppercase names raised ValueError.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        potential_conditions = [diagnosis for _, diagnosis in scored]

        self.diagnosis_history.append(potential_conditions)
        return potential_conditions

    def recommend_treatment(self,
                            diagnosis: Diagnosis,
                            patient_info: PatientInfo) -> TreatmentPlan:
        """Draft a treatment plan for ``diagnosis``.

        Args:
            diagnosis: Diagnosis whose ``condition`` is converted back to a
                knowledge-base key (lowercase, spaces to underscores).
            patient_info: Patient the plan is for.

        Returns:
            A TreatmentPlan with follow-ups at 3, 7 and 30 days from now.

        Raises:
            ValueError: If the knowledge base has no entry for the condition.
        """
        disease_key = diagnosis.condition.lower().replace(' ', '_')
        disease_info = self.knowledge_base.get_disease_info(disease_key)
        if not disease_info:
            raise ValueError(f"No treatment information for {diagnosis.condition}")

        drug_database = self.knowledge_base.drug_database
        medications = []
        for treatment in disease_info['treatments']:
            # Normalise once so the membership test and the lookup agree.
            drug_key = treatment.lower()
            if 'antibiotic' in drug_key or drug_key in drug_database:
                drug_info = drug_database.get(drug_key, {})
                medications.append({
                    'name': treatment,
                    'dosage': drug_info.get('dosage', 'As per guidelines'),
                    'duration': '7-10 days',
                    'instructions': 'Take as directed'
                })

        # One reference instant so the offsets are exact multiples of a day.
        now = datetime.now()
        follow_up_dates = [now + timedelta(days=offset) for offset in (3, 7, 30)]

        return TreatmentPlan(
            plan_id=f"plan_{uuid.uuid4().hex[:8]}",
            patient_id=patient_info.patient_id,
            diagnosis_id=diagnosis.diagnosis_id,
            medications=medications,
            procedures=['oxygen therapy'] if 'pneumonia' in diagnosis.condition.lower() else [],
            lifestyle_recommendations=['rest', 'hydration', 'avoid smoking'],
            follow_up_schedule=follow_up_dates,
            # Copy so callers cannot mutate the knowledge base.
            risk_factors=list(disease_info.get('complications', [])),
            expected_outcomes=['symptom improvement in 48-72 hours', 'full recovery in 2-3 weeks']
        )

    def generate_clinical_alerts(self,
                                 patient_info: PatientInfo,
                                 lab_results: List[LabResult]) -> List[ClinicalAlert]:
        """Turn flagged lab results into clinical alerts.

        * flag ``'C'`` always yields a CRITICAL alert.
        * flag ``'H'`` / ``'L'`` yields an URGENT alert when the value lies
          more than two half-widths of the reference range away from the
          range midpoint.

        Args:
            patient_info: Patient the alerts are raised for.
            lab_results: Results to inspect; other flags are ignored.

        Returns:
            Alerts in the order of the triggering lab results.
        """
        alerts = []
        for lab in lab_results:
            if lab.flag == 'C':
                alerts.append(ClinicalAlert(
                    alert_id=f"alert_{uuid.uuid4().hex[:8]}",
                    patient_id=patient_info.patient_id,
                    alert_type='critical_lab_value',
                    severity=UrgencyLevel.CRITICAL,
                    message=f"Critical value: {lab.test_name} = {lab.value} {lab.unit}",
                    recommended_action='Immediate clinical review required',
                    timestamp=datetime.now(),
                    acknowledged=False
                ))
            elif lab.flag in ('H', 'L'):
                ref_low, ref_high = lab.reference_range
                midpoint = (ref_low + ref_high) / 2
                half_width = (ref_high - ref_low) / 2
                distance = abs(lab.value - midpoint)
                if half_width != 0:
                    # Distance from the midpoint in half-widths; 1.0 sits on
                    # a reference limit.
                    deviation = distance / half_width
                else:
                    # Zero-width range cannot be normalised; rather than
                    # divide by zero, alert on any flagged value off the
                    # midpoint.
                    deviation = math.inf if distance > 0 else 0.0
                if deviation > 2:
                    alerts.append(ClinicalAlert(
                        alert_id=f"alert_{uuid.uuid4().hex[:8]}",
                        patient_id=patient_info.patient_id,
                        alert_type='abnormal_lab_value',
                        severity=UrgencyLevel.URGENT,
                        message=f"Significantly abnormal: {lab.test_name} = {lab.value} {lab.unit}",
                        recommended_action='Clinical review within 24 hours',
                        timestamp=datetime.now(),
                        acknowledged=False
                    ))
        return alerts
class HealthcareWorkflowEngine:
    """In-memory healthcare workflow engine.

    Provides:
      1. Workflow creation from built-in pathway templates
      2. Task status updates with automatic workflow completion
      3. Status lookup
      4. A simplified HL7 ORU^R01 message generator
    """

    # Built-in pathways: workflow type -> ordered (task name, assignee) pairs.
    _PATHWAY_TEMPLATES = {
        'pneumonia_pathway': (
            ('chest_xray', 'radiology'),
            ('blood_culture', 'laboratory'),
            ('antibiotic_administration', 'nursing'),
            ('clinical_assessment', 'physician'),
        ),
        'diabetes_management': (
            ('hba1c_test', 'laboratory'),
            ('medication_review', 'pharmacy'),
            ('dietary_consultation', 'nutrition'),
            ('patient_education', 'nursing'),
        ),
    }

    def __init__(self):
        # workflow_id -> workflow record (see create_workflow for the keys).
        self.workflows: Dict[str, Dict] = {}
        # patient_id -> id of that patient's most recently created workflow
        # that has not yet completed.
        self.active_cases: Dict[str, str] = {}

    def create_workflow(self,
                        workflow_type: str,
                        patient_id: str,
                        priority: UrgencyLevel) -> str:
        """Create and register a workflow, returning its generated id.

        Args:
            workflow_type: 'pneumonia_pathway' or 'diabetes_management'; any
                other value creates a workflow with no tasks.
            patient_id: Patient the workflow belongs to.
            priority: Stored on the workflow as its string value.

        Returns:
            The new workflow id (``wf_`` plus 8 hex characters).
        """
        workflow_id = f"wf_{uuid.uuid4().hex[:8]}"

        # Fresh dicts per workflow so task state is never shared.
        tasks = [
            {'name': task_name, 'status': 'pending', 'assigned_to': assignee}
            for task_name, assignee in self._PATHWAY_TEMPLATES.get(workflow_type, ())
        ]

        now = datetime.now()
        self.workflows[workflow_id] = {
            'workflow_id': workflow_id,
            'type': workflow_type,
            'patient_id': patient_id,
            'priority': priority.value,
            'status': WorkflowStatus.IN_PROGRESS.value,
            'tasks': tasks,
            'created_at': now,
            'updated_at': now
        }
        # A later workflow for the same patient replaces the earlier entry.
        self.active_cases[patient_id] = workflow_id
        return workflow_id

    def update_task_status(self,
                           workflow_id: str,
                           task_name: str,
                           status: str,
                           result: Optional[Dict] = None) -> bool:
        """Set the status (and optionally the result) of one task.

        When every task of the workflow is 'completed', the workflow itself is
        marked completed and removed from ``active_cases``.

        Args:
            workflow_id: Workflow that owns the task.
            task_name: Name of the task to update (first match wins).
            status: New status string, stored as given.
            result: Optional result payload; stored whenever it is not None,
                including an empty dict.

        Returns:
            True if the task was found and updated, False if the workflow or
            the task does not exist.
        """
        workflow = self.workflows.get(workflow_id)
        if workflow is None:
            return False

        for task in workflow['tasks']:
            if task['name'] != task_name:
                continue

            task['status'] = status
            if result is not None:
                task['result'] = result
            workflow['updated_at'] = datetime.now()

            completed = WorkflowStatus.COMPLETED.value
            if all(t['status'] == completed for t in workflow['tasks']):
                workflow['status'] = completed
                # Drop the case only if it still points at this workflow; a
                # newer workflow for the same patient must stay active.
                patient_id = workflow['patient_id']
                if self.active_cases.get(patient_id) == workflow_id:
                    del self.active_cases[patient_id]
            return True

        return False

    def get_workflow_status(self, workflow_id: str) -> Optional[Dict]:
        """Return the stored workflow record, or None if the id is unknown."""
        return self.workflows.get(workflow_id)

    def generate_hl7_message(self, workflow_id: str) -> str:
        """Build a simplified HL7 ORU^R01 message for a workflow.

        Only the workflow id, patient id and current time are taken from
        state; the patient demographics and the observation text are fixed
        placeholders.

        Returns:
            The message as newline-terminated segments, or an empty string
            when the workflow id is unknown.
        """
        workflow = self.workflows.get(workflow_id)
        if not workflow:
            return ""

        patient_id = workflow['patient_id']
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        findings = ['Pulmonary infiltrate']
        segments = [
            f"MSH|^~\\&|AI_SYSTEM|HOSPITAL|LIS|HOSPITAL|{timestamp}||ORU^R01|{workflow_id}|P|2.5",
            f"PID|1||{patient_id}^^^HOSPITAL||Patient^Name||19800101|M",
            f"OBR|1||{workflow_id}|CHEST_XRAY|||{timestamp}",
            f"OBX|1|TX|FINDINGS||{','.join(findings)}|||N",
            "OBX|2|TX|IMPRESSION||Findings consistent with pneumonia|||F",
        ]
        # Every segment, including the last, ends with a newline.
        return "\n".join(segments) + "\n"
# Usage example
if __name__ == "__main__":
    print("=== 医疗 Agent 辅助诊断与流程协同 ===\n")

    # --- Knowledge base ---
    print("=== 创建医学知识库 ===")
    knowledge_base = MedicalKnowledgeBase()
    print(f"加载疾病数据库:{len(knowledge_base.disease_database)} 种疾病")
    print(f"加载药物数据库:{len(knowledge_base.drug_database)} 种药物")
    print(f"加载临床指南:{len(knowledge_base.guidelines)} 项指南")

    # --- Patient ---
    print("\n=== 创建患者信息 ===")
    patient = PatientInfo(
        patient_id="PAT001",
        name="张三",
        age=68,
        gender="M",
        medical_history=['hypertension', 'smoking'],
        allergies=['penicillin'],
        current_medications=['lisinopril'],
        vital_signs={'temperature': 38.5, 'heart_rate': 95, 'bp_systolic': 145, 'bp_diastolic': 90, 'spo2': 92}
    )
    vitals = patient.vital_signs
    print(f"患者:{patient.name} ({patient.age}岁,{patient.gender})")
    print(f"病史:{', '.join(patient.medical_history)}")
    print(f"生命体征:T {vitals['temperature']}°C, HR {vitals['heart_rate']}, SpO2 {vitals['spo2']}%")

    # --- Imaging ---
    print("\n=== 创建影像分析器 ===")
    imaging_analyzer = MedicalImagingAnalyzer()

    print("\n=== 分析胸部 X 光片 ===")
    # Simulated imaging payload
    xray_data = {'image_url': 'chest_xray_001.dcm', 'quality': 'good'}
    xray_result = imaging_analyzer.analyze_chest_xray(xray_data)
    if xray_result['findings']:
        findings_text = ', '.join(xray_result['findings'])
    else:
        findings_text = '无异常'
    print(f"影像发现:{findings_text}")
    print(f"印象:{xray_result['impression']}")
    print(f"紧急程度:{xray_result['urgency'].value}")
    print(f"建议:{', '.join(xray_result['recommended_followup'])}")

    # --- Decision support ---
    print("\n=== 创建临床决策支持系统 ===")
    cds = ClinicalDecisionSupport(knowledge_base)

    print("\n=== 生成鉴别诊断 ===")
    symptoms = ['fever', 'cough', 'shortness of breath']
    diagnoses = cds.generate_differential_diagnosis(symptoms, patient)
    print(f"症状:{', '.join(symptoms)}")
    print(f"\n鉴别诊断 ({len(diagnoses)} 个):")
    for i, dx in enumerate(diagnoses, 1):
        print(f"{i}. {dx.condition} (ICD-10: {dx.icd10_code})")
        print(f" 置信度:{dx.confidence.value}")
        print(f" 支持证据:{', '.join(dx.supporting_evidence)}")
        print(f" 推荐检查:{', '.join(dx.recommended_tests)}")

    print("\n=== 推荐治疗方案 ===")
    if diagnoses:
        # The first entry is treated as the primary diagnosis.
        primary_diagnosis = diagnoses[0]
        treatment_plan = cds.recommend_treatment(primary_diagnosis, patient)
        print(f"诊断:{primary_diagnosis.condition}")
        print("\n治疗方案:")
        print(" 药物:")
        for med in treatment_plan.medications:
            print(f" - {med['name']}: {med['dosage']}")
        print(f" 治疗措施:{', '.join(treatment_plan.procedures)}")
        print(f" 生活建议:{', '.join(treatment_plan.lifestyle_recommendations)}")
        print(f" 随访计划:{len(treatment_plan.follow_up_schedule)} 次")
        print(f" 预期结果:{', '.join(treatment_plan.expected_outcomes)}")

    # --- Alerts ---
    print("\n=== 生成临床预警 ===")
    # Simulated lab results
    wbc_result = LabResult(
        test_id="LAB001",
        patient_id=patient.patient_id,
        test_name="WBC",
        value=15.2,
        unit="10^9/L",
        reference_range=(4.0, 11.0),
        flag="H",
        timestamp=datetime.now()
    )
    lactate_result = LabResult(
        test_id="LAB002",
        patient_id=patient.patient_id,
        test_name="Lactate",
        value=4.5,
        unit="mmol/L",
        reference_range=(0.5, 2.0),
        flag="C",
        timestamp=datetime.now()
    )
    lab_results = [wbc_result, lactate_result]
    alerts = cds.generate_clinical_alerts(patient, lab_results)
    print(f"生成 {len(alerts)} 个临床预警:")
    for alert in alerts:
        print(f" [{alert.severity.value.upper()}] {alert.message}")
        print(f" 建议:{alert.recommended_action}")

    # --- Workflow ---
    print("\n=== 创建工作流引擎 ===")
    workflow_engine = HealthcareWorkflowEngine()

    print("\n=== 创建肺炎诊疗工作流 ===")
    workflow_id = workflow_engine.create_workflow('pneumonia_pathway', patient.patient_id, UrgencyLevel.URGENT)
    print(f"工作流 ID: {workflow_id}")
    print("类型:pneumonia_pathway")
    print("优先级:urgent")
    workflow_status = workflow_engine.get_workflow_status(workflow_id)
    if workflow_status:
        print("\n任务列表:")
        for i, task in enumerate(workflow_status['tasks'], 1):
            print(f"{i}. {task['name']} - {task['status']} ({task['assigned_to']})")

    print("\n=== 更新任务状态 ===")
    workflow_engine.update_task_status(workflow_id, 'chest_xray', 'completed', {'result': 'pulmonary infiltrate'})
    workflow_engine.update_task_status(workflow_id, 'blood_culture', 'in_progress')
    updated_status = workflow_engine.get_workflow_status(workflow_id)
    if updated_status:
        print("更新后任务状态:")
        for i, task in enumerate(updated_status['tasks'], 1):
            print(f"{i}. {task['name']} - {task['status']}")

    # --- HL7 ---
    print("\n=== 生成 HL7 消息 ===")
    hl7_message = workflow_engine.generate_hl7_message(workflow_id)
    print("HL7 ORU^R01 消息:")
    # Show at most the first five segments.
    for line in hl7_message.split('\n')[:5]:
        print(f" {line}")

    # --- Summary ---
    closing_lines = (
        "\n关键观察:",
        "1. 医学感知:影像分析 + 检验数据 + 电子病历",
        "2. 智能诊断:辅助诊断 + 鉴别诊断 + 治疗推荐",
        "3. 流程协同:工作流引擎 + HL7/FHIR + 多学科协作",
        "4. 临床预警:实时监测 + 风险预测 + 自动告警",
        "5. 智能医疗:感知 + 诊断 + 协同 = 可信赖",
        "\n智能医疗的使命:让诊疗更精准、更高效、更可及",
    )
    for closing_line in closing_lines:
        print(closing_line)