【问题标题】:FastAPI JWT Invalid TokenFastAPI JWT 令牌无效
【发布时间】:2021-09-25 07:41:39
【问题描述】:

根据下面的代码,试图找出我收到无效令牌错误的原因。我正在通过我的 API 测试注册和身份验证。

我创建了一个虚拟帐户,然后查看我的电子邮件以获取验证链接。一切正常,直到我单击电子邮件中的链接并收到 400 错误请求,并且由于我的调试,错误是由“无效令牌”引起的。

这是我的代码:

route_user.py:

from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from jose import jwt

from db.models.users import User
from schemas.users import UserCreate, ShowUser
from db.repository.users_data_access_layer import Users
from core.auth import Auth
from core.mailer import Mailer
from core.config import Settings
from depends import get_user_db

router = APIRouter()

get_settings = Settings()


@router.post("/", response_model=ShowUser)
async def create_user(form_data: UserCreate = Depends(), users: Users = Depends(get_user_db)):
    if await users.check_user(email=form_data.email) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User already exists"
        )
    elif await users.check_username(username=form_data.username) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists"
        )

    new_user = await users.register_user(username=form_data.username, email=form_data.email, hashed_password=form_data.password)
    #print(new_user)
    confirmation = Auth.get_confirmation_token(new_user.id)
    #print(confirmation)
    new_user.confirmation = confirmation["jti"]
    # users.db_session.add(new_user)
    # await users.db_session.flush()
    print(new_user.confirmation)
    


    try:
        Mailer.send_confirmation_message(confirmation["token"], form_data.email)
    except ConnectionRefusedError:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Email couldn't be send. Please try again."
        )
    # users.db_session.add(new_user)
    # await users.db_session.flush()
    #return new_user

@router.get("/verify/{token}")
async def verify(token: str, users: Users = Depends(get_user_db)):
    invalid_token_error = HTTPException(status_code=400, detail="Invalid Token")
    try:
        payload = jwt.decode(token, get_settings.SECRET_KEY, algorithms=[get_settings.TOKEN_ALGORITHM])
        print(payload['sub'])
        print(payload['jti'])
    except jwt.JWSError:
        raise HTTPException(status_code=403, detail="Token has Expired")
    if payload['scope'] != 'registration':
        raise invalid_token_error
    #print(payload['sub'])
    user = await users.get_user_by_id(id=payload['sub'])
    #print(user)
    #print('hello2')
    #print(user)
    print(User.confirmation)
    print(User.id)
    if not user or await users.get_confirmation_uuid(str(User.confirmation)) != payload['jti']:
        print('hello')
        raise invalid_token_error
    # if user.is_active:
    #     #print('hello2')
    #     raise HTTPException(status_code=403, detail="User already Activated")
    # user.confirmation = None
    # user.is_active = True
    # return user

user_data_access_layer:

from core.hashing import Hasher
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select
from sqlalchemy.sql import exists


from db.models.users import User
from schemas.users import UserCreate
from core.hashing import Hasher


db_session = Session

class Users():
    
    def __init__(self, db_session: Session):
        self.db_session = db_session

    
            #print('user created')

    async def register_user(self, username: str, email: str, hashed_password: str):
        new_user = User(username=username, email=email, hashed_password=Hasher.get_password_hash(hashed_password))
        self.db_session.add(new_user)
        await self.db_session.flush()
        return new_user

    async def check_user(self, email: str):
        user_exist = await self.db_session.execute(select(User).filter(User.email==email))
        #print(user_exist)
        return user_exist.scalar_one_or_none()

    async def check_username(self, username: str):
        user_exist = await self.db_session.execute(select(User).filter(User.username==username))
        #print(user_exist)
        return user_exist.scalar_one_or_none()
    
    async def get_user_by_id(self, id: str):
        user_exist = await self.db_session.execute(select(User).filter(User.id==id))
        #print(user_exist)
        return user_exist.scalar_one_or_none()

    async def get_confirmation_uuid(self, confirmation_uuid: str):
        user_exist = await self.db_session.execute(select(User).filter(str(User.confirmation)==confirmation_uuid))
        #print(user_exist)
        return user_exist.scalar_one_or_none()

auth.py

from jose import jwt
from datetime import datetime, timedelta
from core.config import Settings
from pydantic import UUID4
import uuid
from passlib.context import CryptContext

settings = Settings()


class Auth:
    password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    @classmethod
    def get_password_hash(cls, password: str) -> str:
        return cls.password_context.hash(password)

    @staticmethod
    def get_token(data: dict, expires_delta: int):
        pass
        to_encode = data.copy()
        to_encode.update({
            "exp": datetime.utcnow() + timedelta(seconds=expires_delta),
            "iss": settings.PROJECT_NAME
        })
        return jwt.encode(
            to_encode,
            settings.SECRET_KEY,
            algorithm=settings.TOKEN_ALGORITHM
        )

    @staticmethod
    def get_confirmation_token(user_id: UUID4):
        jti = uuid.uuid4()
        claims = {
            "sub": str(user_id),
            "scope": "registration",
            "jti": str(jti)
        }
        return {
            "jti": jti,
            "token": Auth.get_token(
                claims,
                settings.REGISTRATION_TOKEN_LIFETIME
            )
        }

models.py

import uuid
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship

from db.base_class import Base


