DBT testing with DuckDB

0. Intro

In our previous post, we discussed how to self-host DBT on AWS ECS (Fargate) for cost-effective scalability and better control over data transformation processes. If you haven’t read that post, see Running DBT with AWS ECS (Fargate). This time, we’ll take it a step further by diving into testing DBT projects with DuckDB, ensuring the quality and consistency of your SQL code.

DuckDB, an embedded analytical database, offers a fast, lightweight SQL engine, perfect for testing DBT models. By integrating SQL linting, smoke testing, and automation, we can maintain high-quality DBT projects. This guide will cover setting up a SQL linter with Sqlfluff, automating testing with pre-commit hooks, and creating a streamlined continuous integration (CI) pipeline.

1. SQL Linter

The idea is to have a SQL linter that keeps the SQL code well formatted, with a unified style across the repo. As the linter, I recommend using Sqlfluff.

1.1. Integrating it with DBT

To correctly parse DBT code (especially the templating part), we need to set it up properly. The easiest way is to create the .sqlfluff file in the repo root as follows:

.sqlfluff

[sqlfluff]
dialect = duckdb
templater = dbt
runaway_limit = 10
max_line_length = 100
indent_unit = space
exclude_rules = RF01, RF03, ST06, AL09
warnings = None

[sqlfluff:templater:dbt]
project_dir = ./dbt_src/
profiles_dir = ./.github/

Feel free to change the settings such as the excluded rules.

Notice that we are using DuckDB as the dialect and that we need to set a couple of directories:

  1. project_dir: with the root of the DBT project
  2. profiles_dir: with the path where a DBT profile can be found (only for linting purposes).
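If the dbt templater fails at startup, the two directories above are a common place to look. As a minimal sketch (not part of the original setup), the following helper reads the .sqlfluff file with Python's standard configparser and reports whether each directory contains the file DBT expects there. The function name check_sqlfluff_dirs is hypothetical, and paths are assumed to be relative to the folder that holds the .sqlfluff file.

```python
import configparser
from pathlib import Path


def check_sqlfluff_dirs(config_path=".sqlfluff"):
    """Return a list of problems found in the dbt templater section."""
    config_path = Path(config_path)
    # Interpolation is disabled so that any '%' in the config is read literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(config_path)

    section = "sqlfluff:templater:dbt"
    if not parser.has_section(section):
        return [f"missing [{section}] section"]

    # Assumption: paths are relative to the folder containing the config file
    root = config_path.parent
    # Each directory must contain the file DBT looks for in it
    expected = {"project_dir": "dbt_project.yml", "profiles_dir": "profiles.yml"}

    problems = []
    for key, filename in expected.items():
        value = parser.get(section, key, fallback=None)
        if value is None:
            problems.append(f"{key} is not set")
        elif not (root / value / filename).is_file():
            problems.append(f"{key}={value} does not contain {filename}")
    return problems
```

Calling check_sqlfluff_dirs() from the repo root returns an empty list when both settings point at valid folders.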

And here is the very simple profile I use:

./dbt_src/profiles.yml

# This file is only used so that `pre-commit` can properly work
#   (for sqlfluff)

dbt_src:
  outputs:
    dev:
      type: duckdb
  target: dev

If needed, you can exclude some paths using the .sqlfluffignore file as mentioned in Sqlfluff Configuration.

1.2. Automatic testing with pre-commit

With pre-commit, you can run the linter before each commit. See pre-commit for more information.

The linting job takes anywhere from a few seconds to a few minutes. You must decide whether you are willing to spend that time before each commit to ensure properly linted SQL code. If not, skip the pre-commit setup.

To set up sqlfluff, you only need to add these lines to the .pre-commit-config.yaml file:

.pre-commit-config.yaml

repos:

# SQL linter
- repo: https://github.com/sqlfluff/sqlfluff
  rev: 3.0.6
  hooks:
  - id: sqlfluff-lint
    entry: sqlfluff lint --processes 0 --disable-progress-bar
    additional_dependencies: [ # Should match what we have in prod
      'dbt-duckdb==1.6.2',
      'sqlfluff-templater-dbt==3.0.6'
    ]
  - id: sqlfluff-fix
    entry: sqlfluff fix --show-lint-violations --processes 0 --disable-progress-bar
    additional_dependencies: [ # Should match what we have in prod
      'dbt-duckdb==1.6.2',
      'sqlfluff-templater-dbt==3.0.6'
    ]

Here we are using 2 jobs:

  1. sqlfluff-lint: this gives better information about the failures.
  2. sqlfluff-fix: this tries to fix the errors. It doesn’t always succeed.

If you want to make it faster, you can keep only one of the 2 jobs. I prefer having both the fixing capabilities and better failure information.
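Since the hook revision and the sqlfluff-templater-dbt pin are written in several places, they can drift apart when one of them is bumped. The sketch below is one possible guard, assuming the config follows the layout shown above (a rev line right after the sqlfluff repo line). The function name templater_pins_mismatching_rev is hypothetical, and a regex is used only to avoid a YAML dependency.

```python
import re


def templater_pins_mismatching_rev(config_text):
    """Return the sqlfluff rev and the templater pins that differ from it."""
    # Assumption: 'rev:' is the line right after the sqlfluff 'repo:' line
    match = re.search(
        r"repo:\s*https://github\.com/sqlfluff/sqlfluff\s*\n\s*rev:\s*['\"]?([\w.]+)",
        config_text,
    )
    if match is None:
        raise ValueError("sqlfluff repo entry not found")

    rev = match.group(1)
    pins = re.findall(r"sqlfluff-templater-dbt==([\w.]+)", config_text)
    return rev, [pin for pin in pins if pin != rev]
```

You could feed it the contents of .pre-commit-config.yaml and fail a check whenever the returned list is not empty.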

2. Smoke testing with DuckDB

The testing strategy I’ve seen in most DBT projects is to rely on users to manually run DBT to validate that the code works. This is problematic since users might not run those tests (especially first-time contributors).

To have some guarantees over the DBT code, we can do smoke testing using DuckDB. The idea is to have some testing data (it can either be anonymized prod data or synthetic data) so that we can run DBT using dbt-duckdb. This can be automated to run at every pull request, ensuring some guarantees about the code.

3. Testing data

The idea is to have some files that define each table and to have a script that exports all of that into a duckdb file that can be used for DBT.

It should be something like:

- testing_data
  ├── schema_1
  │   ├── table_1
  │   ├── table_2
  │   ├── ...
  │   └── table_N
  ├── schema_2
  │   ├── table_1
  │   ├── ...
  │   └── table_N
  └── ...

3.1. Export CSVs

The simplest way of getting data into duckdb is with a CSV file per table.

That file can later be imported with something like:

CREATE TABLE "{db}"."{table}"
AS SELECT *
FROM read_csv('{path}/{table}.csv', header=True)

See CSV - DuckDB for more information about read_csv.
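For synthetic data, the CSV files themselves can be generated with a few lines of Python. This is a minimal sketch, assuming one file per table named <table>.csv inside a folder per schema. The helper name write_table_csv and the sample rows in the usage note are hypothetical.

```python
import csv
from pathlib import Path


def write_table_csv(root, schema, table, rows):
    """Write `rows` (a list of dicts) to <root>/<schema>/<table>.csv."""
    if not rows:
        raise ValueError("at least one row is needed to infer the header")

    path = Path(root) / schema / f"{table}.csv"
    path.parent.mkdir(parents=True, exist_ok=True)

    # newline="" lets the csv module control line endings itself
    with path.open("w", newline="", encoding="utf-8") as stream:
        writer = csv.DictWriter(stream, fieldnames=list(rows[0]))
        writer.writeheader()  # read_csv is called with header=True
        writer.writerows(rows)
    return path
```

For example, write_table_csv("testing_data", "bronze__metadata", "raw_files", [{"id": 1, "name": "a"}]) creates testing_data/bronze__metadata/raw_files.csv with a header row and one data row.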

3.2. Manual SQLs

There are some cases where read_csv doesn’t work as expected. This has happened to me mostly when a column has a non-primitive type such as MAP, STRUCT, or JSON.

To handle such cases, I define a sql file with:

  1. Manual table creation
  2. Manual inserts for some testing data

As an example, it would be something like this:

CREATE TABLE bronze__metadata.emr_history
(
    applicationId VARCHAR,
    jobRunId VARCHAR,
    name VARCHAR,
    createdAt TIMESTAMP,
    updatedAt TIMESTAMP,    
    executionRole VARCHAR,
    state VARCHAR,
    stateDetails VARCHAR,
    totalResourceUtilization STRUCT(
        vCPUHour DECIMAL(10, 3),
        memoryGBHour DECIMAL(10, 3),
        storageGBHour DECIMAL(10, 3)
    ),
    networkConfiguration STRUCT(
        subnetIds VARCHAR[],
        securityGroupIds VARCHAR[]
    ),
    totalExecutionDurationSeconds BIGINT,
    executionTimeoutMinutes BIGINT,
    billedResourceUtilization STRUCT(
        vCPUHour DECIMAL(10, 3),
        memoryGBHour DECIMAL(10, 3),
        storageGBHour DECIMAL(10, 3)
    ),
    event_params MAP(
        VARCHAR,
        STRUCT(
            string_value VARCHAR,
            int_value BIGINT,
            double_value DOUBLE
        )
    ),
    p_extracted_at VARCHAR
);

INSERT INTO bronze__metadata.emr_history VALUES (
    '00fdi6bg3i758g0p', -- applicationId
    '00fginrueqgga00q', -- jobRunId
    'v7__crm.export-clone', -- name
    TIMESTAMP '2024-01-26 08:57:44.507', -- createdAt
    TIMESTAMP '2024-01-26 09:22:24.045', -- updatedAt
    'arn:aws:iam::634077897723:role/AmazonEMR-ExecutionRole-1685085488821', -- executionRole
    'SUCCESS', -- state
    NULL, -- stateDetails
    {
        'vCPUHour': 3.559,
        'memoryGBHour': 17.794,
        'storageGBHour': 17.794
    }, -- totalResourceUtilization
    {
        'subnetIds': ['subnet-0075f5bbf95862c10'],
        'securityGroupIds': ['sg-09ba4721cd92f7230']
    }, -- networkConfiguration
    1338, -- totalExecutionDurationSeconds
    360, -- executionTimeoutMinutes
    {
        'vCPUHour': 3.559,
        'memoryGBHour': 14.236,
        'storageGBHour': 0.0
    }, -- billedResourceUtilization
    MAP {
        'plataforma': {'string_value': 'landing', 'int_value': NULL, 'double_value': NULL}, 
        'subtipo_formulario': {'string_value': 'si', 'int_value': NULL, 'double_value': NULL}
    }, -- event_params
    '2024-01-31T13:00:39.571930' -- p_extracted_at
);

In this example, you can see how to create most types such as STRUCT, MAP, and ARRAYS.

3.3. Generate testing data script

To export all that data, I use a script that:

  1. Inspects everything in the testing_data folder
  2. Loads all the csv files
  3. Runs all the sql files

The script is the following:

.github/scripts/create_duck_db.py

import os
import re
import shutil
import sys

import duckdb
from loguru import logger

PATH_DBT = "dbt_src"

DUCK_DB = f"{PATH_DBT}/awsdatacatalog.duckdb" # The name is for compatibility with Athena
PATH_DATA = "testing_data"

PROFILES_IN = "profiles/profiles.ci.yml"
PROFILES_OUT = f"{PATH_DBT}/profiles.yml"

IMPORT_STATEMENT = """
CREATE TABLE "{db}"."{table}"
AS SELECT *
FROM read_csv('{path}/{table}.csv', header=True)
"""


def remove_database_if_exists():
    if os.path.exists(DUCK_DB):
        logger.info(f"{DUCK_DB=} already exists, removing it")
        os.remove(DUCK_DB)


def get_databases_with_tables():
    logger.info("Reading local data to infer databases and tables to import")
    out_csv = {}
    out_sql = {}
    count_tables = 0

    for db in os.listdir(PATH_DATA):
        if "." in db:
            continue

        logger.info(f"Reading {db=}")
        csv = {}
        sql = {}
        for path, _, filenames in os.walk(f"{PATH_DATA}/{db}"):
            for filename in filenames:
                # rpartition tolerates extra dots and files without an extension
                table, _, extension = filename.rpartition(".")

                if extension == "csv":
                    csv[table] = path

                if extension == "sql":
                    with open(f"{path}/{filename}") as stream:
                        data = stream.read()

                    # Split queries and remove whitespaces
                    sql[f"{db}.{table}"] = [x.strip() for x in data.split(";") if x]

        logger.debug(f"There are {len(csv)} csv in {db=}")
        logger.debug(f"There are {len(sql)} sql in {db=}")
        out_csv[db] = csv
        out_sql[db] = sql
        count_tables += len(csv) + len(sql)

    count_dbs = len(set(out_csv).union(set(out_sql)))
    logger.success(f"{count_dbs} databases (with {count_tables} tables) were scanned")
    return out_csv, out_sql


def export_csv_tables(con, data):
    logger.info(f"Exporting all 'csv' tables in {DUCK_DB=}")

    for db, tables in data.items():
        if not tables:
            continue

        logger.info(f"Creating schema '{db}'")
        con.sql(f'CREATE SCHEMA "{db}"')

        logger.info(f"Creating all {len(tables)} tables in {db=}")
        for table, path in tables.items():
            logger.debug(f"* Creating table '{db}.{table}'")
            con.sql(IMPORT_STATEMENT.format(db=db, table=table, path=path))

    logger.success(f"All 'csv' tables exported to {DUCK_DB=}")


def export_sql_tables(con, data):
    logger.info(f"Exporting all 'sql' tables in {DUCK_DB=}")

    for db, tables in data.items():
        if not tables:
            continue

        logger.info(f"Creating all {len(tables)} tables in {db=}")

        for table, queries in tables.items():
            logger.debug(f"Running {len(queries)} queries for '{db}.{table}'")

            for query in queries:
                if not query:
                    continue

                query_preview = re.sub(" +", " ", query.replace("\n", " "))[:100]
                logger.debug(f"* Running query='{query_preview}...'")
                con.sql(query)

    logger.success(f"All 'sql' tables exported to {DUCK_DB=}")


def copy_profiles_file():
    # startswith avoids matching "darwin" (macOS), which also contains "win"
    if sys.platform.startswith("win"):
        logger.warning(
            "Skipping copying 'profiles.yml' since you are on windows (not in a github action)"
        )

    else:
        logger.info(f"Copying '{PROFILES_IN}' to '{PROFILES_OUT}'")
        shutil.copy2(PROFILES_IN, PROFILES_OUT)


def main():
    remove_database_if_exists()
    data_csv, data_sql = get_databases_with_tables()

    with duckdb.connect(DUCK_DB) as con:
        export_csv_tables(con, data_csv)
        export_sql_tables(con, data_sql)

    copy_profiles_file()


if __name__ == "__main__":
    main()
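
One caveat of the script above: splitting on ";" breaks a statement whenever a string literal or a comment contains a semicolon. If your testing data needs that, a quote-aware splitter could replace data.split(";"). The following is a sketch under stated assumptions, not a full SQL parser: it handles single quotes, double quotes, and "--" line comments, and it does not handle /* */ block comments. The function name split_sql_statements is hypothetical.

```python
def split_sql_statements(text):
    """Split on ';' while ignoring semicolons inside quotes or '--' comments."""
    statements = []
    current = []
    quote = None  # The quote character we are currently inside, if any
    i = 0

    while i < len(text):
        char = text[i]
        if quote:
            # Inside a literal: copy everything until the closing quote.
            # A doubled quote ('') closes and reopens, which is harmless here.
            current.append(char)
            if char == quote:
                quote = None
        elif char in ("'", '"'):
            quote = char
            current.append(char)
        elif text.startswith("--", i):
            # Copy the comment up to (not including) the end of the line
            end = text.find("\n", i)
            end = len(text) if end == -1 else end
            current.append(text[i:end])
            i = end
            continue
        elif char == ";":
            statements.append("".join(current).strip())
            current = []
        else:
            current.append(char)
        i += 1

    statements.append("".join(current).strip())
    return [statement for statement in statements if statement]
```

Comments are kept as part of the statement they precede, so the queries sent to duckdb stay identical to what is written in the sql file.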

4. Running dbt-duckdb

To do the smoke test, you can run DBT the same way you run it in production. That could be dbt build --select xx --exclude yy or using a Python script as described in Running DBT with AWS ECS (Fargate).

There might be some models or tests that you don’t want to test during CI (such as testing data freshness). To handle that, I use the no_test tag and pass --exclude tag:no_test when running DBT.
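
As an illustration, a small Python wrapper around dbt build could forward those selectors. This is a hypothetical sketch and not the script from the previous post. The names build_dbt_command and run_dbt are made up for the example, and it assumes the dbt executable is available on the PATH.

```python
import argparse
import subprocess


def build_dbt_command(select=None, exclude=None, target=None):
    """Build the argument list for `dbt build`."""
    command = ["dbt", "build"]
    if select:
        command += ["--select", *select]
    if exclude:
        command += ["--exclude", *exclude]
    if target:
        command += ["--target", target]
    return command


def run_dbt(argv=None):
    """Parse CLI-style arguments and run dbt, failing if dbt fails."""
    parser = argparse.ArgumentParser(description="Run dbt build for the smoke test")
    parser.add_argument("--select", nargs="*")
    parser.add_argument("--exclude", nargs="*")
    parser.add_argument("--target")
    args = parser.parse_args(argv)

    command = build_dbt_command(args.select, args.exclude, args.target)
    # check=True raises CalledProcessError when dbt exits with an error,
    # which is what makes a CI step fail
    subprocess.run(command, check=True)
```

With this sketch, run_dbt(["--exclude", "tag:no_test"]) would execute dbt build --exclude tag:no_test.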

4.1. In memory vs working with a file

dbt-duckdb can run in 2 ways:

  1. With a duckdb file
  2. In memory

Option 1 is the simplest given that we have a duckdb file with the testing data. With this option, each model run will be exported in the duckdb file. This is very useful when debugging since you can do manual queries to inspect the results and/or manually retry failed queries.

However, this way of working implies I/O operations which makes DBT run sequentially and not in parallel. To speed this up, we can run DBT in memory and attach a file for the testing data. This is what I recommend doing during CI.

Here is how you would declare both profiles:

dbt_northius:
  target: villoro_ci
  outputs:
    villoro_ci:
      type: duckdb
      path: ":memory:"
      attach:
        - path: awsdatacatalog.duckdb # The name is for compatibility with Athena
          alias: ""

    villoro_sequencial_ci:
      type: duckdb
      path: awsdatacatalog.duckdb # The name is for compatibility with Athena


    # Other profiles go here, such as PRE and PRO

The awsdatacatalog.duckdb file is named that way for compatibility with Athena. More on that in the next section.

4.2. DuckDB filename

You might wonder why the duckdb file is called awsdatacatalog.duckdb. Behind that name there is a very interesting trick. In production, I’m using Athena (see Running DBT with AWS ECS (Fargate)), where the default catalog is called awsdatacatalog. If you want to query the table athena_history inside the bronze__metadata schema, either of these works:

SELECT * FROM bronze__metadata.athena_history
SELECT * FROM awsdatacatalog.bronze__metadata.athena_history

That allows us to define the DBT sources like:

sources:
  - name: nt_bronze__metadata
    database: awsdatacatalog # Needed for duckdb compatibility
    tags: ["metadata"]
    description: Raw metadata tables
    tables:
      - name: athena_history
      - name: dbt_execution
      - name: dbt_run_results
      - name: emr_history
      - name: raw_files

So queries using source(xx) will have the prefix awsdatacatalog and queries using ref(xx) will not have it. With duckdb, that implies that source(xx) will be read from the awsdatacatalog.duckdb file and ref(xx) will be run in memory.

Running duckdb in memory is ~8x faster based on my current project with around 1000 models.

5. Continuous Integration (CI)

Once you are able to run DBT locally with DuckDB, you can set up the Continuous Integration (CI) job so that it runs on every commit pushed to a pull request.

name: CI

on:
  pull_request:

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  dbt_all:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    - uses: dorny/paths-filter@v2
      id: changes
      with:
        filters: |
          dbt:
            - 'dbt_src/**'
            - 'testing_data/**'

    - uses: actions/setup-python@v4
      if: steps.changes.outputs.dbt == 'true'

    - name: Install poetry
      if: steps.changes.outputs.dbt == 'true'
      run: pip install poetry==1.6.1 # Should match .github/docker/Dockerfile.base
    - name: Install dependencies
      if: steps.changes.outputs.dbt == 'true'
      run: poetry install

    - name: Create duckdb DB
      if: steps.changes.outputs.dbt == 'true'
      run: poetry run python .github/scripts/create_duck_db.py

    - name: Test pipeline
      if: steps.changes.outputs.dbt == 'true'
      run: |
        cd dbt_src
        poetry run python run.py --exclude tag:no_test

Remember to set this test as a mandatory prerequisite for merging PRs into the main branch.

6. Integrating with other SQL engines

You will probably use another engine in production instead of duckdb. In my case, I use dbt-athena. The problem is that some SQL functions differ between engines, so you will need to adapt your code to work with both.

The way to solve that problem is with DBT macros that do different things based on the engine (or dbt adapter) you are using. You can do that by calling adapter.type().

For example, imagine you want to get the last element of an array column. In duckdb you can use last, but in Athena you should use last_value. Defining a last macro handles that difference:

{%- macro last(column) -%}
    {%- if (adapter.type() == "athena") -%}
        last_value({{ column }})
    {%- else -%}
        last({{ column }})
    {%- endif -%}
{%- endmacro -%}

Another situation that you might need to handle differently is when you want to run table maintenance operations such as OPTIMIZE for Iceberg.

That kind of operation is usually done with a post-hook (see DBT | pre-hook & post-hook).

So instead of adding the OPTIMIZE statement there directly, like:

{{
  config(
    post_hook = "OPTIMIZE {{ this }} REWRITE DATA USING BIN_PACK;"
  )
}}

You should first define a macro:

{%- macro optimize(table=this) -%}
    {%- if (adapter.type() == "athena")
        and (model.config.table_type == "iceberg")
        and (model.config.materialized != "view")
    -%}
        OPTIMIZE {{ table }} REWRITE DATA USING BIN_PACK;
    {%- endif -%}
{%- endmacro -%}

Then you can simply call it with:

+post-hook:
  - "{{ optimize() }}"

7. Conclusion

Testing is a critical part of any data engineering workflow, and integrating DBT with DuckDB for testing can significantly enhance the reliability of your data transformation processes. By setting up SQL linting, automating tests with pre-commit hooks, and establishing a CI pipeline, you can ensure that your DBT models are robust and error-free.

In this post, we’ve shown how to leverage DuckDB for efficient testing and provided detailed steps to implement these practices in your DBT projects. By following these guidelines, you can maintain high standards for your SQL code and improve the overall quality of your data workflows.

We hope you find this guide helpful and encourage you to explore these testing strategies to keep your DBT projects running smoothly. As always, feel free to share your feedback or any questions you might have. Happy testing!