好,我们直接往 v2.0 机构化版本 走。 目标不是继续堆小参数,而是把你现在已经有效的 v1.9,升级成: “大势 + 行业主线 + 个股排序 + 风险预算” 四层结构。 你当前 v1.9 已经证明两件事: 行业增强是有效的,收益从 88.47% 提到 113.36% 代价是回撤从 17.29% 提到 19.26%,但交易次数还下降到 1327 次,说明提升不是靠更频繁交易硬做出来的。 量化程序1.9测试版完整 所以 v2.0 的方向不该是“更猛”,而是: 保留主线收益增强,把风险集中度再管住。 一、v2.0 的结构 我建议你做成 4 层: 第 1 层:大势状态 继续沿用你现在这套,不大改。 RISK_SEVERE RISK_MILD RANGE TREND_NORMAL TREND_BULL 这层已经能控制大方向,不用推翻。你 v1.9 的仓位映射和确认逻辑已经是稳定内核。 量化程序1.9测试版完整 第 2 层:行业主线识别 不再只是给每只股票加一点行业分,而是先判断: 当前哪些行业是主线 主线强度有多高 是否值得集中 这层解决你最开始的问题: 翻倍股没体现,本质是主线识别和主线集中不够。 第 3 层:个股排序 继续保留你现有的: 20日动量 60日动量 换手偏离 波动惩罚 但要分两组来排: 主线行业内股票 非主线行业股票 这样强主线不会被均匀冲淡。 第 4 层:风险预算 这是 v2.0 最关键的新层。 不是简单“总仓位控制”,而是再加: 单行业最大权重 单票最大权重 主线过热时的自动降档 这样就能把 v1.9 那种“收益很猛但回撤跟着上去”的问题收住。 二、v2.0 最值得加的 3 个模块 模块 A:行业轮动强度 核心思路 先不直接算股票分,而是先给所有行业算一个 行业总分: industry_total_score = 0.35 * 行业相对基准20日超额 + 0.25 * 行业20日收益 + 0.20 * 行业60日收益 + 0.10 * 行业5日加速度 + 0.10 * 行业扩散度 其中“行业扩散度”很重要: 行业内 20 日收益为正的股票比例 或行业内进入全池前 50% 的股票比例 为什么这比 v1.9 更强 v1.9 现在是“每只股票顺便看一下行业”。 v2.0 要变成“先看行业是否为主线,再看个股是否最强”。 这会让: 主线识别更明确 主线集中更可控 不容易被个别杂音票干扰 模块 B:主线识别 核心规则 每天把行业总分排序,只保留前几名行业作为“主线候选”。 比如: TOP_INDUSTRIES = 3 然后定义: if industry_rank <= 3 and industry_total_score > 0: 属于主线行业 else: 属于非主线行业 不同大势状态下的主线作用 风险状态 主线只作轻微参考,不允许重压。 RANGE 主线可加分,但不能高度集中。 TREND_NORMAL 主线行业明显优先。 TREND_BULL 主线行业允许高权重,但要受风险预算约束。 模块 C:风险预算 这是 v2.0 和 v1.9 最大的区别。 你至少加这 3 个约束: 1. 单行业上限 MAX_INDUSTRY_WEIGHT = { 'RISK_SEVERE': 0.20, 'RISK_MILD': 0.25, 'RANGE': 0.30, 'TREND_NORMAL': 0.40, 'TREND_BULL': 0.50 } 意思是再强的主线,也别无限堆。 2. 单票上限 MAX_STOCK_WEIGHT = { 'RISK_SEVERE': 0.08, 'RISK_MILD': 0.10, 'RANGE': 0.12, 'TREND_NORMAL': 0.16, 'TREND_BULL': 0.20 } 这样可以防止 1 只票把组合波动拉爆。 3. 行业过热降档 比如行业 20 日涨幅太大且 5 日加速度转负,就认为“过热衰减”。 if ind_ret20 > 25 and ind_accel5 < 0: overheat_penalty = 0.4 else: overheat_penalty = 0.0 这样可以减少: 主线末端追太高 行业高潮后急跌带来的大回撤 三、v2.0 推荐参数框架 下面这套先别细调,先粗颗粒跑。 STATE_HOLD_COUNT = { 'RISK_SEVERE': 4, 'RISK_MILD': 6, 'RANGE': 8, 'TREND_NORMAL': 8, 'TREND_BULL': 7 } STATE_INDUSTRY_MULTIPLIER = { 'RISK_SEVERE': 0.2, 'RISK_MILD': 0.3, 'RANGE': 0.5, 'TREND_NORMAL': 0.8, 'TREND_BULL': 1.0 } MAX_INDUSTRY_WEIGHT = { 'RISK_SEVERE': 0.20, 'RISK_MILD': 0.25, 'RANGE': 0.30, 'TREND_NORMAL': 0.40, 'TREND_BULL': 0.50 } MAX_STOCK_WEIGHT = { 'RISK_SEVERE': 0.08, 'RISK_MILD': 0.10, 'RANGE': 0.12, 'TREND_NORMAL': 0.16, 'TREND_BULL': 0.18 } 这比你 v1.9 的 TREND_BULL = 6 更稳一点,也更容易控制回撤。 量化程序1.9测试版完整 四、主要代码 下面给你核心代码骨架,直接能接到你 v1.9 上。 1)行业扩散度 def calculate_industry_breadth(industry_code, date, dates, stock_to_industry, price_df): """ 行业扩散度: 行业内20日收益为正的股票比例 """ idx = safe_get_index_loc(dates, date) if idx < 20: return 0.0 old_date = dates[idx - 20] members = [s for s, ind in stock_to_industry.items() if ind == industry_code] if len(members) == 0: return 0.0 pos_cnt = 0 total_cnt = 0 for s in members: if s not in price_df.columns: continue if date not in price_df.index or old_date not in price_df.index: continue try: p_now = float(price_df.loc[date, s]) p_old = float(price_df.loc[old_date, s]) if pd.isna(p_now) or pd.isna(p_old) or p_old <= 0: continue ret20 = (p_now / p_old - 1.0) if ret20 > 0: pos_cnt += 1 total_cnt += 1 except: continue if total_cnt == 0: return 0.0 return pos_cnt / total_cnt 2)行业总分 def calculate_industry_total_score(industry_code, date, dates, industry_price_df, benchmark_series, stock_to_industry, price_df): feat = calculate_industry_features( industry_code, date, dates, industry_price_df, benchmark_series ) if feat is None: return None breadth = calculate_industry_breadth( industry_code, date, dates, stock_to_industry, price_df ) score = ( 0.35 * feat['ind_rel20'] + 0.25 * feat['ind_ret20'] + 0.20 * feat['ind_ret60'] + 0.10 * feat['ind_accel5'] + 0.10 * (breadth * 100.0) ) return { 'industry_code': industry_code, 'score': score, 'breadth': breadth, 'ret20': feat['ind_ret20'], 'ret60': feat['ind_ret60'], 'rel20': feat['ind_rel20'], 'accel5': feat['ind_accel5'] } 3)主线行业识别 def build_industry_rank_map(data, date): industry_price_df = data['industry_price'] benchmark_series = data['benchmark_price'] stock_to_industry = data['stock_to_industry'] price_df = data['price'] dates = data['dates'] industry_codes = list(industry_price_df.columns) result = [] for ind in industry_codes: row = calculate_industry_total_score( ind, date, dates, industry_price_df, benchmark_series, stock_to_industry, price_df ) if row is not None: result.append(row) if not result: return {}, [] df = pd.DataFrame(result).sort_values('score', ascending=False).reset_index(drop=True) df['rank'] = np.arange(1, len(df) + 1) rank_map = {} for _, row in df.iterrows(): rank_map[row['industry_code']] = { 'rank': int(row['rank']), 'score': float(row['score']), 'breadth': float(row['breadth']), 'ret20': float(row['ret20']), 'rel20': float(row['rel20']), 'accel5': float(row['accel5']) } top_industries = df.head(3)['industry_code'].tolist() return rank_map, top_industries 4)行业过热惩罚 def calc_industry_overheat_penalty(ind_info): if ind_info is None: return 0.0 ret20 = ind_info.get('ret20', 0.0) accel5 = ind_info.get('accel5', 0.0) # 20日涨太多,且短期加速度转弱 if ret20 > 25 and accel5 < 0: return 0.5 elif ret20 > 18 and accel5 < 0: return 0.25 return 0.0 5)v2.0 版股票分数 def factor_select_v20(data, date, regime, n=10, current_positions=None): price_df = data['price'] turnover_df = data['turnover'] volatility_df = data['volatility'] core_stocks = data['stocks'] dates = data['dates'] stock_to_industry = data['stock_to_industry'] if current_positions is None: current_positions = {} avail = [ s for s in core_stocks if s in price_df.columns and date in price_df.index and pd.notna(price_df.loc[date, s]) and price_df.loc[date, s] > 0 ] if not avail: return [], {}, {} rank_map_industry, top_industries = build_industry_rank_map(data, date) results = {} for s in avail: f = calculate_factors(s, date, dates, price_df, turnover_df, volatility_df) if f: results[s] = f if not results: return [], {}, {} stocks_list = list(results.keys()) mom20_vals = np.array([results[s]['mom20'] for s in stocks_list], dtype=float) mom60_vals = np.array([results[s]['mom60'] for s in stocks_list], dtype=float) turnover_vals = np.array([results[s]['turnover'] for s in stocks_list], dtype=float) vol_vals = np.array([results[s]['volatility'] for s in stocks_list], dtype=float) mom20_norm = normalize(mom20_vals) mom60_norm = normalize(mom60_vals) turnover_norm = normalize(turnover_vals) vol_norm = normalize(vol_vals) weights = get_state_weights(regime) industry_mult = STATE_INDUSTRY_MULTIPLIER.get(regime, 1.0) scores = {} meta = {} for i, s in enumerate(stocks_list): base_score = ( weights['mom20'] * mom20_norm[i] + weights['mom60'] * mom60_norm[i] + weights['turnover'] * turnover_norm[i] - weights['vol'] * vol_norm[i] ) ind_code = stock_to_industry.get(s, None) ind_info = rank_map_industry.get(ind_code, None) industry_bonus = 0.0 mainline_bonus = 0.0 overheat_penalty = 0.0 if ind_info is not None: # 行业排名加分 rank = ind_info['rank'] score_ind = ind_info['score'] industry_bonus = 0.08 * score_ind * industry_mult # 主线行业额外加分 if ind_code in top_industries: if regime in ['TREND_NORMAL', 'TREND_BULL']: mainline_bonus = 0.35 elif regime == 'RANGE': mainline_bonus = 0.15 overheat_penalty = calc_industry_overheat_penalty(ind_info) final_score = base_score + industry_bonus + mainline_bonus - overheat_penalty scores[s] = final_score meta[s] = { 'industry_code': ind_code, 'is_mainline': ind_code in top_industries if ind_code else False } sorted_stocks = sorted(scores.items(), key=lambda x: x[1], reverse=True) rank_map = {s: i + 1 for i, (s, _) in enumerate(sorted_stocks)} n = STATE_HOLD_COUNT.get(regime, n) keep_list = [] for s in current_positions.keys(): if s in rank_map and rank_map[s] <= HOLD_BUFFER_RANK and s in results: keep_list.append((s, results[s]['price'], scores[s])) keep_codes = set([x[0] for x in keep_list]) add_list = [] for s, score in sorted_stocks: if s in keep_codes: continue if rank_map[s] <= BUY_BUFFER_RANK and s in results: add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break if len(keep_list) + len(add_list) < n: used = keep_codes | set([x[0] for x in add_list]) for s, score in sorted_stocks: if s in used: continue add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break selected = (keep_list + add_list)[:n] return selected, rank_map, meta 6)风险预算权重分配 MAX_INDUSTRY_WEIGHT = { 'RISK_SEVERE': 0.20, 'RISK_MILD': 0.25, 'RANGE': 0.30, 'TREND_NORMAL': 0.40, 'TREND_BULL': 0.50 } MAX_STOCK_WEIGHT = { 'RISK_SEVERE': 0.08, 'RISK_MILD': 0.10, 'RANGE': 0.12, 'TREND_NORMAL': 0.16, 'TREND_BULL': 0.18 } def build_target_weights_v20(selected, meta, regime): if len(selected) == 0: return {} max_industry_w = MAX_INDUSTRY_WEIGHT[regime] max_stock_w = MAX_STOCK_WEIGHT[regime] # 先按排名给基础权重 if regime == 'TREND_BULL': base = [0.20, 0.18, 0.16, 0.14, 0.12, 0.10, 0.10] elif regime == 'TREND_NORMAL': base = [0.16, 0.15, 0.14, 0.13, 0.12, 0.10, 0.10, 0.10] else: base = [1.0 / len(selected)] * len(selected) base = base[:len(selected)] if len(base) < len(selected): remain = len(selected) - len(base) base.extend([base[-1]] * remain) s = sum(base) base = [x / s for x in base] weights = {} industry_sum = {} for i, (code, _, _) in enumerate(selected): w = min(base[i], max_stock_w) ind = meta.get(code, {}).get('industry_code', 'UNKNOWN') current_industry_w = industry_sum.get(ind, 0.0) available = max_industry_w - current_industry_w w = max(0.0, min(w, available)) weights[code] = w industry_sum[ind] = current_industry_w + w # 剩余权重按原比例二次分配 total_alloc = sum(weights.values()) remain = max(0.0, 1.0 - total_alloc) if remain > 0: for code, _, _ in selected: ind = meta.get(code, {}).get('industry_code', 'UNKNOWN') room_stock = max_stock_w - weights[code] room_ind = max_industry_w - industry_sum.get(ind, 0.0) addable = max(0.0, min(room_stock, room_ind, remain)) if addable > 0: weights[code] += addable industry_sum[ind] += addable remain -= addable if remain <= 1e-6: break # 再归一化到1以内 total_w = sum(weights.values()) if total_w > 0: for code in weights: weights[code] /= total_w return weights 五、你在 run_backtest 里只需要改两处 原来: selected, rank_map = factor_select(data, date, final_state, n=n, current_positions=positions) target_stock_value = total_value * target / len(selected) target_per_stock = {s: target_stock_value for s, _, _ in selected} 改成: selected, rank_map, meta = factor_select_v20( data=data, date=date, regime=final_state, n=n, current_positions=positions ) target_weight_map = build_target_weights_v20(selected, meta, final_state) target_per_stock = { s: total_value * target * target_weight_map.get(s, 0.0) for s, _, _ in selected } 卖出后重新选股那一段同样这么改。 六、我对 v2.0 的预期 如果这个版本方向是对的,回测通常会出现这种特征: 总收益比 v1.9 再提高一点,或接近 回撤不一定更低,但应该比“纯集中版”更稳 2024/2025 主线年份继续增强 2022 不应明显恶化 单个主线见顶时,组合回撤会更可控 你现在 v1.9 的问题不是抓不到主线,而是: 抓到后,缺少“行业风险预算”这一层。 这正是 v2.0 要补的。 七、最稳的测试顺序 别一次全上,按这个顺序来: 第一步 只加: industry_breadth industry_total_score top_industries 先不加风险预算。 第二步 确认主线识别有效后,再加: build_target_weights_v20 第三步 最后再测: MAX_INDUSTRY_WEIGHT MAX_STOCK_WEIGHT 这样最容易定位收益和回撤变化的来源。 你把这版接进去跑完后,把 总收益、最大回撤、2022/2024/2025 分年收益 发我,我下一轮帮你收敛成更稳的 v2.0a。