# Complete implementation of domain-knowledge adaptation for vertical-industry agents
import time
import json
import hashlib
import secrets
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import statistics
import threading
import re
from abc import ABC, abstractmethod
class IndustryType(Enum):
    """Vertical-industry categories an agent can be adapted to."""
    HEALTHCARE = "healthcare"        # medical / clinical
    LEGAL = "legal"                  # legal services
    FINANCE = "finance"              # financial services
    MANUFACTURING = "manufacturing"  # manufacturing
    EDUCATION = "education"          # education
class KnowledgeSourceType(Enum):
    """Kinds of source a knowledge document can come from."""
    DOCUMENT = "document"                # plain documents
    DATABASE = "database"                # databases
    KNOWLEDGE_GRAPH = "knowledge_graph"  # knowledge graphs
    EXPERT_SYSTEM = "expert_system"      # expert systems
    REGULATION = "regulation"            # laws and regulations
class AdaptationMethod(Enum):
    """Techniques for adapting a general agent to a specific domain."""
    RAG = "rag"                                # retrieval-augmented generation
    FINE_TUNING = "fine_tuning"                # model fine-tuning
    PROMPT_ENGINEERING = "prompt_engineering"  # prompt engineering
    CONTEXT_LEARNING = "context_learning"      # in-context learning
@dataclass
class DomainTerm:
    """A domain term with its definition and lexical relations."""
    term: str                   # canonical surface form (keyed lowercased in the KB)
    definition: str             # plain-text definition
    category: str               # e.g. disease, drug
    synonyms: List[str]         # alternative names; indexed and matched in term search
    related_terms: List[str]    # related vocabulary; used for context disambiguation
    usage_examples: List[str]   # example sentences showing usage
    industry: IndustryType      # owning industry
@dataclass
class KnowledgeDocument:
    """A retrievable knowledge document with provenance and confidence."""
    doc_id: str                       # unique document identifier
    title: str
    content: str                      # full text; embedded for semantic retrieval
    source_type: KnowledgeSourceType  # where the knowledge came from
    industry: IndustryType
    tags: List[str]
    created_at: datetime
    updated_at: datetime
    confidence_score: float           # reliability weight multiplied into retrieval scores
@dataclass
class WorkflowStep:
    """One step of an industry workflow."""
    step_id: str                   # unique step identifier, referenced by dependencies
    name: str
    description: str
    input_requirements: List[str]  # inputs the step needs before it can run
    output_format: str             # description of the step's output artifact
    compliance_rules: List[str]    # rules collected by WorkflowAdapter for compliance checks
    estimated_time: int            # estimated duration in minutes
    dependencies: List[str]        # step_ids that must execute earlier in a plan
@dataclass
class AdaptationConfig:
    """Configuration bundle describing how an agent adapts to an industry."""
    industry: IndustryType
    adaptation_method: AdaptationMethod
    knowledge_sources: List[str]     # names of knowledge sources to draw on
    terminology_database: str        # name of the terminology database
    workflow_templates: List[str]    # workflow template identifiers
    compliance_rules: List[str]      # regulation names, e.g. "HIPAA"
    fine_tuning_data: Optional[str]  # fine-tuning dataset reference, if any
    rag_top_k: int                   # number of documents to retrieve for RAG
    prompt_template: str             # prompt with {context} and {question} slots
