南宁市文章资讯

Python高效提取与自动化处理Word表格的完整指南

2026-04-08 23:45:02 浏览次数:0
详细信息
Python高效提取与自动化处理Word表格的完整指南

概述

Python提供了多种库来处理Word文档中的表格,本指南将全面介绍如何使用python-docxpandasopenpyxl等库高效提取和处理Word表格数据。

一、环境准备

1.1 安装必要库

pip install python-docx pandas openpyxl xlsxwriter python-pptx

1.2 导入库

from docx import Document
import pandas as pd
import re
from typing import List, Dict, Any
import json
import os

二、基础表格提取

2.1 读取Word文档中的表格

def extract_all_tables(doc_path: str) -> List[List[List[str]]]:
    """提取Word文档中的所有表格"""
    doc = Document(doc_path)
    tables_data = []

    for table in doc.tables:
        table_data = []
        for row in table.rows:
            row_data = []
            for cell in row.cells:
                # 清理单元格文本
                text = cell.text.strip()
                row_data.append(text)
            table_data.append(row_data)
        tables_data.append(table_data)

    return tables_data

# 使用示例
tables = extract_all_tables('document.docx')
print(f"找到 {len(tables)} 个表格")

2.2 提取特定表格

def extract_specific_table(doc_path: str, table_index: int = 0) -> List[List[str]]:
    """提取指定索引的表格"""
    doc = Document(doc_path)

    if table_index >= len(doc.tables):
        raise IndexError(f"表格索引 {table_index} 超出范围")

    table = doc.tables[table_index]
    return [[cell.text.strip() for cell in row.cells] for row in table.rows]

三、高级表格处理

3.1 处理合并单元格

def extract_tables_with_merged_cells(doc_path: str) -> List[Dict]:
    """
    提取表格并处理合并单元格
    返回包含表格数据和元信息的字典列表
    """
    doc = Document(doc_path)
    tables_info = []

    for table_idx, table in enumerate(doc.tables):
        # 获取表格维度
        rows = len(table.rows)
        cols = len(table.columns) if hasattr(table, 'columns') else len(table.rows[0].cells)

        # 创建二维数组存储数据
        table_data = [['' for _ in range(cols)] for _ in range(rows)]

        # 填充数据
        for i, row in enumerate(table.rows):
            for j, cell in enumerate(row.cells):
                if cell.text.strip():  # 只填充有内容的单元格
                    table_data[i][j] = cell.text.strip()

        tables_info.append({
            'index': table_idx,
            'rows': rows,
            'columns': cols,
            'data': table_data,
            'has_merged': check_merged_cells(table)
        })

    return tables_info

def check_merged_cells(table) -> bool:
    """检查表格是否有合并单元格"""
    # 检查是否有空白但应该被合并的单元格
    for row in table.rows:
        for cell in row.cells:
            if cell.text.strip() == '':
                # 这里可以添加更复杂的合并单元格检测逻辑
                return True
    return False

3.2 智能表格识别

def smart_table_extraction(doc_path: str, keywords: List[str] = None) -> Dict:
    """
    智能表格提取:
    1. 根据关键词定位表格
    2. 自动识别表头
    3. 结构化输出
    """
    doc = Document(doc_path)
    results = {'tables': [], 'metadata': {}}

    for table_idx, table in enumerate(doc.tables):
        table_data = []
        header_row = None

        # 尝试自动识别表头
        for i, row in enumerate(table.rows):
            row_data = [cell.text.strip() for cell in row.cells]

            # 判断是否为表头的启发式规则
            if i == 0 or is_likely_header(row_data):
                header_row = row_data
                continue

            table_data.append(row_data)

        # 如果找到关键词,检查表格是否相关
        is_relevant = True
        if keywords:
            is_relevant = any(
                any(keyword in cell for cell in header_row or [])
                for keyword in keywords
            )

        if is_relevant:
            results['tables'].append({
                'index': table_idx,
                'header': header_row,
                'data': table_data,
                'rows': len(table_data),
                'columns': len(header_row) if header_row else 0
            })

    return results

def is_likely_header(row_data: List[str]) -> bool:
    """判断行是否可能是表头"""
    if not row_data:
        return False

    # 表头通常包含描述性文字而非数字
    text_count = sum(1 for cell in row_data if cell and not cell.replace('.', '').replace(',', '').isdigit())
    return text_count / len(row_data) > 0.7

