"""知识图谱与认知记忆融合系统完整实现 (complete implementation of a knowledge-graph and cognitive-memory fusion system)."""
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import hashlib
import json
from collections import defaultdict, deque
import heapq
class NodeType(Enum):
    """Categories of nodes that the knowledge graph can hold."""

    ENTITY = "entity"        # concrete entity
    CONCEPT = "concept"      # abstract concept
    EVENT = "event"          # event
    ATTRIBUTE = "attribute"  # attribute/property
class RelationType(Enum):
    """Types of directed relations (edges) between knowledge-graph nodes."""

    IS_A = "is_a"                        # class membership ("is a kind of")
    PART_OF = "part_of"                  # part-whole
    LOCATED_IN = "located_in"            # spatial containment
    CAUSED_BY = "caused_by"              # causality
    ASSOCIATED_WITH = "associated_with"  # generic association
    TEMPORAL = "temporal"                # temporal relation
@dataclass
class KGNode:
    """A single node in the knowledge graph."""

    id: str
    name: str
    node_type: NodeType
    embedding: np.ndarray
    attributes: Dict[str, Any] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the node into JSON-friendly primitives."""
        return dict(
            id=self.id,
            name=self.name,
            node_type=self.node_type.value,
            embedding=self.embedding.tolist(),
            attributes=self.attributes,
            created_at=self.created_at.isoformat(),
        )
@dataclass
class KGEdge:
    """A directed, typed edge between two knowledge-graph nodes."""

    id: str
    source_id: str
    target_id: str
    relation_type: RelationType
    weight: float = 1.0
    attributes: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the edge into JSON-friendly primitives."""
        return dict(
            id=self.id,
            source_id=self.source_id,
            target_id=self.target_id,
            relation_type=self.relation_type.value,
            weight=self.weight,
            attributes=self.attributes,
        )
@dataclass
class MemoryTrace:
    """An episodic memory trace."""

    id: str
    content: str
    embedding: np.ndarray
    timestamp: datetime
    context: Dict[str, Any] = field(default_factory=dict)
    importance: float = 0.5
    kg_references: List[str] = field(default_factory=list)  # ids of linked KG nodes
