# ================================================================================
# v1.4.1 Quantitative Trading System - Complete Code Document
# Version: v1.4-p0-fixes
# Generated: 2026-03-01
# ================================================================================
# Contents:
#   1. regime_v14.py    - market regime module
#   2. data_cleaning.py - data cleaning module
#   3. llm_guard.py     - LLM guard module
#   4. Usage notes
# ================================================================================
# 1. regime_v14.py - market regime module
# ================================================================================
"""
v1.4 market regime module - full implementation.

Supports combined analysis of individual stocks, industry indices and the
three major trend indices.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RegimeV14Config:
    """v1.4 configuration container."""

    def __init__(self):
        # Market state -> (min, max) suggested position band, in percent.
        self.POSITION_MAPPING = {
            "RISK_SEVERE": (0.0, 20.0),
            "RISK_MILD": (10.0, 40.0),
            "RANGE": (30.0, 60.0),
            "TREND_NORMAL": (50.0, 80.0),
            "TREND_BULL": (70.0, 100.0)
        }
        # Position rate limiting. Only "daily_change_limit" is read by the
        # code in this module.
        self.POSITION_LIMIT_CONFIG = {
            "daily_change_limit": 20.0,  # maximum position change per day
            "max_position": 100.0,
            "min_position": 0.0
        }
        # Market state -> execution action and base confidence.
        self.EXECUTION_ACTIONS = {
            "RISK_SEVERE": {"action": "observe", "confidence": 0.9},
            "RISK_MILD": {"action": "reduce", "confidence": 0.8},
            "RANGE": {"action": "hold", "confidence": 0.7},
            "TREND_NORMAL": {"action": "increase", "confidence": 0.8},
            "TREND_BULL": {"action": "aggressive", "confidence": 0.9}
        }
        # Market state -> cooldown length, counted in consecutive history
        # entries (one entry per processed row).
        self.COOLDOWN_PERIODS = {
            "RISK_SEVERE": 3,
            "RISK_MILD": 2,
            "RANGE": 1,
            "TREND_NORMAL": 1,
            "TREND_BULL": 2
        }


def _trailing_run_length(states: List[str]) -> int:
    """Return how many entries at the end of *states* equal the last entry."""
    if not states:
        return 0
    last = states[-1]
    run = 0
    for state in reversed(states):
        if state != last:
            break
        run += 1
    return run


class V13StateMachine:
    """v1.3 state machine (the basis of v1.4)."""

    def __init__(self, config: RegimeV14Config):
        self.config = config
        self.state_history = []
        self.last_switch_date = None

    def calculate_score(self, market_data: pd.DataFrame) -> float:
        """
        Compute the market state score from the last row of *market_data*.

        Starts from a base of 50 and adds 10 when the 000300.SH value is
        positive, 5 when both 000905.SH and 000852.SH are positive, and 5 when
        more than 10 columns end in ``.TI`` or ``.CSI``. The result is clamped
        to [25, 85]. Any exception is logged and the neutral score 50.0 is
        returned instead.
        """
        try:
            columns = market_data.columns

            def last_value(code):
                # A missing column counts as 0, which earns no bonus.
                return market_data[code].iloc[-1] if code in columns else 0

            csi300_close = last_value('000300.SH')
            csi500_close = last_value('000905.SH')
            csi1000_close = last_value('000852.SH')

            score = 50.0  # base score

            # Large-cap trend bonus
            if csi300_close > 0:
                score += 10.0

            # Mid/small-cap bonus
            if csi500_close > 0 and csi1000_close > 0:
                score += 5.0

            # Industry rotation bonus (simplified)
            industry_count = sum(
                1 for col in columns if col.endswith(('.TI', '.CSI'))
            )
            if industry_count > 10:
                score += 5.0

            return max(25.0, min(85.0, score))
        except Exception as e:
            logger.error(f"计算分数时出错:{e}")
            return 50.0

    def determine_state(self, score: float) -> str:
        """Map a score to a market state using the 40/50/60/70 thresholds."""
        if score <= 40:
            return "RISK_SEVERE"
        elif score <= 50:
            return "RISK_MILD"
        elif score <= 60:
            return "RANGE"
        elif score <= 70:
            return "TREND_NORMAL"
        else:
            return "TREND_BULL"

    def apply_cooling_period(self, raw_state: str, score: float) -> str:
        """
        Hold the current state until it has lasted its cooldown period.

        Args:
            raw_state: State proposed for the current row.
            score: Score of the current row (accepted for interface
                compatibility; not used by the cooldown rule).

        Returns:
            ``raw_state`` when there is no history, when the state does not
            change, or when the current state has already lasted at least its
            configured cooldown; otherwise the current (last) state.
        """
        if not self.state_history:
            return raw_state

        last_state = self.state_history[-1]
        if last_state == raw_state:
            return raw_state

        cooldown_days = self.config.COOLDOWN_PERIODS.get(last_state, 1)
        # The cooldown is measured on how long the *current* state has lasted.
        # Comparing against the total history length would disable the
        # cooldown after the first few rows.
        if _trailing_run_length(self.state_history) < cooldown_days:
            return last_state

        return raw_state


def adjust_confidence_by_stability(base_confidence: float,
                                   consecutive_days: int,
                                   switch_count_last_5days: int) -> Tuple[float, bool, str]:
    """
    Adjust a confidence value by how stable the market state has been.

    Adds 0.1 (>= 5 consecutive days) or 0.05 (>= 3), capped at 1.0, then
    subtracts 0.1 (>= 2 recent switches, floor 0.5) or 0.05 (>= 1, floor 0.6).

    Returns:
        ``(adjusted_confidence, is_stable, stability_note)`` where
        ``is_stable`` requires at least 3 consecutive days and no recent
        switch.
    """
    adjusted_confidence = base_confidence

    # Bonus for persistence
    if consecutive_days >= 5:
        adjusted_confidence = min(1.0, adjusted_confidence + 0.1)
    elif consecutive_days >= 3:
        adjusted_confidence = min(1.0, adjusted_confidence + 0.05)

    # Penalty for frequent switching
    if switch_count_last_5days >= 2:
        adjusted_confidence = max(0.5, adjusted_confidence - 0.1)
    elif switch_count_last_5days >= 1:
        adjusted_confidence = max(0.6, adjusted_confidence - 0.05)

    is_stable = consecutive_days >= 3 and switch_count_last_5days == 0
    stability_note = "稳定" if is_stable else "不稳定"

    return adjusted_confidence, is_stable, stability_note


# State -> (score at band start, band width) used for linear interpolation.
# NOTE(review): the RISK_MILD (35..45) and RANGE (45..60) anchors do not line
# up with the determine_state thresholds (40..50 and 50..60); kept as-is,
# confirm whether this is intended.
_SCORE_BANDS = {
    "RISK_SEVERE": (25, 15),
    "RISK_MILD": (35, 10),
    "RANGE": (45, 15),
    "TREND_NORMAL": (60, 10),
}
_DEFAULT_SCORE_BAND = (70, 15)  # TREND_BULL and any other state


def calculate_continuous_position(state: str,
                                  score: float,
                                  position_mapping: Dict[str, Tuple[float, float]],
                                  daily_change_limit: float,
                                  previous_position: Optional[float] = None) -> float:
    """
    Compute a continuous suggested position for *state* and *score*.

    The score is linearly interpolated inside the state's position band and
    the result is then rate limited against the previous position.

    Args:
        state: Market state; must be a key of *position_mapping*.
        score: Market score.
        position_mapping: State -> (min, max) position band.
        daily_change_limit: Maximum absolute change versus
            *previous_position*.
        previous_position: Previous position, or ``None`` when there is none.
            Only ``None`` disables the rate limit; a previous position of 0
            is rate limited like any other value.

    Returns:
        The position rounded to one decimal.

    Raises:
        KeyError: If *state* is not in *position_mapping*.
    """
    min_pos, max_pos = position_mapping[state]

    band_start, band_width = _SCORE_BANDS.get(state, _DEFAULT_SCORE_BAND)
    normalized_score = max(0, min(1, (score - band_start) / band_width))
    if state == "RISK_SEVERE":
        # NOTE(review): this band is inverted (a lower score yields a larger
        # position); kept as-is, confirm whether this is intended.
        position = min_pos + (max_pos - min_pos) * (1 - normalized_score)
    else:
        position = min_pos + (max_pos - min_pos) * normalized_score

    # Apply the daily rate limit.
    if previous_position is not None:
        actual_change = position - previous_position
        if abs(actual_change) > daily_change_limit:
            if actual_change > 0:
                position = previous_position + daily_change_limit
            else:
                position = previous_position - daily_change_limit

    return round(position, 1)


def build_regime_v14(data: pd.DataFrame,
                     prefixes: Optional[List[str]] = None,
                     cfg: Optional[RegimeV14Config] = None) -> pd.DataFrame:
    """
    Build the v1.4 market regime table.

    For every index entry of the preprocessed data this computes the score,
    the raw and cooled-down market state, the stability-adjusted confidence,
    the rate-limited suggested position and the execution permissions.

    Args:
        data: Market data; see ``preprocess_market_data``.
        prefixes: Forwarded to ``preprocess_market_data``.
        cfg: Configuration; a default ``RegimeV14Config`` is used when omitted.

    Returns:
        A DataFrame with one row per processed date.
    """
    if cfg is None:
        cfg = RegimeV14Config()

    processed_data = preprocess_market_data(data, prefixes)
    state_machine = V13StateMachine(cfg)
    history = state_machine.state_history

    results = []
    previous_position = None  # no previous position on the first row
    for date in processed_data.index.tolist():
        daily_data = processed_data.loc[date:date]

        score = state_machine.calculate_score(daily_data)
        raw_state = state_machine.determine_state(score)
        final_state = state_machine.apply_cooling_period(raw_state, score)
        history.append(final_state)

        # Length of the current run of identical states, including today.
        consecutive_days = _trailing_run_length(history)
        # Number of state changes among the last five entries.
        recent = history[-5:]
        switch_count = sum(
            1 for before, after in zip(recent, recent[1:]) if before != after
        )

        base_confidence = cfg.EXECUTION_ACTIONS[final_state]["confidence"]
        adjusted_confidence, is_stable, stability_note = adjust_confidence_by_stability(
            base_confidence, consecutive_days, switch_count
        )

        suggested_position = calculate_continuous_position(
            final_state,
            score,
            cfg.POSITION_MAPPING,
            cfg.POSITION_LIMIT_CONFIG["daily_change_limit"],
            previous_position
        )
        previous_position = suggested_position

        execution_permissions = {
            'allow_new_positions': final_state not in ["RISK_SEVERE", "RISK_MILD"],
            'allow_reduce_positions': True,
            'allow_hedge': final_state != "RISK_SEVERE",
            'allow_observe': True
        }

        results.append({
            "date": date,
            "market_state": final_state,
            "raw_state": raw_state,
            "score": score,
            "confidence": adjusted_confidence,
            "suggested_position": suggested_position,
            "execution_permissions": execution_permissions,
            "is_stable": is_stable,
            "stability_note": stability_note
        })

    return pd.DataFrame(results)


def preprocess_market_data(data: pd.DataFrame,
                           prefixes: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Normalise market data for the regime computation.

    Column labels are converted to strings, a ``date`` column (if present)
    becomes the index, and the index is parsed as datetimes. The caller's
    DataFrame is left untouched.

    Args:
        data: Raw market data.
        prefixes: Defaults to ``['sh', 'sz', 'cyb']``; currently not used by
            the preprocessing itself.

    Returns:
        A new, preprocessed DataFrame.
    """
    if prefixes is None:
        prefixes = ['sh', 'sz', 'cyb']

    # Work on a copy so the relabelling below does not leak to the caller.
    data = data.copy()
    data.columns = [str(col) for col in data.columns]

    if 'date' in data.columns:
        data = data.set_index('date')

    data.index = pd.to_datetime(data.index)
    return data