四、数据清洗与转换

4.1 表格数据清洗

def clean_table_data(table_data: List[List[str]]) -> List[List[str]]:
    """清理表格数据"""
    cleaned_data = []

    for row in table_data:
        cleaned_row = []
        for cell in row:
            # 移除多余空格和换行符
            cell = ' '.join(cell.split())
            # 移除特殊字符(可选)
            # cell = re.sub(r'[^\w\s.,-]', '', cell)
            cleaned_row.append(cell)

        # 跳过完全为空的行
        if any(cell.strip() for cell in cleaned_row):
            cleaned_data.append(cleaned_row)

    return cleaned_data

4.2 转换为Pandas DataFrame

def table_to_dataframe(table_data: List[List[str]], 
                      header_row: int = 0) -> pd.DataFrame:
    """将表格数据转换为Pandas DataFrame"""

    if not table_data:
        return pd.DataFrame()

    # 确保表头行有效
    if header_row >= len(table_data):
        header_row = 0

    # 提取表头和数据
    headers = table_data[header_row]
    data_rows = table_data[header_row + 1:] if header_row + 1 < len(table_data) else []

    # 创建DataFrame
    df = pd.DataFrame(data_rows, columns=headers)

    return df

4.3 批量处理多个表格

def batch_process_word_tables(doc_paths: List[str], 
                             output_format: str = 'excel') -> Dict:
    """
    批量处理多个Word文档中的表格

    Args:
        doc_paths: Word文档路径列表
        output_format: 输出格式 ('excel', 'csv', 'json')

    Returns:
        处理结果的字典
    """
    results = {}

    for doc_path in doc_paths:
        try:
            doc_name = os.path.basename(doc_path)
            print(f"处理文件: {doc_name}")

            # 提取所有表格
            tables = extract_all_tables(doc_path)

            # 清理和转换每个表格
            cleaned_tables = []
            for table in tables:
                cleaned = clean_table_data(table)
                if cleaned:  # 只保留非空表格
                    cleaned_tables.append(cleaned)

            results[doc_name] = {
                'total_tables': len(cleaned_tables),
                'tables': cleaned_tables
            }

            # 根据格式输出
            if output_format == 'excel':
                export_to_excel(cleaned_tables, f"{doc_name}_tables.xlsx")
            elif output_format == 'json':
                export_to_json(cleaned_tables, f"{doc_name}_tables.json")

        except Exception as e:
            print(f"处理文件 {doc_path} 时出错: {str(e)}")
            results[doc_path] = {'error': str(e)}

    return results

五、导出与格式转换

5.1 导出到Excel

def export_to_excel(tables: List[List[List[str]]], 
                   output_path: str,
                   sheet_names: List[str] = None):
    """将多个表格导出到Excel的不同工作表"""

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        for i, table in enumerate(tables):
            if not table:
                continue

            # 生成工作表名称
            if sheet_names and i < len(sheet_names):
                sheet_name = sheet_names[i]
            else:
                sheet_name = f'Table_{i+1}'

            # 限制工作表名称长度
            sheet_name = sheet_name[:31]

            # 转换为DataFrame并导出
            df = table_to_dataframe(table)
            df.to_excel(writer, sheet_name=sheet_name, index=False)

            # 自动调整列宽
            worksheet = writer.sheets[sheet_name]
            for column in worksheet.columns:
                max_length = 0
                column_letter = column[0].column_letter
                for cell in column:
                    try:
                        if len(str(cell.value)) > max_length:
                            max_length = len(str(cell.value))
                    except:
                        pass
                adjusted_width = min(max_length + 2, 50)
                worksheet.column_dimensions[column_letter].width = adjusted_width

    print(f"已导出到: {output_path}")

5.2 导出为JSON

def export_to_json(tables: List[List[List[str]]], 
                  output_path: str,
                  include_metadata: bool = True):
    """将表格数据导出为JSON格式"""

    output_data = []

    for i, table in enumerate(tables):
        if not table:
            continue

        table_info = {
            'table_index': i,
            'row_count': len(table),
            'column_count': len(table[0]) if table else 0,
        }

        if include_metadata:
            table_info['data'] = table
        else:
            # 扁平化数据
            headers = table[0] if table else []
            rows = table[1:] if len(table) > 1 else []
            table_info['headers'] = headers
            table_info['rows'] = rows

        output_data.append(table_info)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, ensure_ascii=False, indent=2)

    print(f"已导出到: {output_path}")

