"""General-purpose data-cleaning framework (``data_cleaning.py``).

Core goals:

* automatic column-name standardization
* unified date handling
* unified numeric handling
* duplicate / missing-value / anomaly checks
* alignment to a trading calendar
* hard / soft dependency checks
* cleaning-log output
* output that can be plugged into a downstream quant strategy

The framework is not specific to one table layout (such as a three-index
table): CSV and Excel inputs with a variety of column-naming styles are
supported.
"""

from __future__ import annotations

from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import json

import numpy as np
import pandas as pd


# =========================
# 1. Generic column aliases
# =========================
# Maps each standard column name to the raw spellings accepted for it.
# An alias must belong to exactly one standard name: the reverse lookup
# built from this table keeps only the last standard name seen for an alias,
# so a repeated alias would be silently reassigned. "turnover" used to be
# listed under "amount" as well; it always resolved to the "turnover"
# (turnover-rate) entry below, so the dead duplicate has been removed.
DEFAULT_COLUMN_ALIASES: Dict[str, List[str]] = {
    "date": ["date", "日期", "交易日期", "time", "datetime"],
    "code": ["code", "股票代码", "证券代码", "ts_code", "wind_code"],
    "name": ["name", "股票名称", "证券简称", "sec_name"],
    "open": ["open", "开盘", "开盘价"],
    "high": ["high", "最高", "最高价"],
    "low": ["low", "最低", "最低价"],
    "close": ["close", "收盘", "收盘价"],
    "pre_close": ["pre_close", "前收盘", "前收盘价"],
    "volume": ["volume", "vol", "成交量"],
    "amount": ["amount", "成交额", "成交金额"],
    "pct_chg": ["pct_chg", "涨跌幅", "涨幅", "pct_change"],
    "chg": ["chg", "涨跌", "change"],
    "turnover": ["turnover", "换手率"],
    "atr": ["atr", "ATR", "真实波幅", "ATR真实波幅"],
    "volatility": ["volatility", "波动率"],
    "max_drawdown": ["max_drawdown", "最大回撤"],
    "mfi": ["mfi", "MFI"],
    "up_ratio": ["up_ratio", "上涨成份数量占比", "上涨家数占比"],
    "down_ratio": ["down_ratio", "下跌成份数量占比", "下跌家数占比"],
    "up_count": ["up_count", "上涨数量", "上涨家数", "指数成份上涨数量"],
    "down_count": ["down_count", "下跌数量", "下跌家数", "指数成份下跌数量"],
    "limit_up_count": ["limit_up_count", "涨停数量", "指数成份涨停数量"],
    "limit_down_count": ["limit_down_count", "跌停数量", "指数成份跌停数量"],
    "avg_member_return": ["avg_member_return", "指数成份平均涨跌幅"],
    "median_member_return": ["median_member_return", "指数成份涨跌幅中位数"],
}
# =========================
# 2. Configuration
# =========================
@dataclass
class DataCleaningConfig:
    """Settings for one cleaning run.

    Every mutable default is produced by a ``default_factory`` so that
    instances never share list or dict objects with each other.
    """

    # Standard column name -> accepted raw names. Each alias list is copied
    # as well: a shallow ``dict.copy()`` would hand every config the very
    # same list objects as DEFAULT_COLUMN_ALIASES, so appending an alias on
    # one config would silently change the module-level table.
    aliases: Dict[str, List[str]] = field(
        default_factory=lambda: {
            std_name: list(names)
            for std_name, names in DEFAULT_COLUMN_ALIASES.items()
        }
    )

    # Required columns.
    required_cols: List[str] = field(default_factory=lambda: ["date"])

    # Recommended columns (a missing one is logged, not treated as an error).
    optional_cols: List[str] = field(default_factory=list)

    # Columns that should be converted to numeric values.
    numeric_cols: List[str] = field(default_factory=lambda: [
        "open", "high", "low", "close", "pre_close",
        "volume", "amount", "pct_chg", "chg", "turnover",
        "atr", "volatility", "max_drawdown", "mfi",
        "up_ratio", "down_ratio", "up_count", "down_count",
        "limit_up_count", "limit_down_count",
        "avg_member_return", "median_member_return",
    ])

    # Primary key and sort order.
    unique_keys: List[str] = field(default_factory=lambda: ["date"])
    sort_by: List[str] = field(default_factory=lambda: ["date"])

    # Tolerated share of missing values in a required column.
    max_required_null_ratio: float = 0.02

    # Date column name and optional explicit parse format.
    date_col: str = "date"
    date_format: Optional[str] = None

    # Whether to run the OHLC consistency check.
    validate_ohlc: bool = True

    # Whether to keep the original columns.
    keep_original_columns: bool = False

    # Whether to print the cleaning log.
    verbose: bool = True


