【发布时间】:2017-12-16 02:33:53
【问题描述】:
可以使用 Kinesis API 动态添加/删除分片。此外,Kinesis 流数据生产者需要在其 PutRecord API 调用上设置正确的“分区键”(据我所知,它似乎映射到一个分片)。因此,您的数据生产者似乎也需要了解动态扩展,以便利用新分片或停止发送到已删除分片。
问题:我的数据生产者如何动态跟踪特定流上可用的分片数并创建分区键以匹配它们?
【问题讨论】:
标签: amazon-kinesis
可以使用 Kinesis API 动态添加/删除分片。此外,Kinesis 流数据生产者需要在其 PutRecord API 调用上设置正确的“分区键”(据我所知,它似乎映射到一个分片)。因此,您的数据生产者似乎也需要了解动态扩展,以便利用新分片或停止发送到已删除分片。
问题:我的数据生产者如何动态跟踪特定流上可用的分片数并创建分区键以匹配它们?
【问题讨论】:
标签: amazon-kinesis
生产者应该完全按照您在重新分片期间所说的方式工作。
据我所知,kafka 在client api 本身中维护着分区信息的映射
public final class Cluster {
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
// ...
}
并在每个send(message, partitionKey)上,获取活动分区的数量来计算分区。见DefaultPartitioner
public class DefaultPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// ...
}
}
虽然, kinesis client 只需使用参数 partitionKey,我认为一旦将记录信息发送到 kinesis 服务器,就会根据与活跃的分片数量相同的逻辑。所以,我相信计算 kinesis 分区/分片对客户端 api 是隐藏的。
【讨论】: