"""Authentication routes: password login that issues a signed JWT access token."""

from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt
from pydantic import BaseModel

from ..config import (ACCESS_TOKEN_EXPIRE_MINUTES, ALGORITHM, CRYPT_CONTEXT,
                      SECRET_KEY)
from ..db import User

router = APIRouter(prefix="/auth")

# Bearer-token scheme whose token URL points at the login route defined below.
# The leading "." makes the URL relative rather than absolute.
# NOTE(review): presumably so the docs UI keeps working when the app is
# mounted under a path prefix -- confirm before changing.
SCHEME = OAuth2PasswordBearer(
    tokenUrl=f".{router.prefix}/token"
)


class Token(BaseModel):
    """Response body returned by the ``/token`` endpoint."""

    # The encoded JWT string produced by ``create_access_token``.
    access_token: str
    # Token scheme name; the login route always sets this to "bearer".
    token_type: str


def create_access_token(
    data: dict, expires_delta: timedelta | None = None
) -> str:
    """Return a signed JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's object is not mutated.
        expires_delta: Lifetime of the token measured from now. Defaults to
            15 minutes when ``None``.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    to_encode = data.copy()
    # Use a timezone-aware "now": ``datetime.utcnow()`` returns a naive value
    # and is deprecated since Python 3.12. The resulting instant is the same.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


@router.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends()
):
    """Exchange a username/password form for a bearer access token.

    Args:
        form_data: The submitted OAuth2 password form; only ``username`` and
            ``password`` are read here.

    Returns:
        A dict with ``access_token`` (JWT whose ``sub`` claim is the user's
        name) and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            user is not found or the password does not verify. Both failures
            deliberately produce the same response.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    user = User.get_by_name(form_data.username)
    if user is None:
        # NOTE(review): presumably spends about as long as a real hash check
        # so unknown usernames cannot be told apart by response time --
        # confirm against the CRYPT_CONTEXT definition in ..config.
        CRYPT_CONTEXT.dummy_verify()
        raise credentials_exception
    if not user.verify(form_data.password):
        raise credentials_exception

    access_token = create_access_token(
        data={"sub": user.name},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}