class KnowledgeGraph:
    """In-memory directed knowledge graph.

    Supports:
    1. Node and edge insertion / lookup
    2. Graph traversal and path finding
    3. Subgraph extraction
    4. Whole-graph embedding
    """

    def __init__(self):
        # Forward-reference annotations so this class does not depend on
        # KGNode/KGEdge being defined before it is executed.
        self.nodes: Dict[str, "KGNode"] = {}
        self.edges: Dict[str, "KGEdge"] = {}
        # node_id -> [(neighbor_id, edge_id)] for outgoing edges
        self.adjacency: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
        # target_id -> [(source_id, edge_id)] for incoming edges
        self.reverse_adjacency: Dict[str, List[Tuple[str, str]]] = defaultdict(list)

    def add_node(self, node: "KGNode"):
        """Insert (or replace) a node, keyed by its id."""
        self.nodes[node.id] = node

    def add_edge(self, edge: "KGEdge"):
        """Insert an edge and index it in both adjacency maps."""
        self.edges[edge.id] = edge
        self.adjacency[edge.source_id].append((edge.target_id, edge.id))
        self.reverse_adjacency[edge.target_id].append((edge.source_id, edge.id))

    def get_neighbors(self, node_id: str,
                      relation_type: Optional["RelationType"] = None) -> List[Tuple[str, str]]:
        """Return outgoing (neighbor_id, edge_id) pairs, optionally filtered by relation type."""
        neighbors = self.adjacency.get(node_id, [])
        if relation_type is None:
            return neighbors
        return [(neighbor_id, edge_id)
                for neighbor_id, edge_id in neighbors
                if self.edges[edge_id].relation_type == relation_type]

    def find_path(self, source_id: str, target_id: str, max_depth: int = 5,
                  max_paths: int = 10) -> List[List[str]]:
        """Enumerate simple paths from source to target via BFS.

        Bug fix: the previous implementation shared one global ``visited``
        set across all partial paths, which silently dropped alternative
        paths that pass through an already-seen intermediate node. Cycle
        prevention is now per-path, so all simple paths (up to the caps)
        are found.

        Args:
            source_id: start node id.
            target_id: end node id.
            max_depth: do not extend paths longer than this many nodes.
            max_paths: cap on the number of returned paths (was a
                hard-coded 10; kept as the default for compatibility).

        Returns:
            List of paths, each a list of node ids from source to target.
        """
        if source_id == target_id:
            return [[source_id]]
        paths: List[List[str]] = []
        queue = deque([[source_id]])
        while queue and len(paths) < max_paths:
            path = queue.popleft()
            if len(path) > max_depth:
                continue
            for neighbor_id, _edge_id in self.adjacency.get(path[-1], []):
                if neighbor_id == target_id:
                    paths.append(path + [neighbor_id])
                elif neighbor_id not in path:  # avoid cycles within this path only
                    queue.append(path + [neighbor_id])
        return paths

    def get_subgraph(self, seed_ids: List[str],
                     max_depth: int = 2) -> Tuple[Dict[str, "KGNode"], Dict[str, "KGEdge"]]:
        """BFS outward from the seed nodes up to ``max_depth`` hops.

        Returns:
            (nodes dict, edges dict) of the induced subgraph. Edges out of a
            frontier node are included even when their target sits exactly at
            the depth limit, matching the queued-node set.
        """
        subgraph_nodes: Dict[str, "KGNode"] = {}
        subgraph_edges: Dict[str, "KGEdge"] = {}
        queue = deque((sid, 0) for sid in seed_ids)
        visited = set(seed_ids)
        while queue:
            node_id, depth = queue.popleft()
            if node_id in self.nodes:
                subgraph_nodes[node_id] = self.nodes[node_id]
            if depth >= max_depth:
                continue
            for neighbor_id, edge_id in self.adjacency.get(node_id, []):
                if edge_id in self.edges:
                    subgraph_edges[edge_id] = self.edges[edge_id]
                if neighbor_id not in visited:
                    visited.add(neighbor_id)
                    queue.append((neighbor_id, depth + 1))
        return subgraph_nodes, subgraph_edges

    def compute_graph_embedding(self, method: str = "average") -> np.ndarray:
        """Pool all node embeddings into a single vector.

        Args:
            method: "average" (mean pooling), "sum", or anything else to
                fall back to the first node's embedding.

        Returns:
            Pooled embedding, or an empty array for an empty graph.
        """
        if not self.nodes:
            return np.array([])
        embeddings = [node.embedding for node in self.nodes.values()]
        if method == "average":
            return np.mean(embeddings, axis=0)
        if method == "sum":
            return np.sum(embeddings, axis=0)
        return embeddings[0]
class CognitiveMemory:
    """Cognitive memory system.

    Supports:
    1. Episodic memory storage and retrieval
    2. Semantic keyword indexing (fused with the KG elsewhere)
    3. Cosine-similarity vector retrieval
    4. Memory importance updates
    """

    def __init__(self, embedding_dim: int = 768):
        self.embedding_dim = embedding_dim
        self.memory_traces: Dict[str, "MemoryTrace"] = {}
        # keyword -> list of memory ids
        self.semantic_index: Dict[str, List[str]] = defaultdict(list)

    def store_memory(self, trace: "MemoryTrace"):
        """Store a trace and index its keywords.

        Re-storing a trace with the same id overwrites it without
        duplicating its semantic-index entries (previously each re-store
        appended the id again).
        """
        self.memory_traces[trace.id] = trace
        for keyword in self._extract_keywords(trace.content):
            bucket = self.semantic_index[keyword]
            if trace.id not in bucket:
                bucket.append(trace.id)

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract up to five keywords (simplified: lowercase words longer
        than 3 characters; real systems would use NLP here)."""
        words = text.lower().split()
        return [w for w in words if len(w) > 3][:5]

    def retrieve_by_similarity(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        """Retrieve memories by cosine similarity to the query embedding.

        Returns:
            [(memory id, similarity score), ...] — the top_k highest scores,
            descending.
        """
        # Query norm is loop-invariant; compute it once.
        query_norm = np.linalg.norm(query_embedding)
        scores = []
        for mem_id, trace in self.memory_traces.items():
            # Cosine similarity; epsilon guards against zero-norm vectors.
            similarity = np.dot(query_embedding, trace.embedding) / (
                query_norm * np.linalg.norm(trace.embedding) + 1e-8
            )
            scores.append((mem_id, float(similarity)))
        # nlargest == sorted(..., reverse=True)[:top_k], without a full sort.
        return heapq.nlargest(top_k, scores, key=lambda item: item[1])

    def retrieve_by_keyword(self, keyword: str) -> List[str]:
        """Return the ids of memories indexed under the (lowercased) keyword."""
        return self.semantic_index.get(keyword.lower(), [])

    def update_importance(self, memory_id: str, delta: float = 0.1):
        """Increase a memory's importance by delta, clamped to 1.0; unknown ids are ignored."""
        if memory_id in self.memory_traces:
            trace = self.memory_traces[memory_id]
            trace.importance = min(1.0, trace.importance + delta)
