# -*- coding: utf-8 -*-
"""
Full stock-selection module (industry-index enhanced; low frequency, small
capital, designed to resist overfitting).
---------------------------------------------------------------------------
Key characteristics:
1) Hard filters first (suspension / ST / liquidity / extreme volatility /
   deep drawdown, plus the optional industry-strength rule), then
   cross-sectional scoring and ranking.
2) alpha_score has few degrees of freedom: momentum + risk + liquidity +
   industry strength/trend (light weights).
3) Selection uses TopN plus a cross-sectional quantile threshold instead of a
   fixed threshold.
4) The market state (final_state) is only loosely coupled: it controls the
   TopN count and whether new positions may be opened at all.
5) Portfolio constraints: per-stock cap and per-industry cap.
6) The output ``trade_candidates`` matches the existing pipeline schema
   (stock / alpha_score / threshold / action / position / intercepted /
   reason).

The surrounding project must provide these 5 callables:
- get_stock_data_fn(stock, date) -> {"hist": DataFrame, "meta": dict}
  (a bare hist DataFrame is also accepted)
- get_hs300_hist_fn(date) -> DataFrame
- get_industry_hist_fn(industry, date) -> DataFrame
- calibrate_confidence_by_bucket(confidence, EXECUTION_ACTIONS)
  -> (action, bucket_name)
- calculate_stock_position(action, suggested_position) -> float
  (portfolio target exposure, 0~1)

The last two are looked up as module-level names unless they are passed
explicitly to ``build_trade_candidates`` via ``calibrate_confidence_fn`` /
``calculate_position_fn``.
"""
from __future__ import annotations

import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Dict, Any, List, Tuple, Optional, Callable


# =========================
# Utils
# =========================
def _zscore(s: pd.Series) -> pd.Series:
    """Return the cross-sectional z-score of ``s`` (population std, ddof=0).

    A series with zero or undefined spread (constant / all-NaN) maps to all
    zeros instead of NaN or inf.
    """
    s = s.astype(float)
    mu = float(s.mean())
    sd = float(s.std(ddof=0))
    if sd == 0.0 or np.isnan(sd):
        return pd.Series(np.zeros(len(s)), index=s.index)
    return (s - mu) / sd


def _atr14(high: pd.Series, low: pd.Series, close: pd.Series, n: int = 14) -> pd.Series:
    """Return the n-period Average True Range (simple rolling mean of TR).

    TR_t = max(High_t - Low_t, |High_t - Close_{t-1}|, |Low_t - Close_{t-1}|).
    On the first row Close_{t-1} is NaN; ``max(axis=1)`` skips NaN, so TR
    falls back to High - Low there.
    """
    prev_close = close.shift(1)
    tr = pd.concat(
        [(high - low).abs(), (high - prev_close).abs(), (low - prev_close).abs()], axis=1
    ).max(axis=1)
    return tr.rolling(n, min_periods=n).mean()


def _mom(close: pd.Series, n: int) -> float:
    """Return the n-period simple return ``close[-1] / close[-(n+1)] - 1``.

    NaN when fewer than n+1 observations exist, or when the base price is
    zero/NaN (which would otherwise yield inf).
    """
    if close is None or len(close) <= n:
        return np.nan
    base = float(close.iloc[-(n + 1)])
    if base == 0.0 or np.isnan(base):
        return np.nan
    return float(close.iloc[-1] / base - 1.0)


def _dd(close: pd.Series, n: int) -> float:
    """Return the last close relative to the highest close of the last n bars, minus 1.

    0 at a new n-bar high, negative below it; NaN when fewer than n bars exist
    or the rolling max is zero/NaN.
    """
    if close is None or len(close) < n:
        return np.nan
    rm = close.rolling(n, min_periods=n).max().iloc[-1]
    if rm == 0 or np.isnan(rm):
        return np.nan
    return float(close.iloc[-1] / rm - 1.0)


def _fill_median(s: pd.Series) -> pd.Series:
    """Fill NaN with the cross-sectional median; use 0.0 if the whole column is NaN."""
    return s.fillna(s.median()).fillna(0.0)


