【问题标题】:JWT Token Verification FastAPI SQLAlchemy payload['sub'] returning NoneJWT 令牌验证 FastAPI SQLAlchemy 有效负载 ['sub'] 返回无
【发布时间】:2021-09-23 14:56:06
【问题描述】:

我正在尝试将使用 FastAPI 和乌龟 orm 的代码 sn-p 模仿到 SQLAlachemy orm 中:

@auth_router.get("/verify/{token}")
async def verify(token: str):
    invalid_token_error = HTTPException(status_code=400, detail="Invalid token")
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.TOKEN_ALGORITHM)
    except jwt.JWTError:
        raise HTTPException(status_code=403, detail="Token has expired")
    if payload['scope'] != 'registration':
        raise invalid_token_error
    user = await users.UserModel.get_or_none(id=payload['sub'])
    if not user or str(user.confirmation) != payload['jti']:
        raise invalid_token_error
    if user.is_active:
        raise HTTPException(status_code=403, detail="User already activated")
    user.confirmation = None
    user.is_active = True
    await user.save()
    return await users.User_Pydantic.from_tortoise_orm(user)

SQLAlchemy 实现:

from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from jose import jwt

from db.models.users import User
from schemas.users import UserCreate, ShowUser
from db.repository.users_data_access_layer import Users
from core.auth import Auth
from core.hashing import Hasher
from core.mailer import Mailer
from core.config import Settings
from depends import get_user_db

router = APIRouter()

get_settings = Settings()


@router.post("/", response_model=ShowUser)
async def create_user(form_data: UserCreate = Depends(), users: Users = Depends(get_user_db)):
    if await users.check_user(email=form_data.email) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User already exists"
        )
    elif await users.check_username(username=form_data.username) is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists"
        )

    new_user = User(email=form_data.email,hashed_password=Auth.get_password_hash(form_data.password))
    confirmation = Auth.get_confirmation_token(new_user.id)
    print(confirmation)
    new_user.confirmation = confirmation["jti"]


    try:
        Mailer.send_confirmation_message(confirmation["token"], form_data.email)
    except ConnectionRefusedError:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Email couldn't be send. Please try again."
        )
    return await users.register_user(form_data)

@router.get("/verify/{token}")
async def verify(token: str, users: Users = Depends(get_user_db)):
    invalid_token_error = HTTPException(status_code=400, detail="Invalid Token")
    try:
        payload = jwt.decode(token, get_settings.SECRET_KEY, algorithms=[get_settings.TOKEN_ALGORITHM])
        print(payload['sub'])
    except jwt.JWSError:
        raise HTTPException(status_code=403, detail="Token has Expired")
    if payload['scope'] != 'registration':
        raise invalid_token_error
    print(payload['sub'])
    user = await users.get_user_by_id(id=str(payload['sub']))
    print(user)
    print('hello2')
    if not user or await users.get_confirmation_uuid(str(User.confirmation)) != payload['jti']:
        print('hello')
        raise invalid_token_error
    if user.is_active:
        print('hello2')
        raise HTTPException(status_code=403, detail="User already Activated")
    user.confirmation = None
    user.is_active = True
    return await users.register_user(user)

当我到达端点@router.get("/verify/{token}") 并且控制权到达user = await users.get_user_by_id(id=payload['sub']) 时,它只是返回None 而不是id,因此我得到"detail": Invaild Token。任何人都可以指出我在这里做错了什么?

这是我剩下的 API 结构:

auth.py

from jose import jwt
from datetime import datetime, timedelta
from core.config import Settings
from pydantic import UUID4
import uuid
from passlib.context import CryptContext

settings = Settings()


class Auth:
    password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    @classmethod
    def get_password_hash(cls, password: str) -> str:
        return cls.password_context.hash(password)

    @staticmethod
    def get_token(data: dict, expires_delta: int):
        pass
        to_encode = data.copy()
        to_encode.update({
            "exp": datetime.utcnow() + timedelta(seconds=expires_delta),
            "iss": settings.PROJECT_NAME
        })
        return jwt.encode(
            to_encode,
            settings.SECRET_KEY,
            algorithm=settings.TOKEN_ALGORITHM
        )

    @staticmethod
    def get_confirmation_token(user_id: UUID4):
        jti = uuid.uuid4()
        claims = {
            "sub": str(user_id),
            "scope": "registration",
            "jti": str(jti)
        }
        return {
            "jti": jti,
            "token": Auth.get_token(
                claims,
                settings.REGISTRATION_TOKEN_LIFETIME
            )
        }

users.py:

import uuid
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship

from db.base_class import Base


class User(Base):
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    username = Column(String, unique=True, nullable=False)
    email = Column(String, nullable=False, unique=True, index=True)
    hashed_password = Column(String(255), nullable=False,)
    is_active = Column(Boolean, default=False)
    is_superuser = Column(Boolean, default=False)
    confirmation = Column(UUID(as_uuid=True), nullable=True, default=uuid.uuid4)
    jobs = relationship("Job", back_populates="owner")

