Python 解压 UNIX/Linux 压缩文件的最快方式

IGS 的数据和产品通常用 compressgzip 程序压缩,以 .Z 或 .gz 的压缩文件形式提供。这是 UNIX/Linux 操作系统常用的压缩文件格式。用户在下载之后需首先使用软件对其进行解压。本文以实验方式探讨一个 Python 编程中的问题:如何最快的解压缩 .Z 或 .gz 文件?并且,代码最好是容易部署和移植的。

主要测试了通过标准库/扩展库和调用 gzip 程序的解压方法,以及各自的线程池和异步 I/O 版本。

测试数据

武汉大学的 FTP 服务器是离国内用户最近的 IGS 数据中心。为了对各种方法进行测试,我从其 FTP 服务器下载了 2018 年 12 月 11 日全部 IGS 站的观测数据。数据格式为压缩后的 GSI Compact RINEX,扩展名有 .Z 和 .gz 两种,786 个文件,1.5 GB。

标准库/扩展库

为了编写易迁移和部署的代码,使用 Python 标准库是最保险的选择。但遗憾的是,Python 标准库目前只能处理 .gz 文件,还不能解压 .Z 文件。一个名为 unlzw 的第三方库可以处理 .Z 文件。安装这个扩展库很容易,在终端中执行:

1
$ pip install unlzw

gzip 被包含在 Python 标准库,其中的 decompress() 函数和 unlzw 中的 unlzw() 函数,分别能够解压 .gz 和 .Z 文件。一个使用这两个包的解压函数如下:

1
2
3
4
5
6
def decompress_pk(src, dst):
"""Decompress function using gzip & unlzw packages."""
uc_func = unlzw.unlzw if src.endswith('.Z') else gzip.decompress
with open(src, 'rb') as sf, open(dst, 'wb') as df:
buffer = sf.read()
df.write(uc_func(buffer))

循环地调用上面的 decompress_pk 函数,对所有的测试文件依次进行解压。在我的笔记本电脑上,此过程一共耗费了 39 秒。有趣的是,标准库的 gzip 包在处理测试文件时竟然出现了异常,导致 3 个文件解压失败。使用 7zip 软件再对其解压,发现其扩展名虽然是 .gz,但实际上是以 compress 程序压缩的(扩展名应为 .Z)。这 3 个文件分别是 ZIM2、ZIM3 和 ZIMM 的 RINEX 3 观测数据。

调用 gzip 程序

还有一种在各平台通用的方法:调用外部的解压缩程序。目前在 UNIX/Linux 和 Windows 上都可用的解压缩程序是 gzip。该程序已经被大多数的 Linux 发行版预装,但如果要在 Windows 操作系统上使用 gzip,则需要安装。你可以从 GZIP for Windows 的相关页面下载预编译好的程序包。然后将其中 bin/ 文件夹下的可执行文件 gzip.exe 移动到系统可搜索到的目录中,如 “C:\Windows\System32”。

Python 标准库的 subprocess 包可用来执行外部的命令行程序。有些较老的博客可能也会介绍 os.system(),但我不建议使用它:该函数功能过于简单并且将被废弃。

gzip 程序解压 .Z 文件和 .gz 文件的命令是一样的,使用方式为:

1
$ gzip -d [filename]

因此,一个调用 gzip 的 Python 函数为:

1
2
3
4
5
6
7
def decompress_sp(src, dst):
"""Decompress function using gzip CLI program."""
gzip_args = ' '.join(['gzip', '-d', '-c', src])
with open(dst, 'wb') as df:
status = subprocess.call(gzip_args, stdout=df, stderr=subprocess.DEVNULL)
if status:
raise(OSError('Not a .gz file.'))

该函数中为 gzip 程序添加了 -c 参数,指示将解压后的内容输出到标准输出。循环调用上面的 decompress_sp 函数,对所有测试文件依次进行解压。在我的笔记本上,共花费了 63 秒。但该方法对所有的测试文件都解压成功了,这说明:相对于 Python 的标准库/扩展库,gzip 程序的容错性更好。

启用线程池

在线程池调用标准库/扩展库的解压代码

现在试试如何加速以上的代码。我首先想到的是使用 concurent.futures 包提供的线程池,将解压工作并发执行。但在并发执行之前,需要先改写之前的 decompress_pk 函数,以便在线程内部处理异常:

