Data engineering is a difficult job, and tools like Airflow help streamline it. Let’s take a look at how to get up and running with Airflow on Kubernetes.
As of June 2018, the Kubernetes Operator had been merged into the 1.10 release branch of Airflow, along with a fully Kubernetes-native scheduler called the KubernetesExecutor (in experimental mode). At the time, these features were at a stage where early adopters and contributors could have a big influence on their future. The KubernetesExecutor was introduced in Apache Airflow 1.10.0 and creates a new pod for every task instance. You can use any Airflow image you like, as long as Airflow is installed with the kubernetes extra, e.g. apache-airflow[kubernetes]==1.10.6. Some Helm charts build these worker pods from a pod template (podtemplate.yaml, defined in templates/config/configmap.yaml), which you can override with the worker.podTemplate value. To enable the KubernetesExecutor, set Airflow's executor option to KubernetesExecutor. Older projects such as kube-airflow also provide a set of tools to run Airflow in a Kubernetes cluster. Note, however, that running multiple schedulers for high availability is not safe in Airflow 1.x, so that is not the way to make the scheduler highly available.
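Airflow lets any airflow.cfg option be overridden by an environment variable named AIRFLOW__{SECTION}__{KEY}, which is a common way to set the executor inside a container. A minimal sketch of that naming rule; the helper names are hypothetical, and with a Helm chart you would normally set the executor through the chart's own values instead:

```python
from typing import Dict


def airflow_env_var(section: str, key: str) -> str:
    """Name of the environment variable that overrides [section] key in airflow.cfg."""
    # Convention: AIRFLOW__{SECTION}__{KEY}, upper-cased, joined by double underscores.
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def executor_env(executor: str = "KubernetesExecutor") -> Dict[str, str]:
    """Environment overrides that switch Airflow to the given executor."""
    return {airflow_env_var("core", "executor"): executor}


if __name__ == "__main__":
    for name, value in executor_env().items():
        print(f"{name}={value}")  # AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
```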
Prerequisites:
- A Kubernetes cluster - you can spin one up on AWS, GCP, Azure or DigitalOcean, or start one on your local machine using minikube
- Helm - if you do not already have Helm installed, follow this tutorial to install it
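Before going further, a quick way to confirm that both command-line tools are on your PATH. This is a hypothetical helper using only the standard library; it does not check that kubectl can actually reach a cluster (kubectl cluster-info does that).

```python
import shutil
from typing import Dict, List, Optional, Sequence

REQUIRED_TOOLS = ("kubectl", "helm")


def find_tools(tools: Sequence[str] = REQUIRED_TOOLS) -> Dict[str, Optional[str]]:
    """Map each CLI name to its resolved path on PATH, or None if it is missing."""
    return {tool: shutil.which(tool) for tool in tools}


def missing_tools(tools: Sequence[str] = REQUIRED_TOOLS) -> List[str]:
    """Names of the tools that could not be found on PATH."""
    return [tool for tool, path in find_tools(tools).items() if path is None]


if __name__ == "__main__":
    missing = missing_tools()
    print("Missing: " + ", ".join(missing) if missing else "kubectl and helm found")
```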
1. Easy way
helm install stable/airflow
This installs Airflow with default settings and a random release name (with Helm 3, add --generate-name to get a random name). You are up and running with a basic installation.
2. Sophisticated way
Chances are that for anything serious you will want to make changes to the default installation.
We will make the following changes to make our Airflow installation useful in an enterprise setting:
- Create a namespace
- Change the source of DAG files in the Helm chart
- Set up Active Directory authentication for Airflow (optional)
Let’s expand each of the above steps
2.1 Create a namespace
kubectl create namespace airflow
Switch to the namespace:
kubectl config set-context <context-name> --namespace=airflow
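If you prefer keeping cluster setup declarative, the namespace can also be described as a manifest. A small sketch that emits one as JSON (kubectl accepts JSON as well as YAML), assuming the namespace name airflow used above; piping the output into kubectl apply -f - should create it.

```python
import json


def namespace_manifest(name: str) -> dict:
    """A minimal Kubernetes Namespace object from the core v1 API."""
    return {"apiVersion": "v1", "kind": "Namespace", "metadata": {"name": name}}


if __name__ == "__main__":
    # Usage: python namespace.py | kubectl apply -f -
    print(json.dumps(namespace_manifest("airflow"), indent=2))
```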
2.2 Clone the charts repository
Since we are using the Helm chart, we will make the required changes in its values.yaml.
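Values you supply are layered over the chart's defaults: nested maps are merged key by key, while scalars and lists are replaced wholesale. A rough sketch of that merge behavior, not Helm's actual implementation (Helm's rule that a null override deletes a key is left out); the keys in the demo are illustrative, not the chart's real defaults.

```python
from copy import deepcopy
from typing import Any, Dict


def merge_values(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively overlay overrides on defaults without mutating either input."""
    merged = deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are maps: merge them key by key.
            merged[key] = merge_values(merged[key], value)
        else:
            # Scalars and lists replace the default outright.
            merged[key] = deepcopy(value)
    return merged


if __name__ == "__main__":
    defaults = {"image": {"repository": "example/airflow", "tag": "1.0"}, "replicas": 1}
    overrides = {"image": {"tag": "1.1"}}
    print(merge_values(defaults, overrides))
    # {'image': {'repository': 'example/airflow', 'tag': '1.1'}, 'replicas': 1}
```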
We want to make the following changes:
- Get the code for our DAGs from our git repo. I have a sample repo at https://github.com/prabhatsharma/airflow-dags.
- Set up Postgres for Airflow
- Set up authentication using Microsoft Active Directory.
Open values.yaml in a text editor and modify the following sections:
2.3 Get the code for our DAGs from our git repo
Point the DAG git settings in values.yaml at your own repository (see line 325 of the chart's values.yaml).
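The same settings can also be passed on the command line with helm --set, which addresses nested keys with dots (e.g. --set dags.git.url=...). The sketch below mimics that dotted addressing to build an override map. The key names dags.git.url and dags.git.ref and the branch master are assumptions, so check them against the dags section of your chart's values.yaml.

```python
import json
from typing import Any, Dict


def set_path(values: Dict[str, Any], dotted_key: str, value: Any) -> Dict[str, Any]:
    """Set a nested key the way `helm --set a.b.c=value` addresses it.

    Simplified: no escaped dots and no list indices.
    """
    node = values
    *parents, leaf = dotted_key.split(".")
    for part in parents:
        child = node.get(part)
        if not isinstance(child, dict):
            child = {}
            node[part] = child
        node = child
    node[leaf] = value
    return values


if __name__ == "__main__":
    overrides: Dict[str, Any] = {}
    # Assumed key names; verify them in your chart.
    set_path(overrides, "dags.git.url", "https://github.com/prabhatsharma/airflow-dags")
    set_path(overrides, "dags.git.ref", "master")
    print(json.dumps(overrides, indent=2))
```

Saved to a file, the JSON output should also work with helm install -f, since YAML parsers accept JSON.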
2.4 Set up Postgres
Airflow needs a Postgres database to store its state. If you already have Postgres installed, you will want to point Airflow at it. If you don’t, you can let the chart install one for you.
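Whichever database you use, Airflow reads its location from the sql_alchemy_conn option (the [core] section in Airflow 1.10, or the AIRFLOW__CORE__SQL_ALCHEMY_CONN environment variable). A sketch that builds such an SQLAlchemy URI for Postgres with the psycopg2 driver; host, user and database names are placeholders, and the chart may already assemble this for you when it installs Postgres itself.

```python
from urllib.parse import quote


def postgres_conn_uri(user: str, password: str, host: str, database: str,
                      port: int = 5432, driver: str = "psycopg2") -> str:
    """SQLAlchemy URI suitable for Airflow's sql_alchemy_conn setting."""
    # Percent-encode credentials so characters such as '@' or '/' cannot break the URI.
    return (f"postgresql+{driver}://{quote(user, safe='')}:{quote(password, safe='')}"
            f"@{host}:{port}/{database}")


if __name__ == "__main__":
    print(postgres_conn_uri("airflow", "s3cr@t/pw",
                            "airflow-postgresql.airflow.svc.cluster.local", "airflow"))
    # postgresql+psycopg2://airflow:s3cr%40t%2Fpw@airflow-postgresql.airflow.svc.cluster.local:5432/airflow
```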
2.5 Active Directory authentication - Create image
We will need the ldap3 module installed in the Airflow image. However, the default image, puckel/docker-airflow, does not have ldap3 installed, so we will have to build our own image.
We will fork and clone the repo https://github.com/puckel/docker-airflow and add the ldap3 module.
Open the Dockerfile in a text editor and add the ldap3 module
Now build the image
Replace hiprabhat/airflow with the name of your own Docker repository.
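Once the image is built, it is worth confirming that ldap3 actually made it in. A small check you could run with the image's own Python interpreter (for example via docker run --rm with your image name and a python command, depending on the image's entrypoint); the helper is hypothetical.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if `name` is importable in this interpreter, without importing it."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    print("ldap3 installed" if has_module("ldap3") else "ldap3 missing")
```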
Now update the image section in values.yaml to point at your new image.
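The image section generally takes the repository and the tag as separate fields. If you start from a full image reference, a small helper can split it. The tag is taken from the last colon after the last slash, so a registry port such as registry.example.com:5000 is not mistaken for a tag; digest references (@sha256:...) are not handled. The helper is hypothetical, not part of any chart.

```python
from typing import Tuple


def split_image_ref(ref: str, default_tag: str = "latest") -> Tuple[str, str]:
    """Split 'repository[:tag]' into (repository, tag)."""
    last_slash = ref.rfind("/")
    colon = ref.rfind(":")
    if colon > last_slash:
        # The colon belongs to the final path component, so it introduces a tag.
        return ref[:colon], ref[colon + 1:]
    # No tag (any colon found is part of a registry host:port).
    return ref, default_tag


if __name__ == "__main__":
    repository, tag = split_image_ref("hiprabhat/airflow:latest")
    print({"repository": repository, "tag": tag})
```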
2.6 Active Directory authentication - set up configuration
I already have an AD instance running, which I will use here.
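For reference, Airflow 1.10's classic (non-RBAC) webserver enables LDAP login through the [webserver] and [ldap] sections of airflow.cfg. A sketch that renders such a fragment with configparser, as a starting point only: host, DNs and password are placeholders, sAMAccountName and SUBTREE are the values typically used for Active Directory, and the remaining [ldap] options (such as cacert) should be checked against the Airflow 1.10 security documentation. In practice the bind password belongs in a Kubernetes Secret, not in plain text.

```python
import configparser
import io


def ldap_config(uri: str, bind_user: str, bind_password: str,
                basedn: str) -> configparser.ConfigParser:
    """airflow.cfg fragment for LDAP/AD login in Airflow 1.10 (non-RBAC UI)."""
    cfg = configparser.ConfigParser(interpolation=None)  # passwords may contain '%'
    cfg["webserver"] = {
        "authenticate": "True",
        "auth_backend": "airflow.contrib.auth.backends.ldap_auth",
    }
    cfg["ldap"] = {
        "uri": uri,
        "user_filter": "objectClass=*",
        "user_name_attr": "sAMAccountName",  # AD login-name attribute
        "group_member_attr": "memberOf",
        "bind_user": bind_user,
        "bind_password": bind_password,
        "basedn": basedn,
        "search_scope": "SUBTREE",  # AD usually needs a subtree search
    }
    return cfg


if __name__ == "__main__":
    cfg = ldap_config(
        uri="ldap://ad.example.com:389",  # placeholder host
        bind_user="CN=airflow-bind,OU=Service Accounts,DC=example,DC=com",
        bind_password="change-me",  # placeholder
        basedn="DC=example,DC=com",
    )
    buf = io.StringIO()
    cfg.write(buf)
    print(buf.getvalue())
```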
2.7 Now let’s begin the installation
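The install is a single helm install pointing at the edited chart. A sketch that builds the Helm 3 command line (Helm 2 took the release name via --name instead); the release name airflow and running from the cloned chart directory (chart ".") are assumptions. If the chart declares dependencies such as its bundled Postgres, you may need helm dependency update first.

```python
import shlex
from typing import List, Optional


def helm_install_argv(release: str, chart: str, namespace: str = "airflow",
                      values_file: Optional[str] = None) -> List[str]:
    """Build a Helm 3 `helm install <release> <chart>` command line."""
    argv = ["helm", "install", release, chart, "--namespace", namespace]
    if values_file:
        argv += ["--values", values_file]
    return argv


if __name__ == "__main__":
    argv = helm_install_argv("airflow", ".", values_file="values.yaml")
    print(" ".join(shlex.quote(a) for a in argv))
    # helm install airflow . --namespace airflow --values values.yaml
    # To execute it instead of printing: subprocess.run(argv, check=True)
```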
2.8 Access your Airflow installation
Let’s check the services created by our Helm chart (kubectl get services).
Now let’s use port-forward to access the service
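The two steps can be scripted: read the service list as JSON, pick the web service, and build the port-forward command. A sketch assuming the web service name ends in -web (as airflow-web does) and that its first port is the UI port; the sample data in the demo is made up, not real kubectl output.

```python
import json
from typing import List, Optional


def find_web_service(services_json: str, suffix: str = "-web") -> Optional[dict]:
    """First service whose name ends with `suffix`, from `kubectl get services -o json`."""
    for item in json.loads(services_json).get("items", []):
        name = item["metadata"]["name"]
        if name.endswith(suffix):
            ports = item.get("spec", {}).get("ports", [])
            return {"name": name, "port": ports[0]["port"] if ports else None}
    return None


def port_forward_argv(service: str, remote_port: int, local_port: int = 8080,
                      namespace: str = "airflow") -> List[str]:
    """kubectl command forwarding localhost:<local_port> to the service."""
    return ["kubectl", "port-forward", f"service/{service}",
            f"{local_port}:{remote_port}", "--namespace", namespace]


if __name__ == "__main__":
    sample = json.dumps({"items": [
        {"metadata": {"name": "airflow-postgresql"}, "spec": {"ports": [{"port": 5432}]}},
        {"metadata": {"name": "airflow-web"}, "spec": {"ports": [{"port": 8080}]}},
    ]})
    web = find_web_service(sample)
    print(" ".join(port_forward_argv(web["name"], web["port"])))
    # kubectl port-forward service/airflow-web 8080:8080 --namespace airflow
```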
Now point your browser to http://localhost:8080
You can set up an Ingress in front of your airflow-web service to make it accessible over the internet.
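A minimal Ingress for that service might look like the sketch below, emitted as JSON for kubectl apply -f -. It assumes the networking.k8s.io/v1 API (Kubernetes 1.19+), an ingress controller already installed in the cluster, the service port 8080, and a placeholder host name; TLS and access control are left out and should be added before exposing Airflow publicly.

```python
import json


def ingress_manifest(host: str, service: str = "airflow-web", port: int = 8080,
                     namespace: str = "airflow") -> dict:
    """Route all paths on `host` to the Airflow web service."""
    return {
        "apiVersion": "networking.k8s.io/v1",
        "kind": "Ingress",
        "metadata": {"name": service, "namespace": namespace},
        "spec": {
            "rules": [{
                "host": host,
                "http": {"paths": [{
                    "path": "/",
                    "pathType": "Prefix",
                    "backend": {"service": {"name": service, "port": {"number": port}}},
                }]},
            }],
        },
    }


if __name__ == "__main__":
    print(json.dumps(ingress_manifest("airflow.example.com"), indent=2))
```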
Congratulations! You are up and running with Airflow!
You might also want to read the new blog on Installing Airflow on Kubernetes Using Operator
At BenevolentAI we work with a Kubernetes based infrastructure that is completely containerized.
Using Kubernetes we run many sophisticated workflows for data processing, model training, model serving and metrics and evaluation. For such use cases, Apache Airflow is an excellent tool for programmatically managing the workflows and also exposing a friendly UI.
Note that the content below assumes that you are familiar with the common concepts of Airflow such as an Executor, Operator, DAG, etc. and some common concepts of Kubernetes such as a pod.
One nice thing about Airflow is that it’s extremely configurable and extensible. But it also means that there are usually several ways of doing the same thing. Here are a few examples:
- Both CeleryExecutor and KubernetesExecutor can be used to have a distributed task queue. Which executor is better?
- In order to run a Python script, any one of the BashOperator, PythonOperator, and KubernetesPodOperator can be used depending on how you would like to launch the task. What are the pros and cons?
- How should the DAGs be versioned when they change frequently?
In this post we will try to go through the lessons and best practices we have learned from using Airflow with Kubernetes at Benevolent.
Airflow and Kubernetes
Airflow now offers Operators and Executors for running your workload on a Kubernetes cluster: the KubernetesPodOperator and the KubernetesExecutor.
While a DAG (Directed Acyclic Graph) describes how to run a workflow of tasks, an Airflow Operator defines what gets done by a task. The KubernetesPodOperator works with the Kubernetes Python Client to run a task by launching a pod, which allows the user to have full control over the run-time environment, resources, and security. This is achieved by providing a Docker image and corresponding configurations of the pod.
This has the beneficial side-effect of separating the orchestration logic (which we leave to Airflow) from the execution logic (which we leave to Docker images that run in Kubernetes). This separation makes it easier to test, debug, and evolve the system. For example, if we want to modify the logic of task_1, we modify the code in benevolent/image, release a new versioned image tag, say v2, and the only thing that needs to change in the Airflow DAG is the image tag.
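A sketch of what that looks like, not BenevolentAI's actual DAG: the operator arguments are built from a single image tag, so moving task_1 from v1 to v2 changes only the image field. The namespace, pod name and container command are hypothetical; the parameter names are those of the Airflow 1.10 KubernetesPodOperator. The block only builds the keyword arguments so that it runs without Airflow installed.

```python
from typing import Any, Dict

IMAGE_REPOSITORY = "benevolent/image"  # image name from the text; only the tag changes


def task_1_kwargs(image_tag: str) -> Dict[str, Any]:
    """Keyword arguments for a KubernetesPodOperator that runs task_1."""
    return {
        "task_id": "task_1",
        "name": "task-1",                    # hypothetical pod name prefix
        "namespace": "airflow",              # assumption: namespace the pods run in
        "image": f"{IMAGE_REPOSITORY}:{image_tag}",
        "cmds": ["python", "-m", "task_1"],  # hypothetical entrypoint in the image
        "get_logs": True,
        "is_delete_operator_pod": True,
    }


# In a DAG file (Airflow 1.10):
#   from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
#   task_1 = KubernetesPodOperator(dag=dag, **task_1_kwargs("v2"))

if __name__ == "__main__":
    print(task_1_kwargs("v2")["image"])  # benevolent/image:v2
```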
For BenevolentAI, this is particularly important because we constantly improve our software and regularly manage complex dependencies between microservices. We enforce that all business logic should be implemented in Docker images and launched only with a KubernetesPodOperator rather than using other operators such as the PythonOperator or BashOperator.
Alongside the KubernetesPodOperator, it is also important to have a suitable Executor to execute tasks in a dynamic and efficient manner. Airflow Executors decide the way task instances get run. At BenevolentAI, we run long-running and computationally intensive tasks like model training and batch inference every day. But we also run smaller short-lived tasks such as creating a directory or pushing training artifacts to S3, so we need both flexibility and scalability in an Executor. In the end we decided on two executors: the CeleryExecutor and the KubernetesExecutor. Both run distributed tasks, but they differ in the resources they have available and the way in which they utilize those resources to complete a task.
The CeleryExecutor requires you to stand up at least one worker node that gets assigned tasks to execute through a message queue like Redis. Having multiple workers always available makes it scalable because it can deal with increased load by scaling horizontally; it is also flexible because if one of the workers goes down the task can be allocated to another worker. However, this high availability comes with an increased cost because it is difficult to fine tune the number of workers and the CPU/RAM resources that a worker requests based on the specific task it’s executing.
The KubernetesExecutor on the other hand does not require you to stand up these workers in advance, but instead creates a new worker pod for every task instance that needs to be executed. This means that when traffic grows, we can scale up to meet demand by using more resources; and when traffic decreases, we can scale back down to conserve resources. This offers us the flexibility and scalability that we need while improving the utilization of our resources. Additionally, the KubernetesExecutor allows the user to make task-specific configurations on the worker nodes. Recently, we have had success in tuning our Airflow setup to use the new KubernetesExecutor. We have already seen improved utilization of our cluster resources – this has made for some happy Site Reliability Engineers!
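The task-specific configuration mentioned above is passed per task through executor_config. A sketch for the Airflow 1.10 KubernetesExecutor (Airflow 2 replaced these keys with pod_override); the resource figures are illustrative. Note that for a KubernetesPodOperator task this sizes the worker pod that launches and watches the operator's pod, whose own resources are set on the operator.

```python
from typing import Any, Dict, Optional


def k8s_executor_config(request_cpu: str, request_memory: str,
                        limit_cpu: Optional[str] = None,
                        limit_memory: Optional[str] = None) -> Dict[str, Any]:
    """executor_config for one task's worker pod under the 1.10 KubernetesExecutor."""
    resources = {"request_cpu": request_cpu, "request_memory": request_memory}
    if limit_cpu is not None:
        resources["limit_cpu"] = limit_cpu
    if limit_memory is not None:
        resources["limit_memory"] = limit_memory
    return {"KubernetesExecutor": resources}


# Illustrative sizes: a light worker for short tasks, a bigger one for heavy Python work.
SMALL_TASK = k8s_executor_config("100m", "128Mi")
LARGE_TASK = k8s_executor_config("4", "16Gi", limit_cpu="4", limit_memory="16Gi")

# Usage in a DAG: SomeOperator(task_id="...", executor_config=SMALL_TASK, ...)
```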
Using the KubernetesPodOperator with the KubernetesExecutor, we run cost-friendly and resource efficient Airflow DAGs that dynamically scale and effectively decouple orchestration from execution.
Versioning DAGs
We want to put multiple versions of the same DAG in one Airflow server. Let’s say our DAG is called benevolent_dag. We use the packaged DAG feature to isolate the source code for each version in its own zip file, which keeps the DAG folder clean.
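A packaged DAG is a zip file in the DAG folder with the DAG file at the root of the archive and its own packages in sub-directories. A sketch that builds one zip per version, assuming a hypothetical project layout of dags.py plus a foo/bar.py helper (matching the module names used below); the archive names are illustrative.

```python
import tempfile
import zipfile
from pathlib import Path
from typing import List


def package_dag(src_dir: Path, out_zip: Path) -> List[str]:
    """Zip every .py file under src_dir, with archive paths relative to src_dir."""
    names = []
    with zipfile.ZipFile(out_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(src_dir.rglob("*.py")):
            arcname = path.relative_to(src_dir).as_posix()
            zf.write(path, arcname)
            names.append(arcname)
    return names


def write_demo_project(root: Path, version: str) -> Path:
    """Hypothetical layout: dags.py at the root plus a foo/bar.py helper module."""
    (root / "foo").mkdir(parents=True)
    (root / "foo" / "__init__.py").write_text("")
    (root / "foo" / "bar.py").write_text(f"VERSION = {version!r}\n")
    (root / "dags.py").write_text("from foo import bar  # DAG definition omitted\n")
    return root


if __name__ == "__main__":
    work = Path(tempfile.mkdtemp())
    dags_folder = work / "dags"
    dags_folder.mkdir()
    for version in ("v1", "v2"):
        src = write_demo_project(work / f"src_{version}", version)
        print(package_dag(src, dags_folder / f"benevolent_dag_{version}.zip"))
```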
There are some ways we want the multi-versioned DAGs to behave, for example:
- Updating the DAG code in one version should not affect another version, which might be in running state. When one version of a DAG is in running state, we don’t want the release of a new version to interrupt it.
- We want to re-run an older version of DAG and compare results from different versions of DAGs.
- We want to save compute costs, compared to using separate Airflow deployments for different DAGs.
There is one problem though. Note that in both versions of dags.py we import the bar sub-module from the foo module, but the two bar modules actually have different code despite the same name. The problem is that by default the foo.bar module will only be loaded once. In our case, depending on the order that the DAGs are loaded, either bar.py from v2 will be used by the DAG from v1 or the opposite, which loads the wrong code between versions. This is due to the way that Airflow loads DAGs: it uses Python’s importlib.import_module to load the DAGs as Python modules, in the same process that Airflow webserver or scheduler runs. Because modules are stored in the global variable sys.modules, our module foo.bar would not be loaded again if it already exists in sys.modules, even if it has different code.
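The caching problem is easy to reproduce outside Airflow. A small demonstration with two hypothetical version directories, each containing its own foo/bar.py: once foo.bar has been imported from v1, importing it again with v2 first on sys.path still returns the v1 module from sys.modules.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def write_version(root: Path, version: str) -> Path:
    """Create <root>/<version>/foo/bar.py whose VERSION constant differs per version."""
    pkg = root / version / "foo"
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").write_text("")
    (pkg / "bar.py").write_text(f"VERSION = {version!r}\n")
    return root / version


def import_bar_from(path: Path):
    """Import foo.bar with `path` first on sys.path, roughly what loading a DAG does."""
    importlib.invalidate_caches()  # we created the files at runtime
    sys.path.insert(0, str(path))
    try:
        return importlib.import_module("foo.bar")
    finally:
        sys.path.remove(str(path))


if __name__ == "__main__":
    root = Path(tempfile.mkdtemp())
    v1, v2 = write_version(root, "v1"), write_version(root, "v2")
    print(import_bar_from(v1).VERSION)  # v1
    print(import_bar_from(v2).VERSION)  # v1 again: foo.bar is reused from sys.modules
```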
One seemingly obvious solution is to rename the sub-modules so that they are unique across DAGs, for example by appending the version to module names. However, this is not feasible for DAGs with complex dependencies, because all the corresponding import statements need to be changed.
Unable to find a more elegant solution, we decided to hack the module loading mechanism. What we want is that every time foo.bar is imported, Python loads the module that is local to the current script instead of reusing the copy in sys.modules (if it exists) that was loaded from elsewhere. We accomplished this by adding some code that runs before the modules are imported.
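One way to express that idea, a sketch rather than the exact code used at BenevolentAI: before importing, drop the top-level package and all its sub-modules from sys.modules, then import with the current DAG's own directory first on sys.path. Modules that other DAG versions already imported keep working, because their objects still hold references to their own copies.

```python
import importlib
import sys
import tempfile
from pathlib import Path
from types import ModuleType


def import_fresh(module_name: str, search_path: Path) -> ModuleType:
    """Import module_name from search_path, ignoring copies cached by other DAG versions."""
    top = module_name.split(".")[0]
    # Remove the package and every sub-module; otherwise `foo` (and its __path__)
    # would still point at the other version's directory.
    for name in [m for m in sys.modules if m == top or m.startswith(top + ".")]:
        del sys.modules[name]
    importlib.invalidate_caches()
    sys.path.insert(0, str(search_path))
    try:
        return importlib.import_module(module_name)
    finally:
        sys.path.remove(str(search_path))


def write_version(root: Path, version: str) -> Path:
    """Create <root>/<version>/foo/bar.py for the demo (hypothetical layout)."""
    pkg = root / version / "foo"
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").write_text("")
    (pkg / "bar.py").write_text(f"VERSION = {version!r}\n")
    return root / version


if __name__ == "__main__":
    root = Path(tempfile.mkdtemp())
    v1, v2 = write_version(root, "v1"), write_version(root, "v2")
    print(import_fresh("foo.bar", v1).VERSION)  # v1
    print(import_fresh("foo.bar", v2).VERSION)  # v2
```

Inside a DAG file this would be called with the file's own location, e.g. import_fresh("foo.bar", Path(__file__).parent); for a zipped DAG that parent is typically the zip archive, which Python can import from.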
Now we can enjoy the benefits described at the beginning of this section, while waiting for a more elegant and out-of-the-box solution from Airflow 2.0.
How we test it
BashOperator and PythonOperator are probably the two most frequently used operators in Airflow, and they are also the most talked about in articles.
They are convenient and quick, until your tasks have logic that is more complicated than the hello-world examples. The code then becomes hard to test, because execution logic and orchestration logic are interleaved in the DAG code, which forces developers to mock a running Airflow environment in order to unit test a task.
In the earlier section we talked about separating the orchestration logic from execution logic by using KubernetesPodOperator. This design makes testing easier as well, as shown by our Airflow testing strategy below:
- Execution logic unit testing: unit testing is done within the code repository of each Docker image, just like any other project, and does not involve Airflow at all.
- Orchestration logic unit testing: we can unit test an instance of Airflow operator as suggested by the documentation. Note that at this step the execution logic is treated as a dependency and should be mocked.
- Acceptance testing: for each DAG, we have defined a suite of acceptance tests that use toy input data and actually run through the DAGs from end to end. Any change has to pass the acceptance test in development/staging before releasing to production.
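A sketch of the orchestration-level unit test, with the execution layer mocked. Instead of exercising a real operator instance (the approach the Airflow documentation describes), this swaps in dependency injection so that it runs without Airflow: the hypothetical run_training function only decides which image and arguments to launch, and the launcher is replaced by a unittest.mock.Mock. Function names, image tag and arguments are illustrative.

```python
import unittest
from unittest import mock


def run_training(launch_pod, model_version: str, image_tag: str = "v2") -> str:
    """Orchestration only: choose the image and arguments, then delegate execution.

    `launch_pod` stands in for whatever starts the container (in production,
    a KubernetesPodOperator); injecting it lets tests mock execution entirely.
    """
    if not model_version:
        raise ValueError("model_version is required")
    return launch_pod(image=f"benevolent/image:{image_tag}",
                      arguments=["train", "--model-version", model_version])


class RunTrainingTest(unittest.TestCase):
    def test_launches_pinned_image_with_arguments(self):
        launch = mock.Mock(return_value="pod-123")
        self.assertEqual(run_training(launch, "m1"), "pod-123")
        launch.assert_called_once_with(image="benevolent/image:v2",
                                       arguments=["train", "--model-version", "m1"])

    def test_rejects_missing_version(self):
        launch = mock.Mock()
        with self.assertRaises(ValueError):
            run_training(launch, "")
        launch.assert_not_called()


if __name__ == "__main__":
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(RunTrainingTest)
    unittest.TextTestRunner(verbosity=2).run(suite)
```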
While we can also imagine aspirational test strategies, such as mocking transient failures in DAG execution and making sure DAGs are robust enough to recover from them, we have found the testing strategy above good enough to keep things running for our needs.
What else have we learned
- Sub-DAG is a handy concept to develop reusable DAG components. However, as of Airflow 1.10, the implementation of sub-DAG using SubDagOperator is not a first class citizen, and using it leads to issues like scheduler deadlock. We’ve also found that the SubDagOperator is not compatible with KubernetesExecutor out of the box. While tricks can be applied to work around the issues, we recommend not using sub-DAG if possible.
- There are a few ways to release DAGs into Airflow servers, and we found that git-sync is the most convenient for continuously delivering DAGs and updates.
- When deploying Airflow to Kubernetes, you need persistent storage volumes to persist the logs produced by running tasks. However, the volume has to be mounted by all the workers plus Airflow’s webserver and scheduler, which is tricky with storage classes that only offer more restrictive access modes, e.g. ReadWriteOnce. Instead, we enabled remote logging to persist logs in S3.
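Remote logging is switched on with a handful of settings. A sketch that produces them as environment overrides, assuming Airflow 1.10 (where they live in [core]; Airflow 2 moved them to [logging]), a placeholder bucket path, and an existing Airflow connection with S3 access (aws_default here is an assumption).

```python
from typing import Dict


def remote_logging_env(base_folder: str, conn_id: str, section: str = "core") -> Dict[str, str]:
    """Environment overrides that send task logs to S3."""
    if not base_folder.startswith("s3://"):
        raise ValueError("expected an s3:// URL")
    prefix = f"AIRFLOW__{section.upper()}__"
    return {
        prefix + "REMOTE_LOGGING": "True",
        prefix + "REMOTE_BASE_LOG_FOLDER": base_folder,
        prefix + "REMOTE_LOG_CONN_ID": conn_id,
    }


if __name__ == "__main__":
    for name, value in remote_logging_env("s3://my-airflow-logs/prod", "aws_default").items():
        print(f"{name}={value}")
```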
It’s relatively easy to use the stable/airflow Helm chart to deploy Airflow to Kubernetes. What follows are our example Helm values for the configurations described above.