728x90
[Airflow] 조건에 따라 Task 실행
조건에 따라 task가 실행되도록 구성을 하고 싶어, airflow에서 사용할 수 있는
operator를 찾아보니 활용할 수 있는 두가지 방업이 있었다.
BranchPythonOperator
task_id를 output으로하는 python callable을 통해 다음에 이어지는 작업 요소를 결정한다.
보통 선행 작업의 상황에 따라 다른 작업으로 이어져야 하는 경우에 사용한다.
https://heywantodo.tistory.com/57
import random
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
def decide_branch(**kwargs):
# Randomly select a branch
branches = ['branch_task_1', 'branch_task_2']
selected_branch = random.choice(branches)
return selected_branch
default_args = {
'start_date': datetime(2024, 1, 4),
'schedule_interval': '@once'
}
dag = DAG(
dag_id = 'branch_test',
default_args = default_args
)
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=decide_branch,
dag=dag
)
branch_task_1 = DummyOperator(
task_id='branch_task_1',
dag=dag
)
branch_task_2 = DummyOperator(
task_id='branch_task_2',
dag=dag
)
branch_task >> [branch_task_1, branch_task_2]
ShortCircuitOperator
조건이 만족하는 경우에만 task를 실행하고 조건에 맞지 않는 task를 skip한다.
import random
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from datetime import datetime, timedelta
def condition_func():
random.choices([True, False])
return True
default_args = {
'start_date': datetime(2024, 1, 4),
'schedule_interval': '@once'
}
dag = DAG(
dag_id = 'short_circuit_operator_test',
default_args = default_args
)
start_task = DummyOperator(
task_id='start',
dag=dag,
)
def random_condition_func(**kwargs):
random_condition = random.choice([True, False])
return random_condition
random_short_circuit_task = ShortCircuitOperator(
task_id='random_short_circuit_task',
python_callable=random_condition_func,
provide_context=True,
dag=dag,
)
true_branch_task = DummyOperator(
task_id='true_branch_task',
dag=dag,
)
false_branch_task = DummyOperator(
task_id='false_branch_task',
dag=dag,
)
final_task = DummyOperator(
task_id='final_task',
dag=dag,
)
start_task >> random_short_circuit_task
random_short_circuit_task >> [true_branch_task, false_branch_task, final_task]
리턴 값이 False일 경우 그 이후의 task들이 skip 되는 것을 확인 할 수 있다.
참고
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Flower worker auto scale (0) | 2024.07.29 |
---|---|
[Airflow] Docker compose Airflow에서 Cli 사용하기 (0) | 2024.04.03 |
[Airflow] Airflow Configuration (0) | 2023.12.18 |
[Airflow] Celery executor, Flower (0) | 2023.08.10 |
[Airlfow] Sensor (0) | 2023.07.09 |