# ============================================================
# v1.6 Industry screening / industry budget / slot allocation /
#      in-industry stock selection (strict edition)
# Intended for: a dozen or so industries, an 80-150 stock pool, 6-15 holdings
# Goals:
# - avoid budget convergence (no min_w floor)
# - invest only in Top industries (weak industries get budget = 0)
# - stable slot allocation (largest remainder method, no round())
# - UNKNOWN does not take part in rotation by default (allow_unknown=False)
# - top-up also draws only from Top industries (never adds non-Top ones)
# ------------------------------------------------------------
# All you need to do:
# 1) make sure the industry codes in stock_to_industry match the
#    industry_df column names
# 2) call build_industry_rotation_selection(...) on each rebalance day
# ============================================================
import logging
import math
from collections import defaultdict
from typing import Dict, List, Tuple, Set, Optional

import pandas as pd

logger = logging.getLogger(__name__)


# -----------------------------
# 0) Utility: safe normalisation
# -----------------------------
def _renorm(d: Dict[str, float]) -> Dict[str, float]:
    """Scale the values of *d* so they sum to 1.

    Returns an empty dict for empty input.  When the sum is not a positive
    finite number (all zeros, negative total, NaN, inf) every key receives
    the same weight ``1 / len(d)`` instead of propagating garbage.
    """
    if not d:
        return {}
    total = float(sum(d.values()))
    if not math.isfinite(total) or total <= 0:
        equal = 1.0 / len(d)
        return {key: equal for key in d}
    return {key: float(value) / total for key, value in d.items()}


def _cap_weights(weights: Dict[str, float], max_w: float) -> Dict[str, float]:
    """Cap every weight at *max_w* and hand the excess to uncapped keys.

    *weights* is expected to be non-negative.  The result sums to 1.  When
    the cap cannot be honoured at all (``max_w * len(weights) <= 1``) the
    weights collapse to equal weights, which is also what a plain
    clip-then-renormalise produces in that situation.
    """
    count = len(weights)
    if count == 0:
        return {}
    if max_w * count <= 1.0:
        return {key: 1.0 / count for key in weights}

    capped: Set[str] = set()
    free = dict(weights)
    scaled: Dict[str, float] = {}
    while free:
        # Mass still available once the capped keys have taken max_w each.
        room = 1.0 - max_w * len(capped)
        free_total = float(sum(free.values()))
        if free_total > 0:
            scaled = {k: v / free_total * room for k, v in free.items()}
        else:
            scaled = {k: room / len(free) for k in free}
        # Small tolerance so float noise does not trigger endless re-capping.
        over = [k for k, v in scaled.items() if v > max_w + 1e-12]
        if not over:
            break
        for key in over:
            capped.add(key)
            del free[key]
    return {k: (max_w if k in capped else scaled[k]) for k in weights}


# -----------------------------
# 1) Industry strength (strength before budgeting)
#    input:  ind_weights (from IndustryV11/V12.get_weights)
#    output: raw_strength (only for industries present in the pool)
# -----------------------------
def compute_industry_strength(
    ind_weights: Dict[str, float],
    industries_in_pool: Set[str],
    clip_low: float = 0.70,
    clip_high: float = 1.35,
) -> Dict[str, float]:
    """Return the clipped strength of every industry in the pool.

    Args:
        ind_weights: output of the industry module (roughly 0.7-1.35).
        industries_in_pool: industries that appear in the stock pool
            (UNKNOWN excluded by the caller).
        clip_low: lower clipping bound applied to each weight.
        clip_high: upper clipping bound applied to each weight.

    Returns:
        ``{industry: strength}``.  Industries missing from *ind_weights*,
        or whose weight is NaN / not numeric, get the neutral value 1.0.
    """
    raw: Dict[str, float] = {}
    # Sorted iteration keeps the result order independent of set ordering,
    # which for strings changes between interpreter runs.
    for ind in sorted(industries_in_pool, key=str):
        try:
            value = float(ind_weights.get(ind, 1.0))
        except (TypeError, ValueError):
            value = 1.0
        # min(clip_high, nan) evaluates to clip_high, so an unguarded NaN
        # would be ranked as the strongest industry; treat it as neutral.
        if math.isnan(value):
            value = 1.0
        raw[ind] = max(clip_low, min(clip_high, value))
    return raw