users_data_access_layer.py:

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select


from db.models.users import User
from schemas.users import UserCreate
from core.hashing import Hasher


db_session = Session

class Users():
    
    def __init__(self, db_session: Session):
        self.db_session = db_session

    
            #print('user created')

    async def register_user(self, user: UserCreate):
        new_user = User(username=user.username,
        email=user.email,
        hashed_password=user.password,
        is_active = False,
        is_superuser=False
        )
        
        self.db_session.add(new_user)
        await self.db_session.flush()
        return new_user

    async def check_user(self, email: str):
        user_exist = await self.db_session.execute(select(User).filter(User.email==email))
        #print(user_exist)
        return user_exist.scalar_one_or_none()

    async def check_username(self, username: str):
        user_exist = await self.db_session.execute(select(User).filter(User.username==username))
        #print(user_exist)
        return user_exist.scalar_one_or_none()
    
    async def get_user_by_id(self, id: str):
        item = await self.db_session.get(User, id)
        return item

    async def get_confirmation_uuid(self, confirmation_uuid:str):
        user_exist = await self.db_session.execute(select(User).filter(str(User.confirmation)==confirmation_uuid))
        #print(user_exist)
        return user_exist

架构/users.py

from typing import Optional
from pydantic import BaseModel, EmailStr


class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

class ShowUser(BaseModel):
    username: str
    email: EmailStr
    is_active: bool

    class Config():
        orm_mode = True

【问题讨论】:

  • user 为无或payload['sub'] 为无?
  • @fchancel payload['sub'] 是无
  • 你调用get_confirmation_token的函数或端点在哪里?
  • @fchancel 在注册路径confirmation = Auth.get_confirmation_token(new_user.id)我用注册路径更新了代码。

标签: python sqlalchemy jwt fastapi


【解决方案1】:

问题来自async def create_user中的这一行

new_user = User(email=form_data.email,hashed_password=Auth.get_password_hash(form_data.password))

您的 User 类必须包含自 nullable=False 以来的用户名。但是,在创建用户时,您只需向他提供电子邮件和密码。所以你的变量 new_user 不包含用户。所以当你这样做时: confirmation = Auth.get_confirmation_token(new_user.id) new_user.id 设置为无。所以你的子令牌实际上包含 None。

所以,替换:

new_user = User(email=form_data.email,hashed_password=Auth.get_password_hash(form_data.password))

与:

new_user = User(email=form_data.email,username=form_data.username,hashed_password=Auth.get_password_hash(form_data.password))

此时,您已经创建了映射类的实例。正如 SQLAlchemy ORM 文档中所解释的那样。此时 id 不断返回 None。我们说该实例处于待处理状态。尚未发出 SQL,并且该对象尚未由数据库中的行表示(因此 id 为 none)。所以要让我们的 User 实例持久化。需要 Session.add()。

为了完成我们的创建并验证我们与数据库的事务。现在我们只需要 Session.commit()

所以总结一下你必须做的:

session = Session()
new_user = User(email=form_data.email,username=form_data.username,hashed_password=Auth.get_password_hash(form_data.password))
session.add(new_user)
session.commit()

之后,您将可以访问您的用户的 ID。

关于 Session 的创建以及为了找到有关该过程的更多信息,我鼓励您去查看文档: https://docs.sqlalchemy.org/en/14/orm/tutorial.html#create-a-schema

编辑 针对上述关于 AttributeError 的评论

在您的代码中,您尝试注册用户 3 次。第一次在您的 create_user 路线中(这是我在回答中谈论的代码)。第二次,在此端点的末尾调用return await users.register_user(form_data),第三次在async def verify 内调用return await users.register_user(user)。 错误来自第三次调用,因为您的 register_user 函数将 UserCreate 作为参数,并且此时您向其传递了一个与请求格式不对应的用户实例。所以实际上user.password 不存在,因为您的 User 实例没有名为 password 的属性。

我觉得有必要检查一下你的代码逻辑。

【讨论】:

  • 没有改变它仍然返回None
  • 您查看过 new_user 变量包含的内容吗?
  • 是的,它包含对象<db.models.users.User object at 0x0000014D26A3E280>,我还查看了new_user.id,它仍在返回None
  • AttributeError: 'User' object has no attribute 'password' 现在当我尝试提交到数据库时它给了我这个错误await users.register_user(new_user)File ".\db\repository\users_data_access_layer.py", line 26, in register_user hashed_password=user.password, AttributeError: 'User' object has no attribute 'password'
  • 我想我必须发布这些新错误出现的新问题。你能看看吗?
猜你喜欢
  • 2021-09-25
  • 2018-11-18
  • 1970-01-01
  • 2019-10-29
  • 1970-01-01
  • 2018-09-15
  • 2017-04-05
  • 2021-02-12
  • 2017-11-13
相关资源
最近更新 更多