【问题标题】:Output pyodbc cursor results as python dictionary将pyodbc游标结果输出为python字典
【发布时间】:2013-05-07 07:54:31
【问题描述】:

如何将 pyodbc 游标输出(来自 .fetchone.fetchmany.fetchall)序列化为 Python 字典?

我正在使用bottlepy,需要返回dict,以便它可以将它作为JSON返回。

【问题讨论】:

  • 是的,我确实注意到这是在 FAQ for PEPE249 中,但这并没有改变我的要求。

标签: python dictionary pyodbc database-cursor pypyodbc


【解决方案1】:

我需要的,与 OP 要求的略有不同:
如果您想完全概括执行 SQL 选择查询的例程,但您需要通过索引号而不是名称来引用结果,您可以使用列表列表而不是字典来执行此操作。

每行返回的数据在返回列表中表示为字段(列)值的列表。
列名可以作为返回列表的第一项提供,因此在调用例程中解析返回的列表非常容易和灵活。
这样,执行数据库调用的例程就不需要知道它正在处理的数据的任何信息。这是这样一个例程:

    def read_DB_Records(self, tablename, fieldlist, wherefield, wherevalue) -> list:

        DBfile = 'C:/DATA/MyDatabase.accdb'
        # this connection string is for Access 2007, 2010 or later .accdb files
        conn = pyodbc.connect(r'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ='+DBfile)
        cursor = conn.cursor()

        # Build the SQL Query string using the passed-in field list:
        SQL = "SELECT "
        for i in range(0, len(fieldlist)):
            SQL = SQL + "[" + fieldlist[i] + "]"
            if i < (len(fieldlist)-1):
                SQL = SQL + ", "
        SQL = SQL + " FROM " + tablename

        # Support an optional WHERE clause:
        if wherefield != "" and wherevalue != "" :
            SQL = SQL + " WHERE [" + wherefield + "] = " + "'" + wherevalue + "';"

        results = []    # Create the results list object

        cursor.execute(SQL) # Execute the Query

        # (Optional) Get a list of the column names returned from the query:
        columns = [column[0] for column in cursor.description]
        results.append(columns) # append the column names to the return list

        # Now add each row as a list of column data to the results list
        for row in cursor.fetchall():   # iterate over the cursor
            results.append(list(row))   # add the row as a list to the list of lists

        cursor.close()  # close the cursor
        conn.close()    # close the DB connection

        return results  # return the list of lists

