【问题标题】:how to perform https requests over an SSLsocket in python如何在 python 中通过 SSLsocket 执行 https 请求
【发布时间】:2012-05-17 17:51:32
【问题描述】:

我正在使用以下代码与 ssl 服务器执行 ssl 握手和证书验证。

import ssl
import socket

s = socket.socket()
print "connecting..."
#logging.debug("Connecting")
# Connect with SSL mutual authentication
# We only trust our server's CA, and it only trusts user certificates signed by it
c = ssl.wrap_socket(s, cert_reqs=ssl.CERT_REQUIRED,
                    ssl_version=ssl.PROTOCOL_SSLv3, ca_certs='ca.crt',
                    certfile='user.crt', keyfile='user.key')
c.connect((constants.server_addr, constants.port))

我能够连接到服务器并且证书已正确验证,但是,我不确定从这里做什么。我需要通过套接字执行 https 操作,包括将 XML 发布到 REST API。我该怎么办?

【问题讨论】:

    标签: python ssl https


    【解决方案1】:

    您可以使用wrap_socket 代码扩展httplib.HTTPConnection,如this answer 中所述。

    (我仍然会考虑使用 PycURL 之类的东西,正如我已经在 your previous question 中回答的那样。)

    【讨论】:

      【解决方案2】:

      你可能想从urllib2.urlopen开始:http://docs.python.org/library/urllib2.html#urllib2.urlopen

      这可以处理 https URL、获取、发布等。您不需要直接在低级 socketssl 对象上工作。但是,如果您使用的是 Python 2.x,HTTPS 连接不会对服务器端证书进行任何验证,这看起来像是您需要的(这很好)。不过,Python 3 的 urllib 确实做到了。

      如果您使用的是 Python 2,您有几个选择。一种是将urllib2.HTTPSHandler 子类化,以便对其套接字进行适当的验证。另一种是自己实现需要的HTTP协议位(不推荐)。您也可以正常实例化各种urllib2httplib 对象,然后简单地分配您已经过身份验证的ssl 套接字来代替他们正在使用的那些,尽管您需要非常小心它们的状态不会得到弄乱。不过,标准库中的源代码非常易读,以防您需要像这样进行修改。

      【讨论】:

      • 如果你想使用httplib,你确实需要做一些手动的ssl处理,不幸的是httplib不做任何服务器证书验证。同样适用于 urllib2...
      • @Bruno 我是通过另一个问题发现的。这就是为什么我不得不使用我发布的代码。你知道如何使用该连接来获得经过身份验证的 https 连接吗?
      【解决方案3】:

      这正是我在项目中所做的。这是我在项目中使用的 REST 客户端模块。它已根据我的需要进行了修改,但我认为您可能会发现它也很有用。它需要 httplib2:http://pypi.python.org/pypi/httplib2

      """
          client.py
          ---------
      
          Modified to allow validation server's certificate with external cacert list.
          -- Arif Widi Nugroho <arif@sainsmograf.com>
      
          Copyright (C) 2008 Benjamin O'Steen
      
          This file is part of python-fedoracommons.
      
          python-fedoracommons is free software: you can redistribute it and/or modify
          it under the terms of the GNU General Public License as published by
          the Free Software Foundation, either version 3 of the License, or
          (at your option) any later version.
      
          python-fedoracommons is distributed in the hope that it will be useful,
          but WITHOUT ANY WARRANTY; without even the implied warranty of
          MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
          GNU General Public License for more details.
      
          You should have received a copy of the GNU General Public License
          along with python-fedoracommons.  If not, see <http://www.gnu.org/licenses/>.
      """
      
      __license__ = 'GPL http://www.gnu.org/licenses/gpl.txt'
      __author__ = "Benjamin O'Steen <bosteen@gmail.com>, Arif Widi Nugroho <arif@sainsmograf.com>"
      __version__ = '0.1'
      
      import httplib2
      import urlparse
      import urllib
      import base64
      from base64 import encodestring
      
      from mime_types import *
      
      import mimetypes
      
      from cStringIO import StringIO
      
      class Connection(object):
          def __init__(self, base_url, username=None, password=None, cache=None, ca_certs=None, user_agent_name=None):
              self.base_url = base_url
              self.username = username
              m = MimeTypes()
              self.mimetypes = m.get_dictionary()
      
              self.url = urlparse.urlparse(base_url)
      
              (scheme, netloc, path, query, fragment) = urlparse.urlsplit(base_url)
      
              self.scheme = scheme
              self.host = netloc
              self.path = path
      
              if user_agent_name is None:
                  self.user_agent_name = 'Basic Agent'
              else:
                  self.user_agent_name = user_agent_name
      
              # Create Http class with support for Digest HTTP Authentication, if necessary
              # self.h = httplib2.Http(".cache")
              self.h = httplib2.Http(cache=cache, ca_certs=ca_certs)
              self.h.follow_all_redirects = True
              if username and password:
                  self.h.add_credentials(username, password)
      
          def request_get(self, resource, args = None, headers={}):
              return self.request(resource, "get", args, headers=headers)
      
          def request_delete(self, resource, args = None, headers={}):
              return self.request(resource, "delete", args, headers=headers)
      
          def request_head(self, resource, args = None, headers={}):
              return self.request(resource, "head", args, headers=headers)
      
          def request_post(self, resource, args = None, body = None, filename=None, headers={}):
              return self.request(resource, "post", args , body = body, filename=filename, headers=headers)
      
          def request_put(self, resource, args = None, body = None, filename=None, headers={}):
              return self.request(resource, "put", args , body = body, filename=filename, headers=headers)
      
          def get_content_type(self, filename):
              extension = filename.split('.')[-1]
              guessed_mimetype = self.mimetypes.get(extension, mimetypes.guess_type(filename)[0])
              return guessed_mimetype or 'application/octet-stream'
      
          def request(self, resource, method = "get", args = None, body = None, filename=None, headers={}):
              params = None
              path = resource
              headers['User-Agent'] = self.user_agent_name
      
              BOUNDARY = u'00hoYUXOnLD5RQ8SKGYVgLLt64jejnMwtO7q8XE1'
              CRLF = u'\r\n'
      
              if filename and body:
                  #fn = open(filename ,'r')
                  #chunks = fn.read()
                  #fn.close()
      
                  # Attempt to find the Mimetype
                  content_type = self.get_content_type(filename)
                  headers['Content-Type']='multipart/form-data; boundary='+BOUNDARY
                  encode_string = StringIO()
                  encode_string.write(CRLF)
                  encode_string.write(u'--' + BOUNDARY + CRLF)
                  encode_string.write(u'Content-Disposition: form-data; name="file"; filename="%s"' % filename)
                  encode_string.write(CRLF)
                  encode_string.write(u'Content-Type: %s' % content_type + CRLF)
                  encode_string.write(CRLF)
                  encode_string.write(body)
                  encode_string.write(CRLF)
                  encode_string.write(u'--' + BOUNDARY + u'--' + CRLF)
      
                  body = encode_string.getvalue()
                  headers['Content-Length'] = str(len(body))
              elif body:
                  if not headers.get('Content-Type', None):
                      headers['Content-Type']='text/xml'
                  headers['Content-Length'] = str(len(body))        
              else: 
                  headers['Content-Type']='text/xml'
      
              if method.upper() == 'POST':
                  headers['Content-Type']='application/x-www-form-urlencoded'
      
              if args:
                  path += u"?" + urllib.urlencode(args)
      
              request_path = []
              if self.path != "/":
                  if self.path.endswith('/'):
                      request_path.append(self.path[:-1])
                  else:
                      request_path.append(self.path)
                  if path.startswith('/'):
                      request_path.append(path[1:])
                  else:
                      request_path.append(path)
      
              resp, content = self.h.request(u"%s://%s%s" % (self.scheme, self.host, u'/'.join(request_path)), method.upper(), body=body, headers=headers )
      
              return {u'headers':resp, u'body':content.decode('UTF-8')}
      

      使用示例(如果服务器证书没有被指定的ca签名,连接会失败):

      c = client.Connection('https://localhost:8000', certs='/path/to/cacert.pem')
      # now post some data to the server
      response = c.request_post('rest/path/', body=some_urlencoded_data)
      if response['headers']['status'] == '200':
          # do something...
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-10-18
        • 2014-08-25
        • 1970-01-01
        • 2011-08-17
        • 2021-06-20
        • 1970-01-01
        • 2018-03-24
        相关资源
        最近更新 更多