Purumir's Blog

Machine Learning, SW architect, Management, favorites

airflow hdfs sensor 설정과 사용법

airflow에는 HDFS상에서 파일의 존재유무를 체크하는(poke)하는 hdfs_sensor라는 기능이 있습니다.


hdfs sensor를 위한 필요 패키지 설치

  • pip install apache-airflow-providers-apache-hdfs
  • pip install snakebite-py3


hdfs sensor 사용을 위한 connection 정보 설정

  • airflow > Admin > Connections 에서 hdfs sensor에서 접근할 hdfs 정보를 입력
    • hdfs_connection / IP / Port 정보
    • HDFS Name Node 정보를 입력 (IP, Port 8020)


코드에서 HdfsSensor 모듈 import를 통한 sensor 기능 사용

1
2
3
4
5
6
7
8
9
10
11
from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor

hdfs_sensor = HdfsSensor(
task_id = "hdfs_path_sensor",
filepath = "모니터링 하고자 하는 파일의 full path(파일명포함)",
hdfs_conn_id = "hdfs_connection", # airflow > Admin > Connections 에 설정한 hdfs connection 명
queue ="queue", # airflow사용 queue,
poke_interval=30, # 지정한 interval이 지난후 체크
timeout=11400, # 지정한 timeout 시간동안 체크를 지속
dag=dag
)


아래 그림과 같이 Task간에 선행 작업에서 필수적인 파일을 생성했는지 여부를 체크하여, 후행 작업의 시작을 파일이 생성될때까지 순연시키는 효과를 거둘수 있습니다.