The implementation below takes file detection, memory management, encoding handling, and concurrency control into account, and can select a reading strategy automatically:

import os
import chardet
import asyncio
import psutil
from functools import lru_cache
from concurrent.futures import ProcessPoolExecutor
from typing import Union, Optional, Callable

class FileParser:
    """
    Smart text-file parser with the following features:
    - Automatic encoding detection with error tolerance
    - Large-file handling with dynamic memory management
    - Adaptive concurrency strategy selection
    - File integrity validation
    - Resource monitoring and protection
    """

    def __init__(self,
                 file_path: str,
                 max_memory_ratio: float = 0.3,
                 chunk_size: int = 1024 * 1024,
                 encoding_threshold: int = 1024 * 1024):
        """
        Args:
            file_path: path to the file
            max_memory_ratio: maximum fraction of total memory to use (0-1)
            chunk_size: chunk size in bytes for reading large files
            encoding_threshold: maximum number of bytes read for encoding detection
        """
        self.file_path = file_path
        self.max_memory = psutil.virtual_memory().total * max_memory_ratio
        self.chunk_size = chunk_size
        self.encoding_threshold = encoding_threshold
        self._encoding = None
        self._file_size = None

    @property
    def exists(self) -> bool:
        """Check whether the file exists."""
        return os.path.exists(self.file_path)

    @property
    def size(self) -> int:
        """Return the file size in bytes."""
        if self._file_size is None:
            self._file_size = os.path.getsize(self.file_path)
        return self._file_size

    @lru_cache(maxsize=32)
    def detect_encoding(self) -> str:
        """Detect the file encoding (cached)."""
        try:
            with open(self.file_path, 'rb') as f:
                raw_data = f.read(min(self.size, self.encoding_threshold))
            result = chardet.detect(raw_data)
            return result['encoding'] or 'utf-8'
        except Exception as e:
            raise RuntimeError(f"Encoding detection failed: {e}")

    def _memory_safe(self, required_mem: int) -> bool:
        """Check that the estimated memory demand fits within the budget."""
        available = psutil.virtual_memory().available
        return required_mem < min(self.max_memory, available)

    async def async_read(self,
                         callback: Optional[Callable] = None) -> list:
        """Coroutine-based read (suited to IO-bound workloads)."""
        if self.size > 1e6 and not self._memory_safe(self.size * 2):
            raise MemoryError("File is too large and memory is insufficient")

        self._encoding = self.detect_encoding()
        lines = []

        with open(self.file_path, 'r',
                  encoding=self._encoding,
                  errors='replace') as f:
            if self.size > self.chunk_size:
                # Read large files chunk by chunk
                while True:
                    chunk = await asyncio.to_thread(f.read, self.chunk_size)
                    if not chunk:
                        break
                    if callback:
                        await callback(chunk)
                    else:
                        lines.extend(chunk.splitlines())
            else:
                # Read small files in one go
                content = await asyncio.to_thread(f.read)
                lines = content.splitlines()
        return lines

    def multiprocess_read(self,
                          workers: int = None) -> list:
        """Multi-process read (suited to CPU-bound processing)."""
        if workers is None:
            workers = min(os.cpu_count() or 1, 4)

        with ProcessPoolExecutor(max_workers=workers) as executor:
            futures = []
            results = []
            with open(self.file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(self.chunk_size), b''):
                    futures.append(
                        executor.submit(
                            self._process_chunk,
                            chunk,
                            self.detect_encoding()
                        ))
            for future in futures:
                results.extend(future.result())
        return results

    @staticmethod
    def _process_chunk(chunk: bytes, encoding: str) -> list:
        """Decode and split one chunk in a worker process."""
        text = chunk.decode(encoding, errors='replace')
        return text.splitlines()

    def read(self,
             mode: str = 'auto',
             progress: bool = False) -> Union[list, str]:
        """
        Main read method.
        Args:
            mode: auto | async | multiprocess
            progress: print progress hints
        """
        if not self.exists:
            raise FileNotFoundError(f"File not found: {self.file_path}")

        required_mem = self.size * 2  # rough memory estimate
        if not self._memory_safe(required_mem) and mode == 'auto':
            # Not enough headroom for a plain read: pick a streaming strategy
            mode = 'async' if self.size > 1e6 else 'multiprocess'

        if mode == 'async':
            return asyncio.run(self.async_read())
        elif mode == 'multiprocess':
            return self.multiprocess_read()
        else:
            # Regular read
            self._encoding = self.detect_encoding()
            with open(self.file_path, 'r',
                      encoding=self._encoding,
                      errors='replace') as f:
                if self.size > self.chunk_size:
                    lines = []
                    for line in f:
                        lines.append(line.strip())
                        if progress and len(lines) % 1000 == 0:
                            print(f"\rRead {len(lines)} lines", end='')
                    return lines
                else:
                    return f.read().splitlines()

    # Additional features below
    def validate_file(self) -> bool:
        """File integrity check: the first block must decode cleanly."""
        try:
            with open(self.file_path, 'r',
                      encoding=self.detect_encoding()) as f:
                f.read(1024)
            return True
        except UnicodeDecodeError:
            return False

    def convert_line_endings(self, target: str = '\n'):
        """Normalize line endings."""
        with open(self.file_path, 'r+',
                  encoding=self.detect_encoding(),
                  newline='') as f:
            content = f.read().replace('\r\n', '\n').replace('\r', '\n')
            f.seek(0)
            f.write(content.replace('\n', target))
            f.truncate()

    @classmethod
    def batch_process(cls,
                      file_list: list,
                      max_concurrency: int = 8):
        """Process a batch of files with bounded concurrency."""
        async def _run():
            semaphore = asyncio.Semaphore(max_concurrency)

            async def _process(file_path):
                async with semaphore:
                    parser = cls(file_path)
                    return await parser.async_read()

            return await asyncio.gather(*[_process(f) for f in file_list])

        return asyncio.run(_run())

Core design points

  1. Encoding detection optimization
  • Use the first 1 MB of data for encoding inference (controlled by encoding_threshold)
  • Add an LRU cache to avoid repeated detection
  • Fall back to UTF-8 automatically and replace undecodable characters
  2. Memory management mechanism

    # Dynamic memory check
    if not self._memory_safe(self.size * 2):
        raise MemoryError("Insufficient memory")
  3. Adaptive reading strategy (see the dispatch sketch after this list)

  • Files < 1 MB: read directly
  • 1 MB to 100 MB: coroutine-based chunked reading
  • > 100 MB: multi-process parallel processing

  4. Concurrency mode selection

    mode = 'async' if self.size > 1e6 else 'multiprocess'
  5. Additional features

  • Line-ending normalization (Windows/Unix formats)
  • Batch-processing throttling (controlled by a semaphore)
  • File integrity validation
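
The 100 MB boundary from the strategy table above is not checked inside read(), which only switches on the 1 MB mark; a standalone dispatch that honors both thresholds could look like the sketch below (the choose_mode helper and the 100 MB constant are illustrative assumptions, not part of the class):

def choose_mode(size: int) -> str:
    """Map a file size to a reading strategy, following the table above."""
    if size < 1024 * 1024:             # < 1 MB: plain read
        return 'plain'
    if size <= 100 * 1024 * 1024:      # 1 MB - 100 MB: coroutine chunked read
        return 'async'
    return 'multiprocess'              # > 100 MB: multi-process processing

parser = FileParser("data.txt")
# Any mode other than 'async'/'multiprocess' falls through to the regular read path.
lines = parser.read(mode=choose_mode(parser.size))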

Usage examples

# Basic usage
parser = FileParser("data.txt")
lines = parser.read(progress=True)

# Async mode
async_lines = asyncio.run(parser.async_read())

# Batch processing
results = FileParser.batch_process(["file1.txt", "file2.txt"])
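
For very large files, async_read also accepts an awaitable callback that receives each decoded chunk, so data can be processed as it streams in instead of being collected into a list; a minimal sketch (the handle_chunk coroutine and the file name are placeholders):

import asyncio

async def handle_chunk(chunk: str) -> None:
    # Process each decoded chunk as it arrives, e.g. count its lines.
    print(f"received {len(chunk.splitlines())} lines")

parser = FileParser("big.log")  # placeholder file name
asyncio.run(parser.async_read(callback=handle_chunk))

Note that the callback path is only taken on the chunked (large-file) branch; files smaller than chunk_size are still read in one shot.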

Important considerations not covered above

  • File locking (to avoid concurrent-write conflicts)
  • Memory-mapped file support (via the mmap module; see the sketch below)
  • Automatic switching between binary and text mode
  • Exception logging (integrating the logging module is recommended)
  • Handling of BOM headers in file encodings (see the second sketch below)
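
On the mmap point, a minimal sketch of line iteration over a memory-mapped file (the iter_mmap_lines helper is an illustrative name, not part of FileParser):

import mmap

def iter_mmap_lines(path: str, encoding: str = 'utf-8'):
    """Yield decoded lines from a memory-mapped file without loading it all."""
    with open(path, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            for raw_line in iter(mm.readline, b''):
                yield raw_line.decode(encoding, errors='replace').rstrip('\r\n')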

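For BOM handling, one option is to check the first bytes against the codecs module's BOM constants before falling back to chardet; a sketch under that assumption (UTF-32 is tested before UTF-16 because their BOMs share a prefix):

import codecs

_BOMS = [
    (codecs.BOM_UTF8, 'utf-8-sig'),
    (codecs.BOM_UTF32_LE, 'utf-32-le'),
    (codecs.BOM_UTF32_BE, 'utf-32-be'),
    (codecs.BOM_UTF16_LE, 'utf-16-le'),
    (codecs.BOM_UTF16_BE, 'utf-16-be'),
]

def detect_bom_encoding(path: str):
    """Return the encoding implied by a BOM, or None if no BOM is present."""
    with open(path, 'rb') as f:
        head = f.read(4)
    for bom, encoding in _BOMS:
        if head.startswith(bom):
            return encoding
    return None
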
To improve stability, the following monitoring decorator could be added:

def resource_monitor(func):
    def wrapper(self, *args, **kwargs):
        if psutil.cpu_percent() > 90:
            raise RuntimeError("CPU overloaded")
        if psutil.virtual_memory().percent > 95:
            raise MemoryError("Insufficient memory")
        return func(self, *args, **kwargs)
    return wrapper
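
Applying it is then a matter of decorating the heavy entry points, for example via a subclass (MonitoredParser is a hypothetical name, not part of the implementation above):

class MonitoredParser(FileParser):
    """Hypothetical subclass that guards the expensive entry points."""

    @resource_monitor
    def read(self, mode: str = 'auto', progress: bool = False):
        return super().read(mode=mode, progress=progress)

parser = MonitoredParser("data.txt")
lines = parser.read()  # raises RuntimeError / MemoryError when the host is overloaded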