# -----------------------------
# 2) Industry budget (strict edition)
#    - normalise by strength first
#    - Top-industry filter (invest only in strong industries)
#    - only max_w is applied (no min_w, to avoid convergence)
# -----------------------------
def make_industry_budget_strict(
    raw_strength: Dict[str, float],
    top_ratio: float = 0.35,   # suggested 0.30-0.45
    top_k_min: int = 4,        # with a dozen industries, suggested >= 4
    top_k_max: int = 6,        # with a dozen industries, suggested <= 6
    max_w: Optional[float] = 0.40,  # suggested 0.35-0.45; None = no cap
) -> Dict[str, float]:
    """Build the industry budget from raw strengths.

    Args:
        raw_strength: ``{industry: strength}``.
        top_ratio: fraction of industries to keep before the K bounds apply.
        top_k_min: minimum number of industries to keep.
        top_k_max: maximum number of industries to keep.
        max_w: per-industry weight cap, or None for no cap.

    Returns:
        A budget that sums to 1 and contains only the Top industries (all
        other industries are dropped), ordered from strongest to weakest.
    """
    if not raw_strength:
        return {}

    bud = _renorm(raw_strength)  # first normalisation (reflects strength)

    # Top-K filter.  Ties are broken by industry name so the selection is
    # reproducible regardless of the input dict order.
    ranked = sorted(bud.items(), key=lambda kv: (-kv[1], str(kv[0])))
    keep_n = int(len(ranked) * top_ratio)
    keep_n = max(top_k_min, keep_n)
    keep_n = min(top_k_max, keep_n, len(ranked))
    keep_n = max(0, keep_n)  # a negative K would slice from the wrong end
    bud = _renorm({ind: w for ind, w in ranked[:keep_n] if w > 0})

    # max_w only (no min_w).  A plain clip followed by renormalisation
    # pushes the weights back above the cap, so redistribute instead.
    if max_w is not None and bud:
        bud = _cap_weights(bud, float(max_w))

    return bud


# -----------------------------
# 3) Slot allocation: largest remainder method (replaces round())
# -----------------------------
def allocate_slots_lrm(bud: Dict[str, float], total_n: int) -> Dict[str, int]:
    """Split *total_n* slots across industries with the largest remainder
    method.

    Args:
        bud: industry budget, expected to be non-negative and sum to 1.
        total_n: total number of holdings.

    Returns:
        ``{industry: slots}`` with zero-slot industries removed.  The slots
        sum to *total_n* when *bud* sums to 1.
    """
    if total_n <= 0 or not bud:
        return {}

    raw = {ind: bud[ind] * total_n for ind in bud}
    alloc = {ind: int(math.floor(raw[ind])) for ind in raw}
    remain = total_n - sum(alloc.values())

    # Hand out the leftover slots by descending fractional part.  The
    # fraction is rounded so float noise cannot decide a tie; real ties go
    # to the larger budget and then to the industry name.
    order = sorted(
        raw,
        key=lambda ind: (
            -round(raw[ind] - math.floor(raw[ind]), 9),
            -raw[ind],
            str(ind),
        ),
    )
    for i in range(remain):
        alloc[order[i % len(order)]] += 1

    # keep only industries that actually received a slot
    return {ind: n for ind, n in alloc.items() if n > 0}


# -----------------------------
# 4) Per-stock momentum
# -----------------------------
def _lookback_date(
    dates: List[pd.Timestamp],
    date: pd.Timestamp,
    momentum_period: int,
) -> Optional[pd.Timestamp]:
    """Return the date *momentum_period* rows before *date* in *dates*.

    Returns None when *date* is not in *dates*, when there is not enough
    history, or when the period is not positive (a negative period would
    read prices from the future).
    """
    if momentum_period <= 0:
        return None
    timeline = list(dates)
    try:
        pos = timeline.index(date)
    except ValueError:
        return None
    if pos < momentum_period:
        return None
    return timeline[pos - momentum_period]


