728x90
[Airflow] 조건에 따라 분기 (Branch)
BranchPythonOperator
분기를 위한 워크플로가 필요하거나, 임의의 조건에 따라 분기를 할 때 사용
반환된 task_id를 따르고 다른 모든 경로를 건너뜀
Operator Import
import random
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
함수 정의
- 분기에 사용될 조건을 정의
- branch_task_1과 branch_task_2 중 랜덤으로 choice
def decide_branch(**kwargs):
# Randomly select a branch
branches = ['branch_task_1', 'branch_task_2']
selected_branch = random.choice(branches)
return selected_branch
Default_args & DAG 정의
default_args = {
'start_date': datetime(2023, 4, 19),
'schedule_interval': '@once'
}
dag = DAG(
dag_id = 'branch_test',
default_args = default_args
)
Task 정의
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=decide_branch,
dag=dag
)
branch_task_1 = DummyOperator(
task_id='branch_task_1',
dag=dag
)
branch_task_2 = DummyOperator(
task_id='branch_task_2',
dag=dag
)
branch_task >> [branch_task_1, branch_task_2]
DAG 확인
- Trigger했을 때 랜덤으로 brach_task_1이 Choice
- task_1이 실행되고 task_2가 Skipped 된 모습 확인 가능
👍Good
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Params (0) | 2023.04.29 |
---|---|
[Airflow] Xcom (0) | 2023.04.20 |
[Airflow] Airflow Pipeline 생성 (3) (0) | 2023.04.06 |
[Airflow] Airflow Pipeline 생성 (2) (0) | 2023.03.28 |
[Airflow] Airflow 용어 정리 (2) (0) | 2023.03.24 |