authentication.py
from datetime import datetime, timedelta
from fastapi import Depends , HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError, jwt
from auth.schemas import TokenData, UserBase
from sqlalchemy.orm import Session
from db.database import get_db
from pydantic import ValidationError
from db.models import UserModel
SECRET_KEY = '7f0ef2549adac85716db6ca433a44c79021e829c8c4dd8e7d9363eb9fc800e73'
ALGORITHM = 'HS256'
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login",
scopes={"admin":"create content", "user":"interact with content"}
)
def get_user(email: str,db:Session=Depends(get_db)):
user = db.query(UserModel).filter(UserModel.email == email).first()
if not user:
raise HTTPException(status=status.HTTP_404_NOT_FOUND,
details=f"User with email: {email} not found")
return user
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = f"Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (JWTError, ValidationError):
raise credentials_exception
user = get_user(email=token_data.email)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="not enough permission",
headers={"WWW-Authenticate": authenticate_value},
)
return user
def get_current_active_user(
current_user:UserBase = Security(get_current_user, scopes=["admin"])):
if not current_user:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def create_access_token(data:dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token:str,credentials_exception):
try:
payload = jwt.decode(token,SECRET_KEY,algorithms=ALGORITHM)
email:str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = TokenData(email=email)
return token_data
except JWTError:
raise credentials_exception
login
from fastapi import APIRouter, Depends,HTTPException,status
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from sqlalchemy.orm import Session
from db.models import UserModel
from db.database import get_db
from . import hashing
from auth.oauth import create_access_token
from .hashing import get_password_hash
from .schemas import UserBase
router = APIRouter(tags=['authentication'])
@router.post('/login')
async def login(request:OAuth2PasswordRequestForm=Depends(),db:Session=Depends(get_db)):
user = db.query(UserModel).filter(UserModel.email == request.username).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail="Invalid Credentials")
if not hashing.verify_password(request.password,user.password):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,detail="Invalid Password")
access_token = create_access_token(data={"sub":user.username,"scopes":request.scopes})
return {"access_token":access_token,"token_type":"bearer"}
@router.post('/register')
async def register(request:UserBase,db:Session=Depends(get_db)):
new_user = UserModel(
email = request.email,
username = request.username,
password = get_password_hash(request.password),
)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
endpoints
from fastapi import APIRouter, Depends, Security
from auth.oauth import get_current_active_user
from routers.schemas import BikeBase
from sqlalchemy.orm import Session
from db.database import get_db
from db.models import BikeModel
from .crud import add_bike
from auth.schemas import UserBase
router = APIRouter(
prefix='/bike',
tags = ['bike']
)
@router.post('/add')
def add_bike(request:BikeBase,user:UserBase=Depends(get_current_active_user),db:Session=Depends(get_db)):
bike = db.query(BikeModel).filter(BikeModel.model==request.model).first()
if bike is None:
return add_bike(request,user)
if request.model in bike:
raise Exception("Bike Model Already present")
return add_bike(request,user)
@router.get('/check')
def check(user:UserBase=Security(get_current_active_user, scopes=["admin"])):
return {"data":"scope working"}
def add_bike(request:BikeBase,user=Security(get_current_active_user, scopes=["admin"]),db:Session=Depends(get_db)):
new_bike = BikeBase(
brand = request.brand.lower(),
model = request.model.lower(),
price = request.price,
mileage = request.mileage,
max_power = request.max_power,
rating = request.rating
)
db.add(new_bike)
db.commit()
db.refresh(new_bike)
return new_bike
On the @router.post('/add')
getting the error:
File "/home/raj/code/bike-forum-fastapi/./auth/oauth.py", line 24, in get_user
user = db.query(UserModel).filter(UserModel.email == email).first()
AttributeError: 'Depends' object has no attribute 'query'
When i change some endpoints to async the error changes something with coroutine, i cant remember how to produce it
@router.check('/check')
endpoint works fine with scope and permission, but produces error at @router.post('/add')
i new to fastapi, the auth code is directly copied from fastapi-docs, but theres a hardcoded fake database for get_user in docs, i cant figure how to do it with a real database