| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- import json
- import os
- from datetime import date, datetime, timedelta
- from logging import FileHandler, getLogger
- from os import path
- from pathlib import Path
- from time import sleep
- from typing import Callable, Optional
- import typer
- from pymongo.operations import UpdateOne
- from sqlalchemy.orm.session import Session
- from client.measures_client import ClimaMeasuresClient
- from config.settings import CLIMA_API_URL
- from database.mongo import mongo_measures
- from database.sqlalchemy import SessionLocal
- from models.fincas import Finca
- from models.users import User
- from schemas.measures import Measure
# Logging setup.
# TODO: consider centralizing every logger in a dedicated module.
logger = getLogger("import_measures")
# Absolute path of the log file, resolved against the current working directory.
filename = path.abspath("./import_measures.log")
# File logging is currently disabled; re-enable it by attaching a handler:
# handler = FileHandler(filename)
# logger.addHandler(handler)
- def _insert_or_update_data(finca_data, data):
- measures = []
- for x in data:
- measures.append({"temp": x.temp, "precip": x.precip, "date": x.date})
- # Probamos insertar todas, si no se pudo,
- # intentamos agregar/actualizar una por una.
- try:
- # Eso puede fallar por limite de inserciones o por tener measures ya existentes
- finca_data.insert_many(measures)
- except:
- msg = "\tNo se pudieron insertar measures en bulk, probando con actualizar uno por uno"
- logger.info(msg)
- typer.echo(msg)
- operations = []
- max_date = datetime.min
- for measure in data:
- max_date = measure.date if measure.date > max_date else max_date
- operations.append(
- UpdateOne(
- {"date": measure.date},
- {"$set": dict(measure)},
- upsert=True,
- )
- )
- finca_data.bulk_write(operations)
- last_date = max(measure["date"] for measure in measures)
- return last_date
def _import_finca_range(date_from, date_to, code, client: ClimaMeasuresClient):
    """Import the measures of one station between two dates, chunk by chunk.

    The range is walked in windows of at most 10 days. Each window is fetched
    through ``client.get_station_measures`` and stored with
    ``_insert_or_update_data``. The function sleeps 5 seconds between
    consecutive windows.

    Args:
        date_from: Start of the range.
        date_to: End of the range.
        code: Station code; also used as the key into ``mongo_measures``.
        client: Client used to fetch the measures.

    Returns:
        None. The import of this station is abandoned as soon as one window
        fails to download.
    """
    finca_data = mongo_measures[code]
    # Moving start date, advanced one window at a time.
    start_date = date_from
    typer.echo(f"Importando measures de {code}")
    logger.info(f"Importando measures de {code}")
    while start_date < date_to:
        # 10 more days or the end of the range, whichever comes first.
        end_date = min(start_date + timedelta(days=10), date_to)
        msg = f"Consultando datos desde {start_date} hasta {end_date}]"
        logger.info(msg)
        typer.echo(msg)
        try:
            data = client.get_station_measures(code, start_date, end_date)
        except Exception as exc:
            logger.error(
                f"No se pudo importar las measures de la estacion {code}: {exc}, ver logs del cliente para mas info"
            )
            # A failure while fetching one window is assumed not to depend on
            # the requested dates, so give up on this station (return, not
            # continue).
            return
        if data:
            _insert_or_update_data(finca_data, data)
        else:
            msg = "Porcion de datos vacia"
            logger.info(msg)
            typer.echo(msg)
        start_date = end_date
        # Pause between requests, except after the last window.
        if end_date != date_to:
            sleep(5)
