我遇到了类似的问题,但没有简单的解决方案。我发现对于任何使用外部系统的操作来说,使这样的函数具有幂等性是完全不可能的。我正在使用 TypeScript 和 Firestore。
要解决此问题,您需要使用Firebase transactions。只有使用事务,您才能面对当一个函数被多次触发时发生的竞争条件,通常是同时触发。
我发现这个问题有2个层次:
- 你没有幂等函数,你只需要发邮件就可以幂等了。
- 您有一组幂等函数,需要执行一些需要与外部系统集成的操作。
这种整合的例子是:
1。对于非幂等函数(简单案例场景)
async function isFirstRun(user: UserRecord) {
return await admin.firestore().runTransaction(async transaction => {
const userReference = admin.firestore().collection('users').doc(user.uid);
const userData = await transaction.get(userReference) as any
const emailSent = userData && userData.emailSent
if (!emailSent) {
transaction.set(userReference, { emailSent: true }, { merge: true })
return true;
} else {
return false;
}
})
}
export const onUserCreated = functions.auth.user().onCreate(async (user, context) => {
const shouldSendEmail = await isFirstRun(user);
if (shouldSendEmail) {
await sendWelcomeEmail(user)
}
})
附:您还可以使用内置的eventId 字段来过滤掉重复的事件触发。见https://cloud.google.com/blog/products/serverless/cloud-functions-pro-tips-building-idempotent-functions。所需的工作将是可比较的 - 您仍然需要存储已处理的操作或事件。
2。对于一组已经具有幂等性的函数(真实案例场景)
为了使用一组已经是幂等的函数,我切换到排队系统。我将操作推送到集合并利用 Firebase 事务将操作的执行“锁定”到一次仅一个函数。
我会尝试在这里放一个最小的例子。
部署动作处理函数
export const onActionAdded = functions.firestore
.document('actions/{actionId}')
.onCreate(async (actionSnapshot) => {
const actionItem: ActionQueueItem = tryPickingNewAction(actionSnapshot)
if (actionItem) {
if (actionItem.type === "SEND_EMAIL") {
await handleSendEmail(actionItem)
await actionSnapshot.ref.update({ status: ActionQueueItemStatus.Finished } as ActionQueueItemStatusUpdate)
} else {
await handleOtherAction(actionItem)
}
}
});
/** Returns the action if no other Function already started processing it */
function tryPickingNewAction(actionSnapshot: DocumentSnapshot): Promise<ActionQueueItem> {
return admin.firestore().runTransaction(async transaction => {
const actionItemSnapshot = await transaction.get(actionSnapshot.ref);
const freshActionItem = actionItemSnapshot.data() as ActionQueueItem;
if (freshActionItem.status === ActionQueueItemStatus.Todo) {
// Take this action
transaction.update(actionSnapshot.ref, { status: ActionQueueItemStatus.Processing } as ActionQueueItemStatusUpdate)
return freshActionItem;
} else {
console.warn("Trying to process an item that is already being processed by other thread.");
return null;
}
})
}
像这样向集合推送操作
admin.firestore()
.collection('actions')
.add({
created: new Date(),
status: ActionQueueItemStatus.Todo,
type: 'SEND_EMAIL',
data: {...}
})
TypeScript 定义
export enum ActionQueueItemStatus {
Todo = "NEW",
Processing = "PROCESSING",
Finished = "FINISHED"
}
export interface ActionQueueItem {
created: Date
status: ActionQueueItemStatus
type: 'SEND_EMAIL' | 'OTHER_ACTION'
data: EmailActionData
}
export interface EmailActionData {
subject: string,
content: string,
userEmail: string,
userDisplayName: string
}
您可能需要使用更丰富的状态及其更改来调整它,但这种方法应该适用于任何情况,并且提供的代码应该是一个很好的起点。这也不包括重新运行失败操作的机制,但它们很容易找到。
如果你知道一个更简单的方法 - 请告诉我如何:)
祝你好运!