# Source code for ascat.download.connectors

# Copyright (c) 2025, TU Wien
# All rights reserved.

# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#    * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of TU Wien, Department of Geodesy and Geoinformation
#      nor the names of its contributors may be used to endorse or promote
#      products derived from this software without specific prior written
#      permission.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL TU WIEN DEPARTMENT OF GEODESY AND
# GEOINFORMATION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
HTTP and FTP download module.
"""

import base64
import concurrent.futures
import logging
import urllib
import urllib.parse
from datetime import timedelta
from ftplib import FTP
from pathlib import Path, WindowsPath, PosixPath

import requests
from tqdm.auto import tqdm

# Logger named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): calling basicConfig at import time configures the root
# logger for every program that imports this module; consider moving it to
# the command-line entry point.
_LOG_FORMAT = "%(asctime)s %(levelname)s: %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)


class Connector:
    """
    Base class for connecting to and downloading from a remote source.

    Every hook defined here is a no-op returning None; concrete connectors
    override the ones they need.
    """

    def __init__(self, base_url):
        """
        Remember where the remote resource lives.

        Parameters
        ----------
        base_url : string
            Location of remote resource.
        """
        self.base_url = base_url

    def connect(self, credentials):
        """
        Open a session with the remote source (no-op here).

        Parameters
        ----------
        credentials : dict
            Authentication parameters required by the concrete connector.
        """
        return None

    def download_date_range(self, path, start_date, end_date):
        """
        Download all files within a date range (no-op here).

        Parameters
        ----------
        path : string
            Local directory where found datasets are stored.
        start_date : datetime
            Start of the date range interval.
        end_date : datetime
            End of the date range interval.
        """
        return None

    def download_file(self, file_remote, file_local):
        """
        Copy one remote file to a local path (no-op here).

        Parameters
        ----------
        file_remote : string
            Path of the file to download.
        file_local : string
            Local path where the file is saved.
        """
        return None

    def close(self):
        """
        Terminate the session (no-op here).
        """
        return None
class HttpConnector(Connector):
    """
    Class for http requests.

    Files are fetched with streamed GET requests. If the instance has an
    ``access_token`` attribute (a subclass may set it in ``connect``), it is
    sent as a bearer token in the ``Authorization`` header.
    """

    def __init__(self, base_url):
        """
        Initialize connector.

        Parameters
        ----------
        base_url : string
            Location of remote resource.
        """
        super().__init__(base_url)

    def download_file(self, file_remote, file_local, overwrite=False,
                      n_retry=5):
        """
        Download single file from passed url to local file.

        If the server reports ``Content-Type: application/zip``, ".zip" is
        appended to the local file name.

        Parameters
        ----------
        file_remote : string
            URL of file to download.
        file_local : string or pathlib.Path
            Path (local) where to save file.
        overwrite : bool, optional
            If True, existing files will be overwritten. If False, an
            existing file is kept only when its size equals the size
            reported by the server; otherwise (e.g. a partial file from an
            interrupted run) it is downloaded again (default: False).
        n_retry : int, optional
            Maximum number of request attempts (default: 5).
        """
        file_local = Path(file_local)
        stream_response = self._request_with_retry(file_remote, n_retry)
        if stream_response is None:
            return

        try:
            headers = stream_response.headers
            # Content-Length may be absent; then the size cannot be checked.
            content_length = headers.get("content-length")
            total = int(content_length) if content_length is not None else None

            suffix = ""
            if headers.get("Content-Type") == "application/zip":
                suffix = ".zip"
            filename = file_local.parent / (file_local.name + suffix)

            if (not overwrite and total is not None and filename.exists()
                    and filename.lstat().st_size == total):
                logging.info("Skip download. File exists.")
                return

            with open(filename, "wb") as fp, tqdm(
                    desc=file_local.name, total=total, unit="B",
                    unit_divisor=1024, unit_scale=True,
                    leave=False) as pbar:
                for chunk in stream_response.iter_content(chunk_size=1024):
                    # iter_content may yield empty chunks; ignore them.
                    if chunk:
                        fp.write(chunk)
                        pbar.update(len(chunk))
        finally:
            # With stream=True the connection stays open until the body is
            # consumed or the response is closed, so always close it.
            stream_response.close()

        if not filename.exists():
            logging.error("Downloaded file not found")
        elif total is not None and filename.lstat().st_size != total:
            logging.error("Download unsuccessful (file size mismatch)")
        else:
            logging.debug("Download successful")

    def _request_with_retry(self, file_remote, n_retry):
        """
        Send the streamed GET request, retrying failed attempts.

        An attempt fails if ``_assert_response`` raises AssertionError or
        ``requests`` raises a RequestException (e.g. a connection error).

        Parameters
        ----------
        file_remote : string
            URL of file to download.
        n_retry : int
            Maximum number of attempts.

        Returns
        -------
        response : requests.Response or None
            The successful response, or None if every attempt failed.
        """
        headers = {}
        access_token = getattr(self, "access_token", None)
        if access_token is not None:
            headers["Authorization"] = f"Bearer {access_token}"

        for attempt in range(1, n_retry + 1):
            logging.debug("Send request")
            response = None
            try:
                response = requests.get(
                    file_remote, params={"format": "json"}, stream=True,
                    headers=headers)
                self._assert_response(response)
            except (AssertionError, requests.RequestException):
                if response is not None:
                    response.close()
                logging.debug(f"API Request failed - retry #{attempt}")
            else:
                logging.debug("API Request successful")
                return response

        logging.error("Maximum number of API requests failed. Abort.")
        return None

    def _assert_response(self, response, success_code=200):
        """
        Check that a response carries the expected status code.

        Subclasses may override this check.

        Parameters
        ----------
        response : requests.Response
            Response to check.
        success_code : int, optional
            The expected success code (default: 200).

        Raises
        ------
        AssertionError
            If the status code differs from ``success_code``. It is raised
            explicitly (not via ``assert``) so the check is not stripped
            when Python runs with ``-O``.
        """
        if response.status_code != success_code:
            raise AssertionError(
                f"API Request Failed: {response.status_code}\n"
                f"{response.content}")
class FtpConnector(Connector):
    """
    Class for downloading via FTP.

    The FTP connection to ``base_url`` is opened when the instance is
    created.
    """

    def __init__(self, base_url):
        """
        Initialize connector.

        Parameters
        ----------
        base_url : string
            Location of remote resource.
        """
        super().__init__(base_url)
        self.ftp = FTP(self.base_url)

    def connect(self, credentials):
        """
        Establish connection to FTP source.

        Failures are logged rather than raised.

        Parameters
        ----------
        credentials : dict
            Dictionary with the keys "user" and "password".
        """
        from ftplib import all_errors, error_perm

        try:
            user = credentials["user"]
            password = credentials["password"]
        except (KeyError, TypeError) as err:
            logging.error(f"FTP connection failed. Missing credential: {err}")
            return

        try:
            self.ftp.login(user, password)
        except error_perm:
            # Permanent (5xx) reply to USER/PASS, i.e. login rejected.
            logging.error("FTP connection failed. User or password incorrect")
        except all_errors as err:
            logging.error(f"FTP connection failed: {err}")
        else:
            logging.info("FTP connection successfully established")

    def download_file(self, file_remote, file_local, overwrite=False):
        """
        Download single file from passed url to local file.

        Parameters
        ----------
        file_remote : string
            Name of the file to download, as listed by NLST in the current
            remote working directory.
        file_local : string or pathlib.Path
            Path (local) where to save file.
        overwrite : bool, optional
            If True, existing files will be overwritten. If False, an
            existing file is kept only when its size equals the remote size;
            otherwise (e.g. a partial file) it is downloaded again
            (default: False).
        """
        file_local = Path(file_local)
        if file_remote not in self.ftp.nlst():
            logging.warning(f"File not accessible on FTP: {file_remote}")
            return

        logging.debug(f"Start download: {file_remote}")
        # FTP.size returns None when the server does not answer with a size.
        total = self.ftp.size(file_remote)

        if (not overwrite and total is not None and file_local.exists()
                and file_local.lstat().st_size == total):
            logging.info("Skip download. File exists.")
            return

        with open(file_local, "wb") as fp, tqdm(
                desc=file_local.name, total=total, unit="B",
                unit_divisor=1024, unit_scale=True, dynamic_ncols=True,
                leave=False) as pbar:

            def cb(data):
                pbar.update(len(data))
                fp.write(data)

            self.ftp.retrbinary(f"RETR {file_remote}", cb)

        if not file_local.exists():
            logging.error("Downloaded file not found")
        elif total is not None and file_local.lstat().st_size != total:
            logging.error("Download unsuccessful (file size mismatch)")
        else:
            logging.debug("Download successful")

    def close(self):
        """
        Close connection.
        """
        self.ftp.close()
        logging.info("FTP disconnected")
class HsafConnector(FtpConnector):
    """
    Class for downloading from HSAF via FTP.
    """

    def __init__(self, base_url="ftphsaf.meteoam.it"):
        """
        Initialize connector.

        Parameters
        ----------
        base_url : string, optional
            Location of remote resource (default: ftphsaf.meteoam.it).
        """
        super().__init__(base_url)

    def download(self, remote_path, local_path, start_date, end_date,
                 limit=None, overwrite=False):
        """
        Download all files found in a date range.

        Parameters
        ----------
        remote_path : string
            Remote directory, where found datasets are stored.
        local_path : string or pathlib.Path
            Local directory, where found datasets are stored.
        start_date : datetime
            Start date of date range interval.
        end_date : datetime
            End date of date range interval (excluded, see ``files``).
        limit : int, optional
            Maximum number of files to download; None or 0 means no limit
            (default: None).
        overwrite : bool, optional
            If True, existing files will be overwritten (default: False).
        """
        local_path = str2path(local_path)

        # (remote name, local path) pairs in discovery order.
        downloads = []
        limit_reached = False
        for daily_files in self.files(remote_path, start_date, end_date):
            for file_remote in daily_files:
                downloads.append((file_remote, local_path / file_remote))
                if limit and len(downloads) == limit:
                    limit_reached = True
                    break
            if limit_reached:
                break

        # Nothing matched: there is nothing to sort or download.
        if not downloads:
            logging.warning("No data sets found")
            return

        downloads.sort()
        with tqdm(desc="Downloads", total=len(downloads)) as pbar:
            for file_remote, file_local in downloads:
                self.download_file(file_remote, file_local,
                                   overwrite=overwrite)
                pbar.update()

    def files(self, remote_path, start_date, end_date):
        """
        Generator retrieving file list for given date range.

        Changes the FTP working directory to ``remote_path``, lists it once,
        and then yields, for each day from ``start_date`` up to but
        excluding ``end_date``, the file names containing that day's
        ``YYYYMMDD`` string. Only whole days of ``end_date - start_date``
        are counted.

        Parameters
        ----------
        remote_path : string
            Remote directory, where found datasets are stored.
        start_date : datetime
            Start date of date range interval.
        end_date : datetime
            End date of date range interval (excluded).

        Yields
        ------
        matches : list
            File names of one day, sorted in descending order.
        """
        self.ftp.cwd(remote_path)
        list_of_files = []
        self.ftp.retrlines("NLST ", list_of_files.append)
        days = end_date - start_date
        for i in range(days.days):
            date = (start_date + timedelta(days=i)).strftime("%Y%m%d")
            yield sorted((x for x in list_of_files if date in x),
                         reverse=True)
class EumConnector(HttpConnector):
    """
    Class for downloading from EUMETSAT via HTTP requests.
    """

    def __init__(self, base_url="https://api.eumetsat.int"):
        """
        Initialize connector.

        Parameters
        ----------
        base_url : string, optional
            Location of remote resource (default: https://api.eumetsat.int).
        """
        super().__init__(base_url)

    def connect(self, credentials):
        """
        Establish connection to EUMETSAT.

        Stores the generated token as ``self.access_token``.

        Parameters
        ----------
        credentials: dict
            Dictionary with the keys "consumer_key" and "consumer_secret".
        """
        self.access_token = self._generate_token(
            consumer_key=credentials["consumer_key"],
            consumer_secret=credentials["consumer_secret"])

    def download(self, product, local_path, start_date, end_date,
                 max_workers=1, coords=None, limit=None):
        """
        Fetch resource location for download of multiple files in daterange.

        Parameters
        ----------
        product : string
            Product.
        local_path : string
            Local directory, where found datasets are stored.
        start_date : datetime
            Start date of date range interval.
        end_date : datetime
            End date of date range interval.
        max_workers : int, optional
            Number of parallel downloads (default: 1).
        coords : list of float, optional
            A custom polygon using EPSG:4326 decimal degrees (default: None).
        limit : int, optional
            Filter used to limit the returned results (default: None).

        Raises
        ------
        RuntimeError
            If the search service reports an exception or returns a
            non-JSON body.
        """
        local_path = str2path(local_path)
        service_search = f"{self.base_url}/data/search-products/os"
        service_download = f"{self.base_url}/data/download/"

        fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
        dataset_parameters = {"format": "json", "pi": product,
                              "dtstart": start_date.strftime(fmt),
                              "dtend": end_date.strftime(fmt)}
        if coords:
            dataset_parameters["geo"] = "POLYGON(({}))".format(",".join(
                f"{coord[0]} {coord[1]}" for coord in coords))

        pages = self._search_all_pages(service_search, dataset_parameters)

        download_url_list = []
        local_file_list = []
        for page in pages:
            for selected_data_set in page.get("features", []):
                properties = selected_data_set["properties"]
                coll_id = properties["parentIdentifier"]
                product_id = properties["identifier"]
                url_temp = f"collections/{coll_id}/products/{product_id}"
                download_url_list.append(
                    service_download + urllib.parse.quote(url_temp))
                local_file_list.append(local_path / product_id)

        # Also guards the zip(*sorted(...)) unpacking below, which fails on
        # empty lists.
        if not download_url_list:
            print("No data sets found")
            return

        total_found = pages[-1].get("totalResults", len(download_url_list))
        print(f"Found {total_found} data sets")
        if limit and len(download_url_list) > limit:
            print(f"Limited to {limit} data sets")
            download_url_list = download_url_list[:limit]
            local_file_list = local_file_list[:limit]

        download_url_list, local_file_list = zip(*sorted(zip(
            download_url_list, local_file_list)))

        concurrent_download(self.download_file, download_url_list,
                            local_file_list, max_workers)

    def _search_all_pages(self, url, parameters, items_per_page=10):
        """
        Query every result page of the product search.

        Parameters
        ----------
        url : str
            Search endpoint.
        parameters : dict
            Query parameters; not modified.
        items_per_page : int, optional
            Page size sent as "c" and used to advance the start index "si"
            (default: 10).

        Returns
        -------
        pages : list of dict
            Decoded JSON of each result page (at least one).

        Raises
        ------
        RuntimeError
            If a page is an ExceptionReport or not valid JSON.
        """
        params = dict(parameters)
        params["si"] = 0
        # Send the page size explicitly so that "si" advances in step with
        # what the server returns.
        params["c"] = items_per_page
        pages = []
        while True:
            response = requests.get(url, params)
            try:
                page = response.json()
            except ValueError as err:
                raise RuntimeError(
                    f"Search request failed: {response.status_code}\n"
                    f"{response.content}") from err
            if page.get("type") == "ExceptionReport":
                raise RuntimeError(page["exceptions"][0]["exceptionText"])
            pages.append(page)
            params["si"] += items_per_page
            if params["si"] >= page.get("totalResults", 0):
                return pages

    def _generate_token(self, consumer_key, consumer_secret):
        """
        Function to generate an access token for interacting with
        EUMETSAT Data Service APIs.

        Parameters
        ----------
        consumer_key : str
            The consumer key as a string
        consumer_secret : str
            The consumer secret as a string.

        Returns
        -------
        access_token : str
            The "access_token" field of the token response.

        Raises
        ------
        AssertionError
            If the token request does not return status 200.
        """
        token_url = f"{self.base_url}/token"
        userpass = f"{consumer_key}:{consumer_secret}"
        encoded_userpass = base64.b64encode(userpass.encode()).decode()
        headers = {"Authorization": f"Basic {encoded_userpass}"}
        data_payload = {"grant_type": "client_credentials"}
        response = requests.post(token_url, headers=headers,
                                 data=data_payload)
        self._assert_response(response)
        return response.json()["access_token"]

    def _assert_response(self, response, success_code=200):
        """
        Check that an API response has the expected status code.

        Parameters
        ----------
        response : obj
            The response to check.
        success_code : int, optional
            The expected success code (default: 200).

        Raises
        ------
        AssertionError
            If the status code differs. It is raised explicitly (not via
            ``assert``) so the check survives ``python -O``; callers catch
            AssertionError to trigger retries.
        """
        if response.status_code != success_code:
            msg = (f"API Request Failed: {response.status_code}\n"
                   f"{response.content}")
            raise AssertionError(msg)
def concurrent_download(download_func, download_url_list, local_file_list,
                        max_workers=1):
    """
    Threaded file download.

    With ``max_workers == 1`` files are downloaded sequentially and an
    exception from ``download_func`` propagates to the caller. Otherwise a
    thread pool is used; an exception raised in a worker is logged with its
    URL and the remaining downloads continue.

    Parameters
    ----------
    download_func : function
        Download function, called as ``download_func(url, local_file)``.
    download_url_list : list
        Download URLs.
    local_file_list : list
        Local filenames, matched to ``download_url_list`` by position.
    max_workers : int, optional
        Number of concurrent downloads (default: 1).
    """
    with tqdm(desc="Downloads", total=len(download_url_list)) as pbar:
        if max_workers == 1:
            for download_url, local_file in zip(download_url_list,
                                                local_file_list):
                download_func(download_url, local_file)
                pbar.update()
            return

        with concurrent.futures.ThreadPoolExecutor(
                max_workers=max_workers) as executor:
            future_down = {
                executor.submit(download_func, url, local_file): url
                for url, local_file in zip(download_url_list,
                                           local_file_list)
            }
            for future in concurrent.futures.as_completed(future_down):
                url = future_down[future]
                # result() re-raises a worker's exception; without it, the
                # error would be lost silently.
                try:
                    future.result()
                except Exception:
                    logging.exception(f"Download failed: {url}")
                pbar.update()
def str2path(path):
    """
    Return ``path`` as a concrete pathlib path.

    Objects that already are a ``WindowsPath`` or ``PosixPath`` are
    returned unchanged; anything else is passed to ``Path``.

    Parameters
    ----------
    path : str, pathlib.Path
        Path.

    Returns
    -------
    path : pathlib.Path
        Pathlib path.
    """
    concrete_types = (WindowsPath, PosixPath)
    return path if isinstance(path, concrete_types) else Path(path)