5.3 导出为CSV

def export_to_csv(tables: List[List[List[str]]], 
                 output_dir: str,
                 prefix: str = "table"):
    """将每个表格导出为单独的CSV文件"""

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for i, table in enumerate(tables):
        if not table:
            continue

        df = table_to_dataframe(table)
        output_path = os.path.join(output_dir, f"{prefix}_{i+1}.csv")
        df.to_csv(output_path, index=False, encoding='utf-8-sig')

        print(f"导出表格 {i+1}: {output_path}")

六、自动化工作流

6.1 完整的表格处理流水线

class WordTableProcessor:
    """Word表格处理器的完整实现"""

    def __init__(self, config: Dict = None):
        self.config = config or {
            'auto_detect_header': True,
            'clean_data': True,
            'remove_empty_rows': True,
            'output_format': 'excel'
        }

    def process_document(self, input_path: str, output_path: str = None) -> Dict:
        """处理单个文档"""

        # 1. 提取表格
        print("步骤1: 提取表格...")
        tables = extract_all_tables(input_path)

        # 2. 数据清洗
        print("步骤2: 数据清洗...")
        cleaned_tables = []
        for table in tables:
            if self.config['clean_data']:
                table = clean_table_data(table)
            if self.config['remove_empty_rows'] and table:
                cleaned_tables.append(table)

        # 3. 智能处理(自动识别表头等)
        print("步骤3: 智能处理...")
        processed_tables = []
        for table in cleaned_tables:
            if self.config['auto_detect_header'] and table:
                # 尝试识别表头
                processed = self._auto_process_table(table)
                processed_tables.append(processed)
            else:
                processed_tables.append(table)

        # 4. 导出结果
        print("步骤4: 导出结果...")
        if output_path:
            if self.config['output_format'] == 'excel':
                export_to_excel(processed_tables, output_path)
            elif self.config['output_format'] == 'json':
                export_to_json(processed_tables, output_path)

        return {
            'input_file': input_path,
            'tables_found': len(tables),
            'tables_processed': len(processed_tables),
            'output_file': output_path
        }

    def _auto_process_table(self, table_data: List[List[str]]) -> List[List[str]]:
        """自动表格处理逻辑"""
        # 这里可以实现更复杂的自动处理逻辑
        return table_data

    def batch_process(self, input_dir: str, output_dir: str, pattern: str = "*.docx"):
        """批量处理目录中的所有Word文档"""

        import glob

        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        results = {}
        word_files = glob.glob(os.path.join(input_dir, pattern))

        for word_file in word_files:
            try:
                output_file = os.path.join(
                    output_dir, 
                    f"{os.path.splitext(os.path.basename(word_file))[0]}_tables.xlsx"
                )

                result = self.process_document(word_file, output_file)
                results[word_file] = result

                print(f"✓ 完成: {word_file}")

            except Exception as e:
                print(f"✗ 错误处理 {word_file}: {str(e)}")
                results[word_file] = {'error': str(e)}

        return results

6.2 监控文件夹自动处理

import time
import hashlib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class WordTableWatcher(FileSystemEventHandler):
    """监控文件夹并自动处理新增的Word文档"""

    def __init__(self, processor: WordTableProcessor, output_dir: str):
        self.processor = processor
        self.output_dir = output_dir
        self.processed_files = set()

    def on_created(self, event):
        if not event.is_directory and event.src_path.endswith('.docx'):
            self._process_file(event.src_path)

    def on_modified(self, event):
        if not event.is_directory and event.src_path.endswith('.docx'):
            self._process_file(event.src_path)

    def _process_file(self, file_path):
        # 防止重复处理
        file_hash = self._get_file_hash(file_path)
        if file_hash in self.processed_files:
            return

        try:
            print(f"检测到新文件: {file_path}")
            self.processor.process_document(file_path, self.output_dir)
            self.processed_files.add(file_hash)
            print(f"处理完成: {file_path}")
        except Exception as e:
            print(f"处理失败: {file_path}, 错误: {str(e)}")

    def _get_file_hash(self, file_path):
        """获取文件哈希值,用于去重"""
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

