【Question title】: How to route custom objects in an embedded Flink Statefun module?
【Posted】: 2021-07-14 22:51:30
【Question】:

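I have an embedded module in Apache Flink Statefun 3.0 (a customized Greeter example) that serializes its events as JSON. When I route() a message deserialized from the ingress, I get an exception saying my custom type cannot be cast to a Protobuf message (true, it isn't one), but should it be? I went through the 3.x documentation and found nothing restricting the types that can be routed.

Any hints or pointers?

Thanks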

// The custom type (Bean-style and all)
public final class Message {
  @JsonProperty private String name;
  @JsonProperty private String id;
  @JsonProperty private int visits;
  public Message() {}
  public String getName() { return name; }
  public void setName(String s) { name = s; }
  public String getId() { return id; }
  public void setId(String s) { id = s; }
  public int getVisits() { return visits; }
  public void setVisits(int i) { visits = i; }
}

// The function
public class GreeterFn implements StatefulFunction {
    public static final FunctionType TYPE = new FunctionType("example", "greeter");
    @Override
    public void invoke(Context ctx, Object msg) {
        // I never get here
    }
}

// The module
public class EmbeddedModule implements StatefulFunctionModule {
    static final IngressIdentifier<Message> INGRESS = new IngressIdentifier<>(Message.class, "example", "names");

    private static final class MsgDeser implements KafkaIngressDeserializer<Message> {
        private final ObjectMapper mapper = new ObjectMapper();
        @Override
        public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
            try { return mapper.readValue(new String(input.value(), StandardCharsets.UTF_8), Message.class); }
            catch (java.io.IOException e) { e.printStackTrace(); }
            return null; 
        }
    }

    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        binder.bindIngress(KafkaIngressBuilder.forIdentifier(INGRESS)
            .withKafkaAddress("kafka:9092")
            .withTopic("names")
            .withDeserializer(MsgDeser.class)
            .withConsumerGroupId("my-group-id")
            .build());
        binder.bindIngressRouter(INGRESS, new Router<Message>() {
            @Override
            public void route(Message m, Downstream<Message> ds) {
                ds.forward(GreeterFn.TYPE, m.getName(), m); // <-- I get here OK but then the exception
            }
        });
        binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
    }
}

// And the logs (trimmed)
statefun-worker_1   | 2021-07-12 11:29:33,366 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: example-names-ingress -> router (names) (1/1)#0 (2b43e45ce4bcc61340ff131d147f3afe) switched from RUNNING to FAILED.
statefun-worker_1   | java.lang.RuntimeException: class com.my.flink.Message cannot be cast to class com.google.protobuf.Message (com.my.flink.Message is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; com.google.protobuf.Message is in unnamed module of loader 'app')
statefun-worker_1   |   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1   |   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1   |   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1   |   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1   |   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator$DownstreamCollector.forward(IngressRouterOperator.java:127) ~[statefun-flink-core.jar:3.0.0]
statefun-worker_1   |   at org.apache.flink.statefun.sdk.io.Router$Downstream.forward(Router.java:67) ~[statefun-flink-distribution.jar:3.0.0]
statefun-worker_1   |   at com.my.flink.EmbeddedModule$1.route(EmbeddedModule.java:47) ~[myflink-1.jar:?]
statefun-worker_1   |   at com.my.flink.EmbeddedModule$1.route(EmbeddedModule.java:43) ~[myflink-1.jar:?]
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.processElement(IngressRouterOperator.java:81) ~[statefun-flink-core.jar:3.0.0]
...
statefun-worker_1   | Caused by: java.lang.ClassCastException: class com.my.flink.Message cannot be cast to class com.google.protobuf.Message (com.my.flink.Message is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; com.google.protobuf.Message is in unnamed module of loader 'app')
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.message.MessagePayloadSerializerPb.serialize(MessagePayloadSerializerPb.java:50) ~[statefun-flink-core.jar:3.0.0]
...

【Discussion】:

    Tags: java flink-statefun


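    【Answer 1】:

    By default, messages sent and received by embedded functions are assumed to be Protobuf messages. You can switch to Kryo (or even a custom serializer) by setting the following key in flink-conf.yaml: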

    statefun.message.serializer: WITH_KRYO_PAYLOADS

    This is not really recommended, because it makes your application hard to evolve over time.
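To see why the cast in the stack trace happens, here is a minimal, stdlib-only model of a payload serializer selected by a configuration value. It is a simplified sketch, not the Statefun implementation: `ProtoMessage` is a hypothetical stand-in for `com.google.protobuf.Message`, `PayloadMode` only borrows its names from the `statefun.message.serializer` setting, and Java serialization stands in for Kryo. In the Protobuf mode every payload is cast to the Protobuf interface, so a plain POJO fails the same way `MessagePayloadSerializerPb.serialize` does in the log above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for com.google.protobuf.Message.
interface ProtoMessage {
    byte[] toByteArray();
}

// Hypothetical modes, named after the statefun.message.serializer values.
enum PayloadMode { WITH_PROTOBUF_PAYLOADS, WITH_KRYO_PAYLOADS }

final class PayloadSerializerSketch {
    private final PayloadMode mode;

    PayloadSerializerSketch(PayloadMode mode) {
        this.mode = mode;
    }

    byte[] serialize(Object payload) {
        switch (mode) {
            case WITH_PROTOBUF_PAYLOADS:
                // Unconditional cast: any payload that is not a ProtoMessage
                // fails here with a ClassCastException.
                return ((ProtoMessage) payload).toByteArray();
            case WITH_KRYO_PAYLOADS:
                // Java serialization stands in for Kryo in this sketch.
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(payload);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return bytes.toByteArray();
            default:
                throw new IllegalStateException("unknown mode " + mode);
        }
    }
}

// A plain POJO, like the question's Message class.
final class PlainMessage implements Serializable {
    String name = "alice";
}

// A Protobuf-like message that implements the stand-in interface.
final class NameProto implements ProtoMessage {
    private final String name;

    NameProto(String name) {
        this.name = name;
    }

    @Override
    public byte[] toByteArray() {
        return name.getBytes(StandardCharsets.UTF_8);
    }
}
```

The Kryo-style branch accepts the POJO, but the resulting bytes describe the Java class itself rather than a declared schema, which is the evolution problem mentioned above.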

    You can still stick with Protobuf by deferring the String -> Message deserialization and using the built-in Protobuf type StringValue.

    I've adapted the code you pasted to use StringValue:

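        import java.nio.charset.StandardCharsets;
        import java.util.Map;

        import com.fasterxml.jackson.databind.ObjectMapper;
        import com.google.protobuf.StringValue;
        import org.apache.flink.statefun.sdk.io.IngressIdentifier;
        import org.apache.flink.statefun.sdk.io.Router;
        import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
        import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
        import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
        import org.apache.kafka.clients.consumer.ConsumerRecord;

        public class EmbeddedModule implements StatefulFunctionModule {
            static final IngressIdentifier<StringValue> INGRESS =
                new IngressIdentifier<>(StringValue.class, "example", "names");

            // Wraps the raw JSON in a Protobuf StringValue; no JSON parsing happens here.
            private static final class MsgDeser implements KafkaIngressDeserializer<StringValue> {
                @Override
                public StringValue deserialize(ConsumerRecord<byte[], byte[]> input) {
                    String json = new String(input.value(), StandardCharsets.UTF_8);
                    return StringValue.of(json);
                }
            }

            @Override
            public void configure(Map<String, String> globalConfiguration, Binder binder) {
                binder.bindIngress(KafkaIngressBuilder.forIdentifier(INGRESS)
                    .withKafkaAddress("kafka:9092")
                    .withTopic("names")
                    .withDeserializer(MsgDeser.class)
                    .withConsumerGroupId("my-group-id")
                    .build());
                binder.bindIngressRouter(INGRESS, new Router<StringValue>() {
                    private final ObjectMapper mapper = new ObjectMapper();

                    @Override
                    public void route(StringValue m, Downstream<StringValue> ds) {
                        String json = m.getValue(); // the wrapped JSON string
                        String name;
                        try {
                            // Extract the "name" field; path() yields "" if it is missing.
                            name = mapper.readTree(json).path("name").asText();
                        } catch (java.io.IOException e) {
                            throw new IllegalArgumentException("Malformed JSON: " + json, e);
                        }
                        ds.forward(GreeterFn.TYPE, name, m);
                    }
                });
                binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
            }
        }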
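The router above only needs one field (the target id) out of the JSON; the StringValue payload itself is forwarded untouched and parsed again inside the function. The stdlib-only sketch below isolates that routing step so it can be exercised without Flink. `RoutedMessage`, `JsonRouterSketch`, and the `"example/greeter"` string are hypothetical, and the regex is a deliberately narrow stand-in for Jackson that only understands a flat string field.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical routing result: target function type, target id, unchanged payload.
final class RoutedMessage {
    final String functionType;
    final String id;
    final String payload;

    RoutedMessage(String functionType, String id, String payload) {
        this.functionType = functionType;
        this.id = id;
        this.payload = payload;
    }
}

final class JsonRouterSketch {
    // Matches "field" : "value" for a flat string field. NOT a general JSON parser:
    // escape sequences inside the value are returned as-is.
    static Optional<String> extractStringField(String json, String field) {
        Pattern p = Pattern.compile(
            "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"((?:[^\"\\\\]|\\\\.)*)\"");
        Matcher m = p.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Mirrors the router: take the id from the JSON, forward the raw string unchanged.
    static RoutedMessage route(String json) {
        String name = extractStringField(json, "name")
            .orElseThrow(() -> new IllegalArgumentException("no \"name\" field in " + json));
        return new RoutedMessage("example/greeter", name, json);
    }
}
```

Because the payload stays a string, the function has to parse it a second time, which is what the typed Protobuf message below avoids.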
    

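    Define your message as a Protobuf message

    To avoid deserializing twice (once in the router and again in the function), you can define the following Protobuf message: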

    syntax = "proto3";

    message MyMessage {
      string name = 1;
      string id = 2;
      int32 visits = 3;
    }
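The field numbers (1, 2, 3) are what make this message evolvable: on the wire each value is prefixed by a tag built from its field number and wire type, not by a Java field name. Below is a hand-rolled, stdlib-only sketch of the proto3 encoding of `MyMessage`, for illustration only; in real code the class generated by `protoc` provides `toByteArray()`. `MyMessageWire` and its helpers are hypothetical, strings are assumed to be UTF-8, and `visits` is encoded as an `int32`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class MyMessageWire {
    private static final int WIRE_VARINT = 0;
    private static final int WIRE_LEN = 2;

    // Unsigned base-128 varint: 7 bits per byte, high bit set when more bytes follow.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Tag = (field number << 3) | wire type.
    static void writeTag(ByteArrayOutputStream out, int fieldNumber, int wireType) {
        writeVarint(out, ((long) fieldNumber << 3) | wireType);
    }

    static void writeString(ByteArrayOutputStream out, int fieldNumber, String s) {
        if (s.isEmpty()) return; // proto3 omits fields holding the default value
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeTag(out, fieldNumber, WIRE_LEN);
        writeVarint(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Encodes name = 1, id = 2, visits = 3, matching the message definition.
    static byte[] encode(String name, String id, int visits) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, 1, name);
        writeString(out, 2, id);
        if (visits != 0) {
            writeTag(out, 3, WIRE_VARINT);
            // int32: a negative value is sign-extended to 64 bits (10 bytes).
            writeVarint(out, visits);
        }
        return out.toByteArray();
    }
}
```

For example, `encode("Al", "1", 3)` yields the bytes `0A 02 41 6C 12 01 31 18 03`. Since Protobuf parsers locate values by tag and skip tags they do not recognize, a later version could add, say, `string email = 4;` without breaking older readers.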
    

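    Then convert the JSON string into a MyMessage instance (for example inside the ingress deserializer, so the router can simply call getName() on it):

    import com.google.protobuf.util.JsonFormat; // from the protobuf-java-util artifact

    MyMessage.Builder builder = MyMessage.newBuilder();
    // Parses the JSON into the builder; throws InvalidProtocolBufferException on bad input
    JsonFormat.parser().merge(jsonString, builder);
    MyMessage myMessage = builder.build();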
    

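    【Comments】:

    • Thanks! Could you point me to where this is documented, or is it only in the code? ("messages ... are assumed to be Protobuf")
    • @user3607933 We need to document this more clearly, but it is a consequence of the "statefun.message.serializer" setting, which is configured per job.
    • By "we" I take it you work on Flink. Cool stuff!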