💾 Data/Airflow

[Airflow] Airflow 용어 정리 (2)

heywantodo 2023. 3. 24. 13:48
728x90
반응형

DAG Runs

 

DAG Run

  • Task 인스턴스들을 DAG에 정의 된 특정 시작일에 실행하는 DAG의 인스턴스
  • 동시에 최대 몇개의 DAG run을 돌게 허용할 것인지 셋팅하는 파라미터가 있음
 

Airflow Dag Task 옵션 값

Airflow Dag Task 옵션 값 Airflow에는 dag 성능을 개선할 수 있는 다양한 옵션들에는 다음과 같은 것들이 있음 parallelism 에어플로우 클러스터 전체에서 동시에 수행될 수 있는 task 인스턴스의 개수를 결

heywantodo.tistory.com

구분
내용
Status
DAG 실행 결과
Dag id
DAG의 이름 (식별자)
Logical Date 
DAG이 실행 된 자체적인 논리적 시간 (기존의 execution_date)
Run id
DAG을 실행한 인스턴스의 이름 (식별자), 수동 실행인 경우 manual Prefix 추가
Run Type
스케줄러에 의한 자동 실행 여부, 수동 실행은 Manual 표시
Queued At
AirFlow Pools에 추가 된 시간
Start Date / End Date
DAG이 실행되고 종료 된 실제 시간 (Real-world)
External Trigger
DAG UI에서 ▶ 를 클릭하여 수동으로 실행했는지 여부
Conf
DAG UI에서 ▶ 버튼 아래의 Trigger DAG w/ config으로 실행할 때 전달한 변수 값

 

data_interval_start, data_interval_end

 

1. 매일 오전 0시에 시작해서 로그 처리를 하는 DAG를 정의 

2. 이 DAG가 2023년 3월 24일 0시에 시작되었다면, 해당 작업은 2023년 3월 23일 0시부터 24일 0시까지의 로그를 처리

3. 아래와 같이 동작

- 실제 코드가 도는 시간 = 2023년 3월 24일 조금 지나서

- data_interval_start = 2023년 3월 23일 0시

- data_interval_end = 2023년 3월 24일 0시

 

  • execution_date라는 용어를 사용했지만 Airflow 2.2버전 부터 사라짐
  • execution_date = logical_data = data_interval_start

 

Cron Presets

  • DAG를 실행하기 위한 스케줄링에 사용
  • DAG를 정의할 때 schedule_interval 인자로 전달
  • 분 부터 설정 가능하므로 초 단위는 표시되지 않음 
preset
meaning
cron
data_interval (example)
None
스케줄링 없이 직접 DAG 실행
 
 
@once
한번만 실행
 
 
@hourly
한 시간에 한 번 실행
0 * * * *
00:00:00 ~ 00:59:00 (매시간)
@daily
하루에 한 번 실행 (00시)
0 0 * * *
00:00:00 ~ 23:59:00 (매일)
@weekly
일주일에 한 번 실행 (일요일 00시)
0 0 * * 0
2022-04-10 00:00:00 ~ 2022-04-16 23:59:00
@monthly
한 달에 한번 실행 (매월 1일 00시)
0 0 1 * *
2022-04-01 00:00:00 ~ 2022-04-30 23:59:00
@quarterly
분기 별로 한번 실행 (분기 첫 번째 날 00시)
0 0 1 */3 *
2022-04-01 00:00:00 ~ 2022-06-30 23:59:00
@yearly
1년에 한번 실행 (1월 1일 00시)
0 0 1 1 *
2022-01-01 00:00:00 ~ 2022-12-31 23:59:00

 

Catchup

  • DAG가 오래 걸려서 다음 DAG 시작 시간보다 오래 걸릴 경우, 예정대로 다음 DAG를 실행할지 결정하는 옵션
  • True : 이전 DAG가 종료 된 이후 다음 DAG를 실행 (default)
  • False : 이전 DAG가 실행 중이더라도 예정으로 다음 DAG를 실행

 

Backfill

  • Airflow의 start_date에 관계 없이 지정한 과거 기간에 대해 DAG를 실행할 수 있음
  • 과거의 데이터를 생성해서 채워넣거나, 과저 특정 시점의 데이터를 생성을 요구 받은 경우 유용 

 

DAG Pools, Branching, Trigger Rules

 

