Coverage for geoarray/metadata.py: 94%

174 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-14 11:57 +0000

1# -*- coding: utf-8 -*- 

2 

3# geoarray, A fast Python interface for image geodata - either on disk or in memory. 

4# 

5# Copyright (C) 2017-2023 

6# - Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de) 

7# - Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences Potsdam, 

8# Germany (https://www.gfz-potsdam.de/) 

9# 

10# This software was developed within the context of the GeoMultiSens project funded 

11# by the German Federal Ministry of Education and Research 

12# (project grant code: 01 IS 14 010 A-C). 

13# 

14# Licensed under the Apache License, Version 2.0 (the "License"); 

15# you may not use this file except in compliance with the License. 

16# You may obtain a copy of the License at 

17# 

18# http://www.apache.org/licenses/LICENSE-2.0 

19# 

20# Unless required by applicable law or agreed to in writing, software 

21# distributed under the License is distributed on an "AS IS" BASIS, 

22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

23# See the License for the specific language governing permissions and 

24# limitations under the License. 

25 

26import os 

27from pprint import pformat 

28from copy import deepcopy 

29from typing import Union # noqa F401 # flake8 issue 

30from collections import OrderedDict 

31 

32from pandas import DataFrame, Series 

33import numpy as np 

34from osgeo import gdal # noqa 

35 

36 

# Metadata keys that are skipped when metadata is read from the 'ENVI' domain of
# an ENVI file (see GDAL_Metadata.read_from_file).
autohandled_meta = [
    'bands', 'byte_order', 'coordinate_system_string',
    'data_type', 'file_type', 'header_offset',
    'interleave', 'lines', 'samples',
]

48 

49 

