
Apache Airflow - A Python Hands-On Guide

The document is a hands-on guide for using Apache Airflow with Python, detailing how to write Directed Acyclic Graphs (DAGs) and utilize various operators like PythonOperator and BranchPythonOperator. It also covers integration with providers such as AWS, Google Cloud, PostgreSQL, and Slack, along with examples for executing tasks and managing dependencies. Additionally, it includes a section on orchestrating Apache Spark jobs within Airflow workflows.


1. Writing a DAG in Python


A Directed Acyclic Graph (DAG) is the core abstraction in Airflow. It defines the
workflow and task dependencies.
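
To make the "acyclic" requirement concrete, here is a minimal sketch that uses only Python's standard library (graphlib, Python 3.9+) rather than Airflow itself. The task names are hypothetical. The point is that a set of dependencies yields a valid execution order only when it contains no cycle.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(deps):
    """Return one valid run order, or raise CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps):
    """Report whether the dependency mapping contains a cycle."""
    try:
        execution_order(deps)
    except CycleError:
        return True
    return False


order = execution_order(dependencies)
print(order)
```

Airflow performs a comparable check when it parses a DAG file and rejects task graphs that contain cycles.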

Basic DAG Structure

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

# Define default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

# Initialize DAG
with DAG(
    dag_id='example_dag',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    def print_hello():
        print("Hello, Airflow!")

    task = PythonOperator(
        task_id='print_hello',
        python_callable=print_hello,
    )

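Key Notes:
DAG: The container for your workflow.
PythonOperator: Executes a Python function as a task.
schedule_interval: Defines the schedule (e.g., @daily, @hourly). Airflow 2.4 and later also accept this setting under the name schedule.
catchup: When set to False, Airflow does not backfill runs for the intervals between start_date and the current date; only the most recent interval is scheduled.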
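
The effect of catchup can be illustrated with plain datetime arithmetic. The sketch below is a simplified model, not Airflow's scheduler: it assumes a daily schedule, a DAG that is first enabled at the hypothetical time now, and that a run is created only once its interval has fully elapsed.

```python
from datetime import datetime, timedelta


def daily_intervals(start_date, now, catchup):
    """Simplified model: start times of the daily intervals that get a run."""
    completed = []
    interval_start = start_date
    # A daily interval is schedulable only after it has fully elapsed.
    while interval_start + timedelta(days=1) <= now:
        completed.append(interval_start)
        interval_start += timedelta(days=1)
    if catchup:
        return completed      # backfill every missed interval
    return completed[-1:]     # only the most recent completed interval


start = datetime(2024, 1, 1)
now = datetime(2024, 1, 4, 12, 0)  # hypothetical "current" time

with_catchup = daily_intervals(start, now, catchup=True)
without_catchup = daily_intervals(start, now, catchup=False)
print(len(with_catchup), len(without_catchup))
```

With a start_date far in the past, catchup=True can therefore queue a large number of runs as soon as the DAG is enabled.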

2. Options for Python Operators


Airflow provides several operators that run Python code:

PythonOperator
Executes Python callables.

PythonOperator(
    task_id='process_data',
    python_callable=process_data_function,
    op_kwargs={'param': 'value'},  # Pass arguments to the callable
)
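
Because op_kwargs are handed to the callable as keyword arguments, the callable can be exercised as ordinary Python before it is wired into a DAG. A minimal sketch follows; the body of process_data_function is a hypothetical placeholder.

```python
def process_data_function(param):
    """Hypothetical callable; the operator invokes it with op_kwargs as keyword arguments."""
    return f"processed {param}"


op_kwargs = {"param": "value"}

# Mirrors how the keyword arguments reach the callable at run time.
result = process_data_function(**op_kwargs)
print(result)
```

By default, PythonOperator pushes the callable's return value to XCom under the key return_value, so downstream tasks can read it.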

BranchPythonOperator
Allows branching based on a condition.

from airflow.operators.python import BranchPythonOperator

def choose_branch(some_condition, **kwargs):
    # Return the task_id of the branch to follow; the other branch is skipped
    return 'branch_1' if some_condition else 'branch_2'

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_branch,
    op_kwargs={'some_condition': True},  # Placeholder value for the condition
)
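
A branching callable only has to return the task_id (or a list of task_ids) of a task directly downstream of the branching task. The sketch below tests such a callable in isolation; the row-count condition, the threshold, and the function name are illustrative assumptions, not part of the original example.

```python
def choose_branch_by_row_count(row_count, threshold=1000):
    """Return the task_id of the branch to follow (hypothetical condition)."""
    return "branch_1" if row_count >= threshold else "branch_2"


# task_ids assumed to sit directly downstream of the branching task
downstream_task_ids = {"branch_1", "branch_2"}

chosen = choose_branch_by_row_count(row_count=5000)
print(chosen)
```

Checking that every possible return value is a real downstream task_id catches typos before the DAG is deployed.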

PythonVirtualenvOperator
Executes Python code within a virtual environment.

from airflow.operators.python import PythonVirtualenvOperator

def run_in_virtualenv():
    # Must be a named function: the operator serializes its source, so lambdas are rejected
    print("Running in a virtualenv!")

virtualenv_task = PythonVirtualenvOperator(
    task_id='venv_task',
    python_callable=run_in_virtualenv,
    requirements=["numpy", "pandas"],
    system_site_packages=False,
)

3. Working with Providers


Providers are integrations for various platforms. Here's how to use five popular ones
with Python:

1. AWS Provider
Install: pip install apache-airflow-providers-amazon

Example: S3 File Upload

from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

upload_task = S3CreateObjectOperator(
    task_id='upload_to_s3',
    aws_conn_id='my_aws_conn',
    s3_bucket='my_bucket',
    s3_key='path/to/file.txt',
    data="Sample Data",
)

2. Google Cloud Provider


Install: pip install apache-airflow-providers-google

Example: BigQuery Query Execution

# Note: recent Google provider releases deprecate this operator in favor of BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

bq_task = BigQueryExecuteQueryOperator(
    task_id='bq_query',
    sql='SELECT * FROM my_dataset.my_table',
    gcp_conn_id='my_gcp_conn',
    use_legacy_sql=False,
)

3. PostgreSQL Provider
Install: pip install apache-airflow-providers-postgres

Example: Run SQL on PostgreSQL

from airflow.providers.postgres.operators.postgres import PostgresOperator

sql_task = PostgresOperator(
    task_id='run_postgres_query',
    postgres_conn_id='my_postgres_conn',
    sql='SELECT * FROM my_table;',
)

4. Slack Provider
Install: pip install apache-airflow-providers-slack

Example: Send Slack Notification

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

slack_task = SlackWebhookOperator(
    task_id='send_slack_message',
    http_conn_id='slack_conn',
    message="Workflow completed successfully!",
    channel="#alerts",
)

5. MySQL Provider
Install: pip install apache-airflow-providers-mysql

Example: Execute SQL in MySQL

from airflow.providers.mysql.operators.mysql import MySqlOperator

mysql_task = MySqlOperator(
    task_id='mysql_query',
    mysql_conn_id='my_mysql_conn',
    sql='INSERT INTO my_table (id, value) VALUES (1, "test");',
)

5. Python Cheatsheet for Apache Airflow

Component             Python Example

DAG Initialization    DAG(dag_id='my_dag', schedule_interval='@daily', ...)
PythonOperator        PythonOperator(task_id='task', python_callable=my_func)
Branching             BranchPythonOperator(task_id='branch', python_callable=my_func)
S3 Upload             S3CreateObjectOperator(..., s3_key='path/to/file.txt')
SQL Execution         PostgresOperator(sql='SELECT * FROM table;')
BigQuery              BigQueryExecuteQueryOperator(sql='SELECT * FROM table')
Slack Notification    SlackWebhookOperator(message="Job done!")
Virtualenv            PythonVirtualenvOperator(python_callable=my_func, ...)

Working with Apache Spark in Airflow


Apache Spark is a distributed data processing framework widely used for big data
tasks. In Airflow, we can manage and orchestrate Spark jobs using operators such as:

SparkSubmitOperator: Submits a Spark job directly to a cluster.
EmrAddStepsOperator: Submits a Spark job to an Amazon EMR cluster.
DataprocSubmitJobOperator: Submits a Spark job to Google Dataproc.

These operators allow us to control Spark jobs programmatically within Airflow workflows.

Complex Workflow: Conditional Spark Job Execution


Workflow Logic:

1. Execute spark_job_1.
2. Execute spark_job_2.
3. If spark_job_2 fails, run spark_job_3.
4. If spark_job_2 succeeds, run spark_job_4.
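
Before building the DAG, the intended control flow can be written down as a plain Python function and checked. This is only a sketch of the logic listed above; it does not use Airflow, it assumes spark_job_1 succeeds, and the function name planned_tasks is hypothetical.

```python
def planned_tasks(spark_job_2_succeeded):
    """Return the Spark tasks that run, in order, for one outcome of spark_job_2."""
    tasks = ["spark_job_1", "spark_job_2"]
    # Success continues with spark_job_4; failure falls back to spark_job_3.
    tasks.append("spark_job_4" if spark_job_2_succeeded else "spark_job_3")
    return tasks


success_path = planned_tasks(spark_job_2_succeeded=True)
failure_path = planned_tasks(spark_job_2_succeeded=False)
print(success_path)
print(failure_path)
```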

DAG Implementation
Step 1: Import Required Modules

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator  # Named EmptyOperator in newer Airflow releases
from airflow.operators.python import BranchPythonOperator
# Requires: pip install apache-airflow-providers-apache-spark
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

Step 2: Define the DAG and Default Arguments

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'retries': 1,
}

dag = DAG(
    dag_id='spark_conditional_jobs',
    default_args=default_args,
    description='A DAG with conditional Spark job execution',
    schedule_interval=None,
    start_date=datetime(2023, 12, 1),
    catchup=False,
)

Step 3: Define the SparkSubmitOperator Jobs

# Spark Job 1
spark_job_1 = SparkSubmitOperator(
    task_id='spark_job_1',
    application='/path/to/spark_job_1.py',
    conn_id='spark_default',  # Connection to your Spark cluster
    application_args=['arg1', 'arg2'],
    dag=dag,
)

# Spark Job 2
spark_job_2 = SparkSubmitOperator(
    task_id='spark_job_2',
    application='/path/to/spark_job_2.py',
    conn_id='spark_default',
    application_args=['arg1', 'arg2'],
    dag=dag,
)

# Spark Job 3
spark_job_3 = SparkSubmitOperator(
    task_id='spark_job_3',
    application='/path/to/spark_job_3.py',
    conn_id='spark_default',
    application_args=['arg1', 'arg2'],
    dag=dag,
)

# Spark Job 4
spark_job_4 = SparkSubmitOperator(
    task_id='spark_job_4',
    application='/path/to/spark_job_4.py',
    conn_id='spark_default',
    application_args=['arg1', 'arg2'],
    dag=dag,
)

Step 4: Branch Logic Using BranchPythonOperator

def choose_next_task(**kwargs):
    # Check the state of spark_job_2 in the current DAG run.
    # SparkSubmitOperator does not push its state to XCom, so read the task instance instead.
    dag_run = kwargs['dag_run']
    spark_job_2_state = dag_run.get_task_instance('spark_job_2').state

    # Return the task to execute next; anything other than success takes the failure path
    if spark_job_2_state == 'success':
        return 'spark_job_4'
    return 'spark_job_3'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_next_task,
    trigger_rule='all_done',  # Run even when spark_job_2 has failed
    dag=dag,
)

Step 5: Define the Task Dependencies

start = DummyOperator(task_id='start', dag=dag)
# One branch is always skipped, so 'end' must not require every upstream task to succeed
end = DummyOperator(
    task_id='end',
    trigger_rule='none_failed_min_one_success',
    dag=dag,
)

# Define dependencies
start >> spark_job_1
spark_job_1 >> spark_job_2
spark_job_2 >> branch_task
branch_task >> spark_job_3 >> end
branch_task >> spark_job_4 >> end

Explanation of Components
1. SparkSubmitOperator:
   - Used to submit Spark jobs to a cluster.
   - Specify the application path, connection ID, and arguments for the Spark job.
2. BranchPythonOperator:
   - Dynamically determines the next task based on the state of a previous task.
   - In this case, it checks if spark_job_2 succeeded or failed.
3. Dependencies:
   - The workflow ensures sequential execution from spark_job_1 to spark_job_2 and conditional branching to spark_job_3 or spark_job_4.
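
Whether a task in this workflow actually runs also depends on its trigger rule, which defaults to all_success. The sketch below is a simplified pure-Python model of three Airflow trigger rules, written to show why a task placed after a failed spark_job_2, or after a skipped branch, may not run under the default. The function name resolve and its return labels are illustrative assumptions, not Airflow APIs.

```python
def resolve(trigger_rule, upstream_states):
    """Simplified model of three Airflow trigger rules.

    Returns 'run', 'skipped', or 'upstream_failed' for a task whose direct
    upstream tasks have all finished in the given states.
    """
    failed = any(s in ("failed", "upstream_failed") for s in upstream_states)
    succeeded = any(s == "success" for s in upstream_states)
    skipped = any(s == "skipped" for s in upstream_states)

    if trigger_rule == "all_done":
        return "run"  # only requires that every upstream task has finished
    if trigger_rule == "all_success":
        if failed:
            return "upstream_failed"
        return "skipped" if skipped else "run"
    if trigger_rule == "none_failed_min_one_success":
        if failed:
            return "upstream_failed"
        return "run" if succeeded else "skipped"
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# A branching task directly after a failed spark_job_2:
print(resolve("all_success", ["failed"]))
print(resolve("all_done", ["failed"]))

# An end task after one executed branch and one skipped branch:
print(resolve("all_success", ["success", "skipped"]))
print(resolve("none_failed_min_one_success", ["success", "skipped"]))
```

In this model, a branching task that must react to a failure needs a rule such as all_done, and a join task after a branch needs a rule such as none_failed_min_one_success.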

Python Cheatsheet for Spark in Airflow

Component                Python Example

SparkSubmitOperator      SparkSubmitOperator(application='/path/app.py', ...)
BranchPythonOperator     BranchPythonOperator(python_callable=my_func, ...)
Conditional Execution    branch_task >> task_1 >> end or branch_task >> task_2
Task Dependency          task_1 >> task_2 >> task_3
