Orchestrating Big Data with Apache Airflow
July 2016
Airflow allows developers, admins and operations teams to author, schedule and orchestrate workflows and jobs within an organization. While its main focus started with orchestrating data pipelines, its ability to work seamlessly outside of the Hadoop stack makes it a compelling solution for managing traditional workloads as well.
The paper discusses the architecture of Airflow as a big data platform and how it can help address the challenges of workflow management to create stable data pipelines for enterprises.
6185 W DETROIT ST | CHANDLER, AZ 85226 | (623) 282-2385 | CLAIRVOYANTSOFT.COM | HELLO@CLAIRVOYANTSOFT.COM
OVERVIEW
Data analytics plays a key role in the decision-making process at various stages of business in many industries. Data is being generated at a very fast pace by various sources across the business; applications that automate business processes are literally fountains of data today. Implementing solutions for use cases like “real time data ingestion from various sources”, “processing the data at different levels of the data ingestion” and preparing the final data for analysis is a serious challenge, given the dynamic nature of the data being generated. Properly orchestrating, scheduling, managing and monitoring data pipelines is critical for any data platform to be stable and reliable. Given the dynamic nature of the data sources, data inflow rates, data schemas, processing needs, etc., workflow management (pipeline generation, maintenance and monitoring) is a challenge for any data platform.
This whitepaper provides a view on some of the open source tools available today. The paper also discusses the unique architecture of Airflow as a big data platform and how it can help address these challenges to create a stable data platform for enterprises, one in which ingestion is not halted at the first sign of trouble.
INTRODUCTION TO AIRFLOW
Airflow is a platform to programmatically author, schedule and monitor data pipelines that meets the needs of almost every stage of the workflow management lifecycle. The system was built at Airbnb on the following four principles:
• Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.
• Extensible: Easily define your own operators and executors, and extend the library so that it fits the level of abstraction that suits your environment.
• Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
• Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.
Basic concepts of Airflow
• DAGs (Directed Acyclic Graphs): A DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
o DAGs are defined as Python scripts and are placed in the DAGs folder (this can be any location, but it needs to be configured in the Airflow config file).
o Once a new DAG is placed into the DAGs folder, Airflow picks it up automatically, typically within a minute.
• Operators: An operator describes a single task in a workflow. While DAGs describe how to run a workflow, operators determine what actually gets done.
o Task: Once an operator is instantiated with some parameters, it is referred to as a “task”.
o Task Instance: A single run of a task for a specific point in time is called a task instance.
• Scheduling the DAGs/Tasks: The DAGs and tasks can be scheduled to run at a certain frequency using the parameter below.
o Schedule interval: Determines when the DAG should be triggered. This can be a cron expression or a Python datetime.timedelta object.
• Executors: Once the DAGs, tasks and scheduling definitions are in place, something needs to execute the jobs/tasks. This is where executors come into the picture. Airflow provides three types of executors out of the box:
o Sequential: The Sequential executor is meant for test drives. It executes tasks one by one (sequentially), so tasks cannot be parallelized.
o Local: The Local executor is like the Sequential executor, but it can run task instances in parallel on the local machine.
o Celery: The Celery executor uses Celery, an open source distributed task execution engine based on message queues, which makes it more scalable and fault tolerant. Message brokers like RabbitMQ or Redis can be used along with Celery. This executor is typically used in production.
Airflow has an edge over other tools in the space
Below are some key features where Airflow has an upper hand over other tools like Luigi and Oozie:
• Pipelines are configured via code, making the pipelines dynamic
• A graphical representation of the DAG instances and task instances along with their metrics
• Scalability: distribution of workers and queues for task execution
• Hot deployment of DAGs/tasks
• Support for Celery, SLAs and a great UI for monitoring metrics
• Support for calendar schedules and crontab scheduling
• Backfill: the ability to rerun DAG instances for past schedule intervals, for example in case of a failure
• Variables for making changes to the DAGs/tasks quick and easy
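The DAG, task, schedule-interval and executor concepts above can be illustrated without Airflow itself. The following is a minimal, Airflow-free sketch in plain Python (3.9 or later, for graphlib). The Task class, the task names and the dates are hypothetical stand-ins and are not Airflow's API. The sketch does two things:

- It derives an order that respects every dependency, which is what a Sequential executor needs in order to run tasks one at a time.
- It lists the execution dates a daily schedule interval yields. This follows Airflow's convention that the run for a period is triggered only after that period has ended.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from graphlib import CycleError, TopologicalSorter  # Python 3.9+


@dataclass
class Task:
    """Hypothetical stand-in for an instantiated operator (a 'task')."""
    task_id: str
    upstream: list = field(default_factory=list)  # ids this task depends on


def run_order(tasks):
    """Return task ids in an order that respects every dependency.

    Raises CycleError if the tasks do not form an acyclic graph.
    """
    graph = {t.task_id: set(t.upstream) for t in tasks}
    return list(TopologicalSorter(graph).static_order())


def execution_dates(start, interval, now):
    """List the start of every complete interval between start and now.

    The run covering [d, d + interval) only becomes due once that
    interval has ended, so a still-open interval produces no date.
    """
    dates = []
    d = start
    while d + interval <= now:
        dates.append(d)
        d += interval
    return dates


if __name__ == "__main__":
    dag = [
        Task("extract"),
        Task("transform", upstream=["extract"]),
        Task("load_hive", upstream=["transform"]),
        Task("load_s3", upstream=["transform"]),
    ]
    # A Sequential-style executor would simply run these one at a time.
    for task_id in run_order(dag):
        print("running", task_id)
    print(execution_dates(datetime(2016, 7, 1), timedelta(days=1),
                          datetime(2016, 7, 4, 6)))
```

Running the file first prints extract and transform, then the two load tasks. It then prints the dates 1, 2 and 3 July 2016. At 06:00 on 4 July the interval starting on 4 July has not yet ended, so no run exists for it yet.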
ARCHITECTURE OF AIRFLOW
Airflow typically consists of the following components:
• Configuration file: All the configuration points, like “which port to run the web server on”, “which executor to use”, “config related to RabbitMQ/Redis”, workers, the DAGs location, the repository etc., are configured here.
• Metadata database (MySQL or Postgres): The database where all the metadata related to the DAGs, DAG runs, tasks and variables is stored.
• DAGs (Directed Acyclic Graphs): These are the workflow definitions (logical units) that contain the task definitions along with the dependency information. These are the actual jobs that the user would like to execute.
• Scheduler: A component that is responsible for triggering the DAG instances and the task instances for each DAG. The scheduler is also responsible for invoking the executor (be it Local, Celery or Sequential).
• Broker (Redis or RabbitMQ): In the case of a Celery executor, the broker is required to hold the messages and act as a communicator between the executor and the workers.
• Worker nodes: The actual workers that execute the tasks and return the result of each task.
• Web server: A web server that renders the UI for Airflow, through which one can view the DAGs and their status, rerun them, and create variables, connections etc.
DEPLOYMENT VIEWS
Based on the needs, one may go with a simple or a complex setup of Airflow. There are different ways Airflow can be deployed (especially from an executor point of view). Below are the deployment options along with a description of each.
Standalone mode of deployment
Description: As mentioned in the above section, a typical installation of Airflow consists of the following.
• Configuration file (airflow.cfg): contains details such as where to pick the DAGs from, which executor to run, how frequently the scheduler should poll the DAGs folder for new definitions, which port to start the web server on etc.
• Metadata Repository: Typically, a MySQL or Postgres database is used for this purpose. All the metadata related to the DAGs, their metrics, tasks and their statuses, SLAs, variables, etc. is stored here.
• Web Server: This renders the UI that shows all the DAGs and their current states along with the metrics (which are pulled from the repository).
• Scheduler: This reads the DAGs, puts the details about them into the repository and, based on each DAG's schedule interval, creates the DAG and task instances in the repository. It initiates the executor.
• Executor: This is responsible for running the task instances that the scheduler has queued.
• Worker: The worker reads the task instances, performs the tasks and writes the status back to the repository.
Distributed mode of deployment
Description: The description for most of the components mentioned in the Standalone section remains the same, except for the executor and the workers.
• RabbitMQ: RabbitMQ is the distributed messaging service that the Celery executor uses to queue the task instances. This is where the workers typically read the tasks for execution from. Basically, RabbitMQ exposes a broker URL that the Celery executor and the workers use to talk to it.
• Executor: Here the executor is the Celery executor (configured in airflow.cfg). The Celery executor is configured to point to the RabbitMQ broker.
• Workers: The workers are installed on different nodes (based on the requirement) and are configured to read the task information from the RabbitMQ broker. The workers are also configured with a Celery result backend (celery_result_backend), which can typically be the MySQL repository itself.
Distributed mode of deployment with High Availability set up
Description: As part of the setup for high availability of the Airflow installation, we assume that the MySQL repository is configured to be highly available and that RabbitMQ is highly available. The focus is on how to make the Airflow components, like the web server and the scheduler, highly available.
The description for most of the components remains the same as above. Below is what changes:
• New Airflow instance (standby): There is another instance of Airflow set up as a standby.
o The one shown in green is the primary Airflow instance.
o The one shown in red is the standby.
• A new DAG must be put in place, called the “CounterPart Poller”. The purpose of this DAG is two-fold:
o To continuously poll the counterpart scheduler to see if it is up and running.
o If the counterpart instance is not reachable (which means the instance is down), to:
• Declare the current Airflow instance as the primary.
• Move the DAGs of the (previous) primary instance out of the DAGs folder and move the CounterPart Poller DAG into the DAGs folder.
• Move the actual DAGs into the DAGs folder and move the CounterPart Poller out of the DAGs folder on the standby server (the one which now declares itself as primary).
Note that the declaration of the primary server by the instances can be done as a flag in a MySQL table.
HOW IT WORKS
• Initially, the primary (instance 1) and standby (instance 2) schedulers are both up and running. Instance 1 is declared as the primary Airflow server in the MySQL table.
• The DAGs folder of the primary instance (instance 1) contains the actual DAGs, and the DAGs folder of the standby instance (instance 2) contains the CounterPart Poller (PrimaryServerPoller).
o The primary server schedules the actual DAGs as required.
o The standby server runs the PrimaryServerPoller, which continuously polls the primary Airflow scheduler.
• Let's assume the primary server has gone down. In that case, the PrimaryServerPoller detects this and:
o Declares itself as the primary Airflow server in the MySQL table.
o Moves the actual DAGs out of the DAGs folder and moves the PrimaryServerPoller DAG into the DAGs folder on the older primary server (instance 1).
o Moves the actual DAGs into the DAGs folder and moves the PrimaryServerPoller DAG out of the DAGs folder on the older standby (instance 2).
• So here, the primary and standby servers have swapped their positions.
• Even if the Airflow scheduler on the current standby server (instance 1) comes back, there is no harm, since only the PrimaryServerPoller DAG would be running on it. This server (instance 1) remains the standby until the current primary server (instance 2) goes down.
• In case the current primary server goes down, the same process repeats and the Airflow instance running on instance 1 becomes the primary server.
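The failover performed by the CounterPart Poller (PrimaryServerPoller) can be sketched in plain Python. This is a minimal illustration under several assumptions, not the authors' implementation:

- sqlite3 stands in for the MySQL flag table.
- The table name airflow_primary and the file name counterpart_poller.py are hypothetical.
- "Parking" a DAG means moving it to a sibling folder that the scheduler does not read.
- The other instance's folders are assumed to be reachable from this host, for example through a shared mount.
- The paper does not say how the counterpart is probed, so that is left to a caller-supplied counterpart_alive function.

```python
import shutil
import sqlite3
from pathlib import Path
from typing import Callable, Tuple

# Hypothetical file name of the CounterPart Poller DAG.
POLLER_FILE = "counterpart_poller.py"


def init_flag_table(conn: sqlite3.Connection, initial: str = "instance1") -> None:
    """Create the one-row table recording which instance is primary.

    The whitepaper keeps this flag in MySQL; sqlite3 stands in for it here.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS airflow_primary ("
        "id INTEGER PRIMARY KEY CHECK (id = 1), instance TEXT NOT NULL)"
    )
    conn.execute(
        "INSERT OR IGNORE INTO airflow_primary (id, instance) VALUES (1, ?)",
        (initial,),
    )
    conn.commit()


def current_primary(conn: sqlite3.Connection) -> str:
    row = conn.execute("SELECT instance FROM airflow_primary WHERE id = 1").fetchone()
    return row[0]


def arrange_dags(dags_folder: Path, parked_folder: Path, as_primary: bool) -> None:
    """Primary: real DAGs active and poller parked. Standby: the reverse."""
    real_dst = dags_folder if as_primary else parked_folder
    poller_dst = parked_folder if as_primary else dags_folder
    for folder in (dags_folder, parked_folder):
        for dag_file in list(folder.glob("*.py")):
            target = poller_dst if dag_file.name == POLLER_FILE else real_dst
            if folder != target:
                shutil.move(str(dag_file), str(target / dag_file.name))


def poll_once(
    conn: sqlite3.Connection,
    me: str,
    counterpart_alive: Callable[[], bool],
    local: Tuple[Path, Path],
    remote: Tuple[Path, Path],
) -> bool:
    """One cycle of the CounterPart Poller running on instance `me`.

    `local` and `remote` are (dags_folder, parked_folder) pairs.
    Returns True if this cycle promoted `me` to primary.
    """
    if current_primary(conn) == me or counterpart_alive():
        return False  # already primary, or the primary is still healthy
    # Declare this instance as the primary in the shared flag table.
    conn.execute("UPDATE airflow_primary SET instance = ? WHERE id = 1", (me,))
    conn.commit()
    arrange_dags(*remote, as_primary=False)  # old primary keeps only the poller
    arrange_dags(*local, as_primary=True)    # this host now runs the real DAGs
    return True
```

In practice, poll_once would be invoked on every run of the poller DAG, so that DAG's own schedule interval sets how quickly a failure is detected. A production version would also need to guard against a network partition. In that case the standby may promote itself while the old primary is still running, and both instances would then schedule the real DAGs.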
An important point to note about the distributed deployments above: the worker nodes are nothing but Airflow installations, so the DAG definitions should be kept in sync on all the nodes (both the primary Airflow installation and the worker nodes).
TYPICAL STAGES OF WORKFLOW MANAGEMENT
The typical stages of the life cycle for workflow management of big data are as follows:
• Create jobs to interact with systems that operate on data
o Using tools/products like Hive, Presto, HDFS, Postgres, S3 etc.
• (Dynamic) workflow creation
o Based on the number of sources, size of data, business logic, variety of data, changes in the schema, and the list goes on.
• Manage dependencies between operations
o Upstream, downstream and cross-pipeline dependencies, previous job state, etc.
• Schedule the jobs/operations
o Calendar schedule, event driven, cron expression etc.
• Keep track of the operations and the metrics of the workflow
o Monitor the current/historic state of the jobs, the results of the jobs etc.
• Ensure fault tolerance of the pipelines and the capability to backfill any missing data, etc.
This list grows as the complexity increases.
TOOLS THAT SOLVE WORKFLOW MANAGEMENT
There are many workflow management tools in the market. Some have support for big data operations out of the box, and some need extensive customization/extensions to support big data pipelines.
• Oozie: Oozie is a workflow scheduler system to manage Apache Hadoop jobs
• BigDataScript: BigDataScript is intended as a scripting language for big data pipelines
• Makeflow: Makeflow is a workflow engine for executing large complex workflows on clusters and grids
• Luigi: Luigi is a Python module that helps you build complex pipelines of batch jobs. (This is a strong contender to Airflow)
• Airflow: Airflow is a platform to programmatically author, schedule and monitor workflows
• Azkaban: Azkaban is a batch workflow job scheduler created at LinkedIn to run Hadoop jobs
• Pinball: Pinball is a scalable workflow manager developed at Pinterest
Most of the mentioned tools meet the basic needs of workflow management. When it comes to dealing with complex workflows, only a few of them shine. Luigi, Airflow, Oozie and Pinball are the tools preferred (and used in production) by most teams across the industry.
At the time of writing, none of the existing resources (on the web) talk about the architecture and setup of Airflow in production with the CeleryExecutor, and more importantly, about how Airflow needs to be configured to be highly available. Hence here is an attempt to share that missing information.
INSTALLATION STEPS FOR AIRFLOW AND HOW IT WORKS
Install pip
Installation steps
sudo yum install epel-release
sudo yum install python-pip python-wheel
Install Erlang
Installation steps
sudo yum install wxGTK
sudo yum install erlang
RabbitMQ
Installation steps
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.2/rabbitmq-server-3.6.2-1.noarch.rpm
sudo yum install rabbitmq-server-3.6.2-1.noarch.rpm
Celery
Installation steps
pip install celery
Airflow: Pre-requisites
Installation steps
sudo yum install gcc-gfortran libgfortran numpy redhat-rpm-config python-devel gcc-c++
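Before pointing Celery at RabbitMQ, it can help to confirm that the broker is accepting connections. This small check is not part of the original steps. It is a sketch that assumes the RabbitMQ server has been started on the local host and listens on its default AMQP port, 5672. It only verifies that a TCP connection can be opened, not that the credentials later used in the broker URL are valid.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened.

    5672 is RabbitMQ's default AMQP port; no authentication is attempted.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable host, etc.
        return False


if __name__ == "__main__":
    print("RabbitMQ reachable:", broker_reachable())
```

A False result typically means the server is not running yet or a firewall is blocking the port.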
Airflow
Installation steps
# create a home directory for airflow
mkdir ~/airflow
# export the location as the AIRFLOW_HOME variable
export AIRFLOW_HOME=~/airflow
pip install airflow
Initialize the Airflow Database
Installation steps
airflow initdb
By default, Airflow installs with a SQLite database. The above step creates an airflow.cfg file within the $AIRFLOW_HOME/ directory. Once this is done, you may want to change the repository database to a well-known (highly available) relational database like MySQL or Postgres. Then reinitialize the database (using the airflow initdb command). That creates all the required tables for Airflow in the relational database.
Start the Airflow components
Installation steps
# Start the Scheduler
airflow scheduler
# Start the Webserver
airflow webserver
# Start the Worker (a Celery worker; only needed with the CeleryExecutor)
airflow worker
Here are a few important configuration points in the airflow.cfg file:
• dags_folder = /root/airflow/DAGS
o The folder where your Airflow pipelines live.
• executor = LocalExecutor
o The executor class that Airflow should use. For the distributed mode described earlier, this would be CeleryExecutor; the broker_url and celery_result_backend settings below are used only by the Celery executor.
• sql_alchemy_conn = mysql://root:root@localhost/airflow
o The SQLAlchemy connection string to the metadata database.
• base_url = http://localhost:8080
o The base URL (hostname and port) of the Airflow web server, which Airflow uses when generating links, for example in notification emails.
• broker_url = sqla+mysql://root:root@localhost:3306/airflow
o The Celery broker URL. Celery supports RabbitMQ, Redis and, experimentally, a SQLAlchemy database.
• celery_result_backend = db+mysql://root:root@localhost:3306/airflow
o A key Celery setting that determines where the workers write the results.
This should give you a running Airflow instance and set you on the path to run it in production.
If you have any questions or feedback, we would love to hear from you; do drop us a line at airflow@clairvoyantsoft.com. We have used Airflow extensively in production and would love to hear more about your needs and thoughts.
ABOUT CLAIRVOYANT
Clairvoyant is a global technology consulting and services company. We help organizations build innovative products and solutions using big data, analytics, and the cloud. We provide best-in-class solutions and services that leverage big data and continually exceed client expectations. Our deep vertical knowledge combined with expertise on multiple, enterprise-grade big data platforms helps support purpose-built solutions to meet our clients’ business needs.