【问题标题】:Spring Aws Kinesis Messages are not consumed in orderSpring Aws Kinesis 消息未按顺序使用
【发布时间】:2019-08-24 02:46:40
【问题描述】:

我正在使用 1 个碎片推送 100 条消息。

spring:
  cloud:
    stream:
      bindings:
        myOutBound:
          destination: my-stream
          contentType: application/json

我正在循环推送消息以进行测试

@EnableBinding(MyBinder.class)
public class MyProcessor {

  @Autowired
  private MyBinder myBinder;

  public void processRollup() {
    List<MyObject> myObjects =  IntStream.range(1, 100)
        .mapToObj(Integer::valueOf)
        .map(s-> new MyObject(s))
        .collect(toList());
    myObjects.forEach(messagePayload ->{
      System.out.println(messagePayload.getId());
      myBinder.myOutBound()
          .send(MessageBuilder.withPayload(messagePayload)
              .build());
        }
    );
  }

}

我正在使用如下消息

spring:
  cloud:
    stream:
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-stream
          content-type: application/json

消息消费无序。

我是不是错过了什么。

【问题讨论】:

    标签: spring-cloud-stream spring-cloud-aws spring-integration-aws


    【解决方案1】:

    有几件事情需要考虑。首先Binder中的producer默认基于KinesisMessageHandlerasync模式:

    messageHandler.setSync(producerProperties.getExtension().isSync());
    

    因此,即使它要求您以正确的顺序一一发送这些消息,也不意味着它们以相同的顺序到达 AWS 上的流。

    此外,即使您以同步模式发送它们,也不能保证它们以相同的顺序在 AWS 上结算。

    请看这里:Amazon Kinesis and guaranteed ordering

    您还可以通过显式sequenceNumber 在同一分片内实现订单保证:

    为保证严格递增的顺序,串行写入分片并使用 SequenceNumberForOrdering 参数。

    https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

    不幸的是,Kinesis Binder 目前不支持该选项,但我们可以通过在消息中显式设置 AwsHeaders.SEQUENCE_NUMBER 来克服它,然后再将其发送到活页夹的 output 目的地:

    String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
        if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
            sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
        }
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-06-20
    • 1970-01-01
    • 2017-10-02
    • 2016-09-06
    • 2019-01-23
    • 2019-08-30
    • 2020-03-28
    • 2018-03-02
    相关资源
    最近更新 更多