def _is_true_flag(value: Any) -> bool:
    """Return True only for a genuine boolean True (Python ``bool`` or ``numpy.bool_``).

    None / NaN / other objects are treated as "flag not set", so a missing
    flag never filters a stock out.
    """
    return isinstance(value, (bool, np.bool_)) and bool(value)


def _has_industry(industry: Any) -> bool:
    """Return True when ``industry`` is a usable label (not None and not float NaN)."""
    if industry is None:
        return False
    return not (isinstance(industry, float) and np.isnan(industry))


# =========================
# Params
# =========================
@dataclass
class StockSelectParams:
    """Tunable parameters for stock selection (kept deliberately small)."""

    # -------- Hard filters (the more rule-like, the less prone to overfitting) --------
    # Minimum 20-day mean of Amount. Must match your Amount unit
    # (rescale if Amount is in 10k CNY / 100M CNY, etc.).
    min_amount_ma20: float = 50_000_000.0
    # Filter when ATR14 / Close exceeds this (event/news-driven stocks).
    max_atr14_ratio: float = 0.08
    # Filter when the drawdown over dd_win bars is deeper than this (too risky).
    max_dd60: float = -0.35

    # -------- Stock feature windows (fixed common horizons, fewer degrees of freedom) --------
    mom_short: int = 20
    mom_mid: int = 60
    ma_mid: int = 60
    dd_win: int = 60

    # -------- Industry feature windows --------
    ind_mom_short: int = 20
    ind_mom_mid: int = 60
    ind_ma_mid: int = 60
    ind_dd_win: int = 60

    # -------- Selection: TopN + quantile threshold --------
    top_n_default: int = 6
    min_quantile: float = 0.60
    # Mapping final_state -> TopN. None means default_topn_by_state() is used.
    top_n_by_state: Optional[Dict[str, int]] = None

    # -------- Portfolio constraints --------
    max_weight_per_stock: float = 0.10
    max_weight_per_industry: float = 0.30

    # -------- Alpha weights (few degrees of freedom) --------
    w_momentum: float = 0.45
    w_risk: float = 0.35
    w_liquidity: float = 0.20

    # -------- Industry bonus weights (light, to avoid style drift) --------
    w_industry_strength: float = 0.15
    w_industry_trend: float = 0.05
    w_industry_risk: float = 0.05  # optional: light penalty for industry drawdown

    # -------- Industry hard rule --------
    # When True, stocks whose industry 60-day excess return over HS300 is
    # <= 0 (or missing) are never selected.
    require_industry_strength_positive: bool = True


def default_topn_by_state() -> Dict[str, int]:
    """Return the default mapping from market state to the TopN count."""
    return {
        "RISK_SEVERE": 0,  # extreme risk: no new positions (swaps within holdings could still be allowed)
        "RISK_MILD": 3,
        "RANGE": 4,
        "TREND_NORMAL": 6,
        "TREND_BULL": 8,
    }


# =========================
# Feature engineering
# =========================
def compute_stock_features(hist: pd.DataFrame, p: StockSelectParams) -> Dict[str, float]:
    """Compute per-stock features from its price history up to ``date``.

    Required column: Close. Recommended: High, Low (Close is used for both
    when missing). Optional: Amount (liquidity features are NaN without it).

    Returns a dict with keys mom20, mom60, ma60_ratio, atr14_ratio, dd60,
    amt_ratio, amt_ma20; each value is a float, NaN when there is not enough
    history.

    Raises:
        ValueError: if ``hist`` is None or has no Close column.
    """
    if hist is None or "Close" not in hist.columns:
        raise ValueError("hist must contain column 'Close'")

    close = hist["Close"].astype(float)
    high = hist["High"].astype(float) if "High" in hist.columns else close
    low = hist["Low"].astype(float) if "Low" in hist.columns else close
    last = float(close.iloc[-1]) if len(close) else np.nan

    mom20 = _mom(close, p.mom_short)
    mom60 = _mom(close, p.mom_mid)

    ma60 = np.nan
    if len(close) >= p.ma_mid:
        ma60 = close.rolling(p.ma_mid, min_periods=p.ma_mid).mean().iloc[-1]
    ma60_ratio = last / ma60 - 1.0 if (not np.isnan(ma60) and ma60 != 0) else np.nan

    atr14 = _atr14(high, low, close, 14).iloc[-1] if len(close) >= 14 else np.nan
    atr14_ratio = atr14 / last if (not np.isnan(atr14) and last != 0) else np.nan

    dd60 = _dd(close, p.dd_win)

    # Liquidity (optional): 20-day mean amount and today's amount vs. that mean.
    amt_ma20, amt_ratio = np.nan, np.nan
    if "Amount" in hist.columns and len(hist["Amount"]) >= 20:
        amt = hist["Amount"].astype(float)
        amt_ma20 = amt.rolling(20, min_periods=20).mean().iloc[-1]
        if not np.isnan(amt_ma20) and amt_ma20 != 0:
            amt_ratio = amt.iloc[-1] / amt_ma20

    # float(nan) stays nan, so a plain float() normalizes numpy scalars.
    return {
        "mom20": float(mom20),
        "mom60": float(mom60),
        "ma60_ratio": float(ma60_ratio),
        "atr14_ratio": float(atr14_ratio),
        "dd60": float(dd60),
        "amt_ratio": float(amt_ratio),
        "amt_ma20": float(amt_ma20),
    }


