Writing Efficient DAGs Using Task Groups

Task Groups, which replace the traditional SubDAGs, help us manage and organize multiple subtasks in a more efficient and visually understandable way.


Overview

We have an ETL requirement that needs the following tasks to be written in Airflow and executed at regular intervals:

  • A batch file comes up every day in a GCS path /pathB.

  • Poll for the file in a specific GCS path.

  • Call the parser to load the data into the system.

  • Trigger the backup job of the database.

  • Trigger an email notification.

  • Trigger a Slack alert.

We would start by writing 9 tasks and declaring their dependencies, as shown below.

Air-Flow-1
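To make this flat structure concrete, here is a minimal, stdlib-only sketch (not Airflow code) that models the pipeline as a plain dependency map and derives one valid run order with Python's graphlib module. The task names follow the functions used later in this post; the exact dependency layout (each file polled and then parsed, both parses before the backup, then email, then Slack) is an assumption. Every task and every edge lives at a single level, which is what becomes hard to read as the pipeline grows.

```python
from graphlib import TopologicalSorter

# Assumed flat pipeline: each key maps a task to the tasks it depends on.
dependencies = {
    "poll_file_a": set(),
    "process_file_a": {"poll_file_a"},
    "poll_file_b": set(),
    "process_file_b": {"poll_file_b"},
    "backup": {"process_file_a", "process_file_b"},
    "notify_email": {"backup"},
    "notify_slack": {"notify_email"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid run order for a flat set of task dependencies."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    for task_id in execution_order(dependencies):
        print(task_id)
```

Any order it prints respects every edge, but nothing in the structure says which tasks belong together; that grouping is what the rest of this post is about.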

Problem Statement

As the number of tasks grows, we run into the following problems:

  • It becomes hard to see the bigger picture of all the tasks in the DAG.

  • The code becomes cumbersome to maintain and to extend with new features.

  • We are unable to modularize the code.

What we need is a more effective way of nesting tasks within the DAG: a way to group related tasks and manage them more productively and efficiently.

Legacy Solution

SubDAGs

Let's take the above example and write it using SubDAGs. We would end up with 3 SubDAGs: one per file for polling and processing it, and one for backup and notification. The SubDAGs would look as below.

Air-Flow-2

To view the tasks under a SubDAG, we click on it and land on a separate page, the SubDAG view, where we can see its contents.

Air-Flow-3
Air-Flow-4
Air-Flow-5
Code
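from airflow.models import DAG
from airflow.decorators import task


@task.python
def poll_file_a():
    # Placeholder: poll the GCS path for file A
    print("polling for file A")


@task.python
def parse_file_a():
    # Placeholder: call the parser to load file A into the system
    print("parsing file A")


def blog_subdag(parent_dag_id, subdag_dag_id, default_args):
    # A SubDAG is a complete DAG of its own. Its dag_id must have the form
    # "<parent_dag_id>.<task_id of the SubDagOperator>" to attach to the parent,
    # e.g. SubDagOperator(task_id=subdag_dag_id,
    #                     subdag=blog_subdag(parent_dag_id, subdag_dag_id, default_args))
    with DAG(f"{parent_dag_id}.{subdag_dag_id}", default_args=default_args) as dag:
        poll_file_a() >> parse_file_a()
    return dag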

Drawbacks with SubDAGs

Below are some of the issues that arise with SubDAGs:

  • When a SubDAG is triggered, the SubDAG and its child tasks occupy worker slots until the entire SubDAG is complete. This can delay the processing of other tasks and, depending on the number of worker slots, can lead to deadlocks. TaskGroups, on the other hand, are just a logical grouping of tasks, so they do not have this issue.

  • SubDAGs have their own parameters, schedule, and enabled settings. When these are not consistent with the parent DAG, unexpected behavior can occur.

  • The status of tasks inside a SubDAG cannot be seen at the parent level. Because a SubDAG exists as a separate DAG, we must drill down into it to see them.

  • Writing SubDAGs took a long time, and the resulting code was harder to maintain.
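The deadlock risk in the first point can be illustrated with a deliberately simplified toy scheduler. This is a sketch under stated assumptions, not Airflow's scheduler: a fixed pool of worker slots, each SubDAG parent task starting (and taking a slot) before its children can run and keeping that slot until all of its children finish, and every child task taking one slot for one step. Passing parent_holds_slot=False models TaskGroups, where only real tasks consume slots. The function name run_toy_scheduler is hypothetical.

```python
from typing import Optional


def run_toy_scheduler(slots: int, groups: int, tasks_per_group: int,
                      parent_holds_slot: bool) -> Optional[int]:
    """Return the number of steps to finish all tasks, or None on deadlock.

    Toy model only (assumptions, not Airflow internals):
    - a SubDAG parent must start (and take a slot) before its children run;
    - the parent keeps its slot until all of its children are done;
    - each child task needs one slot and finishes within a single step.
    With parent_holds_slot=False (TaskGroups), groups need no slot of their own.
    """
    remaining = [tasks_per_group] * groups      # child tasks left per group
    started = [not parent_holds_slot] * groups  # TaskGroups start "for free"
    done = [False] * groups
    free = slots
    steps = 0
    while not all(done):
        progress = False
        # 1. Start waiting SubDAG parents; each one occupies a worker slot.
        for g in range(groups):
            if not started[g] and free > 0:
                started[g] = True
                free -= 1
                progress = True
        # 2. Run child tasks of started groups, one per free slot.
        running = 0
        for g in range(groups):
            while started[g] and remaining[g] > 0 and running < free:
                remaining[g] -= 1
                running += 1
                progress = True
        # Children finish within the step, so their slots are free next step.
        # 3. Mark finished groups as done; SubDAG parents release their slot.
        for g in range(groups):
            if started[g] and not done[g] and remaining[g] == 0:
                done[g] = True
                if parent_holds_slot:
                    free += 1
                progress = True
        if not progress:
            return None  # every slot is held by a parent waiting on its children
        steps += 1
    return steps


if __name__ == "__main__":
    print(run_toy_scheduler(2, 2, 2, parent_holds_slot=True))   # None (deadlock)
    print(run_toy_scheduler(2, 2, 2, parent_holds_slot=False))  # 2
    print(run_toy_scheduler(3, 2, 2, parent_holds_slot=True))   # 3
```

With 2 slots and 2 SubDAGs of 2 tasks each, both parents take the only slots and their children can never start. The same workload modeled as TaskGroups finishes in 2 steps, and giving the SubDAGs one extra slot lets them finish, though more slowly.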

The introduction of TaskGroups was primarily motivated by the need to address these issues.

TaskGroups to the Rescue

With Airflow 2.0, SubDAGs were deprecated and TaskGroups were introduced. The SubDAG feature may also be removed from Airflow 3.0 onwards.

TaskGroups help us visually group similar or dependent tasks together in the DAG view. We can also create multiple TaskGroups and nest them. With Airflow 2.5 and above, we can also use the @task_group decorator to create a task group.

Let's look at some of the parameters used to configure a TaskGroup:

  • @task_group: This decorator (Airflow >= 2.5) is used to define the TaskGroup.

  • tooltip: By default, the docstring is shown as the tooltip in the UI. If a tooltip value is provided explicitly, it is used instead.

  • default_args: Overrides the DAG-level default_args for the tasks inside the group.

  • prefix_group_id: By default, child tasks and TaskGroups have their IDs prefixed with the group_id of their parent TaskGroup, which keeps every group_id and task_id unique throughout the DAG. Setting this Boolean flag to False turns the prefix off, in which case we must keep the IDs unique ourselves.
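The effect of prefix_group_id on the nested example later in this post can be shown with a small toy model. ToyGroup is a hypothetical class written for illustration, not Airflow's TaskGroup; it mimics only the prefixing rule described above, under the assumption that a nested group's own ID is also prefixed when its parent has prefixing enabled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyGroup:
    """Hypothetical stand-in for a TaskGroup that models only ID prefixing."""
    label: str
    parent: Optional["ToyGroup"] = None
    prefix_group_id: bool = True

    @property
    def group_id(self) -> str:
        # A nested group's own ID is prefixed only if its parent prefixes children.
        if self.parent is not None and self.parent.prefix_group_id:
            return self.parent.child_id(self.label)
        return self.label

    def child_id(self, label: str) -> str:
        # With prefixing on, a child's ID becomes "<group_id>.<label>".
        if self.prefix_group_id:
            return f"{self.group_id}.{label}"
        return label


if __name__ == "__main__":
    outer = ToyGroup("process_task_5")
    file_a = ToyGroup("poll_task_group_1", parent=outer)
    print(file_a.child_id("poll_file_a"))
    # process_task_5.poll_task_group_1.poll_file_a

    no_prefix = ToyGroup("poll_task_group_1", parent=outer, prefix_group_id=False)
    print(no_prefix.child_id("poll_file_a"))  # poll_file_a
```

With the default flag, a task nested two groups deep gets the fully qualified ID process_task_5.poll_task_group_1.poll_file_a. Turning the flag off for the inner group leaves the task ID as plain poll_file_a, so nothing stops it from clashing with another task of the same name.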

TaskGroup

Now, let's rewrite the same pipeline using TaskGroups.

Air-Flow-6

Clicking a TaskGroup expands it to show the tasks inside.

Air-Flow-7
Code
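from airflow.decorators import task
from airflow.utils.task_group import TaskGroup


@task.python
def poll_file_a():
    # Placeholder: poll the GCS path for file A
    print("polling for file A")


@task.python
def process_file_a():
    # Placeholder: call the parser to load file A
    print("processing file A")


@task.python
def poll_file_b():
    # Placeholder: poll the GCS path for file B
    print("polling for file B")


@task.python
def process_file_b():
    # Placeholder: call the parser to load file B
    print("processing file B")


@task.python
def backup():
    # Placeholder: trigger the database backup job
    print("triggering database backup")


@task.python
def notify_email():
    # Placeholder: send the email notification
    print("sending email notification")


@task.python
def notify_slack():
    # Placeholder: send the Slack alert
    print("sending Slack alert")


def process_task_5():
    # Call this inside a `with DAG(...)` block (or a @dag-decorated function):
    # tasks and TaskGroups attach themselves to the currently active DAG.
    with TaskGroup(group_id="process_task_5") as process_group:
        with TaskGroup(group_id="poll_task_group_1") as file_a_group:
            poll_file_a() >> process_file_a()
        with TaskGroup(group_id="poll_task_group_2") as file_b_group:
            poll_file_b() >> process_file_b()
        with TaskGroup(group_id="notify_task_group") as notify_group:
            backup() >> notify_email() >> notify_slack()

        # Backup and notifications start only after both files are processed
        file_a_group >> notify_group
        file_b_group >> notify_group
    # Return the outer TaskGroup so callers can wire dependencies to it
    return process_group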

As can be seen above, creating TaskGroups is much simpler. The code that builds them can also be modularized, placed in separate files, and called wherever it is needed.
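The last two lines inside process_task_5 set dependencies between whole TaskGroups rather than between individual tasks. In Airflow, such a dependency connects the leaves of the upstream group (tasks with no downstream task inside it) to the roots of the downstream group (tasks with no upstream task inside it). The sketch below models that rule in plain Python; ToyTaskGroup and link are hypothetical names for illustration, not Airflow APIs.

```python
from dataclasses import dataclass, field
from itertools import product


@dataclass
class ToyTaskGroup:
    """Hypothetical stand-in for a TaskGroup: task IDs plus internal edges."""
    tasks: list[str]
    edges: list[tuple[str, str]] = field(default_factory=list)  # (upstream, downstream)

    def roots(self) -> list[str]:
        # Tasks with no upstream task inside this group
        has_upstream = {down for _, down in self.edges}
        return [t for t in self.tasks if t not in has_upstream]

    def leaves(self) -> list[str]:
        # Tasks with no downstream task inside this group
        has_downstream = {up for up, _ in self.edges}
        return [t for t in self.tasks if t not in has_downstream]


def link(upstream: ToyTaskGroup, downstream: ToyTaskGroup) -> list[tuple[str, str]]:
    """Task-level edges implied by `upstream >> downstream` in this toy model."""
    return list(product(upstream.leaves(), downstream.roots()))


if __name__ == "__main__":
    file_a = ToyTaskGroup(["poll_file_a", "process_file_a"],
                          [("poll_file_a", "process_file_a")])
    file_b = ToyTaskGroup(["poll_file_b", "process_file_b"],
                          [("poll_file_b", "process_file_b")])
    notify = ToyTaskGroup(["backup", "notify_email", "notify_slack"],
                          [("backup", "notify_email"), ("notify_email", "notify_slack")])
    print(link(file_a, notify))  # [('process_file_a', 'backup')]
    print(link(file_b, notify))  # [('process_file_b', 'backup')]
```

Each group-level dependency therefore adds exactly one task-level edge per file (process_file_a and process_file_b each feeding backup), the same wiring we would otherwise write by hand in a flat DAG.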

Conclusion

TaskGroups clearly make tasks easier to develop, oversee, and upgrade. They enable modularization and cleaner code maintenance. They also let us nest subtasks and TaskGroups to any number of levels, manage our tasks visually, and monitor them effectively. Projects that use SubDAGs would benefit greatly from switching to TaskGroups.



By Rajesh Subramanian
May 31, 2023
