How to refresh a token and burn (revoke) it on logout in FastAPI?

2k Views Asked by At

I use this code for authentication.

#1. auth_handler.py

import time

from decouple import config
import jwt

# Signing key and algorithm name handed to jwt.encode / jwt.decode below.
# Both values are looked up through decouple's config() under these keys.
JWT_SECRET = config("JWT_SECRET")
JWT_ALGORITHM = config("JWT_ALGORITHM")


def response_token(token: str) -> dict:
    """
    Wrap an encoded token in the response envelope.

    Args:
        token: The value to expose to the client.

    Returns:
        A one-key dict mapping ``"access_token"`` to ``token``.
    """
    envelope = dict(access_token=token)
    return envelope


def sign_jwt(user_id: "str | int", validity: int) -> dict:
    """
    Create a signed JWT for a user and return it in the response envelope.

    Args:
        user_id: Identifier stored in the token's ``user_id`` claim.
        validity: Lifetime of the token in seconds, counted from now.

    Returns:
        The dict produced by ``response_token`` for the encoded token,
        i.e. ``{"access_token": <encoded JWT>}``.
    """
    payload = {
        "user_id": user_id,
        # Custom expiry claim: a Unix timestamp ``validity`` seconds ahead.
        "expiry": time.time() + validity
    }

    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

    # response_token builds the {"access_token": ...} envelope itself, so it
    # must receive the bare token; handing it a dict would nest the envelope.
    return response_token(token)


def decode_jwt(token: str) -> dict:
    """
    Decode a JWT and return its payload if it is still valid.

    Args:
        token: The encoded JWT to verify with ``JWT_SECRET``.

    Returns:
        The decoded payload when decoding succeeds and the ``expiry`` claim
        is not in the past; an empty dict in every other case.
    """
    try:
        decoded_token = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM]
        )
        still_valid = decoded_token["expiry"] >= time.time()
    except Exception:
        # Best effort by design: a token that cannot be decoded, or whose
        # ``expiry`` claim is missing or not comparable, is simply invalid.
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate instead of being reported as a bad token.
        return {}
    return decoded_token if still_valid else {}

and

#2. auth_bearer.py

from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from . import auth_handler


class JWTBearer(HTTPBearer):
    """
    Bearer-token dependency that also checks the token with ``decode_jwt``.

    Calling an instance with a request returns the raw token string when the
    request carries valid ``Bearer`` credentials, and raises a 403
    ``HTTPException`` otherwise.
    """

    def __init__(self, auto_error: bool = True):
        """
        Initialise the underlying ``HTTPBearer``.

        Args:
            auto_error: Forwarded unchanged to ``HTTPBearer.__init__``.
        """
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """
        Extract the bearer credentials from ``request`` and validate them.

        Returns:
            The token string taken from the credentials.

        Raises:
            HTTPException: status 403 when no credentials were obtained, the
                scheme is not ``"Bearer"``, or ``verify_jwt`` rejects the
                token.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        # Guard clauses: each failure mode has its own message.
        if not credentials:
            raise HTTPException(
                status_code=403,
                detail="Invalid Authorization Code."
            )
        if credentials.scheme != "Bearer":
            raise HTTPException(
                status_code=403,
                detail="Invalid Authentication Scheme."
            )
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(
                status_code=403,
                detail="Invalid Token OR Expired Token."
            )
        return credentials.credentials

    def verify_jwt(self, jwt_token: str) -> bool:
        """
        Report whether ``jwt_token`` decodes to a non-empty payload.

        Args:
            jwt_token: The encoded token to check.

        Returns:
            ``True`` when ``auth_handler.decode_jwt`` yields a truthy
            payload, ``False`` when it yields nothing or raises.
        """
        try:
            payload = auth_handler.decode_jwt(jwt_token)
        except Exception:
            # A decoding failure means "not valid"; ``Exception`` rather than
            # a bare ``except`` so interpreter-exit signals are not swallowed.
            payload = None
        return bool(payload)

To secure the routes, I'll leverage dependency injection via FastAPI's Depends.

from fastapi import FastAPI, Body, Depends
from schemas import UserSchema, UserLoginSchema
from auth_bearer import JWTBearer
from auth_handler import signJWT

# Application instance on which the route decorators below register handlers.
app = FastAPI()

def check_user(data: UserLoginSchema):
    # True when some entry in ``users`` has both the same email and the same
    # password as ``data``; False otherwise.
    return any(
        known.email == data.email and known.password == data.password
        for known in users
    )

@app.post("/user/signup", tags=["user"])
async def create_user(user: UserSchema = Body(...)):
    # Store the submitted user, then answer with a token for their email.
    # replace with db call, making sure to hash the password first
    users.append(user)
    signed = signJWT(user.email)
    return signed


@app.post("/user/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
    # Reject unknown credentials first; only a matching user gets a token.
    if not check_user(user):
        return {
            "error": "Wrong login details!"
        }
    return signJWT(user.email)

@app.post("/posts", dependencies=[Depends(JWTBearer())], tags=["posts"])
async def add_post(post: PostSchema) -> dict:
    # The route is guarded by the JWTBearer dependency declared above.
    # Ids are assigned sequentially from the current size of ``posts``.
    next_id = len(posts) + 1
    post.id = next_id
    posts.append(post.dict())
    return dict(data="post added.")

This code works for sign-in and authorizes requests correctly.

But I want to add refresh-token functionality, and also a way to burn (invalidate) a token when the user logs out.

How to implement these functionalities?

1

There is 1 best solution below

0
On

For that, you need to declare a Token model (table) in the database and keep the tokens of logged-in users there. The Token model must contain: user_id, access_token, refresh_token.

NOTE: the refresh_token's expiry time must be longer than the access_token's.

  1. Return only the access_token to the client. Inside the JWTBearer class, check whether the access_token and the refresh_token have expired. If only the access_token has expired, return the refresh_token in place of the access_token: the old refresh_token becomes the new access_token, and a newly generated token is stored as the new refresh_token.

  2. You can also create a "/check" endpoint that reports whether the access_token has expired.