Purumir's Blog

Machine Learning, SW architect, Management, favorites

[airflow] execution_date에 따른 특정 일자 task실행 처리

execution_date의 요일에 따른 Task 분기 처리

월요일~일요일까지의 요일중에서 특정 요일에만 Task를 실행하고, 다른 요일에는 Task실행을 하지 않거나 DummyTask 실행을 하는 상황이 있습니다. 주 1회 실행하는 배치등의 경우 이러한 케이스에 해당합니다.

airflow에서 주입하는 execution_date의 요일(weekday)를 계산할 수 있다면 특정 요일(여기서는 일요일)에만 실행하도록 처리할수 있을 것입니다. 이러한 분기(branch)처리는 BranchPythonOperator를 통해서 처리 할수 있습니다.

코드를 살펴보면 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def weekly_branch_shell(dag : DAG, task_id : str):
"""
주 1회(일요일) 실행되는 배치에 대한 Branch 처리
execution_date를 기준으로 일요일만 Task 실행(배치 실행), 나머지 요일은 DummyOperator 실행
"""
# 0 : Monday ~ 6 : Sunday로 인덱싱
tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

def get_weekday(**kwargs):
execution_date = kwargs['ti'].execution_date
# 요일에 해당하는 숫자 (0 : Monday ~ 6: Sunday)를 XCom에 담습니다.
kwargs['ti'].xcom_push(key='execution_date_weekday', value=execution_date.weekday())

def weekday_branch(**kwargs):
# XCom에서 요일정보를 추출합니다.
execution_date_weekday = kwargs['ti'].xcom_pull(task_ids='week_day_'+task_id, key='execution_date_weekday')

# sunday
if execution_date_weekdy == 6:
return task_id
else:
# 요일과 실행하려는 Task명을 붙여서 월~토까지 DummyTask명을 만듭니다.
return "DummyTask_for_" + tabDays[execution_date_weekday] + "-" + task_id

# get_weekday를 호출하여 요일정보를 XCom에 담습니다.
get_weekday_oper = PythonOperator(
task_id = 'weekday_' + task_id,
python_callable = get_weekday,
provide_context = True,
queue = '{사용하는 airflow queue 이름}',
dag = dag
)

fork_oper = BranchPythonOperator(
task_id = 'branching_' + task_id,
python_callable = weekday_branch,
provide_context = True,
queue = '{사용하는 airflow queue 이름}',
dag = dag
)

get_weekday_oper.set_downstream(fork_oper)

# 분기후 실행하여야 하는 Task, DummyTask를 연결합니다.
for day in range(0, 7):
if day == 6:
batch_shell_oper = BashOperator(task_id = task_id,
bash_command = '{실행하려는 쉘 정보}',
queue = '{사용하는 airflow queue 이름}',
dag = dag
)
# 분기 branch에 이어 붙입니다.
fork_oper.set_downstream(batch_shell_oper)
else:
fork_oper.set_downstream(DummyOperator(task_id='DummyTask_for_' + tabDays[day] + "-" + task_id, dag=dag)
# 구성이 된 operators를 리턴
return get_weekday_oper

airflow UI Task의 구조

위 코드를 airflow UI에서 확인하면 아래와 같은 형태로 일요일만 해당 쉘을 실행하도록 Task 구조가 생성됩니다. 다른 요일에는 DummyTask가 실행됩니다.