Source code for obplatform.connector

"""APIs to query and download archive from the remote database"""
from __future__ import annotations

import time
from typing import Any, Dict, Iterator, List, Optional, Tuple

import requests
from tqdm import tqdm

from .logger import logger

ENDPOINT = "https://api.ashraeobdatabase.com"


class Connector:
    """Connector to the remote database"""

    def __init__(self, endpoint: str = ENDPOINT) -> None:
        """Initialize the connector

        Args:
            endpoint (str): The endpoint of the remote database. Currently this
                should be "https://api.ashraeobdatabase.com".
        """
        self.endpoint = endpoint
        self.session = requests.Session()

    def list_behaviors(self) -> List[Dict[str, Any]]:
        """Lists all behaviors available in the database

        Returns:
            List of dicts showing all behaviors in the database. The "key"
            field is what users should use to query and download the data. The
            "label" field is what is displayed to users on the website.

            For example, given {"label": "Occupant Presence", "key":
            "Occupancy", "disabled": false}, "Occupant Presence" is the
            behavior name shown on the website, and "Occupancy" is what users
            should use in the API and other functions in this module to query
            and download the data from the database.
        """
        response = self.session.get(self.endpoint + "/api/v1/behaviors")
        response.raise_for_status()
        return response.json()  # type: ignore

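A minimal usage sketch, not part of the module itself; it assumes network access to the public endpoint, and the label/key values simply mirror the docstring above::

    from obplatform.connector import Connector

    connector = Connector()
    for behavior in connector.list_behaviors():
        # e.g. label "Occupant Presence" with key "Occupancy"
        print(behavior["label"], behavior["key"])
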
    def _start_export_job(
        self, behavior_ids: List[str], studies: List[str]
    ) -> str:
        """Inform the server to start compressing data files for the given
        behaviors and studies

        Args:
            behavior_ids (List[str]): List of behavior ids to download
            studies (List[str]): List of study ids to download

        Returns:
            The URI of the job, used by _poll_export_job to check the status
        """
        response = self.session.post(
            self.endpoint + "/api/v1/exports",
            json={
                "behaviors": behavior_ids,
                "studies": studies,
            },
        )
        response.raise_for_status()
        return response.headers["location"]

    def _poll_export_job(self, job_uri: str) -> requests.Response:
        """Poll the export job status

        Args:
            job_uri (str): The URI of the job, provided by _start_export_job

        Returns:
            The response of the server. The download has not started yet. See
            the "requests" package documentation for details on the
            stream=True option.
        """
        logger.info("Waiting for server to finish...")
        while True:
            response = self.session.get(self.endpoint + job_uri, stream=True)
            response.raise_for_status()
            if response.status_code == 202:
                logger.info("Polling status")
                response.close()
                time.sleep(1)
                continue
            elif response.status_code == 200:
                return response

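The two private helpers above implement a start-then-poll protocol: the POST returns the job URI in the "location" header, and GET requests against that URI return 202 while the archive is being prepared and 200 once it is ready. A rough sketch of the same flow using the helpers directly (the behavior key and study id are placeholders; in normal use call download_export instead)::

    connector = Connector()
    job_uri = connector._start_export_job(["Occupancy"], ["1"])
    response = connector._poll_export_job(job_uri)  # blocks until HTTP 200
    with open("export.zip", "wb") as f:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)
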
    def download_export(
        self,
        filename: str,
        behavior_ids: List[int | str],
        studies: List[int | str],
        show_progress_bar: bool = False,
        chunk_size: Optional[int] = 1000 * 1024,
    ) -> None:
        """Download the data archive (ZIP) for the given behaviors

        Args:
            filename (str): The filename to save the archive
            behavior_ids (List[int | str]): List of behavior ids to download
            studies (List[int | str]): List of study ids to download
            show_progress_bar (bool): Whether to show a progress bar
            chunk_size (Optional[int]): The size of each chunk to download. If
                None, download the whole file. Default is 1000 * 1024 Bytes.
        """
        _behavior_ids: List[str] = list(map(str, behavior_ids))
        # Because there is an unfixed bug on the server side, it currently only
        # accepts strings as study ids.
        _studies: List[str] = list(map(str, studies))

        job_uri = self._start_export_job(_behavior_ids, _studies)
        response = self._poll_export_job(job_uri)

        total_size_in_bytes = int(response.headers.get("content-length", 0))
        logger.info(f"Downloading {filename}")
        with ProgressBar(total_size_in_bytes, show_progress_bar) as progress_bar:
            with open(filename, "wb") as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    progress_bar.update(len(chunk))
                    f.write(chunk)

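A download sketch; the behavior keys below mirror the _get_payload docstring further down, and the study ids are illustrative placeholders::

    connector = Connector()
    connector.download_export(
        "data.zip",
        ["Appliance_Usage", "Occupancy_Measurement"],
        ["22", "11"],  # placeholder study ids
        show_progress_bar=True,
    )
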
    def check_health(self) -> bool:
        """Check the health of the remote database

        Returns:
            True if the backend server is working, False otherwise
        """
        response = self.session.get(self.endpoint + "/api/v1/health")
        return response.json()["status"] == "ok"  # type: ignore

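For example, a quick pre-flight check before starting a large export (a sketch)::

    if not Connector().check_health():
        logger.warning("Backend is not reachable; try again later")
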
    def _yield_payload_dict_item(
        self, params: Dict[str, Any]
    ) -> Iterator[Tuple[str, str]]:
        """
        Returns a dictionary key-value pair. Used by _get_payload(params).

        Args:
            params: Params dictionary. See _get_payload(params) for details.

        Yields:
            A dictionary key-value pair.
        """
        for key, list_ in params.items():
            for i, item in enumerate(list_):
                if not isinstance(item, dict):
                    yield f"{key}[{i}]", item
                else:
                    for subkey, subvalue in item.items():
                        yield f"{key}[{i}][{subkey}]", subvalue

    def _get_payload(self, params: Dict[str, Any]) -> Dict[str, str]:
        """
        Returns a dictionary of the form::

            {
                'behaviors[0]': 'Appliance_Usage',
                'behaviors[1]': 'Occupancy_Measurement',
                'countries[0]': 'USA',
                'countries[1]': 'UK',
                'cities[0]': 'Palo Alto',
                'cities[1]': 'Coventry',
                'cities[2]': 'San Antonio',
                'buildings[0][building_type]': 'Educational',
                'buildings[0][room_type]': 'Classroom',
                'buildings[1][building_type]': 'Educational',
                'buildings[1][room_type]': 'Office',
                'buildings[2][building_type]': 'Residential',
                'buildings[2][room_type]': 'Single-Family House'
            }

        You should not call this method directly. Please use the public
        methods instead. The purpose of this method is to serialize query
        parameters, which are then passed to the requests package and its
        underlying urllib3 function call as the payload.

        Args:
            params: Params dictionary.

        Returns:
            Payload dictionary, used by requests.
        """
        return dict(self._yield_payload_dict_item(params))

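For reference, the payload shown in the docstring above would be produced from a nested params dictionary like the following (a sketch; the values simply mirror the docstring example)::

    connector = Connector()
    payload = connector._get_payload(
        {
            "behaviors": ["Appliance_Usage", "Occupancy_Measurement"],
            "countries": ["USA", "UK"],
            "cities": ["Palo Alto", "Coventry", "San Antonio"],
            "buildings": [
                {"building_type": "Educational", "room_type": "Classroom"},
                {"building_type": "Educational", "room_type": "Office"},
                {"building_type": "Residential", "room_type": "Single-Family House"},
            ],
        }
    )
    # payload["behaviors[0]"] == "Appliance_Usage"
    # payload["buildings[2][room_type]"] == "Single-Family House"
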
    def list_behaviors_in_studies(
        self, studies: List[int | str]
    ) -> List[Dict[str, Any]]:
        """List available behaviors in each study

        Args:
            studies (List[int | str]): List of study ids to query

        Returns:
            JSON encoded result of study ids and the behaviors in each study
        """
        payload = self._get_payload({"studies": studies})
        response = self.session.get(
            self.endpoint + "/api/v1/behaviors", params=payload
        )
        response.raise_for_status()
        return response.json()  # type: ignore

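For example (the study ids are placeholders)::

    connector = Connector()
    result = connector.list_behaviors_in_studies(studies=["22", "11"])
    print(result)
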
    def list_studies(
        self,
        behaviors: List[int | str],
        countries: List[str],
        cities: List[str],
        buildings: List[Dict[str, str]],
    ) -> List[Dict[str, Any]]:
        """Query available studies based on behaviors, countries, cities, and
        buildings. This function works in the same way as clicking through the
        "Export" page on the website.

        Args:
            behaviors (List[int | str]): List of behavior ids to query
            countries (List[str]): List of country names to query
            cities (List[str]): List of city names to query
            buildings (List[Dict[str, str]]): List of building types and room
                types to query. Must be in the following format::

                    [
                        {
                            "building_type": "Educational",
                            "room_type": "Classroom",
                        },
                        {
                            "building_type": "Educational",
                            "room_type": "Office",
                        },
                        {
                            "building_type": "Residential",
                            "room_type": "Single-Family House",
                        },
                    ]

        Returns:
            JSON encoded result of study ids available in the database,
            filtered by the criteria.
        """
        payload = self._get_payload(
            params={
                "behaviors": behaviors,
                "countries": countries,
                "cities": cities,
                "buildings": buildings,
            }
        )
        response = self.session.get(self.endpoint + "/api/v1/studies", params=payload)
        response.raise_for_status()
        return response.json()  # type: ignore

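A query sketch mirroring the building filter shown in the docstring; the behavior key, country, and city values are illustrative::

    connector = Connector()
    studies = connector.list_studies(
        behaviors=["Occupancy_Measurement"],
        countries=["USA"],
        cities=["Palo Alto"],
        buildings=[
            {"building_type": "Educational", "room_type": "Classroom"},
        ],
    )
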
class ProgressBar:
    """Progress bar for download progress

    Attributes:
        total_size_in_bytes: Total size of the file to download
        current_size_in_bytes: Current size of the chunks downloaded
        progress_bar: A tqdm progress bar
    """

    def __init__(self, total_size_in_bytes: int = 0, use_tqdm: bool = False) -> None:
        """Initialize progress logger and progress bar

        Args:
            total_size_in_bytes: Total size of the file to download
            use_tqdm: Whether to use tqdm progress bar
        """
        self.total_size_in_bytes: int = total_size_in_bytes
        self.current_size_in_bytes: int = 0
        self.progress_bar: Optional[Any] = None
        if use_tqdm:
            self.progress_bar = tqdm(
                total=total_size_in_bytes, unit="iB", unit_scale=True
            )
        # Sometimes content length is not available. However, tqdm supports total=0
        if total_size_in_bytes:
            logger.info(f"Total size: {total_size_in_bytes} bytes")

    def update(self, chunk_size_in_bytes: int) -> None:
        """Log current chunk size and update the progress bar

        Args:
            chunk_size_in_bytes: The size of the current chunk downloaded
        """
        self.current_size_in_bytes += chunk_size_in_bytes
        logger.debug(f"Current chunk size: {chunk_size_in_bytes}")
        if self.progress_bar:
            self.progress_bar.update(chunk_size_in_bytes)

    def close(self) -> None:
        """Show total bytes downloaded and clean up"""
        if self.progress_bar:
            self.progress_bar.close()
        logger.info(f"Downloaded {self.current_size_in_bytes} bytes")

    def __enter__(self) -> ProgressBar:
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.close()


__all__ = ["Connector", "logger"]
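
ProgressBar is only used internally by download_export, but the same pattern can be exercised on its own; a small sketch with made-up chunk sizes::

    with ProgressBar(total_size_in_bytes=3 * 1000, use_tqdm=True) as progress_bar:
        for chunk in (b"x" * 1000, b"y" * 1000, b"z" * 1000):
            progress_bar.update(len(chunk))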