【问题标题】:Flink streaming example that generates its own data生成自己的数据的 Flink 流式传输示例
【发布时间】:2020-04-27 11:47:05
【问题描述】:

之前我为 Flink 询问了一个简单的 hello world example。这给了我一些很好的例子!

但是,我想要求一个更“流式”的示例,我们每秒生成一个输入值。理想情况下,这将是随机的,但即使每次都是相同的值也可以。

目标是在没有/最少的外部接触的情况下获得“移动”的流。

因此我的问题是:

如何在没有外部依赖的情况下显示 Flink 实际流数据?

我找到了如何通过在外部生成数据并写入 Kafka 或收听公共源来显示这一点,但是我试图以最小的依赖来解决它(比如从 Nifi 中的 GenerateFlowFile 开始)。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    这是一个例子。这是作为如何使您的源和接收器可插入的示例而构建的。这个想法是,在开发中您可能会使用随机源并打印结果,对于测试,您可能会使用输入事件的硬连线列表并将结果收集到列表中,而在生产中您将使用真实的源和接收器。

    这是工作:

    /*
     * Example showing how to make sources and sinks pluggable in your application code so
     * you can inject special test sources and test sinks in your tests.
     */
    
    public class TestableStreamingJob {
        private SourceFunction<Long> source;
        private SinkFunction<Long> sink;
    
        public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
            this.source = source;
            this.sink = sink;
        }
    
        public void execute() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<Long> LongStream =
                    env.addSource(source)
                            .returns(TypeInformation.of(Long.class));
    
            LongStream
                    .map(new IncrementMapFunction())
                    .addSink(sink);
    
            env.execute();
        }
    
        public static void main(String[] args) throws Exception {
            TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
            job.execute();
        }
    
        // While it's tempting for something this simple, avoid using anonymous classes or lambdas
        // for any business logic you might want to unit test.
        public class IncrementMapFunction implements MapFunction<Long, Long> {
    
            @Override
            public Long map(Long record) throws Exception {
                return record + 1 ;
            }
        }
    
    }
    

    这是RandomLongSource

    public class RandomLongSource extends RichParallelSourceFunction<Long> {
    
        private volatile boolean cancelled = false;
        private Random random;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            random = new Random();
        }
    
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (!cancelled) {
                Long nextLong = random.nextLong();
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(nextLong);
                }
            }
        }
    
        @Override
        public void cancel() {
            cancelled = true;
        }
    }
    

    【讨论】:

    • 我觉得这一定是我所要求的。我想了解的一件事是速度是什么/如何设置它。在 while 循环中添加“等待 1 秒”是否有意义?
    • 你当然可以在源代码的运行循环中添加一个 Thread.sleep 。你想不想,取决于你的目标。 Flink 优雅地处理背压,因此源不会超出管道的其余部分,即使没有睡眠。如果您想查看管道在耗尽时的表现,请不要让它休眠,但如果您想限制管道,请继续。
    猜你喜欢
    • 2021-08-01
    • 1970-01-01
    • 2016-10-10
    • 2020-12-02
    • 1970-01-01
    • 1970-01-01
    • 2019-01-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多