
Testing Airflow is hard

There's a good reason for writing this blog post - testing Airflow code can be difficult. It often leads people to go through an entire deployment cycle and manually push the trigger button on a live system; only then can they verify their Airflow code. This is a painfully long process, and as with any other software, people would like to write, test, and debug their Airflow code locally.

Running an Airflow DAG on your local machine is often not possible due to dependencies on external systems. To start, I'd like to point out this excellent blog post by ING WBAA about testing Airflow. It covers setting up DTAP and CI/CD for Airflow. Besides this, the blog post also describes a DAG integrity test for validating that your DAG files contain valid DAG objects, which is a good starting point. Also, there's this Meetup talk about a local Airflow testing environment with Docker Compose by my colleague Bas Beelen, which will be open sourced in the near future.

The main goal of this post is to explain how to unit test Airflow components locally without the need for a production system. In Airflow this goes for anything - hooks, utility functions, etc. The DAG itself is simply configuration to glue various operations together. This post does not cover testing complete DAGs (although you could do it with the tools shown in this blog post), but explains how to test individual operations.

All tests are written with pytest and Airflow 1.10.2. All code in this blog post is available on GitHub.

Pytest Airflow fixtures & helpers

Airflow jobs always run in the context of a DAG. The execution of a task in a DAG is controlled via a task instance, which provides the context of the current run to the task. Hence, testing an operator cannot be decoupled from running a DAG. So, to test operators, I use a dummy DAG that is reused throughout my tests.

Pytest has the concept of fixtures: objects that can be passed to test functions as input arguments. This is why I prefer pytest over Python's unittest; fixtures allow for reusable code and less duplication. For Airflow, I have a test_dag fixture with which I test operators that require a DAG to run.

Define this test_dag fixture in tests/conftest.py so it can be used in any test.
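A minimal version of such a fixture might look like this (the owner, start date, and schedule interval below are illustrative defaults, not prescribed values):

```python
# tests/conftest.py
import datetime

import pytest
from airflow import DAG


@pytest.fixture
def test_dag():
    """Dummy DAG for testing operators in isolation."""
    return DAG(
        "test_dag",
        default_args={"owner": "airflow", "start_date": datetime.datetime(2018, 1, 1)},
        schedule_interval=datetime.timedelta(days=1),
    )
```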

A nice plugin for pytest is the Pytest Helpers Namespace. It allows you to register any function under the pytest helpers namespace, for use anywhere in your tests. Install it with pip install pytest-helpers-namespace. For testing operators, I need to run a task with a DAG, and therefore define a run_task helper function:
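A sketch of such a helper, also registered in tests/conftest.py; it clears previous runs and runs the task for the DAG's start date:

```python
# tests/conftest.py
import pytest


@pytest.helpers.register
def run_task(task, dag):
    """Run an Airflow task in the context of the given DAG."""
    dag.clear()
    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
    )
```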

Now the test_dag fixture and run_task helper function can be used to run tasks in a unit test:
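For example, a test along these lines (file name and echoed content are arbitrary):

```python
import pytest
from airflow.operators.bash_operator import BashOperator


def test_dummy(test_dag, tmpdir):
    tmp_file = tmpdir.join("hello.txt")
    task = BashOperator(
        task_id="test",
        bash_command="echo 'hello' > {}".format(tmp_file),
        dag=test_dag,
    )
    pytest.helpers.run_task(task=task, dag=test_dag)

    assert tmp_file.read().strip() == "hello"
```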

The test_dummy test uses two pytest fixtures: test_dag as described above, and tmpdir. tmpdir is one of the fixtures you get for free with pytest. It provides a temporary directory that you'd otherwise create with the tempfile builtin. Simply add tmpdir as a test function argument and you can use it in your test. In the test above, I run a BashOperator that writes a file, and I verify the content of the file, without having to upload a DAG to an Airflow instance and test manually.

Mocking Airflow

Sometimes you need to fake objects in your tests, for example when you cannot access the Airflow metastore directly from your laptop and thus cannot read the connections. In these situations, you can mock these objects. For mocking I use pytest-mock, which provides a mocker fixture: a thin wrapper around the mock package. For example, this shows a test using the SimpleHttpOperator (code):
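A sketch of what such a test could look like; the conn_id, coordinates, and response check are illustrative, and note that only the connection lookup is mocked, so the operator still calls the live API:

```python
import datetime

import pytest
from airflow.hooks.base_hook import BaseHook
from airflow.models import Connection
from airflow.operators.http_operator import SimpleHttpOperator


def test_sunrise_sunset(test_dag, mocker):
    # No access to the Airflow metastore, so return a Connection object ourselves
    mocker.patch.object(
        BaseHook,
        "get_connection",
        return_value=Connection(conn_id="sunset_api", host="api.sunrise-sunset.org"),
    )

    task = SimpleHttpOperator(
        task_id="sunrise_sunset",
        http_conn_id="sunset_api",
        endpoint="json",
        method="GET",
        data={"lat": "52.37", "lng": "4.90", "formatted": "0"},
        # Comparing ISO-8601 UTC timestamps as strings is good enough here;
        # response_check must return a truthy value, hence the `or True`.
        response_check=lambda response: print(
            "It's light!"
            if response.json()["results"]["sunrise"]
            < datetime.datetime.utcnow().isoformat()
            < response.json()["results"]["sunset"]
            else "It's dark!"
        )
        or True,
        dag=test_dag,
    )
    pytest.helpers.run_task(task=task, dag=test_dag)
```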

This (hypothetical) task fetches sunrise and sunset times from https://api.sunrise-sunset.org, and the lambda function prints whether it's currently light or dark. You might store the credentials for such an API in an Airflow connection; however, you cannot access the metastore from your local machine. So, we patch the BaseHook and mock the return value of get_connection to always return a Connection object with host='api.sunrise-sunset.org' for this test.

That way we can work with an Airflow Connection object and test the operator.

Mocking external systems

Mocking works for objects, but what if you want to verify the implementation of your component against a real external system? By spinning up Docker containers with the system you want to test against, you can verify the correct behaviour of your component!

There are several ways to do this, of course; for example, Airflow itself starts a set of containers with Docker Compose at the start of its test suite.

Another option, and one that I like, is once again a pytest package: pytest_docker_tools. It provides a set of helpers for pytest to manage Docker containers. I like that it keeps the test dependencies within the test scripts, and that you can pass Docker containers as fixtures to your tests.

To demonstrate, I implemented a PostgresToLocalOperator:
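A condensed sketch of the operator; the parameter names pg_query and local_path are illustrative, and the linked repository has the full version:

```python
import json

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class PostgresToLocalOperator(BaseOperator):
    """Store the result of a Postgres query as a JSON file on local disk."""

    template_fields = ("_pg_query", "_local_path")

    @apply_defaults
    def __init__(self, pg_query, local_path, postgres_conn_id="postgres_default", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._pg_query = pg_query
        self._local_path = local_path
        self._postgres_conn_id = postgres_conn_id

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
        cursor = hook.get_conn().cursor()
        cursor.execute(self._pg_query)
        with open(self._local_path, "w") as file_:
            json.dump(cursor.fetchall(), file_, default=str)
```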

The PostgresToLocalOperator queries a Postgres database and stores the JSON-formatted result on local disk. Now I'd like to verify the correct behaviour of my PostgresToLocalOperator, however I cannot access the production Postgres database. So, let's write a test and spin up a Postgres Docker container to query against:
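A sketch of the test setup; the credentials and the Postgres image tag are made up, and in practice you may also want to wait until Postgres accepts connections before querying it:

```python
import os
from collections import namedtuple

import pytest
from pytest_docker_tools import container, fetch

PostgresCredentials = namedtuple("PostgresCredentials", ["username", "password"])


@pytest.fixture
def postgres_credentials():
    """Credentials for the test Postgres container."""
    return PostgresCredentials("testuser", "testpass")


postgres_image = fetch(repository="postgres:11.1-alpine")

postgres = container(
    image="{postgres_image.id}",
    environment={
        "POSTGRES_USER": "{postgres_credentials.username}",
        "POSTGRES_PASSWORD": "{postgres_credentials.password}",
    },
    ports={"5432/tcp": None},
    volumes={
        os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
            "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
        }
    },
)
```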

There are a couple of things going on here.

First, we create a namedtuple fixture holding the Postgres credentials. Fixtures are the way to pass variables to the pytest_docker_tools objects. As an added bonus, the postgres_credentials fixture can now be passed as an argument to any test.

Next, pytest_docker_tools requires two statements for creating a Docker container fixture: fetch() fetches the Docker image metadata, and container() constructs the Docker container. It uses the same argument names as Docker itself, e.g. volumes for mounting volumes. I created a postgres-init.sql script that is placed in /docker-entrypoint-initdb.d/ in the Postgres Docker container and executed at boot[^1], so that the dummy Postgres DB contains dummy data.

postgres-init.sql:
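The exact contents don't matter much; something along these lines will do (the dummy table and rows below match the assertions further down):

```sql
CREATE TABLE dummy (
    id   SERIAL PRIMARY KEY,
    name VARCHAR(50) NOT NULL
);

INSERT INTO dummy (name) VALUES ('dummy1'), ('dummy2'), ('dummy3');
```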

With all this set up, we can now write the test to validate that the PostgresToLocalOperator reads from Postgres and writes to the local filesystem:
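A sketch of the test's skeleton; the body is filled in step by step below, and note where PostgresHook is imported from (explained in the note on patching further down):

```python
import json
import os

import pytest
from airflow.models import Connection

# Import PostgresHook from the module under test -- see the note on patching below
from my_package.operators.postgres_to_local_operator import (
    PostgresHook,
    PostgresToLocalOperator,
)


def test_postgres_to_local_operator(
    test_dag, mocker, tmpdir, postgres, postgres_credentials
):
    ...  # body shown step by step below
```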

The test takes a number of arguments:

  1. test_dag - DAG fixture
  2. mocker - pytest mock fixture
  3. tmpdir - pytest tmpdir fixture
  4. postgres - dummy Postgres Docker container fixture
  5. postgres_credentials - Postgres credentials fixture

First we define an output path to write the results to:
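Inside the test body, that could simply be:

```python
    output_path = str(tmpdir.join("pg_dump.json"))
```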

Next, we patch the PostgresHook to return a mocked Connection object when get_connection is called, since we don't have access to a running Airflow instance locally.

Important! One of the most common mistakes with Python mocking, if not the most common, is patching the wrong location. The result is that the patch appears to have no effect. To patch the PostgresHook, do not import it with from airflow.hooks.postgres_hook import PostgresHook! Instead, import the PostgresHook from the location where you actually use it: from my_package.operators.postgres_to_local_operator import PostgresToLocalOperator, PostgresHook.
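Continuing the sketch, the patch could look like this; conn_id and host are illustrative, and the mapped port comes from the running container fixture:

```python
    mocker.patch.object(
        PostgresHook,
        "get_connection",
        return_value=Connection(
            conn_id="postgres",
            conn_type="postgres",
            host="localhost",
            login=postgres_credentials.username,
            password=postgres_credentials.password,
            port=postgres.ports["5432/tcp"][0],
        ),
    )
```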

Next we run our operator, simply querying SELECT * FROM dummy:
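Continuing the sketch:

```python
    task = PostgresToLocalOperator(
        task_id="test_postgres_to_local",
        postgres_conn_id="postgres",
        pg_query="SELECT * FROM dummy",
        local_path=output_path,
        dag=test_dag,
    )
    pytest.helpers.run_task(task=task, dag=test_dag)
```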

The operator has now completed its execution, so we expect a single file at output_path, and we expect this file to contain whatever was in the dummy table:
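Given the dummy rows from postgres-init.sql above (and keeping in mind that JSON turns the row tuples into lists):

```python
    assert os.path.exists(output_path)
    with open(output_path) as file_:
        assert json.load(file_) == [[1, "dummy1"], [2, "dummy2"], [3, "dummy3"]]
```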

The complete code for the PostgresToLocalOperator can be found here, and the complete code for testing the operator can be found here.

Debugging Airflow

There are various ways to debug a process running in Airflow. If running locally, e.g. a unit test, you can place a breakpoint in your IDE of choice. I've left remote debugging with an IDE out of scope for this blog post; instead, I'll explain a different method that works both locally and remotely.

PDB

Python comes with a builtin debugger called pdb. You can use it by placing this snippet at the location you want to start debugging:
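```python
import pdb; pdb.set_trace()
```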

Or, if you're on Python 3.7 (currently only supported on Airflow master), you can simply call breakpoint() somewhere in your code. There's also ipdb, which offers more features such as color highlighting and autocompletion; it is, however, not a builtin, so you'll have to install it with pip install ipdb. Once in a debug session, you can control the debugging with these shortcuts (source):
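The most useful ones:

- n(ext) - execute the current line and move to the next one
- s(tep) - step into the function call on the current line
- c(ontinue) - continue execution until the next breakpoint
- b(reak) [lineno] - set a breakpoint
- p <expression> - print the value of an expression
- l(ist) - show the source code around the current line
- q(uit) - quit the debugger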

In case you want to place a breakpoint but don't know where to find the code, simply open up a Python terminal:
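For example, to find out where the BashOperator lives (the path shown is just an example; it depends on your environment):

```python
>>> import inspect
>>> from airflow.operators.bash_operator import BashOperator
>>> inspect.getfile(BashOperator)
'/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py'
```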

Finally, if you want to debug a 'live' Airflow job, you can manually run a task with airflow test [dag_id] [task_id] [yyyy-mm-dd]. This does not create a task instance and does not record the execution anywhere in the metastore, but it is useful for debugging. In the example below, I show how to use this approach to debug an incorrect Pendulum.format() statement:
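For instance, with a made-up DAG like the one below: with pendulum 1.x (which Airflow 1.10 uses), format("YYYY-MM-DD") goes through the classic strftime-style formatter and comes out as the literal string rather than the date, exactly the kind of surprise you'd want to inspect in pdb:

```python
# dags/example_dag.py - a hypothetical DAG used only to illustrate the workflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG("example_dag", default_args={"start_date": days_ago(1)})


def print_execution_date(execution_date, **context):
    import pdb; pdb.set_trace()  # drops you into the debugger when the task runs
    print(execution_date.format("YYYY-MM-DD"))


PythonOperator(
    task_id="print_execution_date",
    python_callable=print_execution_date,
    provide_context=True,
    dag=dag,
)

# Run only this task, without recording anything in the metastore:
#   airflow test example_dag print_execution_date 2019-01-01
```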

Final words

With the examples in this post, hopefully you'll be able to shorten your development time and verify the behaviour of your Airflow code locally. Testing operators locally using unit tests without an Airflow installation can feel like quite a breeze! If you have any questions, feel free to contact me on Airflow Slack as @BasPH.

Interested in Apache Airflow Training?

A quick heads up: we offer Apache Airflow as a public course in our Academy. Join us to learn everything you need to successfully work with Airflow!

[^1]: From the Postgres Docker Hub documentation: after initdb is called, any *.sql and executable *.sh file in /docker-entrypoint-initdb.d/ is run.
