【发布时间】:2021-09-23 14:56:06
【问题描述】:
我正在尝试将使用 FastAPI 和乌龟 orm 的代码 sn-p 模仿到 SQLAlachemy orm 中:
@auth_router.get("/verify/{token}")
async def verify(token: str):
invalid_token_error = HTTPException(status_code=400, detail="Invalid token")
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=settings.TOKEN_ALGORITHM)
except jwt.JWTError:
raise HTTPException(status_code=403, detail="Token has expired")
if payload['scope'] != 'registration':
raise invalid_token_error
user = await users.UserModel.get_or_none(id=payload['sub'])
if not user or str(user.confirmation) != payload['jti']:
raise invalid_token_error
if user.is_active:
raise HTTPException(status_code=403, detail="User already activated")
user.confirmation = None
user.is_active = True
await user.save()
return await users.User_Pydantic.from_tortoise_orm(user)
SQLAlchemy 实现:
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from jose import jwt
from db.models.users import User
from schemas.users import UserCreate, ShowUser
from db.repository.users_data_access_layer import Users
from core.auth import Auth
from core.hashing import Hasher
from core.mailer import Mailer
from core.config import Settings
from depends import get_user_db
router = APIRouter()
get_settings = Settings()
@router.post("/", response_model=ShowUser)
async def create_user(form_data: UserCreate = Depends(), users: Users = Depends(get_user_db)):
if await users.check_user(email=form_data.email) is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User already exists"
)
elif await users.check_username(username=form_data.username) is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already exists"
)
new_user = User(email=form_data.email,hashed_password=Auth.get_password_hash(form_data.password))
confirmation = Auth.get_confirmation_token(new_user.id)
print(confirmation)
new_user.confirmation = confirmation["jti"]
try:
Mailer.send_confirmation_message(confirmation["token"], form_data.email)
except ConnectionRefusedError:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Email couldn't be send. Please try again."
)
return await users.register_user(form_data)
@router.get("/verify/{token}")
async def verify(token: str, users: Users = Depends(get_user_db)):
invalid_token_error = HTTPException(status_code=400, detail="Invalid Token")
try:
payload = jwt.decode(token, get_settings.SECRET_KEY, algorithms=[get_settings.TOKEN_ALGORITHM])
print(payload['sub'])
except jwt.JWSError:
raise HTTPException(status_code=403, detail="Token has Expired")
if payload['scope'] != 'registration':
raise invalid_token_error
print(payload['sub'])
user = await users.get_user_by_id(id=str(payload['sub']))
print(user)
print('hello2')
if not user or await users.get_confirmation_uuid(str(User.confirmation)) != payload['jti']:
print('hello')
raise invalid_token_error
if user.is_active:
print('hello2')
raise HTTPException(status_code=403, detail="User already Activated")
user.confirmation = None
user.is_active = True
return await users.register_user(user)
当我到达端点@router.get("/verify/{token}") 并且控制权到达user = await users.get_user_by_id(id=payload['sub']) 时,它只是返回None 而不是id,因此我得到"detail": Invaild Token。任何人都可以指出我在这里做错了什么?
这是我剩下的 API 结构:
auth.py
from jose import jwt
from datetime import datetime, timedelta
from core.config import Settings
from pydantic import UUID4
import uuid
from passlib.context import CryptContext
settings = Settings()
class Auth:
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@classmethod
def get_password_hash(cls, password: str) -> str:
return cls.password_context.hash(password)
@staticmethod
def get_token(data: dict, expires_delta: int):
pass
to_encode = data.copy()
to_encode.update({
"exp": datetime.utcnow() + timedelta(seconds=expires_delta),
"iss": settings.PROJECT_NAME
})
return jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.TOKEN_ALGORITHM
)
@staticmethod
def get_confirmation_token(user_id: UUID4):
jti = uuid.uuid4()
claims = {
"sub": str(user_id),
"scope": "registration",
"jti": str(jti)
}
return {
"jti": jti,
"token": Auth.get_token(
claims,
settings.REGISTRATION_TOKEN_LIFETIME
)
}
users.py:
import uuid
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship
from db.base_class import Base
class User(Base):
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
username = Column(String, unique=True, nullable=False)
email = Column(String, nullable=False, unique=True, index=True)
hashed_password = Column(String(255), nullable=False,)
is_active = Column(Boolean, default=False)
is_superuser = Column(Boolean, default=False)
confirmation = Column(UUID(as_uuid=True), nullable=True, default=uuid.uuid4)
jobs = relationship("Job", back_populates="owner")
users_data_access_layer.py:
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select
from db.models.users import User
from schemas.users import UserCreate
from core.hashing import Hasher
db_session = Session
class Users():
def __init__(self, db_session: Session):
self.db_session = db_session
#print('user created')
async def register_user(self, user: UserCreate):
new_user = User(username=user.username,
email=user.email,
hashed_password=user.password,
is_active = False,
is_superuser=False
)
self.db_session.add(new_user)
await self.db_session.flush()
return new_user
async def check_user(self, email: str):
user_exist = await self.db_session.execute(select(User).filter(User.email==email))
#print(user_exist)
return user_exist.scalar_one_or_none()
async def check_username(self, username: str):
user_exist = await self.db_session.execute(select(User).filter(User.username==username))
#print(user_exist)
return user_exist.scalar_one_or_none()
async def get_user_by_id(self, id: str):
item = await self.db_session.get(User, id)
return item
async def get_confirmation_uuid(self, confirmation_uuid:str):
user_exist = await self.db_session.execute(select(User).filter(str(User.confirmation)==confirmation_uuid))
#print(user_exist)
return user_exist
架构/users.py
from typing import Optional
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
class ShowUser(BaseModel):
username: str
email: EmailStr
is_active: bool
class Config():
orm_mode = True
【问题讨论】:
-
user为无或payload['sub']为无? -
@fchancel
payload['sub']是无 -
你调用
get_confirmation_token的函数或端点在哪里? -
@fchancel 在注册路径
confirmation = Auth.get_confirmation_token(new_user.id)我用注册路径更新了代码。
标签: python sqlalchemy jwt fastapi