【问题标题】:Rollback transactions not working with py.test and Flask回滚事务不适用于 py.test 和 Flask
【发布时间】:2014-10-24 19:41:30
【问题描述】:

我正在使用 py.test 测试我的 Flask 应用程序,但我收到了 IntegrityError,因为我在两个不同的测试中创建了相同的模型。

我正在使用 postgreSQL 9.3.5 和 Flask-SQLAlchemy 1.0。

编辑我已经用 Jeremy Allen 的回答更新了我的 sessoin 夹具,它修复了很多错误。但是,当我使用用户夹具时,似乎仍然会收到 IntegrityErrors

错误

E       IntegrityError: (IntegrityError) duplicate key value violates unique constraint "ix_users_email"
E       DETAIL:  Key (email)=(not_used@example.com) already exists.
E        'INSERT INTO users (email, username, name, role_id, company_id, password_hash, confirmed, member_since, last_seen) VALUES (%(email)s, %(username)s, %(name)s, %(role_id)s, %(company_id)s, %(password_hash)s, %(confirmed)s, %(member_since)s, %(last_seen)s) RETURNING users.id' {'username': 'not_used', 'confirmed': True, 'name': 'To be Removed', 'member_since': datetime.datetime(2014, 10, 29, 19, 19, 41, 7929), 'company_id': None, 'role_id': 3, 'last_seen': datetime.datetime(2014, 10, 29, 19, 19, 41, 7941), 'email': 'not_used@example.com', 'password_hash': 'pbkdf2:sha1:1000$cXUh6GbJ$6f38242871cff5e4cce4c1dc49a62c4aea4ba1f3'}

conftest.py

@pytest.yield_fixture(scope='session')
def app():
    app = create_app('testing')
    app.config['SERVER_NAME'] = 'example.com:1234'
    ctx = app.app_context()
    ctx.push()
    app.response_class = TestResponse
    app.test_client_class = TestClient
    yield app
    ctx.pop()


@pytest.fixture(scope='session')
def db(app):
    _db.drop_all()
    _db.create_all()

    Permission.insert_initial()
    Role.insert_initial()
    Technology.insert_initial()
    Product.insert_initial()
    Actor.insert_initial()
    Industry.insert_initial()
    DeliveryCategory.insert_initial()
    DeliveryMethod.insert_initial()

    user = User(email='admin@example.com', username='admin', confirmed=True, password='admin', name='Admin')
    user.role = Role.query.filter_by(name='Administrator').first()
    _db.session.add(user)
    _db.session.commit()

    return _db


@pytest.yield_fixture(scope='function')
def session(db):
    db.session.begin_nested()
    yield db.session
    db.session.rollback()


@pytest.yield_fixture(scope='function')
def user(session):
    yield session.query(User).filter_by(email='admin@example.com').first()


@pytest.yield_fixture(scope='function')
def client(app, user):
    client = app.test_client()
    client.auth = 'Basic ' + b64encode((user.email + ':' + 'admin').encode('utf-8')).decode('utf-8')
    yield client

失败的测试

def test_edit_agenda_add_company_rep_without_company(session, client, user):
    user2 = User(name='To be Removed', password='not_used', username='not_used', confirmed=True,
                email='not_used@example.com', role=Role.query.filter_by(name='User').first())
    agenda = Agenda(name='Invalid Company Rep', creator=user)
    session.add(agenda)
    session.commit()

    response = client.jput('/api/v1.0/agendas/%s' % agenda.id,
        data={
            'company_representative': user2.id
        }
    )
    assert response.status_code == 200

def test_edit_agenda_add_user_already_in_agenda(session, client, user):
    user2 = User(name='To be Removed', password='not_used', username='not_used', confirmed=True,
                email='not_used@example.com', role=Role.query.filter_by(name='User').first())
    agenda = Agenda(name='Invalid Company Rep', creator=user)
    agenda.users.append(user2)
    session.add(agenda)
    session.commit()

    response = client.jput('/api/v1.0/agendas/%s' % agenda.id,
        data={
            'users': [user2.id]
        }
    )
    assert response.status_code == 200