# =========================
# 3. Log structure
# =========================
@dataclass
class CleaningLog:
    """Counters and findings collected while cleaning one table."""

    source: str = ""
    rows_raw: int = 0
    rows_after_drop_empty: int = 0
    rows_clean: int = 0
    columns_raw: List[str] = field(default_factory=list)
    columns_normalized: List[str] = field(default_factory=list)
    rename_map: Dict[str, str] = field(default_factory=dict)
    missing_required_cols: List[str] = field(default_factory=list)
    missing_optional_cols: List[str] = field(default_factory=list)
    invalid_date_rows: int = 0
    duplicate_key_rows_removed: int = 0
    null_ratio: Dict[str, float] = field(default_factory=dict)
    bad_ohlc_rows: int = 0
    date_min: Optional[str] = None
    date_max: Optional[str] = None
    warnings: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict:
        """Return the log as a plain dict (via ``dataclasses.asdict``)."""
        return asdict(self)
基础工具函数 # ========================= def _clean_colname(col: object) -> str: return str(col).strip().replace("\n", " ").replace("\t", " ") def _build_reverse_alias_map(aliases: Dict[str, List[str]]) -> Dict[str, str]: reverse = {} for std_name, alias_list in aliases.items(): for alias in alias_list: reverse[str(alias).strip().lower()] = std_name return reverse def normalize_columns( df: pd.DataFrame, aliases: Dict[str, List[str]], ) -> Tuple[pd.DataFrame, Dict[str, str]]: """ 将原始列名映射为标准列名。 """ df = df.copy() raw_cols = list(df.columns) cleaned_cols = [_clean_colname(c) for c in raw_cols] df.columns = cleaned_cols reverse_map = _build_reverse_alias_map(aliases) rename_map = {} used_std_names = set() for col in df.columns: key = col.lower() if key in reverse_map: std_name = reverse_map[key] if std_name not in used_std_names: rename_map[col] = std_name used_std_names.add(std_name) df = df.rename(columns=rename_map) return df, rename_map def coerce_date_column( df: pd.DataFrame, date_col: str = "date", date_format: Optional[str] = None, ) -> pd.DataFrame: df = df.copy() if date_col not in df.columns: return df if date_format: df[date_col] = pd.to_datetime(df[date_col], format=date_format, errors="coerce") else: df[date_col] = pd.to_datetime(df[date_col], errors="coerce") return df def coerce_numeric_columns( df: pd.DataFrame, numeric_cols: List[str], ) -> pd.DataFrame: df = df.copy() for col in numeric_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce") return df def drop_fully_empty_rows(df: pd.DataFrame) -> pd.DataFrame: return df.dropna(how="all").copy() def deduplicate_by_keys( df: pd.DataFrame, keys: List[str], keep: str = "last", ) -> Tuple[pd.DataFrame, int]: df = df.copy() before = len(df) df = df.drop_duplicates(subset=keys, keep=keep) removed = before - len(df) return df, removed def sort_dataframe(df: pd.DataFrame, sort_by: List[str]) -> pd.DataFrame: existing = [c for c in sort_by if c in df.columns] if existing: df = 
df.sort_values(existing).reset_index(drop=True) return df # ========================= # 5. 验收函数 # ========================= def validate_required_columns(df: pd.DataFrame, required_cols: List[str]) -> List[str]: return [c for c in required_cols if c not in df.columns] def validate_optional_columns(df: pd.DataFrame, optional_cols: List[str]) -> List[str]: return [c for c in optional_cols if c not in df.columns] def compute_null_ratio(df: pd.DataFrame, cols: List[str]) -> Dict[str, float]: out = {} for col in cols: if col in df.columns: out[col] = round(float(df[col].isna().mean()), 6) return out def validate_required_null_ratio( df: pd.DataFrame, required_cols: List[str], max_required_null_ratio: float, ) -> List[str]: bad = [] for col in required_cols: if col in df.columns: ratio = float(df[col].isna().mean()) if ratio > max_required_null_ratio: bad.append(f"{col}: {ratio:.2%}") return bad def validate_ohlc_logic(df: pd.DataFrame) -> int: needed = ["open", "high", "low", "close"] if not all(c in df.columns for c in needed): return 0 bad = df[ (df["high"] < df["low"]) | (df["open"] <= 0) | (df["high"] <= 0) | (df["low"] <= 0) | (df["close"] <= 0) | (df["high"] < df["open"]) | (df["high"] < df["close"]) | (df["low"] > df["open"]) | (df["low"] > df["close"]) ] return len(bad) def validate_date_column(df: pd.DataFrame, date_col: str) -> int: if date_col not in df.columns: return 0 return int(df[date_col].isna().sum()) # ========================= # 6. 
# =========================
# 6. Main cleaning function
# =========================
def clean_dataframe(
    df_raw: pd.DataFrame,
    config: Optional[DataCleaningConfig] = None,
    source: str = "",
) -> Tuple[pd.DataFrame, CleaningLog]:
    """Clean one raw table and report what was done.

    Args:
        df_raw: Raw input table; it is not modified.
        config: Cleaning settings; a default ``DataCleaningConfig`` is used
            when omitted.
        source: Label stored in the log (for example the file path).

    Returns:
        ``(clean_frame, log)``; the frame has a fresh 0..n-1 index.

    Raises:
        ValueError: If a required column is missing, or a required column's
            share of missing values exceeds
            ``config.max_required_null_ratio``.
    """
    if config is None:
        config = DataCleaningConfig()

    log = CleaningLog(source=source)
    log.rows_raw = len(df_raw)
    log.columns_raw = [_clean_colname(c) for c in df_raw.columns]

    # 1) Drop rows that are entirely empty (the helper returns a copy, so
    #    the caller's frame is left untouched).
    df = drop_fully_empty_rows(df_raw)
    log.rows_after_drop_empty = len(df)

    # 2) Standardize column names.
    df, rename_map = normalize_columns(df, config.aliases)
    log.rename_map = rename_map
    log.columns_normalized = list(df.columns)

    # 3) Standardize dates.
    df = coerce_date_column(df, config.date_col, config.date_format)

    # 4) Standardize numeric columns.
    df = coerce_numeric_columns(df, config.numeric_cols)

    # 5) Required columns must all be present.
    missing_required = validate_required_columns(df, config.required_cols)
    log.missing_required_cols = missing_required
    if missing_required:
        raise ValueError(f"缺少必要字段: {missing_required}")

    # 6) Recommended columns are only recorded.
    log.missing_optional_cols = validate_optional_columns(df, config.optional_cols)

    # 7) Count, then drop, rows whose date could not be parsed.
    log.invalid_date_rows = validate_date_column(df, config.date_col)
    if config.date_col in df.columns:
        df = df.dropna(subset=[config.date_col])

    # 8) Sort.
    df = sort_dataframe(df, config.sort_by)

    # 9) De-duplicate on the configured keys, keeping the last occurrence.
    existing_keys = [k for k in config.unique_keys if k in df.columns]
    if existing_keys:
        df, removed = deduplicate_by_keys(df, existing_keys, keep="last")
        log.duplicate_key_rows_removed = removed

    # 10) Missing-value ratios; required columns must stay under the limit.
    all_check_cols = list(dict.fromkeys(config.required_cols + config.optional_cols))
    log.null_ratio = compute_null_ratio(df, all_check_cols)
    bad_nulls = validate_required_null_ratio(
        df, config.required_cols, config.max_required_null_ratio
    )
    if bad_nulls:
        raise ValueError(f"必要字段缺失率过高: {bad_nulls}")

    # 11) OHLC consistency check (a warning only, rows are kept).
    if config.validate_ohlc:
        log.bad_ohlc_rows = validate_ohlc_logic(df)
        if log.bad_ohlc_rows > 0:
            log.warnings.append(f"检测到 {log.bad_ohlc_rows} 行 OHLC 异常。")

    # 12) Date range covered by the clean data.
    if config.date_col in df.columns and not df.empty:
        log.date_min = str(df[config.date_col].min().date())
        log.date_max = str(df[config.date_col].max().date())

    # 13) config.keep_original_columns is currently a no-op: no column is
    #     dropped whichever way the switch is set.

    log.rows_clean = len(df)

    if config.verbose:
        print_cleaning_log(log)

    return df.reset_index(drop=True), log


