【问题标题】:Implementing a simple queue with PHP and MySQL?用 PHP 和 MySQL 实现一个简单的队列?
【发布时间】:2015-02-12 01:56:45
【问题描述】:

我有一个 PHP 脚本,它从数据库中检索行,然后根据内容执行工作。这项工作可能很耗时(但计算量不一定很昂贵),因此我需要允许多个脚本并行运行。

数据库中的行如下所示:

+---------------------+---------------+------+-----+---------------------+----------------+
| Field               | Type          | Null | Key | Default             | Extra          |
+---------------------+---------------+------+-----+---------------------+----------------+
| id                  | bigint(11)    | NO   | PRI | NULL                | auto_increment |
.....
| date_update_started | datetime      | NO   |     | 0000-00-00 00:00:00 |                |
| date_last_updated   | datetime      | NO   |     | 0000-00-00 00:00:00 |                |
+---------------------+---------------+------+-----+---------------------+----------------+

我的脚本当前选择date_last_updated 中日期最早的行(工作完成后更新)并且不使用date_update_started

如果我现在要并行运行脚本的多个实例,它们会选择相同的行(至少在某些时候)并且会完成重复的工作。

我想做的是使用事务来选择行,更新date_update_started 列,然后在选择行的SQL 语句中添加WHERE 条件以仅选择date_update_started 更大的行比某个值(以确保另一个脚本无法处理它)。例如

$sth = $dbh->prepare('
    START TRANSACTION;
    SELECT * FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
    UPDATE table DAY SET date_update_started = UTC_TIMESTAMP() WHERE id IN (SELECT id FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;);
    COMMIT;
');
$sth->execute(); // in real code some values will be bound
$rows = $sth->fetchAll(PDO::FETCH_ASSOC);

根据我的阅读,这本质上是一个队列实现,在 MySQL 中似乎不被接受。尽管如此,我需要找到一种方法来允许多个脚本并行运行,经过研究,这就是我想出的。

这种方法行得通吗?有没有更好的办法?

【问题讨论】:

  • 如何运行并行脚本?
  • @Lupin 目前该脚本通过 cron 作业每 15 分钟执行一次。该脚本检查另一个实例是否正在运行,如果是,则终止。我不确定我将如何管理多个正在运行的脚本——我可能在数据库中有一个计数器来查看有多少正在运行并以这种方式限制实例的数量,但一次一个问题:-)跨度>
  • 好的,还有一些问题需要我完全理解: 1. 您有一个脚本可以选择行并对其进行处理,然后将其更新回数据库,对吗? 2. 您希望能够在不同的行上运行并行脚本并执行相同的操作,对吗? 3. 每次脚本运行时,选择的行是连续的,意思是它们是 1-100、101-200 等还是它们在 id 方面是随机的并且仅由那些 date_update_started 大于 1 的行选择?
  • @Lupin 1. 是,2. 是,3. 根据日期字段和示例中未显示的另一个字段选择行。因此它们不是严格“连续”的,而是按两个字段排序的。
  • 另一种方法是让某种主脚本获取一些行(例如SELECT ... LIMIT 5),然后为这些行中的每一行启动一个处理脚本的单独实例。您甚至可以使用第二个表来跟踪当前正在运行的处理实例的数量,因此每当 cron 启动您的主脚本时,它就会知道要获取多少行。但由于这甚至与您的要求不相近,因此我决定将其添加为评论而不是答案。

标签: php mysql sql pdo queue


【解决方案1】:

我认为你的方法可以工作,只要你还在你选择的行添加某种标识符,它们当前正在处理,它可能是@JuniusRendel 建议的,我什至会考虑使用另一个字符串键(随机或实例 id)用于脚本导致错误且未正常完成的情况,因为一旦您在工作后更新了行,就必须清理这些字段。

在我看来,这种方法的问题在于,将有 2 个脚本在同一点运行,并且会在它们被签名为锁定之前选择相同的行。正如我所看到的,这真的取决于你在行上做了什么样的工作,如果这两个脚本的最终结果是相同的,我认为你唯一的问题是浪费时间和服务器内存(不是小问题,但我现在将它们放在一边......)。如果您的工作将导致两个脚本的更新不同,那么您的问题将是您可能在 TB 的末尾有错误的更新。

@Jean 提到了您可以采用的第二种方法,即使用 MySql 锁。我不是该主题的专家,但这似乎是一个好方法,并且使用“Select .... FOR UPDATE”语句可以为您提供您正在寻找的东西,就像您可以在同一个调用选择和更新中所做的那样 - 这会更快超过 2 个单独的查询,并且可以降低其他实例选择这些行的风险,因为它们将被锁定。

'SELECT .... FOR UPDATE' 允许您运行选择语句并锁定这些特定行以进行更新,因此您的语句可能如下所示:

START TRANSACTION;
   SELECT * FROM tb where field='value' LIMIT 1000 FOR UPDATE;
   UPDATE tb SET lock_field='1' WHERE field='value' LIMIT 1000;
COMMIT;

锁很强大,但请注意它不会影响您在不同部分的应用程序。检查当前为更新而锁定的那些选定行,它们是否在您的应用程序的其他地方(可能是最终用户)请求以及在这种情况下会发生什么。

