Overview
In the first part of this blog, we covered the fundamental concepts of Airflow and why it works well in the GCP cloud environment for streamlined data orchestration. In this second part, the focus shifts to practical implementation, looking more closely at how Airflow and GCP Composer work together. This installment aims to give readers a hands-on understanding of Airflow’s functionality within the GCP ecosystem, showcasing its practical applications and the seamless orchestration it brings to data workflows. Building on the groundwork laid in the first part, we walk through the implementation details so that readers understand how Airflow becomes a powerful ally for data management on GCP.
Introduction
In this guide, we will build a data pipeline with Apache Airflow that picks up a file from Google Cloud Storage, transforms it with a Python script, and loads the result into Google Cloud BigQuery. By the end of this guide, you’ll have a robust and automated solution for managing your data workflows in the cloud.
Prerequisites
Before you begin, ensure you have the following set up (a quick verification sketch follows the list):
- A GCP account with billing enabled.
- A Google Cloud Storage bucket.
- A BigQuery dataset.
- Apache Airflow installed, either locally or on GCP Composer.
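If you want to confirm that your credentials and resources are in place before building the pipeline, the following is a minimal sketch using the google-cloud-storage and google-cloud-bigquery client libraries. The bucket and dataset names are placeholders, and it assumes application-default credentials are configured (for example via gcloud auth application-default login).
Python code
# check_prereqs.py - quick sanity check for the GCP resources used in this guide.
# Assumes the google-cloud-storage and google-cloud-bigquery packages are installed.
from google.cloud import bigquery, storage

BUCKET_NAME = "your-gcs-bucket"   # placeholder: your bucket name
DATASET_ID = "your-dataset-id"    # placeholder: your BigQuery dataset ID

storage_client = storage.Client()
bq_client = bigquery.Client()

# Raises NotFound if the bucket does not exist or is not accessible.
bucket = storage_client.get_bucket(BUCKET_NAME)
print(f"Bucket OK: {bucket.name}")

# Raises NotFound if the dataset does not exist.
dataset = bq_client.get_dataset(DATASET_ID)
print(f"Dataset OK: {dataset.full_dataset_id}")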
Step-by-Step Guide
Step 1: Set Up GCP Resources
- Create a GCS Bucket:
- In the GCP Console, navigate to “Cloud Storage” > “Buckets.”
- Create a new bucket and note the bucket name.
- Set Up Google Cloud BigQuery
- In the GCP Console, navigate to “BigQuery.”
- Create a new dataset within your project and note the dataset ID (a scripted alternative to these console steps is sketched below).
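If you prefer to script this setup instead of clicking through the console, a minimal sketch using the same client libraries is shown below; the names and location are placeholders.
Python code
# setup_gcp_resources.py - scripted alternative to the console steps above.
from google.cloud import bigquery, storage

BUCKET_NAME = "your-gcs-bucket"   # placeholder
DATASET_ID = "your-dataset-id"    # placeholder
LOCATION = "US"                   # placeholder: pick your preferred location

# Create the GCS bucket.
storage_client = storage.Client()
bucket = storage_client.create_bucket(BUCKET_NAME, location=LOCATION)
print(f"Created bucket {bucket.name}")

# Create the BigQuery dataset.
bq_client = bigquery.Client()
dataset = bigquery.Dataset(f"{bq_client.project}.{DATASET_ID}")
dataset.location = LOCATION
dataset = bq_client.create_dataset(dataset, exists_ok=True)
print(f"Created dataset {dataset.dataset_id}")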
Step 2: Write Python Script for Transformation
Create a Python script:
- Write a Python script that performs the desired transformation on your data.
- Save the script, e.g., transform_data.py.
Python code
# transform_data.py
import pandas as pd

def transform_data(input_data):
    # Perform your data transformation here
    # Example: Convert CSV to DataFrame and add a new column
    df = pd.read_csv(input_data)
    df['new_column'] = df['existing_column'] * 2

    # Save the transformed data
    output_data = 'transformed_data.csv'
    df.to_csv(output_data, index=False)

    return output_data
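Note that pandas can read a gs:// path directly only when the gcsfs package is installed in the Airflow environment; otherwise, download the object first with the google-cloud-storage client. A quick local check of the transform, assuming a sample CSV with a column named existing_column, might look like this (the file names are illustrative):
Python code
# test_transform.py - quick local check of transform_data (illustrative only).
import pandas as pd
from transform_data import transform_data

# Build a tiny sample input with the column the transform expects.
pd.DataFrame({"existing_column": [1, 2, 3]}).to_csv("sample_input.csv", index=False)

output_path = transform_data("sample_input.csv")
print(pd.read_csv(output_path))  # expect new_column = existing_column * 2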
Step 3: Set Up Airflow DAG
Create a new DAG file:
- Create a new Python file in your Airflow DAGs folder, e.g., data_pipeline_dag.py (avoid naming it airflow.py, which would shadow the airflow package).
- Define the DAG structure and import the transform_data function.
Python code
# data_pipeline_dag.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

from transform_data import transform_data

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_pipeline_dag',
    default_args=default_args,
    schedule_interval='0 0 * * *',  # Daily at midnight
)

transform_task = PythonOperator(
    task_id='transform_data_task',
    python_callable=transform_data,
    op_args=['gs://your-gcs-bucket/your-input-data.csv'],  # Replace with your GCS input data path
    dag=dag,
)
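If your Composer environment runs Airflow 2.x, note that the PythonOperator import path has moved; a minimal sketch of the equivalent imports is shown below, with the rest of the DAG unchanged.
Python code
# Airflow 2.x style imports for the same DAG (adjust to your Airflow version).
from airflow import DAG
from airflow.operators.python import PythonOperator  # replaces airflow.operators.python_operator
from datetime import datetime, timedelta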
Step 4: Upload DAG to Airflow
Upload the DAG file:
- Copy the data_pipeline_dag.py file, together with transform_data.py, to the DAGs folder in your Airflow instance (on GCP Composer, this means uploading them to the environment’s Cloud Storage bucket, as sketched below).
- Airflow will automatically detect the new DAG and add it to the DAG list.
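On GCP Composer, the DAGs folder is the dags/ prefix of the Cloud Storage bucket attached to the environment, so “copying the file” means uploading it there. The following is a minimal sketch using the google-cloud-storage client; the bucket name is a placeholder you can read from the environment’s details page.
Python code
# upload_dags.py - copy the DAG and its helper module to a Composer environment's bucket.
from google.cloud import storage

COMPOSER_BUCKET = "your-composer-environment-bucket"  # placeholder

client = storage.Client()
bucket = client.bucket(COMPOSER_BUCKET)

# Add load_to_bigquery.py to this list after completing Step 6.
for filename in ["data_pipeline_dag.py", "transform_data.py"]:
    blob = bucket.blob(f"dags/{filename}")
    blob.upload_from_filename(filename)
    print(f"Uploaded {filename} to gs://{COMPOSER_BUCKET}/dags/")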
Step 5: Schedule and Monitor
Set up a schedule:
- In the data_pipeline_dag.py file, adjust the schedule_interval parameter to your desired schedule (a few common values are sketched after this list).
- Save the file.
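The schedule_interval parameter accepts standard cron expressions as well as Airflow presets such as @hourly and @daily. As an illustration, this is how the DAG definition from Step 3 could be switched to an hourly schedule; the commented lines show other common choices.
Python code
# Example: switch the DAG to an hourly schedule (other options are commented).
dag = DAG(
    'data_pipeline_dag',
    default_args=default_args,
    schedule_interval='@hourly',        # Airflow preset: top of every hour
    # schedule_interval='0 6 * * 1',    # cron: every Monday at 06:00
    # schedule_interval=None,           # run only when triggered manually
)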
Monitor in Airflow UI:
- Access the Airflow web UI.
- Navigate to the “DAGs” section and find the “data_pipeline_dag.”
- Trigger the DAG manually to test the data transformation.
Step 6: Load Transformed Data into Google Cloud BigQuery
Write a Python script for the Google Cloud BigQuery load:
- Write a Python script that loads the transformed data into Google Cloud BigQuery.
- Use the BigQuery client library (google-cloud-bigquery) to interact with the service.
Python code
# load_to_bigquery.py
from google.cloud import bigquery

def load_to_bigquery(input_data, dataset_id, table_id):
    client = bigquery.Client()

    # Set up the destination table
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1  # the CSV written by transform_data has a header row
    job_config.autodetect = True      # infer the table schema from the file

    # Load data into BigQuery
    with open(input_data, 'rb') as source_file:
        job = client.load_table_from_file(
            source_file, table_ref, job_config=job_config
        )

    job.result()

if __name__ == "__main__":
    load_to_bigquery('transformed_data.csv', 'your-dataset-id', 'your-table-id')
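As an alternative, if the transformed file is written back to Cloud Storage rather than kept on local disk, BigQuery can load it directly from the gs:// URI, which avoids handling a local file altogether. The following is a minimal sketch; the bucket, dataset, and table names are placeholders.
Python code
# load_from_gcs.py - alternative: load the transformed CSV directly from Cloud Storage.
from google.cloud import bigquery

def load_from_gcs(gcs_uri, dataset_id, table_id):
    client = bigquery.Client()
    table_ref = f"{client.project}.{dataset_id}.{table_id}"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )

    # BigQuery reads the file server-side, so nothing is downloaded locally.
    job = client.load_table_from_uri(gcs_uri, table_ref, job_config=job_config)
    job.result()

if __name__ == "__main__":
    load_from_gcs('gs://your-gcs-bucket/transformed_data.csv',
                  'your-dataset-id', 'your-table-id')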
Step 7: Extend Airflow DAG for Google Cloud BigQuery Load
Modify the DAG file:
- Edit the data_pipeline_dag.py file to include the BigQuery load task.
Python code
# data_pipeline_dag.py
from airflow.operators.python_operator import PythonOperator

from load_to_bigquery import load_to_bigquery

# ... (previous code)

load_to_bigquery_task = PythonOperator(
    task_id='load_to_bigquery_task',
    python_callable=load_to_bigquery,
    op_args=['transformed_data.csv', 'your-dataset-id', 'your-table-id'],  # Replace with your BigQuery dataset and table
    dag=dag,
)

transform_task >> load_to_bigquery_task
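One caveat with this layout: transformed_data.csv is written to the worker’s local disk, and on Composer the two tasks are not guaranteed to run on the same worker, so in practice it is safer to have the transform step write the file back to GCS and load it from there. If you take that route, the Airflow Google provider also ships a ready-made operator for the load step; a hedged sketch is shown below, with the bucket, object, and table names as placeholders (requires the apache-airflow-providers-google package).
Python code
# Alternative load task using the Google provider's GCS-to-BigQuery operator.
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_to_bigquery_task = GCSToBigQueryOperator(
    task_id='load_to_bigquery_task',
    bucket='your-gcs-bucket',                       # placeholder bucket name
    source_objects=['transformed_data.csv'],        # object path inside the bucket
    destination_project_dataset_table='your-project.your-dataset-id.your-table-id',
    source_format='CSV',
    skip_leading_rows=1,
    autodetect=True,
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)

transform_task >> load_to_bigquery_task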
Conclusion
You’ve successfully built a data pipeline using Apache Airflow to take files from Google Cloud Storage, transform them, and store the results in Google Cloud BigQuery. This scalable and automated solution showcases the power of Airflow in orchestrating complex workflows. Feel free to customize the script and DAG to fit your requirements and explore additional Airflow features for more advanced data processing tasks. Automation like this enhances the efficiency of your data workflows, ultimately leading to more informed decision-making and insights from your data.
Drop a query if you have any questions regarding Apache Airflow and we will get back to you quickly.
About CloudThat
CloudThat is an official AWS (Amazon Web Services) Advanced Consulting Partner and Training partner, AWS Migration Partner, AWS Data and Analytics Partner, AWS DevOps Competency Partner, Amazon QuickSight Service Delivery Partner, Amazon EKS Service Delivery Partner, Microsoft Gold Partner, and many more, helping people develop knowledge of the cloud and helping their businesses aim for higher goals using best-in-industry cloud computing practices and expertise. We are on a mission to build a robust cloud computing ecosystem by disseminating knowledge on technological intricacies within the cloud space. Our blogs, webinars, case studies, and white papers enable all the stakeholders in the cloud computing sphere.
To get started, go through our Consultancy page and Managed Services Package to explore CloudThat’s offerings.
FAQs
1. How can I version-control and manage the code for my Airflow DAGs when using GCP Composer?
ANS: – Version-controlling Airflow DAGs in GCP Composer is best achieved by storing your DAG definitions in a version-controlled repository such as Git and deploying them, ideally through a CI/CD pipeline, to the dags/ folder of the Cloud Storage bucket attached to your Composer environment. This keeps your DAGs consistent across environments and makes rollback as simple as redeploying an earlier commit. You can upload DAGs to the bucket through the GCP Console or with the gcloud composer environments storage dags import command.
2. Can I trigger Google Cloud Functions or other GCP services from Apache Airflow DAGs?
ANS: – Absolutely! Apache Airflow has operators that enable interaction with various GCP services. For triggering Google Cloud Functions, you can use the Cloud Functions operators from the Google provider package, such as CloudFunctionInvokeFunctionOperator. Similarly, operators exist for other GCP services like BigQuery, Dataflow, and more. You can seamlessly integrate and orchestrate workflows involving multiple GCP services within your Airflow DAGs by leveraging these operators.
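To illustrate, here is a hedged sketch of a task that invokes an existing Cloud Function from a DAG; it assumes the apache-airflow-providers-google package is installed, and the project, region, and function names are placeholders.
Python code
# Sketch: invoke an existing Cloud Function from an Airflow DAG (names are placeholders).
from airflow.providers.google.cloud.operators.functions import CloudFunctionInvokeFunctionOperator

invoke_function_task = CloudFunctionInvokeFunctionOperator(
    task_id='invoke_cloud_function',
    project_id='your-project-id',
    location='us-central1',
    function_id='your-function-name',
    input_data={'data': '{"message": "triggered from Airflow"}'},  # payload passed to the function
    dag=dag,
)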
WRITTEN BY Hariprasad Kulkarni