TLDR;
这段代码存在三个问题:
- assert_that() 在您运行管道之后执行。
- 您必须使用 equal_to()、is_empty() 或 is_not_empty() 作为 assert_that() 中的条件
- 在管道结束时,您将获得一个数组。始终针对 [ expected_result ] 进行测试。
更正的代码:
class TestValidateMessage(unittest.TestCase):
def test_not_valid(self):
with TestPipeline() as p:
pcoll = (
p
| beam.Create([{'attributes':{"imageUrl":000}}])
| beam.ParDo(ValidateMessage()).with_outputs(
'invalid', main='valid'))
assert_that(pcoll.invalid,
equal_to([ {'failure_step':'Message validation'} ] )
详细解释
在 Apache Beam 中,您必须在管道内测试代码。
Beam 为您从 PCollection 构建管道进行断言。
看起来很复杂,其实看起来更简单。
如果你有这样的管道:
with TestPipeline() as p:
pcoll = p | Beam.Create( testdata ) | Beam.DoFn(I_Want_To_Test_This())
您必须在 with 子句中添加 assert_that ,因为 assert_that() 将添加代码以在管道内执行断言。
with TestPipeline() as p:
pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
assert_that(pcoll, equal_to(expected_data) )
这和这样做是一样的:
P = TestPipeline()
pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
assert_that(pcoll, equal_to(expected_data) )
p.run() # Test must be run _inside_ the pipeline
当你有多个输出时,它是相似的:
with TestPipeline() as p:
pcoll = (
p
| Beam.Create( testdata )
| Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
)
# You can test inside the pipeline with assert_that
assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ))
也许您想检查其他输出是否为空:
from apache_beam.testing.util import assert_that, equal_to, is_empty, is_not_empty
...
with TestPipeline() as p:
pcoll = (
p
| Beam.Create( testdata )
| Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
)
# You can test inside the pipeline with assert_that
assert_that(pcoll.valid, is_empty() ), label='valid')
assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ), label='invalid')
在这种情况下,您需要向 assert_that() 添加一个标签,以确保它能够生成正确的管道。