明白,你的意思是:输入 Excel/数据源的结构不固定,有时比你给的参考还复杂,所以清洗模块必须“更像解析器”,而不是针对某一种模板。

我给你一版更通用、能扛“各种复合表”的更新(仍然两模块:洗数据 + 别瞎说)。核心思路:

- 不假设表头在哪一行、不假设块宽、不假设列顺序
- 先在整张表里“找锚点”:开盘价/收盘价 + 时间/日期 + 指数名(或代码)
- 自动识别每个“块”(open/close/close_nvnf),再从块里提取数据
- 提取后进行强一致性校验(日期交集、缺失规则、价格合法性、异常涨跌检测)
- LLM guard 做成“schema gate”:白名单字段 + 枚举 + 数值范围 + 固定模板渲染(LLM只能解释)

下面是更鲁棒的版本(你可以直接替换原文件)。它对“复杂复合表”的适配能力会显著强于前一版。

### ✅ data_cleaning.py(超鲁棒复合表解析版)

```python
# -*- coding: utf-8 -*-
"""
data_cleaning.py (robust)

Parses iFinD/Wind/other composite Excel exports without assuming a fixed header
row or a fixed block layout.

Output columns (v14_ready):
    date, zz500_open, zz500_close, zz1000_open, zz1000_close,
    hs300_open, hs300_close, comp_open, comp_close
"""
from __future__ import annotations

import re
import unicodedata
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
import openpyxl
import pandas as pd


# -----------------------------
# Configuration
# -----------------------------
@dataclass
class CleaningConfig:
    """Tunables for block detection, extraction and validation."""

    sheet_name: Optional[str] = None          # None => active sheet
    search_rows: int = 40                     # rows scanned for headers/anchors (raise for complex sheets)
    scan_columns_cap: int = 300               # max columns scanned (keeps very wide sheets fast)
    max_block_width: int = 20                 # max columns scanned to the right of a date column
    data_start_row_fallback: int = 5          # first data row when it cannot be inferred
    require_all_three_indices: bool = True    # drop rows unless all six open/close values are present
    fail_on_any_missing: bool = False         # True: raise on rows with gaps; False: drop/keep per the flag above
    placeholder_values: Tuple[float, ...] = (-1.0, 0.0, 9999.0, 99999.0, 1e20)
    max_abs_daily_move: float = 0.25          # warn when |composite daily return| exceeds this
    allow_close_nvnf_fallback: bool = True    # use the "close (not forward-filled)" block when no close block exists
    stop_after_blank_dates: int = 3           # this many consecutive non-date rows end a block
    min_overlap_ratio: float = 0.5            # warn when open/close blocks share fewer dates than this


INDEX_ALIASES = {
    "zz500": ["中证500", "000905", "000905.SH", "CSI500", "CSI 500"],
    "zz1000": ["中证1000", "000852", "000852.SH", "CSI1000", "CSI 1000"],
    "hs300": ["沪深300", "000300", "000300.SH", "CSI300", "CSI 300"],
}

METRIC_ALIASES = {
    "open": ["开盘价", "open", "Open"],
    "close": ["收盘价", "close", "Close"],
    # Exports use both full-width and half-width parentheses; _norm folds them (NFKC).
    "close_nvnf": ["收盘价(不前推)", "收盘价(不前推)", "不前推收盘价", "close_nvnf"],
}

DATE_ALIASES = ["时间", "日期", "date", "Date", "TIME", "Time"]

_INDEX_KEYS = ("zz500", "zz1000", "hs300")
_CORE_COLS = [f"{key}_{side}" for key in _INDEX_KEYS for side in ("open", "close")]

# Excel stores dates as day counts from 1899-12-30. Only serials in
# [1970-01-01, 2100-01-01) are accepted as dates; index price levels
# (a few thousand) fall outside this window.
_EXCEL_EPOCH = pd.Timestamp("1899-12-30")
_EXCEL_SERIAL_MIN, _EXCEL_SERIAL_MAX = 25569, 73051


# -----------------------------
# Helpers
# -----------------------------
def _norm(x) -> str:
    """Return *x* as comparable text: NFKC-folded (full-width -> half-width), whitespace removed."""
    if x is None:
        return ""
    s = unicodedata.normalize("NFKC", str(x)).strip()
    return re.sub(r"\s+", "", s)


def _contains_any(cell_text, keys: List[str]) -> bool:
    """True when any normalized, lower-cased key is a substring of the normalized cell text."""
    t = _norm(cell_text).lower()
    for k in keys:
        kk = _norm(k).lower()
        if kk and kk in t:
            return True
    return False


def _match_index(cell_text) -> Optional[str]:
    """Return "zz500"/"zz1000"/"hs300" when the cell names one of the indices, else None."""
    if not _norm(cell_text):
        return None
    for key, aliases in INDEX_ALIASES.items():
        if _contains_any(cell_text, aliases):
            return key
    return None


def _match_metric(cell_text) -> Optional[str]:
    """Return "open"/"close_nvnf"/"close" for a metric label, else None."""
    # close_nvnf must be tested before close: "收盘价(不前推)" also contains "收盘价".
    for metric in ("open", "close_nvnf", "close"):
        if _contains_any(cell_text, METRIC_ALIASES[metric]):
            return metric
    return None


def _is_date_header(cell_text) -> bool:
    """True when the cell contains a date/time header keyword (case-insensitive)."""
    return _contains_any(cell_text, DATE_ALIASES)


def _count_indices(mapping: Dict[str, int]) -> int:
    """Number of index columns recognised in a block mapping."""
    return sum(1 for key in _INDEX_KEYS if key in mapping)


def _as_date(v) -> Optional[pd.Timestamp]:
    """
    Parse a cell value as a calendar date (time of day dropped), or return None.

    - datetime-like values are handed to pandas;
    - numbers are accepted only as Excel serial days in [1970-01-01, 2100-01-01).
      Otherwise pd.to_datetime(5123.4) would read them as nanoseconds since 1970;
    - strings must contain a 4-digit run (year or yyyymmdd), so "2" or "12" are not
      parsed as a day of the current month; "2024年1月2日" is supported.
    """
    if v is None or isinstance(v, bool):
        return None
    if isinstance(v, (int, float, np.integer, np.floating)):
        if not (_EXCEL_SERIAL_MIN <= v < _EXCEL_SERIAL_MAX):  # also rejects NaN
            return None
        return (_EXCEL_EPOCH + pd.Timedelta(days=float(v))).normalize()
    if isinstance(v, str):
        s = v.strip()
        if not re.search(r"\d{4}", s):
            return None
        v = re.sub(r"[年月]", "-", s).replace("日", "")
    try:
        ts = pd.to_datetime(v)
    except (ValueError, TypeError, OverflowError):
        return None
    return None if pd.isna(ts) else ts.normalize()


def _as_float(v) -> float:
    """Parse a price cell (thousands separators allowed); NaN when empty, textual null, bool or unparseable."""
    if v is None or isinstance(v, bool):
        return np.nan
    if isinstance(v, str):
        s = unicodedata.normalize("NFKC", v).replace(",", "").strip()
        if s == "" or s.lower() in ("nan", "none", "null"):
            return np.nan
        v = s
    try:
        return float(v)
    except (TypeError, ValueError, OverflowError):
        return np.nan


# -----------------------------
# Block detection
# -----------------------------
def _find_blocks(ws, cfg: CleaningConfig) -> Dict[str, List[Dict[str, int]]]:
    """
    Locate candidate data blocks in the header area (first cfg.search_rows rows).

    A block is anchored on a date-header cell (r, c). Columns to its right, up to
    cfg.max_block_width, are scanned for index names in rows r-2..r+2. The scan
    stops at the next date-header column in row r, so neighbouring side-by-side
    blocks cannot leak into each other. Each index column gets a metric:
    - its own label in rows r-2..r+2 (e.g. "中证500开盘价"), if it has one;
    - otherwise the block label found in rows r-3..r+3 of column c, else of column c-1.
    The first column seen for an index wins. A (date column, metric) mapping is
    kept when at least two indices are found.

    Returns {"open": [...], "close": [...], "close_nvnf": [...]}. Each mapping holds
    1-based columns plus the header row: {"date": c, "header_row": r, "zz500": ..., ...}.

    Raises ValueError when no open block, or no usable close block, is found.
    """
    blocks: Dict[str, List[Dict[str, int]]] = {"open": [], "close": [], "close_nvnf": []}
    max_row = min(cfg.search_rows, ws.max_row)
    max_col = min(cfg.scan_columns_cap, ws.max_column)

    # Read the header area once as normalized text (per-cell access is slow in openpyxl).
    header = [
        [_norm(v) for v in row]
        for row in ws.iter_rows(min_row=1, max_row=max_row, max_col=max_col, values_only=True)
    ]

    def text(r: int, c: int) -> str:
        if 1 <= r <= max_row and 1 <= c <= max_col:
            return header[r - 1][c - 1]
        return ""

    def metric_near(r: int, c: int, span: int) -> Optional[str]:
        for rr in range(max(1, r - span), min(max_row, r + span) + 1):
            metric = _match_metric(text(rr, c))
            if metric:
                return metric
        return None

    def index_near(r: int, c: int) -> Optional[str]:
        for rr in range(max(1, r - 2), min(max_row, r + 2) + 1):
            key = _match_index(text(rr, c))
            if key:
                return key
        return None

    for r in range(1, max_row + 1):
        for c in range(1, max_col + 1):
            if not _is_date_header(text(r, c)):
                continue
            # Block-level label: in the date column, or one column left (merged cells).
            block_metric = metric_near(r, c, 3) or metric_near(r, c - 1, 3)
            per_metric: Dict[str, Dict[str, int]] = {}
            for cc in range(c + 1, min(max_col, c + cfg.max_block_width) + 1):
                if _is_date_header(text(r, cc)):
                    break  # the next block starts here
                key = index_near(r, cc)
                if key is None:
                    continue
                metric = metric_near(r, cc, 2) or block_metric
                if metric is None:
                    continue
                mapping = per_metric.setdefault(metric, {"date": c, "header_row": r})
                mapping.setdefault(key, cc)  # first occurrence wins; never overwrite
            for metric, mapping in per_metric.items():
                if _count_indices(mapping) >= 2:
                    blocks[metric].append(mapping)

    # Deduplicate per metric by date column: keep the mapping with more indices.
    # On ties keep the later header row, i.e. the one closest to the data.
    for metric, candidates in blocks.items():
        best: Dict[int, Dict[str, int]] = {}
        for m in candidates:
            cur = best.get(m["date"])
            if cur is None or _count_indices(m) >= _count_indices(cur):
                best[m["date"]] = m
        blocks[metric] = list(best.values())

    # An open block is mandatory.
    if not blocks["open"]:
        raise ValueError("未能在表头区域识别到任何“开盘价(open)”块。请确认表格包含“开盘价”字样及日期列。")
    # At least one close or (allowed) close_nvnf block.
    if not blocks["close"] and not (cfg.allow_close_nvnf_fallback and blocks["close_nvnf"]):
        raise ValueError("未识别到“收盘价(close)”块;也没有可用的“收盘价(不前推)”兜底块。")
    return blocks


def _pick_best(candidates: List[Dict[str, int]]) -> Dict[str, int]:
    """Return the candidate block with the most recognised index columns (first one on ties)."""
    return max(candidates, key=_count_indices)


def _extract_from_block(ws, block: Dict[str, int], cfg: CleaningConfig) -> pd.DataFrame:
    """
    Read date + three index values from one block (missing indices -> NaN).

    The header row comes from block["header_row"] when present. Otherwise it falls back
    to the last date header in the date column within cfg.search_rows. The first data row
    is the first parseable date within 50 rows below the header (else
    cfg.data_start_row_fallback). Reading stops after cfg.stop_after_blank_dates
    consecutive non-date rows, or at another date header once data has been read
    (a block stacked below).

    Returns rows de-duplicated by date and sorted ascending.
    Raises ValueError when nothing is extracted.
    """
    date_col = block["date"]

    header_row = block.get("header_row")
    if header_row is None:
        header_row = 1
        for r in range(1, min(cfg.search_rows, ws.max_row) + 1):
            if _is_date_header(ws.cell(r, date_col).value):
                header_row = r

    start_row = cfg.data_start_row_fallback
    for r in range(header_row + 1, min(ws.max_row, header_row + 50) + 1):
        if _as_date(ws.cell(r, date_col).value) is not None:
            start_row = r
            break

    value_cols = {key: block[key] for key in _INDEX_KEYS if key in block}
    last_col = max([date_col, *value_cols.values()])

    records = []
    blank_streak = 0
    for row in ws.iter_rows(min_row=start_row, max_col=last_col, values_only=True):
        raw_date = row[date_col - 1]
        if records and _is_date_header(raw_date):
            break  # header of another block stacked below this one
        d = _as_date(raw_date)
        if d is None:
            blank_streak += 1
            if blank_streak >= cfg.stop_after_blank_dates:
                break
            continue
        blank_streak = 0
        rec = {"date": d}
        for key in _INDEX_KEYS:
            rec[key] = _as_float(row[value_cols[key] - 1]) if key in value_cols else np.nan
        records.append(rec)

    df = pd.DataFrame(records, columns=["date", *_INDEX_KEYS])
    if df.empty:
        raise ValueError("从识别到的块中提取数据为空:请检查该块附近是否真有数据。")
    return df.drop_duplicates("date").sort_values("date").reset_index(drop=True)


def _apply_validity_rules(df: pd.DataFrame, cfg: CleaningConfig) -> pd.DataFrame:
    """
    Null out placeholder, non-finite and non-positive prices, then apply the missing-value policy.

    Raises ValueError when fail_on_any_missing is set and gaps exist, or when no rows survive.
    """
    df = df.copy()
    for col in _CORE_COLS:
        values = df[col].replace(list(cfg.placeholder_values), np.nan)
        df[col] = values.where(np.isfinite(values) & (values > 0))

    bad = df[_CORE_COLS].isna().any(axis=1)
    bad_n = int(bad.sum())
    if bad_n > 0:
        if cfg.fail_on_any_missing:
            if cfg.require_all_three_indices:
                raise ValueError(f"发现 {bad_n} 行三指数开收盘不全,已配置 fail_on_any_missing=True,终止。")
            raise ValueError(f"发现 {bad_n} 行存在缺失,已配置 fail_on_any_missing=True,终止。")
        if cfg.require_all_three_indices:
            df = df.loc[~bad].reset_index(drop=True)

    if df.empty:
        raise ValueError(f"清洗后没有可用交易日:合并后的 {len(bad)} 行全部因缺失/非法值/占位符被丢弃。")
    return df


def clean_three_indices_from_any_composite_xlsx(
    xlsx_path: str,
    out_csv_path: Optional[str] = None,
    cfg: Optional[CleaningConfig] = None,
) -> pd.DataFrame:
    """
    Extract CSI 500 / CSI 1000 / CSI 300 open+close from a composite workbook.

    Steps:
    1. Detect blocks and pick the most complete open block and close block.
    2. Inner-join the two blocks on date.
    3. Validate prices.
    4. Build the composite and warn on abnormal composite daily moves.

    Writes a UTF-8-BOM CSV when out_csv_path is given, and returns the v14_ready frame.

    Raises ValueError when blocks are missing, the blocks share no dates, or no valid
    rows remain.
    """
    cfg = cfg or CleaningConfig()
    wb = openpyxl.load_workbook(xlsx_path, data_only=True)
    try:
        ws = wb[cfg.sheet_name] if cfg.sheet_name else wb.active
        blocks = _find_blocks(ws, cfg)
        open_block = _pick_best(blocks["open"])
        # _find_blocks guarantees a close block, or an nvnf block when the fallback is allowed.
        close_block = _pick_best(blocks["close"] or blocks["close_nvnf"])
        df_open = _extract_from_block(ws, open_block, cfg).rename(
            columns={key: f"{key}_open" for key in _INDEX_KEYS}
        )
        df_close = _extract_from_block(ws, close_block, cfg).rename(
            columns={key: f"{key}_close" for key in _INDEX_KEYS}
        )
    finally:
        wb.close()

    # Inner join on common trading days so open/close can never be misaligned.
    merged = (
        df_open.merge(df_close, on="date", how="inner")
        .sort_values("date")
        .drop_duplicates("date")
        .reset_index(drop=True)
    )
    if merged.empty:
        raise ValueError("开盘价块与收盘价块没有任何共同交易日:日期可能错位,或识别到的块不属于同一数据区域。")
    overlap = len(merged) / min(len(df_open), len(df_close))
    if overlap < cfg.min_overlap_ratio:
        print(f"[WARN] 开盘/收盘块的共同交易日仅占较短一方的 {overlap:.0%},请确认块识别是否正确。")

    df = _apply_validity_rules(merged, cfg)

    # Composite = arithmetic mean of the index price levels.
    # NOTE(review): averaging raw levels weights each index by its price level; a
    # base-normalised average would be a true equal weight. Kept as-is for v14 compatibility.
    df["comp_open"] = df[["zz500_open", "zz1000_open", "hs300_open"]].mean(axis=1, skipna=True)
    df["comp_close"] = df[["zz500_close", "zz1000_close", "hs300_close"]].mean(axis=1, skipna=True)
    if df["comp_open"].isna().any() or df["comp_close"].isna().any():
        raise ValueError("合成价格出现缺失:说明缺失策略不符合当前配置,请检查 require_all_three_indices。")

    # Warn (do not fail) on suspicious composite daily moves.
    comp_ret = df["comp_close"] / df["comp_close"].shift(1) - 1
    abnormal = comp_ret.abs() > cfg.max_abs_daily_move  # NaN compares False
    if abnormal.any():
        dates = df.loc[abnormal, "date"].dt.strftime("%Y-%m-%d").tolist()
        print(f"[WARN] 发现疑似异常合成日收益(>|{cfg.max_abs_daily_move:.0%}|):{dates[:10]}{'...' if len(dates)>10 else ''}")

    out = df[[
        "date",
        "zz500_open", "zz500_close",
        "zz1000_open", "zz1000_close",
        "hs300_open", "hs300_close",
        "comp_open", "comp_close",
    ]].copy()

    if out_csv_path:
        out.to_csv(out_csv_path, index=False, encoding="utf-8-sig")
    return out


if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser()
    p.add_argument("--xlsx", required=True)
    p.add_argument("--out", default="three_indices_v14_ready.csv")
    p.add_argument("--sheet", default=None)
    p.add_argument("--strict_missing", action="store_true", help="发现缺失直接报错")
    p.add_argument("--allow_missing", action="store_true", help="允许缺失(不推荐)")
    args = p.parse_args()

    cfg = CleaningConfig(
        sheet_name=args.sheet,
        require_all_three_indices=not args.allow_missing,
        fail_on_any_missing=args.strict_missing,
    )
    df = clean_three_indices_from_any_composite_xlsx(args.xlsx, args.out, cfg)
    print(f"OK: {args.out} 行数={len(df)} 日期范围={df['date'].min().date()}~{df['date'].max().date()}")
```

