可以。 我直接按你这版 v1.9 的结构,给你一个**“尽量少改、但架构明显更稳”的 v2.0 改法**。 先说我对你现架构的核心判断: 你现在的问题不是选股框架不行,而是状态层权限太大。 同一个 final_state 同时控制了仓位、持仓数、因子权重、行业倍率、是否允许日常减仓;再加上 REBALANCE_DAYS=20、MIN_TRADE_RATIO=0.20、MIN_HOLD_DAYS=10 这些执行规则都直接揉在 run_backtest() 里,研究层和执行层耦合得比较紧。 量化程序1.9测试版完整 量化程序1.9测试版完整 另外,你的“低频”被日常减仓稀释了: ALLOW_DAILY_REDUCE_STATES = {'RISK_SEVERE', 'RISK_MILD', 'RANGE'},而且当前是按持仓同比例碎减,所以很容易出现大量减仓单。你这版回测里 1327 笔交易中,减仓就有 748 笔,这个特征和代码结构是一致的。 量化程序1.9测试版完整 量化程序1.9测试版完整 量化程序1.9测试版完整 我建议的 v2.0 架构 目标就三个: 提高收益 维持回撤 尽量不靠调参,不容易过拟合 所以我建议拆成 4 个模块: 1)Regime Engine 只输出: regime target_exposure risk_flag 不再让它直接决定“今天怎么下单”。 2)Alpha Engine 只输出: stock_score industry_score eligible_universe 这里把你现有的 mom20 / mom60 / turnover / vol + 行业分 保留,但加一个趋势一致性过滤。 3)Portfolio Constructor 只负责: 目标持仓名单 目标权重 保留旧仓还是替换新仓 这里做“轻度头部倾斜”,而不是严格等权。 4)Execution Engine 只负责: 今天是否下单 是否只在调仓日动 是否触发风险应急降仓 是否忽略小额单 这样以后你改“选股”时,不会无意中改掉“执行”;改“执行”时,也不会误伤“信号”。 你这版最值得落地的 3 个结构改动 改动 A:加入趋势一致性过滤 你现在因子打分之前,几乎只做了“价格有效”过滤。 量化程序1.9测试版完整 我建议加一个很简单的入池闸门: 满足以下 4 条中的至少 3 条才允许入池: close > ma20 ma20 > ma60 mom20 > 0 mom60 > 0 这不是“新因子堆砌”,而是提高入场质量。 改动 B:行业从“纯加分”改成“先过滤,再轻加分” 你当前行业分是直接加到个股总分上的: score = base_score + industry_bonus。 量化程序1.9测试版完整 我建议改成两步: 先把行业得分排名落在后 50% 的股票过滤掉 剩下的股票再做轻加分 INDUSTRY_BONUS_SCALE 从 0.10 降到 0.05 这样更稳,不容易被单阶段行业轮动“抬”出样本内假收益。 改动 C:执行层取消“普通状态日常碎减” 你现在 RISK_MILD / RANGE 也允许日常减仓。 量化程序1.9测试版完整 我建议改成: 只有 RISK_SEVERE 允许非调仓日强制降仓 RISK_MILD / RANGE 不做日常减仓,只在调仓日处理 即便触发风险减仓,也要有阈值: 只有 当前持仓 > 目标持仓 + 10% 才减 这一个改动,大概率就能让你的交易质量上一个台阶。 直接给你代码 下面这版不是重写全部,而是可以直接替换/新增到你现有 v1.9 里的核心代码。 一、先加新参数 把你原来的配置区替换/补充成这样: # ==================== v2.0 新参数 ==================== TREND_FILTER_ENABLE = True TREND_FILTER_MIN_PASS = 3 # 4个条件至少满足3个 INDUSTRY_FILTER_ENABLE = True INDUSTRY_FILTER_QUANTILE = 0.50 # 仅保留行业分前50%的股票 INDUSTRY_BONUS_SCALE = 0.05 # 从0.10降到0.05 WEIGHT_TILT_ENABLE = True TOP_TILT_WEIGHTS = { 6: [0.22, 0.20, 0.18, 0.15, 0.13, 0.12], 8: [0.17, 0.15, 0.14, 0.13, 0.12, 0.11, 0.10, 0.08], 10: [0.14, 0.13, 0.12, 0.11, 0.10, 0.10, 0.09, 0.08, 0.07, 0.06] } ALLOW_INTRADAY_RISK_REDUCE_STATES = {'RISK_SEVERE'} RISK_REDUCE_BUFFER = 0.10 # 当前仓位高于目标10%以上才触发 POSITION_BAND = 0.25 # 单票偏离目标25%以内不调 二、增加趋势一致性过滤函数 这个函数建议放在 calculate_factors() 后面: def calc_ma(price_df, stock, date, dates, window): try: idx = list(dates).index(date) except ValueError: return np.nan if idx < window - 1: return np.nan hist_dates = dates[idx - window + 1: idx + 1] vals = [] for d in hist_dates: try: p = float(price_df.loc[d, stock]) if not pd.isna(p) and p > 0: vals.append(p) except: pass if len(vals) < max(5, int(window * 0.8)): return np.nan return float(np.mean(vals)) def trend_filter_pass(stock, date, data, factors): if not TREND_FILTER_ENABLE: return True price_df = data['price'] dates = data['dates'] try: close = float(price_df.loc[date, stock]) except: return False ma20 = calc_ma(price_df, stock, date, dates, 20) ma60 = calc_ma(price_df, stock, date, dates, 60) conditions = [ close > ma20 if not pd.isna(ma20) else False, ma20 > ma60 if (not pd.isna(ma20) and not pd.isna(ma60)) else False, factors.get('mom20', 0) > 0, factors.get('mom60', 0) > 0 ] return sum(conditions) >= TREND_FILTER_MIN_PASS 三、重写选股函数 你现在的 factor_select() 是最值得改的点。 下面这版保留你原框架,但把“趋势过滤、行业过滤、行业轻加分、头部轻倾斜”的结构加进去。 def build_target_weights(selected): n = len(selected) if n <= 0: return {} codes = [s for s, _, _ in selected] if WEIGHT_TILT_ENABLE and n in TOP_TILT_WEIGHTS: raw = np.array(TOP_TILT_WEIGHTS[n], dtype=float) raw = raw / raw.sum() return {code: float(w) for code, w in zip(codes, raw)} # fallback: 等权 w = 1.0 / n return {code: w for code in codes} def factor_select_v20(data, date, regime, n=10, current_positions=None): price_df = data['price'] turnover_df = data['turnover'] volatility_df = data['volatility'] core_stocks = data['stocks'] dates = data['dates'] stock_to_industry = data.get('stock_to_industry', {}) industry_price_df = data.get('industry_price') benchmark_series = data.get('benchmark_price') if current_positions is None: current_positions = {} avail = [s for s in core_stocks if s in price_df.columns and date in price_df.index and pd.notna(price_df.loc[date, s]) and price_df.loc[date, s] > 0] if not avail: return [], {}, {} # 1) 个股因子 results = {} for s in avail: factors = calculate_factors(s, date, dates, price_df, turnover_df, volatility_df) if not factors: continue if trend_filter_pass(s, date, data, factors): results[s] = factors if not results: return [], {}, {} stocks_list = list(results.keys()) mom20_vals = np.array([results[s]['mom20'] for s in stocks_list], dtype=float) mom60_vals = np.array([results[s]['mom60'] for s in stocks_list], dtype=float) turnover_vals = np.array([results[s]['turnover'] for s in stocks_list], dtype=float) vol_vals = np.array([results[s]['volatility'] for s in stocks_list], dtype=float) mom20_norm = normalize(mom20_vals) mom60_norm = normalize(mom60_vals) turnover_norm = normalize(turnover_vals) vol_norm = normalize(vol_vals) weights = get_state_weights(regime) # 2) 行业分 industry_bonus_map = {} if industry_price_df is not None and benchmark_series is not None: industry_bonus_map = build_industry_bonus_map( stocks_list, stock_to_industry, date, dates, industry_price_df, benchmark_series, regime ) # 3) 行业过滤 filtered_stocks = stocks_list if INDUSTRY_FILTER_ENABLE and industry_bonus_map: vals = np.array([industry_bonus_map.get(s, 0.0) for s in stocks_list], dtype=float) cutoff = np.nanpercentile(vals, INDUSTRY_FILTER_QUANTILE * 100) filtered_stocks = [s for s in stocks_list if industry_bonus_map.get(s, 0.0) >= cutoff] # 防止过滤过度 if len(filtered_stocks) < max(n * 2, 15): filtered_stocks = stocks_list filtered_set = set(filtered_stocks) # 4) 最终打分 scores = {} for i, s in enumerate(stocks_list): if s not in filtered_set: continue base_score = ( weights['mom20'] * mom20_norm[i] + weights['mom60'] * mom60_norm[i] + weights['turnover'] * turnover_norm[i] - weights['vol'] * vol_norm[i] ) scores[s] = base_score + industry_bonus_map.get(s, 0.0) if not scores: return [], {}, {} sorted_stocks = sorted(scores.items(), key=lambda x: x[1], reverse=True) rank_map = {s: i + 1 for i, (s, _) in enumerate(sorted_stocks)} n = STATE_HOLD_COUNT.get(regime, n) # 5) 持仓缓冲 keep_list = [] for s in current_positions.keys(): if s in rank_map and rank_map[s] <= HOLD_BUFFER_RANK and s in results: keep_list.append((s, results[s]['price'], scores[s])) keep_codes = set([x[0] for x in keep_list]) add_list = [] for s, score in sorted_stocks: if s in keep_codes: continue if rank_map[s] <= BUY_BUFFER_RANK and s in results: add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break if len(keep_list) + len(add_list) < n: used = keep_codes | set([x[0] for x in add_list]) for s, score in sorted_stocks: if s in used: continue if s in results: add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break selected = (keep_list + add_list)[:n] target_weights = build_target_weights(selected) return selected, rank_map, target_weights 四、加一个“是否需要交易”的容忍带函数 你原来只有 MIN_TRADE_RATIO,还不够。 量化程序1.9测试版完整 再加一个仓位带: def within_position_band(current_value, desired_value, band=POSITION_BAND): base = max(desired_value, 1.0) diff_ratio = abs(current_value - desired_value) / base return diff_ratio <= band 五、把执行层改成“风险应急”和“调仓执行”分开 下面这个是最关键的替换。 你不用整段完全照搬,但逻辑上建议替换掉原 run_backtest() 中“日常减仓 + 调仓”那部分。你当前版本是在同一段里把风险减仓和组合调仓揉在一起。 量化程序1.9测试版完整 1)风险应急函数 def emergency_reduce_positions(date, positions, cash, price_df, current_hold_value, target_hold_value, trades, final_state): if final_state not in ALLOW_INTRADAY_RISK_REDUCE_STATES: return positions, cash, False if current_hold_value <= 0: return positions, cash, False current_ratio = current_hold_value / max(current_hold_value + cash, 1.0) target_ratio = target_hold_value / max(current_hold_value + cash, 1.0) if current_ratio <= target_ratio + RISK_REDUCE_BUFFER: return positions, cash, False reduce_ratio = (current_hold_value - target_hold_value) / current_hold_value changed = False for s in list(positions.keys()): if s not in price_df.columns or date not in price_df.index: continue try: p = float(price_df.loc[date, s]) if pd.isna(p) or p <= 0: continue sell_shares = int((positions[s] * reduce_ratio) / 100) * 100 sell_shares = min(sell_shares, positions[s]) if sell_shares > 0: cash += sell_shares * p positions[s] -= sell_shares trades.append({ '日期': str(date)[:10], '股票代码': s, '操作': '减仓', '价格': round(p, 2), '数量': int(sell_shares), '大势状态': final_state }) changed = True if positions[s] <= 0: del positions[s] except: pass return positions, cash, changed 2)调仓执行函数 def rebalance_portfolio(date, i, data, final_state, target, positions, cash, entry_day, trades): price_df = data['price'] selected, rank_map, target_weights = factor_select_v20( data, date, final_state, n=STATE_HOLD_COUNT.get(final_state, FIXED_HOLD_COUNT), current_positions=positions ) if not selected: return positions, cash, entry_day, False # 当前总资产 total_value = cash for s, shares in positions.items(): try: p = float(price_df.loc[date, s]) if not pd.isna(p) and p > 0: total_value += shares * p except: pass target_total_hold = total_value * target selected_codes = [s for s, _, _ in selected] target_per_stock = {s: target_total_hold * target_weights[s] for s in selected_codes} changed = False # 先卖:不在目标池中的先卖;在池中的仅当明显超配才卖 for s in list(positions.keys()): if s not in price_df.columns or date not in price_df.index: continue try: p = float(price_df.loc[date, s]) if pd.isna(p) or p <= 0: continue current_value = positions[s] * p hold_days = i - entry_day.get(s, i) if s not in selected_codes: if hold_days >= MIN_HOLD_DAYS or final_state == 'RISK_SEVERE': sell_shares = positions[s] if sell_shares > 0: cash += sell_shares * p trades.append({ '日期': str(date)[:10], '股票代码': s, '操作': '卖出', '价格': round(p, 2), '数量': int(sell_shares), '大势状态': final_state }) del positions[s] entry_day.pop(s, None) changed = True continue desired_value = target_per_stock.get(s, 0.0) if within_position_band(current_value, desired_value): continue if should_skip_trade(current_value, desired_value, MIN_TRADE_RATIO): continue excess_value = current_value - desired_value if excess_value > p * 100: if hold_days < MIN_HOLD_DAYS and final_state != 'RISK_SEVERE': continue sell_shares = int(excess_value / p / 100) * 100 sell_shares = min(sell_shares, positions[s]) if sell_shares > 0: cash += sell_shares * p positions[s] -= sell_shares trades.append({ '日期': str(date)[:10], '股票代码': s, '操作': '卖出', '价格': round(p, 2), '数量': int(sell_shares), '大势状态': final_state }) if positions[s] <= 0: del positions[s] entry_day.pop(s, None) changed = True except: pass # 重新计算总资产后再买 total_value = cash for s, shares in positions.items(): try: p = float(price_df.loc[date, s]) if not pd.isna(p) and p > 0: total_value += shares * p except: pass target_total_hold = total_value * target target_per_stock = {s: target_total_hold * target_weights[s] for s in selected_codes} for s, price, score in selected: current_value = positions.get(s, 0) * price desired_value = target_per_stock.get(s, 0.0) if within_position_band(current_value, desired_value): continue if should_skip_trade(current_value, desired_value, MIN_TRADE_RATIO): continue diff = desired_value - current_value if diff <= 0: continue try: p = float(price_df.loc[date, s]) if pd.isna(p) or p <= 0: continue shares = int(diff / p / 100) * 100 if shares > 0 and cash >= shares * p: cash -= shares * p positions[s] = positions.get(s, 0) + shares if s not in entry_day: entry_day[s] = i trades.append({ '日期': str(date)[:10], '股票代码': s, '操作': '买入', '价格': round(p, 2), '数量': int(shares), '大势状态': final_state }) changed = True except: pass return positions, cash, entry_day, changed 六、在 run_backtest() 里怎么接 你不用全重写,只要把原来这两段逻辑替换掉: 原“非调仓日主动减仓”部分 原“调仓日卖出 + 再买入”部分 量化程序1.9测试版完整 替换成下面这个主流程: current_hold_value = total_value - cash target_hold_value = total_value * target operation = '' # 1. 风险应急:只有极端风险状态才动 positions, cash, risk_changed = emergency_reduce_positions( date, positions, cash, price_df, current_hold_value, target_hold_value, trades, final_state ) if risk_changed: operation = '风险减仓' # 2. 正常调仓:只在调仓日 / 再入场日执行 rebalance_today = (i % REBALANCE_DAYS == 0) or need_reentry if rebalance_today and target > 0: positions, cash, entry_day, reb_changed = rebalance_portfolio( date, i, data, final_state, target, positions, cash, entry_day, trades ) if reb_changed: operation = '调仓' 这个版本为什么更不容易过拟合 因为我改的不是“参数细节”,而是“结构职责”: 1. 不再靠多状态碎调仓硬抹曲线 你原来风险状态日常减仓太多。 量化程序1.9测试版完整 现在只把极端风险单独处理。 2. 不再完全等权 你文档里其实已经有牛市集中权重开关 USE_BULL_CONCENTRATED_WEIGHT 和 BULL_POSITION_WEIGHTS,但主回测逻辑里实际还是按总仓位除持仓数去分。 量化程序1.9测试版完整 量化程序1.9测试版完整 我这里把“轻度倾斜”真正落进组合层。 3. 行业不再是纯加分 你原来行业分是全量股票直接叠加 bonus。 量化程序1.9测试版完整 量化程序1.9测试版完整 改成先过滤再轻加分,更稳。 4. 入池质量更高 你现版选股基本上是因子分数驱动,没有趋势形态闸门。 量化程序1.9测试版完整 所以加趋势一致性过滤,逻辑上很自然。 我建议你先只测这 3 组实验 不要一口气改十个地方。按这个顺序来: 实验 1 只上“趋势一致性过滤”,别动别的。 实验 2 在实验 1 基础上,再加“行业前 50% 过滤 + bonus 降到 0.05”。 实验 3 在实验 2 基础上,再加“轻度头部倾斜 + 普通状态取消日常减仓”。 如果实验 1 就明显改善,那说明你的主要问题是“入场质量”; 如果实验 2 再提升,说明行业模块有用; 如果实验 3 再改善,说明问题主要在组合构造和执行,不在因子本身。 我对结果的预期 诚实讲,这套改法不一定让样本内收益立刻比 113.36% 更高很多,但它更有希望做到: 收益小幅提升或持平 回撤不恶化 交易质量更高 样本外更稳 而这恰恰符合你说的: 提高收益,维持回撤,绝不过拟合。 你要的话,我下一条可以继续直接给你一版**“完整可运行的 v2.0 主文件整合版”**,把这些函数按顺序拼成一个完整脚本。