Airflow
Introduction
Apache Airflow is a platform for running data-centric workflows and scheduled compute jobs. LocalStack supports the AWS Managed Workflows for Apache Airflow (MWAA) service to run Airflow jobs locally.
You can use Airflow to interact with the LocalStack Snowflake emulator and run ETL (Extract-Transform-Load) jobs, using the Airflow SnowflakeOperator to run queries against Snowflake.
On this page we outline how to set up the connection between local Airflow and the Snowflake emulator.
Create an Airflow environment via MWAA in LocalStack
To create an Airflow environment in local MWAA, we first create an S3 bucket to hold the DAGs and then use the awslocal command to create the environment:
awslocal s3 mb s3://my-mwaa-bucket
awslocal mwaa create-environment --dag-s3-path /dags \
    --execution-role-arn arn:aws:iam::000000000000:role/airflow-role \
    --network-configuration {} \
    --source-bucket-arn arn:aws:s3:::my-mwaa-bucket \
    --airflow-version 2.6.3 \
    --name my-mwaa-env
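To verify that the environment has been created, you can describe it via the MWAA get-environment operation. This check is not part of the setup itself; it simply returns the environment details, including its current status (the exact fields populated by the local emulator may vary):
awslocal mwaa get-environment --name my-mwaa-env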
Create an Airflow DAG script that connects to LocalStack Snowflake
We can then create a local file my_dag.py with the Airflow DAG definition, for example:
import datetime
import json

from airflow import settings
from airflow.models import Connection, DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# prepare session and connection info
session = settings.Session()
conn_id = "c1"
try:
    # try to look up local Snowflake connection in the session
    session.query(Connection).filter(Connection.conn_id == conn_id).one()
except Exception:
    # create new Snowflake connection, if it doesn't exist yet
    conn = Connection(
        conn_id=conn_id,
        conn_type="snowflake",
        login="test",
        password="test",
        extra=json.dumps({
            "account": "test",
            "host": "snowflake.localhost.localstack.cloud",
            "port": 4566,
        }),
    )
    session.add(conn)
    session.commit()

# create DAG
my_dag = DAG(
    "sf_dag1",
    start_date=datetime.datetime.utcnow(),
    default_args={"snowflake_conn_id": conn_id},
    catchup=False,
)

# add Snowflake operator to DAG
sf_task_1 = SnowflakeOperator(
    task_id="sf_query_1",
    dag=my_dag,
    sql="""
    CREATE TABLE IF NOT EXISTS test(id INT);
    INSERT INTO test (id) VALUES (1), (2), (3);
    SELECT * FROM test
    """,
)
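Before wiring this connection into Airflow, it can be useful to confirm that LocalStack is reachable on port 4566, the port used in the connection extras above. One quick way to do this (assuming LocalStack is running on the default edge port) is the LocalStack health endpoint, which returns a JSON overview of the running services:
curl -s http://localhost:4566/_localstack/health
Note that this only confirms that LocalStack itself is up; the actual Snowflake connectivity is exercised once the DAG runs the SnowflakeOperator task.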
Patching the SnowflakeOperator in the DAG script
To use the SnowflakeOperator in your Airflow DAG, a small patch to the code is required.
The code listings below contain the patch for different Airflow versions. Simply copy the relevant snippet and paste it at the top of your DAG script (e.g., my_dag.py).
Airflow version 2.6.3 (and other versions below 2.9.2):
# ---
# patch for local Snowflake connection, for Airflow 2.6.3 (versions below 2.9.2)
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def _get_conn_params(self):
    result = self._get_conn_params_orig()
    conn = self.get_connection(self.snowflake_conn_id)
    extra_dict = conn.extra_dejson
    if extra_dict.get("host"):
        result["host"] = extra_dict["host"]
    if extra_dict.get("port"):
        result["port"] = extra_dict["port"]
    return result

SnowflakeHook._get_conn_params_orig = SnowflakeHook._get_conn_params
SnowflakeHook._get_conn_params = _get_conn_params
# ---
# ... rest of your DAG script below ...
Airflow version 2.9.2 and above:
# ---
# patch for local Snowflake connection, for Airflow 2.9.2 / 2.10.1
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

@property
def _get_conn_params(self):
    result = self._get_conn_params_orig
    conn = self.get_connection(self.snowflake_conn_id)
    extra_dict = conn.extra_dejson
    if extra_dict.get("host"):
        result["host"] = extra_dict["host"]
    if extra_dict.get("port"):
        result["port"] = extra_dict["port"]
    return result

SnowflakeHook._get_conn_params_orig = SnowflakeHook._get_conn_params
SnowflakeHook._get_conn_params = _get_conn_params
# ---
# ... rest of your DAG script below ...
Deploying the DAG to Airflow
Next, we copy the my_dag.py file to the /dags folder within the my-mwaa-bucket S3 bucket to trigger the deployment of the DAG in Airflow:
awslocal s3 cp my_dag.py s3://my-mwaa-bucket/dags/
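To confirm that the file was uploaded to the expected location, you can list the contents of the DAG folder (a simple sanity check, not required for the deployment itself):
awslocal s3 ls s3://my-mwaa-bucket/dags/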
You should then be able to open the Airflow UI (e.g., http://localhost.localstack.cloud:4510/dags) to view the status of the DAG and trigger a DAG run.
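Note that the port of the local Airflow UI (4510 in the example URL above) can vary between environments. If the UI is not reachable on that port, you can look up the webserver URL reported for your environment; assuming the local MWAA emulator populates the WebserverUrl attribute in the get-environment response, the following query returns it:
awslocal mwaa get-environment --name my-mwaa-env --query Environment.WebserverUrl --output text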