Source code for getsentinel.gs_stacker

"""Extracts data from Sentinel products for given regions of interest.

Postage stamp ROIs out of given rasters file in batch. After processing the
Stacker object acts as a dictionary from which numpy arrays containing all the
multi-temporal data layers.

Handles both Sentinel-1 and Sentinel-2 rasters over multiple time frames.

Example
-------
::

    from getsentinel import gs_localmanager, gs_stacker

    products = gs_localmanager.get_product_inventory()

    roi = 'path/to/test_field.geojson' 
    start = datetime.date(2018, 5, 6)
    end = datetime.date(2018, 5, 7)

    stacker = gs_stacker.Stacker(products, roi, start, end)

    # get the True Colour Image at resolution 10m
    stacker.set_bands(s2_band_list=['TCI'], s2_resolution=10)

    data = stacker.generate_stacks()


"""
# TODO
# ----
# Investigate doing the masking using `gdal` instead of `rasterio`.
# Revisit non-uniform arrays after masking. Is it fixed after applying orbit
# files in gpt?

import datetime
import warnings
from pathlib import Path
import numpy as np
import shapely
import shapefile
import rasterio
import rasterio.mask
from osgeo import osr, ogr
from .gs_config import UserConfig


[docs]class Stacker(): """Creates stacks of arrays from a product list and a group of shape files. Arrays contains the raster data each passed shapefile over Sentinel products covering different dates/times. Note ---- The user does not need to concern themselves with checking that the products passed contain data for for all the shapefiles. This class will sort products to their corresponding shapefiles and filter out all null data. Parameters ---------- product_list : dict Contains the Sentinel products containing data relevant to the area coverered by the passed shapefiles or geojsons. geo_files : list or str A `list` of `str` containing the paths to all of the relevant shapefiles or geojsons or a single `str`. start_date : datetime.date The start of the time period used for data extraction. Any Sentinel products passed to the `Stacker` but generated outside of this time period will be excluded from the extraction process. end_date : datetime.date The end of the time period used for data extraction. Inclusive. Attributes ---------- config : `gs_config.UserConfig` Class container for user configuation info. products : dict Filtered copy of `product_list` passed to the object containing only products generated between `start_date` and `end_date`. product_boundaries : dict Contains `shapely.geometry.Polygon` objects describing the boundaries of each of the products in `products`. geo_files : list Contains a copy of `geo_files` passed to the object. start_date : datetime.date A copy of `start_date` passed to the object. end_date : datetime.date A copy of the `end_date` passed to the object. job_list : dict Contains keys of all the uuids of products passed to the object, with their values being the shapefiles that are within the boundaries of each product. stack_list : dict After the `run` method is called, this `dict` is populated with keys that are the names of the shapefiles passed, with their values being the corresponding processed `numpy` arrays. band_list : list `list` of `str` containing the product bands that are to be extracted from the products. """ def __init__(self, product_list, geo_files, start_date, end_date): self.config = UserConfig() self.geo_files = geo_files self.start_date = start_date self.end_date = end_date # filters the product list and generates the shapely objects # for the products self.products, self.product_boundaries = self._gen_product_shapes( product_list) # generates the shapely objects for the ROIs if type(geo_files) is str: geo_files = [geo_files] self._gen_ROI_shapes(geo_files) # holds the uuids and corresponding shape files within each product self.job_list = self._allocate_ROIs() # lists all the names of the shapefiles self.stack_list = {name: {} for name in self.ROIs} # temporary store for the layers before final combination self._layerbank = {name: {} for name in self.stack_list} self.band_list = False self.weather_check = False
[docs] def set_bands(self, s1_band_list=[], s2_band_list=[], s2_resolution=False): """Sets the attribute `band_list` to the passed bands. Valid bands for S1 GRD products there are: 'vv', 'vh'. Valid bands for S2 L2A products are: '10': ['AOT', 'B02', 'B03', 'B04', 'B08', 'TCI', 'WVP'], '20': ['AOT', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B8A', 'B11', 'B12', 'SCL', 'TCI', 'WVP'], '60': ['AOT', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B8A', 'B09', 'B11', 'B12', 'SCL', 'TCI', 'WVP']} Parameters ---------- s1_band_list : list `list` of `str` containing the S1 bands the user wants extract from the S1 products passed to `Stacker`. Required form : ['band1', 'band2', ... ] s2_band_list : list `list` of `str` containing the S2 bands the user wants extract from the S2 products passed to `Stacker`. Required form : ['band1', 'band2', ... ] s2_resolution : int, optional Choose the band resolution of the Sentinel-2 bands. Can be `10`, `20`, or `60`. Returns ------- None Raises ------ ValueError If the band strings passed are not valid bands. """ # TODO: implement S2 bands as valid if s2_band_list and s2_resolution not in [10, 20, 60]: RuntimeError("You must specify a resolution of int 10, 20, or 60" " when stacking Sentinel-2 products.") s1_valid_bands = ['vv', 'vh'] s2_valid_bands = {'10': ['AOT', 'B02', 'B03', 'B04', 'B08', 'TCI', 'WVP'], '20': ['AOT', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B8A', 'B11', 'B12', 'SCL', 'TCI', 'WVP'], '60': ['AOT', 'B02', 'B03', 'B04', 'B05', 'B06', 'B07', 'B8A', 'B09', 'B11', 'B12', 'SCL', 'TCI', 'WVP']} if s2_resolution: s2_valid_bands = s2_valid_bands[str(s2_resolution)] if s1_band_list and s2_resolution: warnings.warn("Sentinel-1 GRD products are 10m resolution pixels." " Combining Sentinel-2 products with resolution of" " 20m or 60m with Sentinel-1 products will result in" " incoherent data output at the per-pixel level.") for band in s1_band_list: if band not in s1_valid_bands: print(self.set_bands.__doc__) raise ValueError('Passed bands not in valid band list.') for band in s2_band_list: if band not in s2_valid_bands: print(self.set_bands.__doc__) raise ValueError('Passed bands not in valid band list.') bands = s1_band_list + s2_band_list for roi in self._layerbank: for band in bands: self._layerbank[roi][band] = {} self.band_list = [s1_band_list, s2_band_list] if s2_resolution: self.s2_res = s2_resolution
[docs] def generate_stacks(self): """Runs the data layer extraction and stacking process. Returns ------- dict Contains all the generate `Stack` objects keyed by the names of their corresponding shapefiles. Raises ------ NotImplementedError If S2 products have been passed to `Stacker`. ValueError If `set_bands` has not been called before calling `run`. ValueError If any products other than S1 or S2 products are passed to `Stacker`. """ platforms = [self.products[uuid]['platformname'] for uuid in self.products] for platform in platforms: if platform not in ['Sentinel-1', 'Sentinel-2']: raise NotImplementedError("Currently only supports Sentinel-1" " and Sentinel-2 products.") if not self.band_list: raise ValueError("Please set the band list via" " .set_bands(band_list) before running.") for uuid, product in self.products.items(): # get the filepath for the product files in .SAFE if product['platformname'] == 'Sentinel-1': band_list = self.band_list[0] elif product['platformname'] == 'Sentinel-2': if '2A' not in product['producttype']: print("Product {0} is not processed to L2A, skipping." "".format(product['identifier'])) continue band_list = self.band_list[1] else: raise ValueError("Unrecognised platform name in product {0}" "".format(product['identifier'])) # extract bands from processed files for band in band_list: self._extract_data(uuid, band) self._generate_stacks() if self.weather_check: self._weather_report() return self.stack_list
def _generate_stacks(self): """Make the layers uniform and combine them into numpy array stacks.""" for roi, bands in self._layerbank.items(): for band, layers in bands.items(): layers_ = [layer for date, layer in sorted(layers.items())] info_ = [layer.info for date, layer in sorted(layers.items())] transforms_ = [layer.transform for date, layer in sorted(layers.items())] layers_ = self._pad_layers(layers_) if len(layers_) is 0: print("No data for ROI {0} in band {1} for the given" " products.".format(roi, band)) continue stack = np.stack(layers_, axis=0) stack = np.ma.masked_where(stack == 0, stack) stack = Stack(stack, info_, transforms_, roi) # mask all the zero values in the output array sounding the # region of interest self.stack_list[roi][band] = stack def _pad_layers(self, layers): """Pads the layers with zeroes so they conform to a uniform shape.""" padded_layers = [] height_max = 0 width_max = 0 for layer in layers: depth, height, width = layer.shape if height > height_max: height_max = height if width > width_max: width_max = width for layer in layers: depth, height, width = layer.shape height_add = height_max - height width_add = width_max - width while width_add: if width_add % 2 is 0: # pad right side of array layer = np.pad(layer, ((0, 0), (0, 0), (0, 1)), 'constant', constant_values=np.NAN) width_add = width_add - 1 continue # pad left side of array layer = np.pad(layer, ((0, 0), (0, 0), (1, 0)), 'constant', constant_values=np.NaN) width_add = width_add - 1 while height_add: if height_add % 2 is 0: # pad top of array layer = np.pad(layer, ((0, 0), (1, 0), (0, 0)), 'constant', constant_values=0) height_add = height_add - 1 continue # pad bottom of array layer = np.pad(layer, ((0, 0), (0, 1), (0, 0)), 'constant', constant_values=0) height_add = height_add - 1 padded_layers.append(layer) return padded_layers
[docs] def check_weather(self, cloud=False, snow=False): """ Set a threshold for the probability of cloud or snow cover in the region of interest. If the probability cover provided by the ESA in the automated cloud and snow masks is higher than the user-defined threshold, a warning is thrown during stacking. Parameters ---------- cloud : int, optional The percentrage threshold cloud probability for regions of interest, above which the stacker throws a warning. snow : int, optional The percentrage threshold snow probability for regions of interest, above which the stacker throws a warning. Returns ------- None """ for threshold in (cloud, snow): if threshold and (threshold < 0 or threshold > 100): raise ValueError("Please enter an int between 0 and 100") self.weather_check = True self.cloud_info = {ROI: [] for ROI in self.ROIs} self.snow_info = {ROI: [] for ROI in self.ROIs} self.weather_thresholds = {'cloud': cloud, 'snow': snow}
def _weather_concealment(self, mask, tilepath, ROI, filename, datetime): """Checks the percentage likelihood of weather concealment using the ESA provided masks.""" cloud_threshold = self.weather_thresholds['cloud'] snow_threshold = self.weather_thresholds['snow'] qi_data = tilepath.joinpath('QI_DATA') cloud_mask = list(qi_data.glob('*CLD*20m.jp2')) snow_mask = list(qi_data.glob('*SNW*20m.jp2')) if len(cloud_mask) is not 1: raise RuntimeError("Could not locate the cloud mask for {0}" "".format(filename)) if len(snow_mask) is not 1: raise RuntimeError("Could not locate the snow mask for {0}" "".format(filename)) cloud_mask = cloud_mask[0] snow_mask = snow_mask[0] if cloud_threshold: with rasterio.open(str(cloud_mask), 'r') as cloud: out_image, out_transform = rasterio.mask.mask(cloud, mask, crop=True) if np.max(out_image) >= cloud_threshold: if datetime not in self.cloud_info[ROI]: self.cloud_info[ROI].append(datetime) if snow_threshold: with rasterio.open(str(snow_mask), 'r') as snow: out_image, out_transform = rasterio.mask.mask(snow, mask, crop=True) if np.max(out_image) >= snow_threshold: if datetime not in self.snow_info[ROI]: self.snow_info[ROI].append(datetime) def _weather_report(self): """Prints the results of the weather check to the console.""" cloud_threshold = self.weather_thresholds['cloud'] snow_threshold = self.weather_thresholds['snow'] print("\ngs_stacker WEATHER CHECK RESULTS:\n") if cloud_threshold: print("The following ROIs have CLOUD cover probability above the" " specified {0}% threshold on these dates:\n" "".format(cloud_threshold)) for roi in self.cloud_info: print("{0}:".format(roi)) if len(self.cloud_info[roi]) is 0: print(" ", "NONE") for datet in self.cloud_info[roi]: print(" ", datet) if snow_threshold: print("The following ROIs have SNOW cover probability above the" " specified {0}% threshold on these dates:\n" "".format(snow_threshold)) for roi in self.snow_info: print("{0}:".format(roi)) if len(self.snow_info[roi]) is 0: print(" ", "NONE") for datet in self.snow_info[roi]: print(" ", datet) print("\n") def _extract_data(self, uuid: str, band: str): """ Checks the joblist to see what ROIs need to be postage stamped out of the given uuid and extracts them using rasterio.mask. Saves the product metadata to each ROI's extracted numpy array and add it to the corresponding list in the layer_stack. """ product = self.products[uuid] platform = product['platformname'] filename = product['filename'] datetime = product['beginposition'], info = {'from': uuid, 'platform': platform, 'datetime': datetime, 'band': band, 'processing': product['producttype']} if platform == 'Sentinel-2': data_path = Path(self.config.DATA_PATH) file_path = data_path.joinpath(filename) granule = file_path.joinpath('GRANULE') for child in granule.iterdir(): # should be only one sub-dir in the GRANULE dir if child.is_dir(): tilepath = child break img_data = tilepath.joinpath('IMG_DATA') ext = 'R{0}m'.format(str(self.s2_res)) raster_dir = img_data.joinpath(ext) for child in raster_dir.iterdir(): if band in str(child): proc_file = child break if platform == 'Sentinel-1': if filename.endswith('.SAFE'): raise RuntimeError("Product {0} is an unprocessed Sentinel-1 " "file and cannot be stacked.".format( filename)) info['mode'] = product['sensoroperationalmode'] if band is 'vv': layer_number = 0 if band is 'vh': layer_number = 1 proc_file = Path(self.config.DATA_PATH).joinpath(filename) # get the shape the reside within this product associated_ROIs = self.job_list[uuid] def reproject_ROI(mask, epsg): # Reproject the roi coords to the same crs as the raster roi = mask[0] wgs84 = osr.SpatialReference() wgs84.ImportFromEPSG(4326) raster_crs = osr.SpatialReference() raster_crs.ImportFromEPSG(epsg) shape_ = ogr.CreateGeometryFromWkt(roi.wkt) transform = osr.CoordinateTransformation(wgs84, raster_crs) shape_.Transform(transform) # back to shapely roi = shapely.wkt.loads(shape_.ExportToWkt()) return [roi] with rasterio.open(str(proc_file), 'r') as raster: raster_epsg = raster.crs.to_epsg() # for all of the corresponding ROIs related to this product for ROI in associated_ROIs: mask = [self.ROIs[ROI]] # rasterio requires mask in iterable # reproject the mask to the raster epsg if it is not WGS84 if raster_epsg is not 4326: mask = reproject_ROI(mask, raster_epsg) # if we need to check the weather cover for Sentinel-2 products if self.weather_check and '2' in platform: self._weather_concealment(mask, tilepath, ROI, filename, datetime) out_image, out_transform = rasterio.mask.mask(raster, mask, crop=True) if np.count_nonzero(out_image) is 0 or (np.max(out_image) < 0.0001): # The mask has fallen victim the the traversty that is ESA # polygons being actually sligthly larger than the product # data they represent. Ie. the ROI is in the dead zone! continue if '1' in platform: # out_image is 3D when ie. [2, X, Y] and we only need 2D # either vv or vh if out_image.shape[0] is not 1: layers = np.split(out_image, 2) layer = Stack(layers[layer_number], info, out_transform) else: layer = Stack(out_image, info, out_transform) if '2' in platform: layer = Stack(out_image, info, out_transform) date = product['beginposition'] self._layerbank[ROI][band][date] = layer def _allocate_ROIs(self): """ Checks product boundaries against ROI areas and allocates ROIs to products for stamping. """ job_list = {} # stores product UUIDs and accompanying ROIs for uuid, boundary in self.product_boundaries.items(): uuid_jobs = [] for ROI_name, shape in self.ROIs.items(): if boundary.contains(shape): uuid_jobs.append(ROI_name) job_list[uuid] = uuid_jobs return job_list def _gen_product_shapes(self, product_list: dict): """Loads shapely objects from product WKT footprints.""" product_boundaries = {} # stores the shapely files use for allocating filtered_products = {} # ESA product footprints are in WGS84 (epsg: 4326) for uuid, product in product_list.items(): product_start = datetime.datetime.strptime( product['beginposition'][:10], '%Y-%m-%d').date() if self.start_date <= product_start <= self.end_date: product_shape = shapely.wkt.loads(product['footprint']) product_boundaries[uuid] = product_shape filtered_products[uuid] = product return filtered_products, product_boundaries def _gen_ROI_shapes(self, geo_files): """Loads shapely objects from given geo files.""" # set WGS84 spatial ref wgs84 = osr.SpatialReference() wgs84.ImportFromEPSG(4326) ROIs = {} # stores the referenced shapely objects use for masking for geo_file in geo_files: geo_file = Path(geo_file) suffix = geo_file.suffix if suffix not in ['.shp', '.geojson']: raise TypeError("File {0} not supported by gs_stacker.Stacker." " Only .shp and .geojson files are currently" " supported for defining masks / ROIs".format( geo_file)) filename = geo_file.stem x_coords = [] y_coords = [] shp = ogr.Open(str(geo_file)) if shp.GetLayerCount() > 1: warnings.warn("geo-referenced files (.shp, .geojson etc)" " with more than one layer / feature will have" " the coordinates of all layers / features used" " to generate a mask for the Sentinel file.") layer = shp.GetLayer() shp_crs = layer.GetSpatialRef() if shp_crs is None: # means the file has not been georeferenced raise RuntimeError("Could not retrieve the co-ordinate" " reference system data from the meta" " data of the geo-file {0}.\n" " The file may not be correctly" " georeferenced.".format(geo_file)) transform = osr.CoordinateTransformation(shp_crs, wgs84) if suffix == '.shp': shp = shapefile.Reader(str(geo_file)) # extract all points from all shapes for shape in shp.shapes(): for point in shape.points: # in the file x_coords.append(point[0]) y_coords.append(point[1]) coords = list(zip(x_coords, y_coords)) m = shapely.geometry.MultiPoint(coords) # import into shapely extents = m.convex_hull # gets polygon that encomps all points shape_ = ogr.CreateGeometryFromWkt(extents.wkt) if suffix == '.geojson': geometries = [] for layer in shp: for i in range(layer.GetFeatureCount()): feature = layer.GetFeature(i) geometries.append(feature.geometry()) shape_ = geometries[0] for i in range(1, len(geometries)): shape_ = shape_.AddGeometry(geometries[i]) # now reproject in ogr shape_.Transform(transform) # back to shapely for boundary comparison during run() shape = shapely.wkt.loads(shape_.ExportToWkt()) ROIs[filename] = shape self.ROIs = ROIs
[docs]class Stack(np.ndarray): """Used to add an attribute to an existing numpy array. adapted from: https://docs.scipy.org/doc/numpy-1.12.0/user/basics.subclassing.html Also masks all the zero values out of the array. Parameters ---------- input_array : numpy.ndarray An existing array that is to be converted to a `Stack`. info : list, str `list` of `str` or just `str` that contains info on each layer such as UUID of origin, platform, date of capture, band, and processing level. tranform : affline.Affline the out_transform output from the `mask` function of the `rasterio` library. name : str, optional The name of the Stack. Used in the final array stacks to name the stacks with the `str` of their corresponding shapefile names. Attributes ---------- info : list, str Copy of parameter `info`. tranform : affline.Affline Copy of parameter `transform`. name : str, optional Copy of paramter `name`. """ def __new__(cls, input_array, info, transform, name=False): obj = np.asarray(input_array).view(cls) obj = np.ma.masked_where(obj == 0, obj) obj.info = info obj.transform = transform if name: obj.name = name return obj def __array_finalize__(self, obj): if obj is None: return self.info = getattr(obj, 'info', None)