class DomainKnowledgeBase:
    """
    Domain knowledge base.

    Supports:
    1. terminology management (term + synonym indexing)
    2. document storage and retrieval
    3. simplified hashed bag-of-words embeddings for semantic search
    4. Jaccard string-similarity term search

    Thread-safety: writers and readers both take ``self.lock`` so searches
    see a consistent snapshot while terms/documents are being added.
    """

    def __init__(self, config: AdaptationConfig):
        self.config = config
        self.terms: Dict[str, DomainTerm] = {}            # lowercased term -> DomainTerm
        self.documents: Dict[str, KnowledgeDocument] = {}
        # category name and lowercased synonyms both map to lists of term keys
        self.term_index: Dict[str, List[str]] = defaultdict(list)
        self.document_embeddings: Dict[str, np.ndarray] = {}
        self.lock = threading.Lock()

    def add_term(self, term: DomainTerm) -> None:
        """Register a term; index it under its category and each synonym."""
        key = term.term.lower()
        with self.lock:
            self.terms[key] = term
            self.term_index[term.category].append(key)
            for synonym in term.synonyms:
                self.term_index[synonym.lower()].append(key)

    def add_document(self, doc: KnowledgeDocument) -> None:
        """Store a document together with its embedding vector."""
        # Embedding is computed outside the lock (pure function of the text).
        embedding = self._generate_embedding(doc.content)
        with self.lock:
            self.documents[doc.doc_id] = doc
            self.document_embeddings[doc.doc_id] = embedding

    def search_terms(self, query: str, top_k: int = 5) -> List[DomainTerm]:
        """
        Return up to ``top_k`` terms whose name or any synonym is similar to
        ``query`` (Jaccard word overlap > 0.3), best match first.
        """
        query_lower = query.lower()
        with self.lock:  # snapshot so concurrent add_term cannot mutate mid-iteration
            candidates = list(self.terms.items())
        scored = []
        for term_key, term in candidates:
            score = self._calculate_similarity(query_lower, term_key)
            # A strong synonym match counts as a match on the term itself.
            for synonym in term.synonyms:
                score = max(score, self._calculate_similarity(query_lower, synonym.lower()))
            if score > 0.3:  # minimum-relevance threshold
                scored.append((score, term))
        scored.sort(key=lambda x: x[0], reverse=True)
        return [term for _, term in scored[:top_k]]

    def search_documents(self, query: str, top_k: int = 5) -> List[KnowledgeDocument]:
        """
        Return up to ``top_k`` documents ranked by cosine similarity to the
        query embedding, weighted by each document's ``confidence_score``.
        """
        query_embedding = self._generate_embedding(query)
        with self.lock:  # snapshot embeddings and documents consistently
            items = [(emb, self.documents[doc_id])
                     for doc_id, emb in self.document_embeddings.items()]
        scored = []
        for doc_embedding, doc in items:
            similarity = self._cosine_similarity(query_embedding, doc_embedding)
            scored.append((similarity * doc.confidence_score, doc))
        scored.sort(key=lambda x: x[0], reverse=True)
        return [doc for _, doc in scored[:top_k]]

    def _generate_embedding(self, text: str) -> np.ndarray:
        """
        Hashed bag-of-words embedding (simplified stand-in for a pretrained
        encoder): 128 dims, first 50 words only, L2-normalized.

        Bug fix: buckets are derived from md5 instead of the builtin
        ``hash()``, which is salted per process (PYTHONHASHSEED) and made
        embeddings irreproducible across runs.
        """
        embedding = np.zeros(128)
        for word in text.lower().split()[:50]:  # cap processed length
            digest = hashlib.md5(word.encode("utf-8")).digest()
            bucket = int.from_bytes(digest[:8], "big") % 128
            embedding[bucket] += 1.0
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        return embedding

    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """Jaccard similarity over whitespace-separated word sets."""
        if str1 == str2:
            return 1.0
        set1, set2 = set(str1.split()), set(str2.split())
        union = len(set1 | set2)
        return len(set1 & set2) / union if union > 0 else 0.0

    def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Cosine similarity; 0.0 when either vector is all-zero."""
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(vec1, vec2)) / (norm1 * norm2)

    def get_statistics(self) -> Dict[str, Any]:
        """Summary counters for monitoring/diagnostics."""
        with self.lock:
            return {
                "total_terms": len(self.terms),
                "total_documents": len(self.documents),
                "categories": len(self.term_index),
                "industry": self.config.industry.value,
                "timestamp": datetime.now().isoformat()
            }
class TerminologyAdapter:
    """
    Terminology adapter.

    Supports:
    1. term identification (sliding-window phrase matching)
    2. semantic mapping to canonical terms
    3. ambiguity resolution against surrounding context
    4. rewriting text with professional (canonical) terminology
    """

    def __init__(self, knowledge_base: DomainKnowledgeBase):
        self.kb = knowledge_base
        self.context_window = 5         # max candidate-phrase length, in words
        self.ambiguity_threshold = 0.7  # min context score to accept a disambiguation

    def identify_terms(self, text: str) -> List[Tuple[str, DomainTerm, int, int]]:
        """
        Find domain-term mentions in ``text``.

        Returns (phrase, term, start, end) tuples where ``start``/``end``
        are WORD indices into ``text.split()`` (end exclusive).  Overlapping
        matches are de-duplicated keeping the longest phrase.
        """
        identified = []
        words = text.split()
        for i in range(len(words)):
            for window_size in range(1, min(self.context_window + 1, len(words) - i + 1)):
                phrase = ' '.join(words[i:i + window_size])
                terms = self.kb.search_terms(phrase, top_k=1)
                if terms and terms[0]:
                    term = terms[0]
                    # Accept only confident matches against the canonical form.
                    if self._calculate_similarity(phrase.lower(), term.term.lower()) > 0.6:
                        identified.append((phrase, term, i, i + window_size))
        # Keep the longest match per region.  Bug fix: the previous
        # descending sort put the SHORTEST span first at each start, so the
        # greedy pass kept short fragments instead of the longest match.
        identified.sort(key=lambda x: (x[2], -(x[3] - x[2])))
        unique = []
        covered = set()
        for phrase, term, start, end in identified:
            if not any(start < c_end and end > c_start for c_start, c_end in covered):
                unique.append((phrase, term, start, end))
                covered.add((start, end))
        return unique

    def resolve_ambiguity(self, term: str, context: str) -> Optional[DomainTerm]:
        """
        Pick the candidate term best supported by ``context``.

        Returns None when no candidate exists, or when several exist and
        none clears ``ambiguity_threshold``.
        """
        candidates = self.kb.search_terms(term, top_k=5)
        if not candidates:
            return None
        if len(candidates) == 1:
            return candidates[0]
        # Score each candidate by overlap between its related vocabulary
        # and the context words.
        context_words = set(context.lower().split())
        scores = []
        for candidate in candidates:
            related_words = set()
            for related in candidate.related_terms:
                related_words.update(related.lower().split())
            overlap = len(context_words & related_words)
            score = overlap / len(context_words) if context_words else 0
            scores.append((score, candidate))
        scores.sort(key=lambda x: x[0], reverse=True)
        if scores[0][0] > self.ambiguity_threshold:
            return scores[0][1]
        return None  # ambiguous: no candidate is confidently supported

    def generate_professional_expression(self, text: str) -> str:
        """
        Rewrite ``text``, replacing identified phrases with canonical terms.

        Bug fix: ``identify_terms`` reports WORD indices, but the previous
        implementation sliced the string with them as CHARACTER offsets,
        corrupting the output.  The text is now rebuilt from its word list
        (whitespace is normalized to single spaces on rewrite).
        """
        identified_terms = self.identify_terms(text)
        if not identified_terms:
            return text
        words = text.split()
        rebuilt: List[str] = []
        cursor = 0
        # Matches are non-overlapping, so a single left-to-right pass works.
        for phrase, term, start, end in sorted(identified_terms, key=lambda x: x[2]):
            rebuilt.extend(words[cursor:start])
            rebuilt.append(term.term)  # substitute the canonical term
            cursor = end
        rebuilt.extend(words[cursor:])
        return ' '.join(rebuilt)

    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """Jaccard similarity over whitespace-separated word sets."""
        if str1 == str2:
            return 1.0
        set1, set2 = set(str1.split()), set(str2.split())
        union = len(set1 | set2)
        return len(set1 & set2) / union if union > 0 else 0.0
class WorkflowAdapter:
    """
    Workflow adapter.

    Supports:
    1. workflow modelling
    2. task orchestration
    3. compliance checking
    4. decision support
    """

    def __init__(self, config: AdaptationConfig):
        self.config = config
        self.workflows: Dict[str, List[WorkflowStep]] = {}
        self.compliance_rules: Dict[str, List[str]] = defaultdict(list)

    def add_workflow(self, workflow_id: str, steps: List[WorkflowStep]):
        """Register a workflow and collect every step's compliance rules."""
        self.workflows[workflow_id] = steps
        for step in steps:
            self.compliance_rules[workflow_id].extend(step.compliance_rules)

    def validate_workflow(self, workflow_id: str, execution_plan: List[str]) -> Tuple[bool, List[str]]:
        """
        Validate an execution plan against a registered workflow.

        Checks completeness (every declared step appears), dependency
        ordering, and compliance.  Returns (is_valid, issue_messages).
        """
        if workflow_id not in self.workflows:
            return False, ["Unknown workflow"]
        declared = self.workflows[workflow_id]
        issues: List[str] = []

        # Completeness: every declared step must be present in the plan.
        missing = {s.step_id for s in declared} - set(execution_plan)
        if missing:
            issues.append(f"Missing steps: {missing}")

        # Dependency order: each step's prerequisites must run before it.
        step_map = {s.step_id: s for s in declared}
        executed: Set[str] = set()
        for step_id in execution_plan:
            step = step_map.get(step_id)
            if step is None:
                issues.append(f"Unknown step: {step_id}")
                continue
            issues.extend(
                f"Dependency not met: {step_id} requires {dep}"
                for dep in step.dependencies
                if dep not in executed
            )
            executed.add(step_id)

        # Compliance checks.
        issues.extend(self.check_compliance(workflow_id, execution_plan))
        return not issues, issues

    def check_compliance(self, workflow_id: str, execution_plan: List[str]) -> List[str]:
        """Report one violation message per failed compliance rule."""
        return [
            f"Compliance violation: {rule}"
            for rule in self.compliance_rules.get(workflow_id, [])
            if not self._check_rule(rule, execution_plan)
        ]

    def _check_rule(self, rule: str, execution_plan: List[str]) -> bool:
        """Evaluate a single rule (simplified placeholder: always passes)."""
        return True

    def generate_execution_plan(self, workflow_id: str, context: Dict[str, Any]) -> List[str]:
        """Build a plan from the steps applicable in the given context."""
        steps = self.workflows.get(workflow_id)
        if steps is None:
            return []
        return [s.step_id for s in steps if self._should_execute_step(s, context)]

    def _should_execute_step(self, step: WorkflowStep, context: Dict[str, Any]) -> bool:
        """Decide whether a step applies (simplified placeholder: always True)."""
        return True
