【发布时间】:2021-04-30 10:29:26
【问题描述】:
我正在做一个使用 Flink 将数据写入 S3 的 POC。该程序没有给出错误。但是我也没有看到在 S3 中写入任何文件。
下面是代码
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final String outputPath = "s3a://testbucket-s3-flink/data/";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Enable checkpointing
env.enableCheckpointing();
//S3 Sink
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.build();
//Source is a local kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka:9094");
properties.setProperty("group.id", "test");
DataStream<String> input = env.addSource(new FlinkKafkaConsumer<String>("queueing.transactions", new SimpleStringSchema(), properties));
input.flatMap(new Tokenizer()) // Tokenizer for generating words
.keyBy(0) // Logically partition the stream for each word
.timeWindow(Time.minutes(1)) // Tumbling window definition
.sum(1) // Sum the number of words per partition
.map(value -> value.f0 + " count: " + value.f1.toString() + "\n")
.addSink(sink);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
public static final class Tokenizer
implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
请注意,我在配置中设置了 s3.access-key 和 s3.secret-key 值,并通过将它们更改为不正确的值进行了测试(我在不正确的值上遇到了错误)
任何指针可能会出现什么问题?
【问题讨论】:
-
你是否包含了 flink-s3-fs-hadoop 依赖?这是必要的。
-
使用 ENABLE_PLUGINS 环境变量作为插件启用。
标签: java amazon-s3 apache-flink