import numpy as np import pandas as pd from dataclasses import dataclass from collections import deque from typing import Optional, Dict, Any, Tuple # -------------------------- # utils # -------------------------- def clip(x, lo=0.0, hi=1.0): return max(lo, min(hi, x)) def apply_position_limit(new_position: float, daily_change_limit: float, previous_position: float) -> float: """限制每日仓位变化幅度""" if previous_position is None or np.isnan(previous_position): return float(new_position) delta = new_position - previous_position if abs(delta) <= daily_change_limit: return float(new_position) return float(previous_position + np.sign(delta) * daily_change_limit) def atr(high: pd.Series, low: pd.Series, close: pd.Series, n: int = 14) -> pd.Series: prev_close = close.shift(1) tr = pd.concat([ (high - low).abs(), (high - prev_close).abs(), (low - prev_close).abs(), ], axis=1).max(axis=1) return tr.rolling(n, min_periods=n).mean() # -------------------------- # Regime / signal engine # -------------------------- @dataclass class RegimeParams: ma_trend_window: int = 120 dd_window: int = 60 vol_window: int = 20 q_window: int = 252 vol_enter_q: float = 0.80 vol_exit_q: float = 0.70 dd_enter_q: float = 0.20 dd_exit_q: float = 0.30 tr_enter_q: float = 0.60 tr_exit_q: float = 0.50 stability_cap_days: int = 20 hist_len: int = 10 daily_change_limit: float = 0.20 use_amount_filter: bool = True amount_ma_window: int = 20 amount_min_ratio: float = 0.60 caps: Optional[Dict[str, float]] = None bases: Optional[Dict[str, float]] = None class MarketRegimeStateMachine: """两层:风险闸门(vol/dd)-> 趋势过滤(ma120_ratio)""" def __init__(self, params: Optional[RegimeParams] = None): self.p = params or RegimeParams() if self.p.caps is None: self.p.caps = { "RISK_SEVERE": 0.20, "RISK_MILD": 0.50, "RANGE": 0.50, "TREND_NORMAL": 0.80, "TREND_BULL": 1.00, } if self.p.bases is None: self.p.bases = { "RISK_SEVERE": 0.00, "RISK_MILD": 0.20, "RANGE": 0.30, "TREND_NORMAL": 0.60, "TREND_BULL": 0.90, } self.current_state: str = "RANGE" self.state_duration: int = 0 self.last_position: float = 0.0 self.state_hist: deque = deque(maxlen=self.p.hist_len) def _switch_count_last_n(self) -> int: h = list(self.state_hist) if len(h) <= 1: return 0 return int(sum(h[i] != h[i - 1] for i in range(1, len(h)))) def compute_indicators(self, df: pd.DataFrame) -> pd.DataFrame: """ df至少包含:Date, Open, High, Low, Close 可选:Amount """ d = df.copy() for col in ["Open", "High", "Low", "Close"]: if col not in d.columns: raise ValueError(f"Missing required column: {col}") d["ret1"] = d["Close"].pct_change() d["ma120"] = d["Close"].rolling(self.p.ma_trend_window, min_periods=self.p.ma_trend_window).mean() d["ma120_ratio"] = d["Close"] / d["ma120"] - 1.0 d["vol20"] = d["ret1"].rolling(self.p.vol_window, min_periods=self.p.vol_window).std() d["roll_max60"] = d["Close"].rolling(self.p.dd_window, min_periods=self.p.dd_window).max() d["dd60"] = d["Close"] / d["roll_max60"] - 1.0 d["atr14"] = atr(d["High"], d["Low"], d["Close"], n=14) d["atr14_ratio"] = d["atr14"] / d["Close"] if "Amount" in d.columns: d["amt_ma20"] = d["Amount"].rolling(self.p.amount_ma_window, min_periods=self.p.amount_ma_window).mean() d["amt_ratio"] = d["Amount"] / d["amt_ma20"] else: d["amt_ratio"] = np.nan qw = self.p.q_window d["vol_enter"] = d["vol20"].rolling(qw, min_periods=qw).quantile(self.p.vol_enter_q) d["vol_exit"] = d["vol20"].rolling(qw, min_periods=qw).quantile(self.p.vol_exit_q) d["dd_enter"] = d["dd60"].rolling(qw, min_periods=qw).quantile(self.p.dd_enter_q) d["dd_exit"] = d["dd60"].rolling(qw, min_periods=qw).quantile(self.p.dd_exit_q) d["tr_enter"] = d["ma120_ratio"].rolling(qw, min_periods=qw).quantile(self.p.tr_enter_q) d["tr_exit"] = d["ma120_ratio"].rolling(qw, min_periods=qw).quantile(self.p.tr_exit_q) return d def _decide_state(self, row: pd.Series) -> str: needed = ["vol20", "dd60", "vol_enter", "vol_exit", "dd_enter", "dd_exit", "ma120_ratio", "tr_enter", "tr_exit"] if any(pd.isna(row.get(k, np.nan)) for k in needed): return self.current_state vol20 = float(row["vol20"]) dd60 = float(row["dd60"]) # Layer 1: Risk gate if (vol20 > float(row["vol_enter"])) or (dd60 < float(row["dd_enter"])): return "RISK_SEVERE" if (vol20 > float(row["vol_exit"])) or (dd60 < float(row["dd_exit"])): return "RISK_MILD" # Layer 2: Trend filter tr = float(row["ma120_ratio"]) if tr > float(row["tr_enter"]): return "TREND_BULL" if tr > float(row["tr_exit"]): return "TREND_NORMAL" return "RANGE" def _amount_filter_cap(self, row: pd.Series, cap: float) -> float: if not self.p.use_amount_filter: return cap ar = row.get("amt_ratio", np.nan) if pd.isna(ar): return cap if float(ar) < self.p.amount_min_ratio: return min(cap, 0.50) return cap def _confidence(self, row: pd.Series, state: str) -> float: stability = min(self.state_duration, self.p.stability_cap_days) / float(self.p.stability_cap_days) jitter = clip(self._switch_count_last_n() / 6.0, 0.0, 1.0) margin = 0.0 try: if state in ("RISK_SEVERE", "RISK_MILD"): vol20 = float(row["vol20"]) vol_enter = float(row["vol_enter"]); vol_exit = float(row["vol_exit"]) dd60 = float(row["dd60"]) dd_enter = float(row["dd_enter"]); dd_exit = float(row["dd_exit"]) vol_band = max(1e-12, abs(vol_enter - vol_exit)) dd_band = max(1e-12, abs(dd_exit - dd_enter)) vol_m = clip((vol20 - vol_exit) / vol_band, 0.0, 1.0) dd_m = clip((dd_exit - dd60) / dd_band, 0.0, 1.0) margin = max(vol_m, dd_m) else: tr = float(row["ma120_ratio"]) tr_enter = float(row["tr_enter"]); tr_exit = float(row["tr_exit"]) tr_band = max(1e-12, abs(tr_enter - tr_exit)) margin = clip((tr - tr_exit) / tr_band, 0.0, 1.0) except Exception: margin = 0.0 conf = 0.45 + 0.35 * margin + 0.20 * stability - 0.30 * jitter return float(clip(conf, 0.0, 1.0)) def step(self, row: pd.Series) -> Dict[str, Any]: new_state = self._decide_state(row) if new_state == self.current_state: self.state_duration += 1 else: self.current_state = new_state self.state_duration = 1 self.state_hist.append(self.current_state) conf = self._confidence(row, self.current_state) base = float(self.p.bases[self.current_state]) cap = float(self.p.caps[self.current_state]) cap = self._amount_filter_cap(row, cap) target = base * (0.5 + 0.5 * conf) target = min(target, cap) # 限速必须用 last_position pos = apply_position_limit(target, self.p.daily_change_limit, self.last_position) self.last_position = pos return { "state": self.current_state, "confidence": conf, "target_position": float(target), "position": float(pos), "state_duration": int(self.state_duration), "switch_count": int(self._switch_count_last_n()), } # -------------------------- # Portfolio backtester (single instrument) # -------------------------- @dataclass class TradeCost: commission_rate: float = 0.0002 # 佣金:万2 stamp_tax_rate: float = 0.0010 # 印花税:卖出千1(指数/ETF你可设0) slippage_bps: float = 5.0 # 滑点(bps):5 = 0.05% min_commission: float = 0.0 # 最低佣金(有的券商5元,可设5) def slip(self) -> float: return self.slippage_bps / 10000.0 def _commission(amount: float, rate: float, min_fee: float) -> float: fee = abs(amount) * rate return float(max(fee, min_fee)) def backtest_single_instrument_regime( df: pd.DataFrame, params: Optional[RegimeParams] = None, cost: Optional[TradeCost] = None, initial_cash: float = 1_000_000.0, stamp_tax_on_sell: bool = False, # 回测指数择时:通常False;ETF/股票可True ) -> pd.DataFrame: """ 单标的择时回测:目标是让持仓价值 / 总资产 ≈ position(由大势引擎给出)。 成交:t日信号 -> t+1日Open成交(T+1) 估值:每日按Close计净值(mark-to-market) """ p = params or RegimeParams() c = cost or TradeCost() d = df.copy().sort_values("Date").reset_index(drop=True) for col in ["Date", "Open", "High", "Low", "Close"]: if col not in d.columns: raise ValueError(f"Missing required column: {col}") sm = MarketRegimeStateMachine(params=p) ind = sm.compute_indicators(d) cash = float(initial_cash) shares = 0.0 # 对指数用“份额/单位”即可;回测ETF可用股数 last_nav = float(initial_cash) rows = [] slip = c.slip() # 为了 T+1:用昨日 signal 决定今日开盘交易 prev_signal = None # dict: state/conf/target/position for i in range(len(ind)): row = ind.iloc[i] date = row["Date"] # 先生成今日信号(收盘后才知道),但交易要等明天开盘 today_signal = sm.step(row) # 1) 执行交易:用 prev_signal(昨日收盘信号)在今日开盘成交 trade_value = 0.0 commission = 0.0 stamp = 0.0 turnover = 0.0 if prev_signal is not None: desired_weight = float(prev_signal["position"]) # 昨日信号计算后的“可执行仓位” open_price = float(row["Open"]) # 以开盘价估算总资产(交易前) mkt_value = shares * open_price equity = cash + mkt_value # 目标持仓市值 target_mkt_value = equity * desired_weight delta_value = target_mkt_value - mkt_value # 正=>买入,负=>卖出 # 加入滑点:买入更贵,卖出更便宜 if delta_value > 0: exec_price = open_price * (1.0 + slip) else: exec_price = open_price * (1.0 - slip) # 以“份额”成交(不考虑100股整数手;你做个股可自己改成整手) delta_shares = delta_value / exec_price if exec_price > 0 else 0.0 # 现金约束:买入不能超过现金(考虑手续费) if delta_shares > 0: # 先粗算买入额,再扣佣金 buy_amount = delta_shares * exec_price fee = _commission(buy_amount, c.commission_rate, c.min_commission) total_need = buy_amount + fee if total_need > cash and total_need > 0: scale = cash / total_need delta_shares *= scale buy_amount = delta_shares * exec_price fee = _commission(buy_amount, c.commission_rate, c.min_commission) trade_value = buy_amount commission = fee cash -= (buy_amount + fee) shares += delta_shares elif delta_shares < 0: sell_amount = (-delta_shares) * exec_price fee = _commission(sell_amount, c.commission_rate, c.min_commission) tax = sell_amount * c.stamp_tax_rate if stamp_tax_on_sell else 0.0 trade_value = -sell_amount commission = fee stamp = tax cash += (sell_amount - fee - tax) shares += delta_shares # delta_shares为负 # 换手率:交易额/权益 turnover = abs(trade_value) / equity if equity > 0 else 0.0 # 2) 收盘计净值(mark-to-market) close_price = float(row["Close"]) nav = cash + shares * close_price ret = (nav / last_nav - 1.0) if last_nav > 0 else 0.0 last_nav = nav rows.append({ "Date": date, "nav": nav, "ret": ret, "cash": cash, "shares": shares, "mkt_value_close": shares * close_price, "trade_value": trade_value, "commission": commission, "stamp_tax": stamp, "turnover": turnover, # 今日收盘信号(用于观察,不用于今日成交) "state": today_signal["state"], "confidence": today_signal["confidence"], "target_position": today_signal["target_position"], "position": today_signal["position"], "state_duration": today_signal["state_duration"], "switch_count": today_signal["switch_count"], }) # 3) 今日信号留到明天开盘执行(T+1) prev_signal = today_signal out = pd.DataFrame(rows) return pd.concat([ind, out.drop(columns=["Date"])], axis=1) # ind里有Date # -------------------------- # Metrics # -------------------------- def perf_summary(result: pd.DataFrame, trading_days: int = 252) -> Dict[str, float]: r = result["ret"].fillna(0.0).to_numpy() nav = result["nav"].to_numpy() ann_ret = (nav[-1] / nav[0]) ** (trading_days / max(1, len(nav)-1)) - 1.0 ann_vol = np.std(r) * np.sqrt(trading_days) sharpe = ann_ret / ann_vol if ann_vol > 1e-12 else np.nan peak = np.maximum.accumulate(nav) dd = nav / peak - 1.0 max_dd = float(dd.min()) avg_turnover = float(result["turnover"].fillna(0.0).mean()) return { "ann_return": float(ann_ret), "ann_vol": float(ann_vol), "sharpe": float(sharpe), "max_drawdown": float(max_dd), "avg_daily_turnover": avg_turnover, "final_nav": float(nav[-1]), }