# Complete implementation of a RAG + Agent memory fusion system
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import hashlib
import json
from collections import defaultdict, deque
import heapq
class MemoryType(Enum):
    """Kinds of memory traces an agent can hold."""
    EPISODIC = "episodic"      # episodic memory: concrete events/experiences
    SEMANTIC = "semantic"      # semantic memory: conceptual knowledge
    PROCEDURAL = "procedural"  # procedural memory: skills / procedures
class RetrievalMode(Enum):
    """Supported retrieval strategies."""
    VECTOR = "vector"    # dense-vector (embedding) retrieval
    KEYWORD = "keyword"  # keyword (BM25) retrieval
    HYBRID = "hybrid"    # weighted fusion of vector + keyword scores
@dataclass
class Document:
    """A retrievable document fragment."""
    id: str
    content: str
    embedding: np.ndarray  # dense embedding; presumably length == the index's embedding_dim — TODO confirm
    metadata: Dict[str, Any] = field(default_factory=dict)
    source: str = ""
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class MemoryTrace:
    """A single stored memory item."""
    id: str
    type: MemoryType
    content: str
    embedding: np.ndarray
    timestamp: datetime
    context: Dict[str, Any] = field(default_factory=dict)
    importance: float = 0.5  # decayed over time; traces below a threshold get forgotten
    access_count: int = 0    # incremented on retrieval; boosts importance against decay
@dataclass
class QueryResult:
    """Result bundle for one retrieval query."""
    query: str
    results: List[Dict[str, Any]]
    retrieval_mode: RetrievalMode
    latency_ms: float       # wall-clock retrieval latency in milliseconds
    total_results: int
