"""Command-line validator for the Quest configuration table stored as CSV.

File layout expected by :meth:`QuestValidator.validate_file`: the third
physical line holds the column names and records start on line 4; the first
two lines are ignored.
"""
import argparse
import csv
import os
import re
import sys
from collections import defaultdict
from datetime import datetime
from typing import List, Optional

# Number of leading lines that are not data; the last of them is the header.
HEADER_LINE_COUNT = 3

# Category that uses the *_ACHIEVEMENT lookup tables and the wider type range.
ACHIEVEMENT_CATEGORY = 9

# The only category whose rows may carry a non-zero ``next`` reference.
CHAINED_CATEGORY = 1


def format_number(num):
    """Render *num* the way quest descriptions spell target amounts.

    Values of at least 100,000,000 become a whole count of "亿" and values of
    at least 10,000 a whole count of "万"; the remainder is dropped by floor
    division. Anything smaller is returned via ``str``.
    """
    if num >= 100000000:
        return f"{num // 100000000}亿"
    if num >= 10000:
        return f"{num // 10000}万"
    return str(num)


# Description prefix per quest type for every category except achievements.
TYPE_DESCRIPTIONS = {
    1: "招募",
    2: "英雄升级",
    3: "英雄升阶",
    4: "主角升级",
    5: "装备升级",
    6: "通过",
    7: "击败",
    8: "商店购买",
    9: "消耗",
    10: "消耗",
    11: "登录",
    12: "开",
    13: "挑战",
    14: "挑战",
    15: "获得",
    16: "领取",
    17: "世界频道发言",
}

# Description suffix (unit) per quest type for non-achievement categories.
TYPE_UNITS = {
    1: "次",
    2: "次",
    3: "次",
    4: "次",
    5: "次",
    6: "关主线关卡",
    7: "波敌人",
    8: "次",
    9: "金币",
    10: "勾玉",
    11: "天",
    12: "次宝箱",
    13: "次副本",
    14: "次竞技场",
    15: "个英雄",
    16: "次游历奖励",
    17: "次",
}

# Description prefix per quest type for the achievement category.
TYPE_DESCRIPTIONS_ACHIEVEMENT = {
    1: "累计招募",
    2: "英雄累计升级",
    3: "英雄累计升阶",
    4: "主角累计升级",
    5: "装备累计升级",
    6: "累计通关",
    7: "累计通关",
    8: "商店累计消费",
    9: "累计消耗",
    10: "累计消耗",
    11: "累计登录",
    12: "无",
    13: "累计挑战副本",
    14: "累计挑战竞技场",
    15: "累计获得",
    16: "累计领取",
    17: "累计消耗",
    18: "累计点赞好友",
    19: "累计英雄委托",
    20: "累计击败",
    21: "战力达到",
    22: "主角到达",
    23: "累计委托",
}

# Description suffix (unit) per quest type for the achievement category.
TYPE_UNITS_ACHIEVEMENT = {
    1: "次",
    2: "次",
    3: "次",
    4: "次",
    5: "次",
    6: "关主线关卡",
    7: "波敌人",
    8: "次",
    9: "金币",
    10: "钻石",
    11: "天",
    12: "",
    13: "次",
    14: "次",
    15: "位英雄",
    16: "次挂机奖励",
    17: "秒",
    18: "次",
    19: "次",
    20: "个敌人",
    21: "万",
    22: "阶",
    23: "次",
}

# Required openUI_type for each quest type that has a fixed mapping.
TYPE_TO_OPENUI = {
    1: 2,
    2: 8,
    3: 8,
    4: 8,
    5: 7,
    6: 1,
    7: 1,
    8: 3,
    9: 3,
    10: 3,
    11: 1,
    12: 1,
    13: 4,
    14: 10,
    15: 2,
    16: 5,
    17: 6,
}

