【发布时间】:2019-01-15 03:58:25
【问题描述】:
我似乎无法在这里解决这个问题。我正在为description in the docs 之后的频道消费者编写测试。我通常会使用 Django 默认的 unittest,但由于 Channels 需要使用 pytest,所以我正在使用它,但仍然希望维护编写测试的类结构。
我在编写类似于 unittest 的 setup 方法时遇到了问题,我将使用它来初始化测试属性。我尝试了method fixtures,使用autouse=true 的类固定装置,但它们似乎都不起作用,因为我无法访问测试方法中的实例属性。最后我只是决定编写set_up 实例方法并为每个类手动调用它。这是我目前所拥有的:
@pytest.mark.django_db
@pytest.mark.asyncio
class TestChatConsumer(object):
@database_sync_to_async
def set_up(self):
self.user = UserFactory()
self.api_client = APIClientFactory()
self.token = TokenFactory(user=self.user)
self.credentials = {
'AUTHORIZATION': self.token.key,
...
}
self.credentials_params = '&'.join(['{}={}'.format(k, v) for k, v in self.credentials.items()])
self.authless_endpoint = '/api/v1/ws/'
self.endpoint = self.authless_endpoint + '?' + self.credentials_params
async def test_connect_without_credentials(self):
await self.set_up()
communicator = WebsocketCommunicator(application, self.authless_endpoint)
connected, subprotocol = await communicator.connect()
assert not connected
async def test_connect_with_credentials(self):
await self.set_up()
communicator = WebsocketCommunicator(application, self.endpoint)
connected, subprotocol = await communicator.connect()
assert connected
问题是当我的消费者尝试在connect 方法内的代码中访问数据库时,我不断收到psycopg2.InterfaceError: connection already closed。它使用database_sync_to_async,当我手动测试它时效果很好。
具体来说,connect 方法中引发错误的行如下所示:
await database_sync_to_async(user.inbox.increment_connections)().
不确定究竟是什么问题,因为database_sync_to_async 应该正确清理旧连接,以便正确创建新连接。
任何帮助将不胜感激。
【问题讨论】:
标签: django postgresql pytest python-asyncio django-channels