728x90
[Airlfow] Sensor
Airflow의 Sensor는 워크플로우 내에서 특정 이벤트 또는 조건이 발생할 때까지 작업을 지연시키는 데 사용
즉, Sensor는 다른 작업이 실행되기 전 특정 상태가 충족될 때까지 기다리는 역할을 함
예를 들어, 특정 파일이 생성되거나 특정 시간에 도달하거나 외부 시스템의 상태가 변경될 때까지
작업을 일시 중지 시키는 데 사용할 수 있음
Airflow에는 다양한 종류의 Sensor가 있으며 일반적으로 사용되는 몇 가지 예는 다음과 같음
1. FileSensor
파일이 특정 디렉토리에 존재하는지 여부를 확인한다.
from airflow.sensors.filesystem import FileSensor
from airflow import DAG
dag = DAG("file_sensor_example", schedule_interval=None)
check_file_sensor = FileSensor(
task_id='check_file_sensor',
filepath='/path/to/file.txt',
dag=dag
)
2. TimeSensor
특정 시간에 도달했는지 여부를 확인한다.
아래는 2023년 7월 10일 8시 0분까지 대기 후 다음 task를 실행한다.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime, timedelta
dag = DAG('time_sensor_example', schedule_interval=None)
wait_for_time_sensor = TimeSensor(
task_id='wait_for_time_sensor',
target_time=datetime(2023, 7, 6, 8, 0),
dag=dag
)
3. HttpSensor
웹 서비스나 API의 응답 상태를 확인한다.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.http_sensor import HttpSensor
from datetime import datetime, timedelta
dag = DAG('http_sensor_example', schedule_interval=None)
check_http_sensor = HttpSensor(
task_id='check_http_sensor',
http_conn_id='my_http_connection', # 미리 설정한 Http 연결 정보
endpoint='/health', # 확인할 엔드포인트 경로
request_params={}, # 필요한 경우 추가적인 요청 매개변수
response_check=lambda response: True, # 응답을 확인하기 위한 사용자 정의 함수
poke_interval=60, # 60초마다 응답 상태 확인
timeout=10, # 요청 시 제한 시간 설정
dag=dag
)
4. ExternalTaskSensor
다른 Airflow 작업의 완료 여부를 확인한다.
from airflow import DAG
from airflow.contrib.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta
dag = DAG('external_task_sensor_example', schedule_interval=None)
wait_for_external_task_sensor = ExternalTaskSensor(
task_id='wait_for_external_task_sensor',
external_dag_id='other_dag_id', # 다른 DAG의 식별자
external_task_id='other_task_id', # 다른 작업의 식별자
execution_delta=timedelta(hours=1), # 작업 완료 여부 확인할 시간 범위
allowed_states=['success'], # 허용되는 작업 상태
check_existence=True, # 외부 작업이 존재하는지 확인
dag=dag
)
Sensor는 워크 플로우의 신뢰성과 유연성을 높여주는 데 도움이 되며, 작업 간의 의존성을 관리하는데 유용함
반응형
'💾 Data > Airflow' 카테고리의 다른 글
[Airflow] Airflow Configuration (0) | 2023.12.18 |
---|---|
[Airflow] Celery executor, Flower (0) | 2023.08.10 |
[Airflow] 병렬 처리 Task (0) | 2023.06.22 |
[Airflow] Airflow CLI 명령어 (0) | 2023.05.28 |
[Airflow] Catchup, Backfil (0) | 2023.05.24 |