def _empty_industry_features() -> Dict[str, float]:
    """Return the all-NaN industry feature dict used when industry data is unavailable."""
    return {
        "ind_mom20_excess": np.nan,
        "ind_mom60_excess": np.nan,
        "ind_trend60": np.nan,
        "ind_dd60": np.nan,
    }


def compute_industry_features(
    ind_hist: Optional[pd.DataFrame],
    hs300_hist: Optional[pd.DataFrame],
    p: StockSelectParams,
) -> Dict[str, float]:
    """Compare an industry index against the HS300 index.

    Both frames need a Close column. Returns:
    - ind_mom20_excess / ind_mom60_excess: industry momentum minus HS300 momentum
    - ind_trend60: 1.0 if the industry closes above its ind_ma_mid moving average, else 0.0
    - ind_dd60: industry drawdown over ind_dd_win bars
    All values are NaN when the inputs are missing or too short.
    """
    if (
        ind_hist is None
        or hs300_hist is None
        or "Close" not in ind_hist.columns
        or "Close" not in hs300_hist.columns
    ):
        return _empty_industry_features()

    ind_close = ind_hist["Close"].astype(float)
    hs_close = hs300_hist["Close"].astype(float)

    ind_m20 = _mom(ind_close, p.ind_mom_short)
    ind_m60 = _mom(ind_close, p.ind_mom_mid)
    hs_m20 = _mom(hs_close, p.ind_mom_short)
    hs_m60 = _mom(hs_close, p.ind_mom_mid)

    if len(ind_close) >= p.ind_ma_mid:
        ind_ma60 = ind_close.rolling(p.ind_ma_mid, min_periods=p.ind_ma_mid).mean().iloc[-1]
        ind_trend60 = 1.0 if (not np.isnan(ind_ma60) and ind_close.iloc[-1] > ind_ma60) else 0.0
    else:
        ind_trend60 = np.nan

    ind_dd60 = _dd(ind_close, p.ind_dd_win)

    # NaN propagates through the subtraction, so a missing side yields NaN.
    return {
        "ind_mom20_excess": float(ind_m20 - hs_m20),
        "ind_mom60_excess": float(ind_m60 - hs_m60),
        "ind_trend60": float(ind_trend60),
        "ind_dd60": float(ind_dd60),
    }