class KnowledgeMemoryFusion:
    """Fusion system combining a symbolic knowledge graph with vector memory.

    Integrates:
    1. Knowledge graph (structured knowledge)
    2. Cognitive memory (episodic experiences)
    3. Hybrid retrieval across both stores
    4. Multi-hop reasoning over the graph
    """

    def __init__(self, embedding_dim: int = 768):
        self.kg = KnowledgeGraph()
        self.memory = CognitiveMemory(embedding_dim)
        self.embedding_dim = embedding_dim
        # Cross-links: KG node id <-> memory id
        self.kg_memory_mapping: Dict[str, List[str]] = defaultdict(list)
        self.memory_kg_mapping: Dict[str, List[str]] = defaultdict(list)
        # Usage counters
        self.stats = {
            "total_nodes": 0,
            "total_edges": 0,
            "total_memories": 0,
            "fusion_queries": 0
        }

    def add_knowledge(self,
                      entity_name: str,
                      entity_type: "NodeType",
                      attributes: Optional[Dict[str, Any]] = None,
                      relations: Optional[List[Tuple["RelationType", str]]] = None) -> "KGNode":
        """Add an entity (and optional outgoing relations) to the graph.

        Args:
            entity_name: human-readable entity name.
            entity_type: node category.
            attributes: optional attribute dict.
            relations: optional [(relation type, target node id), ...].

        Returns:
            The newly created node.
        """
        # Name + timestamp hash keeps ids unique even for duplicate names
        # (md5 used for id derivation only, not for security).
        node_id = hashlib.md5(f"{entity_name}{datetime.now().isoformat()}".encode()).hexdigest()[:16]
        # Placeholder random embedding; a real system would use a trained
        # KG-embedding model here.
        embedding = np.random.randn(self.embedding_dim)
        node = KGNode(
            id=node_id,
            name=entity_name,
            node_type=entity_type,
            embedding=embedding,
            attributes=attributes or {}
        )
        self.kg.add_node(node)
        self.stats["total_nodes"] += 1
        if relations:
            for rel_type, target_id in relations:
                edge_id = hashlib.md5(f"{node_id}{rel_type.value}{target_id}".encode()).hexdigest()[:16]
                self.kg.add_edge(KGEdge(
                    id=edge_id,
                    source_id=node_id,
                    target_id=target_id,
                    relation_type=rel_type
                ))
                self.stats["total_edges"] += 1
        return node

    def store_experience(self,
                         content: str,
                         context: Optional[Dict[str, Any]] = None,
                         kg_references: Optional[List[str]] = None) -> "MemoryTrace":
        """Store an episodic memory, optionally linked to KG nodes.

        Args:
            content: memory text.
            context: optional contextual metadata.
            kg_references: optional list of KG node ids to cross-link.

        Returns:
            The created memory trace.
        """
        mem_id = hashlib.md5(f"{content}{datetime.now().isoformat()}".encode()).hexdigest()[:16]
        # Placeholder embedding; a real system would use a sentence encoder.
        embedding = np.random.randn(self.embedding_dim)
        trace = MemoryTrace(
            id=mem_id,
            content=content,
            embedding=embedding,
            timestamp=datetime.now(),
            context=context or {},
            kg_references=kg_references or []
        )
        self.memory.store_memory(trace)
        self.stats["total_memories"] += 1
        # Maintain the bidirectional KG <-> memory mapping.
        for kg_id in (kg_references or []):
            self.kg_memory_mapping[kg_id].append(mem_id)
            self.memory_kg_mapping[mem_id].append(kg_id)
        return trace

    def hybrid_retrieval(self,
                         query_text: str,
                         query_embedding: np.ndarray,
                         top_k: int = 5,
                         use_kg: bool = True,
                         use_memory: bool = True) -> Dict[str, Any]:
        """Hybrid retrieval over the knowledge graph and cognitive memory.

        Returns:
            Dict with "kg_results" (keyword-matched nodes), "memory_results"
            (vector-similar memories), and "fused_results" (cross-expanded
            KG<->memory links).
        """
        self.stats["fusion_queries"] += 1
        results = {
            "kg_results": [],
            "memory_results": [],
            "fused_results": []
        }
        # 1. KG retrieval (simplified keyword match against node names).
        if use_kg:
            query_keywords = query_text.lower().split()
            for node_id, node in self.kg.nodes.items():
                if any(kw in node.name.lower() for kw in query_keywords):
                    results["kg_results"].append({
                        "type": "kg_node",
                        "id": node_id,
                        "name": node.name,
                        "node_type": node.node_type.value,
                        "attributes": node.attributes
                    })
        # 2. Memory retrieval (vector similarity).
        if use_memory:
            for mem_id, score in self.memory.retrieve_by_similarity(query_embedding, top_k):
                trace = self.memory.memory_traces[mem_id]
                results["memory_results"].append({
                    "type": "memory",
                    "id": mem_id,
                    "content": trace.content,
                    "score": score,
                    "timestamp": trace.timestamp.isoformat(),
                    "kg_references": trace.kg_references
                })
        # 3. Fusion: expand each result set into the other store.
        fused = []
        # KG hits -> up to three linked memories each.
        for kg_result in results["kg_results"]:
            related_memories = self.kg_memory_mapping.get(kg_result["id"], [])
            if related_memories:
                fused.append({
                    "type": "fused",
                    "source": "kg_expansion",
                    "kg_node": kg_result,
                    "related_memories": [
                        self.memory.memory_traces[mid].content
                        for mid in related_memories[:3]
                    ]
                })
        # Memory hits -> names of the KG nodes they reference.
        for mem_result in results["memory_results"]:
            kg_refs = mem_result.get("kg_references", [])
            if kg_refs:
                fused.append({
                    "type": "fused",
                    "source": "memory_expansion",
                    "memory": mem_result,
                    "related_kg_nodes": [
                        self.kg.nodes[kid].name
                        for kid in kg_refs if kid in self.kg.nodes
                    ]
                })
        results["fused_results"] = fused
        return results

    def multi_hop_reasoning(self,
                            source_entity: str,
                            target_entity: str,
                            max_hops: int = 3) -> List[Dict[str, Any]]:
        """Multi-hop reasoning: find explanation paths between two entities.

        Returns:
            List of path dicts with node names, node ids, per-hop relation
            values, and a length-based confidence score.
        """
        # Resolve entity names to nodes (case-insensitive, last match wins).
        source_node = None
        target_node = None
        for node in self.kg.nodes.values():
            if node.name.lower() == source_entity.lower():
                source_node = node
            if node.name.lower() == target_entity.lower():
                target_node = node
        if not source_node or not target_node:
            return []
        reasoning_results = []
        for path in self.kg.find_path(source_node.id, target_node.id, max_hops):
            path_info = {
                "path": [self.kg.nodes[nid].name for nid in path],
                "path_ids": path,
                "relations": [],
                "confidence": 1.0 / len(path)  # simplified: shorter path => higher confidence
            }
            # One relation per hop. Bug fix: break after the first matching
            # edge — parallel edges between the same pair previously appended
            # multiple relations and misaligned the list with the hops.
            for i in range(len(path) - 1):
                for neighbor_id, edge_id in self.kg.adjacency.get(path[i], []):
                    if neighbor_id == path[i + 1]:
                        path_info["relations"].append(self.kg.edges[edge_id].relation_type.value)
                        break
            reasoning_results.append(path_info)
        return reasoning_results

    def get_stats(self) -> Dict[str, Any]:
        """Return counters plus live store sizes."""
        return {
            **self.stats,
            "kg_nodes": len(self.kg.nodes),
            "kg_edges": len(self.kg.edges),
            "memories": len(self.memory.memory_traces)
        }
