Coverage for py_tools_ds/compression/decompress.py: 0%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
3# py_tools_ds - A collection of geospatial data analysis tools that simplify standard
4# operations when handling geospatial raster and vector data as well as projections.
5#
6# Copyright (C) 2016-2021
7# - Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
8# - Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences Potsdam,
9# Germany (https://www.gfz-potsdam.de/)
10#
11# This software was developed within the context of the GeoMultiSens project funded
12# by the German Federal Ministry of Education and Research
13# (project grant code: 01 IS 14 010 A-C).
14#
15# Licensed under the Apache License, Version 2.0 (the "License");
16# you may not use this file except in compliance with the License.
17# You may obtain a copy of the License at
18#
19# http://www.apache.org/licenses/LICENSE-2.0
20#
21# Unless required by applicable law or agreed to in writing, software
22# distributed under the License is distributed on an "AS IS" BASIS,
23# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
24# See the License for the specific language governing permissions and
25# limitations under the License.
27import os
28import zipfile
29import tarfile
30import gzip
31from logging import getLogger
32import shutil
34__author__ = 'Daniel Scheffler'
def _extract_member(archive, name, outputpath, logger):
    """Extract a single member from an open archive, waiting while the disk is full.

    :param archive:     open zipfile.ZipFile or tarfile.TarFile instance
    :param name:        name of the archive member to extract
    :param outputpath:  directory the member is extracted into
    :param logger:      instance of logging.Logger
    """
    import errno
    import time

    while True:
        try:
            archive.extract(name, outputpath)
        except OSError as e:
            if e.errno != errno.ENOSPC:
                raise
            # Retry until space becomes available, but sleep in between so the
            # loop does not spin at full speed and flood the log.
            logger.warning('No space left on device. Waiting..')
            time.sleep(1)
        else:
            logger.info("Extracting %s...", name)
            return


def _extract_missing(archive, members, in_filename, outputpath, logger):
    """Extract all members that are not already present with the expected size.

    :param archive:     open zipfile.ZipFile or tarfile.TarFile instance
    :param members:     iterable of (member name, uncompressed size in bytes) tuples
    :param in_filename: file name of the archive (only used for log messages)
    :param outputpath:  directory the members are extracted into
    :param logger:      instance of logging.Logger
    """
    count_extracted = 0
    for name, size in members:
        target = os.path.join(outputpath, name)
        # A member counts as already extracted if a file of identical size exists.
        if os.path.exists(target) and os.stat(target).st_size == size:
            logger.warning("file '%s' from '%s' already exists in the directory: '%s'",
                           name, in_filename, outputpath)
        else:
            _extract_member(archive, name, outputpath, logger)
            count_extracted += 1

    if count_extracted == 0:
        logger.warning("No files of %s have been decompressed.\n", in_filename)
    else:
        logger.info("Extraction of '%s' was successful\n", in_filename)


def decompress(compressed_file, outputpath=None, logger=getLogger('decompressor')):
    """Decompress ZIP, TAR, TAR.GZ, TGZ and GZ archives to a given output path.

    The archive type is selected from the file extension of `compressed_file`.
    ZIP and TAR members that already exist at the destination with the same
    size are skipped.

    :param compressed_file: path of the archive to be decompressed
    :param outputpath:      for ZIP/TAR archives the directory to extract into, for GZ files
                            the path of the decompressed file. Missing parts default to the
                            folder of `compressed_file` and to its file name up to the first dot.
    :param logger:          instance of logging.Logger
    :raises AssertionError: if a '*.zip' file is not a valid ZIP archive
    :raises ValueError:     if the file extension is not supported
    """
    # define output folder and filename
    in_folder, in_filename = os.path.split(compressed_file)
    out_folder, out_filename = os.path.split(outputpath) if outputpath else ('', '')
    out_filename = out_filename or in_filename.partition(".")[0]
    out_folder = out_folder or in_folder
    outputpath = os.path.join(out_folder, out_filename)

    # decompress
    logger.info('Extracting %s...', in_filename)

    # out_folder is '' if the archive is addressed relative to the current working
    # directory and no output path is given; os.makedirs('') would raise in that case.
    if out_folder and not os.path.isdir(out_folder):
        os.makedirs(out_folder)

    if compressed_file.endswith(".zip"):
        if not zipfile.is_zipfile(compressed_file):
            msg = compressed_file + " is not a valid zipfile!"
            logger.critical(msg)
            # AssertionError is kept for backward compatibility with callers; raising it
            # explicitly keeps the check alive under 'python -O' and attaches the message.
            raise AssertionError(msg)

        with zipfile.ZipFile(compressed_file) as zf:
            members = [(n, zf.getinfo(n).file_size) for n in zf.namelist()]
            _extract_missing(zf, members, in_filename, outputpath, logger)

    elif compressed_file.endswith((".tar", ".tar.gz", ".tgz")):
        # NOTE(review): members are extracted without an explicit extraction filter. If
        # archives may come from untrusted sources, consider the 'filter' argument of
        # tarfile (see the tarfile documentation on extraction filters).
        with tarfile.open(compressed_file) as tf:
            members = [(m.name, m.size) for m in tf.getmembers()]
            _extract_missing(tf, members, in_filename, outputpath, logger)

    elif compressed_file.endswith(".gz"):
        with gzip.open(compressed_file, 'rb') as f_in:
            with open(outputpath, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

    else:
        raise ValueError('Unexpected file extension of compressed file. Supported file extensions are: '
                         '*.zip, *.tar, *.tar.gz, *.tgz and *.gz')