【问题标题】:How can I get access to multiple user's OneDrive files through a custom daemon and the graph REST API?如何通过自定义守护程序和图形 REST API 访问多个用户的 OneDrive 文件?
【发布时间】:2026-01-30 13:15:01
【问题描述】:

我们正在开发一个守护程序服务,该服务将定期自动连接到 Microsoft Graph API,以列出所有用户驱动器中包含敏感内容的所有文件。我们已经在我们的 Azure/Office365 租户帐户中设置了一个自定义应用程序,该应用程序启用了许多权限(所有 Graph 和 Sharepoint privs(以及其他一些),为了测试)。

使用 Graph Explorer 工具和我的个人登录帐户,我可以使用 /me/drive/root/children 端点和 /users('<user-id>')/drive/root/children 端点(当用户 ID 是我自己的)列出我自己的驱动器帐户中的文件。当我尝试使用 curl 和 client_credentialsgrant_type 进行连接时,使用 Azure 中我们的自定义应用程序中的 client_idclient_secret/users('<user-id>')/drive 返回正确的驱动器 ID,但 /users('<user-id>')/drive/root/children 只返回一个空孩子名单。

我是否缺少某些需要在某处设置的权限?

这是 Graph API 当前状态的限制吗?

这是client_credentials 授权类型的限制吗?

