"""Reusable data-cleaning guard for market data tables.

Suggested file name: ``data_pipeline_guard.py``.

The module targets two recurring problems -- "indicator column not found"
and "dates not fully covered" -- plus the usual fallout of loading complex
Excel / CSV exports:

* messy structure after reading the file,
* placeholder / fabricated fallback values leaking into downstream logic,
* cleaning, validation, calendar alignment and logging mixed together.

Usage::

    # 1) Clean an ordinary OHLC quote table
    from data_pipeline_guard import clean_file, build_ohlc_config
    cfg = build_ohlc_config()
    df_clean, log = clean_file("index_data.xlsx", config=cfg)

    # 2) Clean a market-breadth / market-statistics table
    from data_pipeline_guard import clean_file, build_market_breadth_config
    cfg = build_market_breadth_config()
    df_clean, log = clean_file("breadth_data.csv", config=cfg)

    # 3) Align to a shared trading calendar
    from data_pipeline_guard import align_to_calendar
    df_aligned = align_to_calendar(df_clean, calendar_df, date_col="date")

    # 4) Merge several tables
    from data_pipeline_guard import merge_dataframes_on_date
    merged = merge_dataframes_on_date(
        [ohlc_df, breadth_df, factor_df], date_col="date"
    )

Key points:

* No "pretending to succeed".  Missing required columns, broken dates,
  duplicate keys (when ``forbid_duplicate_keys`` is set), an excessive null
  ratio on required columns, placeholder columns and inconsistent OHLC rows
  raise an exception instead of letting the pipeline continue quietly.
* Cleaning is separated from strategy code.  Strategy functions should only
  consume tables that are already cleaned, validated and aligned.
* One implementation serves many tables: switch the ``CleaningConfig`` per
  table type instead of rewriting the cleaning logic.

Recommended next steps: run every raw input table through ``clean_file()``
and stop reading raw Excel / CSV files directly in the strategy layer, so a
failure can be attributed to either the data entry point or the model logic.
A dedicated parser for multi-row-header / block-structured Excel exports
(iFinD / Wind style) is a possible follow-up; ``inspect_excel_structure``
only offers a heuristic pre-check.
"""

from __future__ import annotations

from dataclasses import dataclass, asdict, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, Any
import json
import re

import numpy as np
import pandas as pd


# =========================================================
# 1) Exceptions
# =========================================================
class DataCleaningError(Exception):
    """Base class for every error raised by this module."""


class MissingRequiredColumnsError(DataCleaningError):
    """A required column is missing."""


class InvalidDateError(DataCleaningError):
    """The date column cannot be parsed reliably."""


class DuplicateKeyError(DataCleaningError):
    """Duplicate primary keys were found while duplicates are forbidden."""


class PlaceholderDataError(DataCleaningError):
    """A column looks like placeholder / fabricated fallback data."""


class ValidationError(DataCleaningError):
    """A validation (acceptance) check failed."""


