# Text-based PDF to TXT conversion and initial text cleaning (文字版PDF转txt及初步清洗)
import os
import sys
import re
import json
import subprocess
from pathlib import Path
from datetime import datetime
class CorpusProcessor:
    """Interactive console tool that turns text-based PDFs into a cleaned TXT corpus.

    The processor converts PDFs to TXT (with pdfplumber or the Poppler
    ``pdftotext`` executable), cleans the extracted text (control characters,
    page-number lines, annotation lines, punctuation, spacing, broken lines)
    and can write a short processing report.  All user interaction happens
    through ``input``/``print``; settings live in ``self.config``.
    """

    # Optional user configuration file, resolved against the working directory.
    CONFIG_FILE = "corpus_config.json"

    # C0/C1 control characters except tab (\x09) and newline (\x0a), plus the
    # byte-order mark.  Everything else is real text and must survive.
    _CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0b-\x1f\x7f-\x9f\ufeff]')

    # Half-width punctuation converted unconditionally to its full-width form.
    # The period is deliberately absent: it is handled by _CJK_PERIOD_RE so
    # that decimals such as "3.14" are left alone.
    _HALF_TO_FULL = str.maketrans({
        ',': ',',
        ';': ';',
        ':': ':',
        '!': '!',
        '?': '?',
        '(': '(',
        ')': ')',
        '[': '【',
        ']': '】',
        '<': '《',
        '>': '》',
    })

    # A period directly after a Chinese character that is followed (after
    # optional whitespace) by another Chinese character or the end of the text.
    _CJK_PERIOD_RE = re.compile(
        r'(?<=[\u4e00-\u9fff])\.(?=\s*(?:[\u4e00-\u9fff]|$))'
    )

    # Whole-line patterns that identify a page-number line (line is stripped).
    _PAGE_NUMBER_RES = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'^\d{1,9}$',
            r'^[-—]\s*\d+\s*[-—]$',
            r'^\d+\s*/\s*\d+$',
            r'^第\s*\d+\s*页$',
            r'^Page\s*\d+$',
        )
    )

    # Patterns that identify an annotation line (line is stripped).  Both the
    # half-width and full-width colon/parenthesis are accepted because this
    # check runs before punctuation is normalized.
    _ANNOTATION_RES = tuple(
        re.compile(pattern)
        for pattern in (
            r'^注[::]',
            r'^注释[::]',
            r'^插图[::]',
            r'^图\d+[::]',
            r'^※',
            r'^★',
            r'^【.*】$',
            r'^[((].*[))]$',
        )
    )

    # A line ending in one of these is always joined to the following line.
    _LINE_CONTINUATION = frozenset([',', ';', ':', '、', '—', '-'])

    # A short line ending in one of these is considered complete.
    _SENTENCE_END = frozenset(
        ['。', '!', '?', '…', '"', "'", '”', '’', ')', '】', '》']
    )

    def __init__(self):
        """Load the configuration and print the start-up banner."""
        self.version = "1.0"
        self.config = self.load_config()
        print(f"流水句语料库处理工具 v{self.version}")
        print("=" * 60)

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    @staticmethod
    def _default_config():
        """Return a fresh copy of the built-in default configuration."""
        return {
            "pdf_conversion": {
                "input_dir": "原始pdf",
                "output_dir": "转换后TXT",
                "use_pdfplumber": True,
                "poppler_path": r"C:\poppler\Library\bin\pdftotext.exe"
            },
            "text_cleaning": {
                "cleaned_dir": "清洗后TXT",
                "keep_original": True,
                "remove_control_chars": True,
                "normalize_punctuation": True,
                "fix_line_breaks": True
            },
            "project": {
                "name": "流水句语料库",
                "author": "应用语言学博士生",
                "created_date": datetime.now().strftime("%Y-%m-%d")
            }
        }

    def load_config(self):
        """Return the defaults overlaid with ``corpus_config.json`` if present.

        Returns:
            dict: the effective configuration.  When the file is missing,
            unreadable, not valid JSON, or not a JSON object, a warning is
            printed and the defaults are returned.
        """
        config = self._default_config()
        config_file = self.CONFIG_FILE
        if os.path.exists(config_file):
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    user_config = json.load(f)
                if not isinstance(user_config, dict):
                    raise ValueError("configuration root must be a JSON object")
                self.merge_config(config, user_config)
            except (OSError, ValueError):
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                print(f"警告: 无法读取配置文件 {config_file},使用默认配置")
        return config

    def merge_config(self, default, user):
        """Recursively merge ``user`` into ``default`` in place.

        Nested dicts present on both sides are merged key by key; any other
        value from ``user`` replaces (or adds) the entry in ``default``.
        """
        for key, value in user.items():
            existing = default.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                self.merge_config(existing, value)
            else:
                default[key] = value

    def _cleaning_enabled(self, flag):
        """Return whether the ``text_cleaning`` switch ``flag`` is on (default: on)."""
        config = getattr(self, 'config', None)
        section = config.get('text_cleaning') if isinstance(config, dict) else None
        if not isinstance(section, dict):
            return True
        return bool(section.get(flag, True))

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _prompt_with_default(label, default):
        """Ask for a value; an empty answer selects ``default``."""
        answer = input(f"{label} [默认: {default}]: ").strip()
        return answer or default

    @staticmethod
    def _find_pdf_files(directory):
        """Return the PDF files directly inside ``directory``, sorted by path.

        The suffix is compared case-insensitively, so each file is listed
        exactly once regardless of filesystem case sensitivity.  A path that
        is not a directory yields an empty list.
        """
        root = Path(directory)
        if not root.is_dir():
            return []
        return sorted(
            entry for entry in root.iterdir()
            if entry.is_file() and entry.suffix.lower() == '.pdf'
        )

    @staticmethod
    def _find_txt_files(directory):
        """Return the paths of all ``.txt`` files below ``directory`` (recursive)."""
        return [
            os.path.join(root, name)
            for root, _subdirs, names in os.walk(directory)
            for name in names
            if name.lower().endswith('.txt')
        ]

    # ------------------------------------------------------------------
    # Menu
    # ------------------------------------------------------------------
    def show_main_menu(self):
        """Run the main menu loop until the user picks the exit entry."""
        actions = {
            '1': self.convert_pdfs,
            '2': self.clean_texts,
            '3': self.full_pipeline,
            '4': self.check_quality,
            '5': self.batch_process,
            '6': self.configure,
            '7': self.show_help,
        }
        while True:
            print("\n" + "=" * 60)
            print("主菜单")
            print("=" * 60)
            print("1. PDF转TXT")
            print("2. 文本清洗")
            print("3. 完整流程 (PDF转TXT -> 文本清洗)")
            print("4. 检查转换质量")
            print("5. 批量处理整个目录")
            print("6. 配置设置")
            print("7. 查看帮助")
            print("8. 退出")
            choice = input("\n请选择操作 (1-8): ").strip()
            if choice == '8':
                print("\n感谢使用,再见!")
                break
            action = actions.get(choice)
            if action is None:
                print("无效的选择,请重新输入")
                continue
            action()

    # ------------------------------------------------------------------
    # PDF conversion
    # ------------------------------------------------------------------
    def convert_pdfs(self):
        """Interactively convert every PDF in a directory to TXT.

        Prompts for the input/output directories and the conversion method,
        converts each file, prints a summary and optionally starts cleaning
        the freshly converted files.
        """
        print("\n" + "=" * 60)
        print("PDF转TXT")
        print("=" * 60)
        settings = self.config['pdf_conversion']
        input_dir = self._prompt_with_default("PDF文件夹路径", settings['input_dir'])
        output_dir = self._prompt_with_default("输出文件夹路径", settings['output_dir'])
        if not os.path.exists(input_dir):
            print(f"错误: 输入目录不存在 - {input_dir}")
            return
        os.makedirs(output_dir, exist_ok=True)

        print("\n选择转换方法:")
        print("1. 使用pdfplumber (推荐,无需额外安装)")
        print("2. 使用pdftotext (需要安装Poppler)")
        method_choice = input("请选择 (1或2,默认1): ").strip()
        use_pdftotext = method_choice == '2'
        if use_pdftotext and not self.check_pdftotext():
            # Fall back rather than fail every single file later on.
            print("未找到pdftotext,将使用pdfplumber")
            use_pdftotext = False

        pdf_files = self._find_pdf_files(input_dir)
        if not pdf_files:
            print(f"在目录 {input_dir} 中未找到PDF文件")
            return
        print(f"找到 {len(pdf_files)} 个PDF文件")
        print("开始转换...")

        convert = (
            self.convert_with_pdftotext if use_pdftotext
            else self.convert_with_pdfplumber
        )
        success_count = 0
        for i, pdf_file in enumerate(pdf_files, 1):
            print(f"\n[{i}/{len(pdf_files)}] 转换: {pdf_file.name}")
            output_file = Path(output_dir) / f"{pdf_file.stem}.txt"
            try:
                if convert(pdf_file, output_file):
                    success_count += 1
                    print(f" 成功")
                else:
                    print(f" 失败")
            except Exception as e:
                # One broken PDF must not abort the whole batch.
                print(f" 错误: {e}")
        print(f"\n转换完成! 成功: {success_count}/{len(pdf_files)}")

        if success_count > 0:
            answer = input("\n是否立即清洗转换后的文本? (y/n, 默认n): ").strip().lower()
            if answer == 'y':
                self.clean_specific_dir(output_dir)

    def convert_with_pdfplumber(self, pdf_path, output_path):
        """Extract the text of ``pdf_path`` with pdfplumber into ``output_path``.

        Pages without extractable text are reported and replaced by a
        ``[扫描页面 N]`` placeholder.

        Returns:
            bool: True when a file larger than 100 bytes was written; False
            when pdfplumber is missing, extraction failed, the PDF has no
            pages, or the output is 100 bytes or smaller.
        """
        try:
            import pdfplumber  # optional dependency, imported on demand

            all_text = []
            with pdfplumber.open(pdf_path) as pdf:
                for page_num, page in enumerate(pdf.pages, 1):
                    text = page.extract_text()
                    if text:
                        all_text.append(text)
                    else:
                        print(f" 警告: 第{page_num}页可能为扫描图像")
                        all_text.append(f"[扫描页面 {page_num}]")
            if not all_text:
                return False
            full_text = self.basic_clean_text('\n'.join(all_text))
            with open(output_path, 'w', encoding='utf-8', newline='\n') as f:
                f.write(full_text)
            # Very small outputs are treated as failed extractions.
            return os.path.getsize(output_path) > 100
        except ImportError:
            print("错误: pdfplumber未安装,请运行: pip install pdfplumber")
            return False
        except Exception as e:
            print(f" pdfplumber错误: {e}")
            return False

    def convert_with_pdftotext(self, pdf_path, output_path):
        """Convert ``pdf_path`` with the configured ``pdftotext`` executable.

        The tool writes ``output_path`` itself; the file is then re-read,
        passed through ``basic_clean_text`` and written back as UTF-8.

        Returns:
            bool: True on success; False when the executable is missing, it
            exits with a non-zero status, it produces no output file, or any
            exception (including the 30 second timeout) occurs.
        """
        pdftotext_path = self.config['pdf_conversion']['poppler_path']
        if not os.path.exists(pdftotext_path):
            print(f"错误: pdftotext未找到 - {pdftotext_path}")
            return False
        cmd = [
            pdftotext_path,
            "-enc", "UTF-8",
            "-nopgbrk",
            "-eol", "unix",
            str(pdf_path),
            str(output_path)
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                print(f" pdftotext错误: {result.stderr}")
                return False
            if not os.path.exists(output_path):
                # Exit status 0 without an output file is still a failure.
                return False
            with open(output_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read()
            text = self.basic_clean_text(text)
            with open(output_path, 'w', encoding='utf-8', newline='\n') as f:
                f.write(text)
            return True
        except Exception as e:
            print(f" pdftotext异常: {e}")
            return False

    def check_pdftotext(self):
        """Return True when the configured ``pdftotext`` path exists."""
        return os.path.exists(self.config['pdf_conversion']['poppler_path'])

    def basic_clean_text(self, text):
        """Apply the light clean-up used right after PDF extraction.

        Normalizes line endings to ``\\n``, rejoins words hyphenated across a
        line break, collapses three or more blank lines into one blank line
        and squeezes runs of spaces/tabs into a single space.

        Returns:
            str: the cleaned text, stripped; ``""`` for empty input.
        """
        if not text:
            return ""
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        text = re.sub(r'(\w+)-\n(\w+)', r'\1\2', text)
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)
        return text.strip()

    # ------------------------------------------------------------------
    # Text cleaning: entry points
    # ------------------------------------------------------------------
    def clean_texts(self):
        """Ask which texts to clean and dispatch to the matching routine."""
        print("\n" + "=" * 60)
        print("文本清洗")
        print("=" * 60)
        print("选择要清洗的文本来源:")
        print("1. 清洗PDF转换后的文本")
        print("2. 清洗指定目录的文本")
        print("3. 清洗单个文件")
        source_choice = input("请选择 (1-3, 默认1): ").strip()
        if source_choice == '3':
            self.clean_single_file()
            return
        if source_choice == '2':
            input_dir = input("请输入要清洗的目录路径: ").strip()
        else:
            # Default: the directory the PDF conversion writes to.
            input_dir = self.config['pdf_conversion']['output_dir']
        if not os.path.exists(input_dir):
            print(f"错误: 目录不存在 - {input_dir}")
            return
        self.clean_specific_dir(input_dir)

    def clean_specific_dir(self, input_dir, output_dir=None):
        """Clean every ``.txt`` file below ``input_dir``, mirroring its layout.

        Args:
            input_dir: directory searched recursively for ``.txt`` files.
            output_dir: destination root.  When None (the default) the user
                is prompted, with the configured ``cleaned_dir`` as default.
        """
        if output_dir is None:
            output_dir = self._prompt_with_default(
                "输出目录", self.config['text_cleaning']['cleaned_dir']
            )
        os.makedirs(output_dir, exist_ok=True)
        txt_files = self._find_txt_files(input_dir)
        if not txt_files:
            print(f"在目录 {input_dir} 中未找到TXT文件")
            return
        print(f"找到 {len(txt_files)} 个TXT文件")
        print("开始清洗...")
        success_count = 0
        for i, input_file in enumerate(txt_files, 1):
            print(f"\n[{i}/{len(txt_files)}] 清洗: {os.path.basename(input_file)}")
            # Keep the path relative to input_dir so sub-folders are mirrored.
            rel_path = os.path.relpath(input_file, input_dir)
            output_file = os.path.join(output_dir, rel_path)
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            try:
                result = self.clean_text_file(input_file, output_file)
                success_count += 1
                print(f" 成功: {result['original_size']} -> {result['cleaned_size']} 字符")
            except Exception as e:
                print(f" 失败: {e}")
        print(f"\n清洗完成! 成功: {success_count}/{len(txt_files)}")

    def clean_single_file(self):
        """Prompt for one file, clean it and print before/after statistics."""
        input_file = input("请输入要清洗的文件路径: ").strip()
        if not os.path.isfile(input_file):
            print(f"错误: 文件不存在 - {input_file}")
            return
        # Only the trailing extension is rewritten; a ".txt" elsewhere in the
        # path (for example in a directory name) is left untouched.
        if input_file.lower().endswith('.txt'):
            output_file = input_file[:-len('.txt')] + '_cleaned.txt'
        else:
            output_file = input_file + '_cleaned.txt'
        output_file = self._prompt_with_default("输出文件路径", output_file)
        try:
            result = self.clean_text_file(input_file, output_file)
            print(f"\n清洗完成!")
            print(f"原始文件: {input_file}")
            print(f"清洗后文件: {output_file}")
            print(f"原始大小: {result['original_size']} 字符")
            print(f"清洗后大小: {result['cleaned_size']} 字符")
            print(f"移除杂质: {result['removed']} 字符")
            if result['original_size'] > 0:
                reduction = (result['removed'] / result['original_size']) * 100
                print(f"减少比例: {reduction:.1f}%")
        except Exception as e:
            print(f"清洗过程中出错: {e}")

    def clean_text_file(self, input_path, output_path):
        """Clean ``input_path`` and write the result to ``output_path`` as UTF-8.

        Returns:
            dict: ``original_size``, ``cleaned_size`` and ``removed``, all
            measured in characters.

        Raises:
            ValueError: when none of the candidate encodings can decode the file.
            OSError: when the input cannot be read or the output cannot be written.
        """
        content = self.read_file_with_encoding(input_path)
        if content is None:
            raise ValueError("无法读取文件,编码可能不正确")
        cleaned_content = self.advanced_clean_text(content)
        with open(output_path, 'w', encoding='utf-8', newline='\n') as f:
            f.write(cleaned_content)
        original_size = len(content)
        cleaned_size = len(cleaned_content)
        return {
            'original_size': original_size,
            'cleaned_size': cleaned_size,
            'removed': original_size - cleaned_size
        }

    def read_file_with_encoding(self, file_path):
        """Read ``file_path`` trying several encodings in turn.

        ``utf-8-sig`` is tried first so a leading byte-order mark is dropped.

        Returns:
            str | None: the decoded text, or None when no encoding fits.

        Raises:
            OSError: when the file itself cannot be opened or read; this is
            no longer misreported as an encoding problem.
        """
        for encoding in ('utf-8-sig', 'gbk', 'gb2312', 'big5', 'utf-16'):
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    return f.read()
            except UnicodeError:
                # Wrong guess (UnicodeDecodeError is a subclass); try the next.
                continue
        return None

    # ------------------------------------------------------------------
    # Text cleaning: individual steps
    # ------------------------------------------------------------------
    def advanced_clean_text(self, text):
        """Run the full cleaning pipeline over ``text``.

        Steps: remove control characters, drop page-number and annotation
        lines, normalize punctuation and spaces per line, rejoin broken
        lines, collapse repeated blank lines, and end with one newline.  The
        ``remove_control_chars``, ``normalize_punctuation`` and
        ``fix_line_breaks`` switches in the ``text_cleaning`` configuration
        section turn the matching step off when set to false.

        Returns:
            str: the cleaned text; ``""`` for empty input.
        """
        if not text:
            return ""
        if self._cleaning_enabled('remove_control_chars'):
            text = self.remove_control_chars(text)
        normalize = self._cleaning_enabled('normalize_punctuation')

        cleaned_lines = []
        for raw_line in text.split('\n'):
            line = raw_line.rstrip()
            if not line.strip():
                # Blank lines are kept for now as paragraph separators.
                cleaned_lines.append('')
                continue
            if self.is_page_number(line) or self.is_annotation(line):
                continue
            if normalize:
                line = self.normalize_punctuation(line)
            cleaned_lines.append(self.normalize_spaces(line))

        result = '\n'.join(cleaned_lines)
        if self._cleaning_enabled('fix_line_breaks'):
            result = self.fix_line_breaks(result)
        result = re.sub(r'\n\s*\n\s*\n+', '\n\n', result)
        if result and not result.endswith('\n'):
            result += '\n'
        return result

    def remove_control_chars(self, text):
        """Remove control characters while keeping all real text.

        Tab and newline are kept.  Other C0/C1 control characters and the
        byte-order mark are removed.  Unlike a whitelist of "known good"
        characters, this keeps punctuation such as ``、`` ``…`` ``—`` and the
        curly/corner quotes, which the later cleaning steps rely on.
        """
        return self._CONTROL_CHARS_RE.sub('', text)

    def is_page_number(self, line):
        """Return True when ``line`` consists only of a page number.

        Recognized forms: a bare number of up to nine digits, ``- 12 -``,
        ``12/30``, ``第12页`` and ``Page 12`` (case-insensitive).
        """
        stripped = line.strip()
        return any(regex.match(stripped) for regex in self._PAGE_NUMBER_RES)

    def is_annotation(self, line):
        """Return True when ``line`` looks like a note, caption or bracketed aside.

        Matches lines starting with ``注``/``注释``/``插图``/``图N`` plus a colon,
        lines starting with ``※`` or ``★``, lines fully wrapped in ``【】``,
        and lines fully wrapped in half-width or full-width parentheses.
        """
        stripped = line.strip()
        return any(regex.search(stripped) for regex in self._ANNOTATION_RES)

    def normalize_punctuation(self, text):
        """Convert half-width punctuation to the full-width Chinese forms.

        ``, ; : ! ? ( ) [ ] < >`` are converted everywhere.  A period is only
        converted to ``。`` when it directly follows a Chinese character and
        is followed by another Chinese character or the end of the text, so
        decimals like ``3.14`` survive.  Corner quotes become ``"`` and
        whitespace before closing / after opening punctuation is removed.
        """
        text = text.translate(self._HALF_TO_FULL)
        text = self._CJK_PERIOD_RE.sub('。', text)
        # Unify corner quotes.
        text = re.sub(r'[「『]', '"', text)
        text = re.sub(r'[」』]', '"', text)
        # No whitespace before closing marks or after opening marks.
        text = re.sub(r'\s+([。,;:!?)】》])', r'\1', text)
        text = re.sub(r'([(【《])\s+', r'\1', text)
        return text

    def normalize_spaces(self, text):
        """Turn full-width spaces and tabs into spaces and squeeze repeats."""
        text = text.replace('\u3000', ' ')
        text = text.replace('\t', ' ')
        return re.sub(r' {2,}', ' ', text)

    def _should_join(self, line):
        """Return True when non-empty ``line`` looks cut off mid-sentence."""
        last_char = line[-1]
        if last_char in self._LINE_CONTINUATION:
            return True
        # Short line without sentence-final punctuation.
        return len(line) < 25 and last_char not in self._SENTENCE_END

    def fix_line_breaks(self, text):
        """Rejoin sentences that were broken across physical lines.

        A line is joined with the following non-blank line when it ends in a
        continuation mark (``, ; : 、 — -``) or is shorter than 25
        characters without sentence-final punctuation.  Joining repeats, so a
        sentence spread over several lines ends up on one line.  Blank lines
        are never absorbed, which keeps paragraph separators intact.  A
        trailing ASCII hyphen is dropped when joining; a dash ``—`` is real
        punctuation and is kept.

        Returns:
            str: the text with lines stripped and joined; a single-line
            input is returned unchanged.
        """
        lines = text.split('\n')
        if len(lines) <= 1:
            return text
        merged_lines = []
        total = len(lines)
        i = 0
        while i < total:
            current = lines[i].strip()
            i += 1
            if not current:
                merged_lines.append('')
                continue
            while i < total:
                following = lines[i].strip()
                if not following or not self._should_join(current):
                    break
                if current.endswith('-'):
                    current = current[:-1]
                current += following
                i += 1
            merged_lines.append(current)
        return '\n'.join(merged_lines)

    # ------------------------------------------------------------------
    # Combined workflows
    # ------------------------------------------------------------------
    def full_pipeline(self):
        """Interactively run PDF conversion followed by text cleaning.

        The final output directory entered here is passed on to the cleaning
        step, so the user is asked only once and the summary printed at the
        end matches where the files were actually written.
        """
        print("\n" + "=" * 60)
        print("完整处理流程")
        print("=" * 60)
        print("\n第一步:PDF转TXT")
        print("-" * 40)
        settings = self.config['pdf_conversion']
        pdf_dir = self._prompt_with_default("PDF文件夹路径", settings['input_dir'])
        temp_dir = self._prompt_with_default("临时输出目录", settings['output_dir'])
        if not os.path.exists(pdf_dir):
            print(f"错误: PDF目录不存在 - {pdf_dir}")
            return
        print("\n开始PDF转换...")
        self.run_pdf_conversion(pdf_dir, temp_dir)

        print("\n第二步:文本清洗")
        print("-" * 40)
        final_dir = self._prompt_with_default(
            "最终输出目录", self.config['text_cleaning']['cleaned_dir']
        )
        print("\n开始文本清洗...")
        self.clean_specific_dir(temp_dir, final_dir)

        print("\n" + "=" * 60)
        print("完整流程完成!")
        print(f"PDF源目录: {pdf_dir}")
        print(f"临时TXT目录: {temp_dir}")
        print(f"最终清洗目录: {final_dir}")

    def run_pdf_conversion(self, pdf_dir, output_dir):
        """Convert every PDF in ``pdf_dir`` with pdfplumber, without prompting.

        Returns:
            int: the number of files converted successfully (0 when the
            directory contains no PDF).
        """
        os.makedirs(output_dir, exist_ok=True)
        pdf_files = self._find_pdf_files(pdf_dir)
        if not pdf_files:
            print(f"在目录 {pdf_dir} 中未找到PDF文件")
            return 0
        print(f"找到 {len(pdf_files)} 个PDF文件")
        success_count = 0
        for i, pdf_file in enumerate(pdf_files, 1):
            print(f"[{i}/{len(pdf_files)}] 转换: {pdf_file.name}")
            output_file = Path(output_dir) / f"{pdf_file.stem}.txt"
            try:
                if self.convert_with_pdfplumber(pdf_file, output_file):
                    success_count += 1
            except Exception as e:
                print(f" 错误: {e}")
        print(f"PDF转换完成! 成功: {success_count}/{len(pdf_files)}")
        return success_count

    def check_quality(self):
        """Inspect up to 20 TXT files of a directory and list suspicious ones.

        A file is flagged when it contains the Unicode replacement character,
        is smaller than 1 KB, is not valid UTF-8 or cannot be read.  At most
        ten issues are printed in full.
        """
        print("\n" + "=" * 60)
        print("检查转换质量")
        print("=" * 60)
        dir_to_check = input("请输入要检查的目录路径: ").strip()
        if not os.path.exists(dir_to_check):
            print(f"错误: 目录不存在 - {dir_to_check}")
            return
        txt_files = self._find_txt_files(dir_to_check)
        if not txt_files:
            print(f"在目录 {dir_to_check} 中未找到TXT文件")
            return
        print(f"找到 {len(txt_files)} 个TXT文件")
        print("\n开始检查...")
        issues = []
        # Only the first 20 files are sampled.
        for index, txt_file in enumerate(txt_files[:20], 1):
            name = os.path.basename(txt_file)
            try:
                with open(txt_file, 'r', encoding='utf-8') as f:
                    content = f.read()
                file_size = os.path.getsize(txt_file)
                line_count = len(content.split('\n'))
                char_count = len(content)
                if '\ufffd' in content:
                    issues.append(f"{name}: 包含替换字符(\ufffd)")
                if file_size < 1024:
                    issues.append(f"{name}: 文件过小 ({file_size}字节)")
                print(f"[{index}] {name}: {line_count}行, {char_count}字符")
            except UnicodeDecodeError:
                issues.append(f"{name}: 非UTF-8编码")
            except Exception as e:
                issues.append(f"{name}: 读取失败 ({e})")
        if not issues:
            print("\n所有检查的文件格式正常!")
            return
        print(f"\n发现 {len(issues)} 个问题:")
        for issue in issues[:10]:
            print(f" ⚠ {issue}")
        if len(issues) > 10:
            print(f" ... 还有 {len(issues) - 10} 个问题")

    def batch_process(self):
        """Create the standard folder layout under a base directory and process it.

        Creates ``原始pdf``, ``转换后TXT``, ``清洗后TXT`` and ``报告`` below the
        base directory, then converts the PDFs, cleans the converted texts
        and writes the report.  Stops after creating the folders when the PDF
        folder is still empty.
        """
        print("\n" + "=" * 60)
        print("批量处理整个目录")
        print("=" * 60)
        print("说明:此功能将自动处理指定目录下的所有PDF文件")
        print("包括:转换PDF -> 清洗文本 -> 质量检查")
        base_dir = input("请输入基础目录路径: ").strip()
        if not os.path.exists(base_dir):
            print(f"错误: 目录不存在 - {base_dir}")
            return
        subdirs = {
            'pdf_dir': os.path.join(base_dir, "原始pdf"),
            'temp_dir': os.path.join(base_dir, "转换后TXT"),
            'clean_dir': os.path.join(base_dir, "清洗后TXT"),
            'report_dir': os.path.join(base_dir, "报告")
        }
        for dir_path in subdirs.values():
            os.makedirs(dir_path, exist_ok=True)
            print(f"创建目录: {dir_path}")

        pdf_files = self._find_pdf_files(subdirs['pdf_dir'])
        if not pdf_files:
            print(f"\n在目录 {subdirs['pdf_dir']} 中未找到PDF文件")
            print("请将PDF文件放入该目录后重新运行")
            return
        print(f"\n找到 {len(pdf_files)} 个PDF文件")
        print("开始批量处理...")
        print("\n1. PDF转TXT...")
        self.run_pdf_conversion(subdirs['pdf_dir'], subdirs['temp_dir'])
        print("\n2. 文本清洗...")
        self.batch_clean_directory(subdirs['temp_dir'], subdirs['clean_dir'])
        print("\n3. 生成报告...")
        self.generate_report(subdirs, subdirs['report_dir'])
        print("\n" + "=" * 60)
        print("批量处理完成!")
        print("=" * 60)
        print(f"原始PDF: {subdirs['pdf_dir']}")
        print(f"转换后TXT: {subdirs['temp_dir']}")
        print(f"清洗后TXT: {subdirs['clean_dir']}")
        print(f"报告文件: {subdirs['report_dir']}")

    def batch_clean_directory(self, input_dir, output_dir):
        """Clean every ``.txt`` file below ``input_dir`` into ``output_dir``.

        Non-interactive counterpart of ``clean_specific_dir``.  A file that
        fails is reported and skipped instead of being silently ignored.
        """
        txt_files = self._find_txt_files(input_dir)
        if not txt_files:
            return
        success_count = 0
        for input_file in txt_files:
            rel_path = os.path.relpath(input_file, input_dir)
            output_file = os.path.join(output_dir, rel_path)
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            try:
                self.clean_text_file(input_file, output_file)
                success_count += 1
            except Exception as e:
                print(f" 清洗失败: {os.path.basename(input_file)} - {e}")
        print(f"清洗完成: {success_count}/{len(txt_files)} 个文件")

    def generate_report(self, dirs, report_dir):
        """Write ``处理报告.txt`` with file counts and the directory layout.

        Args:
            dirs: mapping that contains at least ``pdf_dir``, ``temp_dir``
                and ``clean_dir``; every entry is listed in the report.
            report_dir: existing directory that receives the report file.
        """
        report_file = os.path.join(report_dir, "处理报告.txt")
        pdf_count = len(self._find_pdf_files(dirs['pdf_dir']))
        converted_count = len(self._find_txt_files(dirs['temp_dir']))
        cleaned_count = len(self._find_txt_files(dirs['clean_dir']))
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write("流水句语料库处理报告\n")
            f.write("=" * 60 + "\n")
            f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"原始PDF文件数: {pdf_count}\n")
            f.write(f"转换后TXT文件数: {converted_count}\n")
            f.write(f"清洗后TXT文件数: {cleaned_count}\n\n")
            f.write("目录结构:\n")
            for name, path in dirs.items():
                f.write(f" {name}: {path}\n")
        print(f"报告已生成: {report_file}")

    # ------------------------------------------------------------------
    # Settings menu
    # ------------------------------------------------------------------
    def configure(self):
        """Show the current configuration and run one settings action."""
        print("\n" + "=" * 60)
        print("配置设置")
        print("=" * 60)
        print("当前配置:")
        print(json.dumps(self.config, ensure_ascii=False, indent=2))
        print("\n1. 修改PDF转换配置")
        print("2. 修改文本清洗配置")
        print("3. 重置为默认配置")
        print("4. 保存配置到文件")
        print("5. 返回主菜单")
        choice = input("\n请选择 (1-5): ").strip()
        if choice == '1':
            self.configure_pdf_conversion()
        elif choice == '2':
            self.configure_text_cleaning()
        elif choice == '3':
            # Use the built-in defaults; load_config() would re-apply the
            # user's corpus_config.json and so would not be a real reset.
            self.config = self._default_config()
            print("配置已重置为默认值")
        elif choice == '4':
            self.save_config()

    def configure_pdf_conversion(self):
        """Let the user change one PDF-conversion setting in memory."""
        settings = self.config['pdf_conversion']
        print("\nPDF转换配置:")
        print(f"1. 输入目录: {settings['input_dir']}")
        print(f"2. 输出目录: {settings['output_dir']}")
        print(f"3. poppler路径: {settings['poppler_path']}")
        field = input("\n选择要修改的字段 (1-3, 或回车取消): ").strip()
        fields = {
            '1': ('input_dir', "新的输入目录"),
            '2': ('output_dir', "新的输出目录"),
            '3': ('poppler_path', "新的poppler路径"),
        }
        if field not in fields:
            return
        key, label = fields[field]
        new_value = input(f"{label} [当前: {settings[key]}]: ").strip()
        if new_value:
            settings[key] = new_value

    def configure_text_cleaning(self):
        """Let the user change the cleaned-output directory in memory."""
        settings = self.config['text_cleaning']
        print("\n文本清洗配置:")
        print(f"1. 清洗后目录: {settings['cleaned_dir']}")
        field = input("\n选择要修改的字段 (1, 或回车取消): ").strip()
        if field != '1':
            return
        new_value = input(f"新的清洗后目录 [当前: {settings['cleaned_dir']}]: ").strip()
        if new_value:
            settings['cleaned_dir'] = new_value

    def save_config(self):
        """Write the current configuration to ``corpus_config.json``."""
        config_file = self.CONFIG_FILE
        try:
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, ensure_ascii=False, indent=2)
            print(f"配置已保存到: {config_file}")
        except Exception as e:
            print(f"保存配置失败: {e}")

    def show_help(self):
        """Print the built-in usage help."""
        print("\n" + "=" * 60)
        print("帮助信息")
        print("=" * 60)
        help_lines = (
            "\n本工具用于处理流水句语料库,主要功能包括:",
            "1. PDF转TXT: 将PDF文件转换为纯文本",
            "2. 文本清洗: 清理文本中的杂质、规范标点、修复断行",
            "3. 完整流程: 自动完成PDF转TXT和文本清洗",
            "4. 质量检查: 检查转换和清洗的质量",
            "5. 批量处理: 自动化处理整个目录",
            "\n使用建议:",
            "1. 首次使用建议运行 '批量处理',自动创建目录结构",
            "2. 将PDF文件放入 '原始pdf' 目录",
            "3. 使用 '完整流程' 处理所有文件",
            "4. 使用 '质量检查' 验证处理结果",
            "\n依赖库:",
            " pip install pdfplumber # PDF转换",
            " 或安装Poppler工具 (可选)",
            "\n配置文件: corpus_config.json",
        )
        for help_line in help_lines:
            print(help_line)
        print("=" * 60)
def main():
    """Start the interactive processor and keep the console open at exit.

    Ctrl-C and end-of-input (closed stdin, Ctrl-D/Ctrl-Z) are treated as a
    normal user interruption.  Any other exception is reported together
    with its traceback.  The final "press Enter" prompt tolerates a closed
    stdin so the program always exits cleanly.
    """
    try:
        processor = CorpusProcessor()
        processor.show_main_menu()
    except (KeyboardInterrupt, EOFError):
        print("\n\n程序被用户中断")
    except Exception as e:
        print(f"\n程序运行出错: {e}")
        import traceback
        traceback.print_exc()
    try:
        input("\n按回车键退出...")
    except (KeyboardInterrupt, EOFError):
        # Nothing left to wait for when stdin is closed or the user aborts.
        pass
# Run the interactive tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()