💾 Data/Airflow

[Airflow] Xcom

heywantodo 2023. 4. 20. 16:53
728x90
반응형

[Airflow] Xcom

 

Xcom

: DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용

각각의 task 끼리 작은 정보를 주고 받을 수 있게하는 방법

 

key

Xcom에 저장된 객체를 불러오기 위해 알아야할 요소 중 하나
따로 지정해주지 않으면 default로 return_value로 설정

 

task_id

Xcom에 저장된 객체를 불러오기 위해 알아야할 요소

해당 task에서 발생되는 Xcom에 자연스럽게 매핑

 

✔ value

Xcom에 저장된 객체 또는 값
해당 value를 불러오기 위해서는 key, task_id가 필요하다.

 

Example

🔎 XCom을 사용하여 작업 간에 데이터를 전달하는 예

 

Operator Import

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable

from datetime import datetime, timedelta

 

함수 정의

1️⃣ generate_data_task에서 데이터를 생성 후 "my_data" 키를 사용하여 XCom에 푸시

 

2️⃣ XCom에서 데이터를 가져와 process_data_task에서 대문자로 처리

처리된 데이터를 "processed_data" 키를 사용하여 XCom에 Push

 

3️⃣ 마지막으로 display_data_task에서 처리된 데이터를 가져와서 로그에 표시

 

* 참고로 ti는 task_instance로 현재 실행중인 task를 말함

def generate_data_func(**kwargs):
    data = "Hello, Airflow!"
    kwargs['ti'].xcom_push(key='my_data', value=data)
    
def process_data_func(**kwargs):
    ti = kwargs['ti']
    data = ti.xcom_pull(key='my_data', task_ids='generate_data')
    processed_data = data.upper()
    ti.xcom_push(key='processed_data', value=processed_data)
    
def display_data_func(**kwargs):
    ti = kwargs['ti']
    data = ti.xcom_pull(key='processed_data', task_ids='process_data')
    print("Processed Data:", data)

 

Default_args & DAG 정의

default_args = {
    'start_date': datetime(2023, 4, 20),
    'schedule_interval': '@once'
}

dag = DAG(
    dag_id = 'xcom_test',
    default_args = default_args
)

 

Task 정의

  • Python Operator에서 provide_context=True  옵션을 사용해야 XCom 데이터에 액세스하고 조작할 수 있음
generate_data_task = PythonOperator(
    task_id='generate_data',
    python_callable=generate_data_func,
    provide_context=True,
    dag=dag,
)

process_data_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data_func,
    provide_context=True,
    dag=dag,
)

display_data_task = PythonOperator(
    task_id='display_data',
    python_callable=display_data_func,
    provide_context=True,
    dag=dag,
)

generate_data_task >> process_data_task >> display_data_task

 

DAG 확인

  • Log에서 첫번째 task에서 받아 두번째 task에서 처리된 데이터가 제대로 찍히는 모습을 확인

 

참조

https://velog.io/@hyunwoozz/airflow-Xcoms-%EC%97%90-%EB%8C%80%ED%95%9C-%EB%A6%AC%EB%B7%B0

728x90
반응형