measures.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. import json
  2. import os
  3. from datetime import date, datetime, timedelta
  4. from logging import FileHandler, getLogger
  5. from os import path
  6. from pathlib import Path
  7. from time import sleep
  8. from typing import Callable, Optional
  9. import typer
  10. from pymongo.operations import UpdateOne
  11. from sqlalchemy.orm.session import Session
  12. from client.measures_client import ClimaMeasuresClient
  13. from config.settings import CLIMA_API_URL
  14. from database.mongo import mongo_measures
  15. from database.sqlalchemy import SessionLocal
  16. from models.fincas import Finca
  17. from models.users import User
  18. from schemas.measures import Measure
# Logging
# Maybe we should centralize all the loggers in their own module.
# Absolute path of the log file; only referenced by the disabled handler below.
filename = path.abspath("./import_measures.log")
# Module-wide logger used by every import function in this file.
logger = getLogger("import_measures")
# File handler is currently disabled, so no handler is attached here.
# handler = FileHandler(filename)
# logger.addHandler(handler)
  25. def _insert_or_update_data(finca_data, data):
  26. measures = []
  27. for x in data:
  28. measures.append({"temp": x.temp, "precip": x.precip, "date": x.date})
  29. # Probamos insertar todas, si no se pudo,
  30. # intentamos agregar/actualizar una por una.
  31. try:
  32. # Eso puede fallar por limite de inserciones o por tener measures ya existentes
  33. finca_data.insert_many(measures)
  34. except:
  35. msg = "\tNo se pudieron insertar measures en bulk, probando con actualizar uno por uno"
  36. logger.info(msg)
  37. typer.echo(msg)
  38. operations = []
  39. max_date = datetime.min
  40. for measure in data:
  41. max_date = measure.date if measure.date > max_date else max_date
  42. operations.append(
  43. UpdateOne(
  44. {"date": measure.date},
  45. {"$set": dict(measure)},
  46. upsert=True,
  47. )
  48. )
  49. finca_data.bulk_write(operations)
  50. last_date = max(measure["date"] for measure in measures)
  51. return last_date
  52. def _import_finca_range(date_from, date_to, code, client: ClimaMeasuresClient):
  53. finca_data = mongo_measures[code]
  54. # Fecha movil que vamos incrementando
  55. start_date = date_from
  56. typer.echo(f"Importando measures de {code}")
  57. logger.info(f"Importando measures de {code}")
  58. while start_date < date_to:
  59. # 10 dias mas o la fecha maxima del rango, la que sea mas chica
  60. end_date = min(start_date + timedelta(days=10), date_to)
  61. msg = f"Consultando datos desde {start_date} hasta {end_date}]"
  62. logger.info(msg)
  63. typer.echo(msg)
  64. data = []
  65. try:
  66. data = client.get_station_measures(code, start_date, end_date)
  67. except:
  68. logger.error(
  69. f"No se pudo importar las measures de la estacion {code}, ver logs del cliente para mas info"
  70. )
  71. # Si hay un error al traer una porcion de los datos, directamente salimos,
  72. # ya que el error no depende de las fechas pedidas
  73. # Por eso return y no continue
  74. return
  75. if len(data) > 0:
  76. _insert_or_update_data(finca_data, data)
  77. else:
  78. msg = "Porcion de datos vacia"
  79. logger.info(msg)
  80. typer.echo(msg)
  81. start_date = end_date
  82. if end_date != date_to:
  83. sleep(5)
  84. def import_range(
  85. date_from: datetime = typer.Argument(...),
  86. date_to: datetime = typer.Argument(...),
  87. fincas_codes: list[int] = typer.Argument(None),
  88. ):
  89. typer.echo(
  90. f"Importando datos desde {date_from} hasta {date_to} para las fincas: {fincas_codes if len(fincas_codes) > 0 else 'todas'}"
  91. )
  92. if not (date_from < date_to):
  93. typer.echo("La fecha de inicio debe se menor a la fecha de termino")
  94. typer.echo("Para ver los argumentos usar `python main.py import-range --help`")
  95. db: Session = SessionLocal()
  96. user = db.query(User).first()
  97. client = ClimaMeasuresClient(
  98. token=user.token,
  99. url=CLIMA_API_URL,
  100. )
  101. # Si el campo opcional fincas_codes no es dado, usamos todas las fincas de la db
  102. if len(fincas_codes) == 0:
  103. typer.echo("No fincas dadas")
  104. db: Session = SessionLocal()
  105. fincas = db.query(Finca).all()
  106. for finca in fincas:
  107. _import_finca_range(date_from, date_to, finca.station_code, client)
  108. else:
  109. for code in fincas_codes:
  110. _import_finca_range(date_from, date_to, str(code), client)
  111. db.commit()
  112. db.close()
  113. def import_measures():
  114. db: Session = SessionLocal()
  115. fincas = db.query(Finca).all()
  116. user = db.query(User).first()
  117. # Importamos finca por finca
  118. for finca in fincas:
  119. typer.echo(f"Importando measures de {finca.station_code}")
  120. logger.info(f"Importando measures de {finca.station_code}")
  121. finca_data = mongo_measures[finca.station_code]
  122. # Desde la ultima vez
  123. date_from = finca.last_synced
  124. if date_from is None:
  125. typer.echo(
  126. f"\tEl atributo last_synced es nulo en la estacion {finca.station_code}."
  127. )
  128. logger.warning(
  129. f"\tEl atributo last_synced es nulo en la estacion {finca.station_code}."
  130. )
  131. continue
  132. # Las muestras que habría normalmente en 10 días de datos con datos cada 10 minutos
  133. # 24 horas x 6 muestras
  134. limit = 1440
  135. print(f"\tfrom: {date_from}")
  136. # Cliente de api clima
  137. client = ClimaMeasuresClient(
  138. token=user.token,
  139. url=CLIMA_API_URL,
  140. )
  141. # Probamos traer los datos
  142. # Ante cualquier error seguimos con la proxima estacion
  143. data = None
  144. try:
  145. data = client.get_station_measures(finca.station_code, date_from, limit=limit)
  146. except Exception as e:
  147. logger.error(
  148. f"No se pudo importar las measures de la estacion {finca.station_code}: {e}, ver logs del cliente para mas info"
  149. )
  150. continue
  151. if len(data) > 0:
  152. last_date = _insert_or_update_data(finca_data, data)
  153. finca.last_synced = last_date
  154. else:
  155. typer.echo(f"\tNo hay datos a partir de {date_from}")
  156. db.commit()
  157. db.close()
  158. def _apply_function_conditional(station_code: str, func: Callable[[Finca], None]):
  159. db: Session = SessionLocal()
  160. if station_code is None:
  161. fincas = db.query(Finca).all()
  162. else:
  163. fincas = db.query(Finca).filter(Finca.station_code == station_code)
  164. for finca in fincas:
  165. func(finca)
  166. db.close()
  167. def _get_measures(finca: Finca):
  168. finca_data = mongo_measures[finca.station_code]
  169. query = finca_data.find()
  170. for x in query:
  171. typer.echo(x)
  172. def get_measures(station_code: str = typer.Argument(None)):
  173. _apply_function_conditional(station_code, _get_measures)
  174. def _delete_measures(finca: str):
  175. finca_data = mongo_measures[finca.station_code]
  176. finca_data.delete_many({})
  177. def delete_measures(station_code: str = typer.Argument(None)):
  178. _apply_function_conditional(station_code, _delete_measures)
  179. def import_json_dump(
  180. dumppath: Path = typer.Option("", "--dump"),
  181. station_code: str = typer.Option("", "--code"),
  182. all: bool = typer.Option(False, "--all"),
  183. ):
  184. db: Session = SessionLocal()
  185. fincas = []
  186. if all:
  187. for filename in os.listdir("dumps"):
  188. code = Path(filename).stem
  189. finca = db.query(Finca).filter(Finca.station_code == code).first()
  190. fincas.append(finca)
  191. else:
  192. finca = db.query(Finca).filter(Finca.station_code == station_code).first()
  193. fincas.append(finca)
  194. for finca in fincas:
  195. if finca is None:
  196. print(f"La finca con el código de estación {station_code} no existe.")
  197. continue
  198. typer.echo(f"Importando datos de la estación: {finca.station_code}")
  199. if all:
  200. dumpfile = open(Path(f"dumps/{finca.station_code}.json"))
  201. else:
  202. dumpfile = open(dumppath)
  203. dump = json.load(dumpfile)
  204. finca_data = mongo_measures[finca.station_code]
  205. # Create date index, otherwise it would take ages to upsert every document
  206. # REVIEW: Might want to move this index code somewhere else
  207. indexes = finca_data.index_information()
  208. if indexes.get("date_1") is None:
  209. typer.echo("\tCreating `date` index")
  210. finca_data.create_index("date", unique=True)
  211. operations = []
  212. max_date = datetime.min
  213. with typer.progressbar(dump, label="Procesando datos") as progress:
  214. for measure in progress:
  215. new_measure = Measure(**measure)
  216. max_date = new_measure.date if new_measure.date > max_date else max_date
  217. operations.append(
  218. UpdateOne(
  219. {"date": new_measure.date},
  220. {"$set": dict(new_measure)},
  221. upsert=True,
  222. )
  223. )
  224. last_synced = (
  225. finca.last_synced if finca.last_synced is not None else datetime.min
  226. )
  227. if max_date > last_synced:
  228. typer.echo(f"Updating last_synced: {max_date}")
  229. finca.last_synced = max_date
  230. typer.echo("Bulk writing operations in mongo...")
  231. result = finca_data.bulk_write(operations)
  232. typer.echo(f"Updated measures: {result.modified_count}")
  233. typer.echo(f"Inserted measures {result.upserted_count}")
  234. dumpfile.close()
  235. db.commit()
  236. db.close()