def import_range(
    date_from: datetime = typer.Argument(...),
    date_to: datetime = typer.Argument(...),
    fincas_codes: list[int] = typer.Argument(None),
):
    """Import measures for a date range, for the given stations or for all.

    Args:
        date_from: Start of the range; must be earlier than ``date_to``.
        date_to: End of the range.
        fincas_codes: Station codes to import. When omitted or empty, every
            ``Finca`` stored in the database is imported.

    Raises:
        typer.Exit: With code 1 when ``date_from`` is not earlier than
            ``date_to``.
    """
    # Truthiness instead of len(): the optional argument may arrive as None.
    typer.echo(
        f"Importando datos desde {date_from} hasta {date_to} para las fincas: {fincas_codes if fincas_codes else 'todas'}"
    )
    if not (date_from < date_to):
        typer.echo("La fecha de inicio debe se menor a la fecha de termino")
        typer.echo("Para ver los argumentos usar `python main.py import-range --help`")
        # Stop here instead of opening a session for an invalid range.
        raise typer.Exit(code=1)

    db: Session = SessionLocal()
    try:
        user = db.query(User).first()
        client = ClimaMeasuresClient(
            token=user.token,
            url=CLIMA_API_URL,
        )
        # If the optional fincas_codes is not given, use every finca in the db.
        if not fincas_codes:
            typer.echo("No fincas dadas")
            # Reuse the session opened above instead of creating a second one.
            for finca in db.query(Finca).all():
                _import_finca_range(date_from, date_to, finca.station_code, client)
        else:
            for code in fincas_codes:
                _import_finca_range(date_from, date_to, str(code), client)
        db.commit()
    finally:
        # Always release the session, even if an import raised.
        db.close()
def import_measures():
    """Import, for every finca, the measures recorded since its last sync.

    For each ``Finca`` in the database, measures are requested starting at
    ``finca.last_synced``, stored with ``_insert_or_update_data``, and
    ``last_synced`` is advanced to the returned date. Fincas whose
    ``last_synced`` is null are skipped, and a failed download only skips the
    affected station. Changes to ``last_synced`` are committed once all
    stations have been processed.
    """
    # Number of samples normally found in 10 days of data with one sample
    # every 10 minutes (24 hours x 6 samples per hour, per day).
    limit = 1440

    db: Session = SessionLocal()
    try:
        fincas = db.query(Finca).all()
        user = db.query(User).first()
        # Clima API client; it does not depend on the station, so build it once.
        client = ClimaMeasuresClient(
            token=user.token,
            url=CLIMA_API_URL,
        )
        # Import finca by finca.
        for finca in fincas:
            typer.echo(f"Importando measures de {finca.station_code}")
            logger.info(f"Importando measures de {finca.station_code}")
            finca_data = mongo_measures[finca.station_code]
            # Start from the last synchronisation.
            date_from = finca.last_synced
            if date_from is None:
                msg = f"\tEl atributo last_synced es nulo en la estacion {finca.station_code}."
                typer.echo(msg)
                logger.warning(msg)
                continue
            typer.echo(f"\tfrom: {date_from}")
            # Try to fetch the data; on any error move on to the next station.
            try:
                data = client.get_station_measures(
                    finca.station_code, date_from, limit=limit
                )
            except Exception as e:
                logger.error(
                    f"No se pudo importar las measures de la estacion {finca.station_code}: {e}, ver logs del cliente para mas info"
                )
                continue
            if data:
                finca.last_synced = _insert_or_update_data(finca_data, data)
            else:
                typer.echo(f"\tNo hay datos a partir de {date_from}")
        db.commit()
    finally:
        # Always release the session, even if a station failed unexpectedly.
        db.close()
def _apply_function_conditional(
    station_code: Optional[str], func: Callable[[Finca], None]
):
    """Call ``func`` on one finca, or on every finca when no code is given.

    Args:
        station_code: Station code to filter by; ``None`` selects every
            ``Finca`` in the database.
        func: Callable invoked once per selected finca.
    """
    db: Session = SessionLocal()
    try:
        if station_code is None:
            fincas = db.query(Finca).all()
        else:
            fincas = db.query(Finca).filter(Finca.station_code == station_code)
        for finca in fincas:
            func(finca)
    finally:
        # Release the session even if func raises.
        db.close()
