💾 Data/Airflow

[Airflow] Airflow Pipeline 생성 (2)

heywantodo 2023. 3. 28. 17:07
728x90
반응형

시나리오

1. mysql to csv

2. csv 파일 s3에 업로드

3. 실패 시 슬랙 알람 연동 

 

✔ 아래 링크 이어서 진행

https://heywantodo.tistory.com/20

 

Airflow Pipeline 생성 (1)

시나리오 1. mysql to csv 2. csv 파일 s3에 업로드 3. 실패 시 슬랙 알람 연동 1. MySQL to CSV airflow에서 mysql to csv를 실행하는 방법에는 여러가지가 있겠지만 여기선 두가지를 사용 Airflow Connection 이용 Pymys

heywantodo.tistory.com

 

2. CSV file Upload to S3

 

선행작업

  • S3 Bucket 생성
 

Amazon S3 버킷 생성, 구성 및 작업 - Amazon Simple Storage Service

이 페이지에 작업이 필요하다는 점을 알려 주셔서 감사합니다. 실망시켜 드려 죄송합니다. 잠깐 시간을 내어 설명서를 향상시킬 수 있는 방법에 대해 말씀해 주십시오.

docs.aws.amazon.com

  • AWS AccessKey 생성
 

IAM 사용자의 액세스 키 관리 - AWS Identity and Access Management

사용자 편의를 위해 AWS 로그인 페이지는 브라우저 쿠키를 사용하여 IAM 사용자 이름 및 계정 정보를 기억합니다. 이전에 다른 사용자로 로그인한 경우 페이지 하단 근처의 다른 계정에 로그인(Sig

docs.aws.amazon.com

 

Ariflow Web Server에서 Connect 등록

  • Amazon Web Service 타입
  • Access Key와 Secret key를 JSON 형식으로 등록
{"aws_access_key":"본인의 access key", "aws_secreat_access_key":"본인의 secret access key"}

 

DAG 생성

 

모듈 설치

pip install apache-airflow[amazon]

 

DAG 작성

  • 모듈 import (S3Hook)
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
  • S3Hook을 사용해 S3 Bucket과 연동
def upload_to_s3(filename: str, key: str, bucket_name: str) -> None:
    hook = S3Hook('aws_default')
    hook.load_file(filename=filename, key=key, bucket_name=bucket_name)
  • Task 생성
with DAG(
    dag_id="mysql_to_csv_upload_to_s3",
    default_args=args,
    schedule_interval='@once'
) as dag:
    #
    task1 = PythonOperator(
        task_id="pd_mysql_to_csv",
        python_callable=pd_mysql_to_csv
    )


    task2 = PythonOperator(
        task_id = 'upload_to_s3',
        python_callable = upload_to_s3,
        op_kwargs = {
            'filename' : '/home/ubuntu/airflow/csv/mysql_output_pandas.csv',
            'key' : 'employee.csv',
            'bucket_name' : 'airflow-test-s3-00'
        }
    )

    task1 >> task2

 

DAG 확인

성공 😆

 

S3 업로드 확인

 

결론

🔎 MySQL 연동과 달리 Hook을 사용하는 방식이 간편함 (access key 등록만 하면 되니..)

🔎 코드에 Access key를 직접 넣지 않아도 되는게 좋음

🔎 AWS에 대한 사전 지식이 필요함 (s3 권한, iam, s3 등)

 

 

참조

https://velog.io/@jskim/Airflow-Pipeline-%EB%A7%8C%EB%93%A4%EA%B8%B0-AWS-S3%EC%97%90-%ED%8C%8C%EC%9D%BC-%EC%97%85%EB%A1%9C%EB%93%9C%ED%95%98%EA%B8%B0

728x90
반응형