可以,下面我直接给你一套 “按大势状态 + 行业强度加分”的主要代码。 思路是: 不用行业均线做硬开关 只把行业层当成 个股排序增强项 不同大势状态 下,行业加分力度不同 先不动你原来的大势框架,只改 选股部分 + 回测调用部分 你把下面几段接进你现有的 v1.8.5 即可。 1)先加参数区 把这段放到你的参数区里: # ==================== v1.9 行业增强参数 ==================== STATE_SELECTOR_WEIGHTS = { 'RISK': { 'mom20': 0.20, 'mom60': 0.30, 'turnover': 0.05, 'vol': 0.45 }, 'RANGE': { 'mom20': 0.30, 'mom60': 0.35, 'turnover': 0.10, 'vol': 0.25 }, 'TREND': { 'mom20': 0.40, 'mom60': 0.35, 'turnover': 0.15, 'vol': 0.10 } } STATE_INDUSTRY_MULTIPLIER = { 'RISK_SEVERE': 0.4, 'RISK_MILD': 0.6, 'RANGE': 0.8, 'TREND_NORMAL': 1.0, 'TREND_BULL': 1.2 } STATE_HOLD_COUNT = { 'RISK_SEVERE': 6, 'RISK_MILD': 8, 'RANGE': 10, 'TREND_NORMAL': 8, 'TREND_BULL': 6 } USE_BULL_CONCENTRATED_WEIGHT = True BULL_POSITION_WEIGHTS = [0.22, 0.20, 0.18, 0.15, 0.13, 0.12] # 行业增强分权重 INDUSTRY_SCORE_WEIGHTS = { 'rel20': 0.35, # 行业相对基准20日超额 'ret20': 0.35, # 行业20日收益 'ret60': 0.20, # 行业60日收益 'accel5': 0.10 # 行业5日加速度 } # 行业分的总影响强度,先别设太高 INDUSTRY_BONUS_SCALE = 0.35 2)加几个辅助函数 放在 normalize() 后面: def get_state_group(regime): if regime in ['RISK_SEVERE', 'RISK_MILD']: return 'RISK' elif regime == 'RANGE': return 'RANGE' else: return 'TREND' def get_state_weights(regime): group = get_state_group(regime) return STATE_SELECTOR_WEIGHTS[group] def calc_return(price_series, idx, lookback): if idx < lookback: return 0.0 p_now = price_series[idx] p_old = price_series[idx - lookback] if pd.isna(p_now) or pd.isna(p_old) or p_old <= 0: return 0.0 return (p_now / p_old - 1.0) * 100.0 def safe_get_index_loc(dates, date): try: return list(dates).index(date) except: return -1 3)读取“股票 -> 行业指数”的映射 如果你的 csv 里是两列,比如: stock_code industry_index 那用下面这段。 def load_stock_industry_map(csv_path): df = pd.read_csv(csv_path, dtype=str) df.columns = [c.strip() for c in df.columns] # 你按实际列名改这里 stock_col = 'stock_code' industry_col = 'industry_index' df[stock_col] = df[stock_col].str.strip() df[industry_col] = df[industry_col].str.strip() mapping = {} for _, row in df.iterrows(): s = row[stock_col] ind = row[industry_col] if pd.notna(s) and pd.notna(ind) and s != '' and ind != '': mapping[s] = ind return mapping 如果你的列名不是这两个,改一下就行。 4)行业强度计算函数 这里不用均线,只用: 行业20日收益 行业60日收益 行业相对基准20日超额 行业5日加速度 把这段放进去: def calculate_industry_features(industry_code, date, dates, industry_price_df, benchmark_series): """ industry_price_df: index = 日期 columns = 行业指数代码 values = 收盘价 benchmark_series: index = 日期 values = 基准指数收盘价(例如中证1000) """ if industry_code not in industry_price_df.columns: return None if date not in industry_price_df.index or date not in benchmark_series.index: return None idx = safe_get_index_loc(dates, date) if idx < 0: return None try: ind_prices = industry_price_df[industry_code].values bm_prices = benchmark_series.values ind_ret5 = calc_return(ind_prices, idx, 5) ind_ret20 = calc_return(ind_prices, idx, 20) ind_ret60 = calc_return(ind_prices, idx, 60) bm_ret20 = calc_return(bm_prices, idx, 20) # 行业相对基准超额 ind_rel20 = ind_ret20 - bm_ret20 # 5日加速度:最近5天的涨速,相对过去20日平均涨速 ind_accel5 = ind_ret5 - (ind_ret20 / 4.0) return { 'ind_ret5': ind_ret5, 'ind_ret20': ind_ret20, 'ind_ret60': ind_ret60, 'ind_rel20': ind_rel20, 'ind_accel5': ind_accel5 } except: return None 5)把行业特征转成行业 bonus def build_industry_bonus_map(avail_stocks, stock_to_industry, date, dates, industry_price_df, benchmark_series, regime): """ 给每只股票算一个行业加分 industry_bonus """ raw_features = {} for s in avail_stocks: ind_code = stock_to_industry.get(s, None) if ind_code is None: continue feat = calculate_industry_features( ind_code, date, dates, industry_price_df, benchmark_series ) if feat is not None: raw_features[s] = feat if not raw_features: return {s: 0.0 for s in avail_stocks} stocks = list(raw_features.keys()) rel20_vals = np.array([raw_features[s]['ind_rel20'] for s in stocks], dtype=float) ret20_vals = np.array([raw_features[s]['ind_ret20'] for s in stocks], dtype=float) ret60_vals = np.array([raw_features[s]['ind_ret60'] for s in stocks], dtype=float) accel5_vals = np.array([raw_features[s]['ind_accel5'] for s in stocks], dtype=float) rel20_norm = normalize(rel20_vals) ret20_norm = normalize(ret20_vals) ret60_norm = normalize(ret60_vals) accel5_norm = normalize(accel5_vals) mult = STATE_INDUSTRY_MULTIPLIER.get(regime, 1.0) bonus_map = {s: 0.0 for s in avail_stocks} for i, s in enumerate(stocks): raw_score = ( INDUSTRY_SCORE_WEIGHTS['rel20'] * rel20_norm[i] + INDUSTRY_SCORE_WEIGHTS['ret20'] * ret20_norm[i] + INDUSTRY_SCORE_WEIGHTS['ret60'] * ret60_norm[i] + INDUSTRY_SCORE_WEIGHTS['accel5'] * accel5_norm[i] ) bonus_map[s] = raw_score * INDUSTRY_BONUS_SCALE * mult return bonus_map 6)替换 factor_select:支持 regime + 行业增强 这是核心改动。 把你现有的 factor_select 换成下面这个版本。 def factor_select(data, date, regime, n=10, current_positions=None): price_df = data['price'] turnover_df = data['turnover'] volatility_df = data['volatility'] core_stocks = data['stocks'] dates = data['dates'] # 新增的数据 stock_to_industry = data['stock_to_industry'] # 股票 -> 行业指数 industry_price_df = data['industry_price'] # 行业指数价格表 benchmark_series = data['benchmark_price'] # 基准指数价格 series if current_positions is None: current_positions = {} avail = [ s for s in core_stocks if s in price_df.columns and date in price_df.index and pd.notna(price_df.loc[date, s]) and price_df.loc[date, s] > 0 ] if not avail: return [], {} results = {} for s in avail: factors = calculate_factors(s, date, dates, price_df, turnover_df, volatility_df) if factors: results[s] = factors if not results: return [], {} stocks_list = list(results.keys()) mom20_vals = np.array([results[s]['mom20'] for s in stocks_list], dtype=float) mom60_vals = np.array([results[s]['mom60'] for s in stocks_list], dtype=float) turnover_vals = np.array([results[s]['turnover'] for s in stocks_list], dtype=float) vol_vals = np.array([results[s]['volatility'] for s in stocks_list], dtype=float) mom20_norm = normalize(mom20_vals) mom60_norm = normalize(mom60_vals) turnover_norm = normalize(turnover_vals) vol_norm = normalize(vol_vals) weights = get_state_weights(regime) # 行业加分 industry_bonus_map = build_industry_bonus_map( avail_stocks=stocks_list, stock_to_industry=stock_to_industry, date=date, dates=dates, industry_price_df=industry_price_df, benchmark_series=benchmark_series, regime=regime ) scores = {} for i, s in enumerate(stocks_list): base_score = ( weights['mom20'] * mom20_norm[i] + weights['mom60'] * mom60_norm[i] + weights['turnover'] * turnover_norm[i] - weights['vol'] * vol_norm[i] ) industry_bonus = industry_bonus_map.get(s, 0.0) scores[s] = base_score + industry_bonus sorted_stocks = sorted(scores.items(), key=lambda x: x[1], reverse=True) rank_map = {s: i + 1 for i, (s, _) in enumerate(sorted_stocks)} # 持仓缓冲逻辑沿用 keep_list = [] for s in current_positions.keys(): if s in rank_map and rank_map[s] <= HOLD_BUFFER_RANK and s in results: keep_list.append((s, results[s]['price'], scores[s])) keep_codes = set([x[0] for x in keep_list]) add_list = [] for s, score in sorted_stocks: if s in keep_codes: continue if rank_map[s] <= BUY_BUFFER_RANK and s in results: add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break if len(keep_list) + len(add_list) < n: used = keep_codes | set([x[0] for x in add_list]) for s, score in sorted_stocks: if s in used: continue if s in results: add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break selected = (keep_list + add_list)[:n] return selected, rank_map 7)Bull 阶段的轻度集中权重 加一个权重分配函数: def get_target_weights(regime, selected_count): """ 返回每个持仓的目标权重列表,长度 = selected_count """ if selected_count <= 0: return [] if regime == 'TREND_BULL' and USE_BULL_CONCENTRATED_WEIGHT: base = BULL_POSITION_WEIGHTS[:selected_count] s = sum(base) return [x / s for x in base] # 其他状态等权 return [1.0 / selected_count] * selected_count 8)修改 run_backtest 里的调用方式 你原来的 run_backtest 里,原本是这样: n = FIXED_HOLD_COUNT selected, rank_map = factor_select(data, date, n=n, current_positions=positions) 改成: n = STATE_HOLD_COUNT.get(final_state, FIXED_HOLD_COUNT) selected, rank_map = factor_select( data=data, date=date, regime=final_state, n=n, current_positions=positions ) 9)修改目标持仓分配逻辑 你原来大概率是这样: target_stock_value = total_value * target / len(selected) target_per_stock = {s: target_stock_value for s, _, _ in selected} 改成: target_weights = get_target_weights(final_state, len(selected)) target_per_stock = {} for idx_sel, (s, _, _) in enumerate(selected): target_per_stock[s] = total_value * target * target_weights[idx_sel] 卖出后重新计算那一段,也同样改掉: selected, rank_map = factor_select( data=data, date=date, regime=final_state, n=n, current_positions=positions ) if selected: target_weights = get_target_weights(final_state, len(selected)) target_per_stock = {} for idx_sel, (s, _, _) in enumerate(selected): target_per_stock[s] = total_value * target * target_weights[idx_sel] 10)准备 data 时新增三项 你原来的 data 里大概只有: price turnover volatility trend industry stocks dates 现在要新增: stock_to_industry industry_price benchmark_price 示例: stock_to_industry = load_stock_industry_map('行业核心股票代码表.csv') # 行业指数价格表 # index = 日期 # columns = 行业指数代码 # values = 收盘价 industry_price_df = pd.read_excel('行业指数历史数据.xlsx', index_col=0) industry_price_df.index = pd.to_datetime(industry_price_df.index) # 基准指数,建议先用中证1000,或者你最常用的风格基准 benchmark_df = pd.read_excel('基准指数历史数据.xlsx', index_col=0) benchmark_df.index = pd.to_datetime(benchmark_df.index) # 假设 benchmark_df 只有一列 close benchmark_series = benchmark_df.iloc[:, 0] data = { 'trend': trend_df, 'industry': industry_df, 'price': price_df, 'turnover': turnover_df, 'volatility': volatility_df, 'stocks': stock_list, 'dates': list(price_df.index), # 新增 'stock_to_industry': stock_to_industry, 'industry_price': industry_price_df, 'benchmark_price': benchmark_series } 11)最小可跑版本的落地顺序 先不要一次改太多,建议你按这个顺序测: 第一版:只加行业 bonus 先保留: 固定持仓数 等权 只改: factor_select(... regime=...) industry_bonus 第二版:再加状态持仓数 加上: STATE_HOLD_COUNT 第三版:最后再加 Bull 集中权重 加上: get_target_weights() 这样你能清楚看到,收益改善到底来自: 行业增强 状态持仓数变化 Bull 集中权重 12)我最建议你先用的保守参数 为了避免一上来过拟合,先用这套: STATE_HOLD_COUNT = { 'RISK_SEVERE': 6, 'RISK_MILD': 8, 'RANGE': 10, 'TREND_NORMAL': 8, 'TREND_BULL': 8 } INDUSTRY_BONUS_SCALE = 0.25 USE_BULL_CONCENTRATED_WEIGHT = False 等你确认: 2024/2025 提升 2022 不明显恶化 总回撤可接受 再开: USE_BULL_CONCENTRATED_WEIGHT = True TREND_BULL = 6 13)一句话总结这版代码在干什么 这版不是用行业均线去“决定买不买”,而是在做: 个股本身强,再叠加所属行业是否处于主线强势中。 这样会比行业均线更快,也比硬过滤更稳。 你把行业指数历史数据的列名格式发我,我可以下一条直接帮你把这份代码改成完全贴你现有字段名的可运行版。