【问题标题】:Using Flink connector within Flink StateFun在 Flink StateFun 中使用 Flink 连接器
【发布时间】:2022-12-15 21:03:44
【问题描述】:

我已经设法将 GCP PubSub 依赖项插入到 Flink Statefun JAR 中,然后构建 Docker 映像。

我已将以下内容添加到pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub</artifactId>
    <version>1.16.0</version>
    <scope>test</scope>
</dependency>

目前还不太清楚我现在如何在与 StateFun 图像一起使用的 module.yaml 中指定我的 PubSub 入口和出口。

https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/

例如,对于 Kafka,您使用:

kind: io.statefun.kafka.v1/egress
spec:
  id: com.example/my-egress
  address: kafka-broker:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeout: 15min

我可以看到官方连接器在 Java 代码中有一个 Kind const,您可以使用它来引用 module.yaml 中的连接器,但我在文档中看不到如何将您自己插入的 Flink 连接器引用到 StateFun图片。

【问题讨论】:

    标签: apache-flink flink-statefun


    【解决方案1】:

    GCP PubSub 没有作为标准的 Statefun IO 组件得到官方支持,目前只有 KafkaKinesis;但是你可以想出your own custom ingress/egress connector relatively easily。不幸的是,您将无法提供一种方法来拥有新的基于 yaml 的配置项,因为 Kafka 和 Kinesis seem to be hard-coded in the runtime 的模块配置器。您必须在代码中进行配置:

    查看源/入口示例:

    public class ModuleWithSourceSpec implements StatefulFunctionModule {
    
        @Override
        public void configure(Map<String, String> globalConfiguration, Binder binder) {
            IngressIdentifier<TypedValue> id =
                new IngressIdentifier<>(TypedValue.class, "com.example", "custom-source");
            IngressSpec<TypedValue> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
            binder.bindIngress(spec);
            binder.bindIngressRouter(id, new CustomRouter());
        }
    }
    

    你的目标是提供new FlinkSource&lt;&gt;(),这是一个org.apache.flink.streaming.api.functions.source.SourceFunction

    您可以这样声明:

    SourceFunction source = 
        PubSubSource.newBuilder()
          .withDeserializationSchema(new IntegerSerializer())
          .withProjectName(projectName)
          .withSubscriptionName(subscriptionName)
          .withMessageRateLimit(1)
          .build();
    

    接收器/出口也一样:

    public class ModuleWithSinkSpec implements StatefulFunctionModule {
    
        @Override
        public void configure(Map<String, String> globalConfiguration, Binder binder) {
            EgressIdentifier<TypedValue> id = new EgressIdentifier<>("com.example", "custom-sink", TypedValue.class);
            EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
            binder.bindEgress(spec);
        }
    }
    

    new FlinkSink&lt;&gt;() 替换为 sink

    SinkFunction sink =
        PubSubSink.newBuilder()
            .withSerializationSchema(new IntegerSerializer())
            .withProjectName(projectName)
            .withTopicName(outputTopicName)
            .build();
    

    在出口情况下,您会这样使用:

    public class GreeterFn implements StatefulFunction {
    
        static final TypeName TYPE = TypeName.typeNameFromString("com.example.fns/greeter");
    
        static final TypeName CUSTOM_EGRESS = TypeName.typeNameFromString("com.example/custom-sink");
    
        static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();
    
        @Override 
        CompletableFuture<Void> apply(Context context, Message message) {
            if (!message.is(User.TYPE)) {
                throw new IllegalStateException("Unknown type");
            }
    
            User user = message.as(User.TYPE);
            String name = user.getName();
    
            var storage = context.storage();
            var seen = storage.get(SEEN).orElse(0);
            storage.set(SEEN, seen + 1);
    
            context.send(
                EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
                    .withUtf8Value("Hello " + name + " for the " + seen + "th time!")
                    .build());
    
            return context.done();
        }
    }
    

    我希望它有所帮助!

    【讨论】:

      猜你喜欢
      • 2020-07-24
      • 1970-01-01
      • 2020-11-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多