# Required openUI_desc string for each openUI_type.
OPENUI_TO_DESC = {
    1: "UI_MainPanel",
    2: "UI_DrawCardPanel",
    3: "UI_ShopPanel",
    4: "UI_FBPanel",
    5: "UI_IdlePanel",
    6: "UI_ChatPanel",
    7: "UI_EquipPanel",
    8: "UI_HeroPanel",
    9: "UI_FriendPanel",
    10: "UI_ArePanel",
}

# Required score per category; categories not listed here must score 0.
CATEGORY_SCORE_MAP = {
    2: 20,
    5: 20,
    6: 20,
    10: 20,
    9: 25,
}

# Allowed category values and their display names.
CATEGORY_DESCRIPTIONS = {
    1: "主线任务",
    2: "日常任务",
    3: "通行证任务",
    4: "修炼任务",
    5: "周常任务",
    6: "公会任务",
    7: "委托任务",
    8: "称号任务",
    9: "成就任务",
    10: "七日任务",
}


class ValidationError:
    """One finding produced by :class:`QuestValidator`.

    This is a plain record, not an exception: it is collected in a list and
    never raised.
    """

    def __init__(self, row_num, field, error_type, message):
        self.row_num = row_num        # 1-based physical line number in the file
        self.field = field            # column the finding refers to
        self.error_type = error_type  # short category label used for grouping
        self.message = message        # human-readable detail

    def __repr__(self):
        return f"行{self.row_num} [{self.field}] {self.error_type}: {self.message}"


