【Question title】: Spring Cloud AWS Stream: messages are consumed by multiple instances in a consumer group
【Posted】: 2026-01-29 16:10:02
【Question】:

I wrote a sample application using the Spring Cloud AWS (Kinesis) binder:

  compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')

Code

@StreamListener(Processor.INPUT)
public void receive(Message<String> message) {      
    System.out.println("Message recieved: "+message);
    System.out.println("Message Payload: "+message.getPayload());    

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        input: 
          group: group
          destination: stream
          content-type: application/json          
        output:  
          group: group
          destination: stream
          content-type: application/json

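I started the application on multiple ports:

8081, 8082, 8083, 8084.

When I publish a message to the stream, most of the time more than one instance consumes it.

For example, I sent the message {"22":"11"}, and it was consumed by both 8083 and 8084.

Application log: 8084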

2018-03-16 12:29:19.715  INFO 10084 --- [           main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:19.809  INFO 10084 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8084 (http) with context path ''
2018-03-16 12:29:19.809  INFO 10084 --- [           main] com.example.aws.AwsApplication           : Started AwsApplication in 21.034 seconds (JVM running for 22.975)
2018-03-16 12:29:19.840  INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:30:23.929  INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The records '[{SequenceNumber: 49582549849562056887358041088912873574803531055853731842,ApproximateArrivalTimestamp: Fri Mar 16 12:30:21 IST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=47 cap=47],PartitionKey: partitionKey-0,}]' are skipped from processing because their sequence numbers are less than already checkpointed: 49582549849562056887358041088912873574803531055853731842
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=f6cb4b6d-e149-059f-7e4d-aa9dfeeef10e, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183774995}]
Message Payload: {"22":"11"}

Application log: 8083

2018-03-16 12:29:05.733  INFO 8188 --- [           main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:05.733  INFO 8188 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:29:05.796  INFO 8188 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8083 (http) with context path ''
2018-03-16 12:29:05.796  INFO 8188 --- [           main] com.example.aws.AwsApplication           : Started AwsApplication in 19.463 seconds (JVM running for 20.956)
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=cf8647fe-8ce5-70b5-eeb9-74a08efc870a, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183775155}]
Message Payload: {"22":"11"}

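Ideally, only one consumer in the group should process a given message. Am I missing something here?

【Comments】:

  • Are you using DynamoDbMetaDataStore for checkpointing? Any chance of a simple application on GH, with instructions on how to reproduce? Thanks
  • @ArtemBilan Thanks for looking into this. We do not explicitly use any special DynamoDbMetaDataStore. My code is the same as in the question. I can put a sample project on GitHub
  • OK, please. Along with instructions on how to reproduce
  • @ArtemBilan I have uploaded the sample project here. Could you take a look? github.com/patan12/spring-aws-cloud-poc
  • Thanks for the sample! I'll come back to you later with some feedback. If I don't, ping me tomorrow!

Tags: spring-cloud-stream spring-cloud-aws spring-integration-aws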


【Solution 1】:

Thanks for verifying the solution!

I think I have found the problem. It is in ShardCheckpointer.checkpoint(String sequenceNumber).

The current code looks like this:

if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    this.checkpointStore.put(this.key, sequenceNumber);
    return true;
}

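A race condition arises when both (all?) nodes check the state and read the same smaller value from the store. Each of them passes the condition, and each then goes on to the checkpointStore.put() part. Both store the same new value and return true, which lets the Channel Adapter on each node process the same record.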
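To make the interleaving concrete, here is a minimal sketch that replays it in a single thread. It is not the binder's code: the class `CheckThenPutRace`, the `ConcurrentHashMap` stand-in for the checkpoint store, the key, and the sequence numbers "100" and "200" are all assumptions made for illustration.

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CheckThenPutRace {

    // Hypothetical stand-in for the checkpoint store (not the real metadata store).
    static final Map<String, String> store = new ConcurrentHashMap<>();

    // Step 1 of the non-atomic checkpoint: read the current checkpoint.
    static String read(String key) {
        return store.get(key);
    }

    // Step 2: compare against the value read earlier, then put unconditionally.
    static boolean compareAndPut(String key, String existing, String sequenceNumber) {
        if (existing == null
                || new BigInteger(existing).compareTo(new BigInteger(sequenceNumber)) < 0) {
            store.put(key, sequenceNumber);
            return true;
        }
        return false;
    }

    // Replays the problematic ordering: both nodes read before either one writes.
    static boolean[] simulate() {
        String key = "group:stream:shardId-000000000000"; // illustrative key only
        store.put(key, "100");
        String seenByA = read(key);
        String seenByB = read(key);
        boolean a = compareAndPut(key, seenByA, "200");
        boolean b = compareAndPut(key, seenByB, "200");
        return new boolean[] {a, b};
    }

    public static void main(String[] args) {
        boolean[] result = simulate();
        System.out.println("node A processes record: " + result[0]);
        System.out.println("node B processes record: " + result[1]);
    }
}
```

Because both reads happen before either write, both calls see "100", both pass the comparison, and both report success, which matches the duplicate delivery seen in the 8083 and 8084 logs.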

My fix for this is:

if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    if (existingSequence != null) {
        return this.checkpointStore.replace(this.key, existingSequence, sequenceNumber);
    }
    else {
        this.checkpointStore.put(this.key, sequenceNumber);
        return true;
    }
}

The condition is the same, but I also use checkpointStore.replace(), which has this contract:

/**
 * Atomically replace the value for the key in the store if the old
 * value matches the oldValue argument.
 */
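The effect of an atomic replace can be sketched with `ConcurrentHashMap`, whose `replace(key, oldValue, newValue)` has the same compare-and-set semantics as the contract quoted above. This is an illustration under stated assumptions, not the actual ShardCheckpointer: the class and method names are hypothetical, and the use of `putIfAbsent` for the very first checkpoint is an addition of this sketch that goes beyond the quoted fix (which uses a plain put in that branch).

```java
import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCheckpointSketch {

    // Hypothetical stand-in for the checkpoint store.
    static final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Returns true only for the single caller that actually advances the checkpoint.
    static boolean checkpoint(String key, String sequenceNumber) {
        String existing = store.get(key);
        if (existing == null) {
            // Assumption of this sketch: make the first checkpoint atomic as well.
            return store.putIfAbsent(key, sequenceNumber) == null;
        }
        if (new BigInteger(existing).compareTo(new BigInteger(sequenceNumber)) < 0) {
            // Succeeds only if nobody changed the value since we read it.
            return store.replace(key, existing, sequenceNumber);
        }
        return false;
    }

    // Starts `nodes` threads that all try to checkpoint the same sequence number.
    static int countWinners(int nodes, String initial, String next) {
        String key = "group:stream:shardId-000000000000"; // illustrative key only
        store.clear();
        if (initial != null) {
            store.put(key, initial);
        }
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(nodes);
        AtomicInteger winners = new AtomicInteger();
        for (int i = 0; i < nodes; i++) {
            new Thread(() -> {
                try {
                    start.await();
                    if (checkpoint(key, next)) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners(7, "100", "200"));
    }
}
```

However the threads interleave, only one of them can swap "100" for "200"; the others either fail the replace or read "200" and fail the comparison, so a single node goes on to process the record.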

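I am now trying to come up with a test case to verify this, and I will let you know when a BUILD-SNAPSHOT is ready for you to use and verify on your side.

【Comments】:

  • OK! The fix has landed in /libs-snapshot: github.com/spring-projects/spring-integration-aws/commit/…
  • Thank you, Artem Bilan. Thanks for the quick response. I will verify and let you know
  • Would you mind sharing your feedback on the fix? We may release M2 for this so that everyone can get the fix, etc. Thanks
  • I have tested it. I sent 100 messages in a loop, with 7 consumers running in the same group. Three consumers received 66, 33, and 1 messages respectively. The other four received none. The good part is that no message was duplicated across consumers. I have one question: here a single consumer received as many as 66 messages. In a real-world scenario, is there a chance that a few consumers get overloaded?
  • Not sure what the question is, but it is perfectly fine for some consumers in the same group to be idle. They will take over the work when others leave. In an ideal world, only one consumer in the group should read from a given shard, but that is a task for the future: github.com/spring-projects/spring-integration-aws/issues/66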