# config_Test/data_indexer.py
# 2026-05-28 20:23:32 +08:00 · 304 lines · 11 KiB · Python

import os
import csv
import json
from collections import defaultdict
from datetime import datetime
class DataIndexer:
    """Build, persist and query in-memory indexes over CSV tables.

    Each table is a ``<table_name>.csv`` file inside ``csv_dir``. The third
    line of a file is used as the column header and every line after it as a
    data row; the first two lines are ignored. Rows are indexed by the
    integer value of their ``id``, ``category`` and ``type`` columns.
    """

    # Parts of a per-table index whose dict keys are integers in memory.
    # JSON object keys are always strings, so ``load_index`` converts these
    # back to integers after reading a saved index.
    _INT_KEYED_SECTIONS = {
        'indexes': ('by_id', 'by_category', 'by_type'),
        'stats': ('category_distribution', 'type_distribution'),
    }

    def __init__(self, csv_dir='csv_output'):
        """Create an empty indexer that reads CSV files from ``csv_dir``."""
        self.csv_dir = csv_dir
        self.index = {}
        self.metadata = {}

    def build_index(self, table_name):
        """Index ``<csv_dir>/<table_name>.csv`` and store it in ``self.index``.

        Rows are skipped when ``id`` is missing or empty, or when ``id``,
        ``category`` or ``type`` cannot be converted to ``int``.

        Returns:
            True when the table was indexed; False when the file does not
            exist, has fewer than three lines, or could not be read.
        """
        csv_path = os.path.join(self.csv_dir, f"{table_name}.csv")
        if not os.path.exists(csv_path):
            print(f"错误: 文件不存在 - {csv_path}")
            return False

        index = {
            'table_name': table_name,
            'created_at': datetime.now().isoformat(),
            'record_count': 0,
            'indexes': {
                'by_id': {},
                'by_category': defaultdict(list),
                'by_type': defaultdict(list),
            },
            'stats': {
                'category_distribution': defaultdict(int),
                'type_distribution': defaultdict(int),
            },
        }
        indexes = index['indexes']
        stats = index['stats']

        try:
            # newline='' lets the csv module do its own newline handling,
            # as the csv documentation requires for file input.
            with open(csv_path, 'r', encoding='utf-8', newline='') as f:
                lines = f.readlines()
            if len(lines) < 3:
                return False

            # Parse the header with the csv module so quoted column names
            # containing commas are not split apart.
            fieldnames = next(csv.reader([lines[2].strip()]), [])
            reader = csv.DictReader(lines[3:], fieldnames=fieldnames)

            # Data starts on the fourth line of the file.
            for row_num, row in enumerate(reader, start=4):
                row_id = row.get('id')
                if not row_id:
                    continue
                try:
                    row_id_int = int(row_id)
                    category = int(row.get('category', 0))
                    type_val = int(row.get('type', 0))
                except (ValueError, TypeError):
                    continue

                indexes['by_id'][row_id_int] = {'row_num': row_num, 'data': row}
                indexes['by_category'][category].append(row_id_int)
                indexes['by_type'][type_val].append(row_id_int)
                stats['category_distribution'][category] += 1
                stats['type_distribution'][type_val] += 1
                index['record_count'] += 1

            self.index[table_name] = index
            self.metadata[table_name] = {
                'last_indexed': datetime.now().isoformat(),
                'record_count': index['record_count'],
            }
            return True
        except Exception as e:
            # Best-effort boundary: one unreadable table must not abort a
            # bulk build, so report the problem and signal failure.
            print(f"索引构建失败: {e}")
            return False

    def save_index(self, output_file='data_index.json'):
        """Write all indexes and metadata to ``output_file`` as JSON.

        Integer dict keys become strings in the JSON output; ``load_index``
        converts them back.
        """
        data = {
            'metadata': self.metadata,
            'index': self.index,
            'generated_at': datetime.now().isoformat(),
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"索引已保存至: {output_file}")

    @staticmethod
    def _int_keyed(mapping):
        """Return a copy of ``mapping`` with integer-like keys as ``int``."""
        restored = {}
        for key, value in mapping.items():
            try:
                key = int(key)
            except (TypeError, ValueError):
                pass  # keep keys that are not integers unchanged
            restored[key] = value
        return restored

    @classmethod
    def _restore_table(cls, table_index):
        """Undo the int-to-str key conversion JSON applied to one table."""
        restored = dict(table_index)
        for section, names in cls._INT_KEYED_SECTIONS.items():
            maps = dict(table_index.get(section, {}))
            for name in names:
                maps[name] = cls._int_keyed(maps.get(name, {}))
            restored[section] = maps
        return restored

    def load_index(self, index_file='data_index.json'):
        """Load indexes previously written by ``save_index``.

        Returns:
            True when ``index_file`` exists and was loaded, False otherwise.
        """
        if not os.path.exists(index_file):
            return False
        with open(index_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.metadata = data.get('metadata', {})
        # Restore integer keys so that integer lookups keep working on a
        # loaded index exactly as they do on a freshly built one.
        self.index = {
            name: self._restore_table(table)
            for name, table in data.get('index', {}).items()
        }
        return True

    def _records_for_ids(self, table_name, ids):
        """Return the records of ``table_name`` for ``ids``, in order."""
        by_id = self.index[table_name]['indexes']['by_id']
        return [by_id[row_id] for row_id in ids if row_id in by_id]

    def query_by_id(self, table_name, id_val):
        """Return the record with id ``id_val``, or None when not found."""
        if table_name not in self.index:
            return None
        return self.index[table_name]['indexes']['by_id'].get(id_val)

    def query_by_category(self, table_name, category):
        """Return all records of ``table_name`` in ``category``."""
        if table_name not in self.index:
            return []
        ids = self.index[table_name]['indexes']['by_category'].get(category, [])
        return self._records_for_ids(table_name, ids)

    def query_by_type(self, table_name, type_val):
        """Return all records of ``table_name`` whose type is ``type_val``."""
        if table_name not in self.index:
            return []
        ids = self.index[table_name]['indexes']['by_type'].get(type_val, [])
        return self._records_for_ids(table_name, ids)

    def query_by_category_and_type(self, table_name, category, type_val):
        """Return records matching both ``category`` and ``type_val``.

        Results follow the order of the category index and contain each id
        at most once.
        """
        if table_name not in self.index:
            return []
        indexes = self.index[table_name]['indexes']
        type_ids = set(indexes['by_type'].get(type_val, []))
        # dict.fromkeys de-duplicates while keeping first-seen order.
        common_ids = dict.fromkeys(
            row_id
            for row_id in indexes['by_category'].get(category, [])
            if row_id in type_ids
        )
        return self._records_for_ids(table_name, common_ids)

    def get_stats(self, table_name):
        """Return the distribution stats of ``table_name``, or None."""
        if table_name not in self.index:
            return None
        return self.index[table_name]['stats']

    def list_tables(self):
        """Return the sorted names of all ``.csv`` files in ``csv_dir``."""
        return sorted(
            filename[:-4]
            for filename in os.listdir(self.csv_dir)
            if filename.endswith('.csv')
        )

    def build_all_indexes(self):
        """Index every CSV table in ``csv_dir`` and print a progress log."""
        tables = self.list_tables()
        print(f"发现 {len(tables)} 个CSV文件")
        success_count = 0
        failed_count = 0
        for table_name in tables:
            print(f"正在索引: {table_name}...", end=' ')
            if self.build_index(table_name):
                print(f"完成 ({self.index[table_name]['record_count']} 条记录)")
                success_count += 1
            else:
                print("失败")
                failed_count += 1
        print(f"\n索引构建完成: {success_count} 成功, {failed_count} 失败")

    def generate_index_report(self):
        """Return a text report of record counts and distributions."""
        report = [
            "=" * 80,
            "数据索引报告",
            "=" * 80,
            f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"已索引表数量: {len(self.index)}",
            "",
        ]
        for table_name in sorted(self.index.keys()):
            idx = self.index[table_name]
            category_dist = idx['stats']['category_distribution']
            type_dist = idx['stats']['type_distribution']

            report.append(f"{table_name}")
            report.append("-" * 60)
            report.append(f" 记录数: {idx['record_count']}")
            report.append(f" 索引时间: {idx['created_at']}")
            report.append("")
            report.append(" 分类分布:")
            for category, count in sorted(category_dist.items()):
                report.append(f" {category}: {count}")
            report.append("")
            report.append(" 类型分布(前10):")
            # Only the ten most frequent types are listed individually.
            sorted_types = sorted(
                type_dist.items(), key=lambda x: x[1], reverse=True
            )[:10]
            for type_val, count in sorted_types:
                report.append(f" {type_val}: {count}")
            if len(type_dist) > 10:
                report.append(f" ... 还有 {len(type_dist) - 10} 种类型")
            report.append("")
        report.append("=" * 80)
        return "\n".join(report)
def _print_query_results(results, limit=5):
    """Print the match count and the first ``limit`` records of a query."""
    print(f"共找到 {len(results)} 条记录")
    for r in results[:limit]:
        data = r['data']
        # Not every table has a 'desc' column; fall back to an empty string
        # instead of raising KeyError.
        print(f" id={data.get('id')}: {data.get('desc', '')}")
    if len(results) > limit:
        print(f" ... 还有 {len(results) - limit}")


def _write_report(report, output_path):
    """Write ``report`` to ``output_path`` and print a confirmation."""
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(report)
    print(f"\n报告已保存至: {output_path}")


def main():
    """Command-line entry point: build indexes, query them, or print a report.

    ``build`` indexes every CSV table, saves the index and prints a report.
    ``query`` loads the saved index and looks up one table by id, by
    category and/or type, or prints the table's stats when no filter is
    given. ``report`` loads the saved index and prints the report.
    """
    import argparse
    parser = argparse.ArgumentParser(description='数据索引系统')
    parser.add_argument('-a', '--action', required=True, choices=['build', 'query', 'report'], help='操作类型')
    parser.add_argument('-t', '--table', help='表名')
    parser.add_argument('-id', '--id', type=int, help='按ID查询')
    parser.add_argument('-c', '--category', type=int, help='按分类查询')
    parser.add_argument('-type', '--type', type=int, help='按类型查询')
    parser.add_argument('-o', '--output', help='输出文件')
    args = parser.parse_args()

    indexer = DataIndexer()

    if args.action == 'build':
        indexer.build_all_indexes()
        indexer.save_index()
        report = indexer.generate_index_report()
        print("\n" + report)
        if args.output:
            _write_report(report, args.output)

    elif args.action == 'query':
        if not args.table:
            print("错误: 查询需要指定表名")
            return
        # Without a saved index every lookup would report "not found",
        # which hides the real problem, so say so explicitly.
        if not indexer.load_index():
            print("错误: 索引文件不存在, 请先执行 build 操作")
            return

        if args.id is not None:
            result = indexer.query_by_id(args.table, args.id)
            if result:
                print(f"查询结果 (id={args.id}):")
                print(json.dumps(result, ensure_ascii=False, indent=2))
            else:
                print(f"未找到 id={args.id} 的记录")
        elif args.category is not None and args.type is not None:
            results = indexer.query_by_category_and_type(args.table, args.category, args.type)
            print(f"查询结果 (category={args.category}, type={args.type}):")
            _print_query_results(results)
        elif args.category is not None:
            results = indexer.query_by_category(args.table, args.category)
            print(f"查询结果 (category={args.category}):")
            _print_query_results(results)
        elif args.type is not None:
            results = indexer.query_by_type(args.table, args.type)
            print(f"查询结果 (type={args.type}):")
            _print_query_results(results)
        else:
            stats = indexer.get_stats(args.table)
            if stats:
                print(f"{args.table} 的统计信息:")
                print(json.dumps(stats, ensure_ascii=False, indent=2))
            else:
                print(f"未找到表 {args.table} 的索引")

    elif args.action == 'report':
        indexer.load_index()
        report = indexer.generate_index_report()
        print(report)
        if args.output:
            _write_report(report, args.output)
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()