通过的测试

def test_get_agenda_modules_where_agenda_that_does_not_exist(session, app):
    # Create admin user with permission to create models
    user = User(email='admin2@example.com', username='admin2', confirmed=True, password='admin2')
    user.role = Role.query.filter_by(name='Administrator').first()
    session.add(user)
    session.commit()

    client = app.test_client()
    client.auth = 'Basic ' + b64encode(
        (user.email + ':' + 'admin2').encode('utf-8')).decode('utf-8')
    response = client.jget('/api/v1.0/agenda-modules/%s/%s' % (5, 4))
    assert response.status_code == 404

def test_get_agenda_modules_agenda_modules_does_not_exist(session, app):
    agenda = Agenda(name='Is tired in the AM')
    session.add(agenda)

    # Create admin user with permission to create models
    user = User(email='admin2@example.com', username='admin2', confirmed=True, password='admin2')
    user.role = Role.query.filter_by(name='Administrator').first()
    session.add(user)
    session.commit()

    client = app.test_client()
    client.auth = 'Basic ' + b64encode(
        (user.email + ':' + 'admin2').encode('utf-8')).decode('utf-8')
    response = client.jget('/api/v1.0/agenda-modules/%s/%s' % (agenda.id, 4))
    assert response.status_code == 400
    assert response.jdata['message'] == 'AgendaModule does not exist.'

