💾 Data/Airflow

[Airflow] Params

heywantodo 2023. 4. 29. 11:08
728x90
반응형

[Airflow] Params

 

Params

DAG나 Task에 사용자 정의 데이터를 전달하기 위한 메커니즘

Python의 딕셔너리(Dictionary) 형태로 사용되며,

DAG나 Task에 추가적인 정보매개변수를 전달하고자 할 때 사용됨

 

DAG 레벨에서 params를 설정하면 해당 DAG의 모든 Task에 적용됨

Task 레벨에서 params를 설정하면 해당 Task에만 적용됨

 

Example 

🔎  params를 활용하여 파이썬 함수에서 매개변수를 전달하고 

xcom에 저장하는 DAG

 

Operator Import

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

 

함수 정의

1. params에서 매개 변수를 가져오고

2. 가져온 값을 활용하여 로직을 수행 한 후

3. 결과를 xcom에 저장

def my_function(**kwargs):
    # params에서 필요한 값을 가져오기
    my_param1 = kwargs['params']['my_param1']
    my_param2 = kwargs['params']['my_param2']
    
    # params 값을 활용한 로직 수행
    result = my_param1 + my_param2
    print(result)
    
    # 결과를 XCom에 저장
    kwargs['ti'].xcom_push(key='result', value=result)

 

DAG 정의

default_args = {
    'start_date': datetime(2023, 4, 29),
    'schedule_interval': '@once'
}

dag = DAG(
    dag_id = 'param_test',
    default_args = default_args
)

 

Task 정의

task1 = DummyOperator(task_id='task1', dag=dag)

task2 = PythonOperator(
    task_id='task2',
    python_callable=my_function,
    params={'my_param1': 10, 'my_param2': 20},  # params 설정
    provide_context=True,
    dag=dag
)

task1 >> task2

 

DAG 확인

  • params로 가져온 매개변수로 result 값 출력

 

728x90
반응형