Pools

  • Airflow는 스케줄러에 의해 설정 된 일정에 따라 동작하고, 동시에 여러 DAG가 실행될 수 있음
  • Pools는 병렬 실행되는 Task의 수를 제한하는 역할을 함
  • 동시에 실행하는 것이 불가능 할 경우 priority_weight 옵션으로 먼저 수행할 Task의 우선 순위 지정

 

Brancing

  • Airflow는 워크플로우가 2개 이상으로 나눠지는 분기점 역할을 하는 Branch 기능을 제공
  • Branch는 크게 아래 두 가지로 구분됨
    • 분기되는 작업을 모두 수행 : Task의 순서를 지정할 때 대괄호( [] ) 로 묶으면 분기 가능 
    • 특정 조건을 만족하는 경우 : BranchPythonOperator 사용 

 

Trigger Rules

  • Task는 이전  Task들이 성공할 때만 실행됨
  • 앞서 실행된 Task들의 결과에 대한 의존성이 존재
  • 보다 복잡한 의존성 설정을 위한 다양한 Trigger Rule들이 존재 

 

DAG Jinja Templation, Xcom, Variable, SubDags

 

Jinja Templation

  • 파이썬을 위한 Templating Engine으로 디자이너 친화적인 템플릿 언어
  • Airflow는 Jinja2 template를 내장하고 있어 Task 인스턴스가 실행 중일 때도 정보를 동적으로 전달 가능
Variable Description
{{ dag }} DAG 오브젝트로 하위 속성 접근 가능
{{ task }} Task 오브젝트로 하위 속성 접근 가능
{{ ds }} DAG 실행의 논리적 날짜 (ex. 2022-04-07)로 {{ dag_run.logical_date | ds }} 와 동일
{{ ts }} {{ dag_run.logical_date | ts }} 와 동일 (ex.  2018-01-01T00:00:00+00:00)
{{ data_interval_start }} Task가 참고해야 할 데이터의 시작 날짜 범위
{{ data_interval_end }} Task가 참고해야 할 데이터의 종료 날짜 범위
{{ prev_start_date_success }} 이전의 DAG 실행이 성공한 start_date (ds)
{{ macros }} Macro로서 하위 속성으로 일부 표준 라이브러리 접근 가능
datetime, timedelta, dateutil, time, uuid, random 등
{{ task_instance }} or {{ ti }} task_instance 오브젝트로 하위 속성 접근 가능
{{ params }} Dictionary 타입으로 전달 된 사용자 정의 매개변수 사전에 대한 참조
{{ var.value.my_var }} Dictionary 타입의 전역 정의 변수 (Variables에 정의)
{{ var.json.my_var.path }} Dictionary를 deserialize한 JSON 객체 타입의 전역 정의 변수
{{ conn.my_conn_id }} Connection represented as a dictionary.
{{ conf }} airflow.cfg에 정의 된 airflow.configuration.conf 개체
{{ run_id }} 현재 DAG 실행의 run_id
{{ dag_run }} DagRun에 대한 참조

 

macro

  • DAG가 매일 돈다면, 매일 data_interval_start의 값이 달라짐 이 때, 값을 bash 커맨드의 인자로 넘겨야 한다면?
  •  echo {{ data_interval_start }} 
  • 위와 같은 형태로 BashOperator 정의할 때 사용 가능 
  • jinja template 엔진을 사용함

 

Xcom

 

XComs — Airflow Documentation

 

airflow.apache.org

  • Airflow의 Task 간 변수나 데이터 전달을 위해 통신하는 기능
  • Worker가 여러 개인 Airflow 분산 환경에서는 서로 다른 Work에서 Task가 실행될 수 있기 때문에 Xcom을 사용 
  • 해당 기능을 통해 다른 Task에서 이전 Task의 값을 사용할 수 있음

 

✔ Variable과 비슷하지만 Xcom은 특정 DAG 내부에서만 공유되는 특징이 있음

✔ 여러 DAG에서 공유해서 사용하려면 Variable을 사용해야 함 

 

  • Key-Value 형식으로 사용됨
  • Operator 생성할 provide_ context 옵션이 True 되어 있어야

 

Variables

  • DAG에서 공통적으로 사용하는 값을 사용하기 위한 기능
  • 외부에 있는 값을 가져올 수도 있음
  • Key-Value 형태이며 다른 DAG와도 공유할 수 있음

 

참조

https://www.bearpooh.com/154

https://humbledude.github.io/blog/2022/11/30/airflow-basics-1/

728x90
반응형