# Version information
VERSION = "v1.4-p0-fixes"

# ================================================================================
# 2.
# data_cleaning.py - data cleaning module
# ================================================================================
"""
v1.4 robust data cleaning module.

Supports composite table layouts and detects anchor positions automatically.
"""
import pandas as pd
import numpy as np
import re
from typing import Dict, List, Tuple, Optional


class DataCleaningError(Exception):
    """Raised when data cleaning fails."""
    pass


def detect_anchor_points(df: pd.DataFrame) -> Dict[str, Tuple[int, int]]:
    """
    Locate anchor cells in a composite table.

    Scans the cells row by row and records the first cell that starts with a
    ``YYYY/MM/DD`` date (``date``), contains ``开盘价`` (``open``), contains
    ``收盘价`` (``close``), or contains ``.SH``/``.SZ``/``.TI``
    (``index_name``).

    Returns:
        Anchor name -> ``(row label, column position)`` of its first match.
    """
    anchors = {}
    date_pattern = re.compile(r'\d{4}/\d{2}/\d{2}')

    for i, row in df.iterrows():
        for j, cell in enumerate(row):
            if pd.isna(cell):
                continue

            cell_str = str(cell).strip()

            # setdefault keeps the first occurrence of every anchor.
            if date_pattern.match(cell_str):
                anchors.setdefault('date', (i, j))
            if '开盘价' in cell_str:
                anchors.setdefault('open', (i, j))
            if '收盘价' in cell_str:
                anchors.setdefault('close', (i, j))
            if '.SH' in cell_str or '.SZ' in cell_str or '.TI' in cell_str:
                anchors.setdefault('index_name', (i, j))

    return anchors


