💾 Data/Airflow

[Airlfow] Sensor

heywantodo 2023. 7. 9. 17:13
728x90
반응형

[Airlfow] Sensor

Airflow의 Sensor는 워크플로우 내에서 특정 이벤트 또는 조건이 발생할 때까지 작업을 지연시키는 데 사용

즉, Sensor는 다른 작업이 실행되기 전 특정 상태가 충족될 때까지 기다리는 역할을 함

 

예를 들어, 특정 파일이 생성되거나 특정 시간에 도달하거나 외부 시스템의 상태가 변경될 때까지

작업을 일시 중지 시키는 데 사용할 수 있음

 

Airflow에는 다양한 종류의 Sensor가 있으며 일반적으로 사용되는 몇 가지 예는 다음과 같음

 

1. FileSensor

파일이 특정 디렉토리에 존재하는지 여부를 확인한다.

from airflow.sensors.filesystem import FileSensor
from airflow import DAG

dag = DAG("file_sensor_example", schedule_interval=None)

check_file_sensor = FileSensor(
    task_id='check_file_sensor',
    filepath='/path/to/file.txt',
    dag=dag
)

2. TimeSensor

특정 시간에 도달했는지 여부를 확인한다.

 아래는 2023년 7월 10일 8시 0분까지 대기 후 다음 task를 실행한다.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime, timedelta

dag = DAG('time_sensor_example', schedule_interval=None)

wait_for_time_sensor = TimeSensor(
    task_id='wait_for_time_sensor',
    target_time=datetime(2023, 7, 6, 8, 0),  
    dag=dag
)

3. HttpSensor

웹 서비스나 API의 응답 상태를 확인한다.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.http_sensor import HttpSensor
from datetime import datetime, timedelta

dag = DAG('http_sensor_example', schedule_interval=None)

check_http_sensor = HttpSensor(
    task_id='check_http_sensor',
    http_conn_id='my_http_connection',  # 미리 설정한 Http 연결 정보
    endpoint='/health',  # 확인할 엔드포인트 경로
    request_params={},  # 필요한 경우 추가적인 요청 매개변수
    response_check=lambda response: True,  # 응답을 확인하기 위한 사용자 정의 함수
    poke_interval=60,  # 60초마다 응답 상태 확인
    timeout=10,  # 요청 시 제한 시간 설정
    dag=dag
)

4. ExternalTaskSensor

다른 Airflow 작업의 완료 여부를 확인한다.

from airflow import DAG
from airflow.contrib.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta

dag = DAG('external_task_sensor_example', schedule_interval=None)

wait_for_external_task_sensor = ExternalTaskSensor(
    task_id='wait_for_external_task_sensor',
    external_dag_id='other_dag_id',  # 다른 DAG의 식별자
    external_task_id='other_task_id',  # 다른 작업의 식별자
    execution_delta=timedelta(hours=1),  # 작업 완료 여부 확인할 시간 범위
    allowed_states=['success'],  # 허용되는 작업 상태
    check_existence=True,  # 외부 작업이 존재하는지 확인
    dag=dag
)

 

Sensor는 워크 플로우의 신뢰성과 유연성을 높여주는 데 도움이 되며, 작업 간의 의존성을 관리하는데 유용함

728x90
반응형