[Airflow] 병렬 처리 Task
Airflow의 주요 기능 중 하나는 작업(Task)들을 병렬로 실행할 수 있는 기능임
병렬 처리는 동시에 여러 작업을 실행하여, 작업의 처리 시간을 단축시키는 방법
Airflow에서는 작업을 병렬로 실행하기 위해, Parallelism과 동시성(Concurrency) 개념을 사용함
parallelism은 Airflow에서 동시에 실행 가능한 작업의 최대 개수를 제어하는 설정 값
동시성은 Airflow 스케줄러에서 제어되며, 작업을 실행할 수 있는 동시 작업 수를 제어하는데 사용됨
동시성은 Airflow 구성 파일에서 설정할 수 있으며, 작업들 사이의 의존성 및 리소스 제약에 따라 조정 가능
Example
아래는 Airflow를 사용하여 병렬로 S3 버킷 내의 여러 폴더를 조사하는 간단한 예제다.
0. 병렬처리 설정
Airflow에서 병렬 처리를 설정하기 위해선, airflow의 설정파일인 airflow.cfg를 열어
parallelism 및 dag_concurrency 값을 확인
vi /home/airflow/airflow.cfg
1. 사용 할 모듈 Import
s3 객체를 조사하기 위해서 python용 aws sdk인 boto3를 사용
BashOperator를 사용해서 aws cli로 하는 방법도 있음
import boto3
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
2. S3 폴더 별 객체 조사 함수
AWS S3 버킷 내의 특정 폴더에 있는 객체들을 조회하는 함수를 정의함
boto3를 사용해서 객체를 조회하려면 list_objects_v2 메서드를 호출하여 사용함
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
메서드를 호출하면 AWS 서버로 부터 응답이 반환됨
매개변수 bucket_name은 S3 버킷 이름, prefix는 조회 할 폴더의 경로를 나타냄
응답 객체는 JSON 형식으로 구성되어 있으며, 조회한 객체의 정보를 포함함
주요한 정보는 'Content' 라는 키에 담겨 있으며, 객체의 키를 추출하려면 다음과 같이 할 수 있음
objects = [obj['Key'] for obj in response['Contents']]
그럼 s3.list_objects_v2 메서드를 사용해 버킷 내 폴더별 객체를 출력하는 함수를 생성해보자
""" S3 Bucket Folder List """
def list_s3_objects(bucket_name, prefix):
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if 'Contents' in response:
objects = [obj['Key'] for obj in response['Contents']]
print(f"Objects in folder '{prefix}': {objects}")
bucket_name = 'your-bucket-name'
s3_folders = ['test1', 'test2', 'test3']
3. DAG 정의
dag = DAG(
's3_folder_listing',
description = 'Parallel S3 Folder Listing',
schedule_interval = None,
start_date = datetime.now(),
catchup = False
)
4. Task 정의
위에서 정의한 s3_folders의 수만큼 task를 생성해줘야 하기 때문에, for문을 통해 task를 생성해줌
매개변수 또한 전달해준다.
start = DummyOperator(
task_id = 'start',
dag = dag
)
s3_folder_tasks = []
for folder in s3_folders:
task = PythonOperator(
task_id = f'list_s3_folder_{folder}',
python_callable=list_s3_objects,
op_kwargs={'bucket_name': bucket_name, 'prefix': folder},
retries=3,
dag = dag
)
s3_folder_tasks.append(task)
start >> s3_folder_tasks
⚠ Paralleism 설정과 Concurrency 작업은 Airflow의 성능과 자원 사용에 영향을 미침
값이 낮으면 동시에 실행 가능한 작업 수가 제한되어 작업 실행 시간이 늘어날 수 있으며,
반면에 값이 높으면 시스템 자원 부하가 증가하여 성능 저하나 예기치 않은 동작이 발생할 수 있음
시스템 자원, 작업 로드, 작업 실행 시간 등을 고려하여 최적의 값을 설정하자
import boto3
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
""" S3 Bucket Folder List """
def list_s3_objects(bucket_name, prefix):
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if 'Contents' in response:
objects = [obj['Key'] for obj in response['Contents']]
print(f"Objects in folder '{prefix}': {objects}")
dag = DAG(
's3_folder_listing',
description = 'Parallel S3 Folder Listing',
schedule_interval = None,
start_date = datetime.now(),
catchup = False
)
bucket_name = 'airflow-test-s3-00'
s3_folders = ['test1', 'test2', 'test3']
start = DummyOperator(
task_id = 'start',
dag = dag
)
s3_folder_tasks = []
for folder in s3_folders:
task = PythonOperator(
task_id = f'list_s3_folder_{folder}',
python_callable=list_s3_objects,
op_kwargs={'bucket_name': bucket_name, 'prefix': folder},
retries=3,
dag = dag,
)
s3_folder_tasks.append(task)
start >> s3_folder_tasks
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Celery executor, Flower (0) | 2023.08.10 |
---|---|
[Airlfow] Sensor (0) | 2023.07.09 |
[Airflow] Airflow CLI 명령어 (0) | 2023.05.28 |
[Airflow] Catchup, Backfil (0) | 2023.05.24 |
[Airflow] airflow decorators (0) | 2023.05.18 |