# =========================================================
# 2) Column aliases
# =========================================================
# NOTE(review): "turnover" is listed under both "amount" and "turnover".
# The reverse map keeps the last entry, so a raw "turnover" column is mapped
# to the standard "turnover" name -- confirm this matches the data vendor.
DEFAULT_COLUMN_ALIASES: Dict[str, List[str]] = {
    "date": ["date", "日期", "交易日期", "time", "datetime"],
    "code": ["code", "股票代码", "证券代码", "ts_code", "wind_code"],
    "name": ["name", "股票名称", "证券简称", "sec_name"],
    "open": ["open", "开盘", "开盘价"],
    "high": ["high", "最高", "最高价"],
    "low": ["low", "最低", "最低价"],
    "close": ["close", "收盘", "收盘价"],
    "pre_close": ["pre_close", "前收盘", "前收盘价"],
    "volume": ["volume", "vol", "成交量"],
    "amount": ["amount", "turnover", "成交额", "成交金额"],
    "pct_chg": ["pct_chg", "涨跌幅", "涨幅", "pct_change"],
    "chg": ["chg", "涨跌", "change"],
    "turnover": ["turnover", "换手率"],
    "atr": ["atr", "ATR", "真实波幅", "ATR真实波幅"],
    "volatility": ["volatility", "波动率"],
    "max_drawdown": ["max_drawdown", "最大回撤"],
    "mfi": ["mfi", "MFI"],
    "up_ratio": ["up_ratio", "上涨成份数量占比", "上涨家数占比"],
    "down_ratio": ["down_ratio", "下跌成份数量占比", "下跌家数占比"],
    "up_count": ["up_count", "上涨数量", "上涨家数", "指数成份上涨数量"],
    "down_count": ["down_count", "下跌数量", "下跌家数", "指数成份下跌数量"],
    "limit_up_count": ["limit_up_count", "涨停数量", "指数成份涨停数量"],
    "limit_down_count": ["limit_down_count", "跌停数量", "指数成份跌停数量"],
    "avg_member_return": ["avg_member_return", "指数成份平均涨跌幅"],
    "median_member_return": ["median_member_return", "指数成份涨跌幅中位数"],
}


def _copy_default_aliases() -> Dict[str, List[str]]:
    """Return a copy of the default aliases whose lists are not shared."""
    return {std: list(names) for std, names in DEFAULT_COLUMN_ALIASES.items()}


# =========================================================
# 3) Configuration
# =========================================================
@dataclass
class CleaningConfig:
    """Settings that drive :func:`clean_dataframe`."""

    # Standard name -> accepted raw column names.  The alias lists are copied
    # so that editing one config never mutates DEFAULT_COLUMN_ALIASES.
    aliases: Dict[str, List[str]] = field(default_factory=_copy_default_aliases)

    # Columns that must exist.
    required_cols: List[str] = field(default_factory=lambda: ["date"])

    # Columns that should exist; missing ones are logged, not fatal.
    optional_cols: List[str] = field(default_factory=list)

    # Columns converted to numeric dtype when present.
    numeric_cols: List[str] = field(default_factory=lambda: [
        "open", "high", "low", "close", "pre_close",
        "volume", "amount", "pct_chg", "chg", "turnover",
        "atr", "volatility", "max_drawdown", "mfi",
        "up_ratio", "down_ratio", "up_count", "down_count",
        "limit_up_count", "limit_down_count",
        "avg_member_return", "median_member_return",
    ])

    # Primary key and sort order.
    unique_keys: List[str] = field(default_factory=lambda: ["date"])
    sort_by: List[str] = field(default_factory=lambda: ["date"])

    # Date column and optional explicit strptime format.
    date_col: str = "date"
    date_format: Optional[str] = None

    # Maximum tolerated null ratio for required columns.  Also used as the
    # tolerated share of unparseable dates when the date column is required.
    max_required_null_ratio: float = 0.02

    # Whether to validate OHLC consistency.
    validate_ohlc: bool = True

    # Whether placeholder-looking columns are fatal.
    forbid_placeholder: bool = True

    # Whether duplicate primary keys are fatal (otherwise the last row wins).
    forbid_duplicate_keys: bool = False

    # Whether to print the cleaning log on success.
    verbose: bool = True


