"""Typer commands to import, inspect and delete station measures.

Measures are fetched with ``ClimaMeasuresClient`` (or loaded from JSON dumps)
and stored in ``mongo_measures[<station_code>]``; each finca's ``last_synced``
date is kept in the SQL database.
"""
import json
import os
from datetime import date, datetime, timedelta
from logging import FileHandler, getLogger
from os import path
from pathlib import Path
from time import sleep
from typing import Callable, Optional

import typer
from pymongo.operations import UpdateOne
from sqlalchemy.orm.session import Session

from client.measures_client import ClimaMeasuresClient
from config.settings import CLIMA_API_URL
from database.mongo import mongo_measures
from database.sqlalchemy import SessionLocal
from models.fincas import Finca
from models.users import User
from schemas.measures import Measure

# Logging
# TODO: consider centralizing all loggers in their own module.
filename = path.abspath("./import_measures.log")
logger = getLogger("import_measures")
# handler = FileHandler(filename)
# logger.addHandler(handler)


def _get_client(db: Session) -> ClimaMeasuresClient:
    """Build an API client authenticated with the first user's token.

    Raises:
        typer.Exit: if there is no user in the database.
    """
    user = db.query(User).first()
    if user is None:
        typer.echo("No hay usuarios en la base de datos para autenticar contra la API")
        raise typer.Exit(code=1)
    return ClimaMeasuresClient(token=user.token, url=CLIMA_API_URL)


def _insert_or_update_data(finca_data, data) -> Optional[datetime]:
    """Store a batch of measures in a station's Mongo collection.

    First tries a single ``insert_many`` with the ``temp``/``precip``/``date``
    fields of every measure. If that raises, falls back to one upsert per
    measure keyed on ``date``, sent together with ``bulk_write``.

    Args:
        finca_data: Mongo collection of the station (``mongo_measures[code]``).
        data: measure objects exposing ``temp``, ``precip`` and ``date`` and
            convertible with ``dict()``.

    Returns:
        The latest ``date`` in ``data``, or ``None`` if ``data`` is empty.
    """
    if not data:
        return None

    last_date = max(measure.date for measure in data)
    documents = [
        {"temp": measure.temp, "precip": measure.precip, "date": measure.date}
        for measure in data
    ]
    try:
        # May fail e.g. when some measures already exist (duplicate key on a
        # unique ``date`` index, see import_json_dump) or on server limits.
        finca_data.insert_many(documents)
    except Exception:
        msg = "\tNo se pudieron insertar measures en bulk, probando con actualizar uno por uno"
        logger.info(msg)
        logger.debug("insert_many error", exc_info=True)
        typer.echo(msg)
        operations = [
            UpdateOne({"date": measure.date}, {"$set": dict(measure)}, upsert=True)
            for measure in data
        ]
        finca_data.bulk_write(operations)
    return last_date


def _import_finca_range(
    date_from,
    date_to,
    code,
    client: ClimaMeasuresClient,
    window: timedelta = timedelta(days=10),
    pause_seconds: float = 5,
):
    """Import a station's measures between ``date_from`` and ``date_to``.

    The range is requested in consecutive windows of at most ``window``, with
    a ``pause_seconds`` sleep between requests. The first failing request
    aborts the whole station: the error is assumed not to depend on the dates
    requested, so retrying other windows would not help.
    """
    finca_data = mongo_measures[code]
    typer.echo(f"Importando measures de {code}")
    logger.info(f"Importando measures de {code}")

    window_start = date_from  # moving cursor through the range
    while window_start < date_to:
        # One window ahead or the end of the range, whichever comes first.
        window_end = min(window_start + window, date_to)
        msg = f"Consultando datos desde {window_start} hasta {window_end}"
        logger.info(msg)
        typer.echo(msg)

        try:
            data = client.get_station_measures(code, window_start, window_end)
        except Exception:
            logger.exception(
                f"No se pudo importar las measures de la estacion {code}, ver logs del cliente para mas info"
            )
            return

        if data:
            _insert_or_update_data(finca_data, data)
        else:
            msg = "Porcion de datos vacia"
            logger.info(msg)
            typer.echo(msg)

        window_start = window_end
        if window_end != date_to:
            sleep(pause_seconds)


def import_range(
    date_from: datetime = typer.Argument(...),
    date_to: datetime = typer.Argument(...),
    fincas_codes: list[int] = typer.Argument(None),
):
    """Import measures between two dates for the given station codes (all fincas if none)."""
    # Typer passes None (not an empty list) when no codes are given.
    codes = fincas_codes or []
    typer.echo(
        f"Importando datos desde {date_from} hasta {date_to} para las fincas: {codes if codes else 'todas'}"
    )
    if not date_from < date_to:
        typer.echo("La fecha de inicio debe ser menor a la fecha de termino")
        typer.echo("Para ver los argumentos usar `python main.py import-range --help`")
        raise typer.Exit(code=1)

    db: Session = SessionLocal()
    try:
        client = _get_client(db)
        if codes:
            station_codes = [str(code) for code in codes]
        else:
            typer.echo("No fincas dadas")
            station_codes = [finca.station_code for finca in db.query(Finca).all()]
        for station_code in station_codes:
            _import_finca_range(date_from, date_to, station_code, client)
        db.commit()
    finally:
        db.close()


