import logging
from datetime import datetime
from os import path
from typing import Any, List, Optional

import requests
from fastapi.encoders import jsonable_encoder
from requests.models import Response

from database.mongo import mongo_measures
from schemas.fincas import Finca
from schemas.measures import Measure

# Logging
# Maybe we should centralize all the loggers in their own module.
filename = path.abspath("./measures_client.log")
logger = logging.getLogger("measures_client")
# handler = logging.FileHandler(filename)
# logger.addHandler(handler)

# Seconds to wait for the remote API before giving up. Without a timeout
# `requests` waits indefinitely on an unresponsive server.
_DEFAULT_TIMEOUT = 30.0


class ClimaMeasuresClient:
    """HTTP client for the measures API.

    Every request is authenticated by sending ``token`` as a query parameter.
    Endpoint paths are appended directly to ``url``, so ``url`` is expected to
    already end with the separator the API needs (typically ``/``).
    """

    token: str
    url: str
    # Class-level default so the attribute exists even if __init__ is bypassed.
    timeout: Optional[float] = _DEFAULT_TIMEOUT

    def __init__(
        self,
        token: str,
        url: str,
        timeout: Optional[float] = _DEFAULT_TIMEOUT,
    ) -> None:
        """Store the credentials and connection settings.

        Args:
            token: API token sent with every request.
            url: Base URL that endpoint paths are appended to.
            timeout: Seconds to wait for each HTTP request; ``None`` disables
                the timeout (the behaviour before this parameter existed).
        """
        self.token = token
        self.url = url
        self.timeout = timeout

    def _get_raw_station_measures(
        self,
        station_code: str,
        datetime_from: datetime,
        datetime_to: Optional[datetime] = None,
        limit: Optional[int] = None,
    ) -> Any:
        """Fetch the decoded JSON payload with the measures of one station.

        Args:
            station_code: Code of the station, used in the endpoint path.
            datetime_from: Start of the requested range, sent as ``date_from``.
            datetime_to: Optional end of the range, sent as ``date_to``.
            limit: Optional value sent as ``limit``.

        Returns:
            Whatever ``response.json()`` yields for a successful response.

        Raises:
            Exception: If the response status is not OK. The failure is logged
                before raising.
        """
        parameters: dict[str, str | int] = {
            "token": self.token,
            "date_from": datetime_from.isoformat(timespec="microseconds"),
        }
        # Compare against None explicitly so that a caller-supplied limit of 0
        # is forwarded instead of being silently dropped.
        if datetime_to is not None:
            parameters["date_to"] = datetime_to.isoformat(timespec="microseconds")
        if limit is not None:
            parameters["limit"] = limit

        url = self.url + f"import/{station_code}"
        response = requests.get(url, params=parameters, timeout=self.timeout)
        if response.ok:
            return response.json()

        status = response.status_code
        if status == 400:
            logger.error(
                "Bad Request al consultar measures: Station %s", station_code
            )
        elif status == 403:
            # The token is a credential: never write it to the log.
            logger.error("Acceso denegado: Station %s", station_code)
        elif status == 404:
            logger.error(
                "Estacion no encontrada al consultar sus datos: Station %s",
                station_code,
            )
        else:
            logger.error("Error %s: Station %s", status, station_code)
        raise Exception(
            f"Response status({status}) no ok al consultar muestras: "
            f"{response.text}"
        )

    def get_station_measures(
        self,
        station_code: str,
        datetime_from: datetime,
        datetime_to: Optional[datetime] = None,
        limit: Optional[int] = None,
    ) -> List[Measure]:
        """Return the measures of a station as ``Measure`` objects.

        At least one of ``datetime_to`` and ``limit`` must be given.

        Args:
            station_code: Code of the station to query.
            datetime_from: Start of the requested range.
            datetime_to: Optional end of the requested range.
            limit: Optional value forwarded to the API as ``limit``.

        Raises:
            ValueError: If both ``datetime_to`` and ``limit`` are ``None``.
            Exception: If the API answers with a non-OK status.
        """
        if limit is None and datetime_to is None:
            raise ValueError(
                "Los parámetros datetime_to y limit no pueden ser ambos "
                "None, incluir al menos uno"
            )
        data = self._get_raw_station_measures(
            station_code, datetime_from, datetime_to, limit
        )
        return [Measure(**item) for item in data]

    def _get_raw_stations(
        self,
        company_code: str,
    ) -> Any:
        """Fetch the decoded JSON payload with the stations of a company.

        Args:
            company_code: Code of the company, used in the endpoint path.

        Returns:
            Whatever ``response.json()`` yields for a 200 response.

        Raises:
            PermissionError: If the response status is anything other than
                200. The failure is logged before raising.
        """
        parameters = {
            "token": self.token,
        }
        url = self.url + f"get-company-stations/{company_code}"
        response = requests.get(url, params=parameters, timeout=self.timeout)
        if response.status_code == 200:
            return response.json()

        logger.error(
            "Error %s al consultar estaciones: Company %s",
            response.status_code,
            company_code,
        )
        raise PermissionError(
            f"Response status({response.status_code}) no ok al consultar "
            f"estaciones: Company {company_code}"
        )

    def get_stations(self, company_code: str) -> List[Finca]:
        """Return the stations of a company as ``Finca`` objects.

        Args:
            company_code: Code of the company to query.

        Raises:
            PermissionError: If the API answers with a status other than 200.
        """
        data = self._get_raw_stations(company_code)
        return [Finca(**item) for item in data]