"""Financial agent: a complete implementation of investment-research decision-making and risk control."""
import time
import json
import math
import random
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import threading
import uuid
from abc import ABC, abstractmethod
class AssetClass(Enum):
    """Asset classes recognised by the agent."""

    STOCK = "stock"
    BOND = "bond"
    COMMODITY = "commodity"
    CURRENCY = "currency"
    CRYPTO = "crypto"
    DERIVATIVE = "derivative"
class SignalType(Enum):
    """Direction and conviction of a trading signal."""

    BUY = "buy"
    SELL = "sell"
    HOLD = "hold"
    STRONG_BUY = "strong_buy"
    STRONG_SELL = "strong_sell"
class RiskLevel(Enum):
    """Coarse risk grading, from lowest to most severe."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    EXTREME = "extreme"
class OrderType(Enum):
    """Order types that can be attached to an execution request."""

    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"
    STOP_LIMIT = "stop_limit"
@dataclass
class MarketData:
    """A single open/high/low/close/volume record for one symbol."""

    symbol: str
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: int
    adj_close: Optional[float] = None  # optional; stays None when not supplied
@dataclass
class FinancialMetric:
    """Fundamental figures reported for one symbol and reporting period."""

    symbol: str
    period: str
    revenue: float
    net_income: float
    eps: float
    pe_ratio: float
    pb_ratio: float
    roe: float
    debt_to_equity: float
    free_cash_flow: float
    dividend_yield: float
@dataclass
class SentimentScore:
    """Sentiment readings for one symbol at one point in time."""

    symbol: str
    timestamp: datetime
    news_sentiment: float  # -1 to 1
    social_sentiment: float  # -1 to 1
    analyst_sentiment: float  # -1 to 1
    composite_score: float  # -1 to 1
    confidence: float
@dataclass
class AlphaFactor:
    """Descriptor and summary statistics of one alpha factor."""

    factor_id: str
    name: str
    description: str
    category: str
    ic_value: float  # Information Coefficient
    icir: float  # IC Information Ratio
    turnover: float
    sharpe_ratio: float
    max_drawdown: float
@dataclass
class TradingSignal:
    """A trading recommendation for one symbol, with price levels and rationale."""

    signal_id: str
    symbol: str
    signal_type: SignalType
    strength: float  # 0-1
    confidence: float
    target_price: Optional[float]
    stop_loss: Optional[float]
    take_profit: Optional[float]
    timestamp: datetime
    reasoning: str
@dataclass
class PortfolioPosition:
    """Holding in a single symbol together with its valuation fields."""

    symbol: str
    quantity: int
    avg_cost: float
    current_price: float
    market_value: float
    unrealized_pnl: float
    weight: float
    risk_contribution: float
@dataclass
class RiskMetrics:
    """Risk statistics computed for a portfolio at a given timestamp."""

    portfolio_id: str
    timestamp: datetime
    var_95: float  # Value at Risk 95%
    var_99: float
    expected_shortfall: float
    max_drawdown: float
    sharpe_ratio: float
    sortino_ratio: float
    beta: float
    volatility: float
    correlation_matrix: Dict[str, float]
class MarketDataCollector:
    """Collect simulated market data, cache it, and flag anomalies.

    Every collected bar is appended to a per-symbol in-memory cache that
    keeps at most the latest 1000 entries. All cache access (reads and
    writes) goes through ``self.lock``.
    """

    def __init__(self):
        # Bounded per-symbol history; the oldest bars are dropped automatically.
        self.data_cache: Dict[str, deque] = defaultdict(lambda: deque(maxlen=1000))
        self.lock = threading.Lock()

    def _snapshot(self, symbol: str) -> list:
        """Return a list copy of the cached bars for *symbol*, taken under the lock."""
        with self.lock:
            # ``.get`` (not indexing) so that a lookup never creates an empty
            # cache entry for an unknown symbol.
            return list(self.data_cache.get(symbol, ()))

    def collect_realtime_data(self, symbols: List[str]) -> List[MarketData]:
        """Generate one simulated bar per symbol and append it to the cache.

        Args:
            symbols: Symbols to generate data for.

        Returns:
            The generated bars, in the same order as *symbols*.
        """
        market_data = []
        for symbol in symbols:
            # Simulated quote: random base price with a bounded daily move.
            base_price = random.uniform(50, 500)
            change_pct = random.uniform(-0.05, 0.05)
            close = base_price * (1 + change_pct)
            open_price = base_price * (1 + random.uniform(-0.02, 0.02))
            # High/low are pushed outwards so they always bracket open and close.
            high = max(open_price, close) * (1 + random.uniform(0, 0.03))
            low = min(open_price, close) * (1 - random.uniform(0, 0.03))
            volume = random.randint(100000, 10000000)
            data = MarketData(
                symbol=symbol,
                timestamp=datetime.now(),
                open=open_price,
                high=high,
                low=low,
                close=close,
                volume=volume,
                adj_close=close,
            )
            market_data.append(data)
            with self.lock:
                self.data_cache[symbol].append(data)
        return market_data

    def get_historical_data(self, symbol: str, days: int = 252) -> List[MarketData]:
        """Return up to the last *days* cached bars for *symbol*, oldest first.

        Args:
            symbol: Symbol to look up.
            days: Maximum number of most-recent bars to return. A
                non-positive value yields an empty list.

        Returns:
            A new list; empty when nothing is cached for *symbol*.
        """
        if days <= 0:
            return []
        return self._snapshot(symbol)[-days:]

    def detect_anomalies(self, symbol: str) -> List[Dict[str, Any]]:
        """Flag price and volume anomalies in the latest cached bar.

        The latest bar is compared with the most recent 20 cached bars
        (itself included). Fewer than 20 cached bars yields no anomalies.

        Args:
            symbol: Symbol to inspect.

        Returns:
            A list of anomaly dicts. ``'price_anomaly'`` entries carry
            ``'z_score'``; ``'volume_spike'`` entries carry ``'ratio'``.
        """
        window = self._snapshot(symbol)[-20:]
        if len(window) < 20:
            return []

        anomalies: List[Dict[str, Any]] = []

        # Price: z-score of the latest close against the window.
        closes = [bar.close for bar in window]
        mean_price = np.mean(closes)
        std_price = np.std(closes)
        z_score = float((closes[-1] - mean_price) / std_price) if std_price > 0 else 0.0
        if abs(z_score) > 2:
            anomalies.append({
                'type': 'price_anomaly',
                'symbol': symbol,
                'z_score': z_score,
                'severity': 'high' if abs(z_score) > 3 else 'medium',
                'timestamp': datetime.now(),
            })

        # Volume: latest volume more than three times the window average.
        volumes = [bar.volume for bar in window]
        mean_vol = np.mean(volumes)
        current_vol = volumes[-1]
        if current_vol > mean_vol * 3:
            anomalies.append({
                'type': 'volume_spike',
                'symbol': symbol,
                'ratio': float(current_vol / mean_vol),
                'severity': 'high',
                'timestamp': datetime.now(),
            })
        return anomalies
class SentimentAnalyzer:
    """Keyword-based sentiment scoring with a per-symbol history.

    News and social texts are scored by counting fixed positive/negative
    keywords; analyst sentiment is simulated with a random draw. The last
    100 scores per symbol are retained.
    """

    def __init__(self):
        self.sentiment_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=100))

    def analyze_sentiment(self, symbol: str,
                          news_articles: List[str],
                          social_posts: List[str]) -> SentimentScore:
        """Score the given texts for *symbol* and record the result.

        Args:
            symbol: Symbol the texts refer to.
            news_articles: News texts to score.
            social_posts: Social-media texts to score.

        Returns:
            The new score, which is also appended to ``sentiment_history``.
        """
        bullish_terms = ['growth', 'profit', 'beat', 'upgrade', 'bullish', 'buy']
        bearish_terms = ['loss', 'decline', 'miss', 'downgrade', 'bearish', 'sell']

        def score_text(text: str) -> float:
            # Keywords are matched as case-insensitive substrings.
            lowered = text.lower()
            hits_up = sum(term in lowered for term in bullish_terms)
            hits_down = sum(term in lowered for term in bearish_terms)
            matched = hits_up + hits_down
            return (hits_up - hits_down) / matched if matched > 0 else 0.0

        def average_sentiment(texts: List[str]) -> float:
            # Mean of the per-text scores; no texts means neutral.
            if not texts:
                return 0.0
            return sum(score_text(text) for text in texts) / len(texts)

        news_sentiment = average_sentiment(news_articles)
        social_sentiment = average_sentiment(social_posts)
        # Analyst sentiment is simulated.
        analyst_sentiment = random.uniform(-0.3, 0.6)

        # Weighted blend: 40% news, 30% social, 30% analyst.
        composite_score = (
            news_sentiment * 0.4 +
            social_sentiment * 0.3 +
            analyst_sentiment * 0.3
        )

        # Confidence grows with the number of texts and saturates at 20.
        text_count = len(news_articles) + len(social_posts)
        confidence = min(1.0, text_count / 20)

        result = SentimentScore(
            symbol=symbol,
            timestamp=datetime.now(),
            news_sentiment=news_sentiment,
            social_sentiment=social_sentiment,
            analyst_sentiment=analyst_sentiment,
            composite_score=composite_score,
            confidence=confidence,
        )
        self.sentiment_history[symbol].append(result)
        return result

    def get_sentiment_trend(self, symbol: str, days: int = 5) -> float:
        """Return the change in mean composite score between two windows.

        The latest *days* scores are compared with the *days* scores before
        them. With fewer than ``2 * days`` scores the earlier window falls
        back to the latest one, so the trend is 0.

        Args:
            symbol: Symbol to look up.
            days: Window length, counted in recorded scores.

        Returns:
            Recent mean minus earlier mean, or 0.0 when fewer than *days*
            scores are recorded.
        """
        history = list(self.sentiment_history.get(symbol, []))
        if len(history) < days:
            return 0.0

        recent = [entry.composite_score for entry in history[-days:]]
        if len(history) >= days * 2:
            older = [entry.composite_score for entry in history[-days * 2:-days]]
        else:
            older = recent

        if not older:
            return 0.0
        return np.mean(recent) - np.mean(older)
class AlphaFactorMiner:
    """Generate and evaluate simulated alpha factors.

    ``generate_factors`` derives raw momentum, volatility and volume values
    from the price/volume history and registers three factors whose
    statistics (IC, ICIR, turnover, Sharpe, max drawdown) are random draws.
    """

    # name, description, category, then the random ranges for
    # ic_value, icir, turnover, sharpe_ratio and max_drawdown.
    _FACTOR_SPECS = (
        ("Momentum_1M", "1-month momentum factor", "momentum",
         (0.03, 0.08), (0.5, 1.5), (0.1, 0.3), (0.8, 2.0), (0.05, 0.15)),
        ("Volatility_1M", "1-month volatility factor", "volatility",
         (-0.05, -0.02), (-1.0, -0.3), (0.05, 0.15), (0.5, 1.2), (0.08, 0.20)),
        ("Volume_Ratio", "Volume ratio factor", "volume",
         (0.02, 0.06), (0.3, 1.0), (0.2, 0.4), (0.6, 1.5), (0.10, 0.25)),
    )

    def __init__(self):
        self.factor_library: Dict[str, AlphaFactor] = {}
        # Raw values computed by the most recent ``generate_factors`` call.
        self.latest_factor_values: Dict[str, float] = {}

    @staticmethod
    def _raw_factor_values(prices: np.ndarray, volumes: np.ndarray) -> Dict[str, float]:
        """Compute raw momentum, volatility and volume-ratio values."""

        def momentum(window: int) -> float:
            # ``prices[-window]`` needs at least ``window`` observations;
            # indexing without this check raises IndexError on short input.
            if len(prices) < window:
                return 0.0
            base = prices[-window]
            return float((prices[-1] - base) / base) if base > 0 else 0.0

        # Skip steps whose starting price is not positive, which would
        # otherwise divide by zero.
        previous = prices[:-1]
        usable = previous > 0
        returns = np.diff(prices)[usable] / previous[usable]

        def volatility(window: int) -> float:
            return float(np.std(returns[-window:])) if len(returns) >= window else 0.0

        volume_ma_20 = float(np.mean(volumes[-20:]))
        volume_ratio = float(volumes[-1] / volume_ma_20) if volume_ma_20 > 0 else 1.0

        return {
            "momentum_1m": momentum(21),
            "momentum_3m": momentum(63),
            "volatility_1m": volatility(21),
            "volatility_3m": volatility(63),
            "volume_ratio": volume_ratio,
        }

    def generate_factors(self, market_data: List[MarketData]) -> List[AlphaFactor]:
        """Create the three built-in factors from a price/volume history.

        Args:
            market_data: Bars ordered oldest first; each must expose
                ``close`` and ``volume``.

        Returns:
            Three new factors, also stored in ``factor_library``; an empty
            list when fewer than 60 bars are supplied. 3-month values that
            need more history than is available are reported as 0 in
            ``latest_factor_values``.
        """
        if len(market_data) < 60:
            return []

        prices = np.array([bar.close for bar in market_data], dtype=float)
        volumes = np.array([bar.volume for bar in market_data], dtype=float)
        self.latest_factor_values = self._raw_factor_values(prices, volumes)

        # Factor statistics are simulated; only the value ranges differ per factor.
        factors = [
            AlphaFactor(
                factor_id=f"factor_{uuid.uuid4().hex[:8]}",
                name=name,
                description=description,
                category=category,
                ic_value=random.uniform(*ic_range),
                icir=random.uniform(*icir_range),
                turnover=random.uniform(*turnover_range),
                sharpe_ratio=random.uniform(*sharpe_range),
                max_drawdown=random.uniform(*drawdown_range),
            )
            for (name, description, category, ic_range, icir_range,
                 turnover_range, sharpe_range, drawdown_range) in self._FACTOR_SPECS
        ]
        for factor in factors:
            self.factor_library[factor.factor_id] = factor
        return factors

    def evaluate_factor(self, factor: AlphaFactor,
                        returns: np.ndarray) -> Dict[str, float]:
        """Return the statistics already stored on *factor*.

        Args:
            factor: Factor to summarise.
            returns: Accepted for interface compatibility; not used by this
                simplified evaluation.

        Returns:
            Dict with keys ``'ic'``, ``'icir'``, ``'sharpe'``, ``'max_dd'``
            and ``'turnover'``.
        """
        return {
            'ic': factor.ic_value,
            'icir': factor.icir,
            'sharpe': factor.sharpe_ratio,
            'max_dd': factor.max_drawdown,
            'turnover': factor.turnover,
        }
class PortfolioOptimizer:
    """Simplified portfolio weighting and parametric risk metrics.

    Weights are allocated in proportion to positive expected returns; risk
    metrics use fixed multiples of the portfolio volatility.
    """

    def __init__(self, risk_free_rate: float = 0.02):
        self.risk_free_rate = risk_free_rate

    def optimize_mean_variance(self,
                               expected_returns: Dict[str, float],
                               covariance_matrix: np.ndarray,
                               symbols: List[str]) -> Dict[str, float]:
        """Allocate long-only weights in proportion to expected return.

        Args:
            expected_returns: Expected return per symbol; symbols missing
                from the mapping are treated as 0.
            covariance_matrix: Accepted for interface compatibility; the
                simplified heuristic does not use it.
            symbols: Symbols to allocate to. Only these receive a weight.

        Returns:
            Weights keyed by symbol, each in [0, 1] and summing to 1.
            Negative expected returns get weight 0. If no symbol has a
            positive expected return the allocation is equal-weighted. An
            empty *symbols* yields an empty dict.
        """
        unique_symbols = list(dict.fromkeys(symbols))
        if not unique_symbols:
            return {}

        # Clip at zero so the result is a valid long-only allocation; raw
        # negative returns would give negative and >100% weights.
        clipped = {s: max(expected_returns.get(s, 0.0), 0.0) for s in unique_symbols}
        total = sum(clipped.values())
        if total <= 0:
            equal_weight = 1.0 / len(unique_symbols)
            return {s: equal_weight for s in unique_symbols}
        return {s: value / total for s, value in clipped.items()}

    def calculate_portfolio_metrics(self,
                                    weights: Dict[str, float],
                                    expected_returns: Dict[str, float],
                                    covariance_matrix: np.ndarray,
                                    symbols: List[str]) -> RiskMetrics:
        """Compute return-based risk metrics for a weighted portfolio.

        Args:
            weights: Weight per symbol; missing symbols count as 0.
            expected_returns: Expected return per symbol; missing count as 0.
            covariance_matrix: Square matrix whose rows and columns follow
                the order of *symbols*.
            symbols: Symbols defining the vector order.

        Returns:
            A ``RiskMetrics`` with a fresh ``portfolio_id``. ``var_95``,
            ``var_99`` and ``expected_shortfall`` are expressed as returns
            (expected return minus a multiple of volatility).
            ``max_drawdown``, ``sortino_ratio`` and ``beta`` are fixed
            heuristics, and ``correlation_matrix`` is left empty.
        """
        weight_array = np.array([weights.get(s, 0) for s in symbols], dtype=float)
        return_array = np.array([expected_returns.get(s, 0) for s in symbols], dtype=float)

        portfolio_return = float(np.dot(weight_array, return_array))

        # Clamp at zero: rounding or a non-PSD matrix can produce a tiny
        # negative variance, whose square root would be NaN.
        variance = float(np.dot(weight_array, np.dot(covariance_matrix, weight_array)))
        portfolio_volatility = float(np.sqrt(max(variance, 0.0)))

        if portfolio_volatility > 0:
            sharpe_ratio = (portfolio_return - self.risk_free_rate) / portfolio_volatility
        else:
            sharpe_ratio = 0

        # Parametric VaR: 1.645 and 2.326 are the one-sided 95% / 99%
        # normal quantiles; 2.063 is the matching 95% expected-shortfall
        # multiplier.
        var_95 = portfolio_return - 1.645 * portfolio_volatility
        var_99 = portfolio_return - 2.326 * portfolio_volatility
        expected_shortfall = portfolio_return - 2.063 * portfolio_volatility

        return RiskMetrics(
            portfolio_id=f"portfolio_{uuid.uuid4().hex[:8]}",
            timestamp=datetime.now(),
            var_95=var_95,
            var_99=var_99,
            expected_shortfall=expected_shortfall,
            max_drawdown=portfolio_volatility * 2.5,
            sharpe_ratio=sharpe_ratio,
            sortino_ratio=sharpe_ratio * 1.2,
            beta=1.0,
            volatility=portfolio_volatility,
            correlation_matrix={},
        )
class TradingAgent:
    """Generate trading signals and apply simulated fills to positions.

    Attributes:
        positions: Open positions keyed by symbol.
        signals: Every signal produced by ``generate_signal``.
        performance_history: Bounded history (latest 1000 entries) used for
            the momentum part of the score. Entries are read via
            ``.current_price``; nothing in this class appends to it.
    """

    def __init__(self):
        self.positions: Dict[str, PortfolioPosition] = {}
        self.signals: List[TradingSignal] = []
        self.performance_history = deque(maxlen=1000)

    def _history_momentum(self) -> float:
        """Return the relative price change over the last 20 history entries."""
        if len(self.performance_history) <= 20:
            return 0.0
        # A deque does not support slicing, so copy to a list first.
        recent = list(self.performance_history)[-20:]
        first = recent[0].current_price
        last = recent[-1].current_price
        return (last - first) / first if first > 0 else 0.0

    @staticmethod
    def _classify(score: float) -> Tuple[SignalType, float]:
        """Map a composite score to a signal type and a strength in [0, 1]."""
        # Cap every branch at 1.0 so strength stays within 0-1.
        magnitude = min(1.0, abs(score))
        if score > 0.3:
            return SignalType.STRONG_BUY, magnitude
        if score > 0.1:
            return SignalType.BUY, magnitude
        if score < -0.3:
            return SignalType.STRONG_SELL, magnitude
        if score < -0.1:
            return SignalType.SELL, magnitude
        return SignalType.HOLD, 0.0

    def generate_signal(self, symbol: str,
                        market_data: MarketData,
                        sentiment: SentimentScore,
                        factors: List[AlphaFactor]) -> TradingSignal:
        """Combine sentiment, factor and momentum inputs into a signal.

        The composite score is 40% sentiment composite, 40% mean factor IC
        and 20% momentum from ``performance_history``.

        Args:
            symbol: Symbol the signal is for.
            market_data: Latest bar; its ``close`` anchors the price levels.
            sentiment: Sentiment score; also supplies the signal confidence.
            factors: Factors whose ``ic_value`` is averaged; may be empty.

        Returns:
            The new signal, which is also appended to ``self.signals``.
            HOLD signals carry no target or stop price.
        """
        score = sentiment.composite_score * 0.4
        if factors:
            score += float(np.mean([f.ic_value for f in factors])) * 0.4
        score += self._history_momentum() * 0.2

        signal_type, strength = self._classify(score)

        # Price levels scale with strength: target 10%, stop 5% of price.
        current_price = market_data.close
        if signal_type in (SignalType.STRONG_BUY, SignalType.BUY):
            target_price = current_price * (1 + strength * 0.1)
            stop_loss = current_price * (1 - strength * 0.05)
        elif signal_type in (SignalType.STRONG_SELL, SignalType.SELL):
            target_price = current_price * (1 - strength * 0.1)
            stop_loss = current_price * (1 + strength * 0.05)
        else:
            target_price = None
            stop_loss = None

        signal = TradingSignal(
            signal_id=f"signal_{uuid.uuid4().hex[:8]}",
            symbol=symbol,
            signal_type=signal_type,
            strength=strength,
            confidence=sentiment.confidence,
            target_price=target_price,
            stop_loss=stop_loss,
            take_profit=target_price,
            timestamp=datetime.now(),
            reasoning=f"Composite score: {score:.3f}, Sentiment: {sentiment.composite_score:.3f}"
        )
        self.signals.append(signal)
        return signal

    @staticmethod
    def _order_result(signal: TradingSignal, quantity: int, execution_price: float,
                      failure_reason: Optional[str] = None) -> Dict[str, Any]:
        """Build the execution report; ``'reason'`` is added only on failure."""
        result = {
            'success': failure_reason is None,
            'signal_id': signal.signal_id,
            'symbol': signal.symbol,
            'action': signal.signal_type.value,
            'quantity': quantity,
            'execution_price': execution_price,
            'timestamp': datetime.now()
        }
        if failure_reason is not None:
            result['reason'] = failure_reason
        return result

    @staticmethod
    def _revalue(position: PortfolioPosition, price: float) -> None:
        """Refresh the valuation fields of *position* at *price*."""
        position.current_price = price
        position.market_value = price * position.quantity
        position.unrealized_pnl = (price - position.avg_cost) * position.quantity

    def execute_order(self, signal: TradingSignal,
                      quantity: int,
                      order_type: OrderType = OrderType.MARKET) -> Dict[str, Any]:
        """Simulate filling an order for *signal* and update positions.

        Args:
            signal: Signal to act on. Buy signals open or add to a position,
                sell signals reduce or close one, and HOLD leaves positions
                untouched.
            quantity: Number of units requested; must be positive.
            order_type: Accepted for interface compatibility; the simulation
                does not use it.

        Returns:
            An execution report. ``'quantity'`` is the filled amount, which
            for a sell is capped at the quantity held. When ``'success'`` is
            False (non-positive quantity, or a sell with no position) a
            ``'reason'`` key explains why and no position is changed.
        """
        # Simulated fill: the signal's target price, or a random price when
        # the signal has none.
        execution_price = signal.target_price or random.uniform(95, 105)

        if quantity <= 0:
            return self._order_result(signal, 0, execution_price,
                                      failure_reason="quantity must be positive")

        filled = quantity
        position = self.positions.get(signal.symbol)

        if signal.signal_type in (SignalType.BUY, SignalType.STRONG_BUY):
            if position is None:
                self.positions[signal.symbol] = PortfolioPosition(
                    symbol=signal.symbol,
                    quantity=quantity,
                    avg_cost=execution_price,
                    current_price=execution_price,
                    market_value=execution_price * quantity,
                    unrealized_pnl=0,
                    weight=0,
                    risk_contribution=0
                )
            else:
                # Blend the new fill into the average cost.
                total_cost = position.avg_cost * position.quantity + execution_price * quantity
                position.quantity += quantity
                position.avg_cost = total_cost / position.quantity if position.quantity > 0 else 0
                self._revalue(position, execution_price)
        elif signal.signal_type in (SignalType.SELL, SignalType.STRONG_SELL):
            if position is None:
                return self._order_result(signal, 0, execution_price,
                                          failure_reason="no position to sell")
            # Never sell more than is held; report what was actually filled.
            filled = min(quantity, position.quantity)
            position.quantity -= filled
            if position.quantity <= 0:
                del self.positions[signal.symbol]
            else:
                self._revalue(position, execution_price)

        return self._order_result(signal, filled, execution_price)
# Usage example: walks through the pipeline end to end with simulated data.
if __name__ == "__main__":
    print("=== 金融 Agent 投研决策与风险控制 ===\n")

    # --- Market data collection ---
    print("=== 创建市场数据采集器 ===")
    data_collector = MarketDataCollector()
    symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']
    print("采集实时数据...")
    market_data = data_collector.collect_realtime_data(symbols)
    print(f"采集到 {len(market_data)} 个股票数据:")
    for bar in market_data:
        pct_change = (bar.close - bar.open) / bar.open * 100
        print(f" - {bar.symbol}: ${bar.close:.2f} ({pct_change:+.2f}%)")
        print(f" 成交量:{bar.volume:,}")

    # --- Sentiment analysis ---
    print("\n=== 创建情感分析器 ===")
    sentiment_analyzer = SentimentAnalyzer()
    print("\n=== 分析市场情感 ===")
    # Simulated news and social-media texts.
    news_articles = [
        "Company reports strong earnings growth and beats expectations",
        "Analysts upgrade stock rating to buy",
        "New product launch shows promising results",
    ]
    social_posts = [
        "Bullish on this stock! Great momentum",
        "Profit growth looks sustainable",
        "Strong buy recommendation",
    ]
    for symbol in symbols[:2]:
        sentiment = sentiment_analyzer.analyze_sentiment(symbol, news_articles, social_posts)
        print(f"{symbol} 情感分析:")
        print(f" 新闻情感:{sentiment.news_sentiment:.3f}")
        print(f" 社交情感:{sentiment.social_sentiment:.3f}")
        print(f" 分析师情感:{sentiment.analyst_sentiment:.3f}")
        print(f" 综合得分:{sentiment.composite_score:.3f}")
        print(f" 置信度:{sentiment.confidence:.2f}")

    # --- Alpha factor mining ---
    print("\n=== 创建 Alpha 因子挖掘器 ===")
    factor_miner = AlphaFactorMiner()
    print("\n=== 挖掘 Alpha 因子 ===")
    for symbol in symbols[:2]:
        historical_data = data_collector.get_historical_data(symbol)
        # Factors are only mined once at least 60 bars are cached.
        if len(historical_data) < 60:
            continue
        factors = factor_miner.generate_factors(historical_data)
        print(f"{symbol} Alpha 因子:")
        for factor in factors:
            print(f" - {factor.name}: IC={factor.ic_value:.3f}, ICIR={factor.icir:.2f}, Sharpe={factor.sharpe_ratio:.2f}")

    # --- Portfolio optimisation ---
    print("\n=== 创建投资组合优化器 ===")
    optimizer = PortfolioOptimizer(risk_free_rate=0.02)
    print("\n=== 优化投资组合 ===")
    expected_returns = {
        'AAPL': 0.12,
        'GOOGL': 0.10,
        'MSFT': 0.11,
        'AMZN': 0.09,
        'TSLA': 0.15,
    }
    # Simplified covariance matrix: 0.05 on the diagonal, 0.01 elsewhere.
    covariance_matrix = np.eye(len(symbols)) * 0.04 + 0.01
    weights = optimizer.optimize_mean_variance(expected_returns, covariance_matrix, symbols)
    print("最优权重:")
    for symbol, weight in weights.items():
        print(f" {symbol}: {weight*100:.1f}%")

    # Risk metrics for the optimised weights.
    risk_metrics = optimizer.calculate_portfolio_metrics(weights, expected_returns, covariance_matrix, symbols)
    print("\n风险指标:")
    print(f" VaR(95%): {risk_metrics.var_95*100:.2f}%")
    print(f" VaR(99%): {risk_metrics.var_99*100:.2f}%")
    print(f" Sharpe 比率:{risk_metrics.sharpe_ratio:.2f}")
    print(f" 波动率:{risk_metrics.volatility*100:.2f}%")

    # --- Signal generation and simulated execution ---
    print("\n=== 创建交易 Agent ===")
    trading_agent = TradingAgent()
    print("\n=== 生成交易信号 ===")
    for bar in market_data[:2]:
        # Use the latest recorded sentiment, or a fixed fallback score.
        fallback_history = deque([SentimentScore(bar.symbol, datetime.now(), 0.2, 0.1, 0.3, 0.2, 0.8)])
        sentiment = sentiment_analyzer.sentiment_history.get(bar.symbol, fallback_history)[-1]
        factors = factor_miner.generate_factors(data_collector.get_historical_data(bar.symbol))
        signal = trading_agent.generate_signal(bar.symbol, bar, sentiment, factors[:3])
        print(f"{bar.symbol} 交易信号:")
        print(f" 信号类型:{signal.signal_type.value}")
        print(f" 强度:{signal.strength:.2f}")
        print(f" 置信度:{signal.confidence:.2f}")
        if signal.target_price:
            print(f" 目标价:${signal.target_price:.2f}")
        else:
            print(" 目标价:N/A")
        if signal.stop_loss:
            print(f" 止损价:${signal.stop_loss:.2f}")
        else:
            print(" 止损价:N/A")
        print(f" 推理:{signal.reasoning}")
        # Simulated execution for buy signals only.
        if signal.signal_type in [SignalType.BUY, SignalType.STRONG_BUY]:
            result = trading_agent.execute_order(signal, quantity=100)
            print(f" 执行结果:{result['action']} {result['quantity']} 股 @ ${result['execution_price']:.2f}")

    print("\n关键观察:")
    print("1. 市场感知:实时数据采集 + 情感分析 + Alpha 因子挖掘")
    print("2. 智能决策:信号生成 + 组合优化 + 算法交易")
    print("3. 风险控制:VaR 计算 + 压力测试 + 实时监控")
    print("4. 自动化:从数据采集到订单执行全流程自动化")
    print("5. 智能金融:感知 + 决策 + 风控 = 可信赖")
    print("\n智能金融的使命:让投资更理性、更高效、更安全")