# =========================
# Hard filter
# =========================
def hard_filter(meta: Dict[str, Any], feat: Dict[str, float], p: StockSelectParams) -> Tuple[bool, str]:
    """Apply rule-based exclusions before scoring.

    ``meta`` may contain boolean flags ``is_suspended`` / ``is_st`` (Python or
    numpy booleans). Feature-based rules are skipped when the feature is NaN.

    Returns:
        (True, "") if the stock passes, otherwise (False, reason).
    """
    if _is_true_flag(meta.get("is_suspended")):
        return False, "停牌/不可交易"
    if _is_true_flag(meta.get("is_st")):
        return False, "ST风险股过滤"

    # Liquidity: 20-day mean Amount too low (only when Amount is available).
    if not np.isnan(feat.get("amt_ma20", np.nan)) and feat["amt_ma20"] < p.min_amount_ma20:
        return False, f"流动性不足(amt_ma20={feat['amt_ma20']:.0f})"

    # Volatility: ATR / Close too high.
    if not np.isnan(feat.get("atr14_ratio", np.nan)) and feat["atr14_ratio"] > p.max_atr14_ratio:
        return False, f"波动过大(atr14_ratio={feat['atr14_ratio']:.3f})"

    # Deep drawdown.
    if not np.isnan(feat.get("dd60", np.nan)) and feat["dd60"] < p.max_dd60:
        return False, f"深回撤(dd60={feat['dd60']:.2%})"

    return True, ""


# =========================
# Alpha score (cross-section)
# =========================
def cross_section_alpha_score(feat_df: pd.DataFrame, p: StockSelectParams) -> pd.Series:
    """Score the surviving stocks cross-sectionally on a 0~100 scale.

    Low degrees of freedom: momentum + risk + liquidity + light industry terms.
    The weighted z-score sum is mapped to 0~100 by percentile rank, which is
    more robust to outliers than a linear mapping.
    """
    # Momentum: higher is better.
    mom = feat_df[["mom20", "mom60", "ma60_ratio"]].mean(axis=1)
    mom_z = _zscore(_fill_median(mom))

    # Risk: lower is better (larger ATR and deeper drawdown are both worse).
    # Each component is filled separately, so an all-NaN component no longer
    # wipes out the other one.
    risk_raw = _fill_median(feat_df["atr14_ratio"]) + _fill_median(-feat_df["dd60"])
    risk_score = -_zscore(risk_raw)

    # Liquidity: higher is better (all-NaN degrades to 0).
    liq_z = _zscore(_fill_median(feat_df["amt_ratio"]))

    # Industry relative strength vs. HS300: higher is better (missing -> 0).
    ind_strength_raw = (
        feat_df["ind_mom60_excess"].fillna(0.0) * 0.7
        + feat_df["ind_mom20_excess"].fillna(0.0) * 0.3
    )
    ind_strength_z = _zscore(ind_strength_raw)

    # Industry trend confirmation (0/1, missing -> 0).
    ind_trend = feat_df["ind_trend60"].fillna(0.0)

    # Industry risk: deeper industry drawdown gets a light penalty (missing -> 0).
    if feat_df["ind_dd60"].isna().all():
        ind_risk_score = pd.Series(0.0, index=feat_df.index)
    else:
        ind_risk_raw = (-feat_df["ind_dd60"]).fillna(0.0)  # more negative dd -> larger risk
        ind_risk_score = -_zscore(ind_risk_raw)

    raw = (
        p.w_momentum * mom_z
        + p.w_risk * risk_score
        + p.w_liquidity * liq_z
        + p.w_industry_strength * ind_strength_z
        + p.w_industry_trend * ind_trend
        + p.w_industry_risk * ind_risk_score
    )

    pct = raw.rank(pct=True)
    return (pct * 100.0).clip(0, 100)


# =========================
# Main: build trade candidates
# =========================
def _unpack_stock_data(sd: Any) -> Tuple[Any, Dict[str, Any]]:
    """Split get_stock_data_fn output into (hist, meta).

    Accepts {"hist": df, "meta": {...}} or a bare hist DataFrame (meta = {}).
    """
    if isinstance(sd, dict) and "hist" in sd:
        return sd["hist"], (sd.get("meta", {}) or {})
    return sd, {}


def _intercepted_row(stock: str, alpha_score: float, threshold: float, reason: str, industry: Any) -> Dict[str, Any]:
    """Build one intercepted (not traded) trade-candidate record."""
    return {
        "stock": stock,
        "alpha_score": alpha_score,
        "threshold": threshold,
        "action": "intercepted",
        "position": 0.0,
        "intercepted": True,
        "reason": reason,
        "industry": industry,
    }


