Overview
We have an ETL requirement that needs the following tasks to be written in Airflow and executed at regular intervals.
A batch file (file A) comes up every day in a GCS path.
Poll for the file in that GCS path.
Call the parser to load the data into the system.
A second batch file (file B) comes up every day in the GCS path /pathB.
Poll for the file in that GCS path.
Call the parser to load the data into the system.
Trigger the backup job of the database.
Trigger an email notification.
Trigger a Slack alert.
We would start by writing each of these as an individual task and wiring up their dependencies as below.
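A minimal sketch of such a flat DAG (the dag_id, schedule, and placeholder task bodies below are illustrative, not the actual implementation) might look like this:

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False)
def etl_pipeline():
    # Placeholder tasks; a real pipeline would use a GCS sensor for polling,
    # the actual parser call, a database backup job, and email/Slack operators.
    @task
    def poll_file_a():
        print("poll GCS for file A")

    @task
    def parse_file_a():
        print("parse file A")

    @task
    def poll_file_b():
        print("poll GCS for file B")

    @task
    def parse_file_b():
        print("parse file B")

    @task
    def backup():
        print("trigger the database backup")

    @task
    def notify_email():
        print("send the email notification")

    @task
    def notify_slack():
        print("send the Slack alert")

    # Both file flows must finish before the backup and notifications run.
    backup_task = backup()
    poll_file_a() >> parse_file_a() >> backup_task
    poll_file_b() >> parse_file_b() >> backup_task
    backup_task >> notify_email() >> notify_slack()

etl_pipeline()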
Problem Statement
As the number of tasks grows, we run into the problems described below:
We lose the bigger picture of all the available tasks.
The code becomes cumbersome both to maintain and to enhance with new features.
We are unable to modularize the code.
We ultimately need a more effective way of nesting tasks within the DAG: a way to group our tasks and manage them more productively and efficiently.
Legacy Solution
SubDAGs
Let's take the above example and write SubDAGs. We would end up with three SubDAGs: one per file-processing flow and one for backup and notification. The SubDAGs would look as below.
To view the tasks under a SubDAG, we click on it and land on another page, the SubDAG view, where we can see the tasks that belong to it.
from airflow.models import DAG
from airflow.decorators import task

@task.python
def poll_file_a():
    print("polling for file A")

@task.python
def parse_file_a():
    print("parsing file A")

def blog_subdag(parent_dag_id, subdag_dag_id, default_args):
    # A SubDAG's dag_id must follow the "<parent_dag_id>.<subdag_dag_id>" convention.
    with DAG(f"{parent_dag_id}.{subdag_dag_id}", default_args=default_args) as dag:
        poll_file_a() >> parse_file_a()
    return dag
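For reference, the parent DAG would then attach this factory's output through a SubDagOperator. A minimal sketch, with an illustrative parent dag_id, schedule, and default_args:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.subdag import SubDagOperator

default_args = {"start_date": datetime(2023, 1, 1)}

with DAG(
    "blog_parent_dag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as parent_dag:
    # The task_id must match the suffix used when building the child dag_id.
    process_file_a = SubDagOperator(
        task_id="process_file_a",
        subdag=blog_subdag("blog_parent_dag", "process_file_a", default_args),
    )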
Drawbacks with SubDAGs
Below are the issues that arise out of SubDAGs:
When a SubDAG is triggered, the SubDAG and its child tasks occupy worker slots until the entire SubDAG completes. This can delay other task processing and, depending on your number of worker slots, can even lead to deadlocks. TaskGroups, on the other hand, are just a logical grouping of tasks and do not have this issue.
SubDAGs have their own parameters, schedule, and enabled settings. When these are not consistent with their parent DAG, unexpected behavior can occur.
The status of a SubDAG's tasks cannot be seen at the parent level; you must drill down into the SubDAG itself, since it exists as a separate DAG.
Coding takes longer, and the code is harder to maintain.
The introduction of TaskGroups was primarily motivated by the need to address these issues.
TaskGroups to the Rescue
With Airflow 2.0, SubDAGs were deprecated and TaskGroups were introduced. There is also a possibility that the SubDAG feature will be removed from Airflow 3.0 onwards.
TaskGroups help us visually group similar or dependent tasks together in the DAG view. We can also create multiple TaskGroups and nest them. With Airflow 2.5 and above, we can use the @task_group decorator to create a TaskGroup.
Let's look at some of the parameters used to configure a TaskGroup.
Parameter | Description
@task_group | Decorator (available from Airflow 2.5 onwards) used to define a TaskGroup from a Python function.
tooltip | By default, the docstring is shown as the tooltip in the UI. If explicitly provided, the tooltip value is used instead.
default_args | Overrides the default_args set at the DAG level for the tasks inside the group.
prefix_group_id | By default (True), child tasks and nested TaskGroups have their IDs prefixed with the group_id of their parent TaskGroup, which keeps every group_id and task_id unique throughout the DAG. Setting this Boolean flag to False leaves the child IDs unprefixed.
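As a quick illustration of these parameters, here is a small sketch (the group name, tooltip text, and retries value are made up for illustration; the decorator assumes Airflow 2.5 or above):

from airflow.decorators import task, task_group

@task.python
def extract():
    print("extract step")

@task.python
def load():
    print("load step")

@task_group(
    group_id="file_processing",
    tooltip="Polls for the file and parses it",  # shown when hovering over the group in the UI
    default_args={"retries": 2},                 # overrides the DAG-level default_args for these tasks
    prefix_group_id=True,                        # child task_ids become "file_processing.extract", etc.
)
def file_processing():
    extract() >> load()

Calling file_processing() inside a DAG (or a @dag-decorated function) materializes the group and its child tasks.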
TaskGroup
Now, let's rewrite the same set of code using TaskGroup.
Clicking on each TaskGroup in the UI expands it into a panel showing the tasks inside.
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

@task.python
def poll_file_a():
    print("polling for file A")

@task.python
def process_file_a():
    print("processing file A")

@task.python
def poll_file_b():
    print("polling for file B")

@task.python
def process_file_b():
    print("processing file B")

@task.python
def backup():
    print("backing up the database")

@task.python
def notify_email():
    print("sending email notification")

@task.python
def notify_slack():
    print("sending Slack alert")

def process_task_5():
    # Outer group wrapping the whole pipeline; the nested groups sit inside it.
    with TaskGroup(group_id='process_task_5') as process_task_1:
        with TaskGroup(group_id='poll_task_group_1') as process_task_2:
            poll_file_a() >> process_file_a()
        with TaskGroup(group_id='poll_task_group_2') as process_task_3:
            poll_file_b() >> process_file_b()
        with TaskGroup(group_id='notify_task_group') as process_task_4:
            backup() >> notify_email() >> notify_slack()
        # Both polling groups must finish before backup and notifications run.
        process_task_2 >> process_task_4
        process_task_3 >> process_task_4
    return process_task_1
Creating a TaskGroup is much simpler, as seen above. TaskGroups can be modularized, placed in separate files, and called wherever needed.
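For example, assuming the factory above lives in a separate module (the module path task_groups/process.py and the dag_id below are hypothetical), the DAG file only needs to import and call it:

from datetime import datetime
from airflow.decorators import dag

# Hypothetical module path; keep the factory wherever it fits your project layout.
from task_groups.process import process_task_5

@dag(schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False)
def etl_with_task_groups():
    # Calling the factory inside the DAG context attaches all groups and their tasks.
    process_task_5()

etl_with_task_groups()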
Conclusion
It is evident that TaskGroups make tasks easier to develop, oversee, and upgrade. They enable modularization and cleaner code maintenance. They also let us keep n levels of nested subtasks/TaskGroups, manage our tasks visually, and monitor them effectively. Projects that use SubDAGs would greatly benefit from switching to TaskGroups.