【问题讨论】:

    标签: python flask flask-sqlalchemy pytest


    【解决方案1】:

    看起来您正在尝试 join a Session into an External Transaction 并且您正在使用 flask-sqlalchemy。

    您的代码未按预期工作,因为会话实际上最终使用了与您开始事务的连接不同的连接。

    1。您需要将 Session 绑定到 Connection

    如上面链接的示例所示。快速更改 conftest.py 中的代码即可:

    @pytest.yield_fixture(scope='function')
    def session(db):
        ...
        session = db.create_scoped_session(options={'bind':connection})
        ...
    

    不幸的是,由于 flask-sqlalchemy 的 SignallingSession(在 v2.0 中),您的“绑定”参数将被否决!

    这是因为 SignallingSession 设置了 'binds' 参数,使其优先于我们的 'bind' 参数,并且它没有为我们提供指定我们自己的 'binds' 参数的好方法。

    2013 年 12 月有一个 GitHub pull request,其他人也遇到了同样的问题。

    2。调整烧瓶-sqlalchemy

    我们可以继承 SignallingSession 以允许我们做我们想做的事:

    class SessionWithBinds(SignallingSession):
        """The extends the flask-sqlalchemy signalling session so that we may
        provide our own 'binds' argument.
        """
    
        def __init__(self, db, autocommit=False, autoflush=True, **options):
            #: The application that this session belongs to.
            self.app = db.get_app()
            self._model_changes = {}
            #: A flag that controls whether this session should keep track of
            #: model modifications.  The default value for this attribute
            #: is set from the ``SQLALCHEMY_TRACK_MODIFICATIONS`` config
            #: key.
            self.emit_modification_signals = \
                self.app.config['SQLALCHEMY_TRACK_MODIFICATIONS']
            bind = options.pop('bind', None) or db.engine
            # Our changes to allow a 'binds' argument
            try:
                binds = options.pop('binds')
            except KeyError:
                binds = db.get_binds(self.app)
            SessionBase.__init__(self, autocommit=autocommit, autoflush=autoflush,
                                 bind=bind,
                                 binds=binds, **options)
    

    然后子类化 SQLAlchemy(主要的 flask-sqlalchemy 类)以使用我们的 SessionWithBinds 代替 SignallingSession

    class TestFriendlySQLAlchemy(SQLAlchemy):
        """For overriding create_session to return our own Session class"""
    
        def create_session(self, options):
            return SessionWithBinds(self, **options)
    

    现在你必须使用这个类来代替 SQLAlchemy:

    db = TestFriendlySQLAlchemy()
    

    最后,回到我们的 conftest.py 中指定一个新的“绑定”:

    @pytest.yield_fixture(scope='function')
    def session(db):
        ...
        session = db.create_scoped_session(options={'bind':connection, 'binds':None})
        ...
    

    现在您的事务应该按预期回滚。

    这有点复杂……

    您可以尝试使用Session.begin_nested,而不是执行所有这些操作。它要求您的数据库支持 SQL SAVEPOINT(PostgreSQL 支持)。

    更改您的 conftest.py 夹具:

    @pytest.yield_fixture(scope='function')
    def session(db):
        db.session.begin_nested()
        yield db.session
        db.session.rollback()
    

    有关在 SQLAlchemy 中使用 SAVEPOINTs 的更多信息:http://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#using-savepoint

    这很简单,但只要您正在测试的代码本身不调用rollback,它就可以工作。如果这是一个问题,请查看标题“支持回滚测试”下的代码 here in the SQLAlchemy docs

    【讨论】:

    • 我更新了我的问题。似乎用户夹具或我如何使用它存在问题。也许你有一些见解。
    • 谢谢,你的回答太棒了!
    • 只是检查环境变量SQLALCHEMY_BINDS 是否可以满足您的要求?
    • SAVEPOINT 的一个潜在问题是序列(例如 PK ID 分配)不会被重置。因此,如果您的测试以随机顺序运行,则插入对象的 PK ID 将是未知的。
    【解决方案2】:

    这里的关键是在嵌套会话中运行您的测试,然后在每个测试执行后回滚所有内容(这也假设您的测试之间没有依赖关系)。

    我建议采用以下方法,通过在嵌套事务中运行每个测试:

    # module conftest.py
    import pytest
    
    from app import create_app
    from app import db as _db
    from sqlalchemy import event
    from sqlalchemy.orm import sessionmaker
    
    @pytest.fixture(scope="session")
    def app(request):
        """
        Returns session-wide application.
        """
        return create_app("testing")
    
    
    @pytest.fixture(scope="session")
    def db(app, request):
        """
        Returns session-wide initialised database.
        """
        with app.app_context():
            _db.drop_all()
            _db.create_all()
    
    
    @pytest.fixture(scope="function", autouse=True)
    def session(app, db, request):
        """
        Returns function-scoped session.
        """
        with app.app_context():
            conn = _db.engine.connect()
            txn = conn.begin()
    
            options = dict(bind=conn, binds={})
            sess = _db.create_scoped_session(options=options)
    
            # establish  a SAVEPOINT just before beginning the test
            # (http://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#using-savepoint)
            sess.begin_nested()
    
            @event.listens_for(sess(), 'after_transaction_end')
            def restart_savepoint(sess2, trans):
                # Detecting whether this is indeed the nested transaction of the test
                if trans.nested and not trans._parent.nested:
                    # The test should have normally called session.commit(),
                    # but to be safe we explicitly expire the session
                    sess2.expire_all()
                    sess2.begin_nested()
    
            _db.session = sess
            yield sess
    
            # Cleanup
            sess.remove()
            # This instruction rollsback any commit that were executed in the tests.
            txn.rollback()
            conn.close()
    

    【讨论】:

      【解决方案3】:

      您还没有真正说出您使用什么来管理数据库,不知道 _db 或任何模型类背后的库是什么。

      但不管怎样,我怀疑session.commit() 调用可能与事务提交的原因有关。最终,您必须阅读文档,了解 session.commit() 在您使用的框架中的作用。

      【讨论】:

      • 我正在使用 Flask-SQLAlchemy
      猜你喜欢
      • 2015-05-17
      • 1970-01-01
      • 2018-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-08-28
      • 1970-01-01
      • 2014-10-25
      • 2011-09-19
      相关资源
      最近更新 更多