# =========================================================
# 4) Log structure
# =========================================================
@dataclass
class CleaningLog:
    """Facts collected while cleaning one table."""

    source: str = ""
    rows_raw: int = 0
    rows_after_drop_empty: int = 0
    rows_clean: int = 0

    columns_raw: List[str] = field(default_factory=list)
    columns_normalized: List[str] = field(default_factory=list)
    rename_map: Dict[str, str] = field(default_factory=dict)

    missing_required_cols: List[str] = field(default_factory=list)
    missing_optional_cols: List[str] = field(default_factory=list)

    invalid_date_rows: int = 0
    duplicate_key_rows_removed: int = 0

    null_ratio: Dict[str, float] = field(default_factory=dict)
    placeholder_cols: List[str] = field(default_factory=list)

    bad_ohlc_rows: int = 0
    date_min: Optional[str] = None
    date_max: Optional[str] = None

    warnings: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the log as a plain dictionary."""
        return asdict(self)


# =========================================================
# 5) Generic helpers
# =========================================================
# Values treated as sentinels when they dominate a numeric column.
# NOTE(review): 0 and 1 are included, so a column that is legitimately 0 on
# more than 95% of rows is reported as a placeholder -- confirm this is wanted.
PLACEHOLDER_VALUES = {
    -1, 0, 1, 999, 9999, 99999,
    1.0, 999.0, 9999.0, 99999.0,
}


def _clean_colname(col: object) -> str:
    """Return *col* as a stripped string with newlines and tabs as spaces."""
    return str(col).strip().replace("\n", " ").replace("\t", " ")


def _build_reverse_alias_map(aliases: Dict[str, List[str]]) -> Dict[str, str]:
    """Map every lower-cased alias to its standard name (last entry wins)."""
    reverse = {}
    for std_name, alias_list in aliases.items():
        for alias in alias_list:
            reverse[str(alias).strip().lower()] = std_name
    return reverse


def _yyyymmdd_text(value: object) -> str:
    """Render an integral number as digits; return "" for anything else."""
    if pd.isna(value) or not np.isfinite(value) or float(value) != int(value):
        return ""
    return str(int(value))


def _parse_dates(values: pd.Series, date_format: Optional[str] = None) -> pd.Series:
    """Parse *values* to datetimes; unparseable entries become NaT.

    Numeric input without an explicit format is read as YYYYMMDD.  Plain
    ``pd.to_datetime`` would interpret such integers as nanoseconds since the
    epoch and silently turn every date into 1970-01-01.
    """
    if date_format:
        return pd.to_datetime(values, format=date_format, errors="coerce")
    is_number = pd.api.types.is_numeric_dtype(values)
    if is_number and not pd.api.types.is_bool_dtype(values):
        as_text = values.map(_yyyymmdd_text)
        return pd.to_datetime(as_text, format="%Y%m%d", errors="coerce")
    return pd.to_datetime(values, errors="coerce")


def normalize_columns(
    df: pd.DataFrame,
    aliases: Dict[str, List[str]],
) -> Tuple[pd.DataFrame, Dict[str, str]]:
    """Clean column names and rename known aliases to standard names.

    Each standard name is assigned at most once.  A column that already
    carries the standard name keeps it, so an alias is never renamed on top
    of it (which would create duplicate column labels).

    Returns:
        The renamed copy of *df* and the applied ``{raw: standard}`` map.
    """
    df = df.copy()
    df.columns = [_clean_colname(c) for c in df.columns]

    reverse_map = _build_reverse_alias_map(aliases)
    already_standard = set(df.columns) & set(aliases)

    rename_map: Dict[str, str] = {}
    used_std_names = set()
    for col in df.columns:
        std_name = reverse_map.get(col.lower())
        if std_name is None or std_name in used_std_names:
            continue
        if col != std_name and std_name in already_standard:
            # The standard column exists verbatim; leave this alias untouched.
            continue
        rename_map[col] = std_name
        used_std_names.add(std_name)

    return df.rename(columns=rename_map), rename_map


def coerce_date_column(
    df: pd.DataFrame,
    date_col: str = "date",
    date_format: Optional[str] = None,
) -> pd.DataFrame:
    """Return a copy of *df* with *date_col* parsed to datetimes.

    Unparseable values become NaT.  Integer-like dates such as ``20240102``
    are parsed as YYYYMMDD when no *date_format* is given.  The frame is
    returned unchanged (as a copy) when *date_col* is absent.
    """
    df = df.copy()
    if date_col not in df.columns:
        return df
    df[date_col] = _parse_dates(df[date_col], date_format)
    return df


def coerce_numeric_columns(df: pd.DataFrame, numeric_cols: List[str]) -> pd.DataFrame:
    """Return a copy of *df* with the present *numeric_cols* made numeric."""
    df = df.copy()
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
    return df


def drop_fully_empty_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* without rows whose every cell is missing."""
    return df.dropna(how="all").copy()