def clean_composite_table(file_path: str,
                          expected_stocks: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Clean a composite table stored as CSV.

    Args:
        file_path: Path of the CSV file (read without header, all strings).
        expected_stocks: Optional stock codes that must be present as
            ``<code>_open`` columns in the cleaned data.

    Returns:
        The cleaned DataFrame.

    Raises:
        DataCleaningError: On any failure, including a missing anchor, a
            missing expected stock, or a column with more than 10% missing
            values. The original exception is chained as the cause.
    """
    try:
        df = pd.read_csv(file_path, header=None, dtype=str)

        anchors = detect_anchor_points(df)
        if not anchors:
            raise DataCleaningError("无法检测到任何锚点")

        # NOTE(review): extract_stock_data is not defined in the visible
        # document; it must be provided elsewhere or this call fails (and is
        # reported as a DataCleaningError below).
        cleaned_df = extract_stock_data(df, anchors)

        if expected_stocks:
            found_stocks = {
                col.replace('_open', '')
                for col in cleaned_df.columns
                if '_open' in col
            }
            missing_stocks = set(expected_stocks) - found_stocks
            if missing_stocks:
                raise DataCleaningError(f"缺少以下股票数据:{missing_stocks}")

        # Reject any non-date column with more than 10% missing values.
        max_missing = len(cleaned_df) * 0.1
        for col in cleaned_df.columns:
            if col == 'date':
                continue
            if cleaned_df[col].isna().sum() > max_missing:
                raise DataCleaningError(f"股票 {col} 数据质量过低,缺失率过高")

        return cleaned_df
    except Exception as e:
        raise DataCleaningError(f"数据清洗失败:{str(e)}") from e


def validate_and_clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Validate and tidy a cleaned table.

    Drops all-empty rows and columns, parses a ``date`` column (format
    ``%Y/%m/%d``) and sorts by it, and converts every other column to
    numeric, turning unparseable values into NaN.
    """
    df = df.dropna(how='all').dropna(axis=1, how='all')

    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'], format='%Y/%m/%d')
        df = df.sort_values('date').reset_index(drop=True)

    for col in df.columns:
        if col != 'date':
            df[col] = pd.to_numeric(df[col], errors='coerce')

    return df


# ================================================================================
# 3. llm_guard.py - LLM guard module
# ================================================================================
"""
v1.4 LLM guard module.

Prevents trade instructions from leaking and keeps the output safe.
"""
import re
import json
from typing import Dict, List, Any, Optional
from enum import Enum


class TradeAction(Enum):
    """Enumeration of trade actions."""
    BUY = "buy"
    SELL = "sell"
    HOLD = "hold"
    INTERCEPTED = "intercepted"


class LLMGuardError(Exception):
    """Raised when LLM guard validation fails."""
    pass


class LLMGuard:
    """v1.4 LLM guard."""

    def __init__(self):
        # Keys allowed in the output and in each trade candidate.
        self.whitelist_fields = {
            'date', 'market_state', 'confidence', 'suggested_position',
            'execution_permissions', 'trade_candidates', 'consecutive_days',
            'switch_count_last_5days', 'stock', 'alpha_score', 'threshold',
            'action', 'position', 'intercepted', 'reason'
        }
        # Keywords that mark a trade instruction in free text.
        self.blocked_keywords = {
            '买入', '卖出', '建仓', '清仓', '加仓', '减仓', '止损', '止盈',
            'buy', 'sell', 'long', 'short', 'open position', 'close position'
        }
        self.allowed_actions = {action.value for action in TradeAction}
        # suggested_position is expressed in percent (0-100), matching the
        # regime module's output and the documented position range.
        self.position_limits = {'min': 0.0, 'max': 100.0}

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """
        Validate the guard input.

        Requires the keys ``date``, ``market_data`` and ``core_stocks`` and a
        date whose string form starts with ``YYYY-MM-DD``.

        Returns:
            ``True`` when the input is valid.

        Raises:
            LLMGuardError: If validation fails.
        """
        try:
            required_fields = {'date', 'market_data', 'core_stocks'}
            if not required_fields.issubset(input_data.keys()):
                raise LLMGuardError(f"缺少必需字段:{required_fields - set(input_data.keys())}")

            date_str = str(input_data['date'])
            # Prefix match on purpose: a trailing time component is accepted.
            if not re.match(r'\d{4}-\d{2}-\d{2}', date_str):
                raise LLMGuardError("日期格式无效")

            return True
        except Exception as e:
            raise LLMGuardError(f"输入验证失败:{str(e)}") from e

    def validate_output(self, output_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate and clean the output.

        Keeps only whitelisted keys, validates their values, and validates
        the ``trade_candidates`` list when present.

        Raises:
            LLMGuardError: If any value fails validation.
        """
        try:
            cleaned_output = {
                key: self._clean_field(key, value)
                for key, value in output_data.items()
                if key in self.whitelist_fields
            }

            if 'trade_candidates' in cleaned_output:
                cleaned_output['trade_candidates'] = self._validate_trade_candidates(
                    cleaned_output['trade_candidates']
                )

            return cleaned_output
        except Exception as e:
            raise LLMGuardError(f"输出验证失败:{str(e)}") from e

    def _clean_field(self, field_name: str, value: Any) -> Any:
        """
        Validate one field and return its value unchanged.

        Raises:
            LLMGuardError: For an unknown market state, a confidence outside
                [0, 1], or a suggested position outside ``position_limits``.
        """
        if field_name == 'market_state':
            valid_states = {'RISK_SEVERE', 'RISK_MILD', 'RANGE', 'TREND_NORMAL', 'TREND_BULL'}
            if value not in valid_states:
                raise LLMGuardError(f"无效的市场状态:{value}")
        elif field_name == 'confidence':
            if not (0.0 <= value <= 1.0):
                raise LLMGuardError(f"置信度超出范围 [0,1]: {value}")
        elif field_name == 'suggested_position':
            if not (self.position_limits['min'] <= value <= self.position_limits['max']):
                raise LLMGuardError(f"建议仓位超出范围:{value}")

        return value

    def _validate_trade_candidates(self, candidates: List[Dict]) -> List[Dict]:
        """
        Validate the trade candidate list.

        Every candidate needs the keys ``stock``, ``alpha_score``,
        ``threshold``, ``action``, ``position`` and ``intercepted``; the stock
        must be six digits plus ``.SH``/``.SZ``/``.TI``, the alpha score must
        lie in [-100, 100] and the action must be a ``TradeAction`` value.

        Returns:
            New candidate dicts restricted to whitelisted keys.

        Raises:
            LLMGuardError: If a candidate fails validation.
        """
        required_fields = {'stock', 'alpha_score', 'threshold', 'action', 'position', 'intercepted'}
        validated_candidates = []

        for candidate in candidates:
            if not required_fields.issubset(candidate.keys()):
                raise LLMGuardError(f"交易候选缺少必需字段:{required_fields - set(candidate.keys())}")

            stock = candidate['stock']
            # fullmatch: "$" alone would accept a trailing newline.
            if not isinstance(stock, str) or not re.fullmatch(r'\d{6}\.(SH|SZ|TI)', stock):
                raise LLMGuardError(f"无效的股票代码格式:{candidate['stock']}")

            if not (-100.0 <= candidate['alpha_score'] <= 100.0):
                raise LLMGuardError(f"Alpha 分数超出范围:{candidate['alpha_score']}")

            if candidate['action'] not in self.allowed_actions:
                raise LLMGuardError(f"无效的动作值:{candidate['action']}")

            # Apply the whitelist to nested candidates too, so unexpected
            # keys cannot slip through inside the list.
            validated_candidates.append({
                key: value
                for key, value in candidate.items()
                if key in self.whitelist_fields
            })

        return validated_candidates

    def filter_text_output(self, text: str) -> str:
        """
        Mark trade instructions in free text as intercepted.

        Every occurrence of a blocked keyword, in any letter case, is replaced
        by ``[<matched text> - 已拦截]``.
        """
        if not self.blocked_keywords:
            return text

        # One case-insensitive pass; longer keywords first so they take
        # precedence when alternatives could start at the same position.
        ordered = sorted(self.blocked_keywords, key=lambda kw: (-len(kw), kw))
        pattern = re.compile('|'.join(re.escape(kw) for kw in ordered), re.IGNORECASE)
        return pattern.sub(lambda m: f"[{m.group(0)} - 已拦截]", text)


llm_guard = LLMGuard()


def apply_llm_guard(input_data: Dict[str, Any], output_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Apply the module-level guard to an input/output pair.

    Returns:
        The validated, whitelisted output.

    Raises:
        LLMGuardError: If the input or the output fails validation.
    """
    llm_guard.validate_input(input_data)
    guarded_output = llm_guard.validate_output(output_data)
    return guarded_output


# ================================================================================
# 4. Usage notes
# ================================================================================
# **System architecture:**
# - regime_v14.py: market state determination and position calculation
# - data_cleaning.py: data cleaning and validation
# - llm_guard.py: output safety filtering
#
# **How to use:**
# 1. Clean the raw data with data_cleaning.py
# 2. Generate market states and position suggestions with regime_v14.py
# 3. Filter the output with llm_guard.py to ensure compliance
#
# **Core configuration:**
# - Position range: 0-100%
# - Daily position change limit: 20%
# - Market states: RISK_SEVERE, RISK_MILD, RANGE, TREND_NORMAL, TREND_BULL
#
# **Version information:**
# - Version: v1.4-p0-fixes
# - Generated: 2026-03-01
# - Backtest data: complete records for 2021-2026
# ================================================================================
# End of document
# ================================================================================