【问题标题】:Can we update a state's TTL value?我们可以更新一个状态的 TTL 值吗?
【发布时间】:2020-07-01 21:18:44
【问题描述】:

我们有一个使用状态(ValueStateListState)和 TTL(StateTtlConfig) 的拓扑,因为我们不能使用计时器(我们会每天生成数亿个计时器,并且确实可以扩展:生成保存点/检查点需要数小时,might even get stuck while running)。

但是,我们需要在运行时根据某些传入事件的类型和其他逻辑来更新 TTL 的值。可以用新的 StateTtlConfig(和更新的 TTL 时间)重新创建一个新状态,并在 processElement1()processElement2()CoProcessFunction 方法中将值从“旧”复制到“新”(而不是在open() 就像我们通常做的那样)?

我猜“旧”状态会被垃圾回收(?)。

这个解决方案可以扩展吗?表现出色?产生任何问题?有什么不好的吗?

【问题讨论】:

    标签: apache-flink rocksdb


    【解决方案1】:

    我认为您的方法可以在某种程度上与运行时的状态重新创建一起使用,但它很脆弱。我可以看到,问题在于旧的状态元信息可能会根据后端实现在某处徘徊。

    对于堆 (FS) 后端,最终检查点/保存点将没有过期旧状态的记录,但元信息可能会在作业运行时留在内存中。如果重新启动作业,它将消失。

    对于 RocksDB,旧状态的列族可以保留。此外,后台清理仅在压缩期间运行。如果表太小,就像内存中的部分一样,这部分(甚至可能在磁盘上的一点)会流连忘返。如果对完整快照的清理处于活动状态(不适用于增量检查点),它将在重新启动后消失。

    总而言之,这取决于您创建新状态并从保存点/检查点重新启动作业的频率。

    我创建了一个ticket 来记录可以在 TTL 配置中更改的内容以及何时更改, 所以请检查问题中的一些详细信息。

    【讨论】:

    • 我们确实在使用 RocksDB 后端
    【解决方案2】:

    我猜“旧”状态会被垃圾回收(?)。

    来自 Flink 文档Cleanup of Expired State

    默认情况下,过期值会在读取时显式删除,例如 ValueState#value,并在后台定期收集垃圾 如果配置的状态后端支持。后台清理可以 在 StateTtlConfig 中被禁用:

    import org.apache.flink.api.common.state.StateTtlConfig;
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .disableCleanupInBackground()
        .build();
    

    或在完整快照后执行清理:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .cleanupFullSnapshot()
        .build();
    

    您可以根据文档随时更改 TTL。但是,您必须重新启动查询(我不会在运行时使用它):

    对于现有作业,可以激活此清理策略或 在 StateTtlConfig 中随时停用,例如从重启后 保存点。

    但是你为什么没有像大卫在参考答案中所说的那样看到 RocksDB 上的计时器?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-02-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-04-17
      • 1970-01-01
      • 2020-05-06
      相关资源
      最近更新 更多