# =========================
# 7. File reading functions
# =========================
def load_file(file_path: Union[str, Path], **kwargs) -> pd.DataFrame:
    """Read a CSV, Excel (.xlsx/.xls) or Parquet file into a DataFrame.

    Extra keyword arguments go to the matching pandas reader. For CSV files,
    when the caller does not pass ``encoding`` and the default decoding
    fails, the read is retried as GB18030 (a superset of GBK / GB2312).

    Raises:
        ValueError: If the file extension is not supported.
    """
    file_path = Path(file_path)
    suffix = file_path.suffix.lower()

    if suffix == ".csv":
        try:
            return pd.read_csv(file_path, **kwargs)
        except UnicodeDecodeError:
            if "encoding" in kwargs:
                # The caller chose the encoding explicitly; do not guess.
                raise
            return pd.read_csv(file_path, encoding="gb18030", **kwargs)
    if suffix in (".xlsx", ".xls"):
        return pd.read_excel(file_path, **kwargs)
    if suffix == ".parquet":
        return pd.read_parquet(file_path, **kwargs)

    raise ValueError(f"暂不支持的文件类型: {suffix}")


def clean_file(
    file_path: Union[str, Path],
    config: Optional[DataCleaningConfig] = None,
    read_kwargs: Optional[dict] = None,
) -> Tuple[pd.DataFrame, CleaningLog]:
    """Load *file_path* and clean it; the path is recorded as the log source.

    Args:
        file_path: File to read.
        config: Cleaning settings passed on to ``clean_dataframe``.
        read_kwargs: Keyword arguments for the pandas reader.
    """
    df_raw = load_file(file_path, **(read_kwargs or {}))
    return clean_dataframe(df_raw, config=config, source=str(file_path))


