[Airflow] Airflow에서 DAG 파일을 처리하는 법 (DagFileProcessor)Airflow는 DAG(Directed Acyclic Graph) 파일을 통해 워크플로우를 정의하는 워크플로우 관리 도구다.DAG 파일은 Python 코드로 작성되는데, Airflow가 이를 어떻게 인식하고 실행하는 지에 대해 궁금점이 생겼다, Airflow에서 DAG 파일을 처리하는 핵심 컴포넌트 중 하나가 바로 DagFileProcessor다. 이 글에서는 DagFileProcessor가 무엇인지, 어떻게 동작하는지에 대해 알아보고자 한다. DagFileProcessor란?DagFileProcessor는 Airflow가 DAG 파일을 주기적으로 읽고 파싱하여, 메타 데이터를 저장하게하는 역할을 한다.Airf..
[Airflow] Flower worker auto scale Flower는 Celery 클러스터를 모니터링하고 관리하기 위한 웹 기반 도구다. Flower는 Celery 이벤트를 이용하여 작업 진행 상황 및 내역등을 실시간으로 모니터링 할 수 있으며,웹을 통해 작업자 인스턴스 종료 및 다시 시작 등의 remote 컨트롤이 가능하다. Worker auto scale플라워에서 다음과 같은 설정을 조정할 수 있는 칸을 확인할 수 있다.Airflow 공식 문서에서의 work_autoscale에 대한 설명은 다음과 같다. work_autoscale부하에 따라 풀 크기를 동적으로 조정하는 데 사용되는 최대 및 최소 풀 프로세스 수이다.항상 최소 프로세스를 유지하지만, 필요한 경우 최대로 늘린다.max_concu..
[Airflow] Docker compose Airflow에서 Cli 사용하기 docker compose는 컨테이너 환경에 Airflow를 띄우기 때문에 컨테이너 내부에 cli 명령어를 사용해야한다. 명령어를 사용하는 방법은 다음과 같다. docker compose run airflow-cli 명령어가 길기때문에 다음과 같이 alias로 등록해두고 사용하면 편하다. alias airflow='docker compose run airflow-cli' >> ~/.bashrc source ~/.bashrc
[Airflow] 조건에 따라 Task 실행 조건에 따라 task가 실행되도록 구성을 하고 싶어, airflow에서 사용할 수 있는 operator를 찾아보니 활용할 수 있는 두가지 방업이 있었다. BranchPythonOperator task_id를 output으로하는 python callable을 통해 다음에 이어지는 작업 요소를 결정한다. 보통 선행 작업의 상황에 따라 다른 작업으로 이어져야 하는 경우에 사용한다. https://heywantodo.tistory.com/57 [Airflow] 조건에 따라 분기 (Branch) [Airflow] 조건에 따라 분기 (Branch) BranchPythonOperator 분기를 위한 워크플로가 필요하거나, 임의의 조건에 따라 분기를 할 때 사용 반환된 ta..
[Airflow] Airflow Configuration Airflow를 사용하면서 튜닝이 가능한 여러 설정이 있는데, 그 설정들에 대해 정리 해보고자 한다. Airflow 레벨 airflow 환경을 조정하는 것이므로 모든 dag에 영향을 준다. core config 명 설명 parallelism 단일 Airflow 환경 내에서 동시에 실행 할 수 있는 최대 task 수 max_activate_tasks_per_dag DAG당 한 번에 스케줄링되는 최대 task 수를 결정 max_activate_runs_per_dag 1개의 DAG안에서 최대 활성화되는 DAG의 수를 제어 dag_file_processor_timeout DAG 파일을 처리하는 DagFileProcessor가 시간 초과되기 전에 실행할 수..
[Airflow] Celery executor, Flower https://heywantodo.tistory.com/68 [Airflow] 멀티 클러스터 환경 구성 (Celery Executer) [Airflow] 멀티 클러스터 환경 구성 [구성] 1. airflow-ec2-1 (Master) metadata database(rds-mysql) airflow webserver airflow worker 2. airflow-ec2-2 (Worker) airflow worker 1. Master, Worker 인스턴스 생성 후 airflow 설치 : airflow 설치 heywantodo.tistory.com 이전 포스팅에서 Docker composer를 통해 Celery executor를 구성하는 방법을 ..
[Airlfow] Sensor Airflow의 Sensor는 워크플로우 내에서 특정 이벤트 또는 조건이 발생할 때까지 작업을 지연시키는 데 사용 즉, Sensor는 다른 작업이 실행되기 전 특정 상태가 충족될 때까지 기다리는 역할을 함 예를 들어, 특정 파일이 생성되거나 특정 시간에 도달하거나 외부 시스템의 상태가 변경될 때까지 작업을 일시 중지 시키는 데 사용할 수 있음 Airflow에는 다양한 종류의 Sensor가 있으며 일반적으로 사용되는 몇 가지 예는 다음과 같음 1. FileSensor 파일이 특정 디렉토리에 존재하는지 여부를 확인한다. from airflow.sensors.filesystem import FileSensor from airflow import DAG dag = DAG("file..
[Airflow] Airflow CLI 명령어 Airflow CLI를 사용해 작업 실행, 스케줄 조작, 작업 상태 확인 등 다양한 작업 수행이 가능함 airflow cheat-sheet 명령어를 통해 자주쓰이는 명령어를 확인 할 수 있으며 airflow --help 명령어를 사용해서 자세한 도움말을 확인 할 수 있음 아래는 일부 명령어이며, airflow CLI에는 더 많은 명령어와 옵션이 있음 arguments dag _id : 실행 할 DAG의 ID task_id : 실행할 task의 ID execution_date : 실행할 날짜와 시간 (YYYY-MM-DD 또는 ISO 8601 형식) DAG ON airflow dags pause [dag_id] DAG OFF airflow dags unpause..
[Airflow] Catchup, Backfil Airflow는 스케줄 된 작업을 실행하고, 종속성을 관리하기 위해 DAG를 사용함 DAG는 작업 간의 종속성을 정의하고, 작업을 스케줄링하고, 작업 상태를 추적하는 데 사용됨 그렇다면 스케줄링과 관련된 Airflow의 catchup과 backfill에 대해서 알아보자 1. Catchup Airflow의 DAG 스케줄러 설정 중 하나 catchup이 활성화되면, DAG가 처음 생성된 날짜 이후에도 이전 날짜의 실행을 캐치업함 즉 DAG가 생성된 이후에 실행되어야 하는 작업 중, 현재 날짜부터 이전에 실행되지 않은 작업들을 자동으로 실행 DAG가 매일 실행되도록 예약이 되었지만, catchup이 활성화되어 있다면, DAG가 생성된 이후에 실행되어야 하는 작업..
[Airflow] airflow decorators airflow decorators airflow.decorators는 Airflow에서 제공하는 데코레이터 모듈 @dag : DAG를 정의할 때 사용되는 데코레이터 DAG 객체를 생성하고, DAG 속성을 설정하는 데 사용됨 @task : Task를 정의할 때 사용되는 데코레이터 Python 함수를 task로 등록하고, task의 실행을 제어하는데 사용됨 @task_decorator : 커스텀 task 데코레이터를 정의할 때 사용되는 데코레이터 이를 사용하여 task에 사용자 지정 로직이나 기능을 추가할 수 있음 @task_group : task group을 정의할 때 사용되는 데코레이터 여러 task를 그룹으로 묶어서 실행 순서나 의존성을 관리할 수 있..
[Airflow] Variables Variables Airflow 변수(Variables)는 Airflow에서 사용되는 일종의 키-값(key-value) 쌍으로 DAG 실행 중에 사용할 수 있는 데이터를 저장하는데 사용됨 이 변수들은 Airflow의 UI 또는 CLI를 통해 생성하고 관리할 수 있음 DAG에서 사용되는 DB 연결 정보나 API 키, 환경 변수 등을 저장할 수 있음 이러한 변수들은 DAG 코드 내에서 하드코딩하여 사용하는 것보다 유연성과 보안성을 높일 수 있음 1. UI에서 설정 Admin -> Variables에서 설정 가능 2. CLI에서 설정 airflow variables 명령어를 사용하여 CLI에서 변수를 생성, 수정, 삭제할 수 있음 airflow variables --set ..
[Airflow] 멀티 클러스터 환경 구성 [구성] 1. airflow-ec2-1 (Master) metadata database(rds-mysql) airflow webserver airflow worker 2. airflow-ec2-2 (Worker) airflow worker 1. Master, Worker 인스턴스 생성 후 airflow 설치 : airflow 설치 참고하여 설치 Airflow 설치 Airflow 설치 🔎 설치환경 AWS Amazonlinux 2 2.5.1 버전 Airflow 설치 공식 Documents 참조 https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html Inst..