| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- import logging
- from datetime import datetime
- from os import path
- from typing import List, Optional
- import requests
- from fastapi.encoders import jsonable_encoder
- from requests.models import Response
- from database.mongo import mongo_measures
- from schemas.fincas import Finca
- from schemas.measures import Measure
- # Logging
- # Quiza deberiamos centralizar todos los loggers en su propio modulo
- filename = path.abspath("./measures_client.log")
- logger = logging.getLogger("measures_client")
- # handler = logging.FileHandler(filename)
- # logger.addHandler(handler)
- class ClimaMeasuresClient:
- token: str
- url: str
- def __init__(self, token: str, url: str) -> None:
- self.token = token
- self.url = url
- def _get_raw_station_measures(
- self, station_code: str, datetime_from: datetime, datetime_to: Optional[datetime] = None, limit: Optional[int] = None
- ):
- parameters: dict[str, str | int] = {
- "token": self.token,
- "date_from": datetime_from.isoformat(timespec="microseconds"),
- }
- if(datetime_to):
- parameters["date_to"] = datetime_to.isoformat(timespec="microseconds")
- if(limit):
- parameters["limit"] = limit
- url = self.url + f"import/{station_code}"
- response = requests.get(url, parameters)
- if response.ok:
- return response.json()
- elif response.status_code == 400:
- logger.error(f"Bad Request al consultar measures: Station {station_code}")
- elif response.status_code == 403:
- logger.error(
- f"Acceso denegado: Station {station_code}, Token: {self.token}"
- )
- elif response.status_code == 404:
- logger.error(
- f"Estacion no encontrada al consultar sus datos: Station {station_code}"
- )
- else:
- logger.error(f"Error {response.status_code}: Station {station_code}")
- raise Exception(f"Response status({response.text}) no ok al consultar muestras")
- def get_station_measures(
- self, station_code: str, datetime_from: datetime, datetime_to: Optional[datetime] = None, limit: Optional[int] = None
- ) -> List[Measure]:
- if(limit is None and datetime_to is None):
- raise ValueError("Los parámetros datetime_to y max_count no pueden ser ambos None, incluir al menos uno")
- data = self._get_raw_station_measures(station_code, datetime_from, datetime_to, limit)
- return list(map(lambda x: Measure(**x), data))
- def _get_raw_stations(
- self,
- company_code: str,
- ) -> Response:
- parameters = {
- "token": self.token,
- }
- url = self.url + f"get-company-stations/{company_code}"
- response = requests.get(url, parameters)
- if response.status_code == 200:
- return response.json()
- raise PermissionError()
- def get_stations(self, company_code: str) -> List[Finca]:
- data = self._get_raw_stations(company_code)
- return list(map(lambda x: Finca(**x), data))
|