此外,表必须是 InnoDB,建议您检查 where 子句的字段具有 Mysql 索引,如果没有,您可能会锁定整个表或遇到“Gap Lock”。

还有一种可能是锁定过程,尤其是在运行并行脚本时,会占用您的 CPU 和内存。

这是关于该主题的另一篇文章:http://www.percona.com/blog/2006/08/06/select-lock-in-share-mode-and-for-update/

希望这会有所帮助,并想听听您的进展情况。

【讨论】:

    【解决方案2】:

    我们在生产中实现了类似的东西。

    为避免重复,我们执行这样的 MySQL UPDATE(我将查询修改为类似于您的表):

    UPDATE queue SET id = LAST_INSERT_ID(id), date_update_started = ... 
    WHERE date_update_started IS NULL AND ...
    LIMIT 1;
    

    我们在单个事务中执行此更新,并利用LAST_INSERT_ID 函数。当像这样使用参数时,它会在事务会话中写入参数,在这种情况下,它是已更新的单个 (LIMIT 1) 队列的 ID(如果有的话)。

    在那之后,我们这样做:

    SELECT LAST_INSERT_ID();
    

    当不带参数使用时,它检索之前存储的值,获取必须执行的队列项的 ID。

    【讨论】:

    • 你能详细说明“写锁”是什么意思吗?也许有一个代码示例?
    • @Nate,已编辑和扩展 ;) 我还建议使用 RabbitMQ,顺便说一句。我们梦想着使用它:D
    【解决方案3】:

    编辑:抱歉,我完全误解了你的问题

    您应该在您的表格上放置一个“锁定”列,将您的脚本正在使用的条目的值设置为 true,并在完成后将其设置为 false。

    在我的例子中,我放置了 3 个其他时间戳(整数)列:target_ts、start_ts、done_ts。 你

    UPDATE table SET locked = TRUE WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(done_ts) AND ISNULL(start_ts);
    

    然后

    SELECT * FROM table WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(start_ts) AND locked=TRUE;
    

    完成您的工作并一一更新每个条目(以避免数据不一致)将 done_ts 属性设置为当前时间戳(您现在也可以解锁它们)。您可以将 target_ts 更新为您希望的下一个更新,或者您可以忽略此列并仅使用 done_ts 进行选择

    【讨论】:

    • 我不认为 PHP 实际上支持多线程,但无论如何让脚本的多个实例运行都不是问题。问题主要是如何处理从数据库中检索行。
    • 我更新了,抱歉我可能喝醉了:)。对于线程,我不知道,这就是 PECL 扩展所声称的,但我没有测试它,所以......
    【解决方案4】:

    每次脚本运行时,我都会让脚本生成一个 uniqid。

    $sctiptInstance = uniqid();
    

    我会添加一个脚本实例列来将此值保存为 varchar 并在其上放置一个索引。当脚本运行时,我会在事务内部使用 select for update 根据任何逻辑选择行,不包括具有脚本实例的行,然后使用脚本实例更新这些行。比如:

    START TRANSACTION;
    SELECT * FROM table WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000 FOR UPDATE;
    UPDATE table SET date_update_started = UTC_TIMESTAMP(), script_instance = '{$scriptInstance}' WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
    COMMIT;
    

    现在这些行将从脚本的其他实例中排除。您是否工作,然后更新行以将脚本实例设置回 null 或空白,并更新您的上次更新日期列。

    您还可以使用脚本实例写入另一个名为“当前实例”或类似名称的表,并让脚本检查该表以获取正在运行的脚本计数以控制并发脚本的数量。我也会将脚本的 PID 添加到表中。然后,您可以使用该信息创建一个定期从 cron 运行的管家脚本,以检查长时间运行或流氓进程并杀死它们等。

    【讨论】:

      【解决方案5】:

      我有一个系统在生产中的工作方式与此完全一样。我们每分钟运行一个脚本来进行一些处理,有时运行可能需要一分钟以上。

      我们有一个表列状态,0 表示尚未运行,1 表示已完成,其他值表示正在进行中。

      脚本做的第一件事是更新表格,设置一行或多行的值,表示我们正在处理该行。我们使用getmypid() 来更新我们想要处理但仍未处理的行。

      当我们完成处理时,脚本会更新具有相同进程 ID 的行,将它们标记为已完成(状态 1)。

      通过这种方式,我们避免了每个脚本都尝试处理已经在处理的行,并且它的工作原理就像一个魅力。这并不意味着没有更好的方法,但这确实可以完成工作。

      【讨论】:

        【解决方案6】:

        我过去曾出于非常相似的原因使用过存储过程。我们使用 FOR UPDATE 读锁来锁定表,同时更新选定标志以从任何未来的选择中删除该条目。它看起来像这样:

        CREATE PROCEDURE `select_and_lock`()
         BEGIN
          START TRANSACTION;
          SELECT your_fields FROM a_table WHERE some_stuff=something 
           AND selected = 0 FOR UPDATE;
          UPDATE a_table SET selected = 1;
          COMMIT;
         END$$
        

        虽然现在我考虑了一下,但没有理由必须在存储过程中完成。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2010-09-28
          • 1970-01-01
          相关资源
          最近更新 更多