# -*- coding: utf-8 -*- """ v1.4.5 大势模块 - 最小增强版(基于 v1.4.2) 升级内容: 1. 升级前过滤(只拦升级,不拦降级) 2. 分数改善过滤(升级时要求分数较昨日改善) 3. 三指数一致性过滤(升级时要求至少 2 个指数支持) """ import pandas as pd import numpy as np from datetime import datetime, timedelta import logging import json logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # ==================== 配置类(v1.4.5) ==================== class RegimeV145Config: """v1.4.5 配置参数""" def __init__(self): # 仓位映射 self.POSITION_MAPPING = { "RISK_SEVERE": (0, 5), "RISK_MILD": (30, 50), "RANGE": (45, 65), "TREND_NORMAL": (60, 80), "TREND_BULL": (80, 100) } # 仓位限速 self.POSITION_LIMIT_CONFIG = { 'daily_change_limit': 20 } # 冷却期 self.COOLDOWN_PERIODS = { "RISK_SEVERE": 2, "RISK_MILD": 1, "RANGE": 1, "TREND_NORMAL": 1, "TREND_BULL": 1 } # 置信度 self.EXECUTION_ACTIONS = { "RISK_SEVERE": {"confidence": 0.95, "action": "observe"}, "RISK_MILD": {"confidence": 0.80, "action": "reduce"}, "RANGE": {"confidence": 0.65, "action": "hold"}, "TREND_NORMAL": {"confidence": 0.75, "action": "hold"}, "TREND_BULL": {"confidence": 0.95, "action": "buy"} } # ============ v1.4.2 确认门配置 ============ self.STATE_STABILITY_CONFIG = { 'confirm_upgrade_days': 2, # 升级确认天数(如:风险→牛市) 'confirm_downgrade_days': 1, # 降级确认天数(如:牛市→风险) 'confirm_exit_severe_days': 2, # 退出严重风险确认天数 'min_hold_days': 2, # 最短持有期 } # ============ v1.4.5 新增:升级前过滤 ============ self.UPGRADE_FILTER_CONFIG = { 'enable_score_improvement_filter': True, # 升级时要求分数较昨日改善 'enable_index_consistency_filter': True, # 升级时要求三指数一致性 'upgrade_support_required': 2, # 三指数至少 2 个支持才允许升级 } # ==================== 状态持久化 ==================== class StatePersistence: """状态持久化管理(内存字典)""" def __init__(self): self.state_history = {} # date -> state_data def save_daily_state(self, date, final_state, raw_state, cooled_state, score, consecutive_days, switch_count_last_5days, previous_position, alerts=None): """保存每日状态""" self.state_history[date] = { 'date': date, 'final_state': final_state, 'raw_state': raw_state, 'cooled_state': cooled_state, 'score': score, 'consecutive_days': consecutive_days, 'switch_count_last_5days': switch_count_last_5days, 'previous_position': previous_position, 'alerts': alerts or [] } def load_previous_state(self, date): """加载前一日状态""" return self.state_history.get(date) def get_all_dates(self): """获取所有日期""" return sorted(self.state_history.keys()) # ==================== 确认门机制(v1.4.2) ==================== def is_upgrade(from_state, to_state): """判断是否为状态升级""" order = ['RISK_SEVERE', 'RISK_MILD', 'RANGE', 'TREND_NORMAL', 'TREND_BULL'] if from_state not in order or to_state not in order: return False return order.index(to_state) > order.index(from_state) def is_downgrade(from_state, to_state): """判断是否为状态降级""" order = ['RISK_SEVERE', 'RISK_MILD', 'RANGE', 'TREND_NORMAL', 'TREND_BULL'] if from_state not in order or to_state not in order: return False return order.index(to_state) < order.index(from_state) def apply_confirmation_gate(raw_state, previous_state, consecutive_days, config): """ 轻量确认门 - 避免状态频繁切换 规则: 1. 升级需确认(如:风险→牛市)- 需要连续 N 天信号 2. 降级快速响应(如:牛市→风险)- 只需 1 天 3. 退出严重风险需确认 - 需要连续 N 天非风险信号 4. 最短持有期 - 避免刚切换就反向 """ if previous_state is None: return raw_state stability_config = config.STATE_STABILITY_CONFIG # 1. 升级确认(如:风险→牛市) if is_upgrade(previous_state, raw_state): if consecutive_days < stability_config['confirm_upgrade_days']: return previous_state # 保持原状态 # 2. 降级确认(如:牛市→风险)- 快速响应 elif is_downgrade(previous_state, raw_state): if consecutive_days < stability_config['confirm_downgrade_days']: return previous_state # 3. 退出严重风险确认 if previous_state == 'RISK_SEVERE' and raw_state != 'RISK_SEVERE': if consecutive_days < stability_config['confirm_exit_severe_days']: return 'RISK_SEVERE' # 4. 最短持有期检查 if consecutive_days < stability_config['min_hold_days']: # 如果刚切换不到最短持有期,且新状态与当前不同,保持原状态 pass # 已在上面处理 return raw_state # ==================== v1.4.5 新增:升级前过滤 ==================== def calculate_index_support(trend_df, current_date): """ 计算三大指数对"升级"的支持数 规则很克制:近 20 日收益 > 0 记为支持 """ support = 0 try: if current_date not in trend_df.index: return 0 current_idx = trend_df.index.get_loc(current_date) if current_idx < 20: return 0 date_20_ago = trend_df.index[current_idx - 20] for code in ['000300.SH', '000905.SH', '000852.SH']: if code not in trend_df.columns: continue close_current = trend_df.loc[current_date, code] close_20 = trend_df.loc[date_20_ago, code] if close_20 > 0 and close_current > 0: ret = (close_current - close_20) / close_20 if ret > 0: support += 1 except Exception: pass return support def apply_upgrade_filters(raw_state, previous_state, score, previous_score, trend_df, current_date, config): """ 只在"升级"时生效: 1) 分数必须较昨日改善 2) 三指数至少 2 个支持 非升级不拦,降级不拦,避免削弱风控 """ if previous_state is None: return raw_state # 非升级不拦 if not is_upgrade(previous_state, raw_state): return raw_state filter_cfg = config.UPGRADE_FILTER_CONFIG # 过滤 1:分数改善 if filter_cfg['enable_score_improvement_filter']: if previous_score is not None and score <= previous_score: logger.info(f"[FILTER_SCORE] {current_date} {previous_state}->{raw_state} 分数未改善 {previous_score:.1f}->{score:.1f}") return previous_state # 过滤 2:三指数一致性 if filter_cfg['enable_index_consistency_filter']: support = calculate_index_support(trend_df, current_date) if support < filter_cfg['upgrade_support_required']: logger.info(f"[FILTER_INDEX] {current_date} {previous_state}->{raw_state} 三指数支持不足 {support}<{filter_cfg['upgrade_support_required']}") return previous_state return raw_state # ==================== 仓位限速 ==================== def apply_position_limit(new_position, daily_change_limit, previous_position): """应用仓位限速""" if previous_position is None or previous_position == 0: return new_position max_change = daily_change_limit actual_change = new_position - previous_position if abs(actual_change) > max_change: if actual_change > 0: return previous_position + max_change else: return previous_position - max_change else: return new_position # ==================== 执行权限 ==================== def calculate_execution_permissions(final_state, adjusted_confidence): """计算执行权限""" allow_new_positions = final_state not in ["RISK_SEVERE", "RISK_MILD"] allow_reduce_positions = True allow_hedge = final_state != "RISK_SEVERE" allow_observe = True return { 'allow_new_positions': allow_new_positions, 'allow_reduce_positions': allow_reduce_positions, 'allow_hedge': allow_hedge, 'allow_observe': allow_observe } # ==================== 市场状态计算 ==================== def calculate_market_score(trend_df, industry_df, current_date): """计算市场状态分数""" score = 50.0 try: if current_date not in trend_df.index: return score current_idx = trend_df.index.get_loc(current_date) if current_idx >= 20: date_20_ago = trend_df.index[current_idx - 20] # 1. 大盘趋势(沪深 300) if '000300.SH' in trend_df.columns: close_current = trend_df.loc[current_date, '000300.SH'] close_20 = trend_df.loc[date_20_ago, '000300.SH'] if close_20 > 0 and close_current > 0: recent_return = (close_current - close_20) / close_20 if recent_return > 0.05: score += 15.0 elif recent_return > 0: score += 10.0 elif recent_return < -0.05: score -= 10.0 # 2. 中小盘表现 csi500_ret = 0 csi1000_ret = 0 if '000905.SH' in trend_df.columns: close_current = trend_df.loc[current_date, '000905.SH'] close_20 = trend_df.loc[date_20_ago, '000905.SH'] if close_20 > 0 and close_current > 0: csi500_ret = (close_current - close_20) / close_20 if '000852.SH' in trend_df.columns: close_current = trend_df.loc[current_date, '000852.SH'] close_20 = trend_df.loc[date_20_ago, '000852.SH'] if close_20 > 0 and close_current > 0: csi1000_ret = (close_current - close_20) / close_20 if csi500_ret > 0 and csi1000_ret > 0: score += 10.0 elif csi500_ret > 0 or csi1000_ret > 0: score += 5.0 # 3. 行业轮动 rising_count = 0 total_count = 0 for col in industry_df.columns: if col == 'date': continue try: close_current = industry_df.loc[current_date, col] close_20 = industry_df.loc[date_20_ago, col] if close_20 > 0 and close_current > 0: ret = (close_current - close_20) / close_20 if ret > 0: rising_count += 1 total_count += 1 except: continue if total_count > 0: rising_ratio = rising_count / total_count if rising_ratio > 0.7: score += 10.0 elif rising_ratio > 0.5: score += 5.0 except Exception as e: pass return max(25.0, min(85.0, score)) def determine_state(score): """根据分数确定市场状态""" if score <= 40: return "RISK_SEVERE" elif score <= 50: return "RISK_MILD" elif score <= 60: return "RANGE" elif score <= 70: return "TREND_NORMAL" else: return "TREND_BULL" def get_state_name(state): """获取状态中文名""" names = { 'RISK_SEVERE': '严重风险', 'RISK_MILD': '轻度风险', 'RANGE': '震荡市', 'TREND_NORMAL': '正常趋势', 'TREND_BULL': '牛市' } return names.get(state, state) # ==================== 置信度调整 ==================== def adjust_confidence_by_stability(base_confidence, consecutive_days, switch_count): """根据稳定性调整置信度""" if consecutive_days >= 5: confidence_boost = 0.05 elif consecutive_days >= 3: confidence_boost = 0.03 else: confidence_boost = 0.0 if switch_count >= 3: confidence_penalty = 0.10 elif switch_count >= 2: confidence_penalty = 0.05 else: confidence_penalty = 0.0 adjusted = base_confidence + confidence_boost - confidence_penalty adjusted = max(0.5, min(1.0, adjusted)) is_stable = consecutive_days >= 3 and switch_count < 2 stability_note = "稳定" if is_stable else "不稳定" return adjusted, is_stable, stability_note # ==================== 仓位计算 ==================== def calculate_continuous_position_fixed(state, score, position_mapping, daily_change_limit, previous_position): """修复后的连续仓位计算""" min_pos, max_pos = position_mapping[state] if state == "RISK_SEVERE": normalized_score = max(0, min(1, (score - 25) / 15)) position = min_pos + (max_pos - min_pos) * (1 - normalized_score) elif state == "RISK_MILD": normalized_score = max(0, min(1, (score - 35) / 10)) position = min_pos + (max_pos - min_pos) * normalized_score elif state == "RANGE": normalized_score = max(0, min(1, (score - 45) / 15)) position = min_pos + (max_pos - min_pos) * normalized_score elif state == "TREND_NORMAL": normalized_score = max(0, min(1, (score - 60) / 10)) position = min_pos + (max_pos - min_pos) * normalized_score else: # TREND_BULL normalized_score = max(0, min(1, (score - 70) / 15)) position = min_pos + (max_pos - min_pos) * normalized_score position = apply_position_limit(position, daily_change_limit, previous_position) return round(position, 1) # ==================== 主回测函数(v1.4.5) ==================== def run_backtest_v145(): """运行 v1.4.5 回测""" logger.info("="*80) logger.info("v1.4.5 大势模块回测(最小增强版)") logger.info("="*80) # 1. 加载数据 logger.info("加载数据...") trend_df = pd.read_csv('/home/admin/.openclaw/workspace/trend_indices_cleaned.csv') trend_df.set_index('date', inplace=True) industry_df = pd.read_csv('/home/admin/.openclaw/workspace/industry_indices_cleaned.csv') industry_df.set_index('date', inplace=True) common_dates = sorted(list(set(trend_df.index) & set(industry_df.index))) logger.info(f"共同交易日:{len(common_dates)}") # 2. 初始化 config = RegimeV145Config() state_persistence = StatePersistence() results = [] logger.info("开始回测...") logger.info(f"确认门配置:{config.STATE_STABILITY_CONFIG}") logger.info(f"升级过滤配置:{config.UPGRADE_FILTER_CONFIG}") previous_position = 0 previous_state = None previous_score = None # v1.4.5 新增 consecutive_days = 0 # 状态序列(用于交易日计算) state_sequence = [] # [(date, state), ...] # 过滤统计 filter_stats = { 'score_filter': 0, 'index_filter': 0, 'total_upgrades': 0, } for i, date in enumerate(common_dates): # 计算分数 score = calculate_market_score(trend_df, industry_df, date) # 确定原始状态 raw_state = determine_state(score) # 应用冷却期(简化) cooled_state = raw_state # v1.4.5:升级前过滤(只拦升级,不拦降级) filtered_state = apply_upgrade_filters( raw_state=cooled_state, previous_state=previous_state, score=score, previous_score=previous_score, trend_df=trend_df, current_date=date, config=config ) # 统计过滤效果 if previous_state and is_upgrade(previous_state, raw_state): filter_stats['total_upgrades'] += 1 if filtered_state != raw_state: # 已经被过滤器拦截 pass # 应用确认门(保持 v1.4.2 原逻辑不动) final_state = apply_confirmation_gate(filtered_state, previous_state, consecutive_days, config) # 计算连续天数(按交易日,非自然日) if previous_state == final_state: consecutive_days += 1 else: consecutive_days = 1 # 计算近 5 日切换次数(按交易日) switch_count = 0 recent_states = state_sequence[-5:] if len(state_sequence) >= 5 else state_sequence for j in range(1, len(recent_states)): if recent_states[j][1] != recent_states[j-1][1]: switch_count += 1 # 添加到状态序列 state_sequence.append((date, final_state)) # 计算置信度 base_confidence = config.EXECUTION_ACTIONS[final_state]["confidence"] adjusted_confidence, is_stable, _ = adjust_confidence_by_stability( base_confidence, consecutive_days, switch_count ) # 计算仓位 position = calculate_continuous_position_fixed( final_state, score, config.POSITION_MAPPING, config.POSITION_LIMIT_CONFIG['daily_change_limit'], previous_position ) previous_position = position # 计算执行权限 exec_perms = calculate_execution_permissions(final_state, adjusted_confidence) # 保存结果 results.append({ 'date': date, 'score': round(score, 2), 'raw_state': raw_state, 'cooled_state': cooled_state, 'filtered_state': filtered_state, # v1.4.5 新增 'state': final_state, 'state_name': get_state_name(final_state), 'confidence': round(adjusted_confidence, 3), 'position': position, 'consecutive_days': consecutive_days, 'switch_count': switch_count, 'is_stable': is_stable, 'allow_new_positions': exec_perms['allow_new_positions'] }) # 保存状态 state_persistence.save_daily_state( date, final_state, raw_state, cooled_state, score, consecutive_days, switch_count, position, [] ) # 更新 previous_state 和 previous_score previous_state = final_state previous_score = score # v1.4.5 新增 logger.info(f"回测完成:{len(results)} 个交易日") logger.info(f"过滤统计:{filter_stats}") return pd.DataFrame(results), filter_stats if __name__ == "__main__": results_df, filter_stats = run_backtest_v145() # 保存结果 results_df.to_csv('/home/admin/.openclaw/workspace/v145_backtest_results.csv', index=False, encoding='utf-8-sig') print(f"\n✅ v1.4.5 回测完成,结果已保存") print(f" 文件:/home/admin/.openclaw/workspace/v145_backtest_results.csv") print(f" 过滤统计:{filter_stats}") # 打印关键指标 print(f"\n{'='*80}") print("v1.4.5 关键指标") print(f"{'='*80}") print(f"态势时段数:{len(results_df[results_df['state'] != results_df['state'].shift(1)])}") print(f"稳定交易日:{results_df['is_stable'].sum()} ({results_df['is_stable'].mean()*100:.1f}%)") print(f"平均连续:{results_df['consecutive_days'].mean():.2f} 天") print(f"\n过滤效果:") print(f" raw_state != filtered_state: {(results_df['raw_state'] != results_df['filtered_state']).sum()} 次") print(f" filtered_state != final_state: {(results_df['filtered_state'] != results_df['state']).sum()} 次") print(f"{'='*80}")