v1.4完整代码 1. v1_4_p0_fixes.py (核心逻辑) """ v1.4 P0问题修复实现 """ import sqlite3 import json from datetime import datetime, timedelta class StatePersistence: """状态持久化管理""" def __init__(self, db_path="state_persistence.db"): self.db_path = db_path self.init_database() def init_database(self): """初始化数据库""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS daily_state ( date TEXT PRIMARY KEY, final_state TEXT, raw_state TEXT, score REAL, consecutive_days INTEGER, switch_count_last_5days INTEGER, previous_position REAL, alerts TEXT ) ''') conn.commit() conn.close() def save_daily_state(self, date, final_state, raw_state, score, consecutive_days, switch_count_last_5days, previous_position, alerts=None): """保存每日状态""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() alerts_json = json.dumps(alerts) if alerts else "[]" cursor.execute(''' INSERT OR REPLACE INTO daily_state (date, final_state, raw_state, score, consecutive_days, switch_count_last_5days, previous_position, alerts) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', (date, final_state, raw_state, score, consecutive_days, switch_count_last_5days, previous_position, alerts_json)) conn.commit() conn.close() def load_previous_state(self, date): """加载前一日状态""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('SELECT * FROM daily_state WHERE date = ?', (date,)) result = cursor.fetchone() conn.close() if result: return { 'date': result[0], 'final_state': result[1], 'raw_state': result[2], 'score': result[3], 'consecutive_days': result[4], 'switch_count_last_5days': result[5], 'previous_position': result[6], 'alerts': json.loads(result[7]) if result[7] else [] } return None def calculate_consecutive_days(self, current_state, current_date): """计算连续同状态天数""" consecutive_days = 1 current_check_date = current_date for i in range(1, 10): # 最多检查10天 prev_date = (datetime.strptime(current_date, '%Y-%m-%d') - timedelta(days=i)).strftime('%Y-%m-%d') prev_state = self.load_previous_state(prev_date) if prev_state and prev_state['final_state'] == current_state: consecutive_days += 1 else: break return consecutive_days def calculate_switch_count_last_5days(self, current_date): """计算近5日切换次数""" switch_count = 0 current_state = None for i in range(5): check_date = (datetime.strptime(current_date, '%Y-%m-%d') - timedelta(days=i)).strftime('%Y-%m-%d') state_data = self.load_previous_state(check_date) if state_data: if current_state is None: current_state = state_data['final_state'] elif current_state != state_data['final_state']: switch_count += 1 current_state = state_data['final_state'] return switch_count # 修复后的仓位限速函数 def apply_position_limit(new_position, daily_change_limit, previous_position): """ 应用仓位限速(修复P0-2) """ if previous_position is None or previous_position == 0: # 首日或无历史仓位,不应用限速 return new_position max_change = daily_change_limit actual_change = new_position - previous_position if abs(actual_change) > max_change: if actual_change > 0: return previous_position + max_change else: return previous_position - max_change else: return new_position # 修复后的执行权限逻辑 def calculate_execution_permissions(final_state, adjusted_confidence): """ 计算执行权限(修复P0-3) """ # RISK_SEVERE期间绝对不允许新开仓 allow_new_positions = final_state not in ["RISK_SEVERE", "RISK_MILD"] # 减仓/止损始终允许 allow_reduce_positions = True # 对冲操作在非RISK_SEVERE时允许 allow_hedge = final_state != "RISK_SEVERE" # 观察模式始终允许 allow_observe = True return { 'allow_new_positions': allow_new_positions, 'allow_reduce_positions': allow_reduce_positions, 'allow_hedge': allow_hedge, 'allow_observe': allow_observe } # 版本冻结说明 VERSION_FREEZE = { "version": "v1.4-p0-fixes", "frozen_components": [ "fund_weights", # 资金权重 "hysteresis_thresholds", # 滞回阈值 "cooldown_periods", # 冷却期 "position_mapping", # 仓位映射 "position_limit", # 仓位限速 "confidence_buckets", # 置信度分桶 "execution_actions" # 执行动作 ], "v1_4_allowed_changes": [ "persistence", # 状态持久化 "logging", # 日志系统 "exception_handling", # 异常处理 "batch_validation", # 批量验证 "sensitivity_testing" # 敏感性测试 ], "note": "v1.4仅允许工程与验证改进,不修改核心策略逻辑" } # 修复后的每日运行流程 def run_daily_pipeline_fixed(date, core_stocks, config): """ 修复后的每日运行流程(集成P0修复) """ # 1. 初始化状态持久化 state_persistence = StatePersistence() # 2. 加载前一日状态 previous_state = state_persistence.load_previous_state( (datetime.strptime(date, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d') ) # 3. 获取市场数据 market_data = get_market_data(date) # 4. 计算市场状态 state_machine = V13StateMachine(config) score = state_machine.calculate_score(market_data) raw_state = state_machine.determine_state(score) final_state = state_machine.apply_cooling_period(raw_state, score) # 5. 计算连续天数和切换次数 consecutive_days = state_persistence.calculate_consecutive_days(final_state, date) switch_count_last_5days = state_persistence.calculate_switch_count_last_5days(date) # 6. 计算置信度(使用修复后的稳定性调整) base_confidence = state_machine.calculate_confidence(final_state, score) adjusted_confidence, _, _ = adjust_confidence_by_stability( base_confidence, consecutive_days, switch_count_last_5days ) # 7. 计算建议仓位(使用修复后的限速) previous_position = previous_state['previous_position'] if previous_state else 0 suggested_position = calculate_continuous_position_fixed( final_state, score, config["POSITION_MAPPING"], config["POSITION_LIMIT_CONFIG"]["daily_change_limit"], previous_position ) # 8. 计算执行权限(使用修复后的逻辑) execution_permissions = calculate_execution_permissions(final_state, adjusted_confidence) # 9. 生成交易候选清单 trade_candidates = [] for stock in core_stocks: stock_data = get_stock_data(stock, date) alpha_score = calculate_alpha_score(stock_data, final_state) threshold = get_state_threshold(final_state) if alpha_score >= threshold and execution_permissions['allow_new_positions']: # 允许交易 action, conf = calibrate_confidence_by_bucket(adjusted_confidence, config["EXECUTION_ACTIONS"]) position = calculate_stock_position(action, suggested_position) trade_candidates.append({ "stock": stock, "alpha_score": alpha_score, "threshold": threshold, "action": action, "position": position, "intercepted": False }) else: # 拦截交易 intercept_reason = [] if alpha_score < threshold: intercept_reason.append(f"Alpha score {alpha_score:.1f} < threshold {threshold:.1f}") if not execution_permissions['allow_new_positions']: intercept_reason.append("New positions not allowed in current state") trade_candidates.append({ "stock": stock, "alpha_score": alpha_score, "threshold": threshold, "action": "intercepted", "position": 0, "intercepted": True, "reason": "; ".join(intercept_reason) }) # 10. 保存今日状态 state_persistence.save_daily_state( date, final_state, raw_state, score, consecutive_days, switch_count_last_5days, suggested_position, [] ) # 11. 生成报告 report = { "date": date, "market_state": final_state, "confidence": adjusted_confidence, "suggested_position": suggested_position, "execution_permissions": execution_permissions, "trade_candidates": trade_candidates, "consecutive_days": consecutive_days, "switch_count_last_5days": switch_count_last_5days } return report def calculate_continuous_position_fixed(state, score, position_mapping, daily_change_limit, previous_position): """ 修复后的连续仓位计算(集成限速修复) """ min_pos, max_pos = position_mapping[state] # 根据分数在区间内线性插值 if state == "RISK_SEVERE": normalized_score = max(0, min(1, (score - 25) / 15)) position = min_pos + (max_pos - min_pos) * (1 - normalized_score) elif state == "RISK_MILD": normalized_score = max(0, min(1, (score - 35) / 10)) position = min_pos + (max_pos - min_pos) * normalized_score elif state == "RANGE": normalized_score = max(0, min(1, (score - 45) / 15)) position = min_pos + (max_pos - min_pos) * normalized_score elif state == "TREND_NORMAL": normalized_score = max(0, min(1, (score - 60) / 10)) position = min_pos + (max_pos - min_pos) * normalized_score else: # TREND_BULL normalized_score = max(0, min(1, (score - 70) / 15)) position = min_pos + (max_pos - min_pos) * normalized_score # 应用修复后的仓位限速 position = apply_position_limit(position, daily_change_limit, previous_position) return round(position, 1) 2. v14_call_example.py (调用示例) # v1.4大势模块调用代码 import pandas as pd from regime_v14 import build_regime_v14, RegimeV14Config # 加载适配后的数据 df = pd.read_csv('three_indices_v14_ready.csv') # 配置参数 cfg = RegimeV14Config() # 运行v1.4大势模块 result = build_regime_v14( df, prefixes=['sh', 'sz', 'cyb'], cfg=cfg ) # 保存结果 result.to_csv('market_regime_v14_complete.csv', index=False, encoding='utf-8-sig') print(f'✅ v1.4大势结果生成完成: {len(result)} 天') 3. v1_4_system/regime_v14.py (大势模块) # regime_v14.py - 已集成v1.4大势模块 4. v1_4_system/data_cleaning.py (数据清洗) # data_cleaning.py - 已集成通用数据清洗框架 5. v1_4_system/llm_guard.py (LLM守卫) # llm_guard.py - 已集成LLM守卫模块