# Usage example
def main():
    """Run the end-to-end demo: build a small KG, store experiences,
    perform hybrid retrieval, and do multi-hop reasoning."""
    print("=== 知识图谱与认知记忆融合技术 ===\n")

    # Build the fusion system
    system = KnowledgeMemoryFusion(embedding_dim=128)

    print("=== 构建知识图谱 ===")
    # Entities
    beijing_node = system.add_knowledge(
        entity_name="北京",
        entity_type=NodeType.ENTITY,
        attributes={"type": "城市", "population": "2100 万"}
    )
    china_node = system.add_knowledge(
        entity_name="中国",
        entity_type=NodeType.ENTITY,
        attributes={"type": "国家"}
    )
    tsinghua_node = system.add_knowledge(
        entity_name="清华大学",
        entity_type=NodeType.ENTITY,
        attributes={"type": "大学"}
    )
    # Relations
    system.kg.add_edge(KGEdge(
        id="edge1",
        source_id=beijing_node.id,
        target_id=china_node.id,
        relation_type=RelationType.LOCATED_IN
    ))
    system.kg.add_edge(KGEdge(
        id="edge2",
        source_id=tsinghua_node.id,
        target_id=beijing_node.id,
        relation_type=RelationType.LOCATED_IN
    ))
    print("添加节点:北京、中国、清华大学")
    print("添加关系:北京->位于->中国,清华大学->位于->北京")

    print("\n=== 存储经验记忆 ===")
    # Episodic memories cross-linked to KG nodes
    memory_a = system.store_experience(
        content="2025 年访问清华大学,参加 AI 会议",
        context={"year": 2025, "event": "AI 会议"},
        kg_references=[tsinghua_node.id]
    )
    memory_b = system.store_experience(
        content="在北京品尝了烤鸭,非常美味",
        context={"food": "烤鸭", "sentiment": "positive"},
        kg_references=[beijing_node.id]
    )
    print(f"存储记忆:{memory_a.content[:30]}...")
    print(f"存储记忆:{memory_b.content[:30]}...")

    print("\n=== 系统统计 ===")
    stats = system.get_stats()
    print(f"KG 节点:{stats['kg_nodes']}")
    print(f"KG 边:{stats['kg_edges']}")
    print(f"记忆:{stats['memories']}")

    print("\n=== 混合检索 ===")
    # Random query embedding (placeholder for a real sentence encoder)
    query_embedding = np.random.randn(128)
    retrieval = system.hybrid_retrieval(
        query_text="北京 清华大学",
        query_embedding=query_embedding,
        top_k=3
    )
    print(f"KG 结果:{len(retrieval['kg_results'])} 个")
    for kg_res in retrieval['kg_results'][:2]:
        print(f" - {kg_res['name']} ({kg_res['node_type']})")
    print(f"\n记忆结果:{len(retrieval['memory_results'])} 个")
    for mem_res in retrieval['memory_results'][:2]:
        print(f" - {mem_res['content'][:40]}... (相似度:{mem_res['score']:.3f})")
    print(f"\n融合结果:{len(retrieval['fused_results'])} 个")
    for item in retrieval['fused_results'][:2]:
        if item['source'] == 'kg_expansion':
            print(f" - KG: {item['kg_node']['name']} -> {len(item['related_memories'])} 个相关记忆")
        else:
            print(f" - 记忆:{item['memory']['content'][:30]}... -> {len(item['related_kg_nodes'])} 个相关 KG 节点")

    print("\n=== 多跳推理 ===")
    reasoning_paths = system.multi_hop_reasoning("清华大学", "中国", max_hops=3)
    print("推理路径:清华大学 -> 中国")
    for i, route in enumerate(reasoning_paths, 1):
        print(f"路径{i}: {' -> '.join(route['path'])}")
        print(f" 关系:{' -> '.join(route['relations'])}")
        print(f" 置信度:{route['confidence']:.3f}")

    print("\n关键观察:")
    print("1. 知识图谱:结构化知识表示,支持符号推理")
    print("2. 认知记忆:灵活存储经验,支持向量检索")
    print("3. 融合技术:KG+ 记忆混合检索,优势互补")
    print("4. 多跳推理:基于图结构的多步推理")
    print("5. 神经符号融合:符号推理 + 向量检索 = 认知智能")
    print("\n融合的核心:知识图谱 (符号) + 认知记忆 (向量) = 认知智能体")


if __name__ == "__main__":
    main()