class GDAL_Metadata(object):
    """Container for raster metadata split into a global and a band-wise part.

    ``global_meta`` maps a key to a single value; ``band_meta`` maps a key to a
    list holding one entry per band. Instances can be filled from a file that
    GDAL can open, or populated manually through the two properties.
    """

    def __init__(self,
                 filePath: str = '',
                 nbands: int = 1,
                 nodata_allbands: Union[int, float, None] = None
                 ) -> None:
        """Create a metadata container.

        :param filePath:        if given (non-empty), metadata is read from this file
                                via :meth:`read_from_file`
        :param nbands:          number of bands; overwritten by the band count of the
                                file if ``filePath`` is given
        :param nodata_allbands: if not None, stored as string under the global key
                                ``'data_ignore_value'``
        """
        # privates
        self._global_meta = OrderedDict()
        self._band_meta = OrderedDict()

        self.bands = nbands
        self.filePath = filePath
        self.fileFormat = ''
        self.nodata_allbands = nodata_allbands

        if filePath:
            self.read_from_file(filePath)

        if nodata_allbands is not None:
            self.global_meta.update({'data_ignore_value': str(nodata_allbands)})

    @classmethod
    def from_file(cls, filePath: str) -> 'GDAL_Metadata':
        """Alternate constructor: build an instance from the given file.

        Instantiates ``cls`` (not the base class), so subclasses get an instance
        of their own type.
        """
        return cls(filePath=filePath)

    def to_DataFrame(self) -> DataFrame:
        """Return the metadata as DataFrame (rows: metadata keys, columns: band indices).

        Global values are repeated for every band column.
        """
        df = DataFrame(columns=range(self.bands))

        # add global meta
        for k, v in self.global_meta.items():
            df.loc[k] = Series(dict(zip(df.columns, [v] * len(df.columns))))

        # add band meta
        for k, v in self.band_meta.items():
            df.loc[k] = Series(dict(zip(df.columns, v)))

        return df

    @property
    def global_meta(self) -> OrderedDict:
        """Return the global (dataset-wide) metadata dictionary."""
        return self._global_meta

    @global_meta.setter
    def global_meta(self, meta_dict: Union[dict, OrderedDict]):
        # OrderedDict is a dict subclass, so a single isinstance check covers both
        if not isinstance(meta_dict, dict):
            raise TypeError("Expected type 'dict'/'OrderedDict', received '%s'." % type(meta_dict))

        self._global_meta = meta_dict  # TODO convert strings to useful types

    @property
    def band_meta(self) -> OrderedDict:
        """Return the band-wise metadata dictionary (key -> list with one item per band)."""
        return self._band_meta

    @band_meta.setter
    def band_meta(self, meta_dict: Union[dict, OrderedDict]):
        if not isinstance(meta_dict, dict):
            raise TypeError("Expected type 'dict'/'OrderedDict', received '%s'." % type(meta_dict))

        # every value must be a list with exactly one entry per band
        for k, v in meta_dict.items():
            if not isinstance(v, list):
                raise TypeError('The values of the given dictionary must be lists. Received %s for %s.' % (type(v), k))
            if len(v) != self.bands:
                raise ValueError("The length of the given lists must be equal to the number of bands. "
                                 "Received a list with %d items for '%s'." % (len(v), k))

        self._band_meta = OrderedDict(meta_dict)  # TODO convert strings to useful types

    @property
    def all_meta(self) -> OrderedDict:
        """Return a new dictionary with global metadata followed by band metadata.

        Band metadata overrides global metadata in case of identical keys.
        """
        all_meta = OrderedDict(self.global_meta.copy())
        all_meta.update(self.band_meta)
        return all_meta

    @staticmethod
    def _strip_envi_braces(item_str: str) -> str:
        """Strip surrounding whitespace and one leading '{' / trailing '}' from a list item."""
        item = item_str.strip()
        if item.startswith('{'):
            item = item[1:]
        if item.endswith('}'):
            item = item[:-1]
        return item.strip()

    @staticmethod
    def _convert_param_from_str(param_value: Union[int, float, str]) -> Union[int, float, str]:
        """Convert a metadata value to int or float where possible.

        :param param_value: the value to convert (usually a string)
        :return: an int if the value represents an integer, otherwise a float if it
                 represents a number, otherwise the string without surrounding
                 curly brackets and whitespace
        """
        if isinstance(param_value, (float, np.floating)):
            # int() would silently truncate non-integral floats (0.5 -> 0) and raise
            # OverflowError for inf; integral floats are still returned as int.
            return int(param_value) if float(param_value).is_integer() else param_value

        try:
            try:
                return int(param_value)  # NOTE: int('0.34') raises ValueError -> fall back to float
            except ValueError:
                return float(param_value)
        except ValueError:
            # not numeric -> return the bare string
            if param_value.startswith('{'):
                param_value = param_value.split('{')[1]
            if param_value.endswith('}'):
                param_value = param_value.split('}')[0]
            return param_value.strip()

    def _convert_param_to_ENVI_str(self, param_value: Union[int, float, list, str, np.ndarray, np.integer, np.floating]
                                   ) -> str:
        """Convert a metadata value to its string representation for an ENVI metadata dictionary.

        Lists and arrays are converted recursively and wrapped in curly brackets.
        """
        if isinstance(param_value, (int, np.integer)):
            return str(param_value)

        elif isinstance(param_value, (float, np.floating)):
            # NOTE: '%f' writes exactly six decimal places
            return '%f' % param_value

        elif isinstance(param_value, list):
            return '{ ' + ',\n'.join([self._convert_param_to_ENVI_str(i) for i in param_value]) + ' }'

        elif isinstance(param_value, np.ndarray):
            return self._convert_param_to_ENVI_str(param_value.tolist())  # noqa

        else:
            return str(param_value)

    def read_from_file(self, filePath: str) -> OrderedDict:
        """Read the metadata of the given file via GDAL into this instance.

        Sets ``bands`` and ``fileFormat`` and fills ``global_meta`` / ``band_meta``.

        :param filePath: path of the file; paths starting with '/vsi', 'HDF' or
                         'NETCDF' are not checked for existence on disk
        :return: the combined metadata (see :attr:`all_meta`)
        :raises FileNotFoundError: if the path does not exist (and is not one of the
                                   special paths named above)
        :raises Exception: if GDAL cannot open the file
        """
        assert ' ' not in filePath, "The given path contains whitespaces. This is not supported by GDAL."

        if not os.path.exists(filePath) and \
           not filePath.startswith('/vsi') and \
           not filePath.startswith('HDF') and \
           not filePath.startswith('NETCDF'):
            raise FileNotFoundError(filePath)

        ds = gdal.Open(filePath)

        try:
            if not ds:
                raise Exception('Error reading file: ' + gdal.GetLastErrorMsg())

            self.bands = ds.RasterCount
            self.fileFormat = ds.GetDriver().GetDescription()

            ###############
            # ENVI format #
            ###############

            if self.fileFormat == 'ENVI':
                metadict = ds.GetMetadata('ENVI')

                for k, v in metadict.items():
                    if k in autohandled_meta:
                        continue

                    items = v.split(',')

                    if len(items) == self.bands:
                        # band meta parameter (one comma-separated item per band)
                        item_list = [self._strip_envi_braces(item_str) for item_str in items]

                        # band names are always kept as strings
                        self.band_meta[k] = \
                            [self._convert_param_from_str(item_str) for item_str in item_list] \
                            if k != 'band_names' else item_list

                    else:
                        # global meta parameter
                        self.global_meta[k] = self._convert_param_from_str(v)

            #####################
            # remaining formats #
            #####################

            else:
                # read global domain metadata
                self.global_meta = ds.GetMetadata()

                # read band domain metadata
                for b in range(self.bands):
                    band = ds.GetRasterBand(b + 1)
                    bandmeta_dict = band.GetMetadata()

                    # read bandname
                    bandname = band.GetDescription()
                    if bandname:
                        bandmeta_dict['band_names'] = bandname

                    # read nodata value
                    nodataVal = band.GetNoDataValue()
                    if nodataVal is not None:
                        bandmeta_dict['nodata'] = nodataVal

                    # read remaining meta
                    # NOTE(review): values are appended per band, so a key that is missing
                    #               for some bands yields a list shorter than self.bands.
                    for k, v in bandmeta_dict.items():
                        if k not in self.band_meta:
                            self.band_meta[k] = []

                        self.band_meta[k].append(self._convert_param_from_str(v))

                    del band

        finally:
            del ds

        return self.all_meta

    def __repr__(self) -> str:
        return 'Metadata: \n\n' + pformat(self.all_meta)

    def to_ENVI_metadict(self) -> OrderedDict:
        """Return all metadata as dictionary with values converted to ENVI-style strings."""
        return OrderedDict((k, self._convert_param_to_ENVI_str(v)) for k, v in self.all_meta.items())

    def get_subset(self,
                   bands2extract: Union[slice, list, np.ndarray] = None,
                   keys2extract: Union[str, list] = None
                   ) -> 'GDAL_Metadata':
        """Return a deep copy of this instance reduced to the given bands and/or keys.

        :param bands2extract: bands to keep, given as slice, numpy array (integer
                              indices or boolean mask) or list of integer indices or
                              band names (names require 'band_names' in the band metadata)
        :param keys2extract:  metadata key or list of keys to keep
        :return: the subsetted copy; this instance is left unchanged
        :raises ValueError: for mixed-type lists, unknown band names, non-string keys
                            or if none of the given keys exists
        :raises TypeError:  if 'bands2extract' has an unsupported type
        """
        meta_sub = deepcopy(self)

        # subset bands
        if bands2extract is not None:
            if isinstance(bands2extract, list):
                if len({type(i) for i in bands2extract}) != 1:
                    raise ValueError("List items of 'bands2extract' should all have the same type.")

                if isinstance(bands2extract[0], str):
                    # translate band names to band indices
                    if 'band_names' not in self.band_meta:
                        raise ValueError('String input is only supported if band names are set.')
                    bandnames = self.band_meta['band_names']
                    out = []
                    for b in bands2extract:
                        if b not in bandnames:
                            raise ValueError(f"'{b}' is not a valid band name.")
                        out.append(bandnames.index(b))
                    bands2extract = np.array(out)
                else:
                    bands2extract = np.array(bands2extract)

            elif not isinstance(bands2extract, (np.ndarray, slice)):
                raise TypeError(bands2extract)

            # Number of selected bands. The slice is resolved against the actual band
            # count, which also covers open-ended, negative and oversized slices.
            if isinstance(bands2extract, slice):
                n_selected = len(range(*bands2extract.indices(self.bands)))
            elif bands2extract.dtype == bool:
                n_selected = int(np.count_nonzero(bands2extract))
            else:
                n_selected = bands2extract.size

            # the setter validates the list lengths against the (still unchanged) band count
            meta_sub.band_meta = self.band_meta.copy()

            for k, v in self.band_meta.items():
                meta_sub.band_meta[k] = list(np.array(v)[bands2extract])

            meta_sub.bands = n_selected

        # subset metadata keys
        if keys2extract:
            if isinstance(keys2extract, list) and not all(isinstance(i, str) for i in keys2extract):
                raise ValueError("List items of 'keys2extract' should all be of type string.")

            keys2extract = [keys2extract] if isinstance(keys2extract, str) else keys2extract

            # iterate over snapshots of the keys since entries are deleted while looping
            for k in list(meta_sub.global_meta):
                if k not in keys2extract:
                    del meta_sub.global_meta[k]

            for k in list(meta_sub.band_meta):
                if k not in keys2extract:
                    del meta_sub.band_meta[k]

            if not meta_sub.all_meta:
                raise ValueError(keys2extract, 'The given metadata keys do not exist.')

        return meta_sub

    def __getitem__(self, given: Union[int, slice, str, list, np.ndarray]) -> 'GDAL_Metadata':
        """Return the metadata subset of the given band(s), see :meth:`get_subset`.

        :param given: band index (negative values count from the end), slice, band
                      name, list of band indices or band names, or 1D numpy array
        :raises IndexError: if an integer index is out of range
        :raises TypeError:  if the type of 'given' (or of its list items) is not supported
        :raises ValueError: if a numpy array with more than one dimension is given
        """
        if isinstance(given, (int, np.integer)):
            idx = int(given)
            if idx < 0:
                # slice(-1, 0) would select nothing -> translate to a positive index
                idx += self.bands
            if not 0 <= idx < self.bands:
                raise IndexError('Band index %d is out of range for %d band(s).' % (given, self.bands))
            return self.get_subset(bands2extract=slice(idx, idx + 1))

        elif isinstance(given, slice):
            return self.get_subset(bands2extract=given)

        elif isinstance(given, str):
            return self.get_subset(bands2extract=[given])

        elif isinstance(given, list):
            if isinstance(given[0], (str, int, np.integer)):
                return self.get_subset(bands2extract=given)
            else:
                raise TypeError(given, 'Given list must contain string or integer items.')

        elif isinstance(given, np.ndarray):
            if given.ndim != 1:
                raise ValueError(given, 'Given numpy array must be one-dimensional.')
            return self.get_subset(bands2extract=given)

        else:
            raise TypeError(given)