Apache Airflow — Dynamic workflow creation using templates
Apache Airflow is designed for complex workflows that are well defined upfront. Creating dynamic workflows in Airflow is a different kind of challenge, and there are only a few approaches to get it done.
Problem statement
Let’s say we want to create a new DAG dynamically, where the content of the DAG, such as what kind of tasks to perform (Operators), the schedule of the workflow etc., depends on a trigger event.
Different approaches
There are a few ways to get a dynamic workflow. A couple of options:
- using globals()
- using Variables
- generating python files using templates — this is the one we are going to discuss
Using globals()
from airflow import DAG

# Register each generated DAG in the module's global scope so the scheduler picks it up
for i in range(10):
    dag_id = f'dynamic_dag_{i}'
    globals()[dag_id] = DAG(dag_id)
Disadvantages:
- It is not always possible to operate on the global scope. For example, if the dynamic part of the DAG is based on external triggers such as REST API conf parameters, those values are not available when the scheduler parses the file.
Variables
This approach is similar to the one above, except that it reads a Variable, performs logic to convert it into a DAG and sets it in globals(). I am not fully sure how the create, read, update & delete lifecycle of a DAG would be well supported with this.
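The Variable-driven version might look like the following minimal sketch. The Variable name dynamic_dag_config and its JSON shape are assumptions for illustration, not from the original article.

```python
from airflow import DAG
from airflow.models import Variable

# Hypothetical Variable holding e.g.
# [{"dag_id": "dynamic_dag_a", "schedule_interval": "@daily"}, ...]
config = Variable.get("dynamic_dag_config", default_var=[], deserialize_json=True)

for entry in config:
    dag_id = entry["dag_id"]
    globals()[dag_id] = DAG(dag_id, schedule_interval=entry.get("schedule_interval"))
```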
Generating python files using templated workflows
This is closer to Airflow’s own way of processing DAGs. By design, Airflow keeps scanning the DAG_FOLDER recursively and processes the files it finds efficiently.
How it works
- Let’s say we want to create a DAG based on input parameters.
- The REST API receives the configuration and triggers the dag-creator workflow.
Example
curl --location --request POST 'http://localhost:8080/api/experimental/dags/dag-creator/dag_runs' \
--header 'Content-Type: application/json' \
--data-raw '{
    "conf": {
        "dag_id": "your-dag-id-1",
        "schedule_interval": "@daily",
        "namespace": "sub-directory-name-1",
        "template": "hello_world.template"
    }
}'
DAG that creates DAG
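The original article embeds this as a code gist; here is a minimal sketch of what the dag-creator DAG could look like, assuming it is triggered only through the REST API and delegates the file generation to the RestToTemplateWrapperOperator sketched below. The paths and the plugins import are hypothetical.

```python
from datetime import datetime

from airflow import DAG

# Hypothetical import; RestToTemplateWrapperOperator is sketched further below.
from plugins.rest_to_template import RestToTemplateWrapperOperator

with DAG(
    dag_id="dag-creator",
    schedule_interval=None,  # runs only when triggered externally
    start_date=datetime(2020, 1, 1),
    catchup=False,
) as dag:
    create_dag = RestToTemplateWrapperOperator(
        task_id="create_dag",
        template_dir="/opt/airflow/templates",   # hypothetical template location
        output_dir="/opt/airflow/dags/dynamic",  # inside DAG_FOLDER
    )
```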
Operator that transforms trigger data
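A minimal sketch of the transformation step, assuming the trigger payload arrives via dag_run.conf with the keys from the curl example above. The function name transform_trigger_data is illustrative; the original gist is not reproduced here.

```python
def transform_trigger_data(**context):
    """Read the REST trigger payload and normalize it for the generator step."""
    conf = context["dag_run"].conf or {}
    return {
        "dag_id": conf["dag_id"],
        "schedule_interval": conf.get("schedule_interval", "@daily"),
        "namespace": conf.get("namespace", "default"),
        "template": conf["template"],
    }
```

Wired in as a PythonOperator, the returned dict is pushed to XCom, from where the generator step can pick it up.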
Operator that generates DAG based on Template
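A minimal sketch of the generation step. The operator name RestToTemplateWrapperOperator comes from this article, but the implementation below is an assumption: it renders the template with Jinja2 (already an Airflow dependency) and writes the result under the DAG folder.

```python
import os

from airflow.models import BaseOperator
from jinja2 import Environment, FileSystemLoader


class RestToTemplateWrapperOperator(BaseOperator):
    """Render a DAG template with the trigger conf and write it to DAG_FOLDER."""

    def __init__(self, template_dir, output_dir, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.template_dir = template_dir
        self.output_dir = output_dir

    def execute(self, context):
        conf = context["dag_run"].conf or {}
        env = Environment(loader=FileSystemLoader(self.template_dir))
        rendered = env.get_template(conf["template"]).render(**conf)
        # Write under a namespace sub-directory to separate dynamic DAGs.
        target_dir = os.path.join(self.output_dir, conf["namespace"])
        os.makedirs(target_dir, exist_ok=True)
        target_file = os.path.join(target_dir, conf["dag_id"] + ".py")
        with open(target_file, "w") as out:
            out.write(rendered)
```

Writing into a namespace sub-directory keeps generated files clearly separated from manually created DAGs, which the best practices below also recommend.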
An example templated DAG
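A minimal sketch of what hello_world.template could contain; the {{ ... }} placeholders are filled in by the generator, and everything else is a regular DAG file.

```python
# hello_world.template -- placeholders are rendered by the dag-creator workflow
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def say_hello():
    print("Hello world from {{ dag_id }}")


with DAG(
    dag_id="{{ dag_id }}",
    schedule_interval="{{ schedule_interval }}",
    start_date=datetime(2020, 1, 1),
    catchup=False,
) as dag:
    hello = PythonOperator(task_id="say_hello", python_callable=say_hello)
```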
Best practices
- As a good practice, keep the template file as minimal as possible.
- Ensure the dag_id remains unique and follows naming conventions based on usage. This helps in searching and managing the DAGs.
- Place the destination python file in a well-defined directory namespace. This helps distinguish manually created DAGs from dynamic ones (see the layout sketch after this list).
- Extract common utility methods into normal python modules, so the template itself stays small.
- As the number of dynamic DAGs can explode, too much repeated content could derail the objective: any change becomes costly when it has to be made across many generated files.
- Start the template file as a python file. This way syntax highlighting and other editor features keep you productive. Once development is done, rename it to a template. If you are using Visual Studio Code, you can start with the template file itself and set the editor language to Python (bottom-right status bar) so the file gets full syntax support.
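To make the namespace convention concrete, a hypothetical DAG_FOLDER layout could look like this (directory and file names are illustrative):

```
dags/
├── manually_created/
│   └── dag_creator.py
└── dynamic/
    ├── sub-directory-name-1/
    │   └── your-dag-id-1.py
    └── sub-directory-name-2/
        └── your-dag-id-2.py
```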
Testability
We could start with a fixed workflow and test different scenarios. Once happy, introduce template variables and orchestrate it from dag_creator. This way we first have a concrete workflow and only then templatize it.
Validation
In certain cases, we may need to validate the trigger data. If so, the dag_creator workflow could have a custom validation step before calling the RestToTemplateWrapperOperator.
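A minimal sketch of such a validation step, reusing the conf keys from the curl example; the task wiring assumes the dag-creator sketch above.

```python
from airflow.operators.python_operator import PythonOperator

REQUIRED_KEYS = ("dag_id", "schedule_interval", "namespace", "template")


def validate_conf(**context):
    """Fail the run early if the trigger payload is incomplete."""
    conf = context["dag_run"].conf or {}
    missing = [key for key in REQUIRED_KEYS if key not in conf]
    if missing:
        raise ValueError(f"Trigger conf is missing required keys: {missing}")


validate = PythonOperator(
    task_id="validate_conf",
    python_callable=validate_conf,
    provide_context=True,  # needed on Airflow 1.10.x
    dag=dag,
)
validate >> create_dag  # run validation before the generator operator
```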
All normal DAG features are available to the dynamic DAGs
Since all we are doing is creating a python file, every feature available to a normal DAG is available to the dynamic one. We can use all the operators, hooks etc., just like in a normal DAG.
Follow-up reads
- https://stackoverflow.com/questions/45945783/how-do-i-use-the-conf-option-in-airflow
- This article explains each component in Airflow in detail
- To understand Scheduler — https://cwiki.apache.org/confluence/display/AIRFLOW/Scheduler+Basics
- https://github.com/jghoman/awesome-apache-airflow
- https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py#L62
- https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls