Solving the problem of small files in the Data Lake
- 2024-10-23
- ⚙️ DE
- DE Performance Spark DBT
0. Motivation
In modern data lake architectures, managing the size and number of files is crucial for maintaining performance and cost-efficiency. During my work, I frequently encountered tables with a huge number of small files, which led to very poor query performance and significantly increased costs.
To tackle this issue, I started tracking how tables were stored and developed a keen interest in optimizing data storage practices. After investing considerable time into tracking and fixing these problems, I wanted to share my approach and insights.
This post aims to provide a comprehensive guide on understanding, identifying, and resolving the small file problem in data lakes, ensuring that your data processing remains efficient and cost-effective.
1. What’s the problem
In data lakes, managing the size of partitions is crucial for maintaining optimal performance and cost-efficiency. Partitions that are too small, generally less than 1 GB, can significantly degrade performance and inflate costs. This issue is well-documented in the Delta Lake Optimisation Guide, which highlights the adverse effects of small partitions on data processing and querying efficiency.
1.1. Performance
According to the article Fixing the Small File Problem in Data Lake with Spark:
Small files can cause a performance degradation of up to 4x.
This is because small files increase the number of tasks required to process the data, leading to higher overhead in task management and scheduling. Each task incurs a startup cost, and when there are many small tasks, this overhead can accumulate, resulting in significant slowdowns.
1.2. Costs
Another significant issue with small file partitions is the impact on costs. When I was working at Glovo, my colleagues observed that reducing the number of small files in heavily queried tables resulted in a 10x cost reduction.
This significant saving was primarily due to the decreased number of S3 GET requests, which are a major cost factor when querying small files.
1.3. Default Behavior of Data Lake Writers
By default, many data lake writers tend to create a large number of small files. For instance, Apache Spark will write multiple small files if not properly configured. Similarly, with AWS Athena, it is challenging to control the file sizes, often resulting in tiny files being created. If not set up correctly, these default behaviors can lead to a proliferation of small files, exacerbating performance and cost issues in the data lake.
2. Good file size
Determining the optimal file size in a data lake can be challenging, as it depends on various factors such as the data processing engine, query patterns, and specific workload characteristics. However, most documentation and data processing engines recommend targeting a file size around 1 GB.
For example, the Delta Lake Small File Compaction with OPTIMIZE article highlights that Delta Lake suggests aiming for partitions close to 1 GB in size. This recommendation helps balance the performance benefits of larger files with the practical considerations of data processing and storage.
Similarly, the DuckDB | Parquet File Sizes guide suggests that an optimal file size for Parquet files typically ranges between 100 MB and 10 GB. This broad range takes into account different use cases and workload requirements, allowing for flexibility while still emphasizing the importance of avoiding files that are too small.
Around 1 GB per file is the usual target for optimal file size in data lakes.
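If you know roughly how big a table is, you can turn the 1 GB target into a concrete file count. The helper below is a minimal back-of-the-envelope sketch (the function name and the fixed 1 GB target are illustrative, not part of any engine's API):

```python
import math

TARGET_FILE_SIZE_GB = 1.0  # the ~1 GB target discussed above


def estimate_num_files(table_size_gb: float) -> int:
    """Rough number of output files to aim for, given the total table size in GB."""
    return max(1, math.ceil(table_size_gb / TARGET_FILE_SIZE_GB))


print(estimate_num_files(250))  # a 250 GB table -> ~250 files of roughly 1 GB each
```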
3. How to avoid it?
The key to avoiding small files in your data lake is to control the number of files written during data processing. Ensuring that the files are of optimal size improves both performance and cost-efficiency.
3.1. Spark
In Spark, you can control the number of files written by using the `sdf.repartition(n_files)` function, which allows you to specify the number of partitions (and hence the number of output files).
Additionally, you can configure Spark to manage the size of files by setting the minimum and maximum number of rows per file through the configuration options `spark.sql.files.maxRecordsPerFile` and `spark.sql.files.minRecordsPerFile`.
Using `repartition` can cause significant shuffling of data, which might lead to performance issues.
The number of partitions also affects Spark’s parallelization; too few partitions can lead to underutilized resources, while too many can cause excessive overhead.
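Putting the two knobs together, a minimal PySpark sketch could look like this (the table names, the target of 8 output files, and the 5M-rows-per-file cap are illustrative assumptions):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Cap how many rows Spark writes into a single file.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 5_000_000)

df = spark.table("bronze.events")  # hypothetical source table

# Repartition to an explicit number of partitions so each output file
# lands close to the ~1 GB target. Note: this triggers a full shuffle.
(
    df.repartition(8)
    .write.mode("overwrite")
    .format("parquet")
    .saveAsTable("bronze.events_compacted")  # hypothetical target table
)
```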
3.2. Athena with DBT
In AWS Athena, there is no direct way to control the number of files written during query execution. However, you can use DBT to manage this.
By implementing a DBT post-hook, you can compact the files after they are written using the `OPTIMIZE` command, which merges smaller files into larger ones. This will only work for Iceberg tables.
For more details, refer to the official documentation: Optimizing Iceberg tables.
Running `OPTIMIZE` will generate additional files. To avoid extra storage costs and maintain optimal performance, it is important to run `VACUUM` on the table. The `VACUUM` command removes any files that are no longer needed after the optimization process, freeing up storage space and reducing costs.
Here are the macros you will need:
macros/iceberg.sql
{%- macro optimize(table=this) -%}
{%- if (model.config.table_type == "iceberg")
and (model.config.materialized != "view")
and (adapter.type() == "athena")
-%}
OPTIMIZE {{ table }} REWRITE DATA USING BIN_PACK;
{%- endif -%}
{%- endmacro -%}
{%- macro vacuum(table=this) -%}
{%- if (model.config.table_type == "iceberg")
and (model.config.materialized != "view")
and (adapter.type() == "athena")
-%}
VACUUM {{ table }};
{%- endif -%}
{%- endmacro -%}
And the docs for those macros:
macros/iceberg.yml
version: 2

macros:
  - name: optimize
    description: |
      Optimize iceberg tables (and skip it on duckdb).
      More info: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html#querying-iceberg-data-optimization-rewrite-data-action
    arguments:
      - name: table
        type: string
        description: Table to `optimize`, by default it will use `this`
  - name: vacuum
    description: |
      Vacuum iceberg tables (and skip it on duckdb).
      More info: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html#querying-iceberg-vacuum
    arguments:
      - name: table
        type: string
        description: Table to `vacuum`, by default it will use `this`
And then, you can declare the post-hook in the `dbt_project.yml`:
dbt_project.yml
models:
  your_project:
    +post-hook:
      - "{{ optimize() }}"
      - "{{ vacuum() }}"
4. Good partitioning
When it comes to partitioning tables in a data lake, it’s important to consider the type of data and its usage patterns. There are generally two types of tables:
- Fact tables: These tables typically store raw data and are often partitioned by a `p_updated_at`, `p_extracted_at`, or a similar time-based column. Partitioning by time allows for efficient querying of recent data but can lead to many small files if the partitioning interval is too granular. For example, if data is extracted every 3 hours, that is 8 files per day, or roughly 2,900 files per year.
- Aggregated (or presentation) tables: These tables represent a final snapshot or aggregated view of the data. They often don’t need to be partitioned unless the dataset is very large. Partitioning might not be necessary for these tables as they are typically queried in their entirety or by specific dimensions that do not benefit significantly from partitioning.
4.1. Partitioning Fact tables
For fact tables, while partitioning by time is common, it can be more efficient to avoid partitioning if it leads to small files. Using data formats like Delta Lake or Apache Iceberg can help manage this.
According to Databricks documentation (When to partition tables on Databricks):
Most Delta tables with less than 1 TB of data do not require partitions.
So in many cases, the best partition strategy is to not partition at all.
Given that we will be appending new data based on the `updated_at`/`extracted_at` column, we can use row groups (see: Parquet | Concepts) for effective data pruning even if there are no partitions.
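To make row-group pruning effective, it helps to keep the data sorted on the time column inside each file, so every row group carries tight min/max statistics for `extracted_at`. A minimal sketch, reusing the hypothetical `df` from the Spark section above:

```python
# Control the number of files and sort rows within each partition so that
# Parquet row-group statistics on extracted_at let readers skip whole row groups.
(
    df.repartition(8)
    .sortWithinPartitions("extracted_at")
    .write.mode("append")
    .format("parquet")
    .saveAsTable("bronze.events")  # hypothetical table
)
```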
With Iceberg, we can follow a similar strategy where small tables are not partitioned.
Both Delta Lake and Iceberg use `parquet` under the hood, enabling them to leverage the same techniques for smart data pruning.
However, Iceberg offers additional options.
Iceberg provides functions to partition the data effectively, such as `months(extracted_at)`, which can reduce the number of partitions and improve query performance.
See Iceberg | Partition Transforms.
This is possible because of Iceberg's hidden partitioning.
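For illustration, this is roughly how such a transform can be declared when creating an Iceberg table from Spark (a sketch that assumes an `iceberg` catalog is configured; the table and column names are made up):

```python
spark.sql("""
    CREATE TABLE iceberg.bronze.events (
        id BIGINT,
        payload STRING,
        extracted_at TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (months(extracted_at))
""")
```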
5. How to track the problem
Tracking the number of files and partitions each table has is crucial for managing data lakes efficiently. Both Iceberg and Delta provide ways to query the metadata, which can then be exported into a table. Ideally, this data should be exported every time a table is written.
5.1. Iceberg
With Iceberg, you can query the `files` metadata, which gives you detailed information such as:
| content | file_path | partition | record_count | file_size_in_bytes |
|---|---|---|---|---|
| 0 | s3:/…/table/data/00000-3.parquet | {1999-01-01, 01} | 1 | 597 |
| 0 | s3:/…/table/data/00001-4.parquet | {1999-01-01, 02} | 1 | 597 |
| 0 | s3:/…/table/data/00002-5.parquet | {1999-01-01, 03} | 1 | 597 |
Showing only some relevant columns
From there, you can extract the desired information with a query like this:
SELECT
'{{ this.schema }}.{{ this.table }}' AS table_name,
count(*) AS total_files,
sum(record_count) AS total_rows,
round(sum(CAST(file_size_in_bytes AS DOUBLE)) / 1024 / 1024, 3) AS total_size_mb,
CURRENT_TIMESTAMP AS exported_at
FROM {{ this.schema }}."{{ this.table }}$files";
5.1.1. Iceberg in Spark
According to the official documentation (Spark: Querying with SQL | Iceberg), you can query the `files` metadata with:
spark.sql(f"SELECT * FROM iceberg.{schema}.{table}.files")
You must set up the `iceberg` catalog for this to work. Read Spark Configuration | Iceberg for more info.
With that in mind, you can create the `export_table_stats` function to be called every time you write a table:
import backoff
from prefect import get_run_logger

QUERY_STATS = """
SELECT
    '{schema}.{table}' AS table_name,
    count(*) AS total_files,
    sum(record_count) AS total_rows,
    round(sum(CAST(file_size_in_bytes AS DOUBLE)) / 1024 / 1024, 3) AS total_size_mb,
    CURRENT_TIMESTAMP AS exported_at
FROM iceberg.{schema}.{table}.files;
"""
TABLE_STATS = "iceberg.nt_bronze__metadata.raw_files"
ICEBERG_COMMIT_EXC = "org.apache.iceberg.exceptions.CommitFailedException"


class CommitFailedException(Exception):
    """Represents 'org.apache.iceberg.exceptions.CommitFailedException' exception"""

    pass


def log_error(details):
    logger = get_run_logger()
    msg = "'{target}' failed, backing off {wait:0.1f} seconds after {tries} tries"
    logger.warning(msg.format(**details))


@backoff.on_exception(
    backoff.expo, CommitFailedException, on_backoff=log_error, max_time=60 * 2  # 2 mins
)
def export_table_stats(spark, tablename):
    _, schema, table = tablename.split(".")
    query = QUERY_STATS.format(schema=schema, table=table)
    sdf = spark.sql(query)
    try:
        sdf.repartition(1).write.mode("append").format("parquet").saveAsTable(
            TABLE_STATS
        )
    # Catch the exception so that we can further inspect the type
    except Exception as exc:
        # If it's what we are looking for, raise our custom exception so that it can 'backoff'
        if ICEBERG_COMMIT_EXC == exc.java_exception.getClass().getName():
            raise CommitFailedException("Write failed (Commit Failed)")
        # If it's not the wanted exception, raise it
        raise exc
Then you only need to call `export_table_stats(spark, tablename)` after every write.
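For example, at the end of a job that writes an Iceberg table (the table name is made up for illustration):

```python
tablename = "iceberg.nt_bronze.some_table"  # hypothetical table

# Regular write of the data...
df.writeTo(tablename).append()

# ...followed by the stats export, so the metadata snapshot stays up to date.
export_table_stats(spark, tablename)
```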
While Iceberg can handle concurrent write operations, issues can sometimes occur when exporting stats. Therefore, retries with Backoff are recommended. For more information, see Concurrent write operations | Iceberg.
5.1.2. Iceberg in Athena with DBT
You can query the `files` metadata with:
SELECT * FROM dbname."tablename$files"
For more info, read Querying Iceberg table metadata | Athena and Iceberg $files table | Trino.
To automate this, create the following macros:
macros/metadata.sql
{%- macro create_metadata_schema() -%}
{%- if is_scheduled_pro_run() -%}
{% do adapter.create_schema(api.Relation.create(database=target.database, schema=var("metadata")["db"])) %}
{%- endif -%}
{%- endmacro -%}
{%- macro create_metadata_table_raw_files() -%}
{%- if is_scheduled_pro_run() -%}
CREATE TABLE IF NOT EXISTS {{ var("metadata")["db"] }}.{{ var("metadata")["tables"]["raw_files"] }} (
table_name STRING,
total_files BIGINT,
total_rows BIGINT,
total_size_mb DOUBLE,
exported_at TIMESTAMP
)
LOCATION '{{ get_bronze_bucket() }}/metadata/{{ var("metadata")["tables"]["raw_files"] }}'
TBLPROPERTIES (
'table_type' ='ICEBERG',
'format' = 'parquet',
'write_compression' = 'snappy',
'vacuum_max_snapshot_age_seconds' = '86400'
);
{%- endif -%}
{%- endmacro -%}
{% macro get_raw_files_table() %}
{{ var("metadata")["db"] }}.{{ var("metadata")["tables"]["raw_files"] }}
{% endmacro %}
{%- macro export_metadata() -%}
{%- if is_scheduled_pro_run() and (model.config.table_type == "iceberg") and (model.config.materialized != "view") -%}
INSERT INTO {{ get_raw_files_table() }}
SELECT
'{{ this.schema }}.{{ this.table }}' AS table_name,
count(*) AS total_files,
sum(record_count) AS total_rows,
round(sum(CAST(file_size_in_bytes AS DOUBLE)) / 1024 / 1024, 3) AS total_size_mb,
CURRENT_TIMESTAMP AS exported_at
FROM {{ this.schema }}."{{ this.table }}$files";
{%- endif -%}
{%- endmacro -%}
{%- macro optimize_metadata_table() -%}
{%- if is_scheduled_pro_run() -%}
{{ optimize(get_raw_files_table()) }}
{%- endif -%}
{%- endmacro -%}
{%- macro vacuum_metadata_table() -%}
{%- if is_scheduled_pro_run() and var('vacuum_raw_files') -%}
{{ vacuum(get_raw_files_table()) }}
{%- endif -%}
{%- endmacro -%}
And their documentation:
macros/metadata.yml
version: 2

macros:
  - name: create_metadata_schema
    description: |
      Creates the schema 'metadata' when running in `Athena` and in **PRO** using `nt_pro` profile
  - name: create_metadata_table_raw_files
    description: |
      Creates the table 'raw_files' inside schema 'metadata'.
      This contains `iceberg` metadata about the files each table has.
      Will only be created when running in `Athena` and in **PRO** using `nt_pro` profile
  - name: export_metadata
    description: |
      Exports metadata about a specific table.
      Will only be done when running in `Athena` and in **PRO** using `nt_pro` profile
  - name: get_raw_files_table
    description: Retrieves the fully qualified name of the raw files table from the metadata variables.
  - name: optimize_metadata_table
    description: Calls the `optimize` macro only when running in `Athena` and in **PRO** using `nt_pro` profile
  - name: vacuum_metadata_table
    description: |
      Calls the `vacuum` macro only when:
      * Running in `Athena`
      * Running in **PRO** using `nt_pro` profile
      * The variable `vacuum_raw_files` is `True` (happens when `--select` not present)
And then, declare the post-hook in the `dbt_project.yml`:
dbt_project.yml
models:
  your_project:
    +post-hook:
      - "{{ optimize() }}"
      - "{{ vacuum() }}"
      - "{{ export_metadata() }}" # <-- Add this
5.1.3. Track files_by_day
Now that we have a table with a snapshot of the metadata for each table whenever there is a write, we can create a table that tracks the latest state by day. This table will help us monitor partitioning problems.
Here is the DBT model for that:
models/files_by_day.sql
{{ config(unique_key='_uuid') }}
WITH source AS (
SELECT *
FROM {{ source('nt_bronze__metadata', 'raw_files') }}
),
fillna AS (
SELECT
table_name,
total_files,
exported_at,
COALESCE(total_rows, 0) AS total_rows,
COALESCE(total_size_mb, 0) AS total_size_mb
FROM source
),
daily AS (
SELECT
table_name,
CAST(exported_at AS date) AS export_date,
{{ last('total_files') }}
OVER (PARTITION BY table_name ORDER BY exported_at ASC) AS total_files,
{{ last('total_rows') }}
OVER (PARTITION BY table_name ORDER BY exported_at ASC) AS total_rows,
{{ last('total_size_mb') }}
OVER (PARTITION BY table_name ORDER BY exported_at ASC) AS total_size_mb,
RANK()
OVER (PARTITION BY table_name, CAST(exported_at AS date) ORDER BY exported_at ASC)
AS _rank
FROM fillna
),
daily_with_lags AS (
SELECT
table_name,
export_date,
total_files,
total_rows,
total_size_mb,
LAG(total_files)
OVER (PARTITION BY table_name ORDER BY export_date ASC) AS last_total_files,
LAG(total_rows)
OVER (PARTITION BY table_name ORDER BY export_date ASC) AS last_total_rows,
LAG(total_size_mb)
OVER (PARTITION BY table_name ORDER BY export_date ASC) AS last_total_size_mb
FROM daily
WHERE _rank = 1
),
daily_with_additions AS (
SELECT
---------- ids
{{ dbt_utils.generate_surrogate_key(['table_name', 'export_date']) }} AS _uuid,
table_name,
SUBSTRING(table_name FROM 1 FOR POSITION('.' IN table_name) - 1) AS schema_name,
export_date,
---------- totals
total_files,
total_rows,
total_size_mb,
---------- added datas
COALESCE(total_files - last_total_files, total_files) AS added_files,
COALESCE(total_rows - last_total_rows, total_rows) AS added_rows,
ROUND(COALESCE(total_size_mb - last_total_size_mb, total_size_mb), 3) AS added_mb
FROM daily_with_lags
)
SELECT *
FROM daily_with_additions
And the documentation for it:
models/files_by_day.yml
models:
  - name: files_by_day
    description: Shows total and added `files`, `rows` and `MBs` of each table
    columns:
      # ---------- ids
      - name: _uuid
        description: Unique identifier of the table. Composite key of `table_name` and `export_date`
        tests:
          - not_null
          - unique
      - name: table_name
        description: Full name of the table (format `schema.table`)
        tests: [not_null]
      - name: schema_name
        description: Name of the schema, extracted from `table_name`
      - name: export_date
        description: Day the statistics were exported for the given table
        tests: [not_null]
      # ---------- totals
      - name: total_files
        description: Total number of files the table has
        tests: [not_null]
      - name: total_rows
        description: Total number of rows the table has
        tests: [not_null]
      - name: total_size_mb
        description: Total size of the table in MBs
        tests: [not_null]
      # ---------- added
      - name: added_files
        description: Number of files added since the last write
        tests: [not_null]
      - name: added_rows
        description: Rows added since the last write
        tests: [not_null]
      - name: added_mb
        description: Increase in size (as `MB`) since the last write
        tests: [not_null]
5.2. Delta
With Delta, you can use the `DESCRIBE DETAIL table` command to get similar information.
This produces a table like this:
| name | location | createdAt | lastModified | partitionColumns | numFiles | sizeInBytes |
|---|---|---|---|---|---|---|
| default.deltatable | file:/Users/tuor/… | 2020-06-05 12:20:20 | 2020-06-05 12:20:20 | [] | 10 | 12345 |
Showing only some relevant columns
You can read more about this in the docs Review Delta Lake table details with describe detail.
For tracking metadata, you can follow a similar approach to what was described for Iceberg.
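As a rough sketch of what that could look like for Delta (an untested outline; the target stats table name is made up):

```python
# Snapshot the table detail and append it to a stats table, mirroring the Iceberg flow.
detail = spark.sql("DESCRIBE DETAIL bronze.events").selectExpr(
    "name AS table_name",
    "numFiles AS total_files",
    "round(sizeInBytes / 1024 / 1024, 3) AS total_size_mb",
    "current_timestamp() AS exported_at",
)
detail.write.mode("append").format("delta").saveAsTable("metadata.raw_files_delta")
```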
6. Fixing the problem
After following the previous steps, you have detected multiple tables that are not well partitioned and have a lot of small files. What can you do to fix the data?
6.1. Non partitioned tables
For non-partitioned tables, it is quite straightforward. You can simply run `OPTIMIZE` to compact the table.
The `OPTIMIZE` command can take a lot of time and might require a lot of processing power, depending on the table size. Proceed with caution.
6.2. Partitioned tables
For partitioned tables, the best-case scenario is that you can compact the partitions with `OPTIMIZE`, similar to non-partitioned tables.
If you want, you can optimize only some partitions, which will be faster and require less computing power.
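In Delta Lake’s SQL dialect this compaction looks roughly like the sketch below (table and partition values are illustrative; on Athena with Iceberg you would instead use the `OPTIMIZE ... REWRITE DATA USING BIN_PACK` form from the macros above):

```python
# Compact the whole table...
spark.sql("OPTIMIZE bronze.events")

# ...or only a subset of partitions, which is faster and needs less compute.
spark.sql("OPTIMIZE bronze.events WHERE p_extracted_at >= '2024-01-01'")
```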
If compacting the partitions is not enough, you will need to change the partitioning of the table. This process differs for Delta and Iceberg tables.
6.2.1. Partitioned Delta tables
Delta tables have fixed partitioning, and you cannot change the partitioning of an existing table. Therefore, you will need to recreate the table without the partition column or with a different partitioning.
To do this, follow these steps:
- Create a new temporary table with the desired partitioning.
- Move all the data from the original table to the temporary one.
- Optimize the new table.
- Validate the new table.
Optimizing the temporary table ensures all changes are made before removing the original table, minimizing downtime.
You can create the new table with a `CREATE TABLE AS` statement:
CREATE TABLE {db}.{new}
WITH (
table_type = 'iceberg',
is_external = false,
location ='s3://{bucket}/{db_path}/{new}/'
) AS
SELECT *
FROM {db}.{old}
--Optionally add a WHERE clause to create it with a subset of data
You might encounter issues when creating a table with all the data in one query.
In such cases, create the table with only some partitions and then use `INSERT` statements to move all the data, as sketched below.
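A rough sketch of that partition-by-partition copy, driven from PySpark (the table names, partition column, and month list are all illustrative; the same `INSERT INTO` statements can also be run directly in Athena):

```python
months = ["2024-01", "2024-02", "2024-03"]  # hypothetical partitions to copy

for month in months:
    spark.sql(f"""
        INSERT INTO db.new_table
        SELECT *
        FROM db.old_table
        WHERE p_month = '{month}'
    """)
```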
Once everything is good with the new table, you can recreate the original table:
- Delete the original table.
- Recreate the original table.
- Move back all the data.
- Delete the temporary table.
6.2.2. Partitioned Iceberg tables
With Iceberg, you can perform schema evolutions, such as changing the partitioning. You can read more about it at Iceberg evolutions.
However, there is a caveat:
Changing the partitioning of an Iceberg table does not repartition old data.
To compact old partitions, you still need to move the data to a temporary table and then bring it back. You can follow a similar approach to what was described for Delta tables.
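For reference, the partition-spec change itself is a pair of ALTER TABLE statements in Iceberg’s Spark SQL extensions (a sketch; it assumes the extensions are enabled, the names are made up, and, as noted above, it only affects data written after the change):

```python
# Stop partitioning by the old, too-granular field...
spark.sql("ALTER TABLE iceberg.bronze.events DROP PARTITION FIELD p_extracted_at")

# ...and partition new data by month instead. Existing files keep their old layout.
spark.sql("ALTER TABLE iceberg.bronze.events ADD PARTITION FIELD months(extracted_at)")
```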
7. Conclusion
Managing the size and number of files in a data lake is essential for maintaining optimal performance and controlling costs. Throughout this post, we’ve explored the challenges posed by small files, the ideal file sizes recommended by various data processing engines, and practical strategies for avoiding and fixing these issues.
Key takeaways:
- Understanding the problem: Small files can significantly degrade performance and increase costs due to inefficient data processing and numerous storage requests.
- Optimal file size: Aim for around 1 GB per file to balance performance and storage efficiency.
- Avoiding small files: Use appropriate configurations and tools like Spark’s repartition and Delta Lake’s OPTIMIZE commands to control file sizes.
- Tracking the issue: Regularly monitor the number and size of files using metadata queries in Iceberg and Delta Lake.
- Fixing the problem: Compact existing small files and, if necessary, recreate tables with better partitioning strategies.
By implementing these strategies, you can ensure that your data lake remains efficient and cost-effective. The insights and methods shared here are based on practical experience and thorough research, aimed at helping you manage your data lake more effectively.
Efficient data management not only improves performance but also significantly reduces costs.
With these tools and techniques at your disposal, you are well-equipped to tackle the small file problem in your data lake.