|
@@ -0,0 +1,294 @@
|
|
|
|
|
+import json
|
|
|
|
|
+import os
|
|
|
|
|
+from datetime import date, datetime, timedelta
|
|
|
|
|
+from logging import FileHandler, getLogger
|
|
|
|
|
+from os import path
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+from time import sleep
|
|
|
|
|
+from typing import Callable, Optional
|
|
|
|
|
+
|
|
|
|
|
+import typer
|
|
|
|
|
+from pymongo.operations import UpdateOne
|
|
|
|
|
+from sqlalchemy.orm.session import Session
|
|
|
|
|
+
|
|
|
|
|
+from client.measures_client import ClimaMeasuresClient
|
|
|
|
|
+from config.settings import CLIMA_API_URL
|
|
|
|
|
+from database.mongo import mongo_measures
|
|
|
|
|
+from database.sqlalchemy import SessionLocal
|
|
|
|
|
+from models.fincas import Finca
|
|
|
|
|
+from models.users import User
|
|
|
|
|
+from schemas.measures import Measure
|
|
|
|
|
+
|
|
|
|
|
+# Logging
|
|
|
|
|
+# Quiza deberiamos centralizar todos los loggers en su propio modulo
|
|
|
|
|
+filename = path.abspath("./import_measures.log")
|
|
|
|
|
+logger = getLogger("import_measures")
|
|
|
|
|
+# handler = FileHandler(filename)
|
|
|
|
|
+# logger.addHandler(handler)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _insert_or_update_data(finca_data, data):
|
|
|
|
|
+ measures = []
|
|
|
|
|
+ for x in data:
|
|
|
|
|
+ measures.append({"temp": x.temp, "precip": x.precip, "date": x.date})
|
|
|
|
|
+
|
|
|
|
|
+ # Probamos insertar todas, si no se pudo,
|
|
|
|
|
+ # intentamos agregar/actualizar una por una.
|
|
|
|
|
+ try:
|
|
|
|
|
+ # Eso puede fallar por limite de inserciones o por tener measures ya existentes
|
|
|
|
|
+ finca_data.insert_many(measures)
|
|
|
|
|
+ except:
|
|
|
|
|
+ msg = "\tNo se pudieron insertar measures en bulk, probando con actualizar uno por uno"
|
|
|
|
|
+ logger.info(msg)
|
|
|
|
|
+ typer.echo(msg)
|
|
|
|
|
+ operations = []
|
|
|
|
|
+ max_date = datetime.min
|
|
|
|
|
+ for measure in data:
|
|
|
|
|
+ max_date = measure.date if measure.date > max_date else max_date
|
|
|
|
|
+
|
|
|
|
|
+ operations.append(
|
|
|
|
|
+ UpdateOne(
|
|
|
|
|
+ {"date": measure.date},
|
|
|
|
|
+ {"$set": dict(measure)},
|
|
|
|
|
+ upsert=True,
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+ finca_data.bulk_write(operations)
|
|
|
|
|
+
|
|
|
|
|
+ last_date = max(measure["date"] for measure in measures)
|
|
|
|
|
+ return last_date
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _import_finca_range(date_from, date_to, code, client: ClimaMeasuresClient):
|
|
|
|
|
+ finca_data = mongo_measures[code]
|
|
|
|
|
+
|
|
|
|
|
+ # Fecha movil que vamos incrementando
|
|
|
|
|
+ start_date = date_from
|
|
|
|
|
+ typer.echo(f"Importando measures de {code}")
|
|
|
|
|
+ logger.info(f"Importando measures de {code}")
|
|
|
|
|
+ while start_date < date_to:
|
|
|
|
|
+ # 10 dias mas o la fecha maxima del rango, la que sea mas chica
|
|
|
|
|
+ end_date = min(start_date + timedelta(days=10), date_to)
|
|
|
|
|
+
|
|
|
|
|
+ msg = f"Consultando datos desde {start_date} hasta {end_date}]"
|
|
|
|
|
+ logger.info(msg)
|
|
|
|
|
+ typer.echo(msg)
|
|
|
|
|
+ data = []
|
|
|
|
|
+ try:
|
|
|
|
|
+ data = client.get_station_measures(code, start_date, end_date)
|
|
|
|
|
+ except:
|
|
|
|
|
+ logger.error(
|
|
|
|
|
+ f"No se pudo importar las measures de la estacion {code}, ver logs del cliente para mas info"
|
|
|
|
|
+ )
|
|
|
|
|
+ # Si hay un error al traer una porcion de los datos, directamente salimos,
|
|
|
|
|
+ # ya que el error no depende de las fechas pedidas
|
|
|
|
|
+ # Por eso return y no continue
|
|
|
|
|
+ return
|
|
|
|
|
+ if len(data) > 0:
|
|
|
|
|
+ _insert_or_update_data(finca_data, data)
|
|
|
|
|
+ else:
|
|
|
|
|
+ msg = "Porcion de datos vacia"
|
|
|
|
|
+ logger.info(msg)
|
|
|
|
|
+ typer.echo(msg)
|
|
|
|
|
+ start_date = end_date
|
|
|
|
|
+ if end_date != date_to:
|
|
|
|
|
+ sleep(5)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def import_range(
|
|
|
|
|
+ date_from: datetime = typer.Argument(...),
|
|
|
|
|
+ date_to: datetime = typer.Argument(...),
|
|
|
|
|
+ fincas_codes: list[int] = typer.Argument(None),
|
|
|
|
|
+):
|
|
|
|
|
+ typer.echo(
|
|
|
|
|
+ f"Importando datos desde {date_from} hasta {date_to} para las fincas: {fincas_codes if len(fincas_codes) > 0 else 'todas'}"
|
|
|
|
|
+ )
|
|
|
|
|
+ if not (date_from < date_to):
|
|
|
|
|
+ typer.echo("La fecha de inicio debe se menor a la fecha de termino")
|
|
|
|
|
+ typer.echo("Para ver los argumentos usar `python main.py import-range --help`")
|
|
|
|
|
+ db: Session = SessionLocal()
|
|
|
|
|
+ user = db.query(User).first()
|
|
|
|
|
+ client = ClimaMeasuresClient(
|
|
|
|
|
+ token=user.token,
|
|
|
|
|
+ url=CLIMA_API_URL,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # Si el campo opcional fincas_codes no es dado, usamos todas las fincas de la db
|
|
|
|
|
+ if len(fincas_codes) == 0:
|
|
|
|
|
+ typer.echo("No fincas dadas")
|
|
|
|
|
+ db: Session = SessionLocal()
|
|
|
|
|
+ fincas = db.query(Finca).all()
|
|
|
|
|
+ for finca in fincas:
|
|
|
|
|
+ _import_finca_range(date_from, date_to, finca.station_code, client)
|
|
|
|
|
+ else:
|
|
|
|
|
+ for code in fincas_codes:
|
|
|
|
|
+ _import_finca_range(date_from, date_to, str(code), client)
|
|
|
|
|
+ db.commit()
|
|
|
|
|
+ db.close()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def import_measures():
|
|
|
|
|
+ db: Session = SessionLocal()
|
|
|
|
|
+ fincas = db.query(Finca).all()
|
|
|
|
|
+ user = db.query(User).first()
|
|
|
|
|
+
|
|
|
|
|
+ # Importamos finca por finca
|
|
|
|
|
+ for finca in fincas:
|
|
|
|
|
+ typer.echo(f"Importando measures de {finca.station_code}")
|
|
|
|
|
+ logger.info(f"Importando measures de {finca.station_code}")
|
|
|
|
|
+ finca_data = mongo_measures[finca.station_code]
|
|
|
|
|
+
|
|
|
|
|
+ # Desde la ultima vez
|
|
|
|
|
+ date_from = finca.last_synced
|
|
|
|
|
+ if date_from is None:
|
|
|
|
|
+ typer.echo(
|
|
|
|
|
+ f"\tEl atributo last_synced es nulo en la estacion {finca.station_code}."
|
|
|
|
|
+ )
|
|
|
|
|
+ logger.warning(
|
|
|
|
|
+ f"\tEl atributo last_synced es nulo en la estacion {finca.station_code}."
|
|
|
|
|
+ )
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Las muestras que habría normalmente en 10 días de datos con datos cada 10 minutos
|
|
|
|
|
+ # 24 horas x 6 muestras
|
|
|
|
|
+ limit = 1440
|
|
|
|
|
+
|
|
|
|
|
+ print(f"\tfrom: {date_from}")
|
|
|
|
|
+
|
|
|
|
|
+ # Cliente de api clima
|
|
|
|
|
+ client = ClimaMeasuresClient(
|
|
|
|
|
+ token=user.token,
|
|
|
|
|
+ url=CLIMA_API_URL,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # Probamos traer los datos
|
|
|
|
|
+ # Ante cualquier error seguimos con la proxima estacion
|
|
|
|
|
+ data = None
|
|
|
|
|
+ try:
|
|
|
|
|
+ data = client.get_station_measures(finca.station_code, date_from, limit=limit)
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(
|
|
|
|
|
+ f"No se pudo importar las measures de la estacion {finca.station_code}: {e}, ver logs del cliente para mas info"
|
|
|
|
|
+ )
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ if len(data) > 0:
|
|
|
|
|
+
|
|
|
|
|
+ last_date = _insert_or_update_data(finca_data, data)
|
|
|
|
|
+
|
|
|
|
|
+ finca.last_synced = last_date
|
|
|
|
|
+ else:
|
|
|
|
|
+ typer.echo(f"\tNo hay datos a partir de {date_from}")
|
|
|
|
|
+
|
|
|
|
|
+ db.commit()
|
|
|
|
|
+
|
|
|
|
|
+ db.close()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _apply_function_conditional(station_code: str, func: Callable[[Finca], None]):
|
|
|
|
|
+ db: Session = SessionLocal()
|
|
|
|
|
+
|
|
|
|
|
+ if station_code is None:
|
|
|
|
|
+ fincas = db.query(Finca).all()
|
|
|
|
|
+ else:
|
|
|
|
|
+ fincas = db.query(Finca).filter(Finca.station_code == station_code)
|
|
|
|
|
+
|
|
|
|
|
+ for finca in fincas:
|
|
|
|
|
+ func(finca)
|
|
|
|
|
+
|
|
|
|
|
+ db.close()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _get_measures(finca: Finca):
|
|
|
|
|
+ finca_data = mongo_measures[finca.station_code]
|
|
|
|
|
+
|
|
|
|
|
+ query = finca_data.find()
|
|
|
|
|
+ for x in query:
|
|
|
|
|
+ typer.echo(x)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def get_measures(station_code: str = typer.Argument(None)):
|
|
|
|
|
+ _apply_function_conditional(station_code, _get_measures)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _delete_measures(finca: str):
|
|
|
|
|
+ finca_data = mongo_measures[finca.station_code]
|
|
|
|
|
+ finca_data.delete_many({})
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def delete_measures(station_code: str = typer.Argument(None)):
|
|
|
|
|
+ _apply_function_conditional(station_code, _delete_measures)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def import_json_dump(
|
|
|
|
|
+ dumppath: Path = typer.Option("", "--dump"),
|
|
|
|
|
+ station_code: str = typer.Option("", "--code"),
|
|
|
|
|
+ all: bool = typer.Option(False, "--all"),
|
|
|
|
|
+):
|
|
|
|
|
+ db: Session = SessionLocal()
|
|
|
|
|
+ fincas = []
|
|
|
|
|
+
|
|
|
|
|
+ if all:
|
|
|
|
|
+ for filename in os.listdir("dumps"):
|
|
|
|
|
+ code = Path(filename).stem
|
|
|
|
|
+ finca = db.query(Finca).filter(Finca.station_code == code).first()
|
|
|
|
|
+ fincas.append(finca)
|
|
|
|
|
+ else:
|
|
|
|
|
+ finca = db.query(Finca).filter(Finca.station_code == station_code).first()
|
|
|
|
|
+ fincas.append(finca)
|
|
|
|
|
+
|
|
|
|
|
+ for finca in fincas:
|
|
|
|
|
+ if finca is None:
|
|
|
|
|
+ print(f"La finca con el código de estación {station_code} no existe.")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ typer.echo(f"Importando datos de la estación: {finca.station_code}")
|
|
|
|
|
+
|
|
|
|
|
+ if all:
|
|
|
|
|
+ dumpfile = open(Path(f"dumps/{finca.station_code}.json"))
|
|
|
|
|
+ else:
|
|
|
|
|
+ dumpfile = open(dumppath)
|
|
|
|
|
+
|
|
|
|
|
+ dump = json.load(dumpfile)
|
|
|
|
|
+
|
|
|
|
|
+ finca_data = mongo_measures[finca.station_code]
|
|
|
|
|
+
|
|
|
|
|
+ # Create date index, otherwise it would take ages to upsert every document
|
|
|
|
|
+ # REVIEW: Might want to move this index code somewhere else
|
|
|
|
|
+ indexes = finca_data.index_information()
|
|
|
|
|
+ if indexes.get("date_1") is None:
|
|
|
|
|
+ typer.echo("\tCreating `date` index")
|
|
|
|
|
+ finca_data.create_index("date", unique=True)
|
|
|
|
|
+
|
|
|
|
|
+ operations = []
|
|
|
|
|
+ max_date = datetime.min
|
|
|
|
|
+ with typer.progressbar(dump, label="Procesando datos") as progress:
|
|
|
|
|
+ for measure in progress:
|
|
|
|
|
+ new_measure = Measure(**measure)
|
|
|
|
|
+ max_date = new_measure.date if new_measure.date > max_date else max_date
|
|
|
|
|
+
|
|
|
|
|
+ operations.append(
|
|
|
|
|
+ UpdateOne(
|
|
|
|
|
+ {"date": new_measure.date},
|
|
|
|
|
+ {"$set": dict(new_measure)},
|
|
|
|
|
+ upsert=True,
|
|
|
|
|
+ )
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ last_synced = (
|
|
|
|
|
+ finca.last_synced if finca.last_synced is not None else datetime.min
|
|
|
|
|
+ )
|
|
|
|
|
+ if max_date > last_synced:
|
|
|
|
|
+ typer.echo(f"Updating last_synced: {max_date}")
|
|
|
|
|
+ finca.last_synced = max_date
|
|
|
|
|
+
|
|
|
|
|
+ typer.echo("Bulk writing operations in mongo...")
|
|
|
|
|
+ result = finca_data.bulk_write(operations)
|
|
|
|
|
+
|
|
|
|
|
+ typer.echo(f"Updated measures: {result.modified_count}")
|
|
|
|
|
+ typer.echo(f"Inserted measures {result.upserted_count}")
|
|
|
|
|
+
|
|
|
|
|
+ dumpfile.close()
|
|
|
|
|
+
|
|
|
|
|
+ db.commit()
|
|
|
|
|
+ db.close()
|