【问题标题】:How to fix "incompatible types: org.apache.beam.sdk.options.ValueProvider<java.lang.String> cannot be converted to java.lang.String"如何修复“不兼容的类型:org.apache.beam.sdk.options.ValueProvider<java.lang.String> 无法转换为 java.lang.String”
【发布时间】:2019-02-04 04:33:16
【问题描述】:

我跟随 this link 创建了一个模板,该模板构建了一个从 KafkaIO 读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider 无法转换为 java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。 Beam 版本是 2.9.0,这是我的代码的一部分。

public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();

    void setKafkaServer(ValueProvider<String> value);

    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata() 
    )

以下是我运行代码的方式:

mvn compile exec:java \
-Dexec.mainClass=${MyClass} \
-Pdataflow-runner -Dexec.args=" \
--project=${MyClass} \
--stagingLocation=gs://${MyBucket}/staging \
--tempLocation=gs://${MyBucket}/temp \
--templateLocation=gs://${MyBucket}/templates/${MyClass} \
--runner=DataflowRunner"

【问题讨论】:

  • 您能否说明哪一行导致了这个错误?
  • @IanKemp 它是 .withBootstrapServers(options.getKafkaServer())

标签: java apache-kafka google-cloud-dataflow apache-beam


【解决方案1】:

为了通过ValueProvider访问一个值,你需要使用get方法,然后你就可以得到这个值的具体类型。

例如: 有选项时:

ValueProvider&lt;String&gt; getKafkaServer();

您可以通过以下方式访问它:

getKafkaServer().get() 这将返回 String 对象。

似乎 KafkaIo Api 需要获取字符串参数而不是 ValueProvider,您必须从 ValueProvider 包装器中提取值。

【讨论】:

  • 你能解释一下这个答案吗?只是为了说明如何使用它
  • Guillaume Racicto,使用ValueProvider 时,您可以使用get 访问该值
  • 你能用这些信息编辑你的答案吗?评论并不能真正算作答案
  • 已编辑。解释够了吗?缺少什么
【解决方案2】:

我可能会发现不支持 kafkaIO 的问题。以下来自Google create template

" 一些 I/O 连接器包含接受 ValueProvider 对象的方法。要确定对特定连接器和方法的支持,请参阅 I/O 连接器的 API 参考文档。支持的方法具有 ValueProvider 的重载。如果方法没有重载,则该方法不支持运行时参数。以下 I/O 连接器至少支持部分 ValueProvider:

基于文件的 IO:TextIO、AvroIO、FileIO、TFRecordIO、XmlIO BigQueryIO* BigtableIO(需要 SDK 2.3.0 或更高版本) 发布订阅 扳手IO "

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-10-03
  • 2017-03-29
  • 1970-01-01
  • 1970-01-01
  • 2021-12-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多