728x90
[Airflow] Variables
Variables
Airflow 변수(Variables)는 Airflow에서 사용되는 일종의 키-값(key-value) 쌍으로 DAG 실행 중에 사용할 수 있는 데이터를 저장하는데 사용됨
이 변수들은 Airflow의 UI 또는 CLI를 통해 생성하고 관리할 수 있음
DAG에서 사용되는 DB 연결 정보나 API 키, 환경 변수 등을 저장할 수 있음
이러한 변수들은 DAG 코드 내에서 하드코딩하여 사용하는 것보다 유연성과 보안성을 높일 수 있음
1. UI에서 설정
Admin -> Variables에서 설정 가능
2. CLI에서 설정
airflow variables 명령어를 사용하여 CLI에서 변수를 생성, 수정, 삭제할 수 있음
airflow variables --set <key> <value>
airflow variables --set aws_credentials '{"aws_access_key_id":"YOUR_ACCESS_KEY","aws_secret_access_key":"YOUR_SECRET_KEY"}'
airflow variables --set s3_bucket "your_s3_bucket"
Example
Operator Import
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from datetime import datetime
import boto3
함수 정의
Variable.get()을 사용하여 aws_credentials의 변수 값을 가져와서 S3 클라이언트 생성
s3.download_file()을 사용해서 S3 버킷에서 파일 업로드
def download_file():
# Airflow 변수에서 AWS 자격 증명 정보와 S3 버킷 이름을 가져옴
aws_credentials = Variable.get("aws_credentials", deserialize_json=True)
s3_bucket = Variable.get("s3_bucket")
file_path = '/home/ubuntu/airflow/file/file.txt'
file_name = 'file.txt'
# S3 클라이언트 생성
s3 = boto3.client('s3',
aws_access_key_id=aws_credentials['aws_access_key_id'],
aws_secret_access_key=aws_credentials['aws_secret_access_key'])
# S3 버킷에서 파일 업로드
s3.upload_file(file_path ,s3_bucket, file_name)
DAG 정의
default_args = {
'start_date': datetime(2023, 5, 11),
'schedule_interval': '@once'
}
dag = DAG(
dag_id = 'variable_test',
default_args = default_args
)
Task정의
download_task = PythonOperator(
task_id='upload_file',
python_callable=upload_file,
dag=dag
)
DAG 확인
- variables에서 access key 정보를 받아 업로드가 잘 된 모습 확인
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Catchup, Backfil (0) | 2023.05.24 |
---|---|
[Airflow] airflow decorators (0) | 2023.05.18 |
[Airflow] 멀티 클러스터 환경 구성 (Celery Executer) (0) | 2023.04.30 |
[Airflow] Params (0) | 2023.04.29 |
[Airflow] Xcom (0) | 2023.04.20 |