Airflow operators
INTRODUCTION TO AIRFLOW IN PYTHON
Mike Metzger
Data Engineer
Operators
Represent a single task in a workflow.
Run independently (usually).
Generally do not share information.
Various operators to perform different tasks.
DummyOperator(task_id='example', dag=dag)
BashOperator
Executes a given Bash command or script.
BashOperator(
    task_id='bash_example',
    bash_command='echo "Example!"',
    dag=ml_dag)

Runs the command in a temporary directory.

Can specify environment variables for the command.

BashOperator(
    task_id='bash_script_example',
    bash_command='runcleanup.sh',
    dag=ml_dag)
BashOperator examples
from airflow.operators.bash_operator import BashOperator
example_task = BashOperator(task_id='bash_ex',
                            bash_command='echo 1',
                            dag=dag)

bash_task = BashOperator(task_id='clean_addresses',
                         bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
                         dag=dag)
Operator gotchas
Not guaranteed to run in the same location / environment.
May require extensive use of environment variables.
Can be difficult to run tasks with elevated privileges.
Let's practice!
Airflow tasks
Tasks
Tasks are:
Instances of operators
Usually assigned to a variable in Python
example_task = BashOperator(task_id='bash_example',
                            bash_command='echo "Example!"',
                            dag=dag)

Referred to by the task_id within the Airflow tools
Task dependencies
Define a given order of task completion
Are not required for a given workflow, but usually present in most
Are referred to as upstream or downstream tasks
In Airflow 1.8 and later, are defined using the bitshift operators:
>>, the upstream operator (the task on the left runs first)
<<, the downstream operator (the task on the right runs first)
Upstream vs Downstream
Upstream means before
Downstream means after
Simple task dependency
# Define the tasks
task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1',
                     dag=example_dag)

task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2',
                     dag=example_dag)

# Set first_task to run before second_task
task1 >> task2  # or task2 << task1
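Under the hood, Airflow makes this syntax work by overloading Python's bitshift operators on its task objects. A toy sketch of the idea (this `Task` class is purely illustrative, not Airflow's actual implementation):

```python
class Task:
    """Toy stand-in for an Airflow operator instance."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other):
        # self >> other: run `other` after `self`
        self.downstream.append(other)
        return other  # returning `other` enables chaining: a >> b >> c

    def __lshift__(self, other):
        # self << other: run `self` after `other`
        other.downstream.append(self)
        return other

first = Task('first_task')
second = Task('second_task')
first >> second
print([t.task_id for t in first.downstream])  # ['second_task']
```

Returning the right-hand task from `__rshift__` is what lets chained dependencies like `task1 >> task2 >> task3` work as a single expression.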
Task dependencies in the Airflow UI
Multiple dependencies
Chained dependencies:
task1 >> task2 >> task3 >> task4
Mixed dependencies:
task1 >> task2 << task3
or:
task1 >> task2
task3 >> task2
Let's practice!
Additional operators
PythonOperator
Executes a Python function / callable
Operates similarly to the BashOperator, with more options
Can pass in arguments to the Python code
from airflow.operators.python_operator import PythonOperator
def printme():
    print("This goes in the logs!")

python_task = PythonOperator(
    task_id='simple_print',
    python_callable=printme,
    dag=example_dag
)
Arguments
Supports arguments to tasks
Positional
Keyword
Use the op_kwargs dictionary
op_kwargs example
import time

def sleep(length_of_time):
    time.sleep(length_of_time)

sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'length_of_time': 5},
    dag=example_dag
)
EmailOperator
Found in the airflow.operators library
Sends an email
Can contain typical components
HTML content
Attachments
Does require the Airflow system to be configured with email server details
EmailOperator example
from airflow.operators.email_operator import EmailOperator
email_task = EmailOperator(
    task_id='email_sales_report',
    to='sales_manager@example.com',
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',
    files=['latest_sales.xlsx'],
    dag=example_dag
)
Let's practice!
Airflow scheduling
DAG Runs
A specific instance of a workflow at a point in time
Can be run manually or via schedule_interval
Maintain state for each workflow and the tasks within
running
failed
success
1 https://airflow.apache.org/docs/stable/scheduler.html
DAG Runs view
DAG Runs state
Schedule details
When scheduling a DAG, there are several attributes of note:
start_date - The date / time to initially schedule the DAG run
end_date - Optional attribute for when to stop running new DAG instances
max_tries - Optional attribute for how many attempts to make
schedule_interval - How often to run
Schedule interval
schedule_interval represents:
How often to schedule the DAG
Between the start_date and end_date
Can be defined via cron style syntax or via built-in presets.
cron syntax
Is pulled from the Unix cron format
Consists of 5 fields separated by a space
An asterisk * represents running for every interval (ie, every minute, every day, etc)
Can be comma separated values in fields for a list of values
cron examples
0 12 * * * # Run daily at noon
* * 25 2 * # Run once per minute on February 25
0,15,30,45 * * * * # Run every 15 minutes
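The 5-field matching rule can be sketched in plain Python. This `cron_matches` helper is a hypothetical illustration (it supports only `*` and comma-separated lists, not ranges or step values, and is not how Airflow's scheduler is implemented):

```python
from datetime import datetime

def cron_matches(expr: str, when: datetime) -> bool:
    """Return True if `when` matches the 5-field cron expression.

    Minimal sketch: supports '*' and comma-separated value lists only.
    """
    fields = expr.split()
    # minute, hour, day-of-month, month, day-of-week (0 = Sunday)
    values = [when.minute, when.hour, when.day, when.month,
              (when.weekday() + 1) % 7]
    for field, value in zip(fields, values):
        if field == '*':
            continue  # wildcard matches every value
        if value not in {int(v) for v in field.split(',')}:
            return False
    return True

print(cron_matches('0 12 * * *', datetime(2023, 3, 1, 12, 0)))   # True: noon
print(cron_matches('0 12 * * *', datetime(2023, 3, 1, 12, 30)))  # False
print(cron_matches('0,15,30,45 * * * *', datetime(2023, 3, 1, 9, 45)))  # True
```

Note the day-of-week conversion: Python's `weekday()` counts Monday as 0, while cron counts Sunday as 0.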
Airflow scheduler presets
Preset: cron equivalent:
@hourly 0 * * * *
@daily 0 0 * * *
@weekly 0 0 * * 0
@monthly 0 0 1 * *
@yearly 0 0 1 1 *
1 https://airflow.apache.org/docs/stable/scheduler.html
Special presets
Airflow has two special schedule_interval presets:
None - Don't schedule ever, used for manually triggered DAGs
@once - Schedule only once
schedule_interval issues
When scheduling a DAG, Airflow will:
Use the start_date as the earliest possible value
Schedule the task at start_date + schedule_interval
'start_date': datetime(2020, 2, 25),
'schedule_interval': '@daily'
This means the earliest starting time to run the DAG is on February 26th, 2020
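The first run time is plain datetime arithmetic, which can be checked with the standard library (a sketch of the scheduling rule, not Airflow internals):

```python
from datetime import datetime, timedelta

start_date = datetime(2020, 2, 25)
schedule_interval = timedelta(days=1)  # equivalent to the @daily preset

# Airflow triggers the first DAG run at the *end* of the first interval
first_run = start_date + schedule_interval
print(first_run)  # 2020-02-26 00:00:00
```

In other words, a daily DAG with a start_date of February 25th covers the February 25th interval but does not actually execute until February 26th.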
Let's practice!