시나리오
1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 |
1. MySQL to CSV
airflow에서 mysql to csv를 실행하는 방법에는 여러가지가 있겠지만 여기선 두가지를 사용
- Airflow Connection 이용
- Pymysql과 Pandas 이용
1-1. Airflow Connection를 이용하여 MySQL <-> Airflow 연결
Airflow webserver에서 [Admin] - [Connections] 메뉴로 이동, +를 눌러 새연결 생성
- Connection id : 연결 이름
- Connection Type : MySQL (만약 없다면, 서버에서 pip install -U 'apache-airflow[mysql]' 진행)
- Host : 확인한 IP 입력
- Schema : Table을 생성할 DB
- Login : 외부에서도 접속할 수 있도록 설정한 MySQL 계정
- Password :
- Port : 3306
MySQLOpertator 모듈이 필요
pip3 install apache-airflow-providers-mysql
DAG 파일에선 아래와 같이 Import
from datetime import datetime, timedelta
from email.policy import default
from textwrap import dedent
import csv
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.mysql.operators.mysql import MySqlOperator
DAG 공통 속성 값 정의
args = {
"owner": "admin",
"start_date": datetime(2023, 3, 23),
"depends_on_pasts" : False,
"retires": 1,
"retry_delay": timedelta(minutes=5)
}
Hook을 이용하여 MySQL to CSV 실행
def mysql_hook():
logging.info("Started mysql_hook")
hook = MySqlHook.get_hook(conn_id="example_db")
conn = hook.get_conn() #connection
cursor = conn.cursor() #커서 생생
#쿼리문 수행
cursor.execute("use employees")
cursor.execute(" select * from employees order by hire_date desc limit 10")
#CSV파일 만들기
with open("/home/ubuntu/airflow/csv/mysql_output_hook.csv","w") as f:
csv_writer = csv.writer(f)
csv_writer.writerow([i[0] for i in cursor.description])
csv_writer.writerows(cursor)
f.flush()
cursor.close()
conn.close()
DAG 정의
with DAG(
dag_id = "hook_mysql_to_csv",
default_args=args,
schedule_interval='@once'
) as dag:
task1 = PythonOperator(
task_id = 'mysql to_csv",
python_callable=mysql_hook
)
task1
실행 확인
1-2. Pandas를 이용하여 CSV파일 생성
pymysql 모듈 필요
pip install pymysql
모듈 Import
from datetime import datetime, timedelta
from email.policy import default
from textwrap import dedent
import pandas as pd
import pymysql
from airflow import DAG
from airflow.operators.python import PythonOperator
DAG 공통 속성 값 정의
args = {
"owner": "admin",
"start_date": datetime(2023, 3, 23),
"depends_on_pasts" : False,
"retires": 1,
"retry_delay": timedelta(minutes=5)
}
Pandas를 이용하여 MySQL to CSV 실행
def pd_mysql_to_csv():
conn = pymysql.connect(host='', user='admin', password='', db='employees', charset='utf8')
query = 'select * from employees order by hire_date desc limit 10'
df = pd.read_sql_query(query, conn)
df.to_csv('/home/ubuntu/airflow/csv/mysql_output_pandas.csv', index=False)
DAG 정의
with DAG(
dag_id="pandas_mysql_to_csv",
default_args=args,
schedule_interval='@once'
) as dag:
#
task1 = PythonOperator(
task_id="pd_mysql_to_csv",
python_callable=pd_mysql_to_csv
)
task1
실행 확인
결론
🔎 Pymysql 모듈과 Pandas를 사용하는 것이 훨씬 간편함
🔎 찾아보니 쿼리결과를 가공하거나 추가 쿼리가 필요한 경우에 Hook을 사용하면 유용하다고 함
참조
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/s3-task-handler.html
https://airflow.apache.org/docs/apache-airflow-providers-mysql/1.0.0/_api/airflow/providers/mysql/hooks/mysql/index.html
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Airflow 용어 정리 (2) (0) | 2023.03.24 |
---|---|
[Airflow] Airflow 용어 정리 (1) (0) | 2023.03.24 |
[Airflow] Airflow Dag Task 옵션 값 (0) | 2023.03.21 |
[Airflow] Airflow DB & DB 변경 (0) | 2023.03.14 |
[Airflow] Airflow 실행 (0) | 2023.03.14 |