def _momentum_between(
    stocks_df: pd.DataFrame,
    date: pd.Timestamp,
    old_date: Optional[pd.Timestamp],
    stock: str,
) -> float:
    """Percentage price change of *stock* from *old_date* to *date*.

    Returns 0.0 when *old_date* is None, when a price is missing or not
    finite, or when the old price is not positive.
    """
    if old_date is None:
        return 0.0
    try:
        p_now = float(stocks_df.loc[date, stock])
        p_old = float(stocks_df.loc[old_date, stock])
    except (KeyError, IndexError, TypeError, ValueError):
        return 0.0
    # A NaN result would make the momentum ranking order undefined.
    if not (math.isfinite(p_now) and math.isfinite(p_old)) or p_old <= 0:
        return 0.0
    return (p_now / p_old - 1.0) * 100.0


def compute_stock_momentum_20d(
    stocks_df: pd.DataFrame,
    date: pd.Timestamp,
    dates: List[pd.Timestamp],
    stock: str,
    momentum_period: int = 20
) -> float:
    """Return the momentum of *stock* at *date* in percent.

    Momentum is ``(price[date] / price[date - momentum_period rows] - 1)
    * 100`` where rows are counted in *dates*.  Returns 0.0 whenever the
    value cannot be computed (unknown date, too little history, missing or
    non-positive price).
    """
    old_date = _lookback_date(dates, date, momentum_period)
    return _momentum_between(stocks_df, date, old_date, stock)


# -----------------------------
# 5) In-industry stock selection (strict edition)
#    - select only from budgeted Top industries
#    - UNKNOWN does not take part by default
#    - top-up also draws only from Top industries
# -----------------------------
def _top_by_momentum(
    stocks: List[str],
    mom_map: Dict[str, float],
    limit: int,
) -> List[str]:
    """Return up to *limit* stocks ordered by descending momentum."""
    return sorted(stocks, key=lambda s: mom_map[s], reverse=True)[:limit]


def _global_momentum_rows(
    avail: List[str],
    mom_map: Dict[str, float],
    prices: Dict[str, float],
    stock_to_industry: Dict[str, str],
    total_n: int,
) -> List[Tuple[str, float, float, str]]:
    """Fallback selection: the best *total_n* stocks by momentum, ignoring
    industries (UNKNOWN stocks may be included)."""
    return [
        (s, mom_map[s], prices[s], stock_to_industry.get(s, "UNKNOWN"))
        for s in _top_by_momentum(avail, mom_map, total_n)
    ]


