【问题标题】:Flink- error on running WordCount example on remote clusterFlink-在远程集群上运行 WordCount 示例时出错
【发布时间】:2017-10-30 11:08:30
【问题描述】:

我在 VirtualBox 上有一个 Flink 集群,包括三个节点,1 个主节点和 2 个从节点。我自定义了 WordCount 示例并创建了一个胖 jar 文件以使用 VirtualBox Flink 远程集群运行它,但我遇到了错误。

注意:我手动将依赖项导入到项目中(使用 Intellij IDEA),我没有使用 maven 作为依赖项提供程序。我在本地机器上测试了我的代码,一切正常!

更多详情如下:

这是我的 Java 代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

// *************************************************************************
//     PROGRAM
// *************************************************************************

public static void main(String[] args) throws Exception {

    final ParameterTool params = ParameterTool.fromArgs(args);
    final int port;
    final String ip;
    DataSet<String> text;
    try {
        ip = params.get("ip");
        port = params.getInt("port");
        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
        text = env.readTextFile(params.get("input"));
    } catch (Exception e) {
        System.err.println("No port or input or ip specified. Please run 'SocketWindowWordCount --ip <ip> --port <port>'" +
                " --input <input>");
        return;
    }

    DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);

    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();

}

// *************************************************************************
//     USER FUNCTIONS
// *************************************************************************

/**
 * Implements the string tokenizer that splits sentences into words as a user-defined
 * FlatMapFunction. The function takes a line (String) and splits it into
 * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
 */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

}

我使用命令创建了 ExecutionEnvironment 对象:

final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);

,我在主机上使用以下命令运行代码(连接到集群节点并且 VirtualBox 正在其上运行)

java -cp FlinkWordCountClusetr.jar WordCount --ip 192.168.101.10 --port 6123 --input /usr/local/flink/LICENSE

,但我遇到了以下错误(总结):

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'

我该如何解决这个问题?

【问题讨论】:

  • 您可以从运行作业的地方访问远程环境吗?
  • @BiplobBiswas 当然可以,我可以ping地址192.168.101.10

标签: cluster-computing apache-flink


【解决方案1】:

在尝试了很多设置之后,都是 maven 依赖项与远程集群上安装的 Flink 版本不匹配。 Maven 依赖项是 Flink 版本 1.3.2 build on Scala 2.10,而安装在远程集群上的 Flink 是 1.3.2 build on Scala 2.11。只是细微的差别,但很重要!

【讨论】:

    猜你喜欢
    • 2016-03-12
    • 2015-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多