def _get_measures(finca: Finca):
    """Echo every document stored in the measures collection of ``finca``."""
    collection = mongo_measures[finca.station_code]
    for document in collection.find():
        typer.echo(document)
def get_measures(station_code: str = typer.Argument(None)):
    """Print the stored measures of one station, or of all when no code is given."""
    _apply_function_conditional(station_code=station_code, func=_get_measures)
def _delete_measures(finca: Finca):
    """Delete every document in the measures collection of ``finca``."""
    finca_data = mongo_measures[finca.station_code]
    # An empty filter matches, and therefore removes, all documents.
    finca_data.delete_many({})
def delete_measures(station_code: str = typer.Argument(None)):
    """Delete the stored measures of one station, or of all when no code is given."""
    _apply_function_conditional(station_code=station_code, func=_delete_measures)
def import_json_dump(
    dumppath: Path = typer.Option("", "--dump"),
    station_code: str = typer.Option("", "--code"),
    all: bool = typer.Option(False, "--all"),
):
    """Load measures from JSON dump files into Mongo.

    With ``--all``, every file name in the ``dumps`` directory is treated as
    ``<station_code>.<ext>`` and ``dumps/<station_code>.json`` is imported for
    each matching finca. Otherwise the file at ``dumppath`` is imported into
    the finca identified by ``station_code``.

    Each measure is upserted by ``date``, and ``finca.last_synced`` is moved
    forward when the dump holds newer data. Codes with no matching finca and
    empty dumps are skipped.

    Args:
        dumppath: Dump file to import when ``all`` is false.
        station_code: Station code of the target finca when ``all`` is false.
        all: Import every dump found in the ``dumps`` directory.
    """
    db: Session = SessionLocal()
    try:
        if all:
            codes = [Path(dump_name).stem for dump_name in os.listdir("dumps")]
        else:
            codes = [station_code]
        # Keep the code next to its finca so a missing finca can be reported
        # with the code that was actually looked up.
        targets = [
            (code, db.query(Finca).filter(Finca.station_code == code).first())
            for code in codes
        ]

        for code, finca in targets:
            if finca is None:
                typer.echo(f"La finca con el código de estación {code} no existe.")
                continue
            typer.echo(f"Importando datos de la estación: {finca.station_code}")

            dump_file = Path(f"dumps/{finca.station_code}.json") if all else dumppath
            # Hold the file open only while parsing; JSON text is read as UTF-8.
            with open(dump_file, encoding="utf-8") as dumpfile:
                dump = json.load(dumpfile)

            finca_data = mongo_measures[finca.station_code]
            # Create the date index, otherwise it would take ages to upsert
            # every document.
            # REVIEW: Might want to move this index code somewhere else
            indexes = finca_data.index_information()
            if indexes.get("date_1") is None:
                typer.echo("\tCreating `date` index")
                finca_data.create_index("date", unique=True)

            operations = []
            max_date = datetime.min
            with typer.progressbar(dump, label="Procesando datos") as progress:
                for measure in progress:
                    new_measure = Measure(**measure)
                    max_date = max(max_date, new_measure.date)
                    operations.append(
                        UpdateOne(
                            {"date": new_measure.date},
                            {"$set": dict(new_measure)},
                            upsert=True,
                        )
                    )

            if not operations:
                # bulk_write rejects an empty operation list; skip this dump
                # rather than aborting the remaining ones.
                typer.echo("\tDump vacío, nada que importar")
                continue

            last_synced = (
                finca.last_synced if finca.last_synced is not None else datetime.min
            )
            if max_date > last_synced:
                typer.echo(f"Updating last_synced: {max_date}")
                finca.last_synced = max_date

            typer.echo("Bulk writing operations in mongo...")
            result = finca_data.bulk_write(operations)
            typer.echo(f"Updated measures: {result.modified_count}")
            typer.echo(f"Inserted measures {result.upserted_count}")
        db.commit()
    finally:
        # Always release the session, even if a dump failed to import.
        db.close()
|