【发布时间】:2019-07-02 23:38:09
【问题描述】:
我们有基于 GCP PubSub 的发布者和订阅者系统。订阅者处理单个消息的时间很长,大约 1 分钟。我们已经将订阅者确认截止时间设置为 600 秒(10 分钟)(最长 1 秒),以确保 pubsub 不会过早开始重新交付,因为基本上我们这里有长时间运行的操作。
我看到了 PubSub 的这种行为。当代码发送 ack 和监视器确认 PubSub 确认请求已被接受并且确认本身已完成并成功状态时,未确认消息的总数仍然相同。
图表上的指标对总和、计数和均值聚合调整器显示相同。上图中的 aligner 是平均值,没有启用减速器。
我正在使用 @google-cloud/pubsub Node.js 库。已经尝试过不同的版本(0.18.1、0.22.2、0.24.1),但我想问题不在其中。
下面的类可以用来检查。
TypeScript 3.1.1,节点 8.x.x - 10.x.x
import { exponential, Backoff } from "backoff";
const pubsub = require("@google-cloud/pubsub");
export interface IMessageHandler {
handle (message): Promise<void>;
}
export class PubSubSyncListener {
private readonly client;
private listener: Backoff;
private runningOperations: Promise<unknown>[] = [];
constructor (
private readonly handler: IMessageHandler,
private readonly options: {
/**
* Maximal messages number to be processed simultaniosly.
* Listener will try to keep processing number as close to provided value
* as possible.
*/
maxMessages: number;
/**
* Formatted full subscrption name /projects/{projectName}/subscriptions/{subscriptionName}
*/
subscriptionName: string;
/**
* In milliseconds
*/
minimalListenTimeout?: number;
/**
* In milliseconds
*/
maximalListenTimeout?: number;
}
) {
this.client = new pubsub.v1.SubscriberClient();
this.options = Object.assign({
minimalListenTimeout: 300,
maximalListenTimeout: 30000
}, this.options);
}
public async listen () {
this.listener = exponential({
maxDelay: this.options.maximalListenTimeout,
initialDelay: this.options.minimalListenTimeout
});
this.listener.on("ready", async () => {
if (this.runningOperations.length < this.options.maxMessages) {
const [response] = await this.client.pull({
subscription: this.options.subscriptionName,
maxMessages: this.options.maxMessages - this.runningOperations.length
});
for (const m of response.receivedMessages) {
this.startMessageProcessing(m);
}
this.listener.reset();
this.listener.backoff();
} else {
this.listener.backoff();
}
});
this.listener.backoff();
}
private startMessageProcessing (message) {
const index = this.runningOperations.length;
const removeFromRunning = () => {
this.runningOperations.splice(index, 1);
};
this.runningOperations.push(
this.handler.handle(this.getHandlerMessage(message))
.then(removeFromRunning, removeFromRunning)
);
}
private getHandlerMessage (message) {
message.message.ack = async () => {
const ackRequest = {
subscription: this.options.subscriptionName,
ackIds: [message.ackId]
};
await this.client.acknowledge(ackRequest);
};
return message.message;
}
public async stop () {
this.listener.reset();
this.listener = null;
await Promise.all(
this.runningOperations
);
}
}
这基本上是异步拉取消息和立即确认的部分实现。因为建议的解决方案之一是使用同步拉动。
如果我没记错问题的症状,我在 java 存储库中发现了类似的报告问题。
https://github.com/googleapis/google-cloud-java/issues/3567
这里的最后一个细节是,确认似乎适用于少量请求。如果我在 pubsub 中触发单个消息然后立即处理它,未传递的消息数量会减少(下降到 0,因为之前只有一条消息)。
问题本身 - 发生了什么以及为什么未确认的消息数量没有在收到确认后减少?
【问题讨论】:
标签: node.js google-cloud-platform google-cloud-pubsub stackdriver