【问题标题】:kSqlDB Exactly Once Processing GuaranteekSqlDB Exactly Once 处理保证
【发布时间】:2021-12-11 23:36:44
【问题描述】:

我通过非常不正常地关闭 docker 运行进程或让 docker 容器耗尽内存来测试 ksqldb 服务器上的恰好一次语义。在这两种情况下,我都会收到重复的内容,这绝对不是保证的行为。我觉得我可能在这里遗漏了明显的东西......

docker 容器有KSQL_KSQL_STREAMS_PROCESSING_GUARANTEE=exactly_once 参数集。据我了解,这将为enable.idempotence 和消费者isolation.level 属性设置底层生产者设置。

由于以下查询,仍然会出现重复项: 这里

create or replace table TEST with (kafka_topic =  'TEST', value_format='avro',partitions=10, replicas=1) 
AS
SELECT 
    CUSTOMERS_ID,
    earliest_by_offset(LDTS) AS LDTS, 
    COLLECT_SET(NAMES) AS NAMES,
    earliest_by_offset(CUSTOMER_PK) AS CUSTOMER_PK
from TEST_1
group by CUSTOMERS_PK
emit changes;

还有这里

create or replace stream TEST_STREAM (CUSTOMERS_ID VARCHAR KEY, LDTS BIGINT, NAMES ARRAY<VARCHAR>, CUSTOMER_PK VARCHAR)
WITH 
(KAFKA_TOPIC='TEST', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');

create or replace stream TEST_FINAL (KAFKA_KEY VARCHAR KEY, CUSTOMERS_ID VARCHAR, LDTS BIGINT,NAME VARCHAR, CUSTOMER_PK VARCHAR) WITH
(KAFKA_TOPIC='TEST_FINAL', VALUE_FORMAT='AVRO', partitions=10, replicas=1);

INSERT INTO 
    TEST_FINAL 
select
    CUSTOMERS_ID as KAFKA_KEY,
    AS_VALUE(CUSTOMERS_ID) as CUSTOMERS_ID,
    LDTS,
    NAMES[1] as NAME,
    CUSTOMER_PK
from TEST_STREAM
where
rowtime= LDTS and ARRAY_LENGTH(NAMES)=1;

你可以忽略sql的逻辑。这些只是使问题更有意义的例子。关键是在容器崩溃期间偏移量显然会丢失。

我还能做什么?我缺少任何属性吗?
我正在使用来自 confluent community v6.2.1 和 ksqldb v0.21 的 kafka 代理

【问题讨论】:

    标签: apache-kafka confluent-platform ksqldb exactly-once


    【解决方案1】:

    我想我还是会回答我自己的问题。似乎 consumer.isolation.level 仍然需要在 ksql docker 环境变量中设置为“read_committed”。尽管一切都表明 processing_guarantee 会为您执行此操作,但我的情况并非如此。添加后,我仍然在主题中看到未提交的消息,但不再在 ksql 流和表中看到。 也许这对其他人有帮助

    【讨论】:

      猜你喜欢
      • 2021-01-16
      • 1970-01-01
      • 2019-11-15
      • 1970-01-01
      • 2019-08-25
      • 2019-06-23
      • 1970-01-01
      • 2019-12-10
      • 2020-01-19
      相关资源
      最近更新 更多