# =========================
# 8. Trading-calendar alignment
# =========================
def align_to_calendar(
    df: pd.DataFrame,
    calendar: Union[pd.DataFrame, pd.Series, List],
    date_col: str = "date",
    how: str = "left",
) -> pd.DataFrame:
    """Align *df* to a master trading calendar.

    Args:
        df: Table holding *date_col*; it is not modified.
        calendar: Calendar dates, as a DataFrame containing *date_col*, a
            Series, or any list-like of dates.
        date_col: Name of the date column on both sides.
        how: Merge type, with the calendar as the left table.

    Returns:
        The calendar (unique, sorted, unparseable dates removed) merged with
        *df* on *date_col*.

    Raises:
        ValueError: If *calendar* is a DataFrame without *date_col*.
    """
    if isinstance(calendar, pd.DataFrame):
        if date_col not in calendar.columns:
            raise ValueError(f"calendar 缺少 {date_col}")
        raw_dates = calendar[date_col]
    elif isinstance(calendar, pd.Series):
        raw_dates = calendar
    else:
        raw_dates = pd.Series(calendar)

    # Every calendar form is parsed to datetimes, including the DataFrame
    # form: merging text dates against the datetime column below would fail.
    cal_df = pd.DataFrame({date_col: pd.to_datetime(raw_dates, errors="coerce")})
    cal_df = (
        cal_df.dropna()
        .drop_duplicates()
        .sort_values(date_col)
        .reset_index(drop=True)
    )

    df2 = df.copy()
    df2[date_col] = pd.to_datetime(df2[date_col], errors="coerce")

    return cal_df.merge(df2, on=date_col, how=how)
