[Airflow] 멀티 클러스터 환경 구성 [구성] 1. airflow-ec2-1 (Master) metadata database(rds-mysql) airflow webserver airflow worker 2. airflow-ec2-2 (Worker) airflow worker 1. Master, Worker 인스턴스 생성 후 airflow 설치 : airflow 설치 참고하여 설치 Airflow 설치 Airflow 설치 🔎 설치환경 AWS Amazonlinux 2 2.5.1 버전 Airflow 설치 공식 Documents 참조 https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html Inst..
[Airflow] Params Params DAG나 Task에 사용자 정의 데이터를 전달하기 위한 메커니즘 Python의 딕셔너리(Dictionary) 형태로 사용되며, DAG나 Task에 추가적인 정보나 매개변수를 전달하고자 할 때 사용됨 ⚠ DAG 레벨에서 params를 설정하면 해당 DAG의 모든 Task에 적용됨 Task 레벨에서 params를 설정하면 해당 Task에만 적용됨 Example 🔎 params를 활용하여 파이썬 함수에서 매개변수를 전달하고 xcom에 저장하는 DAG Operator Import from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.p..
[Airflow] Xcom Xcom : DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용 각각의 task 끼리 작은 정보를 주고 받을 수 있게하는 방법 ✔ key Xcom에 저장된 객체를 불러오기 위해 알아야할 요소 중 하나 따로 지정해주지 않으면 default로 return_value로 설정 ✔ task_id Xcom에 저장된 객체를 불러오기 위해 알아야할 요소 해당 task에서 발생되는 Xcom에 자연스럽게 매핑 ✔ value Xcom에 저장된 객체 또는 값 해당 value를 불러오기 위해서는 key, task_id가 필요하다. Example 🔎 XCom을 사용하여 작업 간에 데이터를 전달하는 예 Operator Import from airflow import DAG from airflow..
[Airflow] 조건에 따라 분기 (Branch) BranchPythonOperator 분기를 위한 워크플로가 필요하거나, 임의의 조건에 따라 분기를 할 때 사용 반환된 task_id를 따르고 다른 모든 경로를 건너뜀 Operator Import import random from datetime import datetime from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.python import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator 함수 정의 분기에 사용될 조건을 정의 b..
시나리오 1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 ✔ 아래 링크 이어서 진행 https://heywantodo.tistory.com/20 Airflow Pipeline 생성 (1) 시나리오 1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 1. MySQL to CSV airflow에서 mysql to csv를 실행하는 방법에는 여러가지가 있겠지만 여기선 두가지를 사용 Airflow Connection 이용 Pymys heywantodo.tistory.com https://heywantodo.tistory.com/34 Airflow Pipeline 생성 (2) 시나리오 1. mysql to csv 2. csv 파일 ..
시나리오 1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 ✔ 아래 링크 이어서 진행 https://heywantodo.tistory.com/20 Airflow Pipeline 생성 (1) 시나리오 1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 1. MySQL to CSV airflow에서 mysql to csv를 실행하는 방법에는 여러가지가 있겠지만 여기선 두가지를 사용 Airflow Connection 이용 Pymys heywantodo.tistory.com 2. CSV file Upload to S3 선행작업 S3 Bucket 생성 Amazon S3 버킷 생성, 구성 및 작업 - Amazon Simple Stora..
DAG Runs DAG Run Task 인스턴스들을 DAG에 정의 된 특정 시작일에 실행하는 DAG의 인스턴스 동시에 최대 몇개의 DAG run을 돌게 허용할 것인지 셋팅하는 파라미터가 있음 Airflow Dag Task 옵션 값 Airflow Dag Task 옵션 값 Airflow에는 dag 성능을 개선할 수 있는 다양한 옵션들에는 다음과 같은 것들이 있음 parallelism 에어플로우 클러스터 전체에서 동시에 수행될 수 있는 task 인스턴스의 개수를 결 heywantodo.tistory.com 구분 내용 Status DAG 실행 결과 Dag id DAG의 이름 (식별자) Logical Date DAG이 실행 된 자체적인 논리적 시간 (기존의 execution_date) Run id DAG을 실행한..
Airflow란? Airflow Workflow를 정의하고 실행 가능한 플랫폼 반복된 작업을 자동화하기 위해 사용 각 작업들은 DAG를 통해 구조화 Airflow의 장점 연결 된 화살표 방향 순서대로 작업을 실행하고, 분기 실행과 병렬 실행이 가능함 각 작업들을 독립 된 단위로 나눠서 개발, 실행 순서를 조정하는 방법으로 모듈화 가능 데이터의 추출, 가공, 적재 기능을 나눠서 개발하고 Airflow는 이 작업들을 연결하여 파이프라인으로 배치 Airflow 구조 Airflow는 Scheduler가 DAG Directory의 작업을 가져와서 Worker에서 실행하는 형태 Scheduler Airflow의 DAG와 작업들을 모니터링하고 실행 순서와 상태 관리 Worker Airflow의 작업을 실행하는 공간 ..
시나리오 1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 1. MySQL to CSV airflow에서 mysql to csv를 실행하는 방법에는 여러가지가 있겠지만 여기선 두가지를 사용 Airflow Connection 이용 Pymysql과 Pandas 이용 1-1. Airflow Connection를 이용하여 MySQL Airflow 연결 Airflow webserver에서 [Admin] - [Connections] 메뉴로 이동, +를 눌러 새연결 생성 Connection id : 연결 이름 Connection Type : MySQL (만약 없다면, 서버에서 pip install -U 'apache-airflow[mysql]' 진행) Host : 확인한 IP..
Airflow Dag Task 옵션 값 Airflow에는 dag 성능을 개선할 수 있는 다양한 옵션들에는 다음과 같은 것들이 있음 parallelism 에어플로우 클러스터 전체에서 동시에 수행될 수 있는 task 인스턴스의 개수를 결정 concurrency Dag는 concurrency 값을 초과하는 task 인스턴스를 동시에 수행할 수 없음 Dag 생성 시 concurrency를 설정할 수 있고, 따로 설정하지 않으면 dag_concurrency를 기본값으로 사용 task_concurrency 여러개의 dag_runs가 동시에 수행되는 경우 각 task 별 task 인스턴스 수를 제한 Dag안에 task가 최대 병렬적으로 몇 개 존재할 수 있는지에 대한 설정 값 max_active_runs task들의..
Airflow 데이터베이스 1. 에어플로우의 DB Airflow를 설치하면 Sqlite를 Default DB로 사용하게 되기 때문에 필수 라이브러리 Sqlite는 Sequentail Executor로 순차적으로 진행하는 것만 할 수 있음 작업이 느리고 원하는 시간에 작업을 처리하지 못할 수도 있기 때문에 상용 버전에선 거의 사용하지 않음 다른 Executor를 사용하면 병렬처리가 가능해져 작업이 빨라짐, 다양한 기능 또한 있음 다른 Executor를 사용하기 위해선 DB 변경이 필요함 주로 사용하는 것이 PostgreSQL 이나 MySQL 2. Airflow DB 변경 (PostgreSQL) 공식 문서 참조 https://airflow.apache.org/docs/apache-airflow/stable/..
Airflow 실행 🔎 설치환경 AWS Amazonlinux 2 1. 자동으로 초기화하고 실행 airflow standalone 2. 수동으로 초기화하고 실행 DB 초기화 airflow db init Airflow 사용자 생성 airflow users create \ --username admin \ --firstname Jeff \ --lastname bejos \ --role Admin \ --email ceo@amazon.com \ --password PASSWD 웹 서버 & 스케줄러 시작 airflow webserver --port 8080 airflow scheduler