728x90
시나리오
1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 |
✔ 아래 링크 이어서 진행
https://heywantodo.tistory.com/20
https://heywantodo.tistory.com/34
3. 실패 시 Slack 알람 연동
선행작업
- 슬랙 workspace 생성
슬랙 Token 생성
✔ 슬랙 알람과 연동하려면 Token을 부여 받아야 함
https://api.slack.com/apps?new_app=1
- 위 링크에서 app을 먼저 생성
- 옆 카테고리에서 OAuth & Permissions 클릭
- Scope에서 권한 2개 추가
- chat:write
- im:write
- Install to workspace 클릭 후 Token 발급
- 알람 받을 채널에 app 추가
Airflow에서 Slack 알람받도록 설정
https://heywantodo.tistory.com/34
앞 포스팅에서 생성했던 DAG를 수정해서 진행
- Airflow slack Operator 설치
pip install apache-airflow[slack]
pip install apache-airflow-providers-slack
- DAG 적용을 위한 Slack Alert 파일 생성
alert.py
from airflow.operators.slack_operator import SlackAPIPostOperator
from dateutil.relativedelta import relativedelta
def on_failure(context):
channel = 'your_channel_name'
token = 'your_slack_token'
task_instance = context.get('task_instance')
task_id = task_instance.task_id
dag_id = task_instance.dag_id
text = f'''
*[:exclamation: AIRFLOW ERROR REPORT]*
■ DAG: _{dag_id}_
■ Task: _{task_id}_
'''
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=channel,
token=token,
text=text)
return alert.execute(context=context)
❗ 코드 참조
https://data-engineer-tech.tistory.com/29
- DAG에 적용
1️⃣ 함수 import
2️⃣ 위에서 만든 함수를 default_args의 on_failure_callback에 넣고 실패하면 함수를 실행
from alert import on_failure
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'on_failure_callback': on_failure
}
더보기
전체코드
from textwrap import dedent
import pandas as pd
import pymysql
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from alert import on_failure
args = {
"owner": "admin",
"start_date": datetime(2023, 3, 23),
"depends_on_pasts" : False,
"retires": 1,
"retry_delay": timedelta(minutes=5),
'on_failure_callback': on_failure
}
def pd_mysql_to_csv():
conn = pymysql.connect(host='', user='', password='', db='', charset='utf8')
query = 'select * from employees order by hire_date desc limit 10'
df = pd.read_sql_query(query, conn)
df.to_csv('/home/ubuntu/airflow/csv/mysql_output_pandas.csv', index=False)
def upload_to_s3(filename: str, key: str, bucket_name: str) -> None:
hook = S3Hook('aws_default')
hook.load_file(filename=filename, key=key, bucket_name=bucket_name)
dag = DAG(
dag_id="upload_to_s3_slack_test",
default_args=args,
schedule_interval='@once'
)
task1 = PythonOperator(
task_id="pd_mysql_to_csv",
python_callable=pd_mysql_to_csv,
dag = dag
)
task2 = PythonOperator(
task_id = 'upload_to_s3',
python_callable = upload_to_s3,
op_kwargs = {
'filename' : '/home/ubuntu/airflow/csv/mysql_output_pandas.csv',
'key' : 'employee.csv',
'bucket_name' : 'airflow-test-s3-00'
},
dag = dag
)
task1 >> task2
결과 확인
- S3에 이미 파일이 있어서 upload_to_s3 task에서 Error 발생
👍 실패 알람 작동 확인 완료
결론
🔎 항상 모니터링 하고 있을 수 없으니 실패시 Slack 알람을 연동해놓는게 좋은 방법일 듯
🔎 어떤 Task에서 에러가 발생했는지 확인 할 수 있어서 좋음
참조
https://data-engineer-tech.tistory.com/29
https://jaeyung1001.tistory.com/241
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Xcom (0) | 2023.04.20 |
---|---|
[Airflow] 조건에 따라 분기 (Branch) (0) | 2023.04.19 |
[Airflow] Airflow Pipeline 생성 (2) (0) | 2023.03.28 |
[Airflow] Airflow 용어 정리 (2) (0) | 2023.03.24 |
[Airflow] Airflow 용어 정리 (1) (0) | 2023.03.24 |