728x90
시나리오
1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 |
✔ 아래 링크 이어서 진행
https://heywantodo.tistory.com/20
2. CSV file Upload to S3
선행작업
- S3 Bucket 생성
- AWS AccessKey 생성
Ariflow Web Server에서 Connect 등록
- Amazon Web Service 타입
- Access Key와 Secret key를 JSON 형식으로 등록
{"aws_access_key":"본인의 access key", "aws_secreat_access_key":"본인의 secret access key"}
DAG 생성
모듈 설치
pip install apache-airflow[amazon]
DAG 작성
- 모듈 import (S3Hook)
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
- S3Hook을 사용해 S3 Bucket과 연동
def upload_to_s3(filename: str, key: str, bucket_name: str) -> None:
hook = S3Hook('aws_default')
hook.load_file(filename=filename, key=key, bucket_name=bucket_name)
- Task 생성
with DAG(
dag_id="mysql_to_csv_upload_to_s3",
default_args=args,
schedule_interval='@once'
) as dag:
#
task1 = PythonOperator(
task_id="pd_mysql_to_csv",
python_callable=pd_mysql_to_csv
)
task2 = PythonOperator(
task_id = 'upload_to_s3',
python_callable = upload_to_s3,
op_kwargs = {
'filename' : '/home/ubuntu/airflow/csv/mysql_output_pandas.csv',
'key' : 'employee.csv',
'bucket_name' : 'airflow-test-s3-00'
}
)
task1 >> task2
DAG 확인
성공 😆
S3 업로드 확인
결론
🔎 MySQL 연동과 달리 Hook을 사용하는 방식이 간편함 (access key 등록만 하면 되니..)
🔎 코드에 Access key를 직접 넣지 않아도 되는게 좋음
🔎 AWS에 대한 사전 지식이 필요함 (s3 권한, iam, s3 등)
참조
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] 조건에 따라 분기 (Branch) (0) | 2023.04.19 |
---|---|
[Airflow] Airflow Pipeline 생성 (3) (0) | 2023.04.06 |
[Airflow] Airflow 용어 정리 (2) (0) | 2023.03.24 |
[Airflow] Airflow 용어 정리 (1) (0) | 2023.03.24 |
[Airflow] Airflow Pipeline 생성 (1) (0) | 2023.03.23 |