# FastApi imports # Pydantic imports from typing import List from fastapi import APIRouter, Depends, HTTPException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm # passlib imports from passlib.hash import sha256_crypt # Sqlalchemy imports from sqlalchemy.orm import Session from starlette.responses import Response from client.loginClient import get_auth_token from commands.fincas import update_fincas from cruds.companies import create_company, get_company_by_id from cruds.users import ( create_user, get_user_by_token, get_user_by_username, get_users, update_password, ) from database.sqlalchemy import get_db from schemas.company import Company as CompanySchema from schemas.users import User as UserSchema from schemas.users import UserBase as UserBaseSchema router = APIRouter() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") async def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db), ) -> UserSchema: user = get_user_by_token(db, token) if not user: raise HTTPException( status_code=401, detail="No puede iniciar sesión con el token proporcionado.", headers={"WWW-Authenticate": "Bearer"}, ) return user @router.get("/users", response_model=List[UserBaseSchema]) async def get_all_users( db: Session = Depends(get_db), current_user: UserSchema = Depends(get_current_user), ) -> Response: users = get_users(db) return users @router.post("/login") async def login( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db) ) -> Response: # Me logeo en clima response = get_auth_token(form_data.username, form_data.password) if response: if response.status_code == 200: # Login correcto, tengo el usuario cacheado? user_sqlite = get_user_by_username(db, username=form_data.username) if user_sqlite: # Si si, entonces verifico que las contraseñas sean identicas # De lo contrario, actualizo la cacheada if not sha256_crypt.verify(form_data.password, user_sqlite.password): update_password(db, user_sqlite, form_data.password) else: # Verifico si la organizacion a la que pertenece el usuario existe, sino la creo company = get_company_by_id(db, response.json()["company_id"]) if not company: company = CompanySchema( id=response.json()["company_id"], title=response.json()["company_name"], ) create_company(db, company) # Cacheo el usuario new_user = UserSchema( username=form_data.username, password=form_data.password, token=response.json()["token"], company_id=response.json()["company_id"], ) user_sqlite = create_user(db=db, user=new_user) # Agrego las de la nueva finca estaciones update_fincas(company.id) return {"access_token": user_sqlite.token, "token_type": "bearer"} elif response.status_code == 400: # Credenciales incorrectas para clima, devuelvo el error raise HTTPException( status_code=401, detail=response.json()["non_field_errors"][0] ) elif response.status_code == 401: # Credenciales Correctas pero no tiene acceso al sistema raise HTTPException(status_code=401, detail=response.json()["detail"]) else: # Ante cualquier otro error en clima, # directamente me logeo con las credenciales cacheadas user_sqlite = get_user_by_username(db, username=form_data.username) if user_sqlite: if not sha256_crypt.verify(form_data.password, user_sqlite.password): raise HTTPException( status_code=401, detail="No puede iniciar sesión con las credenciales proporcionadas.", ) else: return {"access_token": user_sqlite.token, "token_type": "bearer"} else: raise HTTPException( status_code=401, detail="No puede iniciar sesión con las credenciales proporcionadas.", ) # Por si hay que crear un usuario en el sqlite # El nuevo usuario no se sincronizaria en clima """ @router.post("/users/", response_model=UserSchema) def post_user(user: UserSchema, db: Session = Depends(get_db)): db_user = get_user_by_username(db, username=user.username) print(db_user) if db_user: raise HTTPException(status_code=400, detail="Username already registered") return create_user(db=db, user=user) """