【讨论】:

    【解决方案2】:

    步骤如下:

    1. 导入库:
        from pandas import DataFrame
        import pyodbc
        import sqlalchemy
    
    1. 从本地数据库获取结果:
    db_file = r'xxx.accdb'
    odbc_conn_str = 'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s' % (db_file)
    
    conn = pyodbc.connect(odbc_conn_str)
    cur = conn.cursor() 
    qry = cur.execute("SELECT * FROM tbl")
    columns = [column[0] for column in cur.description]
    
    results = []
    for row in cur.fetchall():
     
       results.append(dict(zip(columns, row)))
    df = DataFrame(results) 
    df
    

    【讨论】:

      【解决方案3】:

      我知道它已经过时了,我只是在重述其他人已经说过的话。但我发现这种方式很整洁,因为它也可以安全注入。

      def to_dict(row):
          return dict(zip([t[0] for t in row.cursor_description], row))
      
      def query(cursor, query, params=[], cursor_func=to_dict):
          cursor.execute(query, params) 
          results = [cursor_func(row) for row in cursor.fetchall()]
          return results
      
      quotes = query(cursor, "select * from currency where abbreviation like ?", ["USD"])
      

      【讨论】:

        【解决方案4】:

        如果您事先不知道列,请使用Cursor.description 构建列名列表,并使用zip 为每一行生成字典列表。示例假设已建立连接和查询:

        >>> cursor = connection.cursor().execute(sql)
        >>> columns = [column[0] for column in cursor.description]
        >>> print(columns)
        ['name', 'create_date']
        >>> results = []
        >>> for row in cursor.fetchall():
        ...     results.append(dict(zip(columns, row)))
        ...
        >>> print(results)
        [{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'master'},   
         {'create_date': datetime.datetime(2013, 1, 30, 12, 31, 40, 340000), 'name': u'tempdb'},
         {'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'model'},     
         {'create_date': datetime.datetime(2010, 4, 2, 17, 35, 8, 970000), 'name': u'msdb'}]
        

        【讨论】:

        • 不知道cursor.description。这只是为我节省了大量时间。
        • 需要将括号括在 print(列)和 print(结果)周围才能正常工作
        • @LJT 仅在 python3 中......但由于 print() 函数在 python2 中有效,因此使用它是一个好习惯。
        • @BenLutgens 因为该示例生成一个 list 字典,而不是字典。
        • 更新:默认情况下,pypyodbc 设置 lowercase = True。您可以像这样覆盖它: import pypyodbc; pypyodbc.lowercase = 假。参考:link
        【解决方案5】:

        对于游标不可用的情况 - 例如,当某些函数调用或内部方法返回行时,您仍然可以使用 row.cursor_description 创建字典表示

        def row_to_dict(row):
            return dict(zip([t[0] for t in row.cursor_description], row))
        

        【讨论】:

          【解决方案6】:

          我喜欢 @bryan 和 @foo-stack 的答案。如果您正在使用 postgresql 并且正在使用 psycopg2,则可以使用一些 goodies from psycopg2 在从连接创建光标时通过将 cursorfactory 指定为 DictCursor 来实现相同的目的,如下所示:

          cur = conn.cursor( cursor_factory=psycopg2.extras.DictCursor )

          所以现在你可以执行你的 sql 查询,你会得到一个字典来获取你的结果,而不需要手动映射它们。

          cur.execute( sql_query )
          results = cur.fetchall()
          
          for row in results:
              print row['row_no']
          

          请注意,您必须import psycopg2.extras 才能使用。

          【讨论】:

            【解决方案7】:

            这是一个您可以使用的简短版本

            >>> cursor.select("<your SQL here>")
            >>> single_row = dict(zip(zip(*cursor.description)[0], cursor.fetchone()))
            >>> multiple_rows = [dict(zip(zip(*cursor.description)[0], row)) for row in cursor.fetchall()]
            

            您可能知道,当您将 * 添加到列表时,您基本上会删除列表,将单个列表条目作为参数留给您正在调用的函数。通过使用 zip,我们选择第 1 到第 n 个条目并将它们拉在一起,就像裤子上的拉链一样。

            所以通过使用

            zip(*[(a,1,2),(b,1,2)])
            # interpreted by python as zip((a,1,2),(b,1,2))
            

            你得到

            [('a', 'b'), (1, 1), (2, 2)]
            

            由于 description 是一个带有元组的元组,其中每个元组描述了每列的标题和数据类型,因此您可以提取每个元组的第一个

            >>> columns = zip(*cursor.description)[0]
            

            相当于

            >>> columns = [column[0] for column in cursor.description]
            

            【讨论】:

            • 使用 python3.4 我得到:TypeError: 'zip' object is not subscriptable,所以我不能使用 zip(*description)[0] 技巧。
            • 在 python 3.4 中,zip 是一个迭代器。您可以将 zip 包装在列表中 list(zip(*description))[0] @malat
            • 你用columns变量保存了一行,但是通过分别计算每一行的列名来增加函数的复杂度
            【解决方案8】:

            使用@Beargle 的结果和bottlepy,我能够创建这个非常简洁的查询暴露端点:

            @route('/api/query/<query_str>')
            def query(query_str):
                cursor.execute(query_str)
                return {'results':
                        [dict(zip([column[0] for column in cursor.description], row))
                         for row in cursor.fetchall()]}
            

            【讨论】:

            • 这会受到 SQL 注入攻击吗?
            • @Ben 是的!除非您 1000% 确定请求将始终来自受信任的客户端,否则永远不要使用它。
            【解决方案9】:

            主要是通过@Torxed 响应,我创建了一套完整的通用函数来将架构和数据查找到字典中:

            def schema_dict(cursor):
                cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
                schema = {}
            
                for it in cursor.fetchall():
                    if it[0] not in schema:
                        schema[it[0]]={'scheme':[]}
                    else:
                        schema[it[0]]['scheme'].append(it[1])
            
                return schema
            
            
            def populate_dict(cursor, schema):
                for i in schema.keys():
                    cursor.execute("select * from {table};".format(table=i))
            
                    for row in cursor.fetchall():
                        colindex = 0
            
                        for col in schema[i]['scheme']:
                            if not 'data' in schema[i]:
                                schema[i]['data']=[]
            
                            schema[i]['data'].append(row[colindex])
                            colindex += 1
            
                return schema
            
            def database_to_dict():
                cursor = connect()
                schema = populate_dict(cursor, schema_dict(cursor))
            

            随意在这方面进行所有代码高尔夫以减少行数;但与此同时,它有效!

            ;)

            【讨论】:

              【解决方案10】:

              假设您知道列名! 另外,这里有三种不同的解决方案,
              你可能想看看最后一个!

              colnames = ['city', 'area', 'street']
              data = {}
              
              counter = 0
              for row in x.fetchall():
                  if not counter in data:
                      data[counter] = {}
              
                  colcounter = 0
                  for colname in colnames:
                      data[counter][colname] = row[colcounter]
                      colcounter += 1
              
                  counter += 1
              

              这是一个索引版本,不是最漂亮的解决方案,但它会起作用。 另一种方法是将列名作为字典键索引,每个键中的列表包含按行号顺序排列的数据。通过这样做:

              colnames = ['city', 'area', 'street']
              data = {}
              
              for row in x.fetchall():
                  colindex = 0
                  for col in colnames:
                      if not col in data:
                          data[col] = []
                      data[col].append(row[colindex])
                      colindex += 1
              

              写到这里,我知道for col in colnames 可以替换为for colindex in range(0, len()),但你明白了。 后面的示例在不获取所有数据但一次获取一行时很有用,例如:

              对每一行数据使用dict

              def fetchone_dict(stuff):
                  colnames = ['city', 'area', 'street']
                  data = {}
              
                  for colindex in range(0, colnames):
                      data[colnames[colindex]] = stuff[colindex]
                  return data
              
              row = x.fetchone()
              print fetchone_dict(row)['city']
              

              获取表名(我认为.. 感谢 Foo Stack):
              更直接的解决方案来自下面的小熊!

              cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
              schema = {}
              for it in cursor.fetchall():
                  if it[0] in schema:
                     schema[it[0]].append(it[1])
                  else:
                      schema[it[0]] = [it[1]]
              

              【讨论】:

              • 谢谢,但是当我不知道我的列名时,有没有通用的解决方案?
              • 是的,这叫SQL语法。您可以在数据库中查询要查询的表的名称。 stackoverflow.com/questions/4645456/…
              • 我写了一个不错的通用模式收集器:
              • cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';") schema = {} for it in cursor.fetchall(): if it[0] in schema: schema[it[0]].append(it[1]) else: schema[it[0]] = [it[1]]
              • @FooStack 列名已在cursor.description 中返回。不需要单独的查询。
              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2017-07-06
              • 1970-01-01
              • 1970-01-01
              • 2010-09-17
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多