1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| def weekly_branch_shell(dag : DAG, task_id : str): """ 주 1회(일요일) 실행되는 배치에 대한 Branch 처리 execution_date를 기준으로 일요일만 Task 실행(배치 실행), 나머지 요일은 DummyOperator 실행 """ tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"] def get_weekday(**kwargs): execution_date = kwargs['ti'].execution_date kwargs['ti'].xcom_push(key='execution_date_weekday', value=execution_date.weekday()) def weekday_branch(**kwargs): execution_date_weekday = kwargs['ti'].xcom_pull(task_ids='week_day_'+task_id, key='execution_date_weekday') if execution_date_weekdy == 6: return task_id else: return "DummyTask_for_" + tabDays[execution_date_weekday] + "-" + task_id get_weekday_oper = PythonOperator( task_id = 'weekday_' + task_id, python_callable = get_weekday, provide_context = True, queue = '{사용하는 airflow queue 이름}', dag = dag ) fork_oper = BranchPythonOperator( task_id = 'branching_' + task_id, python_callable = weekday_branch, provide_context = True, queue = '{사용하는 airflow queue 이름}', dag = dag ) get_weekday_oper.set_downstream(fork_oper) for day in range(0, 7): if day == 6: batch_shell_oper = BashOperator(task_id = task_id, bash_command = '{실행하려는 쉘 정보}', queue = '{사용하는 airflow queue 이름}', dag = dag ) fork_oper.set_downstream(batch_shell_oper) else: fork_oper.set_downstream(DummyOperator(task_id='DummyTask_for_' + tabDays[day] + "-" + task_id, dag=dag) return get_weekday_oper
|