DBT testing with DuckDB
- 2024-06-19
- ⚙️ DE
- SQL DE Best Practices DBT
0. Intro
In our previous post, we discussed how to self-host DBT on AWS ECS (Fargate) for cost-effective scalability and better control over data transformation processes. If you haven’t read that post, see Running DBT with AWS ECS (Fargate). This time, we’ll take it a step further by diving into testing DBT projects with DuckDB, ensuring the quality and consistency of your SQL code.
DuckDB, an embedded analytical database, offers a fast, lightweight SQL engine, perfect for testing DBT models. By integrating SQL linting, smoke testing, and automation, we can maintain high-quality DBT projects. This guide will cover setting up a SQL linter with Sqlfluff, automating testing with pre-commit hooks, and creating a streamlined continuous integration (CI) pipeline.
1. SQL Linter
The idea is to have a SQL linter that enforces well-formatted SQL code with a unified style across the repo. As the linter, I recommend Sqlfluff.
1.1. Integrating it with DBT
To correctly parse DBT code (especially the templating part), we need to set it up properly.
The easiest way is to create the .sqlfluff file in the repo root as follows:
.sqlfluff
[sqlfluff]
dialect = duckdb
templater = dbt
runaway_limit = 10
max_line_length = 100
indent_unit = space
exclude_rules = RF01, RF03, ST06, AL09
warnings = None
[sqlfluff:templater:dbt]
project_dir = ./dbt_src/
profiles_dir = ./.github/
Feel free to change the settings such as the excluded rules.
Notice that we are using DuckDB as the dialect and that we need to set a couple of directories:
- project_dir: the root of the DBT project.
- profiles_dir: the path where a DBT profile can be found (only used for linting purposes).
And here is the very simple profile I use:
./dbt_src/profiles.yml
# This file is only used so that `pre-commit` can properly work
# (for sqlfluff)
dbt_src:
  outputs:
    dev:
      type: duckdb
  target: dev
If needed, you can exclude some paths using the .sqlfluffignore file, as mentioned in the Sqlfluff Configuration docs.
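For example, a hypothetical .sqlfluffignore excluding installed packages and compiled artifacts could look like this (adjust the paths to your repo):
.sqlfluffignore
dbt_packages/
target/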
1.2. Automatic testing with pre-commit
With pre-commit, you can run the linter before each commit; see the pre-commit documentation for more details.
The linting job takes between a few seconds and a few minutes. You must decide whether you are willing to spend that time before each commit to ensure properly linted SQL code. If not, skip the pre-commit setup.
To set up sqlfluff, you only need to add these lines to the .pre-commit-config.yaml file:
.pre-commit-config.yaml
repos:
  # SQL linter
  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 3.0.6
    hooks:
      - id: sqlfluff-lint
        entry: sqlfluff lint --processes 0 --disable-progress-bar
        additional_dependencies: [ # Should match what we have in prod
          'dbt-duckdb==1.6.2',
          'sqlfluff-templater-dbt==3.0.6'
        ]
      - id: sqlfluff-fix
        entry: sqlfluff fix --show-lint-violations --processes 0 --disable-progress-bar
        additional_dependencies: [ # Should match what we have in prod
          'dbt-duckdb==1.6.2',
          'sqlfluff-templater-dbt==3.0.6'
        ]
Here we are using 2 hooks:
- sqlfluff-lint: gives better information about the failures.
- sqlfluff-fix: tries to fix the errors automatically. It doesn't always succeed.
If you want to make it faster, you can keep only one of the two hooks. I prefer having both the fixing capabilities and the better failure information.
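If you go this route, the hooks can be installed and run with the standard pre-commit commands (a quick sketch):
pip install pre-commit
pre-commit install          # registers the git hook so it runs before each commit
pre-commit run --all-files  # lint the whole repo once, useful for the first run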
2. Smoke testing with DuckDB
The testing strategy I’ve seen in most DBT projects is to rely on users to manually run DBT to validate that the code works. This is problematic since users might not run those tests (especially first-time contributors).
To have some guarantees over the DBT code, we can do smoke testing using DuckDB. The idea is to have some testing data (it can either be anonymized prod data or synthetic data) so that we can run DBT using dbt-duckdb. This can be automated to run at every pull request, ensuring some guarantees about the code.
3. Testing data
The idea is to have files that define each table, plus a script that exports them all into a duckdb file that DBT can use.
It should be something like:
testing_data
├── schema_1
│   ├── table_1
│   ├── table_2
│   ├── ...
│   └── table_N
├── schema_2
│   ├── table_1
│   ├── ...
│   └── table_N
└── ...
3.1. Export CSVs
The simplest way of loading data into duckdb is from a CSV file.
That file can later be imported with something like:
CREATE TABLE "{db}"."{table}"
AS SELECT *
FROM read_csv('{path}/{table}.csv', header=True)
See CSV - DuckDB for more information about read_csv.
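For context, here is a minimal Python sketch of that import (the schema, table, and CSV path are illustrative; the full script is shown in section 3.3):
import duckdb

# Create the duckdb file and load one CSV into a schema (names are illustrative)
con = duckdb.connect("awsdatacatalog.duckdb")
con.sql('CREATE SCHEMA IF NOT EXISTS "bronze__metadata"')
con.sql("""
    CREATE TABLE "bronze__metadata"."raw_files"
    AS SELECT *
    FROM read_csv('testing_data/bronze__metadata/raw_files.csv', header=True)
""")
con.close()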
3.2. Manual SQLs
There are some cases where read_csv doesn't work as expected. This has happened to me mostly when a column holds non-primitive types such as MAP, STRUCT, and/or JSON.
To handle such cases, I define a sql file with:
- Manual table creation
- Manual inserts for some testing data
As an example, it would be something like this:
CREATE TABLE bronze__metadata.emr_history
(
    applicationId VARCHAR,
    jobRunId VARCHAR,
    name VARCHAR,
    createdAt TIMESTAMP,
    updatedAt TIMESTAMP,
    executionRole VARCHAR,
    state VARCHAR,
    stateDetails VARCHAR,
    totalResourceUtilization STRUCT(
        vCPUHour DECIMAL(10, 3),
        memoryGBHour DECIMAL(10, 3),
        storageGBHour DECIMAL(10, 3)
    ),
    networkConfiguration STRUCT(
        subnetIds VARCHAR[],
        securityGroupIds VARCHAR[]
    ),
    totalExecutionDurationSeconds BIGINT,
    executionTimeoutMinutes BIGINT,
    billedResourceUtilization STRUCT(
        vCPUHour DECIMAL(10, 3),
        memoryGBHour DECIMAL(10, 3),
        storageGBHour DECIMAL(10, 3)
    ),
    event_params MAP(
        VARCHAR,
        STRUCT(
            string_value VARCHAR,
            int_value BIGINT,
            double_value DOUBLE
        )
    ),
    p_extracted_at VARCHAR
);

INSERT INTO bronze__metadata.emr_history VALUES (
    '00fdi6bg3i758g0p', -- applicationId
    '00fginrueqgga00q', -- jobRunId
    'v7__crm.export-clone', -- name
    TIMESTAMP '2024-01-26 08:57:44.507', -- createdAt
    TIMESTAMP '2024-01-26 09:22:24.045', -- updatedAt
    'arn:aws:iam::634077897723:role/AmazonEMR-ExecutionRole-1685085488821', -- executionRole
    'SUCCESS', -- state
    NULL, -- stateDetails
    {
        'vCPUHour': 3.559,
        'memoryGBHour': 17.794,
        'storageGBHour': 17.794
    }, -- totalResourceUtilization
    {
        'subnetIds': ['subnet-0075f5bbf95862c10'],
        'securityGroupIds': ['sg-09ba4721cd92f7230']
    }, -- networkConfiguration
    1338, -- totalExecutionDurationSeconds
    360, -- executionTimeoutMinutes
    {
        'vCPUHour': 3.559,
        'memoryGBHour': 14.236,
        'storageGBHour': 0.0
    }, -- billedResourceUtilization
    MAP {
        'plataforma': {'string_value': 'landing', 'int_value': NULL, 'double_value': NULL},
        'subtipo_formulario': {'string_value': 'si', 'int_value': NULL, 'double_value': NULL}
    }, -- event_params
    '2024-01-31T13:00:39.571930' -- p_extracted_at
);
In this example, you can see how to create most complex types, such as STRUCT, MAP, and ARRAY.
3.3. Generate testing data script
To export all that data, I use a script that:
- Inspects everything in the testing_data folder
- Loads all csv files
- Runs all sql files
The script is the following:
.github/scripts/create_duck_db.py
import os
import re
import shutil
import sys

import duckdb
from loguru import logger

PATH_DBT = "dbt_src"
DUCK_DB = f"{PATH_DBT}/awsdatacatalog.duckdb"  # The name is for compatibility with Athena
PATH_DATA = "testing_data"
PROFILES_IN = "profiles/profiles.ci.yml"
PROFILES_OUT = f"{PATH_DBT}/profiles.yml"

IMPORT_STATEMENT = """
CREATE TABLE "{db}"."{table}"
AS SELECT *
FROM read_csv('{path}/{table}.csv', header=True)
"""


def remove_database_if_exists():
    if os.path.exists(DUCK_DB):
        logger.info(f"{DUCK_DB=} already exists, removing it")
        os.remove(DUCK_DB)


def get_databases_with_tables():
    logger.info("Reading local data to infer databases and tables to import")
    out_csv = {}
    out_sql = {}
    count_tables = 0

    for db in os.listdir(PATH_DATA):
        if "." in db:
            continue

        logger.info(f"Reading {db=}")
        csv = {}
        sql = {}
        for path, _, filenames in os.walk(f"{PATH_DATA}/{db}"):
            for filename in filenames:
                table, extension = filename.split(".")
                if extension == "csv":
                    csv[table] = path
                if extension == "sql":
                    with open(f"{path}/{filename}") as stream:
                        data = stream.read()
                    # Split queries and remove whitespaces
                    sql[f"{db}.{table}"] = [x.strip() for x in data.split(";") if x]

        logger.debug(f"There are {len(csv)} csv in {db=}")
        logger.debug(f"There are {len(sql)} sql in {db=}")
        out_csv[db] = csv
        out_sql[db] = sql
        count_tables += len(csv) + len(sql)

    count_dbs = len(set(out_csv).union(set(out_sql)))
    logger.success(f"{count_dbs} databases (with {count_tables} tables) were scanned")
    return out_csv, out_sql


def export_csv_tables(con, data):
    logger.info(f"Exporting all 'csv' tables in {DUCK_DB=}")
    for db, tables in data.items():
        if not tables:
            continue

        logger.info(f"Creating schema '{db}'")
        con.sql(f'CREATE SCHEMA "{db}"')

        logger.info(f"Creating all {len(tables)} tables in {db=}")
        for table, path in tables.items():
            logger.debug(f"* Creating table '{db}.{table}'")
            con.sql(IMPORT_STATEMENT.format(db=db, table=table, path=path))

    logger.success(f"All 'csv' tables exported to {DUCK_DB=}")


def export_sql_tables(con, data):
    logger.info(f"Exporting all 'sql' tables in {DUCK_DB=}")
    for db, tables in data.items():
        if not tables:
            continue

        logger.info(f"Creating all {len(tables)} tables in {db=}")
        for table, queries in tables.items():
            logger.debug(f"Running {len(queries)} queries for '{db}.{table}'")
            for query in queries:
                if not query:
                    continue
                query_preview = re.sub(" +", " ", query.replace("\n", " "))[:100]
                logger.debug(f"* Running query='{query_preview}...'")
                con.sql(query)

    logger.success(f"All 'sql' tables exported to {DUCK_DB=}")


def copy_profiles_file():
    # sys.platform is "win32" on Windows; use startswith so "darwin" (macOS) doesn't match
    if sys.platform.startswith("win"):
        logger.warning(
            "Skipping copying 'profiles.yml' since you are on windows (not in a github action)"
        )
    else:
        logger.info(f"Copying '{PROFILES_IN}' to '{PROFILES_OUT}'")
        shutil.copy2(PROFILES_IN, PROFILES_OUT)


def main():
    remove_database_if_exists()
    data_csv, data_sql = get_databases_with_tables()

    with duckdb.connect(DUCK_DB) as con:
        export_csv_tables(con, data_csv)
        export_sql_tables(con, data_sql)

    copy_profiles_file()


if __name__ == "__main__":
    main()
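After running the script, you can sanity-check the generated file before launching DBT; here is a small sketch using the DuckDB Python API:
import duckdb

# Open the generated file read-only and list every schema and table created from the testing data
con = duckdb.connect("dbt_src/awsdatacatalog.duckdb", read_only=True)
print(con.sql("SELECT table_schema, table_name FROM information_schema.tables ORDER BY 1, 2"))
con.close()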
4. Running dbt-duckdb
To do the smoke test, you can run DBT the same way you run it in production.
That could be dbt build --select xx --exclude yy, or a Python script as described in Running DBT with AWS ECS (Fargate).
There might be some models or tests that you don’t want to test during CI (such as testing data freshness).
To handle that, I use the no_test tag and pass --exclude tag:no_test when running DBT, as shown below.
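As a sketch, the tag can be set directly in a model's config block (the model below is hypothetical; you can also tag whole folders in dbt_project.yml):
-- models/checks/freshness_check.sql (hypothetical example)
{{ config(tags=["no_test"]) }}

SELECT *
FROM {{ ref('some_model') }}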
4.1. In memory vs working with a file
dbt-duckdb can run in 2 ways:
- With a duckdb file
- In memory
Option 1 is the simplest given that we already have a duckdb file with the testing data. With this option, each model run is written to the duckdb file. This is very useful when debugging, since you can run manual queries to inspect the results and/or retry failed queries.
However, this way of working implies I/O operations, which makes DBT run sequentially rather than in parallel. To speed things up, we can run DBT in memory and attach a file for the testing data. This is what I recommend doing during CI.
Here is how you would declare both profiles:
dbt_northius:
  target: villoro_ci
  outputs:
    villoro_ci:
      type: duckdb
      path: ":memory:"
      attach:
        - path: awsdatacatalog.duckdb # The name is for compatibility with Athena
          alias: ""
    villoro_sequencial_ci:
      type: duckdb
      path: awsdatacatalog.duckdb # The name is for compatibility with Athena
    # Other profiles go here, such as PRE and PRO
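To choose between them when invoking DBT directly, you can pass the target explicitly (a sketch; the target names come from the profile above):
dbt build --target villoro_ci --exclude tag:no_test              # in-memory run, recommended for CI
dbt build --target villoro_sequencial_ci --exclude tag:no_test   # file-backed run, handy for debugging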
The name awsdatacatalog.duckdb is for compatibility with Athena; more on that in the next section.
4.2. Duckdb filename
You might wonder why the duckdb file is called awsdatacatalog.duckdb; behind that name there is a very interesting trick.
In production, I'm using Athena (see Running DBT with AWS ECS (Fargate)), where the default catalog is called awsdatacatalog.
If you want to query the table athena_history inside the bronze__metadata schema, you can do either:
SELECT * FROM bronze__metadata.athena_history
SELECT * FROM awsdatacatalog.bronze__metadata.athena_history
That allows us to define the DBT sources like:
sources:
  - name: nt_bronze__metadata
    database: awsdatacatalog # Needed for duckdb compatibility
    tags: ["metadata"]
    description: Raw metadata tables
    tables:
      - name: athena_history
      - name: dbt_execution
      - name: dbt_run_results
      - name: emr_history
      - name: raw_files
So queries using source(xx) will have the awsdatacatalog prefix, while queries using ref(xx) will not.
With duckdb, that implies that source(xx) tables will be read from the awsdatacatalog.duckdb file, whereas ref(xx) models will run in memory.
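As a rough sketch of what this looks like inside a model (the joined model and columns are made up; exact quoting depends on the adapter):
SELECT
    h.jobRunId,
    h.state,
    m.some_metric
FROM {{ source('nt_bronze__metadata', 'emr_history') }} AS h -- compiles with the awsdatacatalog prefix
LEFT JOIN {{ ref('some_upstream_model') }} AS m              -- compiles without the prefix
    ON h.jobRunId = m.jobRunId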
Running duckdb in memory is ~8x faster based on my current project with around 1000 models.
5. Continuous Integration (CI)
Once you are able to run DBT locally with DuckDB, you can set up the Continuous Integration (CI) job. With that in place, it runs at each commit pushed to a pull request.
name: CI

on:
  pull_request:

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  dbt_all:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - uses: dorny/paths-filter@v2
        id: changes
        with:
          filters: |
            dbt:
              - 'dbt_src/**'
              - 'testing_data/**'

      - uses: actions/setup-python@v4
        if: steps.changes.outputs.dbt == 'true'

      - name: Install poetry
        if: steps.changes.outputs.dbt == 'true'
        run: pip install poetry==1.6.1 # Should match .github/docker/Dockerfile.base

      - name: Install dependencies
        if: steps.changes.outputs.dbt == 'true'
        run: poetry install

      - name: Create duckdb DB
        if: steps.changes.outputs.dbt == 'true'
        run: poetry run python .github/scripts/create_duck_db.py

      - name: Test pipeline
        if: steps.changes.outputs.dbt == 'true'
        run: |
          cd dbt_src
          poetry run python run.py --exclude tag:no_test
Remember to set this test as a mandatory prerequisite for merging PRs into the main branch.
6. Integrating with other SQL engines
You will probably use another engine in production instead of duckdb; in my case, I use dbt-athena. The problem is that some SQL functions differ between engines, so you will need to adapt your code so that it works with both.
The way to solve that problem is with DBT macros that do different things based on the engine (or dbt adapter) you are using. You can do that by calling adapter.type().
For example, imagine you want to get the last element of an array column. In duckdb you can use last, but in Athena you should use last_value.
By defining a last macro, you can handle that difference:
{%- macro last(column) -%}
    {%- if (adapter.type() == "athena") -%}
        last_value({{ column }})
    {%- else -%}
        last({{ column }})
    {%- endif -%}
{%- endmacro -%}
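A minimal usage sketch inside a model (the ref and column names are made up):
SELECT
    user_id,
    {{ last('event_id') }} AS last_event_id
FROM {{ ref('stg_events') }}
GROUP BY user_id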
Another situation that you might need to handle differently is when you want to run table maintenance operations, such as OPTIMIZE for Iceberg. That kind of operation is usually done with a post-hook (see DBT | pre-hook & post-hook).
So instead of adding the optimize statement there directly, like:
{{
    config(
        post_hook = "OPTIMIZE {{ this }} REWRITE DATA USING BIN_PACK;"
    )
}}
You should first define a macro:
{%- macro optimize(table=this) -%}
    {%- if (adapter.type() == "athena")
        and (model.config.table_type == "iceberg")
        and (model.config.materialized != "view")
    -%}
        OPTIMIZE {{ table }} REWRITE DATA USING BIN_PACK;
    {%- endif -%}
{%- endmacro -%}
Then you can simply call it with:
+post-hook:
  - "{{ optimize() }}"
7. Conclusion
Testing is a critical part of any data engineering workflow, and integrating DBT with DuckDB for testing can significantly enhance the reliability of your data transformation processes. By setting up SQL linting, automating tests with pre-commit hooks, and establishing a CI pipeline, you can ensure that your DBT models are robust and error-free.
In this post, we’ve shown how to leverage DuckDB for efficient testing and provided detailed steps to implement these practices in your DBT projects. By following these guidelines, you can maintain high standards for your SQL code and improve the overall quality of your data workflows.
We hope you find this guide helpful and encourage you to explore these testing strategies to keep your DBT projects running smoothly. As always, feel free to share your feedback or any questions you might have. Happy testing!