【发布时间】:2018-10-01 13:23:53
【问题描述】:
我们正在尝试将 SQL-Server JDBC 连接器与 KafkaAvroSerializer 一起使用,并提供定制的 ProducerInterceptor 以在将数据发送到 Kafka 之前对其进行加密。
在消费者端,我们希望使用 Flink 连接器进行解密,然后使用适当的解串器。
为了实现这一目标,我们有几个问题:
1)如果我们提供自定义的ConsumerInterceptor来解密数据,那么在Flink创建DataStream的时候是否应该通过Properties文件传入?
Properties properties = new Properties();
...
`properties.setProperty("consumer.interceptor.classes": "OurCusromDecryptConsumerInterceptor")`;
...
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));
上面的配置是否正确,还是我需要设置任何其他属性以便我可以将 ConsumerInterceptor 传递给 Flink?
2) 另一个问题是关于 Flink 中的 Deserializer。例如,我在网上查了一下,发现很少有如下代码 sn-ps:
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
private final String schemaRegistryUrl;
private final int identityMapCapacity;
private KafkaAvroDecoder kafkaAvroDecoder;
public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
this(schemaRegistyUrl, 1000);
}
因此,如果我们使用 JDBC 连接器将数据传递到 Kafka 而不进行任何修改(除了加密数据),那么我们应该在反序列化期间提供什么数据类型?我们将在反序列化之前解密数据。
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
提前致谢,
【问题讨论】:
-
您可以尝试使用 Avro 库中的
GenericRecord。否则,我认为您不应该为每个正在解码的不同类型的对象拥有一个DeserializationSchema -
GenericRecord 工作,谢谢!
标签: java apache-flink avro confluent-platform confluent-schema-registry