@ëë @ëë @ëë@ëëÄã¿´¿´×öÒ»°æ 1.8.4 ÊÔÊԻزâÒ»ÏÂÓÐûÓÐÌáÉý£¬¾ßÌåÈçÏÂÄã¾õµÃÔõôÑù import pandas as pd import numpy as np import pickle import warnings warnings.filterwarnings('ignore') # ==================== ÅäÖà ==================== POSITION_MAPPING = { 'RISK_SEVERE': 0, 'RISK_MILD': 35, 'RANGE': 55, 'TREND_NORMAL': 60, 'TREND_BULL': 90 } # v1.8.4£ºË«ÖÜÆÚ¶¯Á¿£¬½µµÍ»úе»»ÊÖ SELECTOR_WEIGHTS = { 'mom20': 0.35, 'mom60': 0.35, 'turnover': 0.10, 'vol': 0.20 } FIXED_HOLD_COUNT = 10 REBALANCE_DAYS = 20 INITIAL_CAPITAL = 1000000 # ===== v1.8.4 µÍ½»Ò×ÔöÇ¿²ÎÊý ===== HOLD_BUFFER_RANK = 15 # ÒѳֲÖÖ»Òª»¹ÔÚǰ15£¬ÓÅÏȼÌÐø³ÖÓÐ BUY_BUFFER_RANK = 8 # пª²Ö±ØÐë½øÈëǰ8 MIN_TRADE_RATIO = 0.20 # µ¥Æ±Ä¿±ê±ä»¯²»×ã20%£¬ºöÂÔ£¬²»½»Ò× MIN_HOLD_DAYS = 10 # ×îÉÙ³ÖÓÐ10¸ö½»Ò×ÈÕ£¨·çÏÕ״̬³ýÍ⣩ ALLOW_DAILY_REDUCE_STATES = {'RISK_SEVERE', 'RISK_MILD'} # ½ö·çÏÕ״̬ÔÊÐí·Çµ÷²ÖÈÕÖ÷¶¯¼õ²Ö # ==================== ´óÊÆÄ£¿é ==================== # ÏÂÃæÕⲿ·Ö¼ÌÐøÑØÓÃÄã v1.8.3 Ô­ÎļþÖеÄʵÏÖ£º # - class RegimeV145Config # - is_upgrade # - is_downgrade # - calculate_index_support # - apply_upgrade_filters # - apply_confirmation_gate # - calculate_market_score_v145 # - determine_state_v145 # - calculate_position_v145 # ==================== Ñ¡¹ÉÄ£¿é ==================== def calculate_factors(stock, date, dates, price_df, turnover_df, volatility_df): try: idx = list(dates).index(date) except ValueError: return None try: p_now = float(price_df.loc[date, stock]) if pd.isna(p_now) or p_now <= 0: return None except: return None factors = {} # -------- 20ÈÕ¶¯Á¿ -------- if idx >= 20: old_date_20 = dates[idx - 20] p_old_20 = float(price_df.loc[old_date_20, stock]) factors['mom20'] = (p_now / p_old_20 - 1.0) * 100.0 if p_old_20 > 0 else 0.0 else: factors['mom20'] = 0.0 # -------- 60ÈÕ¶¯Á¿ -------- if idx >= 60: old_date_60 = dates[idx - 60] p_old_60 = float(price_df.loc[old_date_60, stock]) factors['mom60'] = (p_now / p_old_60 - 1.0) * 100.0 if p_old_60 > 0 else 0.0 else: # Ñù±¾²»×ãʱÍË»¯Îª20ÈÕ¶¯Á¿£¬±ÜÃâǰÆÚÈ«¿ÕÖµ factors['mom60'] = factors['mom20'] # -------- »»ÊÖÆ«Àë -------- if idx >= 20: turnover_vals = [] for j in range(20): tdate = dates[idx - j] if tdate in turnover_df.index and stock in turnover_df.columns: t = float(turnover_df.loc[tdate, stock]) if not pd.isna(t) and t > 0: turnover_vals.append(t) if len(turnover_vals) >= 10: turnover_ma = np.mean(turnover_vals) if date in turnover_df.index and stock in turnover_df.columns: current_turnover = float(turnover_df.loc[date, stock]) else: current_turnover = np.nan if turnover_ma > 0 and not pd.isna(current_turnover): turnover_dev = (current_turnover - turnover_ma) / turnover_ma * 100.0 else: turnover_dev = 0.0 else: turnover_dev = 0.0 else: turnover_dev = 0.0 # ÉÏÕÇÈÕ·Å´ó»»ÊÖ£¬»ØÂäÈÕÈõ»¯»»ÊÖ£¬µ«²»×ö¹ýÇ¿³Í·£ if idx >= 1: prev_date = dates[idx - 1] if prev_date in price_df.index and stock in price_df.columns: p_prev = float(price_df.loc[prev_date, stock]) price_change = 1 if p_now > p_prev else (-1 if p_now < p_prev else 0) else: price_change = 0 else: price_change = 0 factors['turnover'] = turnover_dev * (1.0 if price_change >= 0 else -0.5) # -------- ²¨¶¯ -------- if stock in volatility_df.columns and date in volatility_df.index: vol = float(volatility_df.loc[date, stock]) factors['volatility'] = vol if not pd.isna(vol) and vol > 0 else 0.3 else: factors['volatility'] = 0.3 factors['price'] = p_now return factors def winsorize(arr, lower=0.05, upper=0.95): lower_val = np.nanpercentile(arr, lower * 100) upper_val = np.nanpercentile(arr, upper * 100) return np.clip(arr, lower_val, upper_val) def normalize(arr): arr = winsorize(arr) if np.std(arr) > 0: return (arr - np.mean(arr)) / np.std(arr) return np.zeros_like(arr) def factor_select(data, date, n=10, current_positions=None): price_df = data['price'] turnover_df = data['turnover'] volatility_df = data['volatility'] core_stocks = data['stocks'] dates = data['dates'] if current_positions is None: current_positions = {} avail = [ s for s in core_stocks if s in price_df.columns and date in price_df.index and pd.notna(price_df.loc[date, s]) and price_df.loc[date, s] > 0 ] if not avail: return [], {} results = {} for s in avail: factors = calculate_factors(s, date, dates, price_df, turnover_df, volatility_df) if factors: results[s] = factors if not results: return [], {} stocks_list = list(results.keys()) mom20_vals = np.array([results[s]['mom20'] for s in stocks_list]) mom60_vals = np.array([results[s]['mom60'] for s in stocks_list]) turnover_vals = np.array([results[s]['turnover'] for s in stocks_list]) vol_vals = np.array([results[s]['volatility'] for s in stocks_list]) mom20_norm = normalize(mom20_vals) mom60_norm = normalize(mom60_vals) turnover_norm = normalize(turnover_vals) vol_norm = normalize(vol_vals) scores = {} for i, s in enumerate(stocks_list): scores[s] = ( SELECTOR_WEIGHTS['mom20'] * mom20_norm[i] + SELECTOR_WEIGHTS['mom60'] * mom60_norm[i] + SELECTOR_WEIGHTS['turnover'] * turnover_norm[i] - SELECTOR_WEIGHTS['vol'] * vol_norm[i] ) sorted_stocks = sorted(scores.items(), key=lambda x: x[1], reverse=True) rank_map = {s: i + 1 for i, (s, _) in enumerate(sorted_stocks)} # Ïȱ£Áô¾É³Ö²Ö£ºÖ»ÒªÈÔÔÚ»º³åÅÅÃûÄÚ keep_list = [] for s in current_positions.keys(): if s in rank_map and rank_map[s] <= HOLD_BUFFER_RANK and s in results: keep_list.append((s, results[s]['price'], scores[s])) keep_codes = set([x[0] for x in keep_list]) # ÔÙ´Ó¸ßÅÅÃûÖв¹³äв֣ºÐ¿ª²Ö±ØÐë½øÈë¸üÑϸñÃż÷ add_list = [] for s, score in sorted_stocks: if s in keep_codes: continue if rank_map[s] <= BUY_BUFFER_RANK and s in results: add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break # Èô²»×ãn£¬ÔÙ°´¸ß·Ö²¹Âú£¬±ÜÃ⼫¶ËÇé¿öϳֲֹýÉÙ if len(keep_list) + len(add_list) < n: used = keep_codes | set([x[0] for x in add_list]) for s, score in sorted_stocks: if s in used: continue if s in results: add_list.append((s, results[s]['price'], score)) if len(keep_list) + len(add_list) >= n: break selected = (keep_list + add_list)[:n] return selected, rank_map # ==================== »Ø²â¸¨Öú ==================== def compute_total_value(cash, positions, price_df, date): total_value = cash for s, shares in positions.items(): if s in price_df.columns and date in price_df.index: try: p = price_df.loc[date, s] if not pd.isna(p) and p > 0: total_value += shares * p except: pass return total_value def should_skip_trade(current_value, desired_value, min_trade_ratio): """ µ±Ç°²ÖλÓëÄ¿±ê²Öλ²îÒì̫С£¬Ôò²»½»Ò× """ base = max(current_value, desired_value, 1.0) diff_ratio = abs(desired_value - current_value) / base return diff_ratio < min_trade_ratio def summarize_trade_stats(trades): if not trades: return {'½»Ò×´ÎÊý': 0, 'ÂòÈë´ÎÊý': 0, 'Âô³ö´ÎÊý': 0, '¼õ²Ö´ÎÊý': 0, 'Çå²Ö´ÎÊý': 0} buy_cnt = sum(1 for x in trades if x['²Ù×÷'] == 'ÂòÈë') sell_cnt = sum(1 for x in trades if x['²Ù×÷'] == 'Âô³ö') reduce_cnt = sum(1 for x in trades if x['²Ù×÷'] == '¼õ²Ö') clear_cnt = sum(1 for x in trades if x['²Ù×÷'] == 'Çå²Ö') return { '½»Ò×´ÎÊý': len(trades), 'ÂòÈë´ÎÊý': buy_cnt, 'Âô³ö´ÎÊý': sell_cnt, '¼õ²Ö´ÎÊý': reduce_cnt, 'Çå²Ö´ÎÊý': clear_cnt } # ==================== »Ø²âÖ÷º¯Êý ==================== def run_backtest(data, name='v1.8.4_low_turnover'): config = RegimeV145Config() dates = data['dates'] price_df = data['price'] cash = INITIAL_CAPITAL positions = {} # {¹ÉƱ´úÂë: ¹ÉÊý} entry_day = {} # {¹ÉƱ´úÂë: ½¨²ÖÈÕË÷Òý} trades = [] daily_records = [] prev_pos = 0 prev_state = None prev_score = None consecutive_days = 0 severe_days = 0 need_reentry = False for i, date in enumerate(dates): if date not in data['trend'].index or date not in data['price'].index: daily_records.append({ 'ÈÕÆÚ': str(date)[:10], '´óÊÆ×´Ì¬': 'N/A', '²Öλ': 0, '×ܾ»Öµ': cash, '²Ù×÷': '' }) continue # ===== ´óÊÆÅж¨£¨ÑØÓÃÔ­v1.8.3Âß¼­£©===== score = calculate_market_score_v145(data['trend'], data['industry'], date) raw_state = determine_state_v145(score) filtered_state = apply_upgrade_filters( raw_state, prev_state, score, prev_score, data['trend'], date, config ) final_state = apply_confirmation_gate( filtered_state, prev_state, consecutive_days, config ) if prev_state == final_state: consecutive_days += 1 else: consecutive_days = 1 target_pos = calculate_position_v145(final_state, config, prev_pos) target = target_pos / 100.0 # ===== RISK_SEVERE Á½¶Îʽ£¬ÑØÓÃÔ­Âß¼­ ===== if final_state == 'RISK_SEVERE': severe_days += 1 else: if severe_days > 0: need_reentry = True severe_days = 0 if final_state == 'RISK_SEVERE': target = 0.25 if severe_days == 1 else 0.0 total_value = compute_total_value(cash, positions, price_df, date) current_hold_value = total_value - cash target_hold_value = total_value * target operation = '' # ================================================== # 1) ½öÔÚ·çÏÕ״̬ÔÊÐí¡°·Çµ÷²ÖÈÕÖ÷¶¯¼õ²Ö¡± # ================================================== if final_state in ALLOW_DAILY_REDUCE_STATES: if current_hold_value > target_hold_value and current_hold_value > 0: reduce_ratio = (current_hold_value - target_hold_value) / current_hold_value for s in list(positions.keys()): if s not in price_df.columns or date not in price_df.index: continue try: p = price_df.loc[date, s] if pd.isna(p) or p <= 0: continue sell_shares = int((positions[s] * reduce_ratio) / 100) * 100 sell_shares = min(sell_shares, positions[s]) if sell_shares > 0: cash += sell_shares * p positions[s] -= sell_shares trades.append({ 'ÈÕÆÚ': str(date)[:10], '¹ÉƱ´úÂë': s, '²Ù×÷': '¼õ²Ö', '¼Û¸ñ': round(float(p), 2), 'ÊýÁ¿': int(sell_shares), '´óÊÆ×´Ì¬': final_state }) operation = '·çÏÕ¼õ²Ö' if positions[s] <= 0: del positions[s] if s in entry_day: del entry_day[s] except: pass # ================================================== # 2) ½öÔÚµ÷²ÖÈÕ / ·çÏÕÍ˳öÖØ½¨ÈÕ£¬½øÐÐÑ¡¹Éµ÷²Ö # ================================================== rebalance_today = (i % REBALANCE_DAYS == 0) or need_reentry if rebalance_today and target > 0: n = FIXED_HOLD_COUNT if n > 0: selected, rank_map = factor_select(data, date, n=n, current_positions=positions) if selected: selected_codes = [s for s, _, _ in selected] # ³õ²½Ä¿±ê£ºµÈȨ£¬µ«²»ÒªÇóÿ´Î¾«È·ÌùºÏ target_stock_value = total_value * target / len(selected) target_per_stock = {s: target_stock_value for s, _, _ in selected} # ---------- ÏÈÂô ---------- for s in list(positions.keys()): if s not in price_df.columns or date not in price_df.index: continue try: p = price_df.loc[date, s] if pd.isna(p) or p <= 0: continue current_value = positions[s] * p hold_days = i - entry_day.get(s, i) # Çé¿ö1£º²»ÔÚÄ¿±ê×éºÏÀï if s not in selected_codes: # ·çÏÕ״ֱ̬½Ó´¦Àí£»Õý³£×´Ì¬³ÖÓÐÌ«¶ÌÏȲ»Âô£¬¼õÉÙ·´¸´´òÁ³ can_sell = ( final_state in ALLOW_DAILY_REDUCE_STATES or hold_days >= MIN_HOLD_DAYS ) if can_sell: sell_shares = positions[s] if sell_shares > 0: cash += sell_shares * p trades.append({ 'ÈÕÆÚ': str(date)[:10], '¹ÉƱ´úÂë': s, '²Ù×÷': 'Âô³ö', '¼Û¸ñ': round(float(p), 2), 'ÊýÁ¿': int(sell_shares), '´óÊÆ×´Ì¬': final_state }) del positions[s] if s in entry_day: del entry_day[s] operation = 'µ÷²Ö' continue # Çé¿ö2£ºÈÔÔÚÄ¿±ê×éºÏÀµ«Èç¹û³¬ÅäÃ÷ÏԲżõ desired_value = target_per_stock.get(s, 0.0) if should_skip_trade(current_value, desired_value, MIN_TRADE_RATIO): continue excess_value = current_value - desired_value if excess_value > p * 100: if final_state not in ALLOW_DAILY_REDUCE_STATES and hold_days < MIN_HOLD_DAYS: continue sell_shares = int(excess_value / p / 100) * 100 sell_shares = min(sell_shares, positions[s]) if sell_shares > 0: cash += sell_shares * p positions[s] -= sell_shares trades.append({ 'ÈÕÆÚ': str(date)[:10], '¹ÉƱ´úÂë': s, '²Ù×÷': 'Âô³ö', '¼Û¸ñ': round(float(p), 2), 'ÊýÁ¿': int(sell_shares), '´óÊÆ×´Ì¬': final_state }) operation = 'µ÷²Ö' if positions[s] <= 0: del positions[s] if s in entry_day: del entry_day[s] except: pass # ---------- ÔÙÂò ---------- total_value = compute_total_value(cash, positions, price_df, date) # ÖØÐÂѡһ´Î£¬±£Ö¤Âô³öºó»º³åÇøÂß¼­Ò»Ö selected, rank_map = factor_select(data, date, n=n, current_positions=positions) if selected: target_stock_value = total_value * target / len(selected) target_per_stock = {s: target_stock_value for s, _, _ in selected} for s, price, score in selected: current_value = positions.get(s, 0) * price desired_value = target_per_stock[s] if should_skip_trade(current_value, desired_value, MIN_TRADE_RATIO): continue diff = desired_value - current_value if diff <= 0: continue try: p = price_df.loc[date, s] if pd.isna(p) or p <= 0: continue shares = int(diff / p / 100) * 100 if shares > 0 and cash >= shares * p: cash -= shares * p positions[s] = positions.get(s, 0) + shares if s not in entry_day: entry_day[s] = i trades.append({ 'ÈÕÆÚ': str(date)[:10], '¹ÉƱ´úÂë': s, '²Ù×÷': 'ÂòÈë', '¼Û¸ñ': round(float(p), 2), 'ÊýÁ¿': int(shares), '´óÊÆ×´Ì¬': final_state }) operation = 'µ÷²Ö' except: pass # ===== ÈôÄ¿±ê²ÖλΪ0£¬ÔòÇå²Ö ===== elif target <= 0 and len(positions) > 0: for s in list(positions.keys()): if s not in price_df.columns or date not in price_df.index: continue try: p = price_df.loc[date, s] if pd.isna(p) or p <= 0: continue sell_shares = positions[s] if sell_shares > 0: cash += sell_shares * p trades.append({ 'ÈÕÆÚ': str(date)[:10], '¹ÉƱ´úÂë': s, '²Ù×÷': 'Çå²Ö', '¼Û¸ñ': round(float(p), 2), 'ÊýÁ¿': int(sell_shares), '´óÊÆ×´Ì¬': final_state }) del positions[s] if s in entry_day: del entry_day[s] operation = 'Çå²Ö' except: pass need_reentry = False total_value = compute_total_value(cash, positions, price_df, date) daily_records.append({ 'ÈÕÆÚ': str(date)[:10], '´óÊÆ×´Ì¬': final_state, '´óÊÆ·ÖÊý': round(float(score), 1), '²Öλ': int(target * 100), '³Ö²ÖÊý': len(positions), '×ܾ»Öµ': round(float(total_value), 2), 'ÏÖ½ð': round(float(cash), 2), '²Ù×÷': operation }) prev_state = final_state prev_score = score prev_pos = target_pos final_value = daily_records[-1]['×ܾ»Öµ'] if daily_records else INITIAL_CAPITAL ret = (final_value / INITIAL_CAPITAL - 1.0) * 100.0 equity_curve = [x['×ܾ»Öµ'] for x in daily_records] if daily_records else [INITIAL_CAPITAL] peak = equity_curve[0] max_dd = 0.0 for v in equity_curve: if v > peak: peak = v dd = (peak - v) / peak * 100.0 if peak > 0 else 0.0 if dd > max_dd: max_dd = dd trade_stats = summarize_trade_stats(trades) print(f"\n»Ø²â: {name}") print("-" * 60) print(f"×îÖÕ¾»Öµ: {final_value:,.0f}") print(f"ÊÕÒæÂÊ: {ret:.2f}%") print(f"×î´ó»Ø³·: {max_dd:.2f}%") print(f"½»Ò×´ÎÊý: {trade_stats['½»Ò×´ÎÊý']}") print(f" ÂòÈë´ÎÊý: {trade_stats['ÂòÈë´ÎÊý']}") print(f" Âô³ö´ÎÊý: {trade_stats['Âô³ö´ÎÊý']}") print(f" ¼õ²Ö´ÎÊý: {trade_stats['¼õ²Ö´ÎÊý']}") print(f" Çå²Ö´ÎÊý: {trade_stats['Çå²Ö´ÎÊý']}") return { 'name': name, 'daily': pd.DataFrame(daily_records), 'trades': pd.DataFrame(trades), 'final': final_value, 'return': ret, 'drawdown': max_dd, 'trade_stats': trade_stats } # ==================== ¼òµ¥·ÖÎöº¯Êý ==================== def evaluate_period_returns(daily_df): """ ÊäÈë daily dataframe£¬Êä³ö°´×ÔÈ»Äê¶È/°ëÄê·Ö¶ÎÊÕÒæ """ if daily_df is None or len(daily_df) == 0: return pd.DataFrame() df = daily_df.copy() df['ÈÕÆÚ'] = pd.to_datetime(df['ÈÕÆÚ']) df = df.sort_values('ÈÕÆÚ').reset_index(drop=True) df['year'] = df['ÈÕÆÚ'].dt.year df['half'] = df['ÈÕÆÚ'].dt.month.apply(lambda x: 'H1' if x <= 6 else 'H2') df['period'] = df['year'].astype(str) + df['half'] rows = [] for period, g in df.groupby('period'): start_nv = g.iloc[0]['×ܾ»Öµ'] end_nv = g.iloc[-1]['×ܾ»Öµ'] ret = (end_nv / start_nv - 1) * 100 if start_nv > 0 else 0 rows.append({ 'period': period, 'start_date': g.iloc[0]['ÈÕÆÚ'].strftime('%Y-%m-%d'), 'end_date': g.iloc[-1]['ÈÕÆÚ'].strftime('%Y-%m-%d'), 'start_nv': round(start_nv, 2), 'end_nv': round(end_nv, 2), 'return_pct': round(ret, 2) }) return pd.DataFrame(rows) # ==================== ʾÀýµ÷Óà ==================== # ÄãÔ­À´Ôõô¼ÓÔØÊý¾Ý£¬¾Í¼ÌÐøÔõô¼ÓÔØ¡£ # ¼ÙÉè×îÖյõ½ data = { # 'trend': trend_df, # 'industry': industry_df, # 'price': price_df, # 'turnover': turnover_df, # 'volatility': volatility_df, # 'stocks': stock_list, # 'dates': list(price_df.index) # } # result = run_backtest(data, name='v1.8.4_low_turnover') # daily_df = result['daily'] # trades_df = result['trades'] # period_df = evaluate_period_returns(daily_df) # print(period_df) # trades_df.to_excel('trades_v184.xlsx', index=False) # daily_df.to_excel('daily_v184.xlsx', index=False)