# 使用示例
def start_folder_monitoring(input_folder: str, output_folder: str):
    processor = WordTableProcessor()
    event_handler = WordTableWatcher(processor, output_folder)

    observer = Observer()
    observer.schedule(event_handler, input_folder, recursive=False)
    observer.start()

    print(f"开始监控文件夹: {input_folder}")

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()

七、最佳实践与优化建议

7.1 性能优化

def optimized_table_extraction(doc_path: str, max_rows: int = 1000) -> List:
    """优化的大表格提取"""

    doc = Document(doc_path)
    tables_data = []

    for table in doc.tables:
        # 使用列表推导式提高性能
        table_data = [
            [cell.text.strip() for cell in row.cells]
            for row in table.rows[:max_rows]  # 限制行数
        ]
        tables_data.append(table_data)

    return tables_data

7.2 错误处理

def safe_table_extraction(doc_path: str, fallback_encoding: str = 'utf-8') -> Dict:
    """带错误处理的表格提取"""

    result = {
        'success': False,
        'tables': [],
        'error': None,
        'warnings': []
    }

    try:
        # 尝试打开文档
        doc = Document(doc_path)

        for i, table in enumerate(doc.tables):
            try:
                table_data = []
                for row in table.rows:
                    row_data = []
                    for cell in row.cells:
                        text = cell.text.strip()
                        # 检查编码问题
                        if any(ord(char) > 127 for char in text):
                            result['warnings'].append(f"表{i}行{row}包含非ASCII字符")
                        row_data.append(text)
                    table_data.append(row_data)

                result['tables'].append(table_data)

            except Exception as e:
                result['warnings'].append(f"处理表{i}时出错: {str(e)}")
                continue

        result['success'] = True

    except Exception as e:
        result['error'] = str(e)

    return result

7.3 配置文件

{
  "extraction": {
    "auto_detect_header": true,
    "clean_whitespace": true,
    "remove_empty_rows": true,
    "max_tables": 50
  },
  "output": {
    "format": "excel",
    "include_metadata": true,
    "auto_adjust_columns": true
  },
  "processing": {
    "parallel_processing": false,
    "chunk_size": 1000
  }
}

八、完整示例

8.1 完整的命令行工具

import argparse
import sys

def main():
    parser = argparse.ArgumentParser(description='Word表格提取工具')
    parser.add_argument('input', help='输入文件或文件夹路径')
    parser.add_argument('-o', '--output', help='输出路径', default='./output')
    parser.add_argument('-f', '--format', choices=['excel', 'json', 'csv'], 
                       default='excel', help='输出格式')
    parser.add_argument('-b', '--batch', action='store_true', 
                       help='批量处理模式')

    args = parser.parse_args()

    processor = WordTableProcessor({
        'output_format': args.format
    })

    if args.batch and os.path.isdir(args.input):
        print(f"批量处理模式: {args.input}")
        results = processor.batch_process(args.input, args.output)
        print(f"处理完成: {len(results)} 个文件")
    else:
        print(f"处理单个文件: {args.input}")
        result = processor.process_document(args.input, args.output)
        print(f"提取了 {result['tables_processed']} 个表格")

if __name__ == "__main__":
    main()

8.2 使用示例

# 示例1: 基本使用
processor = WordTableProcessor()
result = processor.process_document(
    input_path="报告.docx",
    output_path="报告_表格.xlsx"
)

# 示例2: 批量处理
processor.batch_process(
    input_dir="./word_documents",
    output_dir="./extracted_tables"
)

# 示例3: 自定义配置
config = {
    'auto_detect_header': True,
    'output_format': 'json',
    'clean_data': True
}
processor = WordTableProcessor(config)

总结

本文提供了完整的Word表格提取与自动化处理方案,包括:

基础提取:使用python-docx提取表格数据 高级处理:处理合并单元格、智能识别表头 数据清洗:清理和转换表格数据 格式导出:支持Excel、JSON、CSV等多种格式 自动化工作流:批量处理和文件夹监控 最佳实践:性能优化和错误处理

这些技术可以应用于文档自动化、数据提取、报告生成等多种场景,大大提高了处理Word表格的效率和准确性。

相关推荐