Airflow XCom

Looking for a way to choose one task or another? Do you want to execute a task based on a condition? Do you have multiple tasks, only one of which should run when a criterion is true? You’ve come to the right place! The BranchPythonOperator does exactly what you are looking for. It’s common to have DAGs with different execution flows where you want to follow only one of them according to a value or a condition. The BranchPythonOperator allows you to choose one branch among the branches of your DAG. To understand it better, it’s worth taking a look at the PythonOperator right here. Oh, and before moving forward, if you are looking for a great course to master Airflow, check out this one. Ready? Go!

Use Case

As usual, the best way to understand a feature/concept is to have a use case. Let’s say you have the following data pipeline:

Quick explanation about the DAG. The first three tasks train machine learning models. Once they all complete, the “Choosing Best ML” task gets triggered. Then, either the task “Is accurate” or “Is inaccurate” should be executed, according to the accuracy of the best ML model. Let’s say, if the accuracy is above 5.0 we trigger “Is accurate”; otherwise, “Is inaccurate” is run. How can you do this? Is there a mechanism to achieve it?

Yes there is! And you know it, otherwise you wouldn’t be here 😅

Give a warm welcome to the BranchPythonOperator!

The BranchPythonOperator

Ok, we are happy to meet the BranchPythonOperator, but what does it do?

The BranchPythonOperator allows you to follow a specific path according to a condition. That condition is evaluated in a Python callable function. Like the PythonOperator, the BranchPythonOperator executes a Python function, and that function returns the task id of the next task to execute. Look at the example below. Can you guess which task is executed next? “Is accurate” or “Is inaccurate”?

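Here is a minimal sketch of what that callable could look like; the task ids and the hard-coded accuracy are hypothetical, for illustration only:

def _choose_best_model(**context):
    # Hypothetical accuracy of the best model; a real DAG would compute
    # or pull this value from the upstream training tasks
    accuracy = 8.0
    if accuracy > 5:
        return 'is_accurate'    # task id of the "Is accurate" task
    return 'is_inaccurate'      # task id of the "Is inaccurate" task
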
“Is accurate”! Since the accuracy is greater than 5, the BranchPythonOperator returns the task id of “Is accurate”. Therefore, “Is accurate” gets triggered. Great! But wait a minute… what happens to the other task? Does it stay with no status at all?

No!

Here is exactly what you obtain by triggering the DAG:

In our case, “Choosing Best ML” and “Is accurate” have succeeded whereas “Is inaccurate” has been skipped. Consequently, downstream tasks that are not returned by the BranchPythonOperator get skipped! Also, tasks following skipped tasks are skipped as well. In the example, if you put a task after “Is inaccurate”, that task will be skipped.

In practice

All right, you know the BranchPythonOperator and you know how it works. What about a bit of code to implement it?

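Here is a minimal sketch of what branching.py could look like. The task ids follow the names used in this article, while the training callables and the hard-coded accuracy are placeholders:

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from datetime import datetime

def _training_model():
    # Placeholder: a real task would train a model and push its accuracy
    print('training a model...')

def _choose_best_result():
    # Placeholder accuracy; a real DAG would pull it from the training tasks
    accuracy = 8.0
    if accuracy > 5:
        return 'accurate'
    return 'inaccurate'

def _is_accurate():
    print('accurate')

def _is_inaccurate():
    print('inaccurate')

with DAG('branching', start_date=datetime(2021, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:

    # Three training tasks running in parallel
    training_models = [
        PythonOperator(
            task_id=f'training_model_{model}',
            python_callable=_training_model
        ) for model in ['a', 'b', 'c']
    ]

    # The branch task returning the task id to follow
    choose_best_result = BranchPythonOperator(
        task_id='choose_best_result',
        python_callable=_choose_best_result
    )

    accurate = PythonOperator(task_id='accurate', python_callable=_is_accurate)
    inaccurate = PythonOperator(task_id='inaccurate', python_callable=_is_inaccurate)

    training_models >> choose_best_result >> [accurate, inaccurate]
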
The code above gives you the same data pipeline as shown before. To run it, install Docker on your computer. Then, go to my beautiful repository to get the docker-compose file that will help you run Airflow on your computer. Airflow 2.0, not 1.10.14 😎 Clone the repo and go into it. Create a file branching.py in the folder airflow-data/dags. Copy the code into that file and execute the command docker-compose up -d in the folder docker-airflow.

Pay attention to the arguments of the BranchPythonOperator. It expects a task_id and a python_callable function. If you take a look at the Python function _choose_best_result(), you can see the condition returning the task id, either “accurate” or “inaccurate”. Here, the function returns “accurate”; therefore, the next task to trigger is “accurate”.

Behind the scenes of the BranchPythonOperator

The BranchPythonOperator inherits from the PythonOperator. As a result, the parameters of the PythonOperator are accessible in the BranchPythonOperator. You can access the context of the task instance to pull XComs, and you can give additional arguments through op_kwargs and op_args. If you don’t know what I’m talking about, check this article.

For instance, let’s say you want to fetch the accuracy by pulling an XCom. You can do that:

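Here is a sketch, assuming an upstream task with the id 'choosing_best_ml' pushed the accuracy under the key 'model_accuracy' (both names are hypothetical):

def _choose_best_result(ti):
    # Pull the accuracy pushed by a hypothetical upstream task
    accuracy = ti.xcom_pull(key='model_accuracy', task_ids='choosing_best_ml')
    if accuracy > 5:
        return 'accurate'
    return 'inaccurate'
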
ti is the task instance object required for accessing your XComs. Notice that in Airflow 2.0, you don’t have to use the parameter provide_context anymore. The BranchPythonOperator stays the same:

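A sketch, reusing the hypothetical names from above:

choose_best_result = BranchPythonOperator(
    task_id='choose_best_result',
    python_callable=_choose_best_result
)
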
Last but not least, each time you trigger the BranchPythonOperator, 2 XComs are created:

The first XCom indicates which task to follow next. The second XCom holds the value returned by the Python callable (by default, any returned value creates an XCom). Be careful! XComs are not automatically removed. Therefore, the more BranchPythonOperators you trigger, the more XComs get stored. You can reduce that number to one by setting the parameter do_xcom_push=False. Again, if you don’t know what this parameter is, go check out my other post.

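For instance, on the same hypothetical operator as above:

choose_best_result = BranchPythonOperator(
    task_id='choose_best_result',
    python_callable=_choose_best_result,
    do_xcom_push=False  # skip pushing the callable's return value as an extra XCom
)
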
In any case, don’t forget to remove your XComs from time to time, as they are NOT automatically removed. If you have hundreds of DAGs, your database won’t be happy.

The BranchPythonOperator and the Skipped status

When you trigger the BranchPythonOperator, one task is triggered next and the others are skipped. There is one little catch that you have to be aware of. Let’s say you have the following DAG.

Can you guess, what will be the status of the task “storing”?

The task will be skipped. YES, skipped!

Why? Because, by default, “storing” expects all of its parents to succeed before getting triggered. Since one of its parents, “Is inaccurate”, has been skipped, the task gets skipped as well. The skipped status is propagated to the downstream tasks. How can we solve this?

By changing the trigger rule of the “storing” task. I won’t go into the details of trigger rules, but in short, they allow you to modify the conditions under which your tasks get triggered. By default, the rule is “all_success”, meaning a task is triggered only when all of its upstream tasks have succeeded. In this specific case, we have to change that.

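Here is a sketch of what the “storing” task could look like. The “none_failed_or_skipped” rule is one Airflow 2.0 rule with these semantics, and the DummyOperator is a placeholder for whatever the task actually does:

from airflow.operators.dummy import DummyOperator

storing = DummyOperator(
    task_id='storing',
    trigger_rule='none_failed_or_skipped'  # run if no parent failed and at least one succeeded
)
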
Notice the trigger rule of “storing”. It means the task gets triggered as long as no parent failed and at least one parent succeeded.

How to execute multiple tasks with the BranchPythonOperator

So far, we’ve seen how to execute one task after the BranchPythonOperator. What if we want to execute many?

Simple!

You just have to return multiple task ids!

Here is an example:

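A minimal sketch, with the accuracy hard-coded for illustration:

def _choose_best_mode(ti):
    # Hypothetical accuracy; a real DAG would pull it from XComs
    accuracy = 8.0
    if accuracy > 7:
        # Returning a list of task ids triggers all of them
        return ['super_accurate', 'accurate']
    return 'inaccurate'
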
As you can see from the code, in _choose_best_mode we return the two task ids “super_accurate” and “accurate” if the accuracy is greater than 7. That gives us:

The BranchPythonOperator in action!

Conclusion

That’s it about the BranchPythonOperator. I hope you really enjoyed what you’ve learned. The BranchPythonOperator is super useful. Use it to make complex data pipelines, and be careful with the little catches we saw in this article. If you want to learn more about Airflow, go check my course The Complete Hands-On Introduction to Apache Airflow right here. Or if you already know Airflow and want to go much further, enrol in my 12-hour course here.

Have a great day! 🙂

Airflow XCom

Introduction

Airflow XCom is used for inter-task communication. It sounds a bit complex, but it is really very simple: its implementation inside Airflow is straightforward, it is easy to use, and needless to say, it has numerous use cases.

Inter-task communication is achieved by passing key-value pairs between tasks. Tasks can run on any Airflow worker and need not run on the same one. To pass information, a task pushes a key-value pair; that pair is then pulled by another task and used.

This blog entry requires some knowledge of Airflow. If you are just starting out, I would suggest you first get familiar with Airflow. You can try this link.

This blog entry is divided into:

  • Pushing values to XCOM
  • Viewing XCOM values in Airflow UI
  • Pulling XCOM values

Pushing values to XCOM

Before we dive headlong into XCOM, let’s see where to find them. XCOMs can be accessed via the Airflow Web UI -> Admin -> XComs.

Let’s look at a DAG below. Its sole purpose in life is to push a key-value pair using XCOM. It will also help us discover some of the mechanics behind the concept.

# Filename: hello_xcom.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Step 1 - Define a method for pushing a key-value pair to XCOM
def f_send_value_via_xcom(**context):
    print('Hello xcom push')
    context['ti'].xcom_push(key='my_key', value='my_value')

# Step 2 - Default arguments for the airflow DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 25),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Step 3 - Create an airflow DAG
dag = DAG('hello_xcom',
          schedule_interval='0 0 * * *',
          default_args=default_args
          )

# Step 4 - Create a Linux command string
linux_command = 'echo xcom'

# Step 5 - Create a task for running the Linux command
t1 = BashOperator(
    task_id='Hello',
    bash_command=linux_command,
    dag=dag)

# Step 6 - Create a task for pushing the key-value pair to XCOM, using the method from step 1
t2 = PythonOperator(
    task_id='task_xcom_push',
    python_callable=f_send_value_via_xcom,
    provide_context=True,
    dag=dag
)

# Step 7 - Task execution order
t1 >> t2

Let’s analyse the two most important steps.

  • Step 1 – Defines a Python method which pushes a key-value pair to XCOM. Its second line uses the context to grab the task instance and calls its xcom_push method, pushing my_key and my_value to XCOM.
  • Step 6 – Creates a PythonOperator task and sets provide_context to True, which provides the method from step 1 with the necessary context.

Viewing XCOM values in Airflow UI

Now let’s run the DAG. Once done, go to Admin -> XComs and you should see the pushed key-value pair listed.

Let’s look at where Airflow stores all the data shown above. It is stored in the Airflow metastore. Imaginatively, this table is called xcom.

Pulling XCOM values

Now let’s take a step further and add a task for pulling XCOM values. This is achieved by using the xcom_pull method. The airflow DAG below has a task which pushes a key-value pair via XCOM and then another task which pulls that pair. See the code below.

# Filename: hello_xcom2.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Step 1 - Define a method for pushing a key-value pair to XCOM
def f_send_value_via_xcom(**context):
    print('Hello xcom push')
    context['ti'].xcom_push(key='my_key', value='my_value')

# Step 2 - Define a method for pulling a key-value pair from XCOM
def f_get_value_via_xcom(**context):
    print('Hello xcom pull')
    my_v = context['ti'].xcom_pull(dag_id='hello_xcom2', key='my_key')
    print('The value from the previous task is ', my_v)

# Step 3 - Default arguments for the airflow DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 25),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Step 4 - Create an airflow DAG
dag = DAG('hello_xcom2',
          schedule_interval='0 0 * * *',
          default_args=default_args
          )

# Step 5 - Create a Linux command string
linux_command = 'echo xcom'

# Step 6 - Create a task for running the Linux command
t1 = BashOperator(
    task_id='Hello',
    bash_command=linux_command,
    dag=dag)

# Step 7 - Create a task for pushing the key-value pair to XCOM, using the method from step 1
t2 = PythonOperator(
    task_id='task_xcom_push',
    python_callable=f_send_value_via_xcom,
    provide_context=True,
    dag=dag
)

# Step 8 - Create a task for pulling the key-value pair from XCOM, using the method from step 2
t3 = PythonOperator(
    task_id='task_xcom_pull',
    python_callable=f_get_value_via_xcom,
    provide_context=True,
    dag=dag
)

# Step 9 - Task execution order
t1 >> t2 >> t3

The above DAG only adds an additional step to the one before. Let’s analyse the code above:

  • Step 2 – A new method, f_get_value_via_xcom, is defined which uses xcom_pull to get values from XCOM.
  • Step 8 – Defines a new task, t3, which uses a PythonOperator. It calls the method defined in step 2.

When the DAG is executed, the pushed key-value pair is visible in Admin -> XComs.

The DAG execution log shows the value pulled via XCOM.

Make sure you have a look at the xcom_pull method in detail. It has some interesting parameters, and they are very useful. Needless to say, if you feel you want to dig more, I suggest you head over to the link below.

It is the link to the source code of the XCOM methods, and it also exposes some really nice methods which can come in handy for some use cases.

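For reference, here is a quick sketch of those parameters, reusing the names from the DAG above:

my_v = context['ti'].xcom_pull(
    task_ids='task_xcom_push',    # restrict the search to one or more upstream tasks
    dag_id='hello_xcom2',         # defaults to the current DAG
    key='my_key',                 # defaults to 'return_value'
    include_prior_dates=False     # set to True to also search earlier execution dates
)
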
This brings us to the end of Airflow XCOM. Hope this blog gets you started on XCOMs and exploring them. If you like the blog entry, do leave a comment and tell us what we can write about. Till next time… byeee!!!