class QuestValidator:
    """Validate Quest table rows and accumulate findings in ``self.errors``.

    Each ``validate_*`` method checks one column, appends a
    :class:`ValidationError` for a violation and returns ``True`` when the
    value is acceptable, ``False`` otherwise.
    """

    def __init__(self):
        self.errors = []                       # every finding, in detection order
        self.all_ids = set()                   # ids seen in the file (for `next` lookups)
        self.id_to_row = {}                    # id -> line number of its last occurrence
        self.category_ids = defaultdict(list)  # category -> ids, in file order

    def _add_error(self, row_num, field, error_type, message):
        """Append a new finding to ``self.errors``."""
        self.errors.append(ValidationError(row_num, field, error_type, message))

    def _parse_int(self, value, default=None):
        """Return ``int(value)``, or *default* when the conversion fails."""
        try:
            return int(value)
        except (ValueError, TypeError):
            return default

    def _parse_str(self, value) -> str:
        """Return *value* as a stripped string; ``None`` becomes ``""``."""
        if value is None:
            return ""
        return str(value).strip()

    def validate_id(self, row_num, id_val, category) -> bool:
        """Check that the id has 5 digits and its first digit equals *category*."""
        # NOTE(review): this rule can never pass for category 10, while
        # validate_id_continuity expects category * 10000 + n (six digits for
        # category 10). Confirm the intended id scheme before relaxing either.
        id_str = str(id_val)
        if len(id_str) != 5:
            self._add_error(row_num, "id", "格式错误", f"id必须为5位数字,当前值: {id_val}")
            return False
        if not id_str.isdigit():
            self._add_error(row_num, "id", "格式错误", f"id必须为纯数字,当前值: {id_val}")
            return False
        expected_category = int(id_str[0])
        if expected_category != category:
            self._add_error(
                row_num, "id", "逻辑错误",
                f"id第一位({expected_category})与category({category})不匹配",
            )
            return False
        return True

    def validate_category(self, row_num, category) -> bool:
        """Check that *category* is a key of ``CATEGORY_DESCRIPTIONS``."""
        if category not in CATEGORY_DESCRIPTIONS:
            self._add_error(
                row_num, "category", "枚举错误",
                f"category值{category}不在允许范围1-10内",
            )
            return False
        return True

    def validate_next(self, row_num, next_val, category) -> bool:
        """Check the ``next`` reference.

        It must not be negative, must be 0 outside category 1, and inside
        category 1 must be 0 or an id present in ``self.all_ids``.
        """
        if next_val < 0:
            self._add_error(row_num, "next", "范围错误", f"next不能为负数,当前值: {next_val}")
            return False
        if category != CHAINED_CATEGORY:
            if next_val != 0:
                self._add_error(
                    row_num, "next", "规则错误",
                    f"category≠1时next必须为0,当前值: {next_val}",
                )
                return False
        elif next_val != 0 and next_val not in self.all_ids:
            self._add_error(row_num, "next", "引用错误", f"next={next_val}引用的id不存在")
            return False
        return True

    def validate_desc(self, row_num, desc, type_val, target, category) -> bool:
        """Check that ``desc`` is non-empty, ends with "。" and matches type/target.

        The expected text is ``<prefix><formatted target><unit>。`` using the
        achievement tables for category 9 and the regular tables otherwise.
        The comparison is skipped when *type_val* has no table entry or when
        *target* is ``None``.
        """
        desc = self._parse_str(desc)
        if not desc:
            self._add_error(row_num, "desc", "空值错误", "desc字段不能为空")
            return False
        if not desc.endswith("。"):
            self._add_error(
                row_num, "desc", "格式错误",
                f"desc必须以中文句号'。'结尾,当前值: '{desc}'",
            )
            return False
        if target is None:
            # Without a target the expected text cannot be built, and
            # format_number(None) would raise TypeError.
            return True

        if category == ACHIEVEMENT_CATEGORY:
            prefixes, units = TYPE_DESCRIPTIONS_ACHIEVEMENT, TYPE_UNITS_ACHIEVEMENT
        else:
            prefixes, units = TYPE_DESCRIPTIONS, TYPE_UNITS
        if type_val not in prefixes:
            return True

        expected_desc = f"{prefixes[type_val]}{format_number(target)}{units[type_val]}。"
        if desc != expected_desc:
            self._add_error(
                row_num, "desc", "逻辑错误",
                f"desc与type/target不匹配,期望: '{expected_desc}',实际: '{desc}'",
            )
            return False
        return True

    def validate_type(self, row_num, type_val, category) -> bool:
        """Check that *type_val* is present and within the range for *category*.

        Category 9 allows 1-23; every other category allows 1-17. A missing
        (``None``) value is reported as an empty-value error.
        """
        if type_val is None:
            # An empty or non-numeric cell parses to None; comparing it with
            # an int would raise TypeError.
            self._add_error(row_num, "type", "空值错误", "type字段不能为空")
            return False
        if category == ACHIEVEMENT_CATEGORY:
            if not 1 <= type_val <= 23:
                self._add_error(
                    row_num, "type", "枚举错误",
                    f"category=9(成就)时type值{type_val}不在允许范围1-23内",
                )
                return False
        elif not 1 <= type_val <= 17:
            self._add_error(
                row_num, "type", "枚举错误",
                f"category≠9时type值{type_val}不在允许范围1-17内",
            )
            return False
        return True

    def validate_target(self, row_num, target) -> bool:
        """Check that *target* is at least 1."""
        if target < 1:
            self._add_error(row_num, "target", "范围错误", f"target必须为正整数,当前值: {target}")
            return False
        return True

    def validate_openUI_type(self, row_num, openUI_type, type_val) -> bool:
        """Check that ``openUI_type`` is in 1-10 and agrees with ``TYPE_TO_OPENUI``."""
        if not 1 <= openUI_type <= 10:
            self._add_error(
                row_num, "openUI_type", "枚举错误",
                f"openUI_type值{openUI_type}不在允许范围1-10内",
            )
            return False
        if type_val in TYPE_TO_OPENUI:
            expected = TYPE_TO_OPENUI[type_val]
            if openUI_type != expected:
                self._add_error(
                    row_num, "openUI_type", "映射错误",
                    f"type={type_val}时openUI_type应为{expected},当前值: {openUI_type}",
                )
                return False
        return True

    def validate_openUI_desc(self, row_num, openUI_desc, openUI_type) -> bool:
        """Check that ``openUI_desc`` equals the name mapped from *openUI_type*."""
        if openUI_type in OPENUI_TO_DESC:
            expected = OPENUI_TO_DESC[openUI_type]
            if self._parse_str(openUI_desc) != expected:
                self._add_error(
                    row_num, "openUI_desc", "映射错误",
                    f"openUI_type={openUI_type}时openUI_desc应为'{expected}',当前值: '{openUI_desc}'",
                )
                return False
        return True

    def validate_score(self, row_num, score, category) -> bool:
        """Check that *score* equals the category's value in ``CATEGORY_SCORE_MAP`` (0 if absent)."""
        expected = CATEGORY_SCORE_MAP.get(category, 0)
        if score != expected:
            self._add_error(
                row_num, "score", "计算错误",
                f"category={category}({CATEGORY_DESCRIPTIONS.get(category,'未知')})时score应为{expected},当前值: {score}",
            )
            return False
        return True

    def validate_auto(self, row_num, auto) -> bool:
        """Check that ``auto`` is 0."""
        if auto != 0:
            self._add_error(row_num, "auto", "固定值错误", f"auto必须为0,当前值: {auto}")
            return False
        return True

    def validate_extra(self, row_num, extra) -> bool:
        """Check that ``extra`` is empty or the text "null" (case-insensitive)."""
        extra_str = self._parse_str(extra).lower()
        if extra_str != "" and extra_str != "null":
            self._add_error(row_num, "extra", "固定值错误", f"extra必须为null,当前值: {extra}")
            return False
        return True

    def validate_id_continuity(self):
        """Check that each category's sorted ids are ``category*10000 + 1, + 2, ...``."""
        for category, ids in self.category_ids.items():
            expected_base = category * 10000
            for offset, id_val in enumerate(sorted(ids), start=1):
                expected = expected_base + offset
                if id_val != expected:
                    row_num = self.id_to_row[id_val]
                    self._add_error(
                        row_num, "id", "连续性错误",
                        f"category={category}的id不连续,期望: {expected},实际: {id_val}",
                    )

    def validate_row(self, row_num, row) -> bool:
        """Run every per-row check on *row* (a mapping of column name to text).

        Returns ``True`` when the row produced no new findings. A row without
        a parseable ``id`` or ``category`` is reported once and not checked
        further, because the remaining rules depend on both.
        """
        errors_before = len(self.errors)

        id_val = self._parse_int(row.get('id'))
        category = self._parse_int(row.get('category'))
        next_val = self._parse_int(row.get('next'), 0)
        desc = row.get('desc')
        type_val = self._parse_int(row.get('type'))
        target = self._parse_int(row.get('target'))
        openUI_type = self._parse_int(row.get('openUI_type'))
        openUI_desc = row.get('openUI_desc')
        score = self._parse_int(row.get('score'), 0)
        auto = self._parse_int(row.get('auto'), 0)
        extra = row.get('extra')

        if id_val is None:
            self._add_error(row_num, "id", "空值错误", "id字段不能为空")
            return False
        if category is None:
            self._add_error(row_num, "category", "空值错误", "category字段不能为空")
            return False

        self.validate_id(row_num, id_val, category)
        self.validate_category(row_num, category)

        # target and desc are only meaningful for a type that is valid for
        # this category, so they are gated on the type check.
        if self.validate_type(row_num, type_val, category):
            if target is None:
                self._add_error(row_num, "target", "空值错误", "target字段不能为空")
            else:
                self.validate_target(row_num, target)
            self.validate_desc(row_num, desc, type_val, target, category)

        if openUI_type is not None:
            self.validate_openUI_type(row_num, openUI_type, type_val)
            self.validate_openUI_desc(row_num, openUI_desc, openUI_type)

        self.validate_next(row_num, next_val, category)
        self.validate_score(row_num, score, category)
        self.validate_auto(row_num, auto)
        self.validate_extra(row_num, extra)

        return len(self.errors) == errors_before

    def validate_file(self, file_path) -> List[ValidationError]:
        """Validate the CSV at *file_path* and return the list of findings.

        All state from a previous run is reset first. A file with fewer than
        three lines yields an empty list. Ids are collected in a first pass so
        that ``next`` references may point at rows that appear later.
        """
        self.errors = []
        self.all_ids = set()
        self.id_to_row = {}
        self.category_ids = defaultdict(list)

        # newline='' leaves line endings untouched, as the csv module requires.
        with open(file_path, 'r', encoding='utf-8', newline='') as f:
            lines = f.readlines()

        if len(lines) < HEADER_LINE_COUNT:
            return self.errors

        header_line = lines[HEADER_LINE_COUNT - 1]
        fieldnames = [name.strip() for name in next(csv.reader([header_line]), [])]

        raw_reader = csv.reader(lines[HEADER_LINE_COUNT:])
        all_rows = []
        for values in raw_reader:
            if not values:
                # Blank line: no record, but it still advances line_num.
                continue
            # line_num counts physical lines consumed, so blank lines and
            # multi-line quoted fields do not shift the reported position.
            row_num = HEADER_LINE_COUNT + raw_reader.line_num
            row = dict(zip(fieldnames, values))
            all_rows.append((row_num, row))

            id_val = self._parse_int(row.get('id'))
            category = self._parse_int(row.get('category'))
            if id_val is not None and category is not None:
                self.all_ids.add(id_val)
                self.category_ids[category].append(id_val)
                self.id_to_row[id_val] = row_num

        for row_num, row in all_rows:
            self.validate_row(row_num, row)

        self.validate_id_continuity()
        return self.errors

    def generate_report(self, file_path) -> str:
        """Validate *file_path* and return a text report.

        On success the report lists per-category record counts. Otherwise the
        findings are grouped by error type, most frequent first, showing at
        most 10 per group, followed by a per-type count summary.
        """
        errors = self.validate_file(file_path)

        report = [
            "=" * 80,
            "Quest 表数据校验报告",
            "=" * 80,
            f"校验文件: {file_path}",
            f"校验时间: {self._get_timestamp()}",
            "",
        ]

        if not errors:
            report.append("[OK] 所有数据校验通过")
            report.append("")
            report.append("校验统计:")
            report.append(f" - 校验记录数: {len(self.all_ids)}")
            report.append(" - 错误数: 0")
            for category, ids in sorted(self.category_ids.items()):
                label = CATEGORY_DESCRIPTIONS.get(category, f'category={category}')
                report.append(f" - {label}: {len(ids)} 条")
        else:
            report.append(f"[FAIL] 发现 {len(errors)} 个错误")
            report.append("")

            error_by_type = defaultdict(list)
            for error in errors:
                error_by_type[error.error_type].append(error)
            # Stable sort: groups of equal size keep first-seen order.
            ranked = sorted(
                error_by_type.items(), key=lambda item: len(item[1]), reverse=True
            )

            for error_type, error_list in ranked:
                report.append(f"【{error_type}】({len(error_list)}个)")
                report.append("-" * 60)
                for error in error_list[:10]:
                    report.append(f" {error}")
                if len(error_list) > 10:
                    report.append(f" ... 还有 {len(error_list) - 10} 个错误")
                report.append("")

            report.append("错误类型统计:")
            report.append("-" * 60)
            for error_type, error_list in ranked:
                report.append(f" {error_type}: {len(error_list)} 个")

        report.append("=" * 80)
        return "\n".join(report)

    def _get_timestamp(self) -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def main() -> int:
    """Parse command-line arguments, print the report and optionally save it.

    Returns the number of findings.
    """
    parser = argparse.ArgumentParser(description='Quest表数据校验工具')
    parser.add_argument('-f', '--file', required=True, help='要校验的CSV文件路径')
    parser.add_argument('-o', '--output', help='校验报告输出文件路径')
    args = parser.parse_args()

    validator = QuestValidator()
    report = validator.generate_report(args.file)
    print(report)

    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"\n报告已保存至: {args.output}")

    return len(validator.errors)


if __name__ == '__main__':
    # Exit statuses are truncated to 8 bits on POSIX, so a raw count of 256
    # findings would read as success; collapse it to 0/1.
    sys.exit(1 if main() else 0)