# Market regime ("big picture") module, v1.4 -- save as e.g. regime_v14.py.
#
# Design
# ------
# * Trend line (TrendScore): cares about whether a trend has formed and is
#   persisting.
# * Risk line (RiskScore): only corrects the picture; it does not compete with
#   the trend line for the leading role.
# * Two layers of smoothing: features are smoothed first, then the scores.
# * Inertia term: reduces overly choppy state switching.
# * Three master states plus a five-state label.
# * Continuous position cap, which is easier to use when reviewing trades.
#
# What you most likely need to change
# -----------------------------------
# The code runs as-is, but the input columns must follow this naming scheme:
#     sh_close, sh_high, sh_low
#     sz_close, sz_high, sz_low
#     cyb_close, cyb_high, cyb_low
# If your table uses different headers there are two options.
# Option 1 -- rename the columns, for example:
#     df = df.rename(columns={
#         "上证_close": "sh_close", "上证_high": "sh_high", "上证_low": "sh_low",
#         "深成_close": "sz_close", "深成_high": "sz_high", "深成_low": "sz_low",
#         "创业板_close": "cyb_close", "创业板_high": "cyb_high",
#         "创业板_low": "cyb_low",
#     })
# Option 2 -- adapt this module to your existing headers (more robust); this
# needs the actual column names of your three-index table, from which the
# rename mapping and the calling example can be written.
#
# Output fields of build_regime_v14()
# -----------------------------------
#     trend_score_raw, trend_score, risk_score_raw, risk_score
#     master_state: BULL / NEUTRAL / DEFENSIVE (WARMUP while scores are NaN)
#     regime_5: TREND_BULL / TREND_BULL_HIGHVOL / RANGE / RISK_MILD /
#               RISK_SEVERE / WARMUP
#     target_cap, warmup_flag
#
# Why this instead of a hard state machine
# ----------------------------------------
# The goal is to identify the trend environment accurately, not to predict
# tomorrow's move.  Hence: the trend line carries the weight, the risk line
# only corrects, rolling percentile ranks replace hand-picked constants,
# smoothing is two-layered with inertia, and the continuous position cap is
# the primary output with the five-state label as a secondary one.  This is
# generally smoother than a plain if/else state machine and less prone to
# overfitting.
#
# Suggested first test
# --------------------
# Do not replace a live setup directly.  First run it side by side with the
# current five-state program, check whether regime_5 switches less often and
# more smoothly, and check whether the BUY win rate of individual stocks is
# higher when trend_score is high.
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd


# =========================
# Configuration
# =========================
@dataclass
class RegimeV14Config:
    """Tunable parameters for the v1.4 regime model."""

    # Trend-line weights
    w_long_trend: float = 0.35
    w_mid_trend: float = 0.30
    w_breakout: float = 0.20
    w_breadth: float = 0.15

    # Risk-line weights
    w_drawdown: float = 0.60
    w_atr_risk: float = 0.40

    # Rolling windows
    ma_short: int = 20
    ma_mid: int = 60
    ma_long: int = 120
    breakout_window: int = 60
    atr_window: int = 14

    # Percentile-rank windows
    trend_rank_window: int = 504
    risk_rank_window: int = 252

    # Smoothing
    feature_ema_span: int = 3
    score_ema_span: int = 5
    inertia_alpha: float = 0.15  # 15% inertia

    # Master-state thresholds
    bull_trend_threshold: float = 63.0
    defensive_risk_threshold: float = 72.0
    weak_trend_threshold: float = 38.0
    mild_risk_threshold: float = 55.0

    # Position-cap control
    cap_base: float = 0.08
    cap_trend_coeff: float = 0.012
    cap_risk_coeff: float = 0.009
    cap_min: float = 0.05
    cap_max: float = 1.00


# =========================
# Utilities
# =========================
def _clip_series(s: pd.Series, low: float, high: float) -> pd.Series:
    """Clip *s* to the closed interval [low, high]; NaN stays NaN."""
    return s.clip(lower=low, upper=high)


