
Testing Airflow

Testing plays an important role in keeping our code free of errors and bugs, so that we can implement new features without worrying about their impact on existing functionality. These testing efforts also extend to our Airflow deployment, which is maintained by the data team.

Background

Airflow is a workflow orchestration tool that we use to schedule our ELT processes across AWS Glue, dbt and other services. As Airflow is Python-based, we can apply standard unit testing practices to many of the functions we have implemented within Airflow. Testing gets tricky when we want to unit test functions that rely on external services such as AWS, and trickier still when we want to write integration tests that evaluate Airflow DAG imports and runs.

Mocking External Services

A large part of our use of Airflow deals with orchestrating AWS Glue crawler and job runs, in addition to interacting with other AWS services such as S3. One way of testing these functions is to provide credentials to a real AWS environment. This is problematic, however, as it introduces network latency into the tests and the complexity of provisioning AWS resources purely for testing. The alternative is mocking, which lets us simulate an AWS environment. This approach eliminates network latency, gives us fine-grained control of the mocked AWS environment and enables us to create resources at test time. We use a library called moto for this.

In conjunction with pytest, we can set up AWS test fixtures like so.

AWS credentials:

import os

import pytest


@pytest.fixture(scope="function")
def aws_credentials():
    """Mocked AWS credentials for moto; dummy values keep boto3 away from any real account."""
    os.environ["AWS_ACCESS_KEY_ID"] = "test_access_key_id"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "test_secret_access_key"
    os.environ["AWS_SECURITY_TOKEN"] = "test_security_token"
    os.environ["AWS_SESSION_TOKEN"] = "test_session_token"
    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

Glue resource:

import boto3
import botocore.config
from moto import mock_glue


@pytest.fixture(scope="function")
def glue_client(aws_credentials):
    """Yields a boto3 Glue client backed by moto's in-memory Glue implementation."""
    with mock_glue():
        yield boto3.client(
            "glue",
            config=botocore.config.Config(
                retries={"mode": "adaptive"},
            ),
        )

These fixtures can be used in any test case that relies on the existence of a particular AWS resource. For example, we generate some of our Airflow DAGs dynamically to trigger Glue jobs, with each Glue job generating a DAG. We can test this functionality using our mocked AWS services.

def test_glue_job_dynamic_dag(glue_client, sts_client, airflow_variables):
    # Create mocked Glue jobs, then load the DAG file that generates one DAG per job.
    number_of_glue_jobs = 4
    create_test_jobs(
        glue_client,
        number_of_glue_jobs,
        tags={},
    )
    dag_bag = airflow.models.DagBag(
        dag_folder=f"{DAG_DIR}/glue_job.py",
        include_examples=False,
    )
    assert len(dag_bag.dags) == number_of_glue_jobs + 1

Note the additional parameters sts_client and airflow_variables. These are further mocked fixtures available to the test case; any number of fixtures can be passed to a test, so each test can combine whatever mocked resources and configuration it needs.
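For illustration, those extra fixtures might look something like the sketch below. This is an assumption-laden example rather than our exact implementation: it uses moto's mock_sts for the STS client, and pytest's monkeypatch to inject an Airflow Variable via the AIRFLOW_VAR_<NAME> environment variable convention (the variable name ENVIRONMENT here is hypothetical).

import boto3
import pytest
from moto import mock_sts


@pytest.fixture(scope="function")
def sts_client(aws_credentials):
    # Mocked STS client, useful for code that resolves the current AWS account ID.
    with mock_sts():
        yield boto3.client("sts")


@pytest.fixture(scope="function")
def airflow_variables(monkeypatch):
    # Airflow resolves Variables from AIRFLOW_VAR_<NAME> environment variables
    # when they are not present in the metadata database.
    monkeypatch.setenv("AIRFLOW_VAR_ENVIRONMENT", "test")  # hypothetical variable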

Mocking The Airflow Metadata Database

Airflow uses a metadata database under the hood to track its own state, including the list of DAGs and their historical runs. To test DAG runs, Airflow requires an instance of this database to exist. Airflow supports PostgreSQL, MySQL and Microsoft SQL Server as its metadata store in production, and SQLite for testing and development purposes; the latter is what we use here for simplicity.

Mocking this database starts with declaring an environment variable that tells the tests where the metadata database lives. We declared ours in __init__.py.

# AIRFLOW_METASTORE_PATH points at the SQLite file backing the test metastore.
os.environ[
    "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"
] = f"sqlite:////{AIRFLOW_METASTORE_PATH}"

We can then write a function that uses Python's subprocess module to call the Airflow CLI and initialise the metadata database at the path specified above.

import subprocess


def init_airflow_db():
    # check=True raises if the Airflow CLI exits with a non-zero status.
    subprocess.run(["airflow", "db", "init"], check=True)

This function can be used in a pytest fixture so that the database is initialised at test time. In our case the fixture is scoped at the module level to reduce test latency, as the initialisation process is expensive.

@pytest.fixture(scope="module")
def crawler_db():
    yield init_airflow_db()
    destroy_airflow_db()
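The destroy_airflow_db teardown is not shown above; a minimal sketch, assuming the SQLite metastore lives at AIRFLOW_METASTORE_PATH, simply deletes the database file:

import os


def destroy_airflow_db():
    # Remove the SQLite metastore so the next test module starts from a clean slate.
    if os.path.exists(AIRFLOW_METASTORE_PATH):
        os.remove(AIRFLOW_METASTORE_PATH)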

This fixture can be used in any test case that simulates a DAG run. For example, we can test that a DAG triggers a Glue crawler.

def test_trigger_glue_crawler(glue_client, crawler_db, airflow_variables, mocker):
    # Patch out our polling helper so the test does not block waiting on the crawler.
    mocker.patch("utils.airflow.wait", return_value=None)
    crawler_name = create_test_crawler(glue_client)
    dag_bag = airflow.models.DagBag(
        dag_folder=f"{DAG_DIR}/glue_crawler.py",
        include_examples=False,
    )
    # DAG runs are recorded in the metadata database, so the DAGs must be synced to it first.
    dag_bag.sync_to_db()
    for dag in dag_bag.dags.values():
        dag.run(
            start_date=datetime(2022, 1, 1, tzinfo=utc),
            end_date=datetime(2022, 1, 2, tzinfo=utc),
            continue_on_failures=True,
        )
    assert glue_client.get_crawler(Name=crawler_name)["Crawler"]["State"] == "RUNNING"

Packaging Tests In Docker

To simplify the execution of tests as part of a CI/CD pipeline, we package our tests in Docker. This ensures a clean environment for our test cases and eliminates any worry about whether the pipeline runner agent has the dependencies the tests require. It also allows us to run our tests against external services that lack mocking libraries, by simply spinning those services up as Docker containers.
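As a rough sketch, the test image can be as simple as the following Dockerfile; the requirements file name and test directory are illustrative assumptions, not our exact setup.

FROM python:3.9-slim

WORKDIR /app

# Install Airflow, moto, pytest and other test dependencies.
COPY requirements-test.txt .
RUN pip install --no-cache-dir -r requirements-test.txt

COPY . .

# Run the test suite when the container starts.
ENTRYPOINT ["pytest", "tests"]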