【问题标题】:Integration testing flink job集成测试 flink 作业
【发布时间】:2020-06-01 07:26:39
【问题描述】:

我写了一个小的 flink 应用程序。我有一些输入,并使用来自外部来源的数据来丰富它。这是一个RichAsyncFunction,在open 方法中,我构造了一个用于扩充的http 客户端。

现在我想为我的工作编写一个集成测试。但是由于 http 客户端是在 open 方法中创建的,所以我无法提供它,并在我的集成测试中模拟它。我尝试在构造函数中提供它来重构它,但我总是遇到序列化错误。

这是我正在使用的示例: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html

提前致谢:)

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    这个问题是一年多前发布的,但我会发布答案,以防将来有人偶然发现这个问题。

    您看到的序列化异常可能是这样的

    Exception encountered when invoking run on a nested suite. *** ABORTED *** (610 milliseconds)
      java.lang.NullPointerException:
      at java.util.Objects.requireNonNull(Objects.java:203)
      at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
      at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:136)
      at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:77)
      at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
      at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:366)
      at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:165)
    ...
    

    原因是您的测试操作员需要知道如何反序列化 DataStream 输入类型。提供此功能的唯一方法是在初始化 testHarness 时直接提供它,然后将其传递给 setup() 方法调用。

    因此,要测试您链接的 Flink docs 中的示例,您可以执行类似的操作(我的实现是在 Scala 中,但您也可以将其调整为 Java)

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.java.typeutils.TypeExtractor
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode
    import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
    import org.apache.flink.streaming.runtime.tasks.{StreamTaskActionExecutor, TestProcessingTimeService}
    import org.apache.flink.streaming.runtime.tasks.mailbox.{MailboxExecutorImpl, TaskMailboxImpl}
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
    import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    
    
    /**
     This test case is written using Flink 1.11+.
     Older versions likely have a simpler constructor definition for [[AsyncWaitOperator]] so you might have to remove the last two arguments (processingTimeService and mailboxExecutor)
    */
    class AsyncDatabaseRequestSuite extends FunSuite with BeforeAndAfter with Matchers {
    
      var testHarness: OneInputStreamOperatorTestHarness[String, (String, String)] = _
      val TIMEOUT = 1000
      val CAPACITY = 1000
      val MAILBOX_PRIORITY = 0
    
      def createTestHarness: Unit = {
        val operator = new AsyncWaitOperator[String, (String, String)](
          new AsyncDatabaseRequest {
    
            override def open(configuration: Configuration): Unit = {
              client = new MockDatabaseClient(host, post, credentials);  // put your mock DatabaseClient object here
            }
          },
          TIMEOUT,
          CAPACITY,
          OutputMode.UNORDERED,
          new TestProcessingTimeService,
          new MailboxExecutorImpl(
            new TaskMailboxImpl,
            MAILBOX_PRIORITY,
            StreamTaskActionExecutor.IMMEDIATE
          )
        )
    
        // supply the TypeSerializer for the "input" type of the operator
        testHarness = new OneInputStreamOperatorTestHarness[String, (String, String)](
          operator,
          TypeExtractor.getForClass(classOf[String]).createSerializer(new ExecutionConfig)
        )
    
        // supply the TypeSerializer for the "output" type of the operator to the setup() call
        testHarness.setup(
          TypeExtractor.getForClass(classOf[(String, String)]).createSerializer(new ExecutionConfig)
        )
        testHarness.open()
      }
    
      before {
        createTestHarness
      }
    
      after {
        testHarness.close()
      }
    
      test("Your test case goes here") {
        // fill in your test case here
      }
    
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-12-21
      • 2018-09-25
      • 1970-01-01
      • 1970-01-01
      • 2018-06-28
      • 1970-01-01
      • 2014-02-08
      相关资源
      最近更新 更多