def import_measures():
    """Import new measures for every finca, starting at its last_synced date."""
    # About 10 days of data at one sample every 10 minutes:
    # 10 days x 24 hours x 6 samples per hour.
    limit = 1440

    db: Session = SessionLocal()
    try:
        fincas = db.query(Finca).all()
        client = _get_client(db)

        for finca in fincas:
            code = finca.station_code
            typer.echo(f"Importando measures de {code}")
            logger.info(f"Importando measures de {code}")

            date_from = finca.last_synced
            if date_from is None:
                msg = f"\tEl atributo last_synced es nulo en la estacion {code}."
                typer.echo(msg)
                logger.warning(msg)
                continue

            typer.echo(f"\tfrom: {date_from}")
            # On any API error move on to the next station.
            try:
                data = client.get_station_measures(code, date_from, limit=limit)
            except Exception as e:
                logger.error(
                    f"No se pudo importar las measures de la estacion {code}: {e}, ver logs del cliente para mas info"
                )
                continue

            if data:
                finca.last_synced = _insert_or_update_data(mongo_measures[code], data)
            else:
                typer.echo(f"\tNo hay datos a partir de {date_from}")
        db.commit()
    finally:
        db.close()


def _apply_function_conditional(
    station_code: Optional[str], func: Callable[[Finca], None]
):
    """Call ``func`` on the finca with ``station_code``, or on every finca if it is None."""
    db: Session = SessionLocal()
    try:
        query = db.query(Finca)
        if station_code is not None:
            query = query.filter(Finca.station_code == station_code)
        for finca in query.all():
            func(finca)
    finally:
        db.close()


def _get_measures(finca: Finca):
    """Echo every document stored in the finca's measures collection."""
    for document in mongo_measures[finca.station_code].find():
        typer.echo(document)


def get_measures(station_code: str = typer.Argument(None)):
    """Print stored measures of one station, or of every finca when no code is given."""
    _apply_function_conditional(station_code, _get_measures)


def _delete_measures(finca: Finca):
    """Delete every document in the finca's measures collection."""
    mongo_measures[finca.station_code].delete_many({})


def delete_measures(station_code: str = typer.Argument(None)):
    """Delete stored measures of one station, or of EVERY finca when no code is given."""
    _apply_function_conditional(station_code, _delete_measures)


def _dump_targets(db: Session, dumppath, station_code: str, all_dumps: bool):
    """Return ``(station_code, finca_or_None, dump_file)`` tuples to import."""
    if all_dumps:
        targets = []
        for dump_file in sorted(Path("dumps").glob("*.json")):
            code = dump_file.stem
            finca = db.query(Finca).filter(Finca.station_code == code).first()
            targets.append((code, finca, dump_file))
        return targets
    finca = db.query(Finca).filter(Finca.station_code == station_code).first()
    return [(station_code, finca, Path(dumppath))]


def _ensure_date_index(finca_data) -> None:
    """Create a unique index on ``date`` if the collection lacks one."""
    # Without it every upsert by date has to scan the whole collection.
    # REVIEW: might want to move this index code somewhere else.
    if finca_data.index_information().get("date_1") is None:
        typer.echo("\tCreating `date` index")
        finca_data.create_index("date", unique=True)


def _build_upserts(dump) -> tuple[list, datetime]:
    """Validate dumped records as ``Measure`` and build one upsert per date.

    Returns:
        The ``UpdateOne`` operations and the latest measure date
        (``datetime.min`` when the dump is empty).
    """
    operations = []
    max_date = datetime.min
    with typer.progressbar(dump, label="Procesando datos") as progress:
        for record in progress:
            measure = Measure(**record)
            max_date = max(max_date, measure.date)
            operations.append(
                UpdateOne({"date": measure.date}, {"$set": dict(measure)}, upsert=True)
            )
    return operations, max_date


def import_json_dump(
    dumppath: Path = typer.Option("", "--dump"),
    station_code: str = typer.Option("", "--code"),
    all: bool = typer.Option(False, "--all"),
):
    """Upsert measures from JSON dumps into each station's collection.

    With --all every dumps/<station_code>.json file is imported; otherwise the
    file given by --dump is imported for the station given by --code.
    """
    db: Session = SessionLocal()
    try:
        for code, finca, dump_file in _dump_targets(db, dumppath, station_code, all):
            if finca is None:
                typer.echo(f"La finca con el código de estación {code} no existe.")
                continue

            typer.echo(f"Importando datos de la estación: {finca.station_code}")
            if not dump_file.is_file():
                typer.echo(f"\tNo se encontró el archivo de dump: {dump_file}")
                continue
            # JSON text is UTF-8 (RFC 8259); don't depend on the locale encoding.
            with open(dump_file, encoding="utf-8") as fh:
                dump = json.load(fh)

            finca_data = mongo_measures[finca.station_code]
            _ensure_date_index(finca_data)
            operations, max_date = _build_upserts(dump)

            last_synced = (
                finca.last_synced if finca.last_synced is not None else datetime.min
            )
            if max_date > last_synced:
                typer.echo(f"Updating last_synced: {max_date}")
                finca.last_synced = max_date

            # bulk_write rejects an empty operation list.
            if not operations:
                typer.echo("\tNo measures in dump, nothing to write")
                continue

            typer.echo("Bulk writing operations in mongo...")
            result = finca_data.bulk_write(operations)
            typer.echo(f"Updated measures: {result.modified_count}")
            typer.echo(f"Inserted measures {result.upserted_count}")
        db.commit()
    finally:
        db.close()