def deduplicate_by_keys(
    df: pd.DataFrame,
    keys: List[str],
    keep: str = "last",
) -> Tuple[pd.DataFrame, int]:
    """Drop rows duplicated on *keys*; return the frame and the rows removed."""
    before = len(df)
    deduped = df.drop_duplicates(subset=keys, keep=keep)
    return deduped, before - len(deduped)


def sort_dataframe(df: pd.DataFrame, sort_by: List[str]) -> pd.DataFrame:
    """Sort *df* by the *sort_by* columns that exist and reset the index.

    A stable sort is used so rows with equal keys keep their original
    relative order; the later "keep last" de-duplication relies on that.
    """
    existing = [c for c in sort_by if c in df.columns]
    if existing:
        df = df.sort_values(existing, kind="mergesort").reset_index(drop=True)
    return df


def is_placeholder_series(s: pd.Series) -> bool:
    """Tell whether one placeholder value dominates the numeric values of *s*.

    True when more than 95% of the non-null numeric values equal a single
    member of ``PLACEHOLDER_VALUES`` (a constant column is the 100% case).
    """
    numeric = pd.to_numeric(s, errors="coerce").dropna()
    if numeric.empty:
        return False
    shares = numeric.value_counts(normalize=True)
    return bool(shares.iloc[0] > 0.95 and shares.index[0] in PLACEHOLDER_VALUES)


def find_placeholder_columns(df: pd.DataFrame, cols: List[str]) -> List[str]:
    """Return the columns among *cols* that look like placeholder data."""
    return [
        col for col in cols
        if col in df.columns and is_placeholder_series(df[col])
    ]


# =========================================================
# 6) Validation helpers
# =========================================================
def validate_required_columns(df: pd.DataFrame, required_cols: List[str]) -> List[str]:
    """Return the required columns missing from *df*."""
    return [c for c in required_cols if c not in df.columns]


def validate_optional_columns(df: pd.DataFrame, optional_cols: List[str]) -> List[str]:
    """Return the optional columns missing from *df*."""
    return [c for c in optional_cols if c not in df.columns]


def compute_null_ratio(df: pd.DataFrame, cols: List[str]) -> Dict[str, float]:
    """Return the null ratio (rounded to 6 decimals) of each present column."""
    return {
        col: round(float(df[col].isna().mean()), 6)
        for col in cols
        if col in df.columns
    }


def validate_required_null_ratio(
    df: pd.DataFrame,
    required_cols: List[str],
    max_required_null_ratio: float,
) -> List[str]:
    """Return ``"col: ratio"`` entries for columns above the allowed ratio."""
    bad = []
    for col in required_cols:
        if col in df.columns:
            ratio = float(df[col].isna().mean())
            if ratio > max_required_null_ratio:
                bad.append(f"{col}: {ratio:.2%}")
    return bad


def validate_ohlc_logic(df: pd.DataFrame) -> int:
    """Count rows whose OHLC values are non-positive or inconsistent.

    Returns 0 when any of open/high/low/close is missing from *df*.
    """
    needed = ["open", "high", "low", "close"]
    if not all(c in df.columns for c in needed):
        return 0

    sub = df[needed].apply(pd.to_numeric, errors="coerce")
    bad_mask = (
        (sub["high"] < sub["low"])
        | (sub["open"] <= 0)
        | (sub["high"] <= 0)
        | (sub["low"] <= 0)
        | (sub["close"] <= 0)
        | (sub["high"] < sub["open"])
        | (sub["high"] < sub["close"])
        | (sub["low"] > sub["open"])
        | (sub["low"] > sub["close"])
    )
    return int(bad_mask.sum())


def validate_date_column(df: pd.DataFrame, date_col: str) -> int:
    """Return the number of missing values in *date_col* (0 if absent)."""
    if date_col not in df.columns:
        return 0
    return int(df[date_col].isna().sum())


