Applying Airflow to build a simple ETL workflow
April 20, 2021
In my last blog, I gave a quick guide on how to use Airflow in the most basic way. In today’s blog, I will guide you through the process of creating a simple ETL workflow from a local database to a data warehouse, using Airflow as the backend.
E-T-L & Airflow
Extract – Transform – Load (ETL) is a three-step process commonly seen in data-centric projects. It is the process of aggregating data from various sources into a centralized data warehouse so that the data can be analyzed later on. It is the first of the two parts featured in our LiMDa project (the second being data refinement). Airflow is said to be built with ETL in mind, so it is a good candidate for ETL workflows of any scale.
Consider a database FL_Business, with several tables, that is used for the client company’s business. For simplicity, the scenario here is limited to operations on one table only: the client wishes to synchronize data from selected columns of the table orders, first to a Google Cloud Storage bucket and then to BigQuery. The client wants the system to run on Airflow 1.10.x. In this blog, we first deal with the full load problem, that is, extracting and loading the whole table content to the data warehouse. The incremental load problem (daily synchronization) will be discussed in later blogs. Here’s a screenshot of the orders table.
Based on the client’s requirements, we can synchronize each meaningful table with a common series of tasks:
Query meaningful columns from the table.
Push the query result as a file to the pre-defined Google Cloud Storage bucket.
Load the file as data to BigQuery.
In Airflow, to build a workflow, suitable operators must be used and upstream/downstream relationships must be specified. After some research, our team decided to compact the workflow into two steps:
MySQL to Google Cloud Storage (Query + push result to GCS) (Extract/Transform)
Google Cloud Storage to BigQuery (Load GCS to BQ) (Load)
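Since the client runs Airflow 1.10.x, the contrib package already ships operators that cover exactly these two steps:

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator  # MySQL query -> GCS file
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator  # GCS file -> BigQuery table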
The updated workflow allowed the team to use predefined operators from the package airflow.contrib.operators, removing the need to write and test custom operators, since the built-in ones are official and well tested before release. The workflow graph should now look like the following:
Let’s code the DAG!
Now that we have an idea of how to solve this problem, let us get on to coding! The first thing to do is to add Airflow connections for the database, Google Cloud Platform and BigQuery:
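Connections can be added from the Web UI (Admin > Connections) or created programmatically. Below is a minimal sketch of the programmatic route; the host, credentials, project, key path and connection IDs (fl_business_mysql, fl_gcs, fl_bigquery) are all made-up placeholders:

from airflow import settings
from airflow.models import Connection

session = settings.Session()

# MySQL connection to the FL_Business database (host and credentials are placeholders)
session.add(Connection(
    conn_id="fl_business_mysql",
    conn_type="mysql",
    host="localhost",
    schema="FL_Business",
    login="airflow",
    password="change-me",
    port=3306,
))

# Google Cloud Platform connections for GCS and BigQuery, pointing to a service account key
gcp_extra = (
    '{"extra__google_cloud_platform__project": "my-gcp-project", '
    '"extra__google_cloud_platform__key_path": "/path/to/service_account.json"}'
)
session.add(Connection(conn_id="fl_gcs", conn_type="google_cloud_platform", extra=gcp_extra))
session.add(Connection(conn_id="fl_bigquery", conn_type="google_cloud_platform", extra=gcp_extra))

session.commit()

With the connections in place, we can start the DAG file with two dummy tasks that mark the beginning and the end of the workflow: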
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# "fl_order_full_load" is an example dag_id; schedule_interval=None keeps the DAG manual-trigger only
dag = DAG("fl_order_full_load", schedule_interval=None, start_date=days_ago(1))

begin = DummyOperator(task_id="begin", dag=dag)
end = DummyOperator(task_id="end", dag=dag)
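Between these two markers sit the two main tasks. Here is a sketch of how they might be configured with the contrib operators; the bucket name, object paths, column list in the SQL and the BigQuery dataset.table are assumptions for illustration, and the connection IDs are the placeholder ones created above:

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

# Extract/Transform: run the query on MySQL and upload the result to GCS as newline-delimited JSON
fl_order_mysql_to_gcs = MySqlToGoogleCloudStorageOperator(
    task_id="fl_order_mysql_to_gcs",
    sql="SELECT id, customer_id, total, created_at FROM orders",  # example column list
    bucket="fl-business-bucket",                                  # example bucket name
    filename="data/orders/orders_{}.json",  # {} is replaced with a chunk index if the export is split
    schema_filename="schemas/orders.json",  # BigQuery schema derived from the MySQL column types
    mysql_conn_id="fl_business_mysql",
    google_cloud_storage_conn_id="fl_gcs",
    dag=dag,
)

# Load: import the exported file(s) from GCS into a BigQuery table
fl_order_gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id="fl_order_gcs_to_bq",
    bucket="fl-business-bucket",
    source_objects=["data/orders/orders_*.json"],  # wildcard picks up every exported chunk
    schema_object="schemas/orders.json",
    destination_project_dataset_table="fl_business.orders",  # example dataset.table
    source_format="NEWLINE_DELIMITED_JSON",
    write_disposition="WRITE_TRUNCATE",  # full load: replace the table contents on each run
    bigquery_conn_id="fl_bigquery",
    google_cloud_storage_conn_id="fl_gcs",
    dag=dag,
)

Writing the schema file alongside the data lets the BigQuery load reuse the column types inferred from MySQL, so no schema has to be maintained by hand.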
Finally, remember to set the relationships between the tasks:
begin >> fl_order_mysql_to_gcs >> fl_order_gcs_to_bq >> end
Now we can run the DAG in Airflow. Fire up the Airflow Scheduler, open the Airflow Web UI and enable the DAG. Since the DAG has no schedule, you will have to trigger it manually, but that takes little effort, and you should see the tasks run and succeed shortly.
Afterwards, the data is successfully synchronized to the data warehouse (BigQuery):
In today’s blog, I have guided you through the process of setting up a simple DAG for a non-scheduled, single-table ETL workflow. See you in later blogs about Airflow!