下面给你一段可以直接塞进你现有项目里的 “禁止占位符、解析失败直接终止” 的防呆代码。 目标很明确: 没提取到真实 OHLC,直接停 禁止偷偷用占位符继续跑 解析失败时给出结构日志,方便你定位 适合你这种复杂 Excel 入口 建议文件名:excel_guard.py from __future__ import annotations from dataclasses import dataclass, asdict from pathlib import Path from typing import Dict, List, Optional, Tuple, Any import json import pandas as pd import numpy as np # ========================= # 1) 异常定义 # ========================= class ExcelParseError(Exception): """Excel 结构解析失败""" pass class PlaceholderDataError(Exception): """检测到占位符/伪造数据""" pass class MissingOHLCError(Exception): """缺少真实 OHLC 数据""" pass # ========================= # 2) 解析日志 # ========================= @dataclass class ParseAuditLog: file_path: str nrows: int ncols: int preview_rows: List[List[Any]] detected_header_rows: Dict[str, Optional[int]] detected_blocks: List[Dict[str, Any]] found_real_ohlc_cols: List[str] missing_required_ohlc_cols: List[str] warnings: List[str] success: bool def to_dict(self) -> Dict[str, Any]: return asdict(self) def save(self, file_path: str | Path): with open(file_path, "w", encoding="utf-8") as f: json.dump(self.to_dict(), f, ensure_ascii=False, indent=2) # ========================= # 3) 常见占位符检测 # ========================= PLACEHOLDER_VALUES = { -1, 0, 1, 999, 9999, 99999, 1.0, 999.0, 9999.0, 99999.0, } def is_placeholder_series(s: pd.Series) -> bool: """ 判断一列是否大概率是占位符。 典型情况: - 全是同一个值 - 全是 1 / 999 / 9999 这类兜底值 """ s = pd.to_numeric(s, errors="coerce").dropna() if len(s) == 0: return True nunique = s.nunique(dropna=True) if nunique == 1: val = s.iloc[0] if val in PLACEHOLDER_VALUES: return True # 大比例相同的可疑值 top_freq = s.value_counts(normalize=True, dropna=True).iloc[0] top_val = s.mode(dropna=True).iloc[0] if top_freq > 0.95 and top_val in PLACEHOLDER_VALUES: return True return False # ========================= # 4) 基础检查 # ========================= def read_excel_raw(file_path: str | Path) -> pd.DataFrame: file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"文件不存在: {file_path}") return pd.read_excel(file_path, header=None) def build_preview_rows(df: pd.DataFrame, n: int = 6) -> List[List[Any]]: preview = df.head(n).fillna("").values.tolist() return preview def detect_header_rows(raw: pd.DataFrame) -> Dict[str, Optional[int]]: """ 对复杂 Excel 做一个粗粒度表头识别。 这里只做审计,不负责完整解析。 """ detected = { "metric_row": None, "formula_row": None, "label_row": None, "data_start_row": None, } max_check = min(8, len(raw)) rows_text = [] for i in range(max_check): vals = [str(x).strip() for x in raw.iloc[i].tolist() if pd.notna(x)] rows_text.append((i, vals)) # 简单启发式: # 第1行常出现“收盘价/开盘价/最高价/最低价/ATR”等指标名 metric_keywords = ["收盘价", "开盘价", "最高价", "最低价", "波动率", "ATR", "涨跌幅"] # 第2行可能有参数 formula_keywords = ["交易日期", "收盘价:", "开盘价:", "最高价:", "最低价:", "[", "]"] # 第3行可能有时间、中证500、沪深300等标签 label_keywords = ["时间", "中证500", "中证1000", "沪深300", "创业板", "上证"] for i, vals in rows_text: joined = " | ".join(vals) if detected["metric_row"] is None and any(k in joined for k in metric_keywords): detected["metric_row"] = i if detected["formula_row"] is None and any(k in joined for k in formula_keywords): detected["formula_row"] = i if detected["label_row"] is None and any(k in joined for k in label_keywords): detected["label_row"] = i # 默认数据开始行 = label_row + 1 if detected["label_row"] is not None: detected["data_start_row"] = detected["label_row"] + 1 return detected # ========================= # 5) 识别“真实OHLC是否存在” # ========================= def detect_real_ohlc_columns(df: pd.DataFrame) -> Tuple[List[str], List[str]]: """ 针对已经转成标准宽表后的 df 进行检查。 例如要求有: sh_open/sh_high/sh_low/sh_close 或 zz500_open/... 等 """ candidate_prefix_groups = [ ["sh_open", "sh_high", "sh_low", "sh_close"], ["sz_open", "sz_high", "sz_low", "sz_close"], ["cyb_open", "cyb_high", "cyb_low", "cyb_close"], ["zz500_open", "zz500_high", "zz500_low", "zz500_close"], ["zz1000_open", "zz1000_high", "zz1000_low", "zz1000_close"], ["hs300_open", "hs300_high", "hs300_low", "hs300_close"], ] found = [] missing = [] for group in candidate_prefix_groups: existing = [c for c in group if c in df.columns] if len(existing) == len(group): found.extend(existing) elif len(existing) > 0: missing.extend([c for c in group if c not in df.columns]) return sorted(set(found)), sorted(set(missing)) def validate_real_ohlc(df: pd.DataFrame, min_non_null_rows: int = 30) -> None: """ 严格校验: - 必须存在至少一组完整 OHLC - 不能是占位符 - 不能全空 """ found, missing = detect_real_ohlc_columns(df) if not found: raise MissingOHLCError( "未检测到任何完整 OHLC 组。说明解析器没有真正拿到价格数据。" ) # 检查每组是否为真实数值 groups = [] for prefix in ["sh", "sz", "cyb", "zz500", "zz1000", "hs300"]: cols = [f"{prefix}_open", f"{prefix}_high", f"{prefix}_low", f"{prefix}_close"] if all(c in df.columns for c in cols): groups.append(cols) valid_group_count = 0 errors = [] for cols in groups: sub = df[cols].copy() non_null_rows = sub.dropna().shape[0] if non_null_rows < min_non_null_rows: errors.append(f"{cols} 非空行太少: {non_null_rows}") continue placeholder_flags = {c: is_placeholder_series(sub[c]) for c in cols} if any(placeholder_flags.values()): errors.append(f"{cols} 含占位符列: {placeholder_flags}") continue # 基本 OHLC 关系校验 sub_num = sub.apply(pd.to_numeric, errors="coerce") bad_mask = ( (sub_num[cols[1]] < sub_num[cols[2]]) | # high < low (sub_num[cols[0]] <= 0) | (sub_num[cols[3]] <= 0) ) bad_rows = int(bad_mask.sum()) if bad_rows > max(3, int(non_null_rows * 0.05)): errors.append(f"{cols} OHLC 异常行过多: {bad_rows}") continue valid_group_count += 1 if valid_group_count == 0: raise PlaceholderDataError( "虽然检测到表面上的 OHLC 列,但它们不是可靠的真实价格数据。\n" + "\n".join(errors) ) # ========================= # 6) 审计器:在解析后强制检查 # ========================= def audit_parsed_market_dataframe( parsed_df: pd.DataFrame, file_path: str | Path, save_log_path: Optional[str | Path] = None, ) -> ParseAuditLog: """ 用于“解析完成后”的硬审计。 如果发现没拿到真实价格,应该直接 raise,禁止继续跑策略。 """ raw = read_excel_raw(file_path) detected_header = detect_header_rows(raw) found_ohlc, missing_ohlc = detect_real_ohlc_columns(parsed_df) log = ParseAuditLog( file_path=str(file_path), nrows=int(raw.shape[0]), ncols=int(raw.shape[1]), preview_rows=build_preview_rows(raw, n=6), detected_header_rows=detected_header, detected_blocks=[], found_real_ohlc_cols=found_ohlc, missing_required_ohlc_cols=missing_ohlc, warnings=[], success=False, ) try: validate_real_ohlc(parsed_df) log.success = True except Exception as e: log.warnings.append(str(e)) if save_log_path: log.save(save_log_path) raise if save_log_path: log.save(save_log_path) return log # ========================= # 7) 禁止占位符兜底 # ========================= def forbid_placeholder_fallback(use_placeholder: bool) -> None: """ 如果你系统里还有“解析失败就自动补 1/999 占位符”的逻辑, 这里强制拦截。 """ if use_placeholder: raise PlaceholderDataError( "检测到系统试图使用占位符数据继续运行。" "这是被禁止的:未成功提取真实 OHLC 时必须直接停止。" ) # ========================= # 8) 推荐入口包装器 # ========================= def strict_excel_parse_guard( file_path: str | Path, parse_func, save_log_path: Optional[str | Path] = None, *args, **kwargs, ) -> pd.DataFrame: """ 给你现有 parse_func 外面包一层守卫。 parse_func 必须返回“已经解析好的标准宽表”: 例如包含: date, zz500_open, zz500_high, zz500_low, zz500_close, ... """ parsed_df = parse_func(file_path, *args, **kwargs) if not isinstance(parsed_df, pd.DataFrame): raise ExcelParseError("parse_func 必须返回 pandas.DataFrame") audit_parsed_market_dataframe( parsed_df=parsed_df, file_path=file_path, save_log_path=save_log_path, ) return parsed_df # ========================= # 9) 示例:错误做法(不要这么写) # ========================= def bad_placeholder_example(df: pd.DataFrame) -> pd.DataFrame: """ 这是反例:演示以前那种危险写法 """ if "close" not in df.columns: # 危险:不要这样做 df["open"] = 1.0 df["high"] = 1.0 df["low"] = 1.0 df["close"] = 1.0 return df # ========================= # 10) 示例:正确入口 # ========================= if __name__ == "__main__": # 假设你已经有一个真正的复杂 Excel 解析器: # def parse_complex_market_excel(file_path) -> pd.DataFrame: # ... # return parsed_df def demo_parse_func(file_path): # 这里只是演示 # 真实项目里替换成你的解析器 df = pd.DataFrame({ "date": pd.date_range("2025-01-01", periods=100, freq="B"), "zz500_open": np.random.uniform(5000, 6000, 100), "zz500_high": np.random.uniform(5100, 6100, 100), "zz500_low": np.random.uniform(4900, 5900, 100), "zz500_close": np.random.uniform(5000, 6000, 100), }) return df try: parsed = strict_excel_parse_guard( file_path="三指数5年历史数据 (1).xlsx", parse_func=demo_parse_func, save_log_path="excel_parse_audit_log.json", ) print("解析通过,允许进入后续策略层。") print(parsed.head()) except Exception as e: print("解析失败,已停止:") print(e) 你应该怎么接到现有系统里 假设你现在已经有一个自己的解析函数,比如: def parse_market_excel(file_path): ... return parsed_df 你不要再直接这样用: df = parse_market_excel(file_path) 而要改成: from excel_guard import strict_excel_parse_guard df = strict_excel_parse_guard( file_path=file_path, parse_func=parse_market_excel, save_log_path="logs/excel_parse_audit_log.json", ) 这样效果就是: 解析器先跑 然后守卫检查有没有真实 OHLC 如果只是占位符 / 伪列 / 列不全 直接终止 并且给你一份审计日志 这段代码解决什么问题 1. 防止“假装读到了数据” 以前你系统可能是: 没找到列 自动塞 1.0 或 999 然后继续跑策略 现在会被 validate_real_ohlc() 直接拦掉。 2. 防止“只读到表面列名,没读到真实价格” 比如有了 zz500_close 这个列名,但整列都是占位符。 is_placeholder_series() 会把它拦掉。 3. 失败时给你结构信息 ParseAuditLog 会保存: 文件行列数 前几行预览 猜到的 header 行 找到的 OHLC 列 缺了哪些 OHLC 列 为什么失败 这样你不用再靠猜。 你现在最重要的改法 除了加这段守卫,我强烈建议你再把原系统里类似下面这种逻辑删掉: if extraction_failed: use_placeholder_values = True 改成: if extraction_failed: raise ExcelParseError("未成功提取真实数据,停止运行") 也就是: 宁可报错停掉,也不要带着假数据继续跑。 我给你的直接建议 你现在的核心不是“大势模型不准”,而是数据入口还允许失败后伪装成功。 这层一旦锁死,后面你判断系统问题会轻松很多。 如果你愿意,我下一步可以继续给你一份: “复杂 Excel 结构解析器模板(块识别 + 多行表头识别 + 指标块拆解)”, 这样你就不仅能“拦错”,还能把复杂表真正稳定解析出来。