# Source code for geoarray.metadata

# -*- coding: utf-8 -*-

# geoarray, A fast Python interface for image geodata - either on disk or in memory.
#
# Copyright (C) 2017-2023
# - Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
# - Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences Potsdam,
#   Germany (https://www.gfz-potsdam.de/)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from pprint import pformat
from copy import deepcopy
from typing import Union  # noqa F401  # flake8 issue
from collections import OrderedDict

from pandas import DataFrame, Series
import numpy as np
from osgeo import gdal  # noqa


# ENVI header keys that GDAL_Metadata.read_from_file() skips when it collects
# the 'ENVI' metadata domain of a dataset (they are stored neither as global
# nor as band-wise metadata).
# NOTE(review): the name suggests these keys are managed by GDAL itself when
# an ENVI file is read/written - confirm against the writer code.
autohandled_meta = [
    'bands',
    'byte_order',
    'coordinate_system_string',
    'data_type',
    'file_type',
    'header_offset',
    'interleave',
    'lines',
    'samples',
]


class GDAL_Metadata(object):
    """Container for global and band-wise image metadata as read via GDAL.

    Global metadata are stored as one value per key (``global_meta``), band-wise
    metadata as one list per key holding one item per band (``band_meta``).
    """

    def __init__(self,
                 filePath: str = '',
                 nbands: int = 1,
                 nodata_allbands: Union[int, float, None] = None
                 ) -> None:
        """Create a metadata container, optionally filled from a file.

        :param filePath:         path of the file to read the metadata from
                                 (nothing is read if empty)
        :param nbands:           number of bands the band-wise metadata refer to
                                 (overridden by the band count of the file if filePath is given)
        :param nodata_allbands:  if given, stored as string in the global metadata
                                 under the key 'data_ignore_value'
        """
        # privates
        self._global_meta = OrderedDict()
        self._band_meta = OrderedDict()

        self.bands = nbands
        self.filePath = filePath
        self.fileFormat = ''
        self.nodata_allbands = nodata_allbands

        if filePath:
            self.read_from_file(filePath)

        # set after reading so that an explicitly passed value wins over the file content
        if nodata_allbands is not None:
            self.global_meta.update({'data_ignore_value': str(nodata_allbands)})

    @classmethod
    def from_file(cls, filePath: str) -> 'GDAL_Metadata':
        """Return a new instance filled with the metadata of the given file.

        :param filePath:  path of the file to read the metadata from
        """
        # use cls instead of the hard-coded class so that subclasses get their own type back
        return cls(filePath=filePath)

    def to_DataFrame(self) -> DataFrame:
        """Return all metadata as DataFrame with one row per key and one column per band.

        Global metadata values are repeated for each band.
        """
        df = DataFrame(columns=range(self.bands))

        # add global meta
        for k, v in self.global_meta.items():
            df.loc[k] = Series(dict(zip(df.columns, [v] * len(df.columns))))

        # add band meta
        for k, v in self.band_meta.items():
            df.loc[k] = Series(dict(zip(df.columns, v)))

        return df

    @property
    def global_meta(self) -> OrderedDict:
        """Return the global (not band-specific) metadata."""
        return self._global_meta

    @global_meta.setter
    def global_meta(self, meta_dict: Union[dict, OrderedDict]):
        if not isinstance(meta_dict, dict):
            raise TypeError("Expected type 'dict'/'OrderedDict', received '%s'." % type(meta_dict))

        # always store an OrderedDict (as the band_meta setter does) so that the getter
        # returns the annotated type
        self._global_meta = OrderedDict(meta_dict)  # TODO convert strings to useful types

    @property
    def band_meta(self) -> OrderedDict:
        """Return the band-wise metadata (one list with one item per band for each key)."""
        return self._band_meta

    @band_meta.setter
    def band_meta(self, meta_dict: Union[dict, OrderedDict]):
        if not isinstance(meta_dict, dict):
            raise TypeError("Expected type 'dict'/'OrderedDict', received '%s'." % type(meta_dict))

        for k, v in meta_dict.items():
            if not isinstance(v, list):
                raise TypeError('The values of the given dictionary must be lists. Received %s for %s.'
                                % (type(v), k))
            if len(v) != self.bands:
                raise ValueError("The length of the given lists must be equal to the number of bands. "
                                 "Received a list with %d items for '%s'." % (len(v), k))

        self._band_meta = OrderedDict(meta_dict)  # TODO convert strings to useful types

    @property
    def all_meta(self) -> OrderedDict:
        """Return a new OrderedDict with the global metadata updated by the band-wise metadata."""
        all_meta = OrderedDict(self.global_meta)
        all_meta.update(self.band_meta)
        return all_meta

    @staticmethod
    def _convert_param_from_str(param_value: Union[int, float, str]) -> Union[int, float, str]:
        """Convert a metadata value to int or float if possible, else to a cleaned string.

        :param param_value:  the value to convert (usually a string as returned by GDAL)
        :return: an integer if the value represents a whole number, a float if it represents
                 another number, otherwise the stripped string without one enclosing
                 leading '{' / trailing '}'
        """
        if isinstance(param_value, (float, np.floating)):
            # Values that are already floats (e.g. nodata values) must not be passed to int(),
            # which would silently truncate 0.5 to 0 and raise OverflowError for inf.
            as_float = float(param_value)
            if np.isfinite(as_float) and as_float.is_integer():
                return int(as_float)
            return as_float

        try:
            try:
                return int(param_value)  # NOTE: int('0.34') raises ValueError -> fall back to float
            except ValueError:
                return float(param_value)
        except ValueError:
            if param_value.startswith('{'):
                param_value = param_value.split('{')[1]
            if param_value.endswith('}'):
                param_value = param_value.split('}')[0]
            return param_value.strip()

    @staticmethod
    def _strip_envi_braces(item_str: str) -> str:
        """Return the stripped item without a leading '{' and/or a trailing '}'."""
        item = item_str.strip()
        if item.startswith('{'):
            item = item[1:]
        if item.endswith('}'):
            item = item[:-1]
        return item.strip()

    def _convert_param_to_ENVI_str(self,
                                   param_value: Union[int, float, list, str, np.ndarray, np.integer, np.floating]
                                   ) -> str:
        """Convert a metadata value to its string representation for an ENVI metadata dictionary.

        Lists (and arrays, via tolist()) are converted recursively and joined to
        '{ item1,\\nitem2,\\n... }'.

        :param param_value:  the value to convert
        """
        if isinstance(param_value, (int, np.integer)):
            return str(param_value)

        elif isinstance(param_value, (float, np.floating)):
            # NOTE: '%f' writes exactly 6 decimal places, i.e., smaller fractions are lost.
            return '%f' % param_value

        elif isinstance(param_value, list):
            return '{ ' + ',\n'.join([self._convert_param_to_ENVI_str(i) for i in param_value]) + ' }'

        elif isinstance(param_value, np.ndarray):
            return self._convert_param_to_ENVI_str(param_value.tolist())  # noqa

        else:
            return str(param_value)

    def read_from_file(self, filePath: str) -> OrderedDict:
        """Read the metadata of the given file via GDAL and store them in this instance.

        Sets self.bands and self.fileFormat from the opened dataset. For the 'ENVI' format, the
        'ENVI' metadata domain is read and each value is stored as band-wise metadata if it
        splits into as many comma-separated items as there are bands, otherwise as global
        metadata. For all other formats, the default metadata domain of the dataset is stored
        as global metadata and the metadata, description ('band_names') and nodata value
        ('nodata') of each band are appended to the band-wise metadata.

        :param filePath:  path of the file to read (must not contain whitespaces)
        :return: all metadata (see all_meta)
        :raises FileNotFoundError: if the path does not exist on disk and does not start with
                                   '/vsi', 'HDF' or 'NETCDF'
        :raises RuntimeError: if GDAL could not open the file
        """
        assert ' ' not in filePath, "The given path contains whitespaces. This is not supported by GDAL."

        # paths with these prefixes are not expected to exist on disk
        if not os.path.exists(filePath) and \
           not filePath.startswith(('/vsi', 'HDF', 'NETCDF')):
            raise FileNotFoundError(filePath)

        ds = gdal.Open(filePath)

        try:
            if not ds:
                raise RuntimeError('Error reading file: ' + gdal.GetLastErrorMsg())

            self.bands = ds.RasterCount
            self.fileFormat = ds.GetDriver().GetDescription()

            ###############
            # ENVI format #
            ###############

            if self.fileFormat == 'ENVI':
                metadict = ds.GetMetadata('ENVI')

                for k, v in metadict.items():
                    if k in autohandled_meta:
                        continue

                    items = v.split(',')

                    if len(items) == self.bands:
                        # band meta parameter
                        # (both braces are stripped, so a single item '{ name }' is handled, too)
                        item_list = [self._strip_envi_braces(item_str) for item_str in items]

                        # band names are always kept as strings
                        self.band_meta[k] = \
                            [self._convert_param_from_str(item_str) for item_str in item_list] \
                            if k != 'band_names' else item_list

                    else:
                        # global meta parameter
                        self.global_meta[k] = self._convert_param_from_str(v)

            #####################
            # remaining formats #
            #####################

            else:
                # read global domain metadata
                self.global_meta = ds.GetMetadata()

                # read band domain metadata
                for b in range(self.bands):
                    band = ds.GetRasterBand(b + 1)
                    bandmeta_dict = band.GetMetadata()

                    # read bandname
                    bandname = band.GetDescription()
                    if bandname:
                        bandmeta_dict['band_names'] = bandname

                    # read nodata value
                    nodataVal = band.GetNoDataValue()
                    if nodataVal is not None:
                        bandmeta_dict['nodata'] = nodataVal

                    # read remaining meta
                    for k, v in bandmeta_dict.items():
                        if k not in self.band_meta:
                            self.band_meta[k] = []

                        self.band_meta[k].append(self._convert_param_from_str(v))

                    del band

        finally:
            del ds

        return self.all_meta

    def __repr__(self) -> str:
        return 'Metadata: \n\n' + pformat(self.all_meta)

    def to_ENVI_metadict(self) -> OrderedDict:
        """Return all metadata as OrderedDict with all values converted to ENVI-style strings."""
        return OrderedDict(zip(self.all_meta.keys(),
                               [self._convert_param_to_ENVI_str(i) for i in self.all_meta.values()]))

    def get_subset(self,
                   bands2extract: Union[slice, list, np.ndarray] = None,
                   keys2extract: Union[str, list] = None
                   ) -> 'GDAL_Metadata':
        """Return a copy of this instance reduced to the given bands and/or metadata keys.

        :param bands2extract:  the bands to keep, given as slice, numpy array of band indices,
                               list of band indices or list of band names
                               (names require 'band_names' to be set in the band-wise metadata)
        :param keys2extract:   the metadata key or list of metadata keys to keep
        :raises ValueError: if list items have inconsistent or wrong types, a band name is
                            unknown, or none of the given keys exists
        :raises TypeError: if bands2extract has an unsupported type
        """
        meta_sub = deepcopy(self)

        # subset bands
        if bands2extract is not None:
            if isinstance(bands2extract, list):
                if not len(set([type(i) for i in bands2extract])) == 1:
                    raise ValueError("List items of 'bands2extract' should all have the same type.")

                if not isinstance(bands2extract[0], str):
                    bands2extract = np.array(bands2extract)
                else:
                    # translate band names to band indices
                    if 'band_names' not in meta_sub.band_meta:
                        raise ValueError('String input is only supported if band names are set.')

                    bandnames = meta_sub.band_meta['band_names']
                    out = []
                    for b in bands2extract:
                        if b in bandnames:
                            out.append(bandnames.index(b))
                        else:
                            raise ValueError(f"'{b}' is not a valid band name.")

                    bands2extract = np.array(out)

            elif isinstance(bands2extract, (np.ndarray, slice)):
                pass  # all fine
            else:
                raise TypeError(bands2extract)

            meta_sub.band_meta = self.band_meta.copy()

            for k, v in meta_sub.band_meta.items():
                meta_sub.band_meta[k] = list(np.array(v)[bands2extract])

            if isinstance(bands2extract, slice):
                # Resolve the slice against the actual number of bands. Using slice.stop instead
                # fails for open-ended or negative slices and over-counts if stop > self.bands.
                meta_sub.bands = len(range(*bands2extract.indices(self.bands)))
            else:
                meta_sub.bands = bands2extract.size

        # subset metadata keys
        if keys2extract:
            if isinstance(keys2extract, list) and not all(isinstance(i, str) for i in keys2extract):
                raise ValueError("List items of 'keys2extract' should all be of type string.")

            keys2extract = [keys2extract] if isinstance(keys2extract, str) else keys2extract

            # iterate over copies of the keys because items are deleted within the loops
            for k in list(meta_sub.global_meta.keys()):
                if k not in keys2extract:
                    del meta_sub.global_meta[k]

            for k in list(meta_sub.band_meta.keys()):
                if k not in keys2extract:
                    del meta_sub.band_meta[k]

            if not meta_sub.all_meta:
                raise ValueError(keys2extract, 'The given metadata keys do not exist.')

        return meta_sub

    def __getitem__(self, given: Union[int, slice, str, list, np.ndarray]) -> 'GDAL_Metadata':
        """Return the metadata subset of the given band(s) (see get_subset).

        :param given:  band index (negative values count from the last band), slice, band name,
                       list of band indices or band names, or one-dimensional numpy array
        :raises IndexError: if an integer band index is out of range
        :raises TypeError: if the input type or the type of the list items is not supported
        :raises ValueError: if a numpy array with more than one dimension is given
        """
        if isinstance(given, (int, np.integer)):
            idx = int(given)

            # support negative indices like Python sequences do
            # (slice(-1, 0) would select nothing)
            if idx < 0:
                idx += self.bands

            # IndexError for out-of-range indices also terminates iteration via __getitem__
            if not 0 <= idx < self.bands:
                raise IndexError('Band index %d is out of range for metadata with %d band(s).'
                                 % (given, self.bands))

            return self.get_subset(bands2extract=slice(idx, idx + 1))

        elif isinstance(given, slice):
            return self.get_subset(bands2extract=given)

        elif isinstance(given, str):
            return self.get_subset(bands2extract=[given])

        elif isinstance(given, list):
            if isinstance(given[0], (str, int, np.integer)):
                return self.get_subset(bands2extract=given)
            else:
                raise TypeError(given, 'Given list must contain string or integer items.')

        elif isinstance(given, np.ndarray):
            if given.ndim != 1:
                raise ValueError(given, 'Given numpy array must be one-dimensional.')
            return self.get_subset(bands2extract=given)

        else:
            raise TypeError(given)