【问题标题】:Testing exception is thrown when queue is empty [duplicate]队列为空时抛出测试异常[重复]
【发布时间】:2020-12-04 07:47:15
【问题描述】:

当我尝试在 Queue 为空时使用特定消息测试捕获异常时,测试失败。

这里是队列类:

import time
from multiprocessing import Lock, Process, Queue, Pool

class QueueFun():

    def writing_queue(self, work_tasks, name):
        while True:
            print("Writing to queue")
            work_tasks.put('1')
            time.sleep(3)

    def read_queue(self, work_tasks, name):
        while True:
            print("Reading from queue", work_tasks.empty())
            if work_tasks.empty():
                raise ValueError('Queue should not be empty')
            item = work_tasks.get()
            time.sleep(1)

if __name__ == '__main__':
    work_tasks = Queue()
    queue_fun = QueueFun()
    print('setup done')

    write_queue_process = Process(target=queue_fun.writing_queue, args=(work_tasks, "write_to_queue_process"))
    print('write_queue_process')

    write_queue_process.start()

    read_queue_process_1 = Process(target=queue_fun.read_queue, args=(work_tasks, "read_from_queue_process_1"))
    read_queue_process_1.start()
    read_queue_process_2 = Process(target=queue_fun.read_queue, args=(work_tasks, "read_from_queue_process_2"))
    read_queue_process_2.start()

    write_queue_process.join()
    read_queue_process_1.join()
    read_queue_process_2.join()

这是测试:

import unittest
from queue import Queue

from playground.QueueFun import QueueFun


class QueueFunTests(unittest.TestCase):
    def test_empty_queue(self):
        q = QueueFun()
        work_tasks = Queue()
        self.assertRaises(ValueError('Queue should not be empty'), q.read_queue(work_tasks, "1"))

测试失败:

    raise ValueError('Queue should not be empty')
ValueError: Queue should not be empty

我没有正确实施测试吗?

【问题讨论】:

  • @efont 是的,这回答了我的问题。

标签: python python-multiprocessing python-multithreading


【解决方案1】:

这按预期工作:

class QueueFunTests(unittest.TestCase):

    def test_empty_queue(self):
        q = QueueFun()
        work_tasks = Queue()
        with self.assertRaises(ValueError) as ctx:
            q.read_queue(work_tasks, "1")
        self.assertEqual("Queue should not be empty", str(ctx.exception))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-06-22
    • 2012-11-19
    • 2018-12-10
    • 2013-09-08
    • 1970-01-01
    • 2021-12-09
    • 2020-08-18
    相关资源
    最近更新 更多