# =========================================================
# 7) Main cleaning entry point
# =========================================================
def clean_dataframe(
    df_raw: pd.DataFrame,
    config: Optional[CleaningConfig] = None,
    source: str = "",
) -> Tuple[pd.DataFrame, CleaningLog]:
    """Clean and validate one raw table.

    Args:
        df_raw: Table as loaded from disk; it is not modified.
        config: Cleaning settings; defaults to ``CleaningConfig()``.
        source: Label stored in the log (typically the file path).

    Returns:
        The cleaned frame (sorted, de-duplicated, index reset) and its log.

    Raises:
        MissingRequiredColumnsError: A required column is absent.
        InvalidDateError: The date column is required and the share of
            unparseable dates exceeds ``max_required_null_ratio``.
        DuplicateKeyError: Duplicate keys exist and are forbidden.
        ValidationError: Null ratio too high, or OHLC rows inconsistent.
        PlaceholderDataError: A numeric column looks like placeholder data.
    """
    if config is None:
        config = CleaningConfig()

    log = CleaningLog(source=source)
    log.rows_raw = len(df_raw)
    log.columns_raw = [_clean_colname(c) for c in df_raw.columns]

    # 1) Drop fully empty rows (returns a copy, df_raw stays untouched).
    df = drop_fully_empty_rows(df_raw)
    log.rows_after_drop_empty = len(df)

    # 2) Normalize column names.
    df, rename_map = normalize_columns(df, config.aliases)
    log.rename_map = rename_map
    log.columns_normalized = list(df.columns)

    # 3) Parse dates, 4) coerce numerics.
    df = coerce_date_column(df, config.date_col, config.date_format)
    df = coerce_numeric_columns(df, config.numeric_cols)

    # 5) Required / optional columns.
    log.missing_required_cols = validate_required_columns(df, config.required_cols)
    if log.missing_required_cols:
        raise MissingRequiredColumnsError(f"缺少必要字段: {log.missing_required_cols}")

    log.missing_optional_cols = validate_optional_columns(df, config.optional_cols)
    if log.missing_optional_cols:
        log.warnings.append(f"缺少建议字段: {log.missing_optional_cols}")

    # 6) Date cleanup.  The share of bad dates must be judged before the rows
    #    are dropped; afterwards the date column can no longer contain nulls.
    log.invalid_date_rows = validate_date_column(df, config.date_col)
    if config.date_col in df.columns:
        if log.invalid_date_rows:
            invalid_ratio = log.invalid_date_rows / len(df)
            date_is_required = config.date_col in config.required_cols
            if date_is_required and invalid_ratio > config.max_required_null_ratio:
                raise InvalidDateError(
                    f"日期列 {config.date_col} 无法解析的行占比过高: "
                    f"{invalid_ratio:.2%} ({log.invalid_date_rows}/{len(df)})"
                )
            log.warnings.append(f"已丢弃非法日期行: {log.invalid_date_rows}")
        df = df.dropna(subset=[config.date_col])

    # 7) Sort (stable, so "keep last" below means last in the source file).
    df = sort_dataframe(df, config.sort_by)

    # 8) De-duplicate on the primary key.
    existing_keys = [k for k in config.unique_keys if k in df.columns]
    if existing_keys:
        duplicated = int(df.duplicated(subset=existing_keys).sum())
        if duplicated > 0 and config.forbid_duplicate_keys:
            raise DuplicateKeyError(f"存在重复主键行: {duplicated}")
        df, removed = deduplicate_by_keys(df, existing_keys, keep="last")
        log.duplicate_key_rows_removed = removed
        if removed:
            log.warnings.append(f"已移除重复主键行: {removed}")

    # 9) Null-ratio check.
    check_cols = list(dict.fromkeys(config.required_cols + config.optional_cols))
    log.null_ratio = compute_null_ratio(df, check_cols)

    bad_nulls = validate_required_null_ratio(
        df, config.required_cols, config.max_required_null_ratio
    )
    if bad_nulls:
        raise ValidationError(f"必要字段缺失率过高: {bad_nulls}")

    # 10) Placeholder check.
    if config.forbid_placeholder:
        candidate_check_cols = [c for c in config.numeric_cols if c in df.columns]
        log.placeholder_cols = find_placeholder_columns(df, candidate_check_cols)
        if log.placeholder_cols:
            raise PlaceholderDataError(f"检测到占位符列: {log.placeholder_cols}")

    # 11) OHLC consistency check.
    if config.validate_ohlc:
        log.bad_ohlc_rows = validate_ohlc_logic(df)
        if log.bad_ohlc_rows > 0:
            raise ValidationError(f"检测到 {log.bad_ohlc_rows} 行 OHLC 异常")

    # 12) Date range.
    if config.date_col in df.columns and not df.empty:
        log.date_min = str(df[config.date_col].min().date())
        log.date_max = str(df[config.date_col].max().date())

    log.rows_clean = len(df)
    if df.empty:
        log.warnings.append("清洗后数据为空")

    if config.verbose:
        print_cleaning_log(log)

    return df.reset_index(drop=True), log


