【问题标题】:Celery: One subtask in group always timesout芹菜:组中的一个子任务总是超时
【发布时间】:2014-12-17 03:27:38
【问题描述】:

我在使用 Celery 的 group 功能时遇到了一个相当烦人的行为。

我需要定期检查一堆主机解析到的 IP,以确保所述 IP 没有改变。为了做到这一点,我有一本带有< hostname, IPs > 的字典,我需要验证。例如:

REQUIRED_HOSTS = {
    'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
    '*.com': {'198.252.206.16'}
}

所以唯一要做的就是定期迭代REQUIRED_HOSTS.keys(),解析名称并查看它解析到的任何IP是否与我记录的不同。 (这里不用多想)

为了提高一点效率,每个名字都是并行解析的。我为此创建了一个子任务(它使用dnspython 解决):

@my_tasks.task
def resolve_hostname(hostname, resolver=None):
    """ This subtask resolves the 'hostname' to its IP addresses. It's
    intended to be used in the 'compare_required_ips' function to resolve
    names in parallel """
    if resolver is None:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers

    try:
        return (hostname,
                {hst.address for hst in resolver.query(hostname)})
    except Exception, e:
        logger.exception("Got %s when trying to resolve hostname=%s"
                         % (type(e), hostname))
        raise e

现在,查询所有主机名并生成子任务的方法如下:

@my_taks.task
def compare_required_ips():
    """ This method verifies that the IPs haven't changed. """
    retval = []
    resolver = dns.resolver.Resolver()
    resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers
    retrieved_hosts = dict.fromkeys(required_hosts.REQUIRED_HOSTS.keys())
    logger.info("Going to compare IPs for %s hostnames=%s"
                % (len(required_hosts.REQUIRED_HOSTS.keys()),
                   required_hosts.REQUIRED_HOSTS.keys()))
    ip_subtasks = group(
        [resolve_hostname.s(hostname, resolver=resolver)
         for hostname in required_hosts.REQUIRED_HOSTS.keys()]
    )()
    for hostname, ips in ip_subtasks.get(timeout=90):
        retrieved_hosts[hostname] = ips

    for hostname in required_hosts.REQUIRED_HOSTS:
        if (required_hosts.REQUIRED_HOSTS[hostname]
                != retrieved_hosts[hostname]):
            retval.append(hostname)
            logger.error(
                "IP resolution mismatch. hostname=%s resolve_target=%s"
                ", resolve_actual=%s (mismatch=%s)"
                % (hostname,
                   required_hosts.REQUIRED_HOSTS[hostname],
                   retrieved_hosts[hostname],
                   (required_hosts.REQUIRED_HOSTS[hostname]
                    ^ retrieved_hosts[hostname]))
            )
    return retval

再次,相当简单...只需遍历REQUIRED_HOSTS 键(又名 主机名),生成一个子任务来解决每个问题,然后在 90 秒超时(发生在for hostname, ips in ip_subtasks.get(timeout=90) 行中)

现在,令人讨厌的是,除了一个之外的所有子任务都在 90 秒的窗口内成功完成。然后父任务(compare_required_ips)由于timeout=90 而失败,当这种情况发生时,子任务成功完成(在父任务失败后立即)。我已经尝试增加和减少超时,并且子任务总是采用我在group 创建中指定的任何超时,从而使主任务报告失败。

我还手动运行了名称解析(没有使它成为 celery 任务,而是使用常规线程)并且它在毫秒内解析。每次,我尝试进行的每一次测试。我不认为dns.resolver.Resolver() class 有问题。一切似乎都表明这个类解决得非常快,但是子任务,或者组,或者...... Celery 中的某个人不知道它(虽然只是子任务之一)

我正在使用celery==3.1.9celery-with-redis==3.0flower==0.6.0 进行监控。

非常感谢任何帮助、提示或测试的东西。

【问题讨论】:

    标签: python parallel-processing timeout celery


    【解决方案1】:

    一个问题可能是由于同步子任务的启动而导致的死锁。 compare_required_ips 是一个芹菜任务。在此任务中,您正在等待 groupresolve_hostname 任务完成,这确实是低效的。

    所以你必须改变它

    ip_subtasks = group(
            [resolve_hostname.s(hostname, resolver=resolver)
             for hostname in required_hosts.REQUIRED_HOSTS.keys()]
        )()
    

    ip_subtasks = group(
            [resolve_hostname.s(hostname, resolver=resolver)
             for hostname in required_hosts.REQUIRED_HOSTS.keys()]
        ).delay()
    

    通过避免死锁异步启动所有任务。

    您不应该在compate_required_ips 任务中执行ip_subtasks.get()(即使ip_subtask 只需要一纳秒)。你必须为此编写一个新函数或使用celery task_success signal

    【讨论】:

      最近更新 更多