시나리오
1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 |
✔ 아래 링크 이어서 진행
https://heywantodo.tistory.com/20
Airflow Pipeline 생성 (1)
시나리오 1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 1. MySQL to CSV airflow에서 mysql to csv를 실행하는 방법에는 여러가지가 있겠지만 여기선 두가지를 사용 Airflow Connection 이용 Pymys
heywantodo.tistory.com
2. CSV file Upload to S3
선행작업
- S3 Bucket 생성
Amazon S3 버킷 생성, 구성 및 작업 - Amazon Simple Storage Service
이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.
docs.aws.amazon.com
- AWS AccessKey 생성
IAM 사용자의 액세스 키 관리 - AWS Identity and Access Management
사용자 편의를 위해 AWS 로그인 페이지는 브라우저 쿠키를 사용하여 IAM 사용자 이름 및 계정 정보를 기억합니다. 이전에 다른 사용자로 로그인한 경우 페이지 하단 근처의 다른 계정에 로그인(Sig
docs.aws.amazon.com
Ariflow Web Server에서 Connect 등록
- Amazon Web Service 타입
- Access Key와 Secret key를 JSON 형식으로 등록
{"aws_access_key":"본인의 access key", "aws_secreat_access_key":"본인의 secret access key"}
DAG 생성
모듈 설치
pip install apache-airflow[amazon]
DAG 작성
- 모듈 import (S3Hook)
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
- S3Hook을 사용해 S3 Bucket과 연동
def upload_to_s3(filename: str, key: str, bucket_name: str) -> None:
hook = S3Hook('aws_default')
hook.load_file(filename=filename, key=key, bucket_name=bucket_name)
- Task 생성
with DAG(
dag_id="mysql_to_csv_upload_to_s3",
default_args=args,
schedule_interval='@once'
) as dag:
#
task1 = PythonOperator(
task_id="pd_mysql_to_csv",
python_callable=pd_mysql_to_csv
)
task2 = PythonOperator(
task_id = 'upload_to_s3',
python_callable = upload_to_s3,
op_kwargs = {
'filename' : '/home/ubuntu/airflow/csv/mysql_output_pandas.csv',
'key' : 'employee.csv',
'bucket_name' : 'airflow-test-s3-00'
}
)
task1 >> task2
DAG 확인
성공 😆
S3 업로드 확인
결론
🔎 MySQL 연동과 달리 Hook을 사용하는 방식이 간편함 (access key 등록만 하면 되니..)
🔎 코드에 Access key를 직접 넣지 않아도 되는게 좋음
🔎 AWS에 대한 사전 지식이 필요함 (s3 권한, iam, s3 등)
참조
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] 조건에 따라 분기 (Branch) (0) | 2023.04.19 |
---|---|
[Airflow] Airflow Pipeline 생성 (3) (0) | 2023.04.06 |
[Airflow] Airflow 용어 정리 (2) (0) | 2023.03.24 |
[Airflow] Airflow 용어 정리 (1) (0) | 2023.03.24 |
[Airflow] Airflow Pipeline 생성 (1) (0) | 2023.03.23 |