【发布时间】:2021-07-02 14:52:21
【问题描述】:
我在 Python 中通过 JWT 验证 Bearer 令牌,现有代码最初是只针对 idToken 编写的。我们的 UI 刚刚迁移到新的授权码流程(auth code flow),对方建议我改用 accessToken 而不是 idToken 来做令牌验证。使用 idToken 时,应用程序端到端(E2E)运行正常;但在 Bearer 身份验证中改用 accessToken 后,验证失败,我收到 401 Unauthorized(未授权)响应。
请指教。
这是我的python代码:
import os
import sys
import requests
import time
import calendar
from functools import wraps
from jose import jwk, jwt, JWTError
from flask import abort, current_app, request
from src.database import db
from src.secrets import derive_base64_secret
from src.models.user import User, UserRoleEnum
# Client and tenant identifiers, read from the AZURE_AD_* environment variables.
# Both are None when the corresponding variable is unset.
CLIENT_ID = os.environ.get("AZURE_AD_CLIENT_ID")
TENANT_ID = os.environ.get("AZURE_AD_TENANT_ID")

# v2.0 authority URL for the tenant; it is also the issuer expected when
# decoding MSAL tokens.
AUTHORITY_MSAL = f"https://login.microsoftonline.com/{TENANT_ID}/v2.0"

# The OpenID Connect discovery document and the signing key set it points to
# are fetched once, at import time. `requests` never times out by default, so
# an explicit timeout keeps an unresponsive endpoint from hanging start-up
# indefinitely.
OIDC = requests.get(
    f"{AUTHORITY_MSAL}/.well-known/openid-configuration", timeout=10
).json()
MSAL_JWKS = requests.get(OIDC["jwks_uri"], timeout=10).json()

# Symmetric (HS256) key used to sign and verify service-to-service tokens.
S2S_JWK = {"alg": "HS256", "kty": "oct", "k": derive_base64_secret("S2S JWT", 32)}
def validate_msal_token(token):
    """Decode *token* with the MSAL key set and return its claims.

    The token is verified against ``MSAL_JWKS`` and must carry ``CLIENT_ID``
    as its audience and ``AUTHORITY_MSAL`` as its issuer. Any ``JWTError``
    raised while decoding aborts the request with HTTP 401.
    """
    accepted_issuers = [AUTHORITY_MSAL]
    try:
        claims = jwt.decode(
            token, MSAL_JWKS, audience=CLIENT_ID, issuer=accepted_issuers
        )
    except JWTError:
        abort(401)
    else:
        return claims
def validate_s2s_token(token):
    """Decode a service-to-service *token* and return its claims.

    The token is verified with the symmetric ``S2S_JWK`` and must name
    ``CLIENT_ID`` as both audience and issuer. Any ``JWTError`` raised while
    decoding aborts the request with HTTP 401.
    """
    try:
        claims = jwt.decode(
            token, S2S_JWK, audience=CLIENT_ID, issuer=CLIENT_ID
        )
    except JWTError:
        abort(401)
    else:
        return claims
def create_s2s_token(ttl, additional_claims):
    """Create a signed service-to-service JWT.

    ``ttl`` is the lifetime in seconds; it is added to the current UTC
    timestamp to form ``exp``. ``additional_claims`` is a mapping merged on
    top of the standard claims, so its entries win on key collisions.
    Returns the token produced by ``jwt.encode`` with ``S2S_JWK``.
    """
    issued_at = calendar.timegm(time.gmtime())
    standard_claims = {
        "iss": CLIENT_ID,
        "aud": CLIENT_ID,
        "iat": issued_at,
        "exp": issued_at + ttl,
    }
    # Caller-supplied claims are unpacked last so they override the defaults.
    payload = {**standard_claims, **additional_claims}
    return jwt.encode(payload, S2S_JWK)
def get_access_token():
    """Return the bearer token supplied with the current request.

    The ``Authorization`` header is consulted first. When it does not carry
    bearer credentials, the ``access_token`` query-string parameter is
    returned instead, which is ``None`` when that parameter is absent.
    """
    scheme_prefix = "bearer "
    authorization = request.headers.get("Authorization")
    if isinstance(authorization, str):
        # HTTP auth scheme names are case-insensitive (RFC 7235), so accept
        # "bearer"/"BEARER" as well as the canonical "Bearer".
        if authorization[:len(scheme_prefix)].lower() == scheme_prefix:
            return authorization[len(scheme_prefix):]
    return request.args.get("access_token")
def load_user():
    """Authenticate the current request and return the matching ``User``.

    Steps:
      1. Read the bearer token (401 when missing).
      2. Validate it as an MSAL token (401 when invalid or empty).
      3. Look the user up by the token's ``oid`` claim (403 when the claim
         is missing or no such user exists).
      4. Sync the stored display name and user name with the token claims
         and commit the session.

    On success the decoded claims and the user are attached to the request
    as ``request.token`` and ``request.user``.
    """
    raw_token = get_access_token()
    if not raw_token:
        abort(401)

    claims = validate_msal_token(raw_token)
    if not claims:
        abort(401)

    oid = claims.get("oid")
    if not oid:
        abort(403)

    user = User.query.filter_by(OID=oid).first()
    if not user:
        # Auto add the first user to access the API as an Admin IN DEVELOPMENT ONLY
        may_bootstrap = current_app.env == 'development' and User.query.count() == 0
        if not may_bootstrap:
            abort(403)
        user = User(OID=oid, Display_Name="Unknown", User_Name="unknown@example.com", Role=0)
        db.session.add(user)

    # Keep the stored display name in step with the token's `name` claim.
    claimed_display_name = claims.get("name")
    if claimed_display_name and user.Display_Name != claimed_display_name:
        user.Display_Name = claimed_display_name

    # `preferred_username` is supplied on a v2 id token as provided by the FM
    # `unique_name` is supplied on a v1 id token as provided by the MDA
    legacy_user_name = claims.get("unique_name")
    claimed_user_name = claims.get("preferred_username", legacy_user_name)
    if claimed_user_name and user.User_Name != claimed_user_name:
        user.User_Name = claimed_user_name

    # Commit runs unconditionally; it persists a newly added user as well as
    # any name changes made above.
    db.session.commit()

    request.token = claims
    request.user = user
    return user
def jwt_required(fn):
    """Decorator that requires a valid service-to-service JWT.

    The wrapped view runs only when the request carries a token that
    ``validate_s2s_token`` accepts; otherwise the request is aborted with
    HTTP 401. The decoded claims are stored on ``request.token``.
    """
    @wraps(fn)
    def guarded(*args, **kwargs):
        raw_token = get_access_token()
        # A missing token and a token that decodes to nothing are both a 401.
        claims = validate_s2s_token(raw_token) if raw_token else None
        if not claims:
            abort(401)
        request.token = claims
        return fn(*args, **kwargs)

    return guarded
def user_required(fn):
    """Decorator that requires an authenticated, known user.

    ``load_user`` is called before the wrapped view; it aborts the request
    when authentication fails and otherwise populates ``request.user`` and
    ``request.token``.
    """
    @wraps(fn)
    def guarded(*args, **kwargs):
        # Called for its side effects only; the returned user is not needed here.
        load_user()
        return fn(*args, **kwargs)

    return guarded
【问题讨论】:
标签: python-3.x jwt msal