def _allocate_positions(selected: pd.DataFrame, total_target: float, p: StockSelectParams) -> Dict[str, float]:
    """Equal-weight allocation, capped per stock and per industry.

    Each stock starts at min(total_target / N, max_weight_per_stock). It is then
    trimmed so that its industry's running total never exceeds
    max_weight_per_industry. Rows earlier in ``selected`` (higher alpha) are
    served first. Weight removed by the caps is not redistributed.
    """
    base_w = min(total_target / float(len(selected)), p.max_weight_per_stock)
    industry_sum: Dict[Any, float] = {}
    positions: Dict[str, float] = {}
    for stock, row in selected.iterrows():
        ind = row.get("industry", None)
        has_ind = _has_industry(ind)
        w = base_w
        if has_ind:
            used = industry_sum.get(ind, 0.0)
            if used + w > p.max_weight_per_industry:
                w = max(0.0, p.max_weight_per_industry - used)
        w = min(w, p.max_weight_per_stock)
        positions[stock] = w
        if has_ind:
            industry_sum[ind] = industry_sum.get(ind, 0.0) + w
    return positions


def build_trade_candidates(
    date,
    core_stocks: List[str],
    final_state: str,
    suggested_position: float,
    adjusted_confidence: float,
    config: Dict[str, Any],
    # data fetch
    get_stock_data_fn: Callable[[str, Any], Any],
    get_hs300_hist_fn: Callable[[Any], pd.DataFrame],
    get_industry_hist_fn: Callable[[str, Any], pd.DataFrame],
    # selection params
    select_params: Optional[StockSelectParams] = None,
    # optional injection of execution helpers (default: module-level functions)
    calibrate_confidence_fn: Optional[Callable[[float, Any], Tuple[Any, Any]]] = None,
    calculate_position_fn: Optional[Callable[[Any, float], float]] = None,
) -> List[Dict[str, Any]]:
    """Select stocks from ``core_stocks`` and return trade_candidates.

    Each record (dict) has:
    - stock
    - alpha_score (0~100; 0.0 for hard-filtered stocks)
    - threshold (the day's quantile threshold q_th; NaN for hard-filtered stocks)
    - action (from the confidence bucket, or "intercepted")
    - position (weight assigned to the stock, 0~1)
    - intercepted (bool)
    - reason (only on intercepted records)
    - industry (may be None)

    Hard-filtered stocks come first, followed by the scored stocks in
    ``core_stocks`` order. ``select_params`` is never mutated.
    ``calibrate_confidence_fn`` / ``calculate_position_fn`` default to the
    module-level ``calibrate_confidence_by_bucket`` /
    ``calculate_stock_position``, which the host project must define when the
    parameters are not passed. ``config["EXECUTION_ACTIONS"]`` is passed to
    the calibration function.
    """
    p = select_params or StockSelectParams()
    # Resolve the state -> TopN mapping locally instead of writing the default
    # back into the caller's params object.
    top_n_by_state = p.top_n_by_state if p.top_n_by_state is not None else default_topn_by_state()

    # Fetch HS300 history once, and each industry's features once per call (avoid IO in the loop).
    hs300_hist = get_hs300_hist_fn(date)
    industry_feat_cache: Dict[Any, Dict[str, float]] = {}

    filtered_out: List[Dict[str, Any]] = []
    records: List[Dict[str, Any]] = []

    # 1) Collect features and apply hard filters.
    for stock in core_stocks:
        hist, meta = _unpack_stock_data(get_stock_data_fn(stock, date))
        industry = meta.get("industry", None)
        feat = compute_stock_features(hist, p)

        ok, reason = hard_filter(meta, feat, p)
        if not ok:
            filtered_out.append(_intercepted_row(stock, 0.0, np.nan, reason, industry))
            continue

        if _has_industry(industry):
            if industry not in industry_feat_cache:
                ind_hist = get_industry_hist_fn(industry, date)
                industry_feat_cache[industry] = compute_industry_features(ind_hist, hs300_hist, p)
            ind_feat = industry_feat_cache[industry]
        else:
            ind_feat = _empty_industry_features()

        records.append({"stock": stock, "industry": industry, **feat, **ind_feat})

    # Everything was hard-filtered.
    if not records:
        return filtered_out

    feat_df = pd.DataFrame(records).set_index("stock")
    feat_df["alpha_score"] = cross_section_alpha_score(feat_df, p)

    # 2) TopN from the market state (loose coupling).
    top_n = int(top_n_by_state.get(final_state, p.top_n_default))
    top_n = max(0, min(top_n, len(feat_df)))

    # 3) Quantile threshold (instead of a fixed threshold). A stable sort keeps
    #    ties in input order, so selection is deterministic.
    q_th = float(feat_df["alpha_score"].quantile(p.min_quantile))
    ranked = feat_df.sort_values("alpha_score", ascending=False, kind="mergesort")
    eligible = ranked[ranked["alpha_score"] >= q_th]

    # 4) Industry hard rule: 60-day industry excess must be positive. It is
    #    applied BEFORE TopN, so weak-industry names do not use up TopN slots.
    if p.require_industry_strength_positive:
        eligible = eligible[eligible["ind_mom60_excess"].fillna(0.0) > 0.0]
    selected = eligible.head(top_n)

    # 5) Execution action (the project's confidence-bucket logic) and
    #    portfolio target exposure (0~1).
    calibrate = calibrate_confidence_fn if calibrate_confidence_fn is not None else calibrate_confidence_by_bucket
    calc_position = calculate_position_fn if calculate_position_fn is not None else calculate_stock_position
    action, _bucket = calibrate(adjusted_confidence, config["EXECUTION_ACTIONS"])
    total_target = float(calc_position(action, suggested_position))

    # Opening disallowed / nothing selected / zero target: intercept everything
    # but keep the scores for monitoring.
    if top_n == 0 or len(selected) == 0 or total_target <= 0.0:
        out = filtered_out[:]
        blocked_reason = f"状态{final_state}不允许新开仓/或行业弱/或无入选"
        for stock, r in feat_df.iterrows():
            out.append(_intercepted_row(
                stock, float(r["alpha_score"]), q_th, blocked_reason, r.get("industry", None)
            ))
        return out

    # 6) Allocate weights: equal weight + per-stock cap + per-industry cap.
    positions = _allocate_positions(selected, total_target, p)
    selected_set = {s for s, w in positions.items() if w > 0}

    # 7) Assemble the output in the existing trade_candidates format.
    trade_candidates: List[Dict[str, Any]] = filtered_out[:]
    miss_reason = f"未入Top{top_n}/低于{p.min_quantile:.0%}分位/行业弱或行业上限"
    for stock, r in feat_df.iterrows():
        score = float(r["alpha_score"])
        industry = r.get("industry", None)
        if stock in selected_set:
            trade_candidates.append({
                "stock": stock,
                "alpha_score": score,
                "threshold": float(q_th),
                "action": action,
                "position": float(positions[stock]),
                "intercepted": False,
                "industry": industry,
            })
        else:
            trade_candidates.append(_intercepted_row(stock, score, float(q_th), miss_reason, industry))
    return trade_candidates


# =========================
# Example integration
# =========================
# Usage inside the pipeline:
#
#   trade_candidates = build_trade_candidates(
#       date=date,
#       core_stocks=core_stocks,                  # your 22-stock core pool
#       final_state=final_state,                  # market regime state
#       suggested_position=suggested_position,    # portfolio exposure cap/target from the regime model
#       adjusted_confidence=adjusted_confidence,  # regime confidence (ideally with stability/jitter penalty)
#       config=config,                            # must contain EXECUTION_ACTIONS
#       get_stock_data_fn=get_stock_data,         # your existing function
#       get_hs300_hist_fn=get_hs300_hist,         # you implement: HS300 history df up to date
#       get_industry_hist_fn=get_industry_hist,   # you implement: industry index history df up to date
#       select_params=StockSelectParams(),        # optional: custom parameters
#   )
#
# Then store the result in report["trade_candidates"].
#
# Notes:
# - Amount units must be consistent; if Amount is in 100M CNY, min_amount_ma20
#   must be made much smaller.
# - meta must provide the industry, e.g. meta["industry"] = "申万-电子"
#   (Shenwan Electronics) or an industry code.