I use this code for authentication.
#1. auth_handler.py
import time
from decouple import config
import jwt
JWT_SECRET = config("JWT_SECRET")
JWT_ALGORITHM = config("JWT_ALGORITHM")
def response_token(token: str) -> dict:
"""
docstring
"""
return {
"access_token": token
}
def sign_jwt(user_id: str or int, validity: int) -> dict:
"""
docstring
"""
payload = {
"user_id": user_id,
"expiry": time.time() + validity
}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return response_token(
{
"access_token": token
}
)
def decode_jwt(token: str) -> dict:
"""
docstring
"""
try:
decoded_token = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM]
)
return decoded_token if decoded_token["expiry"] >= time.time() else {}
except:
return {}
and
#2. auth_bearer.py
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from . import auth_handler
class JWTBearer(HTTPBearer):
"""
docstring
"""
def __init__(self, auto_error: bool = True):
"""
docstring
"""
super(JWTBearer, self).__init__(auto_error=auto_error)
async def __call__(self, request: Request):
"""
docstring
"""
credentials: HTTPAuthorizationCredentials = await super(JWTBearer, self).__call__(request)
if credentials:
if not credentials.scheme == "Bearer":
raise HTTPException(
status_code=403,
detail="Invalid Authentication Scheme."
)
if not self.verify_jwt(credentials.credentials):
raise HTTPException(
status_code=403,
detail="Invalid Token OR Expired Token."
)
return credentials.credentials
else:
raise HTTPException(
status_code=403,
detail="Invalid Authorization Code."
)
def verify_jwt(self, jwt_token: str) -> bool:
"""
docstring
"""
isTokenValid: bool = False
try:
payload = auth_handler.decode_jwt(jwt_token)
except:
payload = None
if payload:
isTokenValid = True
return isTokenValid
To secure the routes, I'll leverage dependency injection via FastAPI's Depends.
from fastapi import FastAPI, Body, Depends
from schemas import UserSchema, UserLoginSchema
from auth_bearer import JWTBearer
from auth_handler import signJWT
app = FastAPI()
def check_user(data: UserLoginSchema):
for user in users:
if user.email == data.email and user.password == data.password:
return True
return False
@app.post("/user/signup", tags=["user"])
async def create_user(user: UserSchema = Body(...)):
users.append(user) # replace with db call, making sure to hash the password first
return signJWT(user.email)
@app.post("/user/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
if check_user(user):
return signJWT(user.email)
return {
"error": "Wrong login details!"
}
@app.post("/posts", dependencies=[Depends(JWTBearer())], tags=["posts"])
async def add_post(post: PostSchema) -> dict:
post.id = len(posts) + 1
posts.append(post.dict())
return {
"data": "post added."
}
this code works for sign-in and correct authorization.
But I want to add refresh token functionality and burn token functionality for the time that users log out.
How to implement these functionalities?
For that you need to declare
Token
model(table) in database and keep logged in tokens there.Token
model must contain:user_id
,access_token
,refresh_token