# Usage example: build a healthcare adaptation config, populate a knowledge
# base, then exercise the terminology and workflow adapters end to end.
if __name__ == "__main__":
    print("=== 垂直行业 Agent 领域知识适配 ===\n")
    print("=== 创建适配配置 ===")
    # Healthcare-industry adaptation configuration
    config = AdaptationConfig(
        industry=IndustryType.HEALTHCARE,
        adaptation_method=AdaptationMethod.RAG,
        knowledge_sources=["medical_guidelines", "drug_database", "clinical_trials"],
        terminology_database="medical_terms",
        workflow_templates=["diagnosis", "treatment", "prescription"],
        compliance_rules=["HIPAA", "FDA"],
        fine_tuning_data=None,
        rag_top_k=5,
        prompt_template="作为医疗专家,请基于以下知识回答:{context}\n问题:{question}"
    )
    print(f"行业:{config.industry.value}")
    print(f"适配方法:{config.adaptation_method.value}")
    print(f"知识来源:{config.knowledge_sources}")
    print(f"合规规则:{config.compliance_rules}")
    print()
    print("=== 创建领域知识库 ===")
    kb = DomainKnowledgeBase(config)
    # Register medical terminology
    terms = [
        DomainTerm(
            term="高血压",
            definition="血压持续高于正常范围的疾病",
            category="疾病",
            synonyms=["Hypertension", "血压高"],
            related_terms=["心血管", "降压药", "血压监测"],
            usage_examples=["患者患有高血压", "高血压需要长期管理"],
            industry=IndustryType.HEALTHCARE
        ),
        DomainTerm(
            term="糖尿病",
            definition="血糖水平持续高于正常范围的代谢性疾病",
            category="疾病",
            synonyms=["Diabetes", "血糖高"],
            related_terms=["胰岛素", "血糖监测", "并发症"],
            usage_examples=["糖尿病患者需要控制饮食", "2 型糖尿病最常见"],
            industry=IndustryType.HEALTHCARE
        ),
        DomainTerm(
            term="抗生素",
            definition="用于治疗细菌感染的药物",
            category="药物",
            synonyms=["Antibiotics", "抗菌药"],
            related_terms=["细菌感染", "耐药性", "处方药"],
            usage_examples=["抗生素不能治疗病毒感染", "需遵医嘱使用抗生素"],
            industry=IndustryType.HEALTHCARE
        )
    ]
    for term in terms:
        kb.add_term(term)
    print(f"已添加 {len(terms)} 个术语")
    # Register knowledge documents
    docs = [
        KnowledgeDocument(
            doc_id="doc_001",
            title="高血压诊疗指南",
            content="高血压的诊断标准为收缩压≥140mmHg 和/或舒张压≥90mmHg。治疗包括生活方式干预和药物治疗...",
            source_type=KnowledgeSourceType.DOCUMENT,
            industry=IndustryType.HEALTHCARE,
            tags=["高血压", "诊疗", "指南"],
            created_at=datetime.now(),
            updated_at=datetime.now(),
            confidence_score=0.95
        ),
        KnowledgeDocument(
            doc_id="doc_002",
            title="糖尿病管理手册",
            content="糖尿病管理包括血糖监测、饮食控制、运动疗法和药物治疗。目标是将 HbA1c 控制在 7% 以下...",
            source_type=KnowledgeSourceType.DOCUMENT,
            industry=IndustryType.HEALTHCARE,
            tags=["糖尿病", "管理", "血糖"],
            created_at=datetime.now(),
            updated_at=datetime.now(),
            confidence_score=0.92
        )
    ]
    for doc in docs:
        kb.add_document(doc)
    print(f"已添加 {len(docs)} 篇文档")
    kb_stats = kb.get_statistics()
    print(f"\n知识库统计:")
    print(f" 术语总数:{kb_stats['total_terms']}")
    print(f" 文档总数:{kb_stats['total_documents']}")
    print(f" 分类数:{kb_stats['categories']}")
    print(f" 行业:{kb_stats['industry']}")
    # Term search demo
    print(f"\n=== 测试术语搜索 ===")
    query = "血压高"
    results = kb.search_terms(query, top_k=3)
    print(f"搜索 '{query}':")
    for term in results:
        print(f" - {term.term}: {term.definition[:50]}...")
    # Document retrieval demo
    print(f"\n=== 测试文档检索 ===")
    query = "高血压治疗"
    results = kb.search_documents(query, top_k=2)
    print(f"检索 '{query}':")
    for doc in results:
        print(f" - {doc.title} (置信度:{doc.confidence_score})")
    # Terminology adaptation demo
    print(f"\n=== 创建术语适配器 ===")
    term_adapter = TerminologyAdapter(kb)
    text = "患者有血压高和血糖高的症状,需要使用抗菌药治疗"
    print(f"原文:{text}")
    identified = term_adapter.identify_terms(text)
    print(f"\n识别到的术语:")
    for phrase, term, start, end in identified:
        print(f" - '{phrase}' → {term.term} ({term.category})")
    professional = term_adapter.generate_professional_expression(text)
    print(f"\n专业表达:{professional}")
    # Workflow adaptation demo
    print(f"\n=== 创建工作流适配器 ===")
    workflow_adapter = WorkflowAdapter(config)
    # Register the diagnosis workflow (history -> exam -> tests -> diagnosis -> treatment)
    diagnosis_workflow = [
        WorkflowStep(
            step_id="step_001",
            name="病史采集",
            description="收集患者基本信息和病史",
            input_requirements=["患者 ID", "主诉"],
            output_format="病史记录",
            compliance_rules=["患者隐私保护", "知情同意"],
            estimated_time=15,
            dependencies=[]
        ),
        WorkflowStep(
            step_id="step_002",
            name="体格检查",
            description="进行必要的体格检查",
            input_requirements=["病史记录"],
            output_format="检查报告",
            compliance_rules=["操作规范", "感染控制"],
            estimated_time=20,
            dependencies=["step_001"]
        ),
        WorkflowStep(
            step_id="step_003",
            name="辅助检查",
            description="安排实验室或影像学检查",
            input_requirements=["检查指征"],
            output_format="检查申请单",
            compliance_rules=["合理检查", "医保规定"],
            estimated_time=10,
            dependencies=["step_002"]
        ),
        WorkflowStep(
            step_id="step_004",
            name="诊断",
            description="基于检查结果做出诊断",
            input_requirements=["病史", "检查报告"],
            output_format="诊断证明",
            compliance_rules=["诊断标准", "循证医学"],
            estimated_time=15,
            dependencies=["step_002", "step_003"]
        ),
        WorkflowStep(
            step_id="step_005",
            name="治疗方案",
            description="制定个体化治疗方案",
            input_requirements=["诊断", "患者情况"],
            output_format="治疗计划",
            compliance_rules=["治疗指南", "药物规范"],
            estimated_time=20,
            dependencies=["step_004"]
        )
    ]
    workflow_adapter.add_workflow("diagnosis", diagnosis_workflow)
    print("已添加诊疗工作流")
    print(f"\n=== 验证工作流执行计划 ===")
    execution_plan = ["step_001", "step_002", "step_003", "step_004", "step_005"]
    is_valid, issues = workflow_adapter.validate_workflow("diagnosis", execution_plan)
    print(f"执行计划:{execution_plan}")
    print(f"验证结果:{'✓ 有效' if is_valid else '✗ 无效'}")
    if issues:
        print("问题:")
        for issue in issues:
            print(f" - {issue}")
    # Closing summary
    print(f"\n关键观察:")
    print("1. 领域知识:知识图谱、本体论、术语体系")
    print("2. 知识注入:RAG 检索、微调适配、提示工程")
    print("3. 术语适配:术语识别、语义映射、多义消解")
    print("4. 流程适配:工作流建模、任务编排、合规检查")
    print("5. 行业专家:领域 + 注入 + 适配 + 流程 = 可信赖")
    print("\n行业专家的使命:让 AI 更专业、更精准、更合规")