class User(Base):
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    username = Column(String, unique=True, nullable=False)
    email = Column(String, nullable=False, unique=True, index=True)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=False)
    is_superuser = Column(Boolean, default=False)
    confirmation = Column(UUID(as_uuid=True), nullable=True, default=uuid.uuid4)
    jobs = relationship("Job", back_populates="owner")

当我尝试检查用户是否存在?令牌 ID 是否存储在数据库中?在

if not user or await users.get_confirmation_uuid(str(User.confirmation)) != payload['jti']:

我得到"detail": "Invalid Token",当我得到print(User.confirmation) 时,它给了我User.id

邮件发送后的数据库日志:

User.confirmation
1867657b-7cfa-471f-9daa-92ea192abb5a
E-mail has been sent!
INFO:     127.0.0.1:64005 - "POST /registration/?username=usertest&email=tutepoha%40livinginsurance.co.uk&password=usertest123 HTTP/1.1" 200 OK
2021-07-16 18:18:37,973 INFO sqlalchemy.engine.Engine UPDATE "user" SET confirmation=%s WHERE "user".id = %s
2021-07-16 18:18:37,974 INFO sqlalchemy.engine.Engine [generated in 0.00079s] (UUID('1867657b-7cfa-471f-9daa-92ea192abb5a'), UUID('674ead42-44a3-46b0-9645-689885ace026'))

不知道是什么问题,我已经尝试调试到我的程度,但现在我碰壁了。

【问题讨论】:

    标签: python sqlalchemy jwt fastapi


    【解决方案1】:

    您的代码现在存在不同的问题。

    在您的代码中:

    if not user or await users.get_confirmation_uuid(str(User.confirmation))!= payload['jti']:

    您检查 users.get_confirmation_uuid(str(User.confirmation)) 如上所述,您正在比较类的值,因为您调用的是用户而不是用户。在第二种情况下,您调用实例,而在第一种情况下,您调用类

    还有比较 users.get_confirmation_uuid str (user.confirmation))! = Payload ['jti'] 永远不可能是真的,因为您的 users.get_confirmation_uuid method 不返回 UID 值,而是返回用户对象的列表。所以调用users.get_confirmation 方法实际上是没有必要的。您只需要执行以下操作:

    if not user or user.confirmation! = payload['jti']:

    但是,问题远不止于此。目前,您检索用户的方法users.get_user_by_id(id=payload['sub']) 返回一个列表,而不是单个对象。因此,您必须返回单个对象或浏览列表。

    这是我建议解决这一切的代码:

    方法已更改为返回对象而不是列表

    user_data_access_layer.py

    from fastapi import HTTPException, status
    
    from db.models.users import User
    from schemas.users import UserCreate
    from db_config import SESSION
    from auth import Auth
    
    class Users():
    
        def __init__(self):
            pass
    
        @classmethod
        async def save(cls, user_instance):
            try:
                SESSION.add(user_instance)
                SESSION.commit()
            except Exception as error:
                SESSION.rollback()
                raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
    
        @classmethod
        async def get_user_by_id(cls, id):
            user = await SESSION.query(User).filter(User.id == id).one_or_none()
            return user
    
        @classmethod
        async def get_user_by_username(cls, username):
            user = await SESSION.query(User).filter(User.username == username).one_or_none()
            return user
    
        @classmethod
        async def get_user_by_email(cls, email):
            user = await SESSION.query(User).filter(User.email == email).one_or_none()
            return user
    
        @classmethod
        async def get_user_by_confirmation(cls, confirmation):
            user = await SESSION.query(User).filter(User.confirmation == confirmation).one_or_none()
            return user
    
        @classmethod
        async def create_user(self, user: UserCreate):
            new_user = User(username=user.username,
                            email=user.email,
                            hashed_password=Auth.get_password_hash(user.password)
                            )
            cls.save(new_user)
            return new_user
    

    routes.py

    
    @router.get("/verify/{token}")
    async def verify(token: str, users: Users = Depends(get_user_db)):
        invalid_token_error = HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid Token")
        
        # TRYING DECODE TOKEN
        try:
            payload = jwt.decode(token, get_settings.SECRET_KEY, algorithms=[
                                 get_settings.TOKEN_ALGORITHM])
        except jwt.JWSError:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="Token has Expired")
            
        # CHECK IF THE SCOPE IS OK
        if payload['scope'] != 'registration':
            raise invalid_token_error
        
        # TRY TO GET AN USER WITH THE ID FROM TOKEN
        user = await Users.get_user_by_id(id=payload['sub'])
        
        # CHECK IF WE FOUND AN USER AND IF THE UID CONFIRMATION IS THE SAME OF THE TOKEN
        if not user or str(user.confirmation) != str(payload['jti']):
            raise invalid_token_error
        
        # CHECK IF THE USER IS ALREADY ACTIVE
        if user.is_active:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
                                detail="User already Activated")
            
        # IF ALL IT'S OK, WE UPDATE THE CONFIRMATION AND IS_ACTIVE ATTRIBUTE AND CALL THE SAVE METHOD
        user.confirmation = None
        user.is_active = True
        Users.save(user)
        return user
    

    【讨论】:

      猜你喜欢
      • 2021-04-13
      • 2021-09-23
      • 2019-03-19
      • 2018-07-14
      • 2020-04-19
      • 2016-10-20
      • 2018-04-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多