💾 Data/Airflow

[Airflow] Airflow Pipeline 생성 (1)

heywantodo 2023. 3. 23. 17:15
728x90
반응형

시나리오

1. mysql to csv

2. csv 파일 s3에 업로드

3. 실패 시 슬랙 알람 연동 

 

1. MySQL to CSV

airflow에서 mysql to csv를 실행하는 방법에는 여러가지가 있겠지만 여기선 두가지를 사용

 

  • Airflow Connection 이용
  • Pymysql과 Pandas 이용

 

1-1. Airflow Connection를 이용하여 MySQL <-> Airflow 연결 

 

Airflow webserver에서 [Admin] - [Connections] 메뉴로 이동, +를 눌러 새연결 생성 

  • Connection id : 연결 이름
  • Connection Type : MySQL (만약 없다면, 서버에서  pip install -U 'apache-airflow[mysql]' 진행)
  • Host : 확인한 IP 입력
  • Schema : Table을 생성할 DB
  • Login : 외부에서도 접속할 수 있도록 설정한 MySQL 계정
  • Password :
  • Port : 3306

MySQLOpertator 모듈이 필요

pip3 install apache-airflow-providers-mysql

DAG 파일에선 아래와 같이 Import

from datetime import datetime, timedelta
from email.policy import default
from textwrap import dedent
import csv
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.mysql.operators.mysql import MySqlOperator

DAG 공통 속성 값 정의

args = { 
    "owner": "admin", 
    "start_date": datetime(2023, 3, 23), 
    "depends_on_pasts" : False,
    "retires": 1,
    "retry_delay": timedelta(minutes=5)
}

Hook을 이용하여 MySQL to CSV 실행

def mysql_hook():
	logging.info("Started mysql_hook")
	hook = MySqlHook.get_hook(conn_id="example_db") 
    conn = hook.get_conn() #connection
    cursor = conn.cursor() #커서 생생
    
    #쿼리문 수행
    cursor.execute("use employees")
    cursor.execute(" select * from employees order by hire_date desc limit 10")
    
    #CSV파일 만들기
    with open("/home/ubuntu/airflow/csv/mysql_output_hook.csv","w") as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow([i[0] for i in cursor.description])
        csv_writer.writerows(cursor)
        f.flush()
        cursor.close()
        conn.close()

DAG 정의

with DAG(
	dag_id = "hook_mysql_to_csv",
    default_args=args,
    schedule_interval='@once'
) as dag:
	task1 = PythonOperator(
    	task_id = 'mysql to_csv",
        python_callable=mysql_hook
		)
    
    task1

실행 확인

 

1-2. Pandas를 이용하여 CSV파일 생성

 

pymysql 모듈 필요

pip install pymysql

모듈 Import

from datetime import datetime, timedelta
from email.policy import default
from textwrap import dedent
import pandas as pd
import pymysql
from airflow import DAG
from airflow.operators.python import PythonOperator

DAG 공통 속성 값 정의

args = { 
    "owner": "admin", 
    "start_date": datetime(2023, 3, 23), 
    "depends_on_pasts" : False,
    "retires": 1,
    "retry_delay": timedelta(minutes=5)
}

Pandas를 이용하여 MySQL to CSV 실행

def pd_mysql_to_csv():
    conn = pymysql.connect(host='', user='admin', password='', db='employees', charset='utf8')
    query = 'select * from employees order by hire_date desc limit 10'
    df = pd.read_sql_query(query, conn)
    df.to_csv('/home/ubuntu/airflow/csv/mysql_output_pandas.csv', index=False)

DAG 정의

with DAG(
    dag_id="pandas_mysql_to_csv",
    default_args=args,
    schedule_interval='@once'
) as dag:
    # 
    task1 = PythonOperator(
        task_id="pd_mysql_to_csv",
        python_callable=pd_mysql_to_csv
    )
    task1

실행 확인

 

결론

🔎 Pymysql 모듈과 Pandas를 사용하는 것이 훨씬 간편함 

🔎 찾아보니 쿼리결과를 가공하거나 추가 쿼리가 필요한 경우에 Hook을 사용하면 유용하다고 함

 

참조

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/s3-task-handler.html
https://airflow.apache.org/docs/apache-airflow-providers-mysql/1.0.0/_api/airflow/providers/mysql/hooks/mysql/index.html

https://magpienote.tistory.com/238

728x90
반응형