【问题标题】:How to check status of long running http task with airflow?如何使用气流检查长时间运行的 http 任务的状态?
【发布时间】:2018-12-01 15:40:39
【问题描述】:

我的用例是使用气流控制跨微服务的大量预定作业。我正在尝试的解决方案是将气流用作集中式作业调度程序并通过进行 http 调用来触发作业。其中一些工作将运行很长时间,例如。超过 10 分钟或最长 1 小时。

如何通过气流定期检查这些作业的状态?如果远程任务已经完成但气流不知道工作成功怎么办?我可以将作业完成的事件发布到 kafka 并让气流在 kafka 上侦听以获取作业状态吗?

【问题讨论】:

    标签: airflow job-scheduling


    【解决方案1】:

    您可以通过多种方式使用 Airflow 和您的微服务来做到这一点。一般来说,你会想要使用一个传感器,这是适合这种情况的 Airflow 对象。首先查看BaseSensorOperator 和关于operators。在 Airflow 中,传感器的使用与操作员一样(传感器就是操作员)。所以你可以像这样创建一个工作:

    http_post_task -> http_sensor_task -> success_task
    

    http_post_task 会触发一个作业,http_sensor_task 会定期检查作业是否完成(例如 GET 请求微服务并检查 200,可能吗?),并且 success_task 将在 http_sensor_task 成功后执行。

    您的 http_sensor_task 需要是您自己的自定义传感器。这是一些可以帮助您创建此传感器的 sudo 代码(请记住,传感器就像操作员一样使用)。考虑一下您向微服务发出请求,然后发出另一个请求以检查作业状态(GET 请求并检查 200)的情况,您将像这样扩展 BaseSensorOperator 类型:

    from airflow.operators.sensors import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults
    from time import sleep
    import requests
    
    class HTTPSensorOperator(BaseSensorOperator): 
        """
        Pokes a URL until it returns 200
        """
        ui_color = '#000000'
        @apply_defaults
        def __init__( self, url, *args, **kwargs):
            super(HTTPSensorOperator, self).__init__(*args, **kwargs)
            self.url = url
    
    
        def poke(self, context):
            """
            GET request url and return True if response is 200, False otherwise
            """
            r = requests.post(self.url)
            if r.status_code == 200:
                return True
            else:
                return False
    
        def execute(self, context):
            """
            Check the url and wait for it to return 200.
            """
            started_at = datetime.utcnow()
            while not self.poke(context):
                if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
                    if self.soft_fail:
                        raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
                    else:
                        raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
                sleep(self.poke_interval)
            self.log.info("Success criteria met. Exiting.")
    

    然后使用如下操作符:

    http_sensor_task = HTTPSensorOperator(
          task_id="http_sensor_task",
          url="http://localhost/check_job?job_id=1",
          timeout=3600, # 1 hour
          dag=dag
       )
    

    因此,您必须决定您的微服务将如何与 Airflow 通信。就在我的脑海里,我想你会发出 1 个请求来触发工作,然后发出后续请求(可能是 10 秒)来检查工作。祝你好运!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-14
      • 2014-03-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多