# =========================================================
# 8) File loading
# =========================================================
def load_file(file_path: Union[str, Path], **kwargs) -> pd.DataFrame:
    """Load a CSV, Excel or Parquet file, dispatching on the file suffix.

    Raises:
        ValueError: The suffix is not one of .csv/.xlsx/.xls/.parquet.
    """
    file_path = Path(file_path)
    suffix = file_path.suffix.lower()

    if suffix == ".csv":
        return pd.read_csv(file_path, **kwargs)
    if suffix in [".xlsx", ".xls"]:
        return pd.read_excel(file_path, **kwargs)
    if suffix == ".parquet":
        return pd.read_parquet(file_path, **kwargs)

    raise ValueError(f"暂不支持的文件类型: {suffix}")


def clean_file(
    file_path: Union[str, Path],
    config: Optional[CleaningConfig] = None,
    read_kwargs: Optional[dict] = None,
) -> Tuple[pd.DataFrame, CleaningLog]:
    """Load *file_path* and run :func:`clean_dataframe` on it."""
    if read_kwargs is None:
        read_kwargs = {}
    df_raw = load_file(file_path, **read_kwargs)
    return clean_dataframe(df_raw, config=config, source=str(file_path))


# =========================================================
# 9) Trading-calendar alignment
# =========================================================
def align_to_calendar(
    df: pd.DataFrame,
    calendar: Union[pd.DataFrame, pd.Series, List],
    date_col: str = "date",
    how: str = "left",
) -> pd.DataFrame:
    """Merge *df* onto a de-duplicated, sorted trading calendar.

    The calendar may be a DataFrame holding *date_col*, a Series or a list.
    Dates on both sides are parsed to datetimes before merging, so a calendar
    given as strings lines up with an already-cleaned table.

    Raises:
        ValueError: *calendar* is a DataFrame without *date_col*.
    """
    if isinstance(calendar, pd.DataFrame):
        if date_col not in calendar.columns:
            raise ValueError(f"calendar 缺少 {date_col}")
        cal_values = calendar[date_col]
    elif isinstance(calendar, pd.Series):
        cal_values = calendar
    else:
        cal_values = pd.Series(calendar)

    cal_df = pd.DataFrame({date_col: _parse_dates(cal_values)})
    cal_df = cal_df.dropna().drop_duplicates().sort_values(date_col).reset_index(drop=True)

    df2 = df.copy()
    df2[date_col] = _parse_dates(df2[date_col])

    return cal_df.merge(df2, on=date_col, how=how)


