我猜你有一个 MySQL 实例,以及来自你的 n 个服务器的连接来运行这个处理作业。你在这里实现了一个作业队列。
您提到的table 需要使用 InnoDB 访问方法(或 Percona 或 MariaDB 提供的其他事务友好访问方法之一)。
您表中的这些项目是否需要批量处理?也就是说,它们是否以某种方式相互关联?或者您的服务器进程是否可以一对一地处理它们?这是一个重要的问题,因为如果您可以单独或小批量处理它们,您将在服务器进程之间获得更好的负载平衡。让我们假设小批量。
这个想法是为了防止任何服务器进程抓取到表中的一行,如果其他服务器进程有该行。我不得不经常做这种事情,这是我的建议;我知道这行得通。
首先,向表中添加一个整数列。称之为“工作”或类似的东西。给它一个默认值零。
其次,为每个服务器分配一个永久的 ID 号。服务器 IP 地址的最后一部分(例如,如果服务器的 IP 地址为 10.1.0.123,则 id 号为 123)是一个不错的选择,因为它在您的环境中可能是唯一的。
然后,当服务器抓取工作要做时,使用这两个 SQL 查询。
UPDATE table
SET working = :this_server_id
WHERE working = 0
AND time_scheduled < CURRENT_TIME
ORDER BY time_scheduled
LIMIT 1
SELECT table_id, whatever, whatever
FROM table
WHERE working = :this_server_id
第一个查询将始终抓取一批要处理的行。如果另一个服务器进程同时进入,它永远不会抓取相同的行,因为除非working = 0,否则没有进程可以抓取行。请注意,LIMIT 1 将限制您的批量大小。你不必这样做,但你可以。我还加入了ORDER BY 来首先处理等待时间最长的行。这可能是一种有用的做事方式。
第二个查询检索您完成工作所需的信息。不要忘记检索您正在处理的行的主键值(我称它们为table_id)。
然后,您的服务器进程会做它需要做的任何事情。
完成后,它需要将该行放回队列中以供稍后使用。为此,服务器进程需要将time_scheduled 设置为所需的任何值,然后设置working = 0。因此,例如,您可以为正在处理的每一行运行此查询。
UPDATE table
SET time_scheduled = CURRENT_TIME + INTERVAL 5 MINUTE,
working = 0
WHERE table_id = ?table_id_from_previous_query
就是这样。
除了一件事。在现实世界中,这些排队系统有时会出错。服务器进程崩溃。等等。参见墨菲定律。您需要一个监控查询。在这个系统中这很容易。
此查询将列出所有逾期超过 5 分钟的作业,以及应该处理它们的服务器。
SELECT working, COUNT(*) stale_jobs
FROM table
WHERE time_scheduled < CURRENT_TIME - INTERVAL 5 MINUTE
GROUP BY WORKING
如果此查询为空,则一切正常。如果它提供了很多将working 设置为零的作业,那么您的服务器就跟不上。如果它提出了将 working 设置为某个服务器的 ID 号的作业,则该服务器正在午休。
如果需要,您可以使用此查询重置分配给服务器的所有作业。
UPDATE table
SET working=0
WHERE working=?server_id_at_lunch
顺便说一句,(working, time_scheduled) 上的复合索引可能会帮助它更好地执行。