【问题讨论】:

    标签: office365 onedrive microsoft-graph-api office365-restapi


    【解决方案1】:

    这是 Graph API 当前状态的限制 - 不存在与客户端凭据流一起使用的仅限应用权限范围,这将允许应用访问任何用户的驱动器/文件. Files.* 范围只能用作委派权限 - 请参阅 https://graph.microsoft.io/en-us/docs/authorization/permission_scopes

    【讨论】:

      【解决方案2】:

      现在可以使用新的Microsoft App Dev Portal 并按照说明here 进行操作(具有应用程序权限)。或者,如果您在 Azure 门户中创建(注册)您的应用程序,则必须使用 X509 证书而不是共享密钥(客户端密钥)。至少对我来说,最有用的资源是:

      这里有一些 python 代码(用于第二种情况),它会生成一个 url 供用户访问,这样她就可以授权您的应用程序,并请求访问令牌:

      import calendar
      from cryptography import x509
      from cryptography.hazmat.backends import default_backend
      from cryptography.hazmat.primitives import serialization
      from datetime import datetime, timedelta
      import jwt
      from jwt.exceptions import InvalidTokenError
      from oauthlib.common import generate_nonce, generate_token
      from oauthlib.oauth2 import BackendApplicationClient
      import requests
      from requests_oauthlib import OAuth2Session
      import uuid
      
      def to_unix(obj):
          if isinstance(obj, datetime):
              if obj.utcoffset() is not None:
                  obj = obj - obj.utcoffset()
          millis = calendar.timegm(obj.timetuple()) + obj.microsecond / 1e6
          return millis
      
      def validate_id_token(token):
          '''Validates the given id token.
      
          Args:
              token (str): An encoded ID token.
          Returns:
              The decoded token which is a dict.
          '''
          # Extract kid from token header
          try:
              header = jwt.get_unverified_header(token)
          except InvalidTokenError as e:
              raise Exception('No valid id token provided.')
              })
          else:
              kid = header.get('kid', '')
      
          if not kid:
              raise Exception("Unable to find 'kid' claim in token header.")
      
          # Fetch public key info
          url = 'https://login.microsoftonline.com/common/discovery/keys'
          try:
              response = requests.get(url)
          except RequestException as e:
              raise Exception('Failed to get public key info: %s' % e)
          else:
              if not response.ok:
                  raise Exception('Failed to get public key info: %s' %
                                    response.content)
              else:
                  public_keys = response.json().get('keys', [])
      
          # Find public key, used to sign id token
          public_key = None
          for k in public_keys:
              if kid == k['kid']:
                  public_key = k['x5c'][0]
                  break
          if not public_key:
              raise Exception("Unable to find public key for given kid '%s'" % kid)
      
          # Verify id token signature
          # NOTE: The x5c value is actually a X509 certificate. The public key
          # could also be generated from the n (modulos) and e (exponent) values.
          # But that's more involved.
          cert_string = ('-----BEGIN CERTIFICATE-----\n' +
                         public_key +
                         '\n-----END CERTIFICATE-----').encode('UTF-8')
          try:
              cert = x509.load_pem_x509_certificate(
                  cert_string, default_backend())
          except ValueError as e:
              raise Exception('Failed to load certificate for token signature'
                                'verification: %s' % e)
          else:
              public_key = cert.public_key()
      
          try:
              decoded = jwt.decode(token, public_key, audience=self.key)
          except InvalidTokenError as e:
              raise Exception('Failed to decode token: %s' % e)
          else:
              return decoded
      
      def generate_client_assertion(tenant_id, fp_hash, private_key, private_key_passphrase):
          """Generate a client assertion (jwt token).
      
          This token is required to fetch an oauth app-only access token.
      
          Args:
              fp_hash (str): Base64 encoded SHA1 has of certificate fingerprint
              private_key (str): Private key used to sign the jwt token
              tenant_id (str): The tenant to which this token is bound.
          Returns:
              On success a tuple of the client assertion and the token type
              indicator.
          """
          valid_from = str(int(ts.to_unix(datetime.utcnow() - timedelta(0, 1))))
          expires_at = str(int(ts.to_unix(datetime.utcnow() + timedelta(7))))
          jwt_payload = {
              'aud': ('https://login.microsoftonline.com/%s/'
                      'oauth2/token' % tenant_id),
              'iss': client_id,
              'sub': client_id,
              'jti': str(uuid.uuid1()),
              'nbf': valid_from,
              'exp': expires_at,
          }
          headers = {
              'x5t': fp_hash
          }
      
          if not private_key_passphrase:
              secret = private_key
          else:
              try:
                  secret = serialization.load_pem_private_key(
                      str(private_key), password=str(private_key_passphrase),
                      backend=default_backend())
              except Exception as e:
                  raise Exception('Failed to load private key: %s' % e)
      
          try:
              client_assertion = jwt.encode(jwt_payload, secret,
                                            algorithm='RS256', headers=headers)
          except ValueError as e:
              raise Exception('Failed to encode jwt_payload: %s' % e)
      
          client_assertion_type = ('urn:ietf:params:oauth:client-assertion-type:'
                                   'jwt-bearer')
      
          return client_assertion, client_assertion_type
      
      def generate_auth_url(client_id, redirect_uri):
          nonce = generate_nonce()
          state = generate_token()
          query_params = {
              'client_id': client_id,
              'nonce': nonce,
              'prompt': 'admin_consent',
              'redirect_uri': redirect_uri,
              'response_mode': 'fragment',
              'response_type': 'id_token',
              'scope': 'openid',
              'state': state
          }
          tenant = 'common'
          auth_url = ('https://login.microsoftonline.com/%s'
                      '/oauth2/authorize?%s') % (tenant, urllib.urlencode(query_params))
      
          return nonce, auth_url
      
      def get_access_token(client_id, id_token, nonce=None):
          '''id_token is returned w/ the url after the user authorized the app'''
          decoded_id_token = validate_id_token(id_token)
      
          # Compare the nonce values, to mitigate token replay attacks
          if not nonce:
              raise Exception("No nonce value provided.")
          elif nonce != decoded_id_token['nonce']:
              raise Exception("Nonce values don't match!")
      
          # Prepare the JWT token for fetching an access token
          tenant_id = decoded_id_token['tid']
          client_assertion, client_assertion_type = generate_client_assertion(tenant_id)
      
          # Fetch the access token
          client = BackendApplicationClient(self.key)
          oauth = OAuth2Session(client=client)
          resource = 'https://graph.microsoft.com/'
          url = https://login.microsoftonline.com/common/oauth2/token
          query_params = {
              'client_id': client_id,
              'client_assertion': client_assertion,
              'client_assertion_type': client_assertion_type,
              'resource': resource
          }
          try:
              fetch_token_response = oauth.fetch_token(url, **query_params)
          except Exception as e:
              raise Exception('Failed to obtain access token: %s' % e)
          else:
              return fetch_token_response
      

      【讨论】:

        最近更新 更多