# =========================================================
# 10) Multi-table merge
# =========================================================
def merge_dataframes_on_date(
    dfs: List[pd.DataFrame],
    date_col: str = "date",
    how: str = "left",
    base_df: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
    """Merge several tables on *date_col*, starting from *base_df* or dfs[0].

    Columns already present in the running result are not merged again, so
    the first table that provides a column wins.

    Raises:
        ValueError: Both *dfs* and *base_df* are empty.
    """
    if not dfs and base_df is None:
        raise ValueError("dfs 和 base_df 不能都为空")

    if base_df is None:
        merged = dfs[0].copy()
        rest = dfs[1:]
    else:
        merged = base_df.copy()
        rest = dfs

    for df in rest:
        keep_cols = [c for c in df.columns if c == date_col or c not in merged.columns]
        merged = merged.merge(df[keep_cols], on=date_col, how=how)

    return merged.sort_values(date_col).reset_index(drop=True)


# =========================================================
# 11) Hard / soft dependencies
# =========================================================
def check_dependencies(
    df: pd.DataFrame,
    hard_required: List[str],
    soft_required: Optional[List[str]] = None,
) -> Dict[str, List[str]]:
    """Report missing columns; missing hard dependencies are fatal.

    Raises:
        MissingRequiredColumnsError: A hard-required column is absent.
    """
    if soft_required is None:
        soft_required = []

    missing_hard = [c for c in hard_required if c not in df.columns]
    missing_soft = [c for c in soft_required if c not in df.columns]

    if missing_hard:
        raise MissingRequiredColumnsError(f"缺少硬依赖字段: {missing_hard}")

    return {
        "missing_hard": missing_hard,
        "missing_soft": missing_soft,
    }


def weighted_score_with_fallback(components: Dict[str, Tuple[Optional[float], float]]) -> float:
    """Return the weighted mean of the components that have a score.

    Components whose score is None or NaN are ignored and the remaining
    weights are renormalized.  NaN is returned when nothing is usable or the
    usable weights sum to zero.
    """
    valid = [
        (score, weight)
        for score, weight in components.values()
        if score is not None and not pd.isna(score)
    ]
    total_weight = sum(weight for _, weight in valid)
    if not valid or total_weight == 0:
        return np.nan
    return sum(score * weight for score, weight in valid) / total_weight


# =========================================================
# 12) Completeness score
# =========================================================
def compute_completeness_score(
    df: pd.DataFrame,
    cols: List[str],
) -> pd.Series:
    """Return per row the share of non-null values among the present *cols*.

    A Series of NaN is returned when none of *cols* exists in *df*.
    """
    existing = [c for c in cols if c in df.columns]
    if not existing:
        return pd.Series(np.nan, index=df.index)
    return 1.0 - df[existing].isna().mean(axis=1)


# =========================================================
# 13) Pre-check for complex Excel layouts
# =========================================================
def inspect_excel_structure(file_path: Union[str, Path], nrows: int = 8) -> Dict[str, Any]:
    """Guess the header rows of a multi-row-header Excel export.

    The first *nrows* rows are scanned for metric keywords and label
    keywords.  ``data_start_row`` is the row after the last header row found,
    or None when neither kind of header row is detected.
    """
    raw = pd.read_excel(file_path, header=None)
    preview = raw.head(nrows).fillna("").values.tolist()

    metric_keywords = ["收盘价", "开盘价", "最高价", "最低价", "波动率", "ATR", "涨跌幅"]
    label_keywords = ["时间", "中证500", "中证1000", "沪深300", "创业板", "上证"]

    header_guess = {
        "metric_row": None,
        "label_row": None,
        "data_start_row": None,
    }

    for i in range(min(nrows, len(raw))):
        row_text = " | ".join(str(x).strip() for x in raw.iloc[i].tolist() if pd.notna(x))

        if header_guess["metric_row"] is None and any(k in row_text for k in metric_keywords):
            header_guess["metric_row"] = i
        if header_guess["label_row"] is None and any(k in row_text for k in label_keywords):
            header_guess["label_row"] = i

    header_rows = [
        row for row in (header_guess["metric_row"], header_guess["label_row"])
        if row is not None
    ]
    if header_rows:
        # Data can only start below every detected header row.
        header_guess["data_start_row"] = max(header_rows) + 1

    return {
        "shape": raw.shape,
        "header_guess": header_guess,
        "preview_rows": preview,
    }


# =========================================================
# 14) Log output
# =========================================================
def print_cleaning_log(log: CleaningLog) -> None:
    """Print a human-readable summary of *log* to stdout."""
    print("=" * 60)
    print("数据清洗日志")
    print("=" * 60)
    print(f"来源: {log.source}")
    print(f"原始行数: {log.rows_raw}")
    print(f"去全空行后: {log.rows_after_drop_empty}")
    print(f"清洗后行数: {log.rows_clean}")
    print(f"原始列数: {len(log.columns_raw)}")
    print(f"标准化后列数: {len(log.columns_normalized)}")
    print(f"缺少必要字段: {log.missing_required_cols}")
    print(f"缺少建议字段: {log.missing_optional_cols}")
    print(f"非法日期行数: {log.invalid_date_rows}")
    print(f"重复主键移除行数: {log.duplicate_key_rows_removed}")
    print(f"占位符列: {log.placeholder_cols}")
    print(f"OHLC异常行数: {log.bad_ohlc_rows}")
    print(f"日期范围: {log.date_min} ~ {log.date_max}")
    print(f"缺失率: {log.null_ratio}")
    if log.warnings:
        print(f"警告: {log.warnings}")
    print("=" * 60)


def save_cleaning_log(log: CleaningLog, file_path: Union[str, Path]) -> None:
    """Write *log* as UTF-8 JSON, creating the parent directory if needed."""
    file_path = Path(file_path)
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(log.to_dict(), f, ensure_ascii=False, indent=2)


# =========================================================
# 15) Ready-made schema configs
# =========================================================
def build_ohlc_config() -> CleaningConfig:
    """Return the config for an ordinary OHLC quote table."""
    return CleaningConfig(
        required_cols=["date", "open", "high", "low", "close"],
        optional_cols=["volume", "amount", "pct_chg", "code", "name"],
        numeric_cols=[
            "open", "high", "low", "close", "pre_close",
            "volume", "amount", "pct_chg", "chg", "turnover",
        ],
        unique_keys=["date"],
        sort_by=["date"],
        validate_ohlc=True,
        forbid_placeholder=True,
    )


def build_market_breadth_config() -> CleaningConfig:
    """Return the config for a market-breadth / market-statistics table."""
    return CleaningConfig(
        required_cols=["date"],
        optional_cols=[
            "up_ratio", "down_ratio", "up_count", "down_count",
            "limit_up_count", "limit_down_count",
            "avg_member_return", "median_member_return",
        ],
        numeric_cols=[
            "up_ratio", "down_ratio", "up_count", "down_count",
            "limit_up_count", "limit_down_count",
            "avg_member_return", "median_member_return",
        ],
        unique_keys=["date"],
        sort_by=["date"],
        validate_ohlc=False,
        forbid_placeholder=True,
    )


# =========================================================
# 16) Example
# =========================================================
def _demo() -> None:
    """Run the end-to-end example on placeholder file names."""
    # Example 1: ordinary OHLC file.
    cfg = build_ohlc_config()
    df_clean, log = clean_file("your_ohlc_file.xlsx", config=cfg)
    df_clean.to_csv("your_ohlc_file_clean.csv", index=False, encoding="utf-8-sig")
    save_cleaning_log(log, "your_ohlc_file_clean_log.json")

    # Example 2: market-breadth table.
    breadth_cfg = build_market_breadth_config()
    breadth_df, breadth_log = clean_file("your_breadth_file.csv", config=breadth_cfg)

    # Example 3: align to the trading calendar.
    calendar_df = df_clean[["date"]].copy()
    breadth_aligned = align_to_calendar(breadth_df, calendar_df, date_col="date")

    # Example 4: merge.
    merged = merge_dataframes_on_date([df_clean, breadth_aligned], date_col="date")
    merged["completeness_score"] = compute_completeness_score(
        merged, ["open", "high", "low", "close", "amount", "up_ratio"]
    )
    merged.to_csv("merged_clean_data.csv", index=False, encoding="utf-8-sig")


if __name__ == "__main__":
    _demo()