【Question title】: How to gain access to response headers from SimpleHTTPOperators within Apache Airflow
【Posted】: 2020-12-13 23:39:12
【Question description】:

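I'm currently trying out Airflow's SimpleHttpOperator and want to call endpoints that require JWT authentication. The endpoints are authenticated with Node.js and Passport.js: after the login route is hit, I store the JWT in a cookie with res.cookie, and after logging in the JWT is stored in the cookie correctly. Here is the Cookie header parameter as set in Postman: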

Key: Cookie 
Value: 
connect.sid=[CONNECTION ID NOW STORED HERE]; 
jwt=[JWT NOW STORED HERE]
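The Cookie value above is simply `name=value` pairs joined by `; `. A minimal stdlib sketch of building that string; the helper name `build_cookie_header` and the placeholder values are illustrative only, not part of the original setup:

```python
from typing import Mapping


def build_cookie_header(cookies: Mapping[str, str]) -> str:
    """Join cookie name/value pairs into one Cookie request-header value."""
    # The Cookie header separates pairs with "; " (RFC 6265, section 4.2.1).
    return "; ".join(f"{name}={value}" for name, value in cookies.items())


# Placeholder values standing in for the real session ID and JWT.
header_value = build_cookie_header({"connect.sid": "SESSION_ID", "jwt": "JWT_TOKEN"})
print(header_value)  # connect.sid=SESSION_ID; jwt=JWT_TOKEN
```

Only the `jwt` pair matters for the JWT check, which is why a header of the form `jwt=...` on its own is enough for the authenticated routes.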

Whenever I call an endpoint that requires authentication, it works fine in Postman.

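So in my DAG definition file I created a task that logs in first, so that the other operators can access the JWT and the endpoints can run. t1, t2 and t3 require JWT authentication, which the server performs by reading the headers and checking the Cookie. Here is my Python code using SimpleHttpOperator: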

from datetime import datetime, timedelta
from airflow.operators.http_operator import SimpleHttpOperator
from airflow import DAG

default_args = {
    'owner': 'what',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 12),
    'email': 'what@gmail.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(days=1),
    'concurrency': 2,
}

# 19:00 UTC is noon Pacific time during daylight saving time (PDT, UTC-7); during PST (UTC-8) it is 11:00
dag = DAG('services',
          schedule_interval='0 19 * * *',
          default_args=default_args)
          
t0 = SimpleHttpOperator(
    task_id='login',
    method='POST',
    http_conn_id='http_service',
    endpoint='/account_service/login?email=admin@example.com&password=admin',
    dag=dag)

t1 = SimpleHttpOperator(
    task_id='delete_comments',
    method='POST',
    http_conn_id='http_service',
    endpoint='/comment_service/delete_comments',
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='delete_photos',
    method='POST',
    http_conn_id='http_service',
    endpoint='/photo_service/delete_photos',
    dag=dag)

t3 = SimpleHttpOperator(
    task_id='pending_to_deleted_account',
    method='POST',
    http_conn_id='http_service',
    endpoint='/account_service/pending_to_deleted',
    dag=dag)

t0 >> t2 >> t3
t0 >> t1 >> t3

However, when I call the various authenticated endpoints from Airflow, the requests fail with a 401 error and the tasks end up being scheduled for retry:

[2020-08-25 14:22:12,050] {logging_mixin.py:112} INFO - [2020-08-25 14:22:12,049] {base_hook.py:87} INFO - Using connection to: id: http_service. Host: localhost, Port: 8000, Schema: None, Login: None, Password: None, extra: None
[2020-08-25 14:22:12,052] {logging_mixin.py:112} INFO - [2020-08-25 14:22:12,051] {http_hook.py:136} INFO - Sending 'POST' to url: http://localhost:8000/comment_service/delete_comments
[2020-08-25 14:22:12,056] {logging_mixin.py:112} INFO - [2020-08-25 14:22:12,056] {http_hook.py:150} ERROR - HTTP error: Unauthorized
[2020-08-25 14:22:12,057] {logging_mixin.py:112} INFO - [2020-08-25 14:22:12,057] {http_hook.py:151} ERROR - Unauthorized
[2020-08-25 14:22:12,067] {taskinstance.py:1145} ERROR - 401:Unauthorized
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/airflow/hooks/http_hook.py", line 148, in check_response
    response.raise_for_status()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/requests/models.py", line 941, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: http://localhost:8000/comment_service/delete_comments

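Is there a way for the other operators to access response header values, such as the cookie set by the login task, so that the authenticated endpoints recognize the cookie and run normally?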

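Edit 1: I found a very hacky way around this by going to the Connections tab and entering the following parameters:

Conn Id: http_service

Port: 8000

Extra: { "Cookie": "jwt=[MY JWT HERE]"}

I get the JWT by calling the POST login route, reading the response headers in Postman, and pasting the token where the [MY JWT HERE] placeholder is. Although this works, it doesn't solve my problem, because I want to read the response headers returned by the t0 SimpleHttpOperator instead. I tried to get at the response by reading Airflow's XCom documentation, but the only progress I made was experimenting with the execute function, like this: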

print(t0.execute(context))

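This only prints the body of the response, not the full response (headers, body, and so on).

Here is the source code of the HTTP operator's execute function: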

https://github.com/apache/airflow/blob/e62ad5333cbb56ae0f2001f0f79008a21c41a983/airflow/operators/http_operator.py#L94
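On why the Connection-tab workaround in Edit 1 works: as far as I can tell from `HttpHook.get_conn`, the connection's Extra JSON is applied as default session headers, and any `headers=` passed to the operator are applied on top of it, so per-task headers win on key collisions. The sketch below is a simplified, stdlib-only model of that merge order; `effective_headers` is a hypothetical helper, not an Airflow function, and it ignores the default headers `requests` adds itself as well as header-name case-insensitivity:

```python
import json
from typing import Optional


def effective_headers(conn_extra: str, operator_headers: Optional[dict] = None) -> dict:
    """Model (simplified) how connection Extra and operator headers combine."""
    headers = {}
    # 1. The connection's Extra field is parsed as JSON and used as headers.
    if conn_extra:
        headers.update(json.loads(conn_extra))
    # 2. Headers given to the operator are applied afterwards, so they override.
    if operator_headers:
        headers.update(operator_headers)
    return headers


extra = '{ "Cookie": "jwt=[MY JWT HERE]"}'
print(effective_headers(extra))
# {'Cookie': 'jwt=[MY JWT HERE]'}
print(effective_headers(extra, {"Cookie": "jwt=fresh-token"}))
# {'Cookie': 'jwt=fresh-token'}
```

If that model holds, a hard-coded token in Extra is only a fallback: a task that passes its own `Cookie` header replaces it for that request.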

Any help would be greatly appreciated!

【Discussion】:

    Tags: python node.js rest jwt airflow


    【Solution 1】:

    Suppose we have an authentication service that returns a JSON response like this:

    {
        "clientToken": "322e8df6-0597-479e-984d-db6d8705ee66"
    }
    

    Here is my sample code for Airflow 2.1, which uses SimpleHttpOperator and the XCom mechanism to obtain a JWT security token and pass it to a second SimpleHttpOperator. It may be useful to you:

        import json

        from airflow.providers.http.operators.http import SimpleHttpOperator

        # `dag` is assumed to be a DAG object defined elsewhere in this file.
        get_token = SimpleHttpOperator(
            task_id='get_token',
            method='POST',
            http_conn_id='http_service',
            data=json.dumps({
                "username": "user_name",
                "password": "n46r4A83",
            }),
            endpoint='/authenticate',
            dag=dag,
            headers={
                'Content-Type': 'application/json',
                'Cache-Control': 'no-cache',
            },
            # The value returned by the filter (the token string) is pushed to XCom.
            response_filter=lambda response: response.json()['clientToken'],
        )

        get_cities = SimpleHttpOperator(
            task_id='get_cities',
            method='GET',
            http_conn_id='http_service',
            endpoint='/cities?dsCode=120',
            dag=dag,
            # `headers` is a templated field, so the token is pulled from XCom at run time.
            headers={
                'X-CLIENT-TOKEN': '{{ ti.xcom_pull(task_ids="get_token") }}',
            },
        )

        get_token >> get_cities
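`response_filter` receives the whole `requests.Response`, not just the body, so the same XCom pattern can carry the cookie the question asks about. A minimal sketch, assuming the Node.js login route sets the token with `res.cookie('jwt', ...)` on the login response itself (if the route redirects after login, the cookie would instead be on an entry of `response.history`). The function and constant names are hypothetical; they are plain functions so they can be exercised without Airflow or a server:

```python
from types import SimpleNamespace


def jwt_from_cookie(response) -> str:
    """Hypothetical response_filter for the login task: return the `jwt` cookie.

    With requests, `response.cookies` is a RequestsCookieJar whose .get()
    behaves like dict.get(); the returned string is what lands in XCom.
    """
    token = response.cookies.get("jwt")
    if token is None:
        raise ValueError("login response did not set a 'jwt' cookie")
    return token


def client_token_from_body(response) -> str:
    """The answer's lambda as a named function: read clientToken from the JSON body."""
    return response.json()["clientToken"]


# Hypothetical templated header for downstream tasks; Airflow renders the
# Jinja expression at run time, pulling the value returned by task 'login'.
DOWNSTREAM_HEADERS = {"Cookie": "jwt={{ ti.xcom_pull(task_ids='login') }}"}

# Stand-in objects that mimic only the attributes the filters touch.
fake_login = SimpleNamespace(cookies={"jwt": "abc.def.ghi"})
fake_auth = SimpleNamespace(json=lambda: {"clientToken": "322e8df6-0597-479e-984d-db6d8705ee66"})
print(jwt_from_cookie(fake_login))        # abc.def.ghi
print(client_token_from_body(fake_auth))  # 322e8df6-0597-479e-984d-db6d8705ee66
```

Applied to the question's DAG, this would mean adding `response_filter=jwt_from_cookie` to `t0` (whose task_id is `login`) and `headers=DOWNSTREAM_HEADERS` to `t1`, `t2` and `t3`. Raising when the cookie is missing makes the login task fail loudly instead of pushing `None` and producing 401s downstream.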
    
    
