Coverage for geoarray/metadata.py: 94%
174 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-14 11:57 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-14 11:57 +0000
# -*- coding: utf-8 -*-

# geoarray, A fast Python interface for image geodata - either on disk or in memory.
#
# Copyright (C) 2017-2023
# - Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
# - Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences Potsdam,
#   Germany (https://www.gfz-potsdam.de/)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

26import os
27from pprint import pformat
28from copy import deepcopy
29from typing import Union # noqa F401 # flake8 issue
30from collections import OrderedDict
32from pandas import DataFrame, Series
33import numpy as np
34from osgeo import gdal # noqa
# ENVI header keys that GDAL_Metadata.read_from_file() does not copy into the
# global/band metadata dictionaries when reading an ENVI file.
autohandled_meta = [
    'bands', 'byte_order', 'coordinate_system_string',
    'data_type', 'file_type', 'header_offset',
    'interleave', 'lines', 'samples',
]
class GDAL_Metadata(object):
    """Container for the metadata of a raster dataset.

    Two kinds of metadata are kept:

    - ``global_meta``: one value per key, valid for the whole dataset
    - ``band_meta``: one list per key, holding exactly one item per band

    The metadata can be read from a file via GDAL (:meth:`read_from_file`) or be set in memory
    through the ``global_meta`` / ``band_meta`` properties.
    """

    def __init__(self,
                 filePath: str = '',
                 nbands: int = 1,
                 nodata_allbands: Union[int, float] = None
                 ) -> None:
        """Get an instance of GDAL_Metadata.

        :param filePath:         path of the file to read the metadata from (nothing is read if empty)
        :param nbands:           number of bands (overridden by the band count of the file if filePath is given)
        :param nodata_allbands:  if given, stored as string under the global key 'data_ignore_value'
        """
        # privates
        self._global_meta = OrderedDict()
        self._band_meta = OrderedDict()

        self.bands = nbands
        self.filePath = filePath
        self.fileFormat = ''
        self.nodata_allbands = nodata_allbands

        if filePath:
            self.read_from_file(filePath)

        if nodata_allbands is not None:
            self.global_meta.update({'data_ignore_value': str(nodata_allbands)})

    @classmethod
    def from_file(cls, filePath: str) -> 'GDAL_Metadata':
        """Create an instance from the metadata of the given file.

        :param filePath:  path of the file to read the metadata from
        """
        # use cls instead of the hard-coded class so that subclasses get an instance of their own type
        return cls(filePath=filePath)

    def to_DataFrame(self) -> DataFrame:
        """Return all metadata as DataFrame with one row per metadata key and one column per band.

        Global metadata values are repeated for each band.
        """
        df = DataFrame(columns=range(self.bands))

        # add global meta
        for k, v in self.global_meta.items():
            df.loc[k] = Series(dict(zip(df.columns, [v] * len(df.columns))))

        # add band meta
        for k, v in self.band_meta.items():
            df.loc[k] = Series(dict(zip(df.columns, v)))

        return df

    @property
    def global_meta(self) -> OrderedDict:
        """Return the metadata that is valid for the whole dataset."""
        return self._global_meta

    @global_meta.setter
    def global_meta(self, meta_dict: Union[dict, OrderedDict]):
        if not isinstance(meta_dict, dict):
            raise TypeError("Expected type 'dict'/'OrderedDict', received '%s'." % type(meta_dict))

        # store an OrderedDict (like the band_meta setter does) so that the getter returns the annotated type
        self._global_meta = OrderedDict(meta_dict)  # TODO convert strings to useful types

    @property
    def band_meta(self) -> OrderedDict:
        """Return the band-specific metadata (one list with one item per band for each key)."""
        return self._band_meta

    @band_meta.setter
    def band_meta(self, meta_dict: Union[dict, OrderedDict]):
        if not isinstance(meta_dict, (dict, OrderedDict)):
            raise TypeError("Expected type 'dict'/'OrderedDict', received '%s'." % type(meta_dict))

        for k, v in meta_dict.items():
            if not isinstance(v, list):
                raise TypeError('The values of the given dictionary must be lists. Received %s for %s.' % (type(v), k))
            if len(v) != self.bands:
                raise ValueError("The length of the given lists must be equal to the number of bands. "
                                 "Received a list with %d items for '%s'." % (len(v), k))

        self._band_meta = OrderedDict(meta_dict)  # TODO convert strings to useful types

    @property
    def all_meta(self) -> OrderedDict:
        """Return a new dictionary with the global metadata followed by the band metadata.

        Band metadata override global metadata in case of equal keys.
        """
        all_meta = OrderedDict(self.global_meta)
        all_meta.update(self.band_meta)
        return all_meta

    @staticmethod
    def _convert_param_from_str(param_value: Union[int, float, str]) -> Union[int, float, str]:
        """Convert a metadata value to int or float if possible, otherwise return a cleaned string.

        :param param_value:  the value to convert (usually a string as returned by GDAL)
        :return:  an integer, a float or a string without enclosing curly brackets and outer whitespaces
        """
        if isinstance(param_value, (float, np.floating)):
            # Floats (e.g., nodata values returned by GDAL) must not be passed through int() unconditionally:
            # that would truncate 0.5 to 0 and raise an OverflowError for +/-inf.
            # Only integral floats are returned as integer, NaN and inf are never integral.
            return int(param_value) if float(param_value).is_integer() else param_value

        try:
            return int(param_value)  # NOTE: int('0.34') causes ValueError: invalid literal for int() with base 10
        except ValueError:
            pass

        try:
            return float(param_value)
        except ValueError:
            pass

        # neither an integer nor a float => return a string without the ENVI list brackets
        if param_value.startswith('{'):
            param_value = param_value.split('{')[1]
        if param_value.endswith('}'):
            param_value = param_value.split('}')[0]
        return param_value.strip()

    def _convert_param_to_ENVI_str(self, param_value: Union[int, float, list, str, np.ndarray, np.integer, np.floating]
                                   ) -> str:
        """Convert the given metadata value to a string as it is written to ENVI metadata.

        :param param_value:  the value to convert; lists and numpy arrays are converted item by item
        :return:  the string representation; lists are enclosed in curly brackets, items separated by ',\\n'
        """
        if isinstance(param_value, (int, np.integer)):
            return str(param_value)

        elif isinstance(param_value, (float, np.floating)):
            return '%f' % param_value  # fixed-point notation with 6 decimals

        elif isinstance(param_value, list):
            return '{ ' + ',\n'.join([self._convert_param_to_ENVI_str(i) for i in param_value]) + ' }'

        elif isinstance(param_value, np.ndarray):
            return self._convert_param_to_ENVI_str(param_value.tolist())  # noqa

        else:
            return str(param_value)

    @staticmethod
    def _split_ENVI_list(list_str: str) -> list:
        """Split an ENVI list string like '{ a, b, c }' into a list of stripped item strings.

        Opening and closing brackets are removed independently of each other, so a list with
        a single item ('{a}') is also returned without any bracket.
        """
        items = []
        for item_str in list_str.split(','):
            item_str = item_str.strip()
            if item_str.startswith('{'):
                item_str = item_str[1:]
            if item_str.endswith('}'):
                item_str = item_str[:-1]
            items.append(item_str.strip())
        return items

    @staticmethod
    def _stack_band_dicts(band_dicts: list) -> OrderedDict:
        """Convert a list with one metadata dictionary per band to a dictionary with one list per key.

        Each returned list has exactly one item per band. Bands that do not provide a key
        get None at their position, which keeps the values aligned with the band index.
        """
        stacked = OrderedDict()
        for i_band, band_dict in enumerate(band_dicts):
            for k, v in band_dict.items():
                if k not in stacked:
                    stacked[k] = [None] * len(band_dicts)
                stacked[k][i_band] = v
        return stacked

    def read_from_file(self, filePath: str) -> OrderedDict:
        """Read the metadata of the given file via GDAL and store them in this instance.

        This sets the attributes 'bands' and 'fileFormat' and fills 'global_meta' and 'band_meta'.

        :param filePath:  path of the file to read (existence is not checked for paths starting with
                          '/vsi', 'HDF' or 'NETCDF')
        :raises FileNotFoundError:  if the file does not exist
        :raises Exception:          if GDAL is not able to open the file
        :return:  all metadata (global and band metadata)
        """
        # NOTE: kept as assertion to not change the exception type for callers (not checked under python -O)
        assert ' ' not in filePath, "The given path contains whitespaces. This is not supported by GDAL."

        if not os.path.exists(filePath) and not filePath.startswith(('/vsi', 'HDF', 'NETCDF')):
            raise FileNotFoundError(filePath)

        ds = gdal.Open(filePath)

        try:
            if not ds:
                raise Exception('Error reading file:  ' + gdal.GetLastErrorMsg())

            self.bands = ds.RasterCount
            self.fileFormat = ds.GetDriver().GetDescription()

            ###############
            # ENVI format #
            ###############

            if self.fileFormat == 'ENVI':
                metadict = ds.GetMetadata('ENVI')

                for k, v in metadict.items():
                    if k in autohandled_meta:
                        continue

                    if len(v.split(',')) == self.bands:
                        # band meta parameter (as many comma separated items as bands)
                        item_list = self._split_ENVI_list(v)

                        # band names are kept as strings, all other items are converted if possible
                        self.band_meta[k] = \
                            [self._convert_param_from_str(item_str) for item_str in item_list] \
                            if k != 'band_names' else item_list

                    else:
                        # global meta parameter
                        self.global_meta[k] = self._convert_param_from_str(v)

            #####################
            # remaining formats #
            #####################

            else:
                # read global domain metadata
                self.global_meta = ds.GetMetadata()

                # read band domain metadata
                band_dicts = []
                for b in range(self.bands):
                    band = ds.GetRasterBand(b + 1)
                    bandmeta_dict = band.GetMetadata()

                    # read bandname
                    bandname = band.GetDescription()
                    if bandname:
                        bandmeta_dict['band_names'] = bandname

                    # read nodata value
                    nodataVal = band.GetNoDataValue()
                    if nodataVal is not None:
                        bandmeta_dict['nodata'] = nodataVal

                    # collect the converted metadata of this band
                    band_dicts.append(OrderedDict((k, self._convert_param_from_str(v))
                                                  for k, v in bandmeta_dict.items()))

                    del band

                # Keys that are only available for some of the bands are padded with None. Otherwise,
                # the lists would be shorter than the number of bands and not aligned with the band index.
                # The lists are assigned (not appended) to avoid duplicates in case a file is read twice.
                for k, values in self._stack_band_dicts(band_dicts).items():
                    self.band_meta[k] = values

        finally:
            del ds

        return self.all_meta

    def __repr__(self) -> str:
        return 'Metadata: \n\n' + pformat(self.all_meta)

    def to_ENVI_metadict(self) -> OrderedDict:
        """Return all metadata as dictionary with the values converted to ENVI compatible strings."""
        return OrderedDict(zip(self.all_meta.keys(),
                               [self._convert_param_to_ENVI_str(i) for i in self.all_meta.values()]))

    def get_subset(self,
                   bands2extract: Union[slice, list, np.ndarray] = None,
                   keys2extract: Union[str, list] = None
                   ) -> 'GDAL_Metadata':
        """Return a copy of this instance, reduced to the given bands and/or metadata keys.

        :param bands2extract:  the bands to keep, given as slice, list of band indices, list of band names,
                               or numpy array (integer indices or boolean mask)
        :param keys2extract:   the metadata key(s) to keep (string or list of strings)
        :raises TypeError:   if bands2extract has an unsupported type
        :raises ValueError:  if list items have mixed types, a band name is unknown, band names are requested
                             but not set, or none of the given keys exists
        :raises IndexError:  if a band index is out of range (raised by numpy)
        :return:  the subset as new GDAL_Metadata instance (this instance is not modified)
        """
        meta_sub = deepcopy(self)

        # subset bands
        if bands2extract is not None:
            if isinstance(bands2extract, list):
                if len({type(i) for i in bands2extract}) != 1:
                    raise ValueError("List items of 'bands2extract' should all have the same type.")

                if isinstance(bands2extract[0], str):
                    # translate band names to band indices
                    if 'band_names' not in meta_sub.band_meta:
                        raise ValueError('String input is only supported if band names are set.')

                    bandnames = meta_sub.band_meta['band_names']
                    out = []
                    for b in bands2extract:
                        if b not in bandnames:
                            raise ValueError(f"'{b}' is not a valid band name.")
                        out.append(bandnames.index(b))
                    bands2extract = np.array(out)
                else:
                    bands2extract = np.array(bands2extract)

            elif not isinstance(bands2extract, (np.ndarray, slice)):
                raise TypeError(bands2extract)

            # validates that all band metadata are lists with one item per band
            meta_sub.band_meta = self.band_meta.copy()

            # Let numpy translate the selector (slice, integer array or boolean mask) into band positions.
            # This also works for open-ended/negative slices and yields the correct number of selected bands.
            positions = np.atleast_1d(np.arange(self.bands)[bands2extract]).tolist()

            # index the Python lists directly to keep the item types
            # (np.array(v) would convert lists with mixed types to strings)
            for k in list(meta_sub.band_meta):
                v = meta_sub.band_meta[k]
                meta_sub.band_meta[k] = [v[i] for i in positions]

            meta_sub.bands = len(positions)

        # subset metadata keys
        if keys2extract:
            if isinstance(keys2extract, list) and not list(set([type(i) for i in keys2extract])) == [str]:
                raise ValueError("List items of 'keys2extract' should all be of type string.")

            keys2extract = [keys2extract] if isinstance(keys2extract, str) else keys2extract

            for k in list(meta_sub.global_meta):
                if k not in keys2extract:
                    del meta_sub.global_meta[k]

            for k in list(meta_sub.band_meta):
                if k not in keys2extract:
                    del meta_sub.band_meta[k]

            if not meta_sub.all_meta:
                raise ValueError(keys2extract, 'The given metadata keys do not exist.')

        return meta_sub

    def __getitem__(self, given: Union[int, slice, str, list, np.ndarray]) -> 'GDAL_Metadata':
        """Return the metadata subset of the given band(s).

        :param given:  band index (negative values count from the end), slice, band name,
                       list of band indices or band names, or one-dimensional numpy array
        :raises IndexError:  if an integer band index is out of range
        :raises TypeError:   if the type of 'given' or of the list items is not supported
        :raises ValueError:  if a numpy array with more than one dimension is given
        """
        if isinstance(given, (int, np.integer)):
            # An IndexError for out-of-range indices is needed to terminate iterations over the instance.
            if not -self.bands <= given < self.bands:
                raise IndexError(f'Band index {given} is out of range for a dataset with {self.bands} band(s).')

            band_idx = int(given) % self.bands  # convert negative indices to positive ones
            return self.get_subset(bands2extract=slice(band_idx, band_idx + 1))

        elif isinstance(given, slice):
            return self.get_subset(bands2extract=given)

        elif isinstance(given, str):
            return self.get_subset(bands2extract=[given])

        elif isinstance(given, list):
            if isinstance(given[0], (str, int, np.integer)):
                return self.get_subset(bands2extract=given)
            else:
                raise TypeError(given, 'Given list must contain string or integer items.')

        elif isinstance(given, np.ndarray):
            if given.ndim != 1:
                raise ValueError(given, 'Given numpy array must be one-dimensional.')
            return self.get_subset(bands2extract=given)

        else:
            raise TypeError(given)