【Question Title】: org.apache.beam.sdk.util.UserCodeException while executing a Beam pipeline using the Samza Runner
【Posted】: 2020-07-11 17:27:37
【Question】:

I'm trying to run the WordCount demo from here using the Samza Runner. Here is my build.gradle:

plugins {
  id 'eclipse'
  id 'java'
  id 'application'

  // 'shadow' allows us to embed all the dependencies into a fat jar.
  id 'com.github.johnrengelman.shadow' version '4.0.3'
}

mainClassName = 'samples.quickstart.WordCount'

repositories {
  maven {
    url = uri('http://packages.confluent.io/maven/')
  }
  mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

ext.apacheBeamVersion = '2.22.0'

dependencies {
  shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"
    
  runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
  runtime "org.slf4j:slf4j-api:1.+"
  runtime "org.slf4j:slf4j-jdk14:1.+"
  compile group: 'org.apache.beam', name: 'beam-runners-samza', version: '2.22.0'
  compile group: 'org.apache.samza', name: 'samza-api', version: '1.4.0'
  compile group: 'org.apache.samza', name: 'samza-core_2.11', version: '1.4.0'
  compile group: 'org.apache.samza', name: 'samza-kafka_2.11', version: '1.4.0'
  compile group: 'org.apache.samza', name: 'samza-kv_2.11', version: '1.4.0'
  compile group: 'org.apache.samza', name: 'samza-kv-rocksdb_2.11', version: '1.4.0'
  testCompile "junit:junit:4.+"
}
shadowJar {
  zip64 true
  baseName = 'WordCount'  // Name of the fat jar file.
  classifier = null       // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
  manifest {
    attributes('Main-Class': mainClassName)  // Specify where the main class resides.
  }
} 

My WordCount.java is as follows:

package samples.quickstart;

import org.apache.beam.runners.samza.SamzaRunner;
//import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;

public class WordCount {

  private static final String jobName = "beamtest";

  public static void main(String[] args) {
    String inputsDir = "data/*";
    String outputsPrefix = "outputs/part";

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(SamzaRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    
    pipeline
        .apply("Read lines", TextIO.read().from(inputsDir))
        .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
        .apply("Count words", Count.perElement())
        .apply("Write results", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                  wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to(outputsPrefix));
    pipeline.run().waitUntilFinish();
  }
} 
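For reference, the transforms above boil down to the following computation. This is a plain-Java sketch with no Beam or Samza involved (the class name `LocalWordCount` is hypothetical); it only mirrors the same regex, empty-word filter, per-word count and `word: count` formatting on an in-memory list, which can help confirm that the user logic itself is not what fails.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalWordCount {

    /** Mirrors "Find words", "Filter empty words" and "Count words" on in-memory lines. */
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
            // Same regex as the pipeline: split on runs of non-letter characters.
            .flatMap(line -> Arrays.stream(line.split("[^\\p{L}]+")))
            .filter(word -> !word.isEmpty())
            // TreeMap only makes the printed order deterministic; Beam's output order is not.
            .collect(Collectors.groupingBy(w -> w, TreeMap::new, Collectors.counting()));
    }

    /** Mirrors "Write results": formats each entry as "word: count". */
    static List<String> format(Map<String, Long> counts) {
        return counts.entrySet().stream()
            .map(e -> e.getKey() + ": " + e.getValue())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("the quick brown fox", "the lazy dog, the end");
        format(countWords(lines)).forEach(System.out::println);
    }
}
```

Note that Beam's `TextIO.write()` may split its output across several `outputs/part-*` shards, so the real output is not a single sorted list.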


I'm using Beam version 2.22.0. I have tried the following combinations: Samza 1.4 with Beam 2.22, Samza 1.0 with Beam 2.11 and with Beam 2.22, and Samza 0.14.1 with Beam 2.11.0. Every time I run the job, I get the following error:

java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

I'm using Java 1.8. Does anyone know what is causing this?
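Since several Samza/Beam combinations were tried, two things are worth checking on the machine that actually runs the job: which JVM is in use, and which jar supplies `org.apache.samza.storage.kv.KeyValueStore`. A stale or duplicate Samza jar on the classpath is only one possibility here, not a confirmed cause. The following is a minimal, stdlib-only sketch; the class name `ClassOriginCheck` is hypothetical, and it assumes it is run with the same classpath as the failing job (for example the shadow jar built above).

```java
import java.net.URL;
import java.security.CodeSource;

/**
 * Diagnostic sketch: reports whether a class is an interface and which
 * jar (or the JDK itself) it was loaded from.
 */
public class ClassOriginCheck {

    /** Returns a one-line description of where {@code className} comes from. */
    static String describe(String className) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, ClassOriginCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            // JDK classes have no CodeSource, so report them as bootstrap/JDK.
            String where = (location == null) ? "bootstrap/JDK" : location.toString();
            return className + " interface=" + cls.isInterface() + " from " + where;
        } catch (ClassNotFoundException e) {
            return className + " not found on classpath";
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0
            ? args
            : new String[] {"org.apache.samza.storage.kv.KeyValueStore"};
        System.out.println("java.version=" + System.getProperty("java.version"));
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

If the printed location is not the Samza jar you expect, or `java.version` is not 1.8, that narrows down where to look next.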

【Comments】:

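  • I ran into the same problem, but it went away when I switched from Java 11 to 1.8. (You may want to double-check that you are actually running on 1.8.) I'd really like to know what is causing it in your case.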
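To rule out the situation described in this comment (the job silently running on Java 11 instead of 1.8), a fail-fast guard could be called as the first line of `WordCount.main`. This is a sketch only: `JavaVersionGuard` and its methods are hypothetical names, and requiring exactly 1.8 simply mirrors what worked for the commenter rather than a documented Samza requirement.

```java
/**
 * Hypothetical guard: fails fast when the JVM running the pipeline is not Java 8.
 */
public final class JavaVersionGuard {

    private JavaVersionGuard() {}

    /** Returns true when a java.specification.version value denotes Java 8. */
    static boolean isJava8(String specVersion) {
        // Java 8 reports "1.8"; Java 9 and later report "9", "10", "11", ...
        return "1.8".equals(specVersion);
    }

    /** Throws IllegalStateException if the current JVM is not Java 8. */
    static void requireJava8() {
        String spec = System.getProperty("java.specification.version");
        if (!isJava8(spec)) {
            throw new IllegalStateException(
                "Expected Java 1.8 but running on " + spec
                    + " (java.home=" + System.getProperty("java.home") + ")");
        }
    }

    public static void main(String[] args) {
        // Only prints the JVM details; call requireJava8() from your own main to enforce it.
        System.out.println("java.specification.version="
            + System.getProperty("java.specification.version"));
        System.out.println("java.home=" + System.getProperty("java.home"));
    }
}
```

Printing `java.home` alongside the version helps spot the case where the shell's `java` and the one Gradle or the launcher script actually uses differ.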

Tags: java apache-beam stream-processing apache-samza


【Answer 1】:

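Could you post your build.gradle and the modified WordCount.java that uses the Samza runner here, so we can figure out whether this is an incompatibility problem or a configuration problem? Thanks for trying out the Samza runner!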

【Comments】:

  • I've added these to my post. Thanks for looking into it.