# =========================
# 9. Multi-table merge
# =========================
def merge_dataframes_on_date(
    dfs: List[pd.DataFrame],
    date_col: str = "date",
    how: str = "left",
    base_df: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
    """Merge several cleaned tables on *date_col*.

    Args:
        dfs: Tables to merge. Without *base_df* the first one is the base.
        date_col: Join column.
        how: Merge type used for every join.
        base_df: Optional explicit base table; all of *dfs* are merged onto it.

    Returns:
        The merged table sorted by *date_col* with a fresh index. When a
        column name already exists in the merged result, the incoming
        table's column of that name is skipped (the first occurrence wins).

    Raises:
        ValueError: If *dfs* is empty and *base_df* is not given.
    """
    if not dfs and base_df is None:
        raise ValueError("dfs 和 base_df 不能都为空")

    if base_df is None:
        merged, rest = dfs[0].copy(), dfs[1:]
    else:
        merged, rest = base_df.copy(), dfs

    for df in rest:
        # Take the join key plus only the columns not seen so far.
        cols = [c for c in df.columns if c == date_col or c not in merged.columns]
        merged = merged.merge(df[cols], on=date_col, how=how)

    return merged.sort_values(date_col).reset_index(drop=True)


# =========================
# 10. Hard / soft dependencies
# =========================
def check_dependencies(
    df: pd.DataFrame,
    hard_required: List[str],
    soft_required: Optional[List[str]] = None,
) -> Dict[str, List[str]]:
    """Check that *df* has the columns a computation depends on.

    Returns:
        ``{"missing_hard": [...], "missing_soft": [...]}``; on return
        ``missing_hard`` is always empty.

    Raises:
        ValueError: If any hard dependency is missing.
    """
    missing_hard = [c for c in hard_required if c not in df.columns]
    missing_soft = [c for c in (soft_required or []) if c not in df.columns]

    if missing_hard:
        raise ValueError(f"缺少硬依赖字段: {missing_hard}")

    return {
        "missing_hard": missing_hard,
        "missing_soft": missing_soft,
    }


def weighted_score_with_fallback(components: Dict[str, Tuple[Optional[float], float]]) -> float:
    """Weighted average of the available components, weights re-normalized.

    Example::

        components = {
            "drawdown": (0.72, 0.4),
            "atr": (0.58, 0.3),
            "breadth": (None, 0.3),
        }

    Here ``breadth`` is missing, so the result is averaged over the other
    two using their weights only.

    Returns:
        The weighted score, or NaN when no component has a score or when the
        weights of the available components sum to zero.
    """
    usable = [
        (score, weight)
        for score, weight in components.values()
        if score is not None and not pd.isna(score)
    ]
    if not usable:
        return np.nan

    total_weight = sum(weight for _, weight in usable)
    if total_weight == 0:
        # Avoid ZeroDivisionError; no meaningful average exists.
        return np.nan
    return sum(score * weight for score, weight in usable) / total_weight


# =========================
# 11. Completeness score
# =========================
def compute_completeness_score(
    df: pd.DataFrame,
    cols: List[str],
) -> pd.Series:
    """Per-row share of non-missing values over those of *cols* that exist.

    Returns an all-NaN Series (indexed like *df*) if none of *cols* exists.
    """
    existing = [c for c in cols if c in df.columns]
    if not existing:
        return pd.Series(np.nan, index=df.index)
    return 1.0 - df[existing].isna().mean(axis=1)
# =========================
# 12. Log output / saving
# =========================
def print_cleaning_log(log: CleaningLog) -> None:
    """Print a human-readable summary of *log* to standard output.

    The warnings line is printed only when the log holds warnings.
    """
    rule = "=" * 60
    lines = [
        rule,
        "数据清洗日志",
        rule,
        f"来源: {log.source}",
        f"原始行数: {log.rows_raw}",
        f"去全空行后: {log.rows_after_drop_empty}",
        f"清洗后行数: {log.rows_clean}",
        f"原始列数: {len(log.columns_raw)}",
        f"标准化后列数: {len(log.columns_normalized)}",
        f"缺少必要字段: {log.missing_required_cols}",
        f"缺少建议字段: {log.missing_optional_cols}",
        f"非法日期行数: {log.invalid_date_rows}",
        f"重复主键移除行数: {log.duplicate_key_rows_removed}",
        f"OHLC异常行数: {log.bad_ohlc_rows}",
        f"日期范围: {log.date_min} ~ {log.date_max}",
        f"缺失率: {log.null_ratio}",
    ]
    if log.warnings:
        lines.append(f"警告: {log.warnings}")
    lines.append(rule)
    print("\n".join(lines))


def save_cleaning_log(log: CleaningLog, file_path: Union[str, Path]) -> None:
    """Write *log* to *file_path* as indented UTF-8 JSON.

    Missing parent directories are created first, so a log can be written
    into a folder that does not exist yet.
    """
    file_path = Path(file_path)
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(log.to_dict(), f, ensure_ascii=False, indent=2)


# =========================
# 13. Shortcut configs for common tables
# =========================
def build_ohlc_config() -> DataCleaningConfig:
    """Return a config for an OHLC quote table keyed and sorted by date.

    ``date``, ``open``, ``high``, ``low`` and ``close`` are required and the
    OHLC consistency check is enabled.
    """
    return DataCleaningConfig(
        required_cols=["date", "open", "high", "low", "close"],
        optional_cols=["volume", "amount", "pct_chg", "code", "name"],
        numeric_cols=[
            "open", "high", "low", "close", "pre_close",
            "volume", "amount", "pct_chg", "chg", "turnover",
        ],
        unique_keys=["date"],
        sort_by=["date"],
        validate_ohlc=True,
    )


def build_market_breadth_config() -> DataCleaningConfig:
    """Return a config for a market-breadth table keyed and sorted by date.

    Only ``date`` is required; the breadth columns are optional and numeric,
    and the OHLC consistency check is disabled.
    """
    breadth_cols = [
        "up_ratio", "down_ratio", "up_count", "down_count",
        "limit_up_count", "limit_down_count",
        "avg_member_return", "median_member_return",
    ]
    return DataCleaningConfig(
        required_cols=["date"],
        # Separate list objects so the two config fields never alias.
        optional_cols=list(breadth_cols),
        numeric_cols=list(breadth_cols),
        unique_keys=["date"],
        sort_by=["date"],
        validate_ohlc=False,
    )
# =========================
# 14. Example
# =========================
if __name__ == "__main__":
    # Example 1: clean a single OHLC table.
    cfg = build_ohlc_config()
    df_clean, log = clean_file("your_ohlc_file.xlsx", config=cfg)

    # Save the cleaned result and the log.
    df_clean.to_csv("your_ohlc_file_clean.csv", index=False, encoding="utf-8-sig")
    save_cleaning_log(log, "your_ohlc_file_clean_log.json")

    # Example 2: align to a trading calendar.
    # The table's own dates are used here for illustration only; a dedicated
    # trading calendar is recommended for real use.
    calendar = df_clean["date"]
    df_aligned = align_to_calendar(df_clean, calendar, date_col="date")
    df_aligned.to_csv("your_ohlc_file_aligned.csv", index=False, encoding="utf-8-sig")


# ---------------------------------------------------------------------------
# How to use this module
# ---------------------------------------------------------------------------
# 1. Clean an ordinary quote table, e.g. a stock or index OHLC table:
#
#        from data_cleaning import clean_file, build_ohlc_config
#
#        cfg = build_ohlc_config()
#        df_clean, log = clean_file("index_data.xlsx", config=cfg)
#
# 2. Clean a market-breadth table, e.g. advancing-issue counts and limit-up
#    counts:
#
#        from data_cleaning import clean_file, build_market_breadth_config
#
#        cfg = build_market_breadth_config()
#        df_clean, log = clean_file("breadth_data.csv", config=cfg)
#
# 3. Align everything to one trading calendar:
#
#        from data_cleaning import align_to_calendar
#
#        df_aligned = align_to_calendar(df_clean, calendar_df, date_col="date")
#
# 4. Merge several tables:
#
#        from data_cleaning import merge_dataframes_on_date
#
#        merged = merge_dataframes_on_date(
#            [ohlc_df, breadth_df, factor_df], date_col="date"
#        )
#
# 5. Compute a completeness score:
#
#        from data_cleaning import compute_completeness_score
#
#        merged["completeness_score"] = compute_completeness_score(
#            merged,
#            cols=["open", "high", "low", "close", "amount", "up_ratio"],
#        )
#
# Key advantages of this approach
# -------------------------------
# 1. No more "computing the strategy while still fetching the data": clean
#    the data into a standard table first, then feed it to the strategy.
# 2. Inconsistent column names are handled through the alias mapping rather
#    than by hand-editing every table.
# 3. Date problems are handled in one place: unified to_datetime parsing,
#    de-duplication, sorting and alignment.
# 4. Results can be verified: it is not just "it runs", every run produces
#    a log.
# 5. It degrades gracefully: a missing soft dependency does not bring
#    everything down.
#
# Suggested project layout
# ------------------------
# Keep the data layer in these files:
#
#   data_cleaning.py        the generic framework above
#   data_loaders.py         reading logic for the different data sources
#   schemas.py              the configs for the various tables
#   feature_engineering.py  MA, ATR and drawdown calculations
#   strategy.py             the market-trend and single-stock strategies
#
# That way many later tables can reuse this layer.