def select_stocks_by_industry_budget_strict(
    stocks_df: pd.DataFrame,
    date: pd.Timestamp,
    dates: List[pd.Timestamp],
    core_stocks: List[str],
    total_n: int,
    industry_budget: Dict[str, float],
    stock_to_industry: Dict[str, str],
    momentum_period: int = 20,
    allow_unknown: bool = False,
) -> List[Tuple[str, float, float, str]]:
    """Pick up to *total_n* stocks according to the industry budget.

    Args:
        stocks_df: price table indexed by date with one column per stock.
        date: the rebalance date.
        dates: ordered trading dates used to find the look-back date.
        core_stocks: candidate stocks (duplicates are ignored).
        total_n: number of holdings wanted.
        industry_budget: ``{industry: weight}``; weights > 0 mark the Top
            industries.
        stock_to_industry: ``{stock: industry}``; missing stocks count as
            "UNKNOWN".
        momentum_period: look-back length in rows of *dates*.
        allow_unknown: whether the "UNKNOWN" industry may take part.

    Returns:
        ``[(stock, momentum, price, industry), ...]`` with at most
        *total_n* entries.  When the budget is empty, has no positive
        weight, or no Top industry has a tradable stock, the selection
        degrades to a global momentum ranking.
    """
    if total_n <= 0:
        return []
    if date not in stocks_df.index:
        return []

    # Tradable stocks (price > 0).  dict.fromkeys de-duplicates while
    # keeping order, so a repeated candidate cannot be selected twice.
    avail: List[str] = []
    prices: Dict[str, float] = {}
    for s in dict.fromkeys(core_stocks):
        if s not in stocks_df.columns:
            continue
        px = stocks_df.loc[date, s]
        if pd.notna(px) and float(px) > 0:
            avail.append(s)
            prices[s] = float(px)
    if not avail:
        return []

    # The look-back date is the same for every stock: resolve it once
    # instead of scanning the date list per stock.
    old_date = _lookback_date(dates, date, momentum_period)
    mom_map = {s: _momentum_between(stocks_df, date, old_date, s) for s in avail}

    # Empty budget / no positive weight: degrade to global momentum.
    active_inds = {ind for ind, w in (industry_budget or {}).items() if w > 0}
    if not active_inds:
        return _global_momentum_rows(
            avail, mom_map, prices, stock_to_industry, total_n
        )

    # Group by industry (Top industries only; UNKNOWN follows the switch).
    ind_to_stocks: Dict[str, List[str]] = defaultdict(list)
    for s in avail:
        ind = stock_to_industry.get(s, "UNKNOWN")
        if ind == "UNKNOWN" and not allow_unknown:
            continue
        if ind in active_inds:
            ind_to_stocks[ind].append(s)

    if not ind_to_stocks:
        return _global_momentum_rows(
            avail, mom_map, prices, stock_to_industry, total_n
        )

    # The budget only covers industries that have something to pick.
    bud = _renorm(
        {ind: float(industry_budget.get(ind, 0.0)) for ind in ind_to_stocks}
    )

    # Slot allocation (largest remainder method).
    alloc = allocate_slots_lrm(bud, total_n)

    selected: List[Tuple[str, float, float, str]] = []

    # Within each industry, pick by momentum.
    for ind, cnt in alloc.items():
        for s in _top_by_momentum(ind_to_stocks.get(ind, []), mom_map, cnt):
            selected.append((s, mom_map[s], prices[s], ind))

    # Strict top-up: an industry may hold fewer stocks than its slots.  Fill
    # the gap from every Top industry with tradable stocks, including those
    # that received zero slots, but never from a non-Top industry.
    if len(selected) < total_n:
        chosen = {row[0] for row in selected}
        leftovers = [
            s
            for group in ind_to_stocks.values()
            for s in group
            if s not in chosen
        ]
        for s in _top_by_momentum(leftovers, mom_map, total_n - len(selected)):
            ind = stock_to_industry.get(s, "UNKNOWN")
            selected.append((s, mom_map[s], prices[s], ind))

    return selected[:total_n]


# -----------------------------
# 6) One-call entry point: industry module output -> budget -> selection
# -----------------------------
def build_industry_rotation_selection(
    stocks_df: pd.DataFrame,
    industry_df: pd.DataFrame,
    date: pd.Timestamp,
    dates: List[pd.Timestamp],
    core_stocks: List[str],
    total_n: int,
    industry_module,  # IndustryV11() or IndustryV12()
    stock_to_industry: Dict[str, str],
    momentum_period: int = 20,
    allow_unknown: bool = False,
    top_ratio: float = 0.35,
    top_k_min: int = 4,
    top_k_max: int = 6,
    max_w: Optional[float] = 0.40,
) -> Tuple[Dict[str, float], Dict[str, int], List[Tuple[str, float, float, str]]]:
    """Run the whole pipeline for one rebalance date.

    Returns:
        A tuple ``(industry_budget, industry_alloc, selected)``:

        - industry_budget: sums to 1, contains only the Top industries.
        - industry_alloc: industry -> planned slots, computed from the full
          budget for debugging output.  The selection step renormalises the
          budget over industries that have tradable stocks on *date*, so
          the realised counts in *selected* can differ from this plan.
        - selected: list of ``(stock, momentum, price, industry)``.
    """
    # 1) Industry weights (from the v1.1 / v1.2 industry module).
    ind_weights: Dict[str, float] = {}
    if (
        industry_module is not None
        and industry_df is not None
        and date in industry_df.index
    ):
        try:
            ind_weights = industry_module.get_weights(industry_df, date)
        except Exception:
            # Best effort: fall back to neutral weights, but leave a trace
            # so a broken industry module does not go unnoticed.
            logger.warning(
                "industry_module.get_weights failed for %s; "
                "falling back to neutral weights",
                date,
                exc_info=True,
            )
            ind_weights = {}
    if ind_weights is None:
        ind_weights = {}

    # 2) Industries that appear in the stock pool (UNKNOWN removed).
    industries_in_pool = set()
    for s in core_stocks:
        ind = stock_to_industry.get(s, "UNKNOWN")
        if ind != "UNKNOWN":
            industries_in_pool.add(ind)

    # 3) Industry strength -> budget (strict edition).
    raw_strength = compute_industry_strength(ind_weights, industries_in_pool)
    industry_budget = make_industry_budget_strict(
        raw_strength=raw_strength,
        top_ratio=top_ratio,
        top_k_min=top_k_min,
        top_k_max=top_k_max,
        max_w=max_w,
    )

    # 4) Stock selection (strict edition).
    selected = select_stocks_by_industry_budget_strict(
        stocks_df=stocks_df,
        date=date,
        dates=dates,
        core_stocks=core_stocks,
        total_n=total_n,
        industry_budget=industry_budget,
        stock_to_industry=stock_to_industry,
        momentum_period=momentum_period,
        allow_unknown=allow_unknown,
    )

    # 5) Planned allocation (for debug printing).
    if industry_budget:
        industry_alloc = allocate_slots_lrm(industry_budget, total_n)
    else:
        industry_alloc = {}

    return industry_budget, industry_alloc, selected


