【问题标题】:stored procedures with sqlAlchemy使用 sqlAlchemy 存储过程
【发布时间】:2011-04-03 14:04:32
【问题描述】:

如何用sqlAlchemy调用sql server的存储过程?

【问题讨论】:

    标签: python sql-server stored-procedures sqlalchemy


    【解决方案1】:

    引擎和连接有一个execute() 方法,可用于任意sql 语句,会话也是如此。例如:

    results = sess.execute('myproc ?, ?', [param1, param2])
    

    如果需要,您可以使用outparam() 创建输出参数(或者对于绑定参数使用bindparam()isoutparam=True 选项)

    【讨论】:

    • 这与数据库无关。请改用sqlalchemy.sql.text
    • 顺便说一句,如果您想访问 MS SQL Server 中存储过程返回的行,您需要 sess.execute('SET NOCOUNT ON')。您可以在一个执行调用中做到这一点:results = sess.execute('SET NOCOUNT ON; EXEC myproc ?, ?; SET NOCOUNT OFF', [param1, param2]).
    • 这是一个旧线程所以,也许这在新版本的 sqlalchemy 中有所改变,但我不得不使用字典而不是参数列表,并在原始 sql 而不是“?”。所以,上面的例子变成了:sess.execute('myproc :p1, :p2', {'p1': 'value1', 'p2': 'value2'})
    • 还要检查 Denis Otkidach 的答案(和 cmets 部分)
    • 我们遇到过标准连接中运行失败,但在事务中运行成功的情况:with engine.begin() as conn: ...
    【解决方案2】:

    context:我将 flask-sqlalchemy 与 MySQL 一起使用,但不使用 ORM 映射。通常,我使用:

    # in the init method
    _db = SqlAlchemy(app)
    
    #... somewhere in my code ...
    _db.session.execute(query)
    

    现成不支持调用存储过程:callproc 不是通用的,而是特定于 mysql 连接器的。

    对于存储过程没有输出参数,可以执行类似的查询

    _db.session.execute(sqlalchemy.text("CALL my_proc(:param)"), param='something')
    

    像往常一样。当您有输出参数时,事情会变得更加复杂......


    使用参数的一种方法是通过engine.raw_connection() 访问底层连接器。例如:

    conn = _db.engine.raw_connection()
    # do the call. The actual parameter does not matter, could be ['lala'] as well
    results = conn.cursor().callproc('my_proc_with_one_out_param', [0])
    conn.close()   # commit
    print(results) # will print (<out param result>)
    

    这很好,因为我们可以访问 out 参数,但是这个连接不是由烧瓶会话管理的。这意味着它不会像其他托管查询一样被提交/中止...(仅当您的过程有副作用时才会出现问题)。

    最后,我这样做了:

    # do the call and store the result in a local mysql variabl
    # the name does not matter, as long as it is prefixed by @
    _db.session.execute('CALL my_proc_with_one_out_param(@out)')
    # do another query to get back the result
    result = _db.session.execute('SELECT @out').fetchone()
    

    result 将是一个具有一个值的元组:out 参数。这并不理想,但危险最小:如果会话期间另一个查询失败,过程调用也将中止(回滚)。

    【讨论】:

    • 是否可以将结果映射到一个类?
    【解决方案3】:

    只需执行使用func 创建的过程对象:

    from sqlalchemy import create_engine, func
    from sqlalchemy.orm import sessionmaker
    
    engine = create_engine('sqlite://', echo=True)
    print engine.execute(func.upper('abc')).scalar() # Using engine
    session = sessionmaker(bind=engine)()
    print session.execute(func.upper('abc')).scalar() # Using session
    

    【讨论】:

    • 为我工作,调用存储过程的优雅而简单的方法。官方说明 :data:.func 构造仅对调用独立“存储过程”的支持有限,尤其是那些具有特殊参数化问题。有关如何将 DBAPI 级别的 callproc() 方法用于完全传统的存储过程的详细信息,请参阅 :ref:stored_procedures 部分。 像我这样的人的代码:session.execute(func.your_proc_name(param1, param2))
    • 这对用户生成的存储过程有效吗?我收到一个错误,提示我的函数不是内置过程。
    【解决方案4】:

    使用 SQLAlchemy 在 MySQL 中调用存储过程的最简单方法是使用 Engine.raw_connection()callproc 方法。 call_proc 将需要调用存储过程所需的过程名称和参数。

    def call_procedure(function_name, params):
           connection = cloudsql.Engine.raw_connection()
           try:
               cursor = connection.cursor()
               cursor.callproc(function_name, params)
               results = list(cursor.fetchall())
               cursor.close()
               connection.commit()
               return results
           finally:
               connection.close()
    

    【讨论】:

    • 它不仅仅适用于 MySQL。我个人可以确认 Postgres 也可以开箱即用。请参阅支持此答案的官方 SQL Alchemy 文档:docs.sqlalchemy.org/en/13/core/connections.html
    • 是否可以将结果映射到 ORM 类并与 where 子句一起使用?
    【解决方案5】:

    假设您已经使用 sessionmaker() 创建了会话,您可以使用以下函数:

    def exec_procedure(session, proc_name, params):
        sql_params = ",".join(["@{0}={1}".format(name, value) for name, value in params.items()])
        sql_string = """
            DECLARE @return_value int;
            EXEC    @return_value = [dbo].[{proc_name}] {params};
            SELECT 'Return Value' = @return_value;
        """.format(proc_name=proc_name, params=sql_params)
    
        return session.execute(sql_string).fetchall()
    

    现在您可以使用以下参数执行存储过程“MyProc”:

    params = {
        'Foo': foo_value,
        'Bar': bar_value
    }
    exec_procedure(session, 'MyProc', params)
    

    【讨论】:

    • 这似乎容易受到 SQL 注入的攻击。
    • 这确实是易受攻击的。最好使用execute(sql_string, params=...) 传递命名参数,让引擎转义参数值。 @Profpatsch 的回答已经做到了。
    • 如何收集输出参数?即,我的声明是:EXEC dbo.next_rowid 'dbo', 'workorder_feature', @id OUTPUT;我如何获取id?
    【解决方案6】:

    出于对我的项目的迫切需要,我编写了一个处理存储过程调用的函数。

    给你:

    import sqlalchemy as sql
    
    def execute_db_store_procedure(database, types, sql_store_procedure, *sp_args):
        """ Execute the store procedure and return the response table.
    
        Attention: No injection checking!!!
    
        Does work with the CALL syntax as of yet (TODO: other databases).
    
        Attributes:
            database            -- the database
            types               -- tuple of strings of SQLAlchemy type names.
                                   Each type describes the type of the argument
                                   with the same number.
                                   List: http://docs.sqlalchemy.org/en/rel_0_7/core/types.html
            sql_store_procudure -- string of the stored procedure to be executed
            sp_args             -- arguments passed to the stored procedure
        """
        if not len(types) == len(sp_args):
            raise ValueError("types tuple must be the length of the sp args.")
    
        # Construch the type list for the given types
        # See
        # http://docs.sqlalchemy.org/en/latest/core/sqlelement.html?highlight=expression.text#sqlalchemy.sql.expression.text
        # sp_args (and their types) are numbered from 0 to len(sp_args)-1
        type_list = [sql.sql.expression.bindparam(
                        str(no), type_=getattr(sql.types, typ)())
                            for no, typ in zip(range(len(types)), types)]
    
        try:
            # Adapts to the number of arguments given to the function
            sp_call = sql.text("CALL `%s`(%s)" % (
                    sql_store_procedure,
                    ", ".join([":%s" % n for n in range(len(sp_args))])),
                bindparams=type_list
            )
            #raise ValueError("%s\n%s" % (sp_call, type_list))
            with database.engine.begin() as connection:
                return connection.execute(
                    sp_call,
                    # Don't do this at home, kids...
                    **dict((str(no), arg)
                        for (no, arg) in zip(range(len(sp_args)), sp_args)))
        except sql.exc.DatabaseError:
            raise
    

    它适用于 CALL 语法,因此 MySQL 应该可以按预期工作。我猜 MSSQL 使用 EXEC 而不是 call 和一些不同的语法。因此,使其与服务器无关取决于您,但应该不会太难。

    【讨论】:

      【解决方案7】:

      另一种解决方法:

      query = f'call Procedure ("{@param1}", "{@param2}", "{@param3}")'    
      sqlEngine = sqlalchemy.create_engine(jdbc)
      conn = sqlEngine.connect() 
      df = pd.read_sql(query,conn,index_col=None)
      

      【讨论】:

        猜你喜欢
        • 2011-03-21
        • 2018-01-10
        • 1970-01-01
        • 2021-08-13
        • 2016-01-23
        • 2015-11-14
        • 2020-01-24
        • 2020-04-06
        相关资源
        最近更新 更多