【发布时间】:2014-12-17 03:27:38
【问题描述】:
我在使用 Celery 的 group 功能时遇到了一个相当烦人的行为。
我需要定期检查一堆主机解析到的 IP,以确保所述 IP 没有改变。为了做到这一点,我有一本带有< hostname, IPs > 的字典,我需要验证。例如:
REQUIRED_HOSTS = {
'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
'*.com': {'198.252.206.16'}
}
所以唯一要做的就是定期迭代REQUIRED_HOSTS.keys(),解析名称并查看它解析到的任何IP是否与我记录的不同。 (这里不用多想)
为了提高一点效率,每个名字都是并行解析的。我为此创建了一个子任务(它使用dnspython 解决):
@my_tasks.task
def resolve_hostname(hostname, resolver=None):
""" This subtask resolves the 'hostname' to its IP addresses. It's
intended to be used in the 'compare_required_ips' function to resolve
names in parallel """
if resolver is None:
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers
try:
return (hostname,
{hst.address for hst in resolver.query(hostname)})
except Exception, e:
logger.exception("Got %s when trying to resolve hostname=%s"
% (type(e), hostname))
raise e
现在,查询所有主机名并生成子任务的方法如下:
@my_taks.task
def compare_required_ips():
""" This method verifies that the IPs haven't changed. """
retval = []
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers
retrieved_hosts = dict.fromkeys(required_hosts.REQUIRED_HOSTS.keys())
logger.info("Going to compare IPs for %s hostnames=%s"
% (len(required_hosts.REQUIRED_HOSTS.keys()),
required_hosts.REQUIRED_HOSTS.keys()))
ip_subtasks = group(
[resolve_hostname.s(hostname, resolver=resolver)
for hostname in required_hosts.REQUIRED_HOSTS.keys()]
)()
for hostname, ips in ip_subtasks.get(timeout=90):
retrieved_hosts[hostname] = ips
for hostname in required_hosts.REQUIRED_HOSTS:
if (required_hosts.REQUIRED_HOSTS[hostname]
!= retrieved_hosts[hostname]):
retval.append(hostname)
logger.error(
"IP resolution mismatch. hostname=%s resolve_target=%s"
", resolve_actual=%s (mismatch=%s)"
% (hostname,
required_hosts.REQUIRED_HOSTS[hostname],
retrieved_hosts[hostname],
(required_hosts.REQUIRED_HOSTS[hostname]
^ retrieved_hosts[hostname]))
)
return retval
再次,相当简单...只需遍历REQUIRED_HOSTS 键(又名 主机名),生成一个子任务来解决每个问题,然后在 90 秒超时(发生在for hostname, ips in ip_subtasks.get(timeout=90) 行中)
现在,令人讨厌的是,除了一个之外的所有子任务都在 90 秒的窗口内成功完成。然后父任务(compare_required_ips)由于timeout=90 而失败,当这种情况发生时,子任务成功完成(在父任务失败后立即)。我已经尝试增加和减少超时,并且子任务总是采用我在group 创建中指定的任何超时,从而使主任务报告失败。
我还手动运行了名称解析(没有使它成为 celery 任务,而是使用常规线程)并且它在毫秒内解析。每次,我尝试进行的每一次测试。我不认为dns.resolver.Resolver() class 有问题。一切似乎都表明这个类解决得非常快,但是子任务,或者组,或者...... Celery 中的某个人不知道它(虽然只是子任务之一)
我正在使用celery==3.1.9、celery-with-redis==3.0 和flower==0.6.0 进行监控。
非常感谢任何帮助、提示或测试的东西。
【问题讨论】:
标签: python parallel-processing timeout celery