1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache
from typing import Callable, Optional, Union

import chardet
import psutil
  class FileParser:     """     智能文本文件解析器,具备以下特性:     - 自动编码检测与容错处理     - 动态内存管理的大文件处理     - 自适应并发策略选择     - 文件完整性校验     - 资源监控与保护     """          def __init__(self,                   file_path: str,                   max_memory_ratio: float = 0.3,                  chunk_size: int = 1024*1024,                  encoding_threshold: int = 1024*1024):         """         参数:         file_path: 文件路径         max_memory_ratio: 允许使用的最大内存比例(0-1)         chunk_size: 大文件分块读取尺寸(字节)         encoding_threshold: 编码检测最大读取字节数         """         self.file_path = file_path         self.max_memory = psutil.virtual_memory().total * max_memory_ratio         self.chunk_size = chunk_size         self.encoding_threshold = encoding_threshold         self._encoding = None         self._file_size = None
      @property     def exists(self) -> bool:         """检查文件是否存在"""         return os.path.exists(self.file_path)
      @property     def size(self) -> int:         """获取文件大小(字节)"""         if self._file_size is None:             self._file_size = os.path.getsize(self.file_path)         return self._file_size
      @lru_cache(maxsize=32)     def detect_encoding(self) -> str:         """智能编码检测(带缓存)"""         try:             with open(self.file_path, 'rb') as f:                 raw_data = f.read(min(self.size, self.encoding_threshold))                 result = chardet.detect(raw_data)                 return result['encoding'] or 'utf-8'         except Exception as e:             raise RuntimeError(f"编码检测失败: {str(e)}")
      def _memory_safe(self, required_mem: int) -> bool:         """内存安全检查"""         available = psutil.virtual_memory().available         return required_mem < min(self.max_memory, available)
      async def async_read(self,                          callback: Optional[Callable] = None) -> list:         """协程方式读取(适合IO密集型)"""         if self.size > 1e6 and not self._memory_safe(self.size*2):             raise MemoryError("文件过大且内存不足")
          self._encoding = self.detect_encoding()         lines = []                  async with asyncio.Lock():             with open(self.file_path, 'r',                       encoding=self._encoding,                       errors='replace') as f:                 if self.size > self.chunk_size:                                          while True:                         chunk = await asyncio.to_thread(f.read, self.chunk_size)                         if not chunk:                             break                         if callback:                             await callback(chunk)                         else:                             lines.extend(chunk.splitlines())                 else:                                          content = await asyncio.to_thread(f.read)                     lines = content.splitlines()         return lines
      def multiprocess_read(self,                           workers: int = None) -> list:         """多进程读取(适合CPU密集型处理)"""         if workers is None:             workers = min(os.cpu_count(), 4)                      with ProcessPoolExecutor(max_workers=workers) as executor:             futures = []             results = []             with open(self.file_path, 'rb') as f:                 for chunk in iter(lambda: f.read(self.chunk_size), b''):                     futures.append(                         executor.submit(                             self._process_chunk,                             chunk,                             self.detect_encoding()                         ))             for future in futures:                 results.extend(future.result())             return results
      @staticmethod     def _process_chunk(chunk: bytes, encoding: str) -> list:         """多进程分块处理"""         text = chunk.decode(encoding, errors='replace')         return text.splitlines()
      def read(self,              mode: str = 'auto',             progress: bool = False) -> Union[list, str]:         """         主读取方法         参数:         mode: auto|async|multiprocess         progress: 显示进度提示         """         if not self.exists:             raise FileNotFoundError(f"文件不存在: {self.file_path}")
          required_mem = self.size * 2           if not self._memory_safe(required_mem):             if mode == 'auto':                 mode = 'async' if self.size > 1e6 else 'multiprocess'                          if mode == 'async':             return asyncio.run(self.async_read())         elif mode == 'multiprocess':             return self.multiprocess_read()         else:                          self._encoding = self.detect_encoding()             with open(self.file_path, 'r',                       encoding=self._encoding,                      errors='replace') as f:                 if self.size > self.chunk_size:                     lines = []                     for line in f:                         lines.append(line.strip())                         if progress and len(lines) % 1000 == 0:                             print(f"\r已读取 {len(lines)} 行", end='')                     return lines                 else:                     return f.read().splitlines()
           def validate_file(self) -> bool:         """文件完整性校验"""         try:             with open(self.file_path, 'r',                       encoding=self.detect_encoding()) as f:                 f.read(1024)             return True         except UnicodeDecodeError:             return False
      def convert_line_endings(self, target: str = '\n'):         """统一换行符格式"""         with open(self.file_path, 'r+',                   encoding=self.detect_encoding()) as f:             content = f.read().replace('\r\n', '\n').replace('\r', '\n')             f.seek(0)             f.write(content.replace('\n', target))             f.truncate()
      @classmethod     def batch_process(cls,                       file_list: list,                       max_concurrency: int = 8):         """批量处理文件"""         semaphore = asyncio.Semaphore(max_concurrency)                  async def _process(file_path):             async with semaphore:                 parser = cls(file_path)                 return await parser.async_read()                          return asyncio.gather(*[_process(f) for f in file_list])
   |