def _safe_mean(df: pd.DataFrame, cols: List[str]) -> pd.Series:
    """Row-wise mean over the columns of *cols* present in *df*.

    Returns an all-NaN series when none of the columns exist.
    """
    available = [c for c in cols if c in df.columns]
    if not available:
        return pd.Series(np.nan, index=df.index)
    return df[available].mean(axis=1)


def _rolling_percentile_rank(series: pd.Series, window: int) -> pd.Series:
    """Return the rolling percentile rank (0-100) of each value.

    The rank is the share of non-NaN samples in the trailing *window* that
    are <= the current value.  At least ``max(20, window // 5)`` valid
    samples are required, otherwise the result is NaN.
    """
    min_obs = max(20, window // 5)

    def rank_last(x: np.ndarray) -> float:
        if len(x) == 0 or np.isnan(x[-1]):
            return np.nan
        arr = x[~np.isnan(x)]
        if len(arr) < min_obs:
            return np.nan
        return 100.0 * (np.count_nonzero(arr <= arr[-1]) / len(arr))

    return series.rolling(window=window, min_periods=min_obs).apply(
        rank_last, raw=True
    )


def _ema(series: pd.Series, span: int) -> pd.Series:
    """Recursive (``adjust=False``) exponential moving average."""
    return series.ewm(span=span, adjust=False, min_periods=1).mean()


def _true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
    """True range: max of high-low, |high-prev close|, |low-prev close|."""
    prev_close = close.shift(1)
    tr1 = high - low
    tr2 = (high - prev_close).abs()
    tr3 = (low - prev_close).abs()
    # max(axis=1) skips NaN, so the first row falls back to high - low.
    return pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)