class VectorIndex:
    """
    In-memory vector index.

    Supports:
    1. Storing document embeddings
    2. Cosine-similarity search
    3. Batch insertion
    """

    def __init__(self, embedding_dim: int = 768):
        self.embedding_dim = embedding_dim
        self.vectors: Dict[str, np.ndarray] = {}
        self.metadata: Dict[str, Dict[str, Any]] = {}

    def add(self, doc_id: str, embedding: np.ndarray, metadata: Dict[str, Any] = None):
        """Add (or overwrite) a single embedding with optional metadata."""
        self.vectors[doc_id] = embedding
        self.metadata[doc_id] = metadata or {}

    def search(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        """
        Cosine-similarity search.

        Returns:
            [(doc_id, similarity), ...] sorted best-first, at most top_k entries.
        """
        # Hoisted out of the loop: the query norm is invariant per search.
        query_norm = np.linalg.norm(query_embedding)
        scores = []
        for doc_id, vector in self.vectors.items():
            # +1e-8 guards against division by zero for all-zero vectors
            similarity = np.dot(query_embedding, vector) / (
                query_norm * np.linalg.norm(vector) + 1e-8
            )
            scores.append((doc_id, float(similarity)))
        # Equivalent to sorted(..., reverse=True)[:top_k] but O(n log k)
        return heapq.nlargest(top_k, scores, key=lambda item: item[1])

    def batch_add(self, documents: List["Document"]):
        """Add a list of Document objects (annotation quoted: Document is declared elsewhere in this file)."""
        for doc in documents:
            self.add(doc.id, doc.embedding, doc.metadata)

    def size(self) -> int:
        """Number of indexed vectors."""
        return len(self.vectors)
class KeywordIndex:
    """
    Inverted keyword index with BM25 ranking.

    Supports:
    1. Inverted-index construction
    2. BM25 relevance scoring
    3. Keyword search
    """

    def __init__(self):
        # term -> set of doc ids containing that term
        self.inverted_index: Dict[str, Set[str]] = defaultdict(set)
        # doc id -> token count
        self.doc_lengths: Dict[str, int] = {}
        self.avg_doc_length: float = 0.0
        self.k1: float = 1.5  # BM25 term-frequency saturation parameter
        self.b: float = 0.75  # BM25 length-normalization parameter

    def add(self, doc_id: str, content: str):
        """Index one document (tokenization simplified to lowercase whitespace split)."""
        tokens = content.lower().split()
        self.doc_lengths[doc_id] = len(tokens)
        # Record each distinct term once in the postings
        for token in set(tokens):
            self.inverted_index[token].add(doc_id)
        # Keep the corpus-wide average document length up to date
        count = len(self.doc_lengths)
        self.avg_doc_length = sum(self.doc_lengths.values()) / count if count > 0 else 0

    def search(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]:
        """
        BM25 keyword search.

        Returns:
            [(doc_id, bm25_score), ...] sorted by score, best first.
        """
        doc_count = len(self.doc_lengths)
        accumulated = defaultdict(float)
        for term in query.lower().split():
            postings = self.inverted_index.get(term)
            if not postings:
                continue
            containing = len(postings)
            # Standard BM25 IDF with +1 smoothing
            idf = math.log((doc_count - containing + 0.5) / (containing + 0.5) + 1.0)
            for doc_id in postings:
                tf = 1.0  # simplification: term frequency assumed to be 1
                length_norm = 1 - self.b + self.b * (self.doc_lengths[doc_id] / self.avg_doc_length)
                accumulated[doc_id] += idf * (tf * (self.k1 + 1)) / (tf + self.k1 * length_norm)
        ranked = sorted(accumulated.items(), key=lambda pair: pair[1], reverse=True)
        return ranked[:top_k]
class AgentMemory:
    """
    Agent memory system.

    Supports:
    1. Episodic, semantic and procedural memory traces
    2. Memory storage and retrieval
    3. Importance scoring
    4. A time-decay forgetting mechanism
    """

    def __init__(self, embedding_dim: int = 768):
        self.embedding_dim = embedding_dim
        self.memories: Dict[str, "MemoryTrace"] = {}
        # "YYYY-MM-DD" date key -> ids of episodic memories stored that day
        self.episodic_index: Dict[str, List[str]] = defaultdict(list)
        # keyword -> ids of semantic memories mentioning it
        self.semantic_index: Dict[str, List[str]] = defaultdict(list)
        # Forgetting parameters
        self.decay_rate: float = 0.01           # exponential decay rate per hour
        self.importance_threshold: float = 0.2  # traces below this are removed

    def store_memory(self, trace: "MemoryTrace"):
        """Store a trace and index it by day (episodic) or keyword (semantic)."""
        self.memories[trace.id] = trace
        if trace.type == MemoryType.EPISODIC:
            time_key = trace.timestamp.strftime("%Y-%m-%d")
            self.episodic_index[time_key].append(trace.id)
        elif trace.type == MemoryType.SEMANTIC:
            # Simplified keyword extraction: first five whitespace tokens
            for kw in trace.content.lower().split()[:5]:
                self.semantic_index[kw].append(trace.id)

    def retrieve_by_similarity(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        """Return up to top_k (memory_id, cosine similarity) pairs, best first."""
        # Hoisted: the query norm does not change across traces.
        query_norm = np.linalg.norm(query_embedding)
        scores = []
        for mem_id, trace in self.memories.items():
            similarity = np.dot(query_embedding, trace.embedding) / (
                query_norm * np.linalg.norm(trace.embedding) + 1e-8
            )
            scores.append((mem_id, float(similarity)))
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]

    def retrieve_by_time(self, start_time: datetime, end_time: datetime) -> List[str]:
        """Return ids of episodic memories whose day falls within [start_time, end_time]."""
        results = []
        for time_key, mem_ids in self.episodic_index.items():
            try:
                mem_date = datetime.strptime(time_key, "%Y-%m-%d")
            except ValueError:
                # was a bare `except:`; only malformed date keys should be skipped
                continue
            if start_time <= mem_date <= end_time:
                results.extend(mem_ids)
        return results

    def apply_forgetting(self):
        """Decay trace importance over time and drop traces below the threshold.

        Returns:
            The list of removed memory ids.
        """
        current_time = datetime.now()
        to_remove = []
        for mem_id, trace in self.memories.items():
            # Exponential time decay based on hours elapsed since storage
            time_elapsed = (current_time - trace.timestamp).total_seconds() / 3600
            decay = math.exp(-self.decay_rate * time_elapsed)
            # Frequently accessed traces are reinforced
            access_bonus = math.log10(trace.access_count + 1) * 0.1
            new_importance = trace.importance * decay + access_bonus
            if new_importance < self.importance_threshold:
                to_remove.append(mem_id)
            else:
                trace.importance = new_importance
        # Remove low-importance traces after iteration (no mutation while iterating)
        for mem_id in to_remove:
            del self.memories[mem_id]
        return to_remove

    def update_access(self, memory_id: str):
        """Increment the access counter of a memory (no-op for unknown ids)."""
        if memory_id in self.memories:
            self.memories[memory_id].access_count += 1
class RAGSystem:
    """
    Retrieval-augmented generation (RAG) system.

    Combines:
    1. A vector index
    2. A keyword index
    3. Hybrid retrieval with weighted score fusion
    4. Context construction for generation
    """

    def __init__(self, embedding_dim: int = 768):
        self.vector_index = VectorIndex(embedding_dim)
        self.keyword_index = KeywordIndex()
        self.documents: Dict[str, Document] = {}
        self.embedding_dim = embedding_dim

    def add_documents(self, documents: List[Document]):
        """Register documents in both the vector index and the keyword index."""
        for doc in documents:
            self.documents[doc.id] = doc
            self.vector_index.add(doc.id, doc.embedding, doc.metadata)
            self.keyword_index.add(doc.id, doc.content)

    def _format_hits(self, hits: List[Tuple[str, float]], mode_label: str) -> List[Dict[str, Any]]:
        """Turn (doc_id, score) pairs into result dicts enriched with document fields."""
        formatted = []
        for doc_id, score in hits:
            doc = self.documents[doc_id]
            formatted.append({
                "id": doc_id,
                "content": doc.content,
                "score": score,
                "source": doc.source,
                "mode": mode_label,
            })
        return formatted

    @staticmethod
    def _fuse(vector_hits: List[Tuple[str, float]],
              keyword_hits: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
        """Union both hit lists and rank by a 0.6 (vector) / 0.4 (keyword) weighted score."""
        per_doc: Dict[str, Dict[str, float]] = {}
        for doc_id, score in vector_hits:
            per_doc[doc_id] = {"vector": score, "keyword": 0.0}
        for doc_id, score in keyword_hits:
            per_doc.setdefault(doc_id, {"vector": 0.0, "keyword": 0.0})["keyword"] = score
        ranked = [
            (doc_id, 0.6 * parts["vector"] + 0.4 * parts["keyword"])
            for doc_id, parts in per_doc.items()
        ]
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return ranked

    def retrieve(self,
                 query: str,
                 query_embedding: np.ndarray,
                 mode: RetrievalMode = RetrievalMode.HYBRID,
                 top_k: int = 5) -> QueryResult:
        """
        Retrieve the top_k documents for a query using the requested mode.

        Returns:
            A QueryResult with the hits, the latency and the mode used.
        """
        start_time = datetime.now()
        if mode == RetrievalMode.VECTOR:
            # Pure dense-vector retrieval
            results = self._format_hits(self.vector_index.search(query_embedding, top_k), "vector")
        elif mode == RetrievalMode.KEYWORD:
            # Pure BM25 keyword retrieval
            results = self._format_hits(self.keyword_index.search(query, top_k), "keyword")
        else:  # HYBRID: over-fetch from both indexes, fuse, then truncate
            fused = self._fuse(
                self.vector_index.search(query_embedding, top_k * 2),
                self.keyword_index.search(query, top_k * 2),
            )
            results = self._format_hits(fused[:top_k], "hybrid")
        latency = (datetime.now() - start_time).total_seconds() * 1000
        return QueryResult(
            query=query,
            results=results,
            retrieval_mode=mode,
            latency_ms=latency,
            total_results=len(results),
        )

    def build_context(self, query: str, results: List[Dict[str, Any]], max_tokens: int = 2000) -> str:
        """Assemble a prompt context from retrieved results, capped at max_tokens (word count)."""
        context_parts = [f"用户问题:{query}\n\n相关信息:\n"]
        used_tokens = 0
        for item in results:
            content = item["content"]
            source = item["source"]
            score = item["score"]
            # Simplified token counting: whitespace-separated words
            token_cost = len(content.split())
            if used_tokens + token_cost > max_tokens:
                break
            context_parts.append(f"[来源:{source}, 相关性:{score:.3f}]\n{content}\n")
            used_tokens += token_cost
        return "\n".join(context_parts)
class RAGMemoryAgent:
    """
    Agent fusing RAG retrieval with an internal memory system.

    Integrates:
    1. A RAG system (external knowledge)
    2. Agent memory (past experience)
    3. Fused retrieval over both
    4. Context construction for augmented generation
    """
    def __init__(self, embedding_dim: int = 768):
        self.rag = RAGSystem(embedding_dim)
        self.memory = AgentMemory(embedding_dim)
        self.embedding_dim = embedding_dim
        # Usage statistics
        self.stats = {
            "total_queries": 0,
            "rag_retrievals": 0,
            "memory_retrievals": 0,
            "hallucinations_prevented": 0
        }
    def add_knowledge(self, documents: List[Document]):
        """Add knowledge documents to the RAG system."""
        self.rag.add_documents(documents)
    def store_experience(self, content: str, context: Dict[str, Any] = None):
        """Store an experience as an episodic memory trace."""
        # Id derived from content + timestamp so repeated content stays unique
        mem_id = hashlib.md5(f"{content}{datetime.now().isoformat()}".encode()).hexdigest()[:16]
        embedding = np.random.randn(self.embedding_dim)  # placeholder: a real embedding model should be used
        trace = MemoryTrace(
            id=mem_id,
            type=MemoryType.EPISODIC,
            content=content,
            embedding=embedding,
            timestamp=datetime.now(),
            context=context or {}
        )
        self.memory.store_memory(trace)
    def query(self,
              query_text: str,
              query_embedding: np.ndarray,
              use_rag: bool = True,
              use_memory: bool = True,
              top_k: int = 5) -> Dict[str, Any]:
        """
        Fused query over RAG knowledge and agent memory.

        Returns:
            Dict with RAG results, memory results, a fused context string and suggestions.
        """
        self.stats["total_queries"] += 1
        response = {
            "query": query_text,
            "rag_results": [],
            "memory_results": [],
            "context": "",
            "suggestions": []
        }
        # 1. RAG retrieval (skipped when the vector index is empty)
        if use_rag and self.rag.vector_index.size() > 0:
            self.stats["rag_retrievals"] += 1
            rag_result = self.rag.retrieve(
                query_text, query_embedding,
                mode=RetrievalMode.HYBRID,
                top_k=top_k
            )
            response["rag_results"] = rag_result.results
        # 2. Memory retrieval
        if use_memory:
            self.stats["memory_retrievals"] += 1
            memory_results = self.memory.retrieve_by_similarity(query_embedding, top_k)
            response["memory_results"] = [
                {
                    "id": mem_id,
                    "content": self.memory.memories[mem_id].content,
                    "score": score,
                    "type": self.memory.memories[mem_id].type.value,
                    "timestamp": self.memory.memories[mem_id].timestamp.isoformat()
                }
                for mem_id, score in memory_results
            ]
            # Update access counts (reinforces retrieved memories against forgetting)
            for mem_id, _ in memory_results:
                self.memory.update_access(mem_id)
        # 3. Build the fused context
        context_parts = [f"问题:{query_text}\n"]
        # Append RAG hits (content truncated to 200 chars each)
        if response["rag_results"]:
            context_parts.append("\n=== 外部知识 ===")
            for i, result in enumerate(response["rag_results"], 1):
                context_parts.append(f"{i}. [{result['source']}] {result['content'][:200]}...")
        # Append memory hits
        if response["memory_results"]:
            context_parts.append("\n=== 历史记忆 ===")
            for i, result in enumerate(response["memory_results"], 1):
                context_parts.append(f"{i}. [{result['type']}] {result['content'][:200]}...")
        response["context"] = "\n".join(context_parts)
        # 4. Generate suggestions (simplified: based on whether anything was retrieved)
        if response["rag_results"] or response["memory_results"]:
            response["suggestions"] = [
                "基于检索到的信息,您可以...",
                "相关背景包括...",
                "建议进一步查询..."
            ]
        else:
            response["suggestions"] = [
                "未找到相关信息,建议...",
                "可以尝试重新表述问题..."
            ]
            # Counted as a prevented hallucination: no evidence, so no fabricated answer
            self.stats["hallucinations_prevented"] += 1
        return response
    def get_stats(self) -> Dict[str, Any]:
        """Return usage statistics plus current corpus and memory sizes."""
        return {
            **self.stats,
            "rag_documents": len(self.rag.documents),
            "memory_traces": len(self.memory.memories)
        }
# Usage example / demo script
if __name__ == "__main__":
    print("=== RAG 检索增强生成与 Agent 记忆 ===\n")
    # Create the fusion agent
    agent = RAGMemoryAgent(embedding_dim=128)
    print("=== 添加知识文档 ===")
    # Simulated documents (seeded random embeddings stand in for a real embedding model)
    np.random.seed(42)
    docs = [
        Document(
            id="doc1",
            content="RAG(检索增强生成)是一种结合检索和生成的技术,通过从外部知识库检索相关信息来增强 LLM 的生成能力。",
            embedding=np.random.randn(128),
            metadata={"category": "AI 技术"},
            source="AI 百科"
        ),
        Document(
            id="doc2",
            content="Agent 记忆系统包括情景记忆(记录具体事件)、语义记忆(存储概念知识)和程序记忆(保存技能)。",
            embedding=np.random.randn(128),
            metadata={"category": "Agent 架构"},
            source="Agent 手册"
        ),
        Document(
            id="doc3",
            content="混合检索结合向量检索和关键词检索的优势,既能捕捉语义相似性,又能保证关键词匹配精度。",
            embedding=np.random.randn(128),
            metadata={"category": "检索技术"},
            source="检索指南"
        )
    ]
    agent.add_knowledge(docs)
    print(f"添加{len(docs)}篇文档到 RAG 系统")
    print(f"\n=== 存储经验记忆 ===")
    # Store a couple of experiences as episodic memories
    agent.store_experience(
        content="用户询问了 RAG 技术的基本原理,解释了检索 - 增强 - 生成的流程",
        context={"topic": "RAG", "user_satisfaction": "high"}
    )
    agent.store_experience(
        content="用户咨询了 Agent 记忆系统,讨论了情景记忆和语义记忆的区别",
        context={"topic": "Agent Memory", "follow_up": True}
    )
    print("存储 2 条经验记忆")
    print(f"\n=== 系统统计 ===")
    stats = agent.get_stats()
    print(f"RAG 文档:{stats['rag_documents']}")
    print(f"记忆痕迹:{stats['memory_traces']}")
    print(f"\n=== 融合查询 ===")
    # Simulated query (random embedding — no real embedding model available)
    query_embedding = np.random.randn(128)
    result = agent.query(
        query_text="RAG 和 Agent 记忆如何协同工作?",
        query_embedding=query_embedding,
        use_rag=True,
        use_memory=True,
        top_k=3
    )
    print(f"查询:{result['query']}")
    print(f"\nRAG 结果:{len(result['rag_results'])} 个")
    for i, r in enumerate(result['rag_results'], 1):
        print(f" {i}. [{r['source']}] 相关性:{r['score']:.3f}")
        print(f" {r['content'][:80]}...")
    print(f"\n记忆结果:{len(result['memory_results'])} 个")
    for i, m in enumerate(result['memory_results'], 1):
        print(f" {i}. [{m['type']}] 相似度:{m['score']:.3f}")
        print(f" {m['content'][:80]}...")
    print(f"\n融合上下文长度:{len(result['context'])} 字符")
    print(f"生成建议:{len(result['suggestions'])} 条")
    print(f"\n=== 最终统计 ===")
    final_stats = agent.get_stats()
    print(f"总查询数:{final_stats['total_queries']}")
    print(f"RAG 检索次数:{final_stats['rag_retrievals']}")
    print(f"记忆检索次数:{final_stats['memory_retrievals']}")
    print(f"幻觉防止次数:{final_stats['hallucinations_prevented']}")
    print(f"\n关键观察:")
    print("1. RAG 检索:从外部知识库检索相关信息")
    print("2. Agent 记忆:从历史经验中检索相关记忆")
    print("3. 融合技术:RAG+ 记忆协同,构建丰富上下文")
    print("4. 增强生成:基于检索结果生成可靠回答")
    print("5. 幻觉抑制:无检索结果时防止胡编乱造")
    print("\n融合的核心:RAG(外部知识) + 记忆 (历史经验) = 动态认知 Agent")