【问题标题】:Dataflow job doesn't consume from PubSub when launched from template从模板启动时,数据流作业不会从 PubSub 消耗
【发布时间】:2018-10-19 11:34:08
【问题描述】:

我目前有一项工作,可以将 pubsub 主题的内容输出到云存储文件夹,如果我直接启动 jar,它可以正常工作。

但是,每当我尝试使用我上传的模板启动作业时,都没有消息通过管道。

它与the Google provided template 非常相似,只是它接受订阅而不是主题。

这是我的配置:

trait Options extends PipelineOptions with StreamingOptions {
  @Description("The Cloud Pub/Sub subscription to read from")
  @Default.String("projects/project/subscriptions/subscription")
  def getInputSubscription: String
  def setInputSubscription(value: String): Unit

  @Description("The Cloud Storage directory to output files to, ends with /")
  @Default.String("gs://tmp/")
  def getOutputDirectory: String
  def setOutputDirectory(value: String): Unit

  @Description("The Cloud Storage prefix to output files to")
  @Default.String("subscription-")
  def getOutputFilenamePrefix: String
  def setOutputFilenamePrefix(value: String): Unit

  @Description("The shard template which will be part of the filenames")
  @Default.String("-W-P-SSSSS-of-NNNNN")
  def getShardTemplate: String
  def setShardTemplate(value: String): Unit

  @Description("The suffix of the filenames written out")
  @Default.String(".txt")
  def getOutputFilenameSuffix: String
  def setOutputFilenameSuffix(value: String): Unit

  @Description("The window duration in minutes, defaults to 5")
  @Default.Integer(5)
  def getWindowDuration: Int
  def setWindowDuration(value: Int): Unit

  @Description("The compression used (gzip, bz2 or none), bz2 can't be loaded into BigQuery")
  @Default.String("none")
  def getCompression: String
  def setCompression(value: String): Unit

  @Description("The maximum number of output shards produced when writing")
  @Default.Integer(1)
  def getNumShards: Int
  def setNumShards(value: Int): Unit
}

这是我启动模板的方式:

   gcloud dataflow jobs run storage \
     --gcs-location gs://bucket/templates/Storage \
     --parameters runner=DataflowRunner,project=project,streaming=true,inputSubscription=projects/project/subscriptions/sub,outputDirectory=gs://bucket/

这是我在没有模板的情况下启动作业的方式:

./storage \
  --runner=DataFlowRunner \
  --project=project \
  --streaming=true \
  --gcpTempLocation=gs://tmp-bucket/ \
  --inputSubscription=projects/project/subscriptions/sub  \
  --outputDirectory=gs://bucket/

【问题讨论】:

  • 你试过改成inputSubscription=sub而不是inputSubscription=projects/project/subscriptions/sub吗?
  • 我怀疑这可能是因为,要在运行时解析模板选项,您需要使用 ValueProviders (docs)。使用控制台UI,您可以选择特定作业并在右侧栏中找到管道选项。 inputSubscription 是否为模板化作业正确填充?
  • 是的,不幸的是,选项已正确填充在右侧栏中。所有默认值都被正确覆盖。不幸的是,我想避免使用ValueProvider,因为Scio 不支持它们。
  • 虽然你很可能会遇到一些事情,例如当我将鼠标悬停在控制台中的参数上以进行工作时,我得到了完整的包(例如com.package.Options.inputSubscription),而当我将鼠标悬停在参数上时工作不工作我只得到参数名称(例如inputSubscription)。
  • 你是对的,ValueProviders 是模板正常工作所必需的。

标签: google-cloud-dataflow google-cloud-pubsub


【解决方案1】:

正如@GuillemXercavins 评论所述,参数必须使用the ValueProvider interface as their type。这将允许在运行时设置或使用管道选项,这就是导致问题的原因。

值得指出,正如您在评论中所做的那样,ValueProvider 似乎是 unsupported in Scio


编辑:

Scio example@BenFradet 在下面的评论中提供。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-08
  • 2021-12-01
  • 2020-12-25
  • 1970-01-01
相关资源
最近更新 更多