def _atr(
    high: pd.Series, low: pd.Series, close: pd.Series, window: int = 14
) -> pd.Series:
    """Average true range as a simple rolling mean of the true range."""
    tr = _true_range(high, low, close)
    return tr.rolling(window=window, min_periods=max(5, window // 2)).mean()


def _apply_inertia(score: pd.Series, alpha: float) -> pd.Series:
    """Blend each score with the previous effective score.

    effective[t] = (1 - alpha) * score[t] + alpha * effective[t-1]

    A larger *alpha* means stronger inertia.  NaN inputs produce NaN outputs
    and do not reset the carried-over previous value.
    """
    blended: List[float] = []
    prev = np.nan
    for val in score.to_numpy(dtype=float):
        if np.isnan(val):
            blended.append(np.nan)
            continue
        curr = val if np.isnan(prev) else (1 - alpha) * val + alpha * prev
        blended.append(curr)
        prev = curr
    return pd.Series(blended, index=score.index, dtype=float)


def _mean_ranked_feature(
    df_feat: pd.DataFrame,
    prefixes: List[str],
    suffix: str,
    rank_window: int,
    cfg: RegimeV14Config,
) -> pd.Series:
    """EMA-smooth ``{prefix}{suffix}`` per index, rank it, average the ranks."""
    ranked = [
        _rolling_percentile_rank(
            _ema(df_feat[f"{p}{suffix}"], cfg.feature_ema_span), rank_window
        )
        for p in prefixes
    ]
    return pd.concat(ranked, axis=1).mean(axis=1)


# =========================
# Data preparation
# =========================
def prepare_index_features(
    df: pd.DataFrame,
    prefix: str,
    cfg: RegimeV14Config,
) -> pd.DataFrame:
    """Build the base features for a single index.

    Required columns: ``{prefix}_close``, ``{prefix}_high``, ``{prefix}_low``.
    Optional columns: ``{prefix}_up_ratio``, ``{prefix}_avg_return``.

    Returns a frame indexed like *df* with the close, the three moving
    averages, ATR (absolute and relative to close), distance to the long MA,
    short/mid MA structure, breakout ratio and drawdown from the rolling
    high, plus the optional breadth columns when present.

    Raises:
        ValueError: if a required column is missing.
    """
    close_col = f"{prefix}_close"
    high_col = f"{prefix}_high"
    low_col = f"{prefix}_low"

    required = [close_col, high_col, low_col]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"{prefix} 缺少必要列: {missing}")

    close = df[close_col].astype(float)
    high = df[high_col].astype(float)
    low = df[low_col].astype(float)

    ma20 = close.rolling(cfg.ma_short, min_periods=max(5, cfg.ma_short // 2)).mean()
    ma60 = close.rolling(cfg.ma_mid, min_periods=max(10, cfg.ma_mid // 2)).mean()
    ma120 = close.rolling(cfg.ma_long, min_periods=max(20, cfg.ma_long // 2)).mean()

    atr14 = _atr(high, low, close, cfg.atr_window)
    # A zero close would divide by zero; treat it as missing instead.
    atr_pct = atr14 / close.replace(0, np.nan)

    dist120 = close / ma120 - 1.0
    mid_structure = ma20 / ma60 - 1.0
    breakout_ratio = close / close.rolling(
        cfg.breakout_window, min_periods=max(10, cfg.breakout_window // 2)
    ).max()
    dd120 = 1.0 - close / close.rolling(
        cfg.ma_long, min_periods=max(20, cfg.ma_long // 2)
    ).max()

    out = pd.DataFrame(index=df.index)
    # The close is exported so the breadth fallback can measure close vs MA20.
    out[f"{prefix}_close"] = close
    out[f"{prefix}_ma20"] = ma20
    out[f"{prefix}_ma60"] = ma60
    out[f"{prefix}_ma120"] = ma120
    out[f"{prefix}_atr14"] = atr14
    out[f"{prefix}_atr_pct"] = atr_pct
    out[f"{prefix}_dist120"] = dist120
    out[f"{prefix}_mid_structure"] = mid_structure
    out[f"{prefix}_breakout_ratio"] = breakout_ratio
    out[f"{prefix}_dd120"] = dd120

    # Optional breadth inputs
    if f"{prefix}_up_ratio" in df.columns:
        out[f"{prefix}_up_ratio"] = df[f"{prefix}_up_ratio"].astype(float)
    if f"{prefix}_avg_return" in df.columns:
        out[f"{prefix}_avg_return"] = df[f"{prefix}_avg_return"].astype(float)

    return out


# =========================
# Trend line / risk line
# =========================
def _breadth_score_for(
    df_feat: pd.DataFrame, prefix: str, cfg: RegimeV14Config
) -> pd.Series:
    """Breadth-support score (0-100) for one index."""
    up_col = f"{prefix}_up_ratio"
    ret_col = f"{prefix}_avg_return"
    has_up = up_col in df_feat.columns
    has_ret = ret_col in df_feat.columns

    if has_up or has_ret:
        up_score = None
        ret_score = None
        if has_up:
            up_ratio = _ema(df_feat[up_col], cfg.feature_ema_span)
            # An up-ratio of 0.70 or more maps to the full score.
            up_score = _clip_series(up_ratio / 0.70, 0.0, 1.0) * 100.0
        if has_ret:
            avg_ret = _ema(df_feat[ret_col], cfg.feature_ema_span)
            # -2% .. +2% average return maps to 0 .. 100.
            ret_score = _clip_series((avg_ret + 0.02) / 0.04, 0.0, 1.0) * 100.0
        if has_up and has_ret:
            return 0.7 * up_score + 0.3 * ret_score
        # A single available input carries the full weight, so the score can
        # still span 0-100 (up_ratio alone used to be capped at 70).
        return up_score if has_up else ret_score

    # Fallback: position of the close relative to MA20 as a breadth proxy.
    close_col = f"{prefix}_close"
    if close_col in df_feat.columns:
        proxy_raw = df_feat[close_col] / df_feat[f"{prefix}_ma20"] - 1.0
    else:
        # Feature frames built without the close column keep the old proxy.
        proxy_raw = df_feat[f"{prefix}_ma20"] / df_feat[f"{prefix}_ma60"] - 1.0
    proxy = _ema(proxy_raw, cfg.feature_ema_span)
    return _rolling_percentile_rank(proxy, cfg.risk_rank_window)


def compute_trend_score(
    df_feat: pd.DataFrame,
    prefixes: List[str],
    cfg: RegimeV14Config,
) -> pd.DataFrame:
    """Compute the trend-line component scores and the final trend score.

    Returned columns: ``long_trend_score``, ``mid_trend_score``,
    ``breakout_score``, ``breadth_score``, ``trend_score_raw`` (weighted sum),
    ``trend_score_smooth`` (EMA of the raw score) and ``trend_score``
    (smoothed score with inertia applied).
    """
    out = pd.DataFrame(index=df_feat.index)

    # 1) Long trend: rolling percentile of close / ma120 - 1
    out["long_trend_score"] = _mean_ranked_feature(
        df_feat, prefixes, "_dist120", cfg.trend_rank_window, cfg
    )

    # 2) Mid trend: rolling percentile of ma20 / ma60 - 1
    # NOTE(review): this ranks over risk_rank_window, not trend_rank_window;
    # kept as in the original -- confirm the shorter window is intended.
    out["mid_trend_score"] = _mean_ranked_feature(
        df_feat, prefixes, "_mid_structure", cfg.risk_rank_window, cfg
    )

    # 3) Breakout ability: close near / at the rolling high
    breakout_scores = []
    for p in prefixes:
        ratio = _ema(df_feat[f"{p}_breakout_ratio"], cfg.feature_ema_span)
        # 0.92 .. 1.00 maps to 0 .. 100
        breakout_scores.append(
            _clip_series((ratio - 0.92) / 0.08, 0.0, 1.0) * 100.0
        )
    out["breakout_score"] = pd.concat(breakout_scores, axis=1).mean(axis=1)

    # 4) Breadth support: up_ratio / avg_return when available, otherwise a
    #    price-based proxy.
    breadth_parts = [_breadth_score_for(df_feat, p, cfg) for p in prefixes]
    out["breadth_score"] = pd.concat(breadth_parts, axis=1).mean(axis=1)

    # Aggregate
    raw = (
        cfg.w_long_trend * out["long_trend_score"]
        + cfg.w_mid_trend * out["mid_trend_score"]
        + cfg.w_breakout * out["breakout_score"]
        + cfg.w_breadth * out["breadth_score"]
    )

    out["trend_score_raw"] = raw
    out["trend_score_smooth"] = _ema(raw, cfg.score_ema_span)
    out["trend_score"] = _apply_inertia(out["trend_score_smooth"], cfg.inertia_alpha)
    return out


def compute_risk_score(
    df_feat: pd.DataFrame,
    prefixes: List[str],
    cfg: RegimeV14Config,
) -> pd.DataFrame:
    """Compute the risk-line component scores and the final risk score.

    Returned columns: ``drawdown_risk_score``, ``atr_risk_score``,
    ``risk_score_raw`` (weighted sum), ``risk_score_smooth`` (EMA of the raw
    score) and ``risk_score`` (smoothed score with inertia applied).
    """
    out = pd.DataFrame(index=df_feat.index)

    # 1) Drawdown risk
    out["drawdown_risk_score"] = _mean_ranked_feature(
        df_feat, prefixes, "_dd120", cfg.risk_rank_window, cfg
    )

    # 2) ATR risk
    out["atr_risk_score"] = _mean_ranked_feature(
        df_feat, prefixes, "_atr_pct", cfg.risk_rank_window, cfg
    )

    raw = (
        cfg.w_drawdown * out["drawdown_risk_score"]
        + cfg.w_atr_risk * out["atr_risk_score"]
    )

    out["risk_score_raw"] = raw
    out["risk_score_smooth"] = _ema(raw, cfg.score_ema_span)
    out["risk_score"] = _apply_inertia(out["risk_score_smooth"], cfg.inertia_alpha)
    return out


# =========================
# State mapping
# =========================
def map_master_state(
    trend_score: float,
    risk_score: float,
    cfg: RegimeV14Config,
) -> str:
    """Map a (trend, risk) pair to WARMUP / DEFENSIVE / BULL / NEUTRAL."""
    if np.isnan(trend_score) or np.isnan(risk_score):
        return "WARMUP"

    if risk_score >= cfg.defensive_risk_threshold or (
        trend_score < cfg.weak_trend_threshold
        and risk_score >= cfg.mild_risk_threshold
    ):
        return "DEFENSIVE"

    # risk_score is already below the defensive threshold at this point.
    if trend_score >= cfg.bull_trend_threshold:
        return "BULL"

    return "NEUTRAL"


def map_regime_5(
    trend_score: float,
    risk_score: float,
    cfg: RegimeV14Config,
) -> str:
    """Map a (trend, risk) pair to the five-state label (or WARMUP)."""
    if np.isnan(trend_score) or np.isnan(risk_score):
        return "WARMUP"

    if risk_score >= cfg.defensive_risk_threshold:
        return "RISK_SEVERE"

    elevated_risk = risk_score >= cfg.mild_risk_threshold
    if trend_score >= cfg.bull_trend_threshold:
        return "TREND_BULL_HIGHVOL" if elevated_risk else "TREND_BULL"

    return "RISK_MILD" if elevated_risk else "RANGE"


def compute_target_cap(
    trend_score: pd.Series,
    risk_score: pd.Series,
    cfg: RegimeV14Config,
) -> pd.Series:
    """Continuous position cap, clipped to [cap_min, cap_max].

    cap = cap_base + cap_trend_coeff * trend - cap_risk_coeff * risk.
    NaN scores give a NaN cap.
    """
    cap = (
        cfg.cap_base
        + cfg.cap_trend_coeff * trend_score
        - cfg.cap_risk_coeff * risk_score
    )
    return _clip_series(cap, cfg.cap_min, cfg.cap_max)


# =========================
# Main pipeline
# =========================
def build_regime_v14(
    df: pd.DataFrame,
    prefixes: List[str],
    cfg: Optional[RegimeV14Config] = None,
) -> pd.DataFrame:
    """Run the full regime pipeline.

    Args:
        df: table holding the columns of every index in *prefixes*.
        prefixes: index prefixes, e.g. ``["sh", "sz", "cyb"]``.  Each needs
            ``{prefix}_close``, ``{prefix}_high`` and ``{prefix}_low``;
            ``{prefix}_up_ratio`` and ``{prefix}_avg_return`` are optional
            enhancements.
        cfg: model parameters; defaults to ``RegimeV14Config()``.

    Returns:
        A frame indexed like *df* with ``date`` (when *df* has it),
        ``trend_score_raw``, ``trend_score``, ``risk_score_raw``,
        ``risk_score``, ``master_state``, ``regime_5``, ``target_cap`` and
        ``warmup_flag``.

    Raises:
        ValueError: if *prefixes* is empty or a required column is missing.
    """
    if cfg is None:
        cfg = RegimeV14Config()
    if not prefixes:
        raise ValueError("prefixes must contain at least one index prefix")

    df_feat = pd.concat(
        [prepare_index_features(df, p, cfg) for p in prefixes], axis=1
    )

    trend_df = compute_trend_score(df_feat, prefixes, cfg)
    risk_df = compute_risk_score(df_feat, prefixes, cfg)

    out = pd.DataFrame(index=df.index)

    # Carry the date column through for easier inspection.
    if "date" in df.columns:
        out["date"] = df["date"]

    out["trend_score_raw"] = trend_df["trend_score_raw"]
    out["trend_score"] = trend_df["trend_score"]
    out["risk_score_raw"] = risk_df["risk_score_raw"]
    out["risk_score"] = risk_df["risk_score"]

    out["master_state"] = [
        map_master_state(t, r, cfg)
        for t, r in zip(out["trend_score"], out["risk_score"])
    ]
    out["regime_5"] = [
        map_regime_5(t, r, cfg)
        for t, r in zip(out["trend_score"], out["risk_score"])
    ]
    out["target_cap"] = compute_target_cap(
        out["trend_score"], out["risk_score"], cfg
    )

    # Warm-up marker: either score is still undefined.
    out["warmup_flag"] = out["trend_score"].isna() | out["risk_score"].isna()

    return out


# =========================
# Evaluation helpers: trend identification, not short-term prediction
# =========================
def build_future_trend_label(
    close: pd.Series,
    ma20: pd.Series,
    future_window: int = 20,
    min_return: float = 0.04,
    max_drawdown: float = 0.04,
) -> pd.Series:
    """Label whether a trend environment follows each day.

    The label checks "is today the start of / inside a trend environment",
    not whether tomorrow goes up.  A day is labelled 1.0 when all hold:

    - the return over the next *future_window* days is >= *min_return*;
    - the lowest close over days t+1 .. t+future_window is no more than
      *max_drawdown* below today's close;
    - the close at t+future_window is at or above MA20 on that day
      (a simplified persistence check).

    Otherwise the label is 0.0.  Rows whose future is not fully observable
    (the tail of the series, or NaN inputs) are NaN rather than 0.0, so they
    can be dropped instead of counting as misses.
    """
    future_close = close.shift(-future_window)
    future_ret = future_close / close - 1.0

    # Reverse-rolling over the series shifted by one day gives the minimum of
    # close[t+1 .. t+future_window], i.e. the same horizon as future_ret.
    future_min = (
        close.shift(-1)[::-1]
        .rolling(future_window, min_periods=future_window)
        .min()[::-1]
    )
    future_dd = future_min / close - 1.0

    future_ma20 = ma20.shift(-future_window)

    hit = (
        (future_ret >= min_return)
        & (future_dd >= -max_drawdown)
        & (future_close >= future_ma20)
    )
    # Comparisons with NaN are False; mask them so "unknown" is not "miss".
    known = future_ret.notna() & future_dd.notna() & future_ma20.notna()
    return hit.astype(float).where(known)


def evaluate_trend_identification(
    result_df: pd.DataFrame,
    close: pd.Series,
    ma20: pd.Series,
    future_window: int = 20,
) -> pd.DataFrame:
    """Summarise how well ``trend_score`` separates future trend labels.

    Rows are bucketed into (up to) five trend-score quantiles; for each
    bucket the sample count, label hit rate and mean trend score are
    reported.  Returns an empty frame when no row has both a score and a
    label.
    """
    label = build_future_trend_label(
        close=close, ma20=ma20, future_window=future_window
    )
    df_eval = pd.DataFrame(
        {
            "trend_score": result_df["trend_score"],
            "label": label,
        }
    ).dropna()

    if df_eval.empty:
        return pd.DataFrame()

    df_eval["bucket"] = pd.qcut(
        df_eval["trend_score"], 5, labels=False, duplicates="drop"
    )
    summary = (
        df_eval.groupby("bucket")
        .agg(
            count=("label", "size"),
            trend_hit_rate=("label", "mean"),
            avg_trend_score=("trend_score", "mean"),
        )
        .reset_index()
    )
    return summary


# =========================
# Example usage
# =========================
if __name__ == "__main__":
    # Assume the raw table df has these columns:
    # date
    # sh_close, sh_high, sh_low
    # sz_close, sz_high, sz_low
    # cyb_close, cyb_high, cyb_low
    #
    # Optional enhancement columns:
    # sh_up_ratio, sh_avg_return
    # sz_up_ratio, sz_avg_return
    # cyb_up_ratio, cyb_avg_return

    # Example:
    # df = pd.read_excel("your_market_data.xlsx")
    # cfg = RegimeV14Config()
    # result = build_regime_v14(df, prefixes=["sh", "sz", "cyb"], cfg=cfg)
    # result.to_csv("market_regime_v14.csv", index=False, encoding="utf-8-sig")
    pass