# ============================================================
# Usage example (put it in the rebalance-day part of your run_backtest)
# ============================================================
"""
industry_budget, industry_alloc, selected = build_industry_rotation_selection(
    stocks_df=stocks_df,
    industry_df=industry_df,
    date=date,
    dates=dates,
    core_stocks=core_stocks,
    total_n=n,                        # decided by state
    industry_module=IndustryV11(),    # or IndustryV12()
    stock_to_industry=STOCK_TO_INDUSTRY,
    momentum_period=MOMENTUM_PERIOD,
    allow_unknown=False,
    top_ratio=0.35,
    top_k_min=4,
    top_k_max=6,
    max_w=0.40,
)

# Debug output (printed once per rebalance day)
print(date.date(), "Top行业预算:",
      sorted(industry_budget.items(), key=lambda x: x[1], reverse=True)[:6],
      "名额:", industry_alloc)
print("选股:", [(s, ind, round(m,2)) for s,m,p,ind in selected])

In addition: suggestions for integrating into the main backtest.
The detailed steps for wiring the v1.6 module into the original
run_backtest_v15 function are:

Step 1: call build_industry_rotation_selection on the rebalance day

    # in the rebalance part of run_backtest_v15 (i % REBALANCE_DAYS == 0)
    n = TOP_N.get(state, 4)

    industry_budget, industry_alloc, selected = build_industry_rotation_selection(
        stocks_df=stocks_df,
        industry_df=industry_df,
        date=date,
        dates=dates,
        core_stocks=core_stocks,
        total_n=n,
        industry_module=industry_module,  # None, IndustryV11, or IndustryV12
        stock_to_industry=STOCK_TO_INDUSTRY,
        momentum_period=MOMENTUM_PERIOD,
        allow_unknown=False,
        top_ratio=0.35,
        top_k_min=4,
        top_k_max=6,
        max_w=0.40
    )

Step 2: compute the target stock market value

    target_stock_value = total_value * target  # target = target position ratio

Step 3: build the target holdings dict (stock -> target number of shares)

    # group by industry
    sel_by_ind = defaultdict(list)
    for s, mom, price, ind in selected:
        sel_by_ind[ind].append((s, price))

    # use only the budget from industry_budget (already normalised)
    bud = {ind: industry_budget.get(ind, 0) for ind in sel_by_ind.keys()}
    bud = _renorm(bud)  # make sure it sums to 1

    target_shares = {}
    for ind, stocks in sel_by_ind.items():
        ind_value = target_stock_value * bud[ind]
        per_stock_value = ind_value / len(stocks)  # equal weight inside an industry
        for s, price in stocks:
            shares = int(per_stock_value / price / 100) * 100  # whole lots
            if shares > 0:
                target_shares[s] = shares
"""