728x90
DAG Runs
DAG Run
- Task 인스턴스들을 DAG에 정의 된 특정 시작일에 실행하는 DAG의 인스턴스
- 동시에 최대 몇개의 DAG run을 돌게 허용할 것인지 셋팅하는 파라미터가 있음
구분
|
내용
|
Status
|
DAG 실행 결과
|
Dag id
|
DAG의 이름 (식별자)
|
Logical Date
|
DAG이 실행 된 자체적인 논리적 시간 (기존의 execution_date)
|
Run id
|
DAG을 실행한 인스턴스의 이름 (식별자), 수동 실행인 경우 manual Prefix 추가
|
Run Type
|
스케줄러에 의한 자동 실행 여부, 수동 실행은 Manual 표시
|
Queued At
|
AirFlow Pools에 추가 된 시간
|
Start Date / End Date
|
DAG이 실행되고 종료 된 실제 시간 (Real-world)
|
External Trigger
|
DAG UI에서 ▶ 를 클릭하여 수동으로 실행했는지 여부
|
Conf
|
DAG UI에서 ▶ 버튼 아래의 Trigger DAG w/ config으로 실행할 때 전달한 변수 값
|
data_interval_start, data_interval_end
1. 매일 오전 0시에 시작해서 로그 처리를 하는 DAG를 정의
2. 이 DAG가 2023년 3월 24일 0시에 시작되었다면, 해당 작업은 2023년 3월 23일 0시부터 24일 0시까지의 로그를 처리
3. 아래와 같이 동작
- 실제 코드가 도는 시간 = 2023년 3월 24일 조금 지나서
- data_interval_start = 2023년 3월 23일 0시
- data_interval_end = 2023년 3월 24일 0시
- execution_date라는 용어를 사용했지만 Airflow 2.2버전 부터 사라짐
- execution_date = logical_data = data_interval_start
Cron Presets
- DAG를 실행하기 위한 스케줄링에 사용
- DAG를 정의할 때 schedule_interval 인자로 전달
- 분 부터 설정 가능하므로 초 단위는 표시되지 않음
preset
|
meaning
|
cron
|
data_interval (example)
|
None
|
스케줄링 없이 직접 DAG 실행
|
|
|
@once
|
한번만 실행
|
|
|
@hourly
|
한 시간에 한 번 실행
|
0 * * * *
|
00:00:00 ~ 00:59:00 (매시간)
|
@daily
|
하루에 한 번 실행 (00시)
|
0 0 * * *
|
00:00:00 ~ 23:59:00 (매일)
|
@weekly
|
일주일에 한 번 실행 (일요일 00시)
|
0 0 * * 0
|
2022-04-10 00:00:00 ~ 2022-04-16 23:59:00
|
@monthly
|
한 달에 한번 실행 (매월 1일 00시)
|
0 0 1 * *
|
2022-04-01 00:00:00 ~ 2022-04-30 23:59:00
|
@quarterly
|
분기 별로 한번 실행 (분기 첫 번째 날 00시)
|
0 0 1 */3 *
|
2022-04-01 00:00:00 ~ 2022-06-30 23:59:00
|
@yearly
|
1년에 한번 실행 (1월 1일 00시)
|
0 0 1 1 *
|
2022-01-01 00:00:00 ~ 2022-12-31 23:59:00
|
Catchup
- DAG가 오래 걸려서 다음 DAG 시작 시간보다 오래 걸릴 경우, 예정대로 다음 DAG를 실행할지 결정하는 옵션
- True : 이전 DAG가 종료 된 이후 다음 DAG를 실행 (default)
- False : 이전 DAG가 실행 중이더라도 예정으로 다음 DAG를 실행
Backfill
- Airflow의 start_date에 관계 없이 지정한 과거 기간에 대해 DAG를 실행할 수 있음
- 과거의 데이터를 생성해서 채워넣거나, 과저 특정 시점의 데이터를 생성을 요구 받은 경우 유용
DAG Pools, Branching, Trigger Rules
Pools
- Airflow는 스케줄러에 의해 설정 된 일정에 따라 동작하고, 동시에 여러 DAG가 실행될 수 있음
- Pools는 병렬 실행되는 Task의 수를 제한하는 역할을 함
- 동시에 실행하는 것이 불가능 할 경우 priority_weight 옵션으로 먼저 수행할 Task의 우선 순위 지정
Brancing
- Airflow는 워크플로우가 2개 이상으로 나눠지는 분기점 역할을 하는 Branch 기능을 제공
- Branch는 크게 아래 두 가지로 구분됨
- 분기되는 작업을 모두 수행 : Task의 순서를 지정할 때 대괄호( [] ) 로 묶으면 분기 가능
- 특정 조건을 만족하는 경우 : BranchPythonOperator 사용
Trigger Rules
- Task는 이전 Task들이 성공할 때만 실행됨
- 앞서 실행된 Task들의 결과에 대한 의존성이 존재
- 보다 복잡한 의존성 설정을 위한 다양한 Trigger Rule들이 존재
DAG Jinja Templation, Xcom, Variable, SubDags
Jinja Templation
- 파이썬을 위한 Templating Engine으로 디자이너 친화적인 템플릿 언어
- Airflow는 Jinja2 template를 내장하고 있어 Task 인스턴스가 실행 중일 때도 정보를 동적으로 전달 가능
Variable | Description |
{{ dag }} | DAG 오브젝트로 하위 속성 접근 가능 |
{{ task }} | Task 오브젝트로 하위 속성 접근 가능 |
{{ ds }} | DAG 실행의 논리적 날짜 (ex. 2022-04-07)로 {{ dag_run.logical_date | ds }} 와 동일 |
{{ ts }} | {{ dag_run.logical_date | ts }} 와 동일 (ex. 2018-01-01T00:00:00+00:00) |
{{ data_interval_start }} | Task가 참고해야 할 데이터의 시작 날짜 범위 |
{{ data_interval_end }} | Task가 참고해야 할 데이터의 종료 날짜 범위 |
{{ prev_start_date_success }} | 이전의 DAG 실행이 성공한 start_date (ds) |
{{ macros }} | Macro로서 하위 속성으로 일부 표준 라이브러리 접근 가능 datetime, timedelta, dateutil, time, uuid, random 등 |
{{ task_instance }} or {{ ti }} | task_instance 오브젝트로 하위 속성 접근 가능 |
{{ params }} | Dictionary 타입으로 전달 된 사용자 정의 매개변수 사전에 대한 참조 |
{{ var.value.my_var }} | Dictionary 타입의 전역 정의 변수 (Variables에 정의) |
{{ var.json.my_var.path }} | Dictionary를 deserialize한 JSON 객체 타입의 전역 정의 변수 |
{{ conn.my_conn_id }} | Connection represented as a dictionary. |
{{ conf }} | airflow.cfg에 정의 된 airflow.configuration.conf 개체 |
{{ run_id }} | 현재 DAG 실행의 run_id |
{{ dag_run }} | DagRun에 대한 참조 |
macro
- DAG가 매일 돈다면, 매일 data_interval_start의 값이 달라짐 이 때, 값을 bash 커맨드의 인자로 넘겨야 한다면?
- echo {{ data_interval_start }}
- 위와 같은 형태로 BashOperator 정의할 때 사용 가능
- jinja template 엔진을 사용함
Xcom
- Airflow의 Task 간 변수나 데이터 전달을 위해 통신하는 기능
- Worker가 여러 개인 Airflow 분산 환경에서는 서로 다른 Work에서 Task가 실행될 수 있기 때문에 Xcom을 사용
- 해당 기능을 통해 다른 Task에서 이전 Task의 값을 사용할 수 있음
✔ Variable과 비슷하지만 Xcom은 특정 DAG 내부에서만 공유되는 특징이 있음
✔ 여러 DAG에서 공유해서 사용하려면 Variable을 사용해야 함
- Key-Value 형식으로 사용됨
- Operator를 생성할 때 provide_ context 옵션이 True로 되어 있어야 함
Variables
- DAG에서 공통적으로 사용하는 값을 사용하기 위한 기능
- 외부에 있는 값을 가져올 수도 있음
- Key-Value 형태이며 다른 DAG와도 공유할 수 있음
참조
https://humbledude.github.io/blog/2022/11/30/airflow-basics-1/
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Airflow Pipeline 생성 (3) (0) | 2023.04.06 |
---|---|
[Airflow] Airflow Pipeline 생성 (2) (0) | 2023.03.28 |
[Airflow] Airflow 용어 정리 (1) (0) | 2023.03.24 |
[Airflow] Airflow Pipeline 생성 (1) (0) | 2023.03.23 |
[Airflow] Airflow Dag Task 옵션 값 (0) | 2023.03.21 |