If you’ve just begun to use Airflow, the most important thing to understand is that a DAG script simply defines your DAGs; running it doesn’t execute any tasks. If you’ve used Jenkinsfiles before, the idea of a file that describes a flow of actions rather than performing them will be familiar.

The Airflow Python script is really just a configuration file specifying the DAG’s structure as code… The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.

(source)

If you stop generating a dag_id, it doesn’t exist anymore, as if you’d taken it out of a static config file.

(source)

You can use a for loop to create any number of DAGs in a single script. You could even read in a file of objects, use each object to generate a unique DAG ID, and create that DAG’s tasks by templating values from the file, as sketched below.
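A minimal sketch of that pattern (using Airflow 1.x import paths; the configs list and the task are made up for illustration, and in practice could be read from a JSON or YAML file):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

args = {"owner": "airflow", "start_date": datetime(2019, 1, 1)}

# Hypothetical stand-in for objects read from a file.
configs = [{"name": "alpha"}, {"name": "beta"}]

for config in configs:
    dag_id = "generated_dag_{}".format(config["name"])
    dag = DAG(dag_id=dag_id, default_args=args, schedule_interval=None)
    BashOperator(
        task_id="say_name",
        bash_command="echo {}".format(config["name"]),
        dag=dag)
    # The scheduler only discovers DAGs bound to module-level names,
    # so bind each generated DAG into globals().
    globals()[dag_id] = dag

Each iteration produces an independent DAG; stop emitting an ID from the loop and that DAG disappears, exactly as described above.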

Creating and accessing a DagRun conf

Example 1

…if a task returns a value (either from its Operator’s execute() method, or from a PythonOperator’s python_callable function), then an XCom containing that value is automatically pushed.

(source)

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def implicit_push(ds, **kwargs):
    """Returns a value so that Airflow pushes it to XCom.

    Args:
        ds: The execution date as a string.
        **kwargs: The task context; contains key "dag_run" holding the DagRun object.
    """
    # The return value is automatically pushed to XCom under the key "return_value"
    return {"data": "my value"}

dag = DAG(dag_id="DAG_1",
    default_args=args,
    schedule_interval=None)

# Implicitly push data into XCom by returning a value from the callable
set_data_task = PythonOperator(
    task_id="set_data_task",
    provide_context=True,
    python_callable=implicit_push,
    dag=dag)

# Read the value back out of XCom via Jinja templating
print_things_task = BashOperator(
    task_id="print_things_task",
    bash_command='echo {{ ti.xcom_pull(task_ids="set_data_task")["data"] }}',
    dag=dag)

set_data_task >> print_things_task
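If you’d rather read the value in Python than in a Bash template, a downstream PythonOperator can pull it from XCom explicitly. A sketch reusing the task IDs above (read_data is a made-up name; xcom_pull defaults to the "return_value" key):

def read_data(ds, **kwargs):
    # Pull the dict that implicit_push returned.
    value = kwargs["ti"].xcom_pull(task_ids="set_data_task")
    print(value["data"])

read_data_task = PythonOperator(
    task_id="read_data_task",
    provide_context=True,
    python_callable=read_data,
    dag=dag)

set_data_task >> read_data_task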

Example 2

You can also write the conf directly through the DagRun object that Airflow passes into the task context:

def implicit_push(ds, **kwargs):
    """Sets the conf on the current DagRun.

    Args:
        ds: The execution date as a string.
        **kwargs: The task context; contains key "dag_run" holding the DagRun object.
    """
    # Assign the conf directly on the in-memory DagRun object
    kwargs["dag_run"].conf = {"data": "value"}

Example 3

# DAG_1
from airflow.operators.dagrun_operator import TriggerDagRunOperator

def set_payload(context, dag_run_obj):
    # Whatever ends up in payload becomes the conf of the triggered DagRun
    dag_run_obj.payload = {"data": "import value"}
    return dag_run_obj

dag = DAG(dag_id="DAG_1",
    default_args=args,
    schedule_interval=None)

set_payload_task = TriggerDagRunOperator(
    task_id="set_payload_task",
    trigger_dag_id="DAG_2",
    python_callable=set_payload,
    dag=dag)
# DAG_2 with PythonOperator

def print_things(ds, **kwargs):
    # The payload set by DAG_1 arrives here as this run's conf
    print(kwargs["dag_run"].conf)

dag = DAG(dag_id="DAG_2",
    default_args=args,
    schedule_interval=None)

print_things_task = PythonOperator(
    task_id="print_things_task",
    provide_context=True,
    python_callable=print_things,
    dag=dag)
# DAG_2 with BashOperator

dag = DAG(dag_id="DAG_2",
    default_args=args,
    schedule_interval=None)

# Read the conf via Jinja templating; dag_run is available in the template context
echo_things_task = BashOperator(
    task_id="echo_things_task",
    bash_command='echo {{ dag_run.conf["data"] }}',
    dag=dag)
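DAG_2 doesn’t care where its conf came from, so you can exercise either version by hand. With the Airflow 1.x CLI, a conf can be passed as JSON when triggering:

airflow trigger_dag DAG_2 --conf '{"data": "value"}'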