【发布时间】:2021-09-25 07:41:39
【问题描述】:
根据下面的代码,试图找出我收到无效令牌错误的原因。我正在通过我的 API 测试注册和身份验证。
我创建了一个虚拟帐户,然后查看我的电子邮件以获取验证链接。一切正常,直到我单击电子邮件中的链接并收到 400 错误请求,并且由于我的调试,错误是由“无效令牌”引起的。
这是我的代码:
route_user.py:
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from jose import jwt
from db.models.users import User
from schemas.users import UserCreate, ShowUser
from db.repository.users_data_access_layer import Users
from core.auth import Auth
from core.mailer import Mailer
from core.config import Settings
from depends import get_user_db
router = APIRouter()
get_settings = Settings()
@router.post("/", response_model=ShowUser)
async def create_user(form_data: UserCreate = Depends(), users: Users = Depends(get_user_db)):
if await users.check_user(email=form_data.email) is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User already exists"
)
elif await users.check_username(username=form_data.username) is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already exists"
)
new_user = await users.register_user(username=form_data.username, email=form_data.email, hashed_password=form_data.password)
#print(new_user)
confirmation = Auth.get_confirmation_token(new_user.id)
#print(confirmation)
new_user.confirmation = confirmation["jti"]
# users.db_session.add(new_user)
# await users.db_session.flush()
print(new_user.confirmation)
try:
Mailer.send_confirmation_message(confirmation["token"], form_data.email)
except ConnectionRefusedError:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Email couldn't be send. Please try again."
)
# users.db_session.add(new_user)
# await users.db_session.flush()
#return new_user
@router.get("/verify/{token}")
async def verify(token: str, users: Users = Depends(get_user_db)):
invalid_token_error = HTTPException(status_code=400, detail="Invalid Token")
try:
payload = jwt.decode(token, get_settings.SECRET_KEY, algorithms=[get_settings.TOKEN_ALGORITHM])
print(payload['sub'])
print(payload['jti'])
except jwt.JWSError:
raise HTTPException(status_code=403, detail="Token has Expired")
if payload['scope'] != 'registration':
raise invalid_token_error
#print(payload['sub'])
user = await users.get_user_by_id(id=payload['sub'])
#print(user)
#print('hello2')
#print(user)
print(User.confirmation)
print(User.id)
if not user or await users.get_confirmation_uuid(str(User.confirmation)) != payload['jti']:
print('hello')
raise invalid_token_error
# if user.is_active:
# #print('hello2')
# raise HTTPException(status_code=403, detail="User already Activated")
# user.confirmation = None
# user.is_active = True
# return user
user_data_access_layer:
from core.hashing import Hasher
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select
from sqlalchemy.sql import exists
from db.models.users import User
from schemas.users import UserCreate
from core.hashing import Hasher
db_session = Session
class Users():
def __init__(self, db_session: Session):
self.db_session = db_session
#print('user created')
async def register_user(self, username: str, email: str, hashed_password: str):
new_user = User(username=username, email=email, hashed_password=Hasher.get_password_hash(hashed_password))
self.db_session.add(new_user)
await self.db_session.flush()
return new_user
async def check_user(self, email: str):
user_exist = await self.db_session.execute(select(User).filter(User.email==email))
#print(user_exist)
return user_exist.scalar_one_or_none()
async def check_username(self, username: str):
user_exist = await self.db_session.execute(select(User).filter(User.username==username))
#print(user_exist)
return user_exist.scalar_one_or_none()
async def get_user_by_id(self, id: str):
user_exist = await self.db_session.execute(select(User).filter(User.id==id))
#print(user_exist)
return user_exist.scalar_one_or_none()
async def get_confirmation_uuid(self, confirmation_uuid: str):
user_exist = await self.db_session.execute(select(User).filter(str(User.confirmation)==confirmation_uuid))
#print(user_exist)
return user_exist.scalar_one_or_none()
auth.py
from jose import jwt
from datetime import datetime, timedelta
from core.config import Settings
from pydantic import UUID4
import uuid
from passlib.context import CryptContext
settings = Settings()
class Auth:
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@classmethod
def get_password_hash(cls, password: str) -> str:
return cls.password_context.hash(password)
@staticmethod
def get_token(data: dict, expires_delta: int):
pass
to_encode = data.copy()
to_encode.update({
"exp": datetime.utcnow() + timedelta(seconds=expires_delta),
"iss": settings.PROJECT_NAME
})
return jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.TOKEN_ALGORITHM
)
@staticmethod
def get_confirmation_token(user_id: UUID4):
jti = uuid.uuid4()
claims = {
"sub": str(user_id),
"scope": "registration",
"jti": str(jti)
}
return {
"jti": jti,
"token": Auth.get_token(
claims,
settings.REGISTRATION_TOKEN_LIFETIME
)
}
models.py
import uuid
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship
from db.base_class import Base
class User(Base):
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
username = Column(String, unique=True, nullable=False)
email = Column(String, nullable=False, unique=True, index=True)
hashed_password = Column(String(255), nullable=False)
is_active = Column(Boolean, default=False)
is_superuser = Column(Boolean, default=False)
confirmation = Column(UUID(as_uuid=True), nullable=True, default=uuid.uuid4)
jobs = relationship("Job", back_populates="owner")
当我尝试检查用户是否存在?令牌 ID 是否存储在数据库中?在
if not user or await users.get_confirmation_uuid(str(User.confirmation)) != payload['jti']:
我得到"detail": "Invalid Token",当我得到print(User.confirmation) 时,它给了我User.id。
邮件发送后的数据库日志:
User.confirmation
1867657b-7cfa-471f-9daa-92ea192abb5a
E-mail has been sent!
INFO: 127.0.0.1:64005 - "POST /registration/?username=usertest&email=tutepoha%40livinginsurance.co.uk&password=usertest123 HTTP/1.1" 200 OK
2021-07-16 18:18:37,973 INFO sqlalchemy.engine.Engine UPDATE "user" SET confirmation=%s WHERE "user".id = %s
2021-07-16 18:18:37,974 INFO sqlalchemy.engine.Engine [generated in 0.00079s] (UUID('1867657b-7cfa-471f-9daa-92ea192abb5a'), UUID('674ead42-44a3-46b0-9645-689885ace026'))
不知道是什么问题,我已经尝试调试到我的程度,但现在我碰壁了。
【问题讨论】:
标签: python sqlalchemy jwt fastapi