【问题标题】:Apache Beam Python Unit Testing a ParDo class with_outputsApache Beam Python 单元测试 ParDo 类 with_outputs
【发布时间】:2020-04-02 19:41:02
【问题描述】:

我有一个 ParDo 类,它检查 pubsub 消息是否包含某些属性并返回有效和无效的 TaggedOutput(在正常流程中工作正常,使用 yield 来返回值),我无法进行单元测试对于这个类,我正在尝试提供一个虚拟消息(字典复制 pubsub 消息信息),并且我想检查该类的输出是否包含其他属性。

这是我目前所拥有的:

class TestValidateMessage(unittest.TestCase):

def test_not_valid(self):
    with TestPipeline() as p:
        pcoll = (
                p
                | beam.Create([{'attributes':{"imageUrl":000}}])
                | beam.ParDo(ValidateMessage()).with_outputs(
        'invalid', main='valid'))
    valid, _ = pcoll
    invalid = pcoll['invalid']
    print(invalid)
    assert_that(invalid, {'failure_step':'Message validation'})

这样我会收到一条错误消息:

TypeError: Map can be used only with callable objects. Received {'failure_step'} instead.

当我尝试打印(无效)时,我得到PCollection[ParDo(ValidateMessage)/ParDo(ValidateMessage).invalid] 如何访问(出于断言目的)PCollection 的内容?

【问题讨论】:

    标签: python unit-testing apache-beam google-cloud-pubsub


    【解决方案1】:

    TLDR;

    这段代码存在三个问题:

    • assert_that() 在您运行管道之后执行
    • 您必须使用 equal_to()、is_empty() 或 is_not_empty() 作为 assert_that() 中的条件
    • 在管道结束时,您将获得一个数组。始终针对 [ expected_result ] 进行测试。

    更正的代码:

    class TestValidateMessage(unittest.TestCase):
    
    def test_not_valid(self):
        with TestPipeline() as p:
            pcoll = (
                    p
                    | beam.Create([{'attributes':{"imageUrl":000}}])
                    | beam.ParDo(ValidateMessage()).with_outputs(
            'invalid', main='valid'))
    
            assert_that(pcoll.invalid, 
                        equal_to([ {'failure_step':'Message validation'} ] )
    
    

    详细解释

    在 Apache Beam 中,您必须在管道内测试代码

    Beam 为您从 PCollection 构建管道进行断言。 看起来很复杂,其实看起来更简单。

    如果你有这样的管道:

    with TestPipeline() as p:
       pcoll = p | Beam.Create( testdata ) | Beam.DoFn(I_Want_To_Test_This())
    

    您必须在 with 子句中添加 assert_that ,因为 assert_that() 将添加代码以在管道内执行断言。

    with TestPipeline() as p:
       pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
       assert_that(pcoll, equal_to(expected_data) )
    

    这和这样做是一样的:

    P = TestPipeline()
    pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
    assert_that(pcoll, equal_to(expected_data) )
    p.run()  # Test must be run _inside_ the pipeline
    

    当你有多个输出时,它是相似的:

    with TestPipeline() as p:
       pcoll = (
           p 
           | Beam.Create( testdata ) 
           | Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
       )
    
       # You can test inside the pipeline with assert_that
    
       assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ))
    
    

    也许您想检查其他输出是否为空:

    from apache_beam.testing.util import assert_that, equal_to, is_empty, is_not_empty
    
    ...
    
    with TestPipeline() as p:
       pcoll = (
           p 
           | Beam.Create( testdata ) 
           | Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
       )
    
       # You can test inside the pipeline with assert_that
    
       assert_that(pcoll.valid, is_empty() ), label='valid')
    
       assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ), label='invalid')
    
    

    在这种情况下,您需要向 assert_that() 添加一个标签,以确保它能够生成正确的管道。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多