Source code for sentinelsat.products

import fnmatch
from pathlib import Path
from xml.etree import ElementTree as etree

import sentinelsat
from sentinelsat.exceptions import LTATriggered, SentinelAPIError
from sentinelsat.sentinel import _check_scihub_response


class SentinelProductsAPI(sentinelsat.SentinelAPI):
    """Class to connect to Copernicus Open Access Hub, search and download imagery.

    The products node interface allows filtering and downloading individual
    product files by means of an (optional) *nodefilter* callable function.

    For each file in the product (excluding only the manifest) the *nodefilter*
    function is called to decide if the corresponding file must be downloaded or not.

    The *nodefilter* function has the following signature::

        def nodefilter(node_info: dict) -> bool:
            ...

    The *node_info* parameter is a dictionary containing information like

    * the file *path* within the product (e.g. "./preview/map-overlay.kml")
    * the file size in bytes (int)
    * the file md5

    If the *nodefilter* function returns True the corresponding file is
    downloaded, otherwise the file is not downloaded.

    Parameters
    ----------
    user : string
        username for DataHub
        set to None to use ~/.netrc
    password : string
        password for DataHub
        set to None to use ~/.netrc
    api_url : string, optional
        URL of the DataHub
        defaults to 'https://apihub.copernicus.eu/apihub'
    show_progressbars : bool
        Whether progressbars should be shown or not, e.g. during download. Defaults to True.
    timeout : float or tuple, optional
        How long to wait for DataHub response (in seconds).
        Tuple (connect, read) allowed.

    Attributes
    ----------
    session : requests.Session
        Session to connect to DataHub
    api_url : str
        URL to the DataHub
    page_size : int
        Number of results per query page.
        Current value: 100 (maximum allowed on ApiHub)
    timeout : float or tuple
        How long to wait for DataHub response (in seconds).

    .. versionadded:: 0.15
    """

    def _path_to_url(self, product_info, path, urltype=None):
        id = product_info["id"]
        title = product_info["title"]
        path = "/".join(["Nodes('{}')".format(item) for item in path.split("/")])
        if urltype == "value":
            urltype = "/$value"
        elif urltype == "json":
            urltype = "?$format=json"
        elif urltype == "full":
            urltype = "?$format=json&$expand=Attributes"
        elif urltype is None:
            urltype = ""
        # else: pass urltype as is
        return self._get_odata_url(id, f"/Nodes('{title}.SAFE')/{path}{urltype}")

    def _get_manifest(self, product_info, path=None):
        path = Path(path) if path else None
        url = self._path_to_url(product_info, "manifest.safe", "value")
        node_info = product_info.copy()
        node_info["url"] = url
        node_info["node_path"] = "./manifest.safe"
        del node_info["md5"]

        if path and path.exists():
            self.logger.info("manifest file already available (%r), skip download", path)
            data = path.read_bytes()
            node_info["size"] = len(data)
            return node_info, data

        url = self._path_to_url(product_info, "manifest.safe", "json")
        response = self.session.get(url, auth=self.session.auth)
        _check_scihub_response(response)
        info = response.json()["d"]
        node_info["size"] = int(info["ContentLength"])

        response = self.session.get(node_info["url"], auth=self.session.auth)
        _check_scihub_response(response, test_json=False)
        data = response.content

        if len(data) != node_info["size"]:
            raise SentinelAPIError("File corrupt: data length does not match")

        if path:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_bytes(data)

        return node_info, data

    def _dataobj_to_node_info(self, dataobj_info, product_info):
        path = dataobj_info["href"]
        if path.startswith("./"):
            path = path[2:]

        node_info = product_info.copy()
        node_info["url"] = self._path_to_url(product_info, path, "value")
        node_info["size"] = dataobj_info["size"]
        node_info["md5"] = dataobj_info["md5"]
        node_info["node_path"] = dataobj_info["href"]
        # node_info["parent"] = product_info

        return node_info

    def _filter_nodes(self, manifest, product_info, nodefilter=None):
        nodes = {}
        xmldoc = etree.parse(manifest)
        data_obj_section_elem = xmldoc.find("dataObjectSection")
        for elem in data_obj_section_elem.iterfind("dataObject"):
            dataobj_info = _xml_to_dataobj_info(elem)
            node_info = self._dataobj_to_node_info(dataobj_info, product_info)
            if nodefilter is not None and not nodefilter(node_info):
                continue
            node_path = node_info["node_path"]
            nodes[node_path] = node_info
        return nodes
    def download(self, id, directory_path=".", checksum=True, nodefilter=None, **kwargs):
        """Download a product.

        Uses the filename on the server for the downloaded files, e.g.
        "S1A_EW_GRDH_1SDH_20141003T003840_20141003T003920_002658_002F54_4DD1.SAFE/manifest.safe".

        Incomplete downloads are continued and complete files are skipped.

        Parameters
        ----------
        id : string
            UUID of the product, e.g. 'a8dd0cfd-613e-45ce-868c-d79177b916ed'
        directory_path : string, optional
            Where the file will be downloaded
        checksum : bool, optional
            If True, verify the downloaded file's integrity by checking its MD5 checksum.
            Throws InvalidChecksumError if the checksum does not match.
            Defaults to True.
        nodefilter : callable, optional
            The *nodefilter* callable used to select which files of each product
            have to be downloaded.
            If *nodefilter* is None then no file filtering is performed and the class
            behaves exactly as :class:`sentinelsat.sentinel.SentinelAPI`.

        Returns
        -------
        product_info : dict
            Dictionary containing the product's info from get_product_odata() as well
            as the path on disk.

        Raises
        ------
        InvalidChecksumError
            If the MD5 checksum does not match the checksum on the server.
        """
        if nodefilter is None:
            return sentinelsat.SentinelAPI.download(self, id, directory_path, checksum, **kwargs)
        product_info = self.get_product_odata(id)
        product_path = Path(directory_path) / (product_info["title"] + ".SAFE")
        product_info["node_path"] = "./" + product_info["title"] + ".SAFE"
        manifest_path = product_path / "manifest.safe"
        if not manifest_path.exists() and self.trigger_offline_retrieval(id):
            raise LTATriggered(id)

        manifest_info, _ = self._get_manifest(product_info, manifest_path)
        product_info["nodes"] = {
            manifest_info["node_path"]: manifest_info,
        }

        node_infos = self._filter_nodes(manifest_path, product_info, nodefilter)
        product_info["nodes"].update(node_infos)

        for node_info in node_infos.values():
            node_path = node_info["node_path"]
            path = (product_path / node_path).resolve()
            node_info["path"] = path
            node_info["downloaded_bytes"] = 0

            self.logger.info("Downloading %s node to %s", id, path)
            self.logger.debug("Node URL for %s: %s", id, node_info["url"])

            if path.exists():
                # We assume that the product node has been downloaded and is complete
                continue

            self._download_outer(node_info, path, checksum)
        return product_info
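# A minimal custom nodefilter sketch (illustrative, not part of the original
# module), following the signature documented in the class docstring: keep only
# KML overlay files below 1 MB. "node_path" and "size" are the node_info keys
# populated by _dataobj_to_node_info() above.
def _example_kml_nodefilter(node_info):
    # node_info["node_path"] is the file path within the product,
    # e.g. "./preview/map-overlay.kml"
    return node_info["node_path"].endswith(".kml") and node_info["size"] < 1024 * 1024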
def _xml_to_dataobj_info(element):
    assert etree.iselement(element)
    assert element.tag == "dataObject"
    data = dict(
        id=element.attrib["ID"],
        rep_id=element.attrib["repID"],
    )
    elem = element.find("byteStream")
    # data["mime_type"] = elem.attrib['mimeType']
    data["size"] = int(elem.attrib["size"])
    elem = element.find("byteStream/fileLocation")
    data["href"] = elem.attrib["href"]
    # data['locator_type'] = elem.attrib["locatorType"]
    # assert data['locator_type'] == "URL"
    elem = element.find("byteStream/checksum")
    assert elem.attrib["checksumName"].upper() == "MD5"
    data["md5"] = elem.text
    return data
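# For reference, a "dataObject" element in manifest.safe has roughly the shape
# sketched below (illustrative placeholders, trimmed to the fields read by
# _xml_to_dataobj_info() above):
#
#   <dataObject ID="mapoverlay" repID="...">
#     <byteStream mimeType="application/vnd.google-earth.kml+xml" size="1018">
#       <fileLocation locatorType="URL" href="./preview/map-overlay.kml"/>
#       <checksum checksumName="MD5">...</checksum>
#     </byteStream>
#   </dataObject>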
def make_size_filter(max_size):
    """Generate a nodefilter function to download only files below the specified maximum size.

    .. versionadded:: 0.15
    """

    def node_filter(node_info):
        return node_info["size"] <= max_size

    return node_filter
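# A minimal usage sketch (hypothetical credentials and UUID): download only the
# files of a product that are at most 1 MB in size.
def _example_size_filtered_download():
    api = SentinelProductsAPI("user", "password")
    return api.download(
        "a8dd0cfd-613e-45ce-868c-d79177b916ed",
        nodefilter=make_size_filter(1024 * 1024),
    )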
def make_path_filter(pattern, exclude=False):
    """Generate a nodefilter function to download only files matching the specified pattern.

    Parameters
    ----------
    pattern : str
        glob pattern for file selection
    exclude : bool, optional
        if set to True then files matching the specified pattern are excluded.
        Default False.

    .. versionadded:: 0.15
    """

    def node_filter(node_info):
        match = fnmatch.fnmatch(node_info["node_path"].lower(), pattern)
        return not match if exclude else match

    return node_filter
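# A minimal usage sketch (hypothetical credentials and UUID): download
# everything except the measurement rasters. Note that node paths are
# lower-cased before matching, so the pattern should be lowercase too.
def _example_path_filtered_download():
    api = SentinelProductsAPI("user", "password")
    return api.download(
        "a8dd0cfd-613e-45ce-868c-d79177b916ed",
        nodefilter=make_path_filter("*/measurement/*", exclude=True),
    )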
def all_nodes_filter(node_info):
    """Node filter function to download all files.

    This function can be used to download a Sentinel product as a directory
    instead of downloading a single zip archive.

    .. versionadded:: 0.15
    """
    return True
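# A minimal usage sketch (hypothetical credentials and UUID): passing
# all_nodes_filter downloads the whole product as an unpacked .SAFE directory
# tree rather than as a single zip archive.
def _example_unpacked_download():
    api = SentinelProductsAPI("user", "password")
    return api.download("a8dd0cfd-613e-45ce-868c-d79177b916ed", nodefilter=all_nodes_filter)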