Python Airflow

Working with data involves a ton of prerequisites: getting up and running with the required data, its formatting, and its storage. The first step of a data science process is data engineering, which plays a crucial role in streamlining every other part of a data science project.

Traditionally, data engineering processes involve three steps: Extract, Transform and Load, also known as the ETL process. The ETL process involves a series of actions and manipulations on the data to make it fit for analysis and modeling. Most data science projects require these ETL processes to run almost every day, for example to generate daily reports.

Ideally, these processes should be executed automatically in a definite time and order. But it isn’t as easy as it sounds. You might have tried using a time-based scheduler such as Cron by defining the workflows in Crontab. This works fairly well for simple workflows. However, when the number of workflows and their dependencies increases, things start getting complicated. It gets difficult to effectively manage and monitor these workflows, considering they may fail and need to be recovered manually. Apache Airflow is a tool that can be very helpful in that case, whether you are a Data Scientist, Data Engineer, or even a Software Engineer.

What is Apache Airflow?

“Apache Airflow is an open-source workflow management platform. It started at Airbnb in October 2014 as a solution to manage the company’s increasingly complex workflows.”

Apache Airflow (or simply Airflow) is a highly versatile tool that can be used across multiple domains for managing and scheduling workflows. It allows you to perform and automate simple to complex processes written in Python and SQL. Airflow lets you view and create workflows in the form of Directed Acyclic Graphs (DAGs) with the help of command-line tools as well as a GUI.

Apache Airflow is a revolutionary open-source tool for people working with data and its pipelines. It is easy to use and deploy for anyone with a basic knowledge of Python. Airflow provides the flexibility to create workflows as Python scripts, along with various ready-to-use operators for easy integration with platforms such as Amazon AWS, Google Cloud Platform, and Microsoft Azure. Moreover, it ensures that tasks are ordered correctly based on their dependencies with the help of DAGs, and it continuously tracks the state of the tasks being executed.

One of the most crucial features of Airflow is its ability to recover from failure and to manage the allocation of scarce resources dynamically. This makes Airflow a great choice for running any kind of data processing or modeling task in a fairly scalable and maintainable way.

In this tutorial, we will look in detail at how to install Airflow, how to create a pipeline, and why data scientists should be using it.

Understanding the workflow and DAG

A set of processes that runs at regular intervals is termed a ‘workflow’. It can consist of any tasks, ranging from extracting data to manipulating it.

Directed Acyclic Graphs (DAGs) are one of the key components of Airflow. They represent the series of tasks that need to be run as part of the workflow. Each task is represented as a single node in the graph, along with the path it takes for execution, as shown in the figure.


Figure: an example DAG (image from source)

Airflow also lets you specify the order and relationship (if any) between two or more tasks, and enables you to add dependencies on required data values for the execution of a task. A DAG file is a Python script and is saved with a .py extension, as in the sketch below.
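
To make this concrete, here is a minimal sketch of what such a DAG file might look like. It assumes Airflow 2.x import paths, and the dag_id, task names and callables are made up for illustration:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def extract():
        # placeholder extract step
        print("extracting data")


    def transform():
        # placeholder transform step
        print("transforming data")


    with DAG(
        dag_id="example_etl",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        extract_task = PythonOperator(task_id="extract", python_callable=extract)
        transform_task = PythonOperator(task_id="transform", python_callable=transform)

        # the >> operator defines the order: extract runs before transform
        extract_task >> transform_task

Saving a script like this in your DAGs folder (by default the dags/ directory inside the Airflow home directory) is enough for Airflow to pick it up.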

Basic Components of Apache Airflow

Now that you have a basic idea about workflows and DAGs, we’ve listed below some of the commonly used components of Apache Airflow that make up the architecture of its pipeline.

Web Server: It is the graphical user interface built as a Flask app. It provides details regarding the status of your tasks and gives the ability to read logs from a remote file store.

Scheduler: This component is primarily responsible for scheduling tasks, i.e., the execution of DAGs. It retrieves and updates the status of the task in the database.

Executor: It is the mechanism that determines how and where the individual tasks actually get executed.

Metadata Database: It is the centralized database where Airflow stores the status of all the tasks. All read/write operations of a workflow are done against this database.

Now that you understand the basic architecture of Airflow, let us begin by installing Python and Apache Airflow on our system.

Installing Python and Apache Airflow

Airflow itself is written in Python and its workflows are defined as Python code, although the tasks it orchestrates can run programs written in other languages. For this tutorial, we will be using Python.

Installing Python is pretty simple and quick. You can download an executable installer from Python’s official website and run it on your system. Make sure you download the latest stable build of the Python interpreter for your operating system (Windows, Linux, Mac, or any other): https://www.python.org/downloads.

Once you’ve installed Python successfully, you can proceed to install Airflow on your system. Airflow has both a command-line interface and a great web user interface for monitoring and managing workflows efficiently.

To install Airflow, start by opening up the command prompt/terminal on your operating system. Airflow requires a home directory where it keeps its configuration, logs, and metadata, so you will have to specify that location first.
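
On Linux or macOS this is typically done by setting the AIRFLOW_HOME environment variable before installing; the ~/airflow path below is just the common default, so use whichever directory suits you:

    export AIRFLOW_HOME=~/airflow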

Now that you’ve specified the location, you can go ahead and run the pip command to install Apache Airflow.
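
A typical installation looks like the following (ideally inside a virtual environment; extras such as apache-airflow[postgres] can be added later if you need them):

    pip install apache-airflow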

This will install Apache Airflow into your Python environment, and you are now ready to start creating your first Airflow pipeline.

Create your first Airflow pipeline

The Apache Airflow pipeline is basically an easy and scalable way for data engineers to create, monitor and schedule one or multiple workflows simultaneously. Airflow requires a database backend for running workflows, which is why we will start by initializing the metadata database.
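
The exact command depends on the Airflow version you installed; older 1.x releases use initdb, while 2.x uses the db subcommand:

    airflow initdb     # Airflow 1.x
    airflow db init    # Airflow 2.x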

Upon initializing the database, you can start the web server.
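
Assuming the airflow executable is on your PATH, the webserver subcommand starts the UI; the -p flag sets the port:

    airflow webserver -p 8080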

This will start an Airflow webserver at port 8080 on your localhost.
Note: You can specify any port as per your preference.

Now, open up another terminal and start the Airflow scheduler.
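
The scheduler has its own subcommand and keeps running in the foreground, picking up DAGs and triggering task runs:

    airflow scheduler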

If you have successfully started the Airflow scheduler, you can now access the tool from your browser at localhost:8080/admin to monitor and manage the status of completed and ongoing tasks in your workflows.

In Conclusion

With all these advanced features and benefits, Apache Airflow is certainly a great tool with limitless potential when it comes to building automated systems. Moreover, being open source, it has garnered a lot of active users in a short span of time. Have you tried using this tool? If yes, what is your experience with it? Do share with us in the comments below.

Module Contents

class airflow.operators.python.PythonOperator(*, python_callable:Callable, op_args:Optional[List]=None, op_kwargs:Optional[Dict]=None, templates_dict:Optional[Dict]=None, templates_exts:Optional[List[str]]=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes a Python callable

See also

For more information on how to use this operator, take a look at the guide: PythonOperator

Parameters
  • python_callable (python callable) – A reference to an object that is callable

  • op_kwargs (dict (templated)) – a dictionary of keyword arguments that will get unpacked in your function

  • op_args (list (templated)) – a list of positional arguments that will get unpacked when calling your callable

  • templates_dict (dict[str]) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between __init__ and execute takes place, and are made available in your callable’s context after the template has been applied. (templated)

  • templates_exts (list[str]) – a list of file extensions to resolve while processing templated fields, for example ['.sql', '.hql']

template_fields = ['templates_dict', 'op_args', 'op_kwargs'][source]
ui_color = #ffefeb[source]
shallow_copy_attrs = ['python_callable', 'op_kwargs'][source]
static determine_op_kwargs(python_callable:Callable, context:Dict, num_op_args:int=0)[source]

Function that will inspect the signature of a python_callable to determine which values need to be passed to the function.

Parameters
  • python_callable – The function that you want to invoke

  • context – The context provided by the execute method of the Operator/Sensor

  • num_op_args – The number of op_args provided, so we know how many to skip

Returns

The op_args dictionary which contains the values that are compatible with the Callable

execute(self, context:Dict)[source]
execute_callable(self)[source]

Calls the python callable with the given arguments.

Returns

the return value of the call.

Return type

any
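
As a quick, hedged illustration of the parameters above (the DAG id, task id and callable below are made up, and Airflow 2.x import paths are assumed), op_args fills the callable’s positional arguments, while context variables such as ds are injected when the callable’s signature asks for them:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def greet(name, ds=None):
        # "name" comes from op_args; "ds" is filled in from the task context
        print(f"Hello {name}, the run date is {ds}")


    with DAG(dag_id="python_operator_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
        greet_task = PythonOperator(
            task_id="greet",
            python_callable=greet,
            op_args=["Airflow"],
        )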

class airflow.operators.python._PythonFunctionalOperator(*, python_callable:Callable, task_id:str, op_args:Tuple[Any], op_kwargs:Dict[str, Any], multiple_outputs:bool=False, **kwargs)[source]

Bases: airflow.models.BaseOperator

Wraps a Python callable and captures args/kwargs when called for execution.

Parameters
  • python_callable (python callable) – A reference to an object that is callable

  • op_kwargs (dict) – a dictionary of keyword arguments that will get unpacked in your function (templated)

  • op_args (list) – a list of positional arguments that will get unpacked when calling your callable (templated)

  • multiple_outputs (bool) – if set, function return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with keys as keys. Defaults to False.

template_fields = ['op_args', 'op_kwargs'][source]
ui_color[source]
shallow_copy_attrs = ['python_callable'][source]
static _get_unique_task_id(task_id:str, dag:Optional[DAG]=None)[source]

Generate a unique task id given a DAG (or if run in a DAG context). Ids are generated by appending a unique number to the end of the original task id.

Example:

task_id
task_id__1
task_id__2
…
task_id__20

static validate_python_callable(python_callable)[source]

Validate that the python callable can be wrapped by the operator. Raises an exception if invalid.

Parameters

python_callable – Python object to be validated

Raises

TypeError, AirflowException

execute(self, context:Dict)[source]

airflow.operators.python.T[source]
airflow.operators.python.task(python_callable:Optional[Callable]=None, multiple_outputs:bool=False, **kwargs) → Callable[[T], T][source]

Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
Parameters
  • python_callable (Optional[Callable]) – Function to decorate

  • multiple_outputs (bool) – if set, function return value will be unrolled to multiple XCom values. List/Tuples will unroll to XCom values with index as key. Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
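
A small, illustrative sketch of the decorator (the DAG id, function name and input string are made up; in newer Airflow releases the decorator is also exposed as airflow.decorators.task):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import task


    @task(multiple_outputs=True)
    def split_name(full_name: str):
        first, last = full_name.split(" ", 1)
        # with multiple_outputs=True, each dict key becomes its own XCom entry
        return {"first": first, "last": last}


    with DAG(dag_id="task_decorator_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
        # calling the decorated function inside the DAG context creates the task
        split_name("Ada Lovelace")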

class airflow.operators.python.BranchPythonOperator[source]

Bases: airflow.operators.python.PythonOperator, airflow.models.skipmixin.SkipMixin

Allows a workflow to “branch” or follow a path following the execution of this task.

It derives the PythonOperator and expects a Python function that returns a single task_id or list of task_ids to follow. The task_id(s) returned should point to a task directly downstream from {self}. All other “branches” or directly downstream tasks are marked with a state of skipped so that these paths can’t move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.

execute(self, context:Dict)[source]
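
A minimal sketch of how this is typically wired up (the dag object, task ids and weekday logic are illustrative only; Airflow 2.x import paths assumed):

    from airflow.operators.dummy import DummyOperator
    from airflow.operators.python import BranchPythonOperator


    def choose_branch(execution_date=None):
        # return the task_id of the branch to run; the other branch is skipped
        return "weekday_report" if execution_date.weekday() < 5 else "weekend_report"


    # `dag` is assumed to be an existing DAG object defined elsewhere
    branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch, dag=dag)
    weekday_report = DummyOperator(task_id="weekday_report", dag=dag)
    weekend_report = DummyOperator(task_id="weekend_report", dag=dag)
    branch >> [weekday_report, weekend_report]
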
class airflow.operators.python.ShortCircuitOperator[source]

Bases: airflow.operators.python.PythonOperator, airflow.models.skipmixin.SkipMixin

Allows a workflow to continue only if a condition is met. Otherwise, the workflow “short-circuits” and downstream tasks are skipped.

The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with a state of “skipped”. If the condition is True, downstream tasks proceed as normal.

The condition is determined by the result of python_callable.

execute(self, context:Dict)[source]
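
A small illustrative sketch (the dag object, file path and downstream task are hypothetical):

    import os

    from airflow.operators.python import ShortCircuitOperator


    def input_file_exists():
        # a falsy return value short-circuits the workflow and skips downstream tasks
        return os.path.exists("/tmp/input.csv")


    # `dag` is assumed to be an existing DAG object defined elsewhere
    check_input = ShortCircuitOperator(task_id="check_input", python_callable=input_file_exists, dag=dag)
    # check_input >> load_task  # hypothetical downstream task; runs only if the condition is truthy
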
class airflow.operators.python.PythonVirtualenvOperator(*, python_callable:Callable, requirements:Optional[Iterable[str]]=None, python_version:Optional[Union[str, int, float]]=None, use_dill:bool=False, system_site_packages:bool=True, op_args:Optional[List]=None, op_kwargs:Optional[Dict]=None, string_args:Optional[Iterable[str]]=None, templates_dict:Optional[Dict]=None, templates_exts:Optional[List[str]]=None, **kwargs)[source]

Bases: airflow.operators.python.PythonOperator

Allows one to run a function in a virtualenv that is created and destroyed automatically (with certain caveats).

The function must be defined using def, and not be part of a class. All imports must happen inside the function and no variables outside of the scope may be referenced. A global scope variable named virtualenv_string_args will be available (populated by string_args). In addition, one can pass stuff through op_args and op_kwargs, and one can use a return value. Note that if your virtualenv runs in a different Python major version than Airflow, you cannot use return values, op_args, or op_kwargs. You can use string_args though.

See also

For more information on how to use this operator, take a look at the guide: PythonVirtualenvOperator

Parameters
  • python_callable (function) – A python function with no references to outside variables, defined with def, which will be run in a virtualenv

  • requirements (list[str]) – A list of requirements as specified in a pip install command

  • python_version (Optional[Union[str, int, float]]) – The Python version to run the virtualenv with. Note that both 2 and 2.7 are acceptable forms.

  • use_dill (bool) – Whether to use dill to serialize the args and result (pickle is default). This allows more complex types but requires you to include dill in your requirements.

  • system_site_packages (bool) – Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information.

  • op_args – A list of positional arguments to pass to python_callable.

  • op_kwargs (dict) – A dict of keyword arguments to pass to python_callable.

  • string_args (list[str]) – Strings that are present in the global var virtualenv_string_args, available to python_callable at runtime as a list[str]. Note that args are split by newline.

  • templates_dict (dict of str) – a dictionary where the values are templates that will get templated by the Airflow engine sometime between __init__ and execute takes place, and are made available in your callable’s context after the template has been applied

  • templates_exts (list[str]) – a list of file extensions to resolve while processing templated fields, for example ['.sql', '.hql']

BASE_SERIALIZABLE_CONTEXT_KEYS[source]
PENDULUM_SERIALIZABLE_CONTEXT_KEYS[source]
AIRFLOW_SERIALIZABLE_CONTEXT_KEYS[source]
execute(self, context:Dict)[source]
execute_callable(self)[source]
_write_args(self, filename)[source]
_get_serializable_context_keys(self)[source]
_write_string_args(self, filename)[source]
_read_result(self, filename)[source]
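
A hedged sketch of typical usage (the dag object, task id and pandas requirement are only examples):

    from airflow.operators.python import PythonVirtualenvOperator


    def build_report():
        # all imports must live inside the function: it runs in a freshly created virtualenv
        import pandas as pd
        return pd.__version__


    # `dag` is assumed to be an existing DAG object defined elsewhere
    build_report_task = PythonVirtualenvOperator(
        task_id="build_report",
        python_callable=build_report,
        requirements=["pandas"],
        system_site_packages=False,
        dag=dag,
    )
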
airflow.operators.python.get_current_context() → Dict[str, Any][source]
Obtain the execution context for the currently executing operator without altering the user method's signature. This is the simplest method of retrieving the execution context dictionary.

Old style:

    def my_task(**context):
        ti = context['ti']

New style:

    from airflow.operators.python import get_current_context

    def my_task():
        context = get_current_context()
        ti = context['ti']

The current context will only have a value if this method was called after an operator has started to execute.