【问题标题】:spring cloud stream kinesis Binder春云流动活页夹
【发布时间】:2018-09-18 02:56:20
【问题描述】:

我正在尝试实现一个能够自动缩放的 spring boot aws kinesis 消费者,以便与原始实例共享负载(拆分处理分片)。

我能做的:使用定义明确的自述文件和此处提供的示例Kinesis binder docs 我已经能够启动多个消费者,通过提供这些属性来实际划分分片以进行处理。

在生产者上,我通过应用程序属性提供 partitionCount: 2。 在消费者身上,我提供了 instanceIndex 和 instanceCount。

在消费者 1 上,我有 instanceIndex=0 和 instantCount =2 , 在消费者 2 上,我有 instanceIndex=1 和 instantCount=2

这很好用,我有两个 Spring Boot 应用程序处理它们的特定分片。但在这种情况下,我必须为每个启动应用程序提供一个预配置的属性文件,该文件需要在加载时可用,以便它们拆分负载。如果我只启动第一个消费者(非自动缩放),我只处理特定于索引 0 的分片,而不处理其他分片。

我想做但不确定是否可以部署一个消费者(处理所有分片)。如果我部署另一个实例,我希望该实例重温一些负载的第一个消费者,换句话说,如果我有 2 个分片和一个消费者,它将同时处理这两个,如果我随后部署另一个应用程序,我希望第一个消费者到现在只处理一个分片,将第二个分片留给第二个消费者。

我试图通过不在消费者上指定 instanceIndex 或 instanceCount 而只提供组名来做到这一点,但这会使第二个消费者闲置直到第一个消费者被关闭。仅供参考,我还创建了自己的元数据和锁定表,防止活页夹创建默认值。

配置: 制片人------

originator: KinesisProducer
server:
 port: 8090

    spring: 
      cloud: 
        stream: 
          bindings:
            output: 
              destination: <stream-name> 
              content-type: application/json
              producer: 
                headerMode: none
                partitionKeyExpression: headers.type

消费者-------------------------------------

originator: KinesisSink
server:
 port: 8091

spring:
  cloud:
    stream:
      kinesis:
        bindings:
          input:
            consumer:
              listenerMode: batch
              recordsLimit: 10
              shardIteratorType: TRIM_HORIZON
        binder:
          checkpoint:
            table: <checkpoint-table>
          locks:
            table: <locking-table
      bindings:
        input:
          destination: <stream-name>
          content-type: application/json
          consumer:
            concurrency: 1
            listenerMode: batch
            useNativeDecoding: true
            recordsLimit: 10
            idleBetweenPolls: 250
            partitioned: true
          group: mygroup

【问题讨论】:

    标签: spring amazon-web-services spring-integration spring-cloud-stream


    【解决方案1】:

    没错。这就是它现在的工作方式:如果有一个消费者在那里,它会占用所有分片进行处理。仅当第一个以某种方式损坏至少一个分片时,第二个才会采取行动。

    适当的类似于 Kafka 的再平衡在我们的路线图上。我们还没有明确的愿景,所以欢迎就此事提出问题和后续贡献!

    【讨论】:

    • 必须。你也可以考虑使用 Kinesis 客户端库作为不同消费方法的选项:github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/…。请参阅kplKclEnabled 选项。
    • 我不这么认为。您应该擅长使用默认 KCL 选项来使用其 DynamoDB 功能在使用者之间分配分片。
    • 正确,但是如果您不提供使用者组,KCL 将如何知道如何管理您的实例?就 Spring Cloud Stream 而言,它是 consumer group,但它完全映射到 KCL 中的 applicationName。这就是我们过去所说的cluster
    • 您的理解完全正确。我们需要将此值向下传播到客户端,并确保我们的 KCL 的多个实例在同一个组中进行管理。
    • 这是要走的路之一。另一种方法是在您的处理程序下游重试建议,以确保您不会因异常而失败而丢失消息。有关更多信息,请参阅 Spring 集成文档:docs.spring.io/spring-integration/reference/html/#retry-advice
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-08
    • 2017-03-11
    • 2020-09-08
    • 1970-01-01
    • 2020-03-29
    • 2016-08-25
    • 1970-01-01
    相关资源
    最近更新 更多