### ✅ llm_guard.py(更硬:schema gate + 固定模板输出)

```python
# -*- coding: utf-8 -*-
"""
llm_guard.py (hardened)

- whitelist of allowed fields
- enum / range validation
- fixed-template output (the LLM may only explain, never free-form)
- rejects trade instructions / security codes / tickers in interpolated values
"""
from __future__ import annotations

import math
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

ALLOWED_STATES = {"TREND_BULL", "TREND_NORMAL", "RANGE", "RISK_MILD", "RISK_SEVERE"}

ALLOWED_DECISION_KEYS = {
    "date",
    "market_state",
    "confidence",
    "suggested_position",
    "execution_permissions",
    "consecutive_days",
    "switch_count_last_5days",
    "trade_candidates",
}

ALLOWED_PERM_KEYS = {"allow_new_positions", "allow_add_positions", "allow_reduce_positions"}

# Integer fields that the template writes into the text verbatim.
_COUNT_KEYS = ("consecutive_days", "switch_count_last_5days")


@dataclass
class GuardConfig:
    position_is_ratio_0_1: bool = True       # True: suggested_position in [0, 1]; False: in [0, 100]
    forbid_trade_instructions: bool = True   # run ban_patterns over decision-derived text
    ban_patterns: Tuple[str, ...] = (
        r"(买入|卖出|下单|挂单|止损|止盈|建仓|加仓|减仓|清仓)",
        r"(代码|ticker|symbol|ETF|基金代码|证券代码)",
        r"(\d{6}\.?(SH|SZ)?)",
        r"(?i)\b(?:buy|sell|order|execute)\b",
    )


class DecisionValidationError(ValueError):
    """Raised when a decision dict fails the schema gate."""


def _ensure_whitelist(d: Dict[str, Any], allowed: set, where: str) -> None:
    """Reject any key of *d* that is not in *allowed*."""
    extra = set(d.keys()) - allowed
    if extra:
        raise DecisionValidationError(f"{where} 存在未允许字段:{sorted(extra)}(已拒绝,防自由发挥)")


def _to_float(x: Any) -> float:
    """Parse a number (a trailing '%' is stripped, not rescaled); NaN when None, bool or unparseable."""
    if x is None or isinstance(x, bool):
        return float("nan")
    try:
        if isinstance(x, str):
            return float(x.replace("%", "").strip())
        return float(x)
    except (TypeError, ValueError, OverflowError):
        return float("nan")


def _to_count(value: Any, field: str) -> int:
    """Coerce *value* to a non-negative integer or raise DecisionValidationError."""
    num = _to_float(value)
    if math.isnan(num) or num < 0 or not num.is_integer():
        raise DecisionValidationError(f"{field} 必须是非负整数:{value!r}")
    return int(num)


def validate_and_sanitize_decision(decision: Dict[str, Any], cfg: Optional[GuardConfig] = None) -> Dict[str, Any]:
    """
    Validate a decision dict and return a sanitized copy.

    The copy has:
    - market_state stripped;
    - confidence as a float in [0, 100];
    - suggested_position as a 0..1 ratio;
    - an extra "_suggested_position_pct" key (0..100);
    - count fields as ints (None count fields are dropped).
    Other whitelisted keys (e.g. date, trade_candidates) pass through unchanged.

    Raises DecisionValidationError on unknown keys, missing required keys, bad enum
    values, out-of-range numbers, non-bool permissions or non-integer counts.
    """
    cfg = cfg or GuardConfig()
    if not isinstance(decision, dict):
        raise DecisionValidationError("decision 必须是 dict")

    _ensure_whitelist(decision, ALLOWED_DECISION_KEYS, "decision")

    for k in ("market_state", "confidence", "suggested_position"):
        if k not in decision:
            raise DecisionValidationError(f"decision 缺少必填字段:{k}")

    state = str(decision["market_state"]).strip()
    if state not in ALLOWED_STATES:
        raise DecisionValidationError(f"market_state 非法:{state}")

    conf = _to_float(decision["confidence"])
    if not (0 <= conf <= 100):  # NaN also fails
        raise DecisionValidationError(f"confidence 超范围:{decision['confidence']}(应 0~100)")

    pos = _to_float(decision["suggested_position"])
    if cfg.position_is_ratio_0_1:
        if not (0 <= pos <= 1):
            raise DecisionValidationError(f"suggested_position 超范围:{decision['suggested_position']}(应 0~1)")
        pos_pct = pos * 100.0
        pos_ratio = pos
    else:
        if not (0 <= pos <= 100):
            raise DecisionValidationError(f"suggested_position 超范围:{decision['suggested_position']}(应 0~100)")
        pos_pct = pos
        pos_ratio = pos / 100.0

    perms = decision.get("execution_permissions")
    if perms is not None:
        if not isinstance(perms, dict):
            raise DecisionValidationError("execution_permissions 必须是 dict")
        _ensure_whitelist(perms, ALLOWED_PERM_KEYS, "execution_permissions")
        for k, v in perms.items():
            if not isinstance(v, bool):
                raise DecisionValidationError(f"execution_permissions.{k} 必须是 bool")

    out = dict(decision)
    out["market_state"] = state
    out["confidence"] = float(conf)
    out["suggested_position"] = float(pos_ratio)
    out["_suggested_position_pct"] = float(pos_pct)

    # These values are written into the text verbatim, so they must be plain integers.
    for k in _COUNT_KEYS:
        if k not in out:
            continue
        if out[k] is None:
            del out[k]
        else:
            out[k] = _to_count(out[k], k)
    return out


def _render_fixed_template(decision: Dict[str, Any]) -> str:
    """Render a sanitized decision into the fixed Chinese template (labels are constants)."""
    state_cn = {
        "TREND_BULL": "趋势强势(偏多)",
        "TREND_NORMAL": "趋势偏多(中等)",
        "RANGE": "震荡整理",
        "RISK_MILD": "风险偏高(轻)",
        "RISK_SEVERE": "风险偏高(重)",
    }[decision["market_state"]]

    lines = [
        f"大势状态:{state_cn}",
        f"置信度:{decision['confidence']:.1f}/100",
        f"建议仓位:{decision['_suggested_position_pct']:.1f}%",
    ]

    perms = decision.get("execution_permissions")
    if isinstance(perms, dict):
        lines.append(
            f"执行权限:新开仓={perms.get('allow_new_positions', False)},加仓={perms.get('allow_add_positions', False)},减仓={perms.get('allow_reduce_positions', True)}"
        )

    if "consecutive_days" in decision:
        lines.append(f"状态连续天数:{decision['consecutive_days']}")
    if "switch_count_last_5days" in decision:
        lines.append(f"近5日切换次数:{decision['switch_count_last_5days']}")

    lines.append("备注:以上为模型信号解读与风控建议,不构成任何自动下单指令。")
    return "\n".join(lines)


def _interpolated_values(decision: Dict[str, Any]) -> List[str]:
    """Return the decision-derived strings that _render_fixed_template writes into the text."""
    values = [f"{decision['confidence']:.1f}", f"{decision['_suggested_position_pct']:.1f}"]
    perms = decision.get("execution_permissions")
    if isinstance(perms, dict):
        values.extend(str(v) for v in perms.values())
    values.extend(str(decision[k]) for k in _COUNT_KEYS if k in decision)
    return values


def _block_trade_instructions(text: str, cfg: GuardConfig) -> str:
    """Raise DecisionValidationError if *text* matches any ban pattern; return it otherwise."""
    if not cfg.forbid_trade_instructions:
        return text
    for pat in cfg.ban_patterns:
        if re.search(pat, text):
            raise DecisionValidationError(f"解释文本触发禁止模式:{pat}(已拒绝输出)")
    return text


def generate_safe_explanation(decision: Dict[str, Any], cfg: Optional[GuardConfig] = None) -> str:
    """Validate *decision* and render it through the fixed template."""
    cfg = cfg or GuardConfig()
    safe = validate_and_sanitize_decision(decision, cfg)
    # The fixed template itself contains banned words ("加仓"/"减仓" in the permission
    # labels, "下单" in the disclaimer), so scanning the rendered text would reject every
    # decision. Scan only the values that originate from the decision.
    _block_trade_instructions("\n".join(_interpolated_values(safe)), cfg)
    return _render_fixed_template(safe)
```

### 你现在最担心的点:“程序能跳过步骤,直接输出不准信息”

这版的关键改动是:宁可失败,也不“凑合输出”。典型会直接 raise ValueError 的情况包括:

- 找不到开盘价块,或找不到收盘价块(及兜底块)
- open/close 合并后没有任何共同交易日(日期完全错位);重合率偏低时会打印 [WARN]
- 合成价出现 NaN
- 价格<=0/占位符导致清洗后没有任何可用数据

这样你不会再遇到“跑是跑了,但输出乱七八糟”的情况。

### 建议你上线时加一个“强制门禁”

把主流程改成:

1. 先跑清洗
2. 清洗成功才允许状态机/建议模块
3. 清洗失败就输出“数据源异常,不更新信号”

只要这样,openclaw 就算再能说,也没有“脏数据下的建议”可说。

如果你愿意,我还可以给你加一个输出清洗报告(json/csv),内容包括:

- 识别到了哪些块(open/close 的列号)
- 合并后有效交易日数量
- 丢弃行数与原因(缺失/非法/占位符)
- 异常涨跌日期列表
这对你云端排错很有用。你想要日志格式是 json 还是 csv?