728x90
[Airflow] Params
Params
DAG나 Task에 사용자 정의 데이터를 전달하기 위한 메커니즘
Python의 딕셔너리(Dictionary) 형태로 사용되며,
DAG나 Task에 추가적인 정보나 매개변수를 전달하고자 할 때 사용됨
⚠
DAG 레벨에서 params를 설정하면 해당 DAG의 모든 Task에 적용됨
Task 레벨에서 params를 설정하면 해당 Task에만 적용됨
Example
🔎 params를 활용하여 파이썬 함수에서 매개변수를 전달하고
xcom에 저장하는 DAG
Operator Import
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
함수 정의
1. params에서 매개 변수를 가져오고
2. 가져온 값을 활용하여 로직을 수행 한 후
3. 결과를 xcom에 저장
def my_function(**kwargs):
# params에서 필요한 값을 가져오기
my_param1 = kwargs['params']['my_param1']
my_param2 = kwargs['params']['my_param2']
# params 값을 활용한 로직 수행
result = my_param1 + my_param2
print(result)
# 결과를 XCom에 저장
kwargs['ti'].xcom_push(key='result', value=result)
DAG 정의
default_args = {
'start_date': datetime(2023, 4, 29),
'schedule_interval': '@once'
}
dag = DAG(
dag_id = 'param_test',
default_args = default_args
)
Task 정의
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = PythonOperator(
task_id='task2',
python_callable=my_function,
params={'my_param1': 10, 'my_param2': 20}, # params 설정
provide_context=True,
dag=dag
)
task1 >> task2
DAG 확인
- params로 가져온 매개변수로 result 값 출력
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Variables (0) | 2023.05.11 |
---|---|
[Airflow] 멀티 클러스터 환경 구성 (Celery Executer) (0) | 2023.04.30 |
[Airflow] Xcom (0) | 2023.04.20 |
[Airflow] 조건에 따라 분기 (Branch) (0) | 2023.04.19 |
[Airflow] Airflow Pipeline 생성 (3) (0) | 2023.04.06 |