【发布时间】:2020-07-14 04:08:39
【问题描述】:
我们有一个自定义 Golang 脚本来将消息发布到 PubSub。
client, err := pubsub.NewClient(ctx, GOOGLE_CLOUD_PROJECT))
然后我们使用同一个客户端发布到多达 40 个主题。
topic1 := client.Topic(topicName)
topic1.PublishSettings = pubsub.PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 1000,
NumGoroutines: 70 * runtime.GOMAXPROCS(0),
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
}
topic2 := client.Topic(topicName)
topic2.PublishSettings = pubsub.PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 1000,
NumGoroutines: 70 * runtime.GOMAXPROCS(0),
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
}
.
.
.
然后根据一定的条件发布到主题1。我们的发布者循环如下所示
semaphore := make(chan int, 3000)
for i := 0; i < totalMessages; i++ {
semaphore <- 1
go func(topic *pubsub.Topic, semaphore chan int) {
data := []byte(_RandStringBytes(messageLengthInBytes))
msg := &pubsub.Message{
Data: data,
}
if _, err := topic.Publish(ctx, msg).Get(ctx); err != nil {
log.Fatalf("Could not publish message: %v", err)
}
<-semaphore
}(topic, semaphore)
}
我们使用 3000 个 Goroutine 将消息发布到主题并同步等待消息被确认,这意味着一次只有 3000 个正在运行/等待客户端确认。
我们目前的发布速度接近 5K RPS,但我们的延迟高达 30 秒。
以下是我从我们的 Datadog 仪表板中编译的统计数据。
Publish Latency. Number of Messages
0-1 1877
1-2 1990
2-3 2661
2-3 2149
5-10 10323
10-15 4013
15-20 10322
20-25 3034
25-30 925
> 30 1901
当我编写一个小型基准脚本来将消息发布到单个主题时,同一台机器的平均延迟为 147 毫秒。
我尝试调整每个主题的发布者设置,但没有帮助。
现在我有几个问题。
- 使用单个客户端实例发布到多个主题是否正确?
- 库中是否有一些内置结构来支持这种扇出场景?
【问题讨论】:
-
你是如何测量延迟的?了解您在哪里检测代码以确定发布延迟会很有用。计时器是否在
Publish调用之前开始并在它之后结束? -
是的,计时器在发布之前开始并在它之后结束。
标签: go google-cloud-pubsub google-cloud-sdk