1
2
3
4
5
6
7
8
9
def decompress_pk_cc(src, dst):
"""Decompress function using gzip & unlzw, handle exception."""
uc_func = unlzw.unlzw if src.endswith('.Z') else gzip.decompress
try:
with open(src, 'rb') as sf, open(dst, 'wb') as df:
buffer = sf.read()
df.write(uc_func(buffer))
except OSError:
return src

然后并发调用以上函数,代码为:

1
2
3
4
5
6
7
8
9
10
11
12
>>> error_files = []
>>> with futures.ThreadPoolExecutor(max_workers=6) as executor:
... todo_list = [executor.submit(decompress_pk_cc, src, dst)
... for src, dst in zip(sources, destinations)]
... task_iter = futures.as_completed(todo_list)
... for future in tqdm.tqdm(task_iter, ascii=True, total=786):
... result = future.result()
... if result:
... error_files.append(result)
...
>>> if error_files:
... print('Decompress error files:', ', '.join(error_files))

这里的 sourcesdestinations 分别是输入和输出文件的路径,tqdm.tqdm() 用于在程序执行时显示进度条。这段代码在我的电脑上共运行了 17 秒,相比单线程方式提升了约 56%。

在线程池中调用 gzip 程序

那么如果在 Python 的线程池中调用 gzip 程序呢?我使用与上文相同的代码调用了 decompress_sp 函数(只需要将 decompress_pk_cc 替换为 decompress_sp)。结果发现:并发调用 gzip 程序处理所有测试文件共耗费了 25 秒,相比其单线程版本提升了约 60%。

异步 I/O

Python 中还有一种并发编程的方法:异步 I/O。该方法主要由标准库中的 asyncio 包来提供。对于 Windows 操作系统的 Python,其默认事件循环不支持外部程序调用,因此这里只实现了标准库/扩展库解压缩函数的异步 I/O 版本。异步文件读写使用 aiofiles 包,安装该包可使用如下命令:

1
$ pip install aiofiles

再次改写 decompress_pk 函数,将它变成协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
semaphore = asyncio.Semaphore(6)

async def decompress_pk_aio(src, dst):
"""Decompress function using gzip & unlzw, coroutine version."""
uc_func = unlzw.unlzw if src.endswith('.Z') else gzip.decompress
async with semaphore:
try:
async with aiofiles.open(src, 'rb') as sf,\
aiofiles.open(dst, 'wb') as df:
buffer = await sf.read()
await df.write(uc_func(buffer))
except OSError:
return src

然后使用 Python 默认的事件循环执行它:

1
2
3
4
5
6
7
8
9
10
11
>>> loop = asyncio.get_event_loop()
>>> tasks = [decompress_pk_aio(src, dst)
... for src, dst in zip(sources, destinations)]
>>> error_files = []
>>> for future in tqdm.tqdm(asyncio.as_completed(tasks), ascii=True, total=786):
... result = await future
... if result:
... error_files.append(result)
...
>>> if error_files:
... print('Decompress error files:', ', '.join(error_files))

在我的笔记本上,执行以上代码共耗费 30 秒,相比单线程版本,提升了约 23%。

结论

尽管本文没有为调用 gzip 程序的函数实现异步 I/O,但可以推测:其异步 I/O 版本应快于单线程版,慢于线程池版。据此可以得出以下结论:

  1. 当追求执行速度时,应该选择使用标准库/扩展库的解压方法;
  2. 当追求解压的容错性时,应该选择调用 gzip 程序的解压方法;
  3. 如果要使用并行编程对代码进行加速,应该选择使用线程池;
  4. 异步 I/O 对解压代码加速有限,应作为最后备选方案,除非你正在学习 asyncio

补充

unlzw 应用到大文件解压时,我发现该方法似乎有轻微的内存泄漏。如果持续不断的对大量压缩文件解压,该方法可能会不断吃进内存,最终导致无内存可用。因此在生产环境下,建议还是使用调用 gzip 程序的版本。

为了便于你重新进行实验,我上传了两个 iPython Notebook 文件:ftp_downloader.ipynbuncompress.ipynb,分别包含下载测试数据和解压缩实验的代码。