728x90
[Airflow] Xcom
Xcom
: DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용
각각의 task 끼리 작은 정보를 주고 받을 수 있게하는 방법
✔ key
Xcom에 저장된 객체를 불러오기 위해 알아야할 요소 중 하나
따로 지정해주지 않으면 default로 return_value로 설정
✔ task_id
Xcom에 저장된 객체를 불러오기 위해 알아야할 요소
해당 task에서 발생되는 Xcom에 자연스럽게 매핑
✔ value
Xcom에 저장된 객체 또는 값
해당 value를 불러오기 위해서는 key, task_id가 필요하다.
Example
🔎 XCom을 사용하여 작업 간에 데이터를 전달하는 예
Operator Import
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable
from datetime import datetime, timedelta
함수 정의
1️⃣ generate_data_task에서 데이터를 생성 후 "my_data" 키를 사용하여 XCom에 푸시
2️⃣ XCom에서 데이터를 가져와 process_data_task에서 대문자로 처리
처리된 데이터를 "processed_data" 키를 사용하여 XCom에 Push
3️⃣ 마지막으로 display_data_task에서 처리된 데이터를 가져와서 로그에 표시
* 참고로 ti는 task_instance로 현재 실행중인 task를 말함
def generate_data_func(**kwargs):
data = "Hello, Airflow!"
kwargs['ti'].xcom_push(key='my_data', value=data)
def process_data_func(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(key='my_data', task_ids='generate_data')
processed_data = data.upper()
ti.xcom_push(key='processed_data', value=processed_data)
def display_data_func(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(key='processed_data', task_ids='process_data')
print("Processed Data:", data)
Default_args & DAG 정의
default_args = {
'start_date': datetime(2023, 4, 20),
'schedule_interval': '@once'
}
dag = DAG(
dag_id = 'xcom_test',
default_args = default_args
)
Task 정의
- Python Operator에서 provide_context=True 옵션을 사용해야 XCom 데이터에 액세스하고 조작할 수 있음
generate_data_task = PythonOperator(
task_id='generate_data',
python_callable=generate_data_func,
provide_context=True,
dag=dag,
)
process_data_task = PythonOperator(
task_id='process_data',
python_callable=process_data_func,
provide_context=True,
dag=dag,
)
display_data_task = PythonOperator(
task_id='display_data',
python_callable=display_data_func,
provide_context=True,
dag=dag,
)
generate_data_task >> process_data_task >> display_data_task
DAG 확인
- Log에서 첫번째 task에서 받아 두번째 task에서 처리된 데이터가 제대로 찍히는 모습을 확인
참조
https://velog.io/@hyunwoozz/airflow-Xcoms-%EC%97%90-%EB%8C%80%ED%95%9C-%EB%A6%AC%EB%B7%B0
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] 멀티 클러스터 환경 구성 (Celery Executer) (0) | 2023.04.30 |
---|---|
[Airflow] Params (0) | 2023.04.29 |
[Airflow] 조건에 따라 분기 (Branch) (0) | 2023.04.19 |
[Airflow] Airflow Pipeline 생성 (3) (0) | 2023.04.06 |
[Airflow] Airflow Pipeline 생성 (2) (0) | 2023.03.28 |