Purumir's Blog

Machine Learning, SW architect, Management, favorites

[airflow] TriggerDagRunOperator와 ExternalTaskSensor를 통한 배치 흐름 제어

수백개의 배치를 그룹으로 구성하고 그 그룹간의 Dependency를 둘 수 없을까?

수백개가 넘는 배치를 하나의 DAG에 넣는 것은 유지보수도 어렵게 하고, 동시 작업성 측면에서도 좋지 않다고 생각되어 이를 그룹화(각 DAG를 그룹화의 단위로 정의)하였습니다. 이 DAGs간의 dependency를 줄 수 있는 방법에 대해서 고민하게 되었습니다. 처음에는 후속 DAG에서 앞의 DAG의 마지막 Task의 결과를 확인하고 데이터가 생성되어 있지 않으면 생성이 될 때까지 대기하는 방법을 사용하였습니다.(hdfs sensor를 통해서 _SUCCESS 같은 마커가 생성될때까지 대기하도록 구성)

보다 개선된 방법으로 airflow에서 ExternalTaskSensor를 제공하는 것을 확인하여 이를 병행하는 방식으로 DAG간의 dependency를 Task 센싱을 통해서 처리하도록 구성하였습니다.

개념적으로 다음과 같은 형태로 구성을 하였습니다.

위 그림에서 선행 DAG의 마지막 Task를 센싱하는 ExternalTaskSensor를 후행 DAG의 앞에 둠으로써 ExternalTaskSensor가 선행 DAG의 마지막 Task의 완료를 센싱할때까지 후행 DAG의 실행(triggering)을 하지 않도록 하는 구조입니다. ExternalTaskSensor는 선행 DAG의 마지막 Task가 완료가 되면 이를 센싱하고 후행 DAG를 TriggerDagRunOperator를 통해서 triggering하게 됩니다.

이러한 방식의 장점은 각 DAG의 스케줄링 시간을 정의하고 time interval에 의해서 실행간 제어를 할 경우 선행 DAG가 완료되지 않았음에도 후행 DAG를 실행하는 문제점에 대한 대안으로 사용 할 수 있고, _SUCCESS 같은 마커를 통한 흐름제어보다 개선된 방식이라고 할 수 있습니다.

1
2
3
4
5
6
7
8
## TriggerDagRunOperator를 통한 pre-DAG triggering
trigger_predag_task = TriggerDagRunOperator(
task_id = f'trigger_pre-DAG', # DAG-A 를 triggering함.
trigger_dag_id = 'pre-DAG',
execution_date = 'YYYYMMDD', # triggering시 execution date
queue = 'queue name',
reset_dag_run = True # 이미 존재하는 dag run이 있을 경우 이를 clear하고 실행할지 여부
)
1
2
3
4
5
6
7
8
9
10
## ExternalTaskSensor 예시
predag_sensor_task = ExternalTaskSensor(
task_id = f'sensor_pre-DAG',
external_dag_id = 'pre-DAG',
external_task_id = 'pre-DAG-final-Task',
execution_date = 'YYYYMMDD', # 센싱을 원하는 execution_date
mode = 'reschedule',
timeout = 3600, # 60 분
queue = 'queue_name'
)
1
2
3
4
5
6
7
8
## TriggerDagRunOperator를 통한 DAG-A triggering
trigger_daga_task = TriggerDagRunOperator(
task_id = f'trigger_DAG-A', # DAG-A 를 triggering함.
trigger_dag_id = 'DAG-A',
execution_date = 'YYYYMMDD', # triggering시 execution date
queue = 'queue name',
reset_dag_run = True # 이미 존재하는 dag run이 있을 경우 이를 clear하고 실행할지 여부
)
1
2
## 'pre-DAG'의 'pre-DAG-final-Task'가 성공하면, DAG-A를 triggering한다.
trigger_predag_task >> predag_sensor_task >> trigger_daga_task