【问题标题】:Error while creating a DataStream in Apache Flink在 Apache Flink 中创建 DataStream 时出错
【发布时间】:2015-12-20 08:26:04
【问题描述】:

使用 fromElements 函数创建 DataStream 时出错

下面是expeption -

原因:java.io.IOException:无法反序列化来自的元素 来源。如果您使用用户定义的序列化(值和 可写类型),检查序列化函数。串行器是 org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@599fcdda 在 org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:121) 在 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) 在 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) 在 java.lang.Thread.run(Thread.java:745)

【问题讨论】:

  • 你提供给fromElements方法的类型是什么?
  • 它是 InputStreamReader
  • 下面是示例 - private static DataStream getStream(StreamExecutionEnvironment env) { InputStreamReader isr=null;尝试 { URL url = new URL("ex.in/res"); HttpURLConnection httpconn = (HttpURLConnection) url.openConnection(); if (httpconn.getResponseCode() != 200) throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode()); isr = new InputStreamReader((httpconn.getInputStream())); } catch (Exception e){} return env.fromElements(isr); }

标签: apache-flink flink-streaming


【解决方案1】:

为什么要处理InputStreamReader 元组?我想这里有一些误解。泛型类型指定要处理的数据的类型。例如

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);

使用 5 个Integer 元组生成一个有限数据流。我假设您实际上想使用 InputStreamReader 来生成实际的元组。

如果您想通过HttpURLConnection 阅读,您可以按如下方式实现您自己的SourceFunction(或RichSourceFunction)(将OUT 替换为您要使用的实际数据类型——也可以考虑Flinks Tuple0Tuple25 类型):

env.addSource(new SourceFunction<OUT> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<OUT> ctx) {
        InputStreamReader isr = null;
        try {
            URL url = new URL("ex.in/res");
            HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
            if (httpconn.getResponseCode() != 200)
                throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
            isr = new InputStreamReader((httpconn.getInputStream()));
        } catch (Exception e) {
            // clean up; log error
            return;
        }

        while(isRunning) {
            OUT tuple = ... // get data from isr
            ctx.collect(tuple);
        }
    }

    @Override
    public void cancel() {
         this.isRunning = false;
    }
});

【讨论】:

  • 我对 Flink 非常陌生。我正在尝试阅读可通过 HTTP URL 访问的 Web 日志。因此,我正在尝试这种方式。为了让用户能够读取 HTTP 流,Flink 提供了哪些方法/对象?
  • 也许对您来说实现自定义源是有意义的。只需看看SourceFunction 接口。来自run() 方法的SourceContext 允许您发出元素。
  • 只是扩展我的答案。
【解决方案2】:

您无法使用fromElements 创建DataStream&lt;InputStreamReader&gt;,因为InputStreamReader 不可序列化。这是fromElements 方法所要求的。此外,在InputStreamReaders 上工作可能没有多大意义。我想最好直接从HttpURLConnection 读取数据,然后继续处理这些数据。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-30
    • 2018-09-12
    • 2020-09-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多