
Solving the problem with small files in the Data Lake

0. Motivation

In modern data lake architectures, managing the size and number of files is crucial for maintaining performance and cost-efficiency. During my work, I frequently encountered tables with a huge number of small files, which led to very poor query performance and significantly increased costs.

To tackle this issue, I started tracking how tables were stored and developed a keen interest in optimizing data storage practices. After investing considerable time into tracking and fixing these problems, I wanted to share my approach and insights.

This post aims to provide a comprehensive guide on understanding, identifying, and resolving the small file problem in data lakes, ensuring that your data processing remains efficient and cost-effective.

1. What’s the problem

In data lakes, managing the size of partitions is crucial for maintaining optimal performance and cost-efficiency. Partitions that are too small, generally less than 1 GB, can significantly degrade performance and inflate costs. This issue is well-documented in the Delta Lake Optimisation Guide, which highlights the adverse effects of small partitions on data processing and querying efficiency.

1.1. Performance

According to the article Fixing the Small File Problem in Data Lake with Spark:

Small files can cause a performance degradation of up to 4x.

This is because small files increase the number of tasks required to process the data, leading to higher overhead in task management and scheduling. Each task incurs a startup cost, and when there are many small tasks, this overhead can accumulate, resulting in significant slowdowns.

1.2. Costs

Another significant issue with small file partitions is the impact on costs. When I was working at Glovo, my colleagues observed that reducing the number of small files in heavily queried tables resulted in a 10x cost reduction.

This significant saving was primarily due to the decreased number of S3 GET requests, which are a major cost factor when querying small files.

1.3. Default Behavior of Data Lake Writers

By default, many data lake writers tend to create a large number of small files. For instance, Apache Spark will write multiple small files if not properly configured. Similarly, with AWS Athena, it is challenging to control the file sizes, often resulting in tiny files being created. If not set up correctly, these default behaviors can lead to a proliferation of small files, exacerbating performance and cost issues in the data lake.
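To see why, consider this minimal PySpark sketch (the table and output path are made up). Any aggregation or join shuffles the data into spark.sql.shuffle.partitions partitions, 200 by default, and each non-empty partition becomes its own output file:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

orders = spark.table("bronze.orders")          # hypothetical source table
daily = orders.groupBy("order_date").count()   # shuffle -> up to 200 partitions

# Without an explicit repartition, this write can produce up to 200 files,
# even if the aggregated result is only a few megabytes in total.
daily.write.mode("overwrite").parquet("s3://my-bucket/daily_orders/")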

2. Good file size

Determining the optimal file size in a data lake can be challenging, as it depends on various factors such as the data processing engine, query patterns, and specific workload characteristics. However, most documentation and data processing engines recommend targeting a file size around 1 GB.

For example, the Delta Lake Small File Compaction with OPTIMIZE article highlights that Delta Lake suggests aiming for partitions close to 1 GB in size. This recommendation helps balance the performance benefits of larger files with the practical considerations of data processing and storage.

Similarly, the DuckDB | Parquet File Sizes guide suggests that an optimal file size for Parquet files typically ranges between 100 MB and 10 GB. This broad range accounts for different use cases and workload requirements, allowing for flexibility while still emphasizing the importance of avoiding files that are too small.

Around 1 GB per file is the usual target for optimal file size in data lakes.

3. How to avoid it?

The key to avoiding small files in your data lake is to control the number of files written during data processing. Ensuring that the files are of optimal size improves both performance and cost-efficiency.

3.1. Spark

In Spark, you can control the number of files written by calling sdf.repartition(n_files) before the write, which sets the number of partitions (and hence the number of output files). Additionally, you can cap file sizes by limiting the number of rows written to each file with the spark.sql.files.maxRecordsPerFile configuration option.

Using repartition can cause significant shuffling of data, which might lead to performance issues. The number of partitions also affects Spark’s parallelization; too few partitions can lead to underutilized resources, while too many can cause excessive overhead.
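Here is a minimal sketch of the approach (the table name, output path, and target file count are assumptions; in practice, derive the expected size from previous runs or table metadata):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Safety net: cap the number of rows written to a single file (value is arbitrary).
spark.conf.set("spark.sql.files.maxRecordsPerFile", 5_000_000)

sdf = spark.table("bronze.events")  # hypothetical source table

# Aim for roughly 1 GB per file: one output file per expected GB of data.
expected_size_gb = 50  # made-up figure
n_files = max(1, int(expected_size_gb))

(
    sdf.repartition(n_files)
    .write.mode("overwrite")
    .parquet("s3://my-bucket/silver/events/")  # hypothetical destination
)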

3.2. Athena with DBT

In AWS Athena, there is no direct way to control the number of files written during query execution.

However, you can use DBT to manage this. By implementing a DBT post-hook, you can compact the files after they are written using the OPTIMIZE command, which merges smaller files into larger ones. This will only work for Iceberg tables. For more details, refer to the official documentation: Optimizing Iceberg tables.

Running OPTIMIZE will generate additional files. To avoid extra storage costs and maintain optimal performance, it is important to run VACUUM on the table. The VACUUM command removes any files that are no longer needed after the optimization process, freeing up storage space and reducing costs.

Here are the macros you will need:

macros/iceberg.sql

{%- macro optimize(table=this) -%}
    {%- if (model.config.table_type == "iceberg")
        and (model.config.materialized != "view")
        and (adapter.type() == "athena")
    -%}
        OPTIMIZE {{ table }} REWRITE DATA USING BIN_PACK;
    {%- endif -%}
{%- endmacro -%}

{%- macro vacuum(table=this) -%}
    {%- if (model.config.table_type == "iceberg")
        and (model.config.materialized != "view")
        and (adapter.type() == "athena")
    -%}
        VACUUM {{ table }};
    {%- endif -%}
{%- endmacro -%}

And the docs for those macros:

macros/iceberg.yml

version: 2

macros:
  - name: optimize
    description: |
Optimize iceberg tables (and skip it on duckdb).
      More info: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html#querying-iceberg-data-optimization-rewrite-data-action
    arguments:
      - name: table
        type: string
        description: Table to `optimize`, by default it will use `this`

  - name: vacuum
    description: |
      Vacuum iceberg tables (and skip it on duckdb).
      More info: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html#querying-iceberg-vacuum
    arguments:
      - name: table
        type: string
        description: Table to `vacuum`, by default it will use `this`

And then, you can declare the post-hook in the dbt_project.yml:

dbt_project.yml

models:
  your_project:
    +post-hook:
      - "{{ optimize() }}"
      - "{{ vacuum() }}"

4. Good partitioning

When it comes to partitioning tables in a data lake, it’s important to consider the type of data and its usage patterns. There are generally two types of tables:

  • Fact tables: These tables typically store raw data and are often partitioned by a p_updated_at, p_extracted_at, or a similar time-based column. Partitioning by time allows for efficient querying of recent data but can lead to many small files if the partitioning interval is too granular. For example, if data is extracted every 3 hours and each extraction writes its own partition, that results in roughly 2,900 files per year.

  • Aggregated (or presentation) tables: These tables represent a final snapshot or aggregated view of the data. They often don’t need to be partitioned unless the dataset is very large. Partitioning might not be necessary for these tables as they are typically queried in their entirety or by specific dimensions that do not benefit significantly from partitioning.

4.1. Partitioning Fact tables

For fact tables, while partitioning by time is common, it can be more efficient to avoid partitioning if it leads to small files. Using table formats like Delta Lake or Apache Iceberg can help manage this.

According to Databricks documentation (When to partition tables on Databricks):

Most Delta tables with less than 1 TB of data do not require partitions.

So in many cases, the best partition strategy is to not partition at all. Given that we will be appending new data based on the updated_at/extracted_at column, we can use row groups (see: Parquet | Concepts) for effective data pruning even if there are no partitions.

With Iceberg, we can follow a similar strategy where small tables are not partitioned. Both Delta Lake and Iceberg use Parquet under the hood, enabling them to leverage the same techniques for smart data pruning. However, Iceberg offers additional options: it provides partition transforms, such as months(extracted_at), which can reduce the number of partitions and improve query performance. See Iceberg | Partition Transforms.

This is possible because of Iceberg's hidden partitioning.
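For illustration, here is a rough sketch of creating an Iceberg table partitioned with the months transform in Spark SQL (the catalog, schema, and column names are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes an Iceberg catalog named `iceberg` is configured

spark.sql("""
    CREATE TABLE iceberg.bronze.events (
        event_id     BIGINT,
        payload      STRING,
        extracted_at TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (months(extracted_at))  -- one partition per month instead of per extraction
""")

This keeps the number of partitions low while still allowing Iceberg to prune data by extracted_at at query time.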

5. How to track the problem

Tracking the number of files and partitions each table has is crucial for managing data lakes efficiently. Both Iceberg and Delta provide ways to query the metadata, which can then be exported into a table. Ideally, this data should be exported every time a table is written.

5.1. Iceberg

With Iceberg, you can query the files metadata, which gives you detailed information such as:

content | file_path                        | partition        | record_count | file_size_in_bytes
0       | s3:/…/table/data/00000-3.parquet | {1999-01-01, 01} | 1            | 597
0       | s3:/…/table/data/00001-4.parquet | {1999-01-01, 02} | 1            | 597
0       | s3:/…/table/data/00002-5.parquet | {1999-01-01, 03} | 1            | 597

Showing only some relevant columns

From there, you can extract the desired information with a query like this:

SELECT
    '{{ this.schema }}.{{ this.table }}' AS table_name,
    count(*) AS total_files,
    sum(record_count) AS total_rows,
    round(sum(CAST(file_size_in_bytes AS DOUBLE)) / 1024 / 1024, 3) AS total_size_mb,
    CURRENT_TIMESTAMP AS exported_at
FROM {{ this.schema }}."{{ this.table }}$files";

5.1.1. Iceberg in Spark

According to the official documentation (Spark: Querying with SQL | Iceberg), you can query the files metadata with:

spark.sql(f"SELECT * FROM iceberg.{schema}.{table}.files")

You must set up the Iceberg catalog for this to work. Read Spark Configuration | Iceberg for more info.
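As a rough sketch, the catalog configuration could look like this (the catalog implementation, warehouse path, and the use of AWS Glue are assumptions; adapt them to your environment and make sure the corresponding Iceberg runtime jars are on the classpath):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.iceberg.warehouse", "s3://my-bucket/warehouse/")  # placeholder path
    .getOrCreate()
)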

With that in mind, you can create the export_table_stats function to be called every time you write a table:

import backoff
from prefect import get_run_logger

QUERY_STATS = """
SELECT
    '{schema}.{table}' AS table_name,
    count(*) AS total_files,
    sum(record_count) AS total_rows,
    round(sum(CAST(file_size_in_bytes AS DOUBLE)) / 1024 / 1024, 3) AS total_size_mb,
    CURRENT_TIMESTAMP AS exported_at
FROM iceberg.{schema}.{table}.files;
"""
TABLE_STATS = "iceberg.nt_bronze__metadata.raw_files"
ICEBERG_COMMIT_EXC = "org.apache.iceberg.exceptions.CommitFailedException"

class CommitFailedException(Exception):
    """Represents 'org.apache.iceberg.exceptions.CommitFailedException' exception"""

    pass

def log_error(details):
    logger = get_run_logger()

    msg = "'{target}' failed, backing off {wait:0.1f} seconds after {tries} tries"
    logger.warning(msg.format(**details))


@backoff.on_exception(
    backoff.expo, CommitFailedException, on_backoff=log_error, max_time=60 * 2  # 2 mins
)
def export_table_stats(spark, tablename):
    _, schema, table = tablename.split(".")

    query = QUERY_STATS.format(schema=schema, table=table)
    sdf = spark.sql(query)

    try:
        sdf.repartition(1).write.mode("append").format("parquet").saveAsTable(
            TABLE_STATS
        )

    # Catch the exception so that we can further inspect the type
    except Exception as exc:
        # Py4J exposes the underlying JVM exception as `java_exception`;
        # if it's the Iceberg commit conflict, raise our custom exception so that it can 'backoff'
        java_exception = getattr(exc, "java_exception", None)
        if java_exception is not None and ICEBERG_COMMIT_EXC == java_exception.getClass().getName():
            raise CommitFailedException("Write failed (Commit Failed)")

        # If it's not the wanted exception, re-raise it
        raise

Then you only need to call export_table_stats(spark, tablename) after every write.
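For example, a small write helper could look like this (the table name and write mode are placeholders; adapt them to your own pipeline):

TABLENAME = "iceberg.nt_silver.orders"  # hypothetical table

def write_table(spark, sdf, tablename=TABLENAME):
    sdf.writeTo(tablename).append()       # the regular data write
    export_table_stats(spark, tablename)  # export file stats right after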

While Iceberg can handle concurrent write operations, issues can sometimes occur when exporting stats. Therefore, retries with Backoff are recommended. For more information, see Concurrent write operations | Iceberg.

5.1.2. Iceberg in Athena with DBT

You can query the files metadata with:

SELECT * FROM dbname."tablename$files"

For more info, read Querying Iceberg table metadata | Athena and Iceberg $files table | Trino.

To automate this, create the following macros:

macros/metadata.sql

{%- macro create_metadata_schema() -%}
    {%- if is_scheduled_pro_run() -%}
        {% do adapter.create_schema(api.Relation.create(database=target.database, schema=var("metadata")["db"])) %}
    {%- endif -%}
{%- endmacro -%}

{%- macro create_metadata_table_raw_files() -%}
    {%- if is_scheduled_pro_run() -%}
        CREATE TABLE IF NOT EXISTS {{ var("metadata")["db"] }}.{{ var("metadata")["tables"]["raw_files"] }} (
            table_name STRING,
            total_files BIGINT,
            total_rows BIGINT,
            total_size_mb DOUBLE,
            exported_at TIMESTAMP
        )
        LOCATION '{{ get_bronze_bucket() }}/metadata/{{ var("metadata")["tables"]["raw_files"] }}'
        TBLPROPERTIES (
            'table_type' = 'ICEBERG',
            'format' = 'parquet',
            'write_compression' = 'snappy',
            'vacuum_max_snapshot_age_seconds' = '86400'
        );
    {%- endif -%}
{%- endmacro -%}

{% macro get_raw_files_table() %}
    {{ var("metadata")["db"] }}.{{ var("metadata")["tables"]["raw_files"] }}
{% endmacro %}

{%- macro export_metadata() -%}
    {%- if is_scheduled_pro_run() and (model.config.table_type == "iceberg") and (model.config.materialized != "view") -%}
        INSERT INTO {{ get_raw_files_table() }}
        SELECT
            '{{ this.schema }}.{{ this.table }}' AS table_name,
            count(*) AS total_files,
            sum(record_count) AS total_rows,
            round(sum(CAST(file_size_in_bytes AS DOUBLE)) / 1024 / 1024, 3) AS total_size_mb,
            CURRENT_TIMESTAMP AS exported_at
        FROM {{ this.schema }}."{{ this.table }}$files";
    {%- endif -%}
{%- endmacro -%}

{%- macro optimize_metadata_table() -%}
    {%- if is_scheduled_pro_run() -%}
        {{ optimize(get_raw_files_table()) }}
    {%- endif -%}
{%- endmacro -%}

{%- macro vacuum_metadata_table() -%}
    {%- if is_scheduled_pro_run() and var('vacuum_raw_files') -%}
        {{ vacuum(get_raw_files_table()) }}
    {%- endif -%}
{%- endmacro -%}

And their documentation:

macros/metadata.yml

version: 2

macros:
  - name: create_metadata_schema
    description: |
      Creates the schema 'metadata' when running in `Athena` and in **PRO** using `nt_pro` profile

  - name: create_metadata_table_raw_files
    description: |
      Creates the table 'raw_files' inside schema 'metadata'.
      This contains `iceberg` metadata about the files each table has.
      Will only be created when running in `Athena` and in **PRO** using `nt_pro` profile

  - name: export_metadata
    description: |
      Exports metadata about a specific table
      Will only be done when running in `Athena` and in **PRO** using `nt_pro` profile

  - name: get_raw_files_table
    description: Retrieves the fully qualified name of the raw files table from the metadata variables.

  - name: optimize_metadata_table
    description: Calls the `optimize` macro only when running in `Athena` and in **PRO** using `nt_pro` profile

  - name: vacuum_metadata_table
    description: |
      Calls the `vacuum` macro only when:

      * Running in `Athena`
      * Running in **PRO** using `nt_pro` profile
      * The variable `vacuum_raw_files` is `True` (happens when `--select` not present)

And then, declare the post-hook in the dbt_project.yml:

dbt_project.yml

models:
  your_project:
    +post-hook:
      - "{{ optimize() }}"
      - "{{ vacuum() }}"
      - "{{ export_metadata() }}" # <-- Add this

5.1.3. Track files_by_day

Now that we have a table with a snapshot of the metadata for each table whenever there is a write, we can create a table that tracks the latest state by day. This table will help us monitor partitioning problems.

Here is the DBT model for that:

models/files_by_day.sql

{{ config(unique_key='_uuid') }}

WITH source AS (
    SELECT *
    FROM {{ source('nt_bronze__metadata', 'raw_files') }}
),

fillna AS (
    SELECT
        table_name,
        total_files,
        exported_at,
        COALESCE(total_rows, 0) AS total_rows,
        COALESCE(total_size_mb, 0) AS total_size_mb
    FROM source
),

daily AS (
    SELECT
        table_name,
        CAST(exported_at AS date) AS export_date,
        {{ last('total_files') }}
            OVER (PARTITION BY table_name ORDER BY exported_at ASC) AS total_files,
        {{ last('total_rows') }}
            OVER (PARTITION BY table_name ORDER BY exported_at ASC) AS total_rows,
        {{ last('total_size_mb') }}
            OVER (PARTITION BY table_name ORDER BY exported_at ASC) AS total_size_mb,
        RANK()
            OVER (PARTITION BY table_name, CAST(exported_at AS date) ORDER BY exported_at ASC)
            AS _rank
    FROM fillna
),

daily_with_lags AS (
    SELECT
        table_name,
        export_date,
        total_files,
        total_rows,
        total_size_mb,
        LAG(total_files)
            OVER (PARTITION BY table_name ORDER BY export_date ASC) AS last_total_files,
        LAG(total_rows)
            OVER (PARTITION BY table_name ORDER BY export_date ASC) AS last_total_rows,
        LAG(total_size_mb)
            OVER (PARTITION BY table_name ORDER BY export_date ASC) AS last_total_size_mb
    FROM daily
    WHERE _rank = 1
),

daily_with_additions AS (
    SELECT
        ----------  ids
        {{ dbt_utils.generate_surrogate_key(['table_name', 'export_date']) }} AS _uuid,
        table_name,
        SUBSTRING(table_name FROM 1 FOR POSITION('.' IN table_name) - 1) AS schema_name,
        export_date,
        ----------  totals
        total_files,
        total_rows,
        total_size_mb,
        ----------  added datas
        COALESCE(total_files - last_total_files, total_files) AS added_files,
        COALESCE(total_rows - last_total_rows, total_rows) AS added_rows,
        ROUND(COALESCE(total_size_mb - last_total_size_mb, total_size_mb), 3) AS added_mb
    FROM daily_with_lags
)

SELECT *
FROM daily_with_additions

And the documentation for it:

models/files_by_day.yml

models:
  - name: files_by_day
    description: Shows total and added `files`, `rows` and `MBs` of each table
    columns:
      # ----------  ids
      - name: _uuid
        description: Unique identifier of the table. Composite key of `table_name` and `export_date`
        tests:
          - not_null
          - unique
      - name: table_name
        description: Full name of the table (format `schema.table`)
        tests: [not_null]
      - name: schema_name
        description: Name of the schema, extracted from `table_name`
      - name: export_date
        description: Day the statistics were exported for the given table
        tests: [not_null]

      # ----------  totals
      - name: total_files
        description: Total number of files the table has
        tests: [not_null]
      - name: total_rows
        description: Total number of rows the table has
        tests: [not_null]
      - name: total_size_mb
        description: Total size of the table in MBs
        tests: [not_null]

      # ----------  added
      - name: added_files
        description: Added number of files from the last write
        tests: [not_null]
      - name: added_rows
        description: Added rows from the last write
        tests: [not_null]
      - name: added_mb
        description: Increase in size (as `MB`) from the last write
        tests: [not_null]

5.2. Delta

With Delta, you can use the DESCRIBE DETAIL table command to get similar information. This produces a table like this:

name               | location           | createdAt           | lastModified        | partitionColumns | numFiles | sizeInBytes
default.deltatable | file:/Users/tuor/… | 2020-06-05 12:20:20 | 2020-06-05 12:20:20 | []               | 10       | 12345

Showing only some relevant columns

You can read more about this in the docs: Review Delta Lake table details with describe detail.

For tracking metadata, you can follow a similar approach to what was described for Iceberg.
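As a rough sketch (the stats table name is an assumption, and retries/error handling are omitted), the export could look like this:

from pyspark.sql import functions as F

DELTA_TABLE_STATS = "metadata.raw_files_delta"  # hypothetical stats table

def export_delta_table_stats(spark, tablename):
    detail = spark.sql(f"DESCRIBE DETAIL {tablename}")

    # Note: DESCRIBE DETAIL does not expose row counts, so total_rows is not included here.
    stats = detail.select(
        F.lit(tablename).alias("table_name"),
        F.col("numFiles").alias("total_files"),
        F.round(F.col("sizeInBytes") / 1024 / 1024, 3).alias("total_size_mb"),
        F.current_timestamp().alias("exported_at"),
    )

    stats.repartition(1).write.mode("append").saveAsTable(DELTA_TABLE_STATS)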

6. Fixing the problem

After following the previous steps, you have detected multiple tables that are not well partitioned and have a lot of small files. What can you do to fix the data?

6.1. Non partitioned tables

For non-partitioned tables, it is quite straightforward. You can simply run OPTIMIZE to compact the table.

The OPTIMIZE command can take a lot of time and might require a lot of processing power, depending on the table size. Proceed with caution.

6.2. Partitioned tables

For partitioned tables, the best case scenario is where you can compact the partitions with OPTIMIZE, similar to non-partitioned tables. If you want, you can optimize only some partitions, which will be faster and require less computing power.
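For a Delta table in Spark, this could look like the following sketch (it assumes an existing SparkSession and that Delta Lake's OPTIMIZE SQL command is available in your setup; the table and partition column names are made up):

# Compact the whole table...
spark.sql("OPTIMIZE silver.events")

# ...or only a subset of partitions, which is faster and cheaper.
spark.sql("OPTIMIZE silver.events WHERE p_date >= '2024-01-01'")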

If compacting the partitions is not enough, you will need to change the partitioning of the table. This process differs for Delta and Iceberg tables.

6.2.1. Partitioned Delta tables

Delta tables have fixed partitioning, and you cannot change the partitioning of an existing table. Therefore, you will need to recreate the table without the partition column or with a different partitioning.

To do this, follow these steps:

  1. Create a new temporary table with the desired partitioning.
  2. Move all the data from the original table to the temporary one.
  3. Optimize the new table.
  4. Validate the new table.

Optimizing the temporary table ensures all changes are made before removing the original table, minimizing downtime.

You can create the new table with a CREATE TABLE AS statement. The example below uses Athena CTAS syntax; adapt the table properties (such as table_type) to your table format:

CREATE TABLE {db}.{new}
  WITH (
    table_type = 'iceberg',
    is_external = false,
    location = 's3://{bucket}/{db_path}/{new}/'
  ) AS
  SELECT *
  FROM {db}.{old}
  -- Optionally add a WHERE clause to create it with a subset of data

You might encounter issues when creating a table with all the data in one query. In such cases, create the table with only some partitions and then use INSERT INTO statements to move the rest of the data.

Once everything is good with the new table, you can recreate the original table:

  1. Delete the original table.
  2. Recreate the original table.
  3. Move back all the data.
  4. Delete the temporary table.

6.2.2. Partitioned Iceberg tables

With Iceberg, you can evolve a table in place, including its partitioning (partition evolution). You can read more about it at Iceberg evolutions.
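As a sketch (assuming Spark with the Iceberg SQL extensions enabled and an existing SparkSession; the table and column names are made up), a partition evolution could look like this:

# Replace a too-granular partition field with a monthly transform.
spark.sql("""
    ALTER TABLE iceberg.bronze.events
    REPLACE PARTITION FIELD p_extracted_at WITH months(extracted_at)
""")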

However, there is a caveat:

Changing the partitioning of an Iceberg table does not repartition old data.

To compact old partitions, you still need to move the data to a temporary table and then bring it back. You can follow a similar approach to what was described for Delta tables.

7. Conclusion

Managing the size and number of files in a data lake is essential for maintaining optimal performance and controlling costs. Throughout this post, we’ve explored the challenges posed by small files, the ideal file sizes recommended by various data processing engines, and practical strategies for avoiding and fixing these issues.

Key takeaways:

  1. Understanding the problem: Small files can significantly degrade performance and increase costs due to inefficient data processing and numerous storage requests.
  2. Optimal file size: Aim for around 1 GB per file to balance performance and storage efficiency.
  3. Avoiding small files: Use appropriate configurations and tools like Spark’s repartition and Delta Lake’s OPTIMIZE commands to control file sizes.
  4. Tracking the issue: Regularly monitor the number and size of files using metadata queries in Iceberg and Delta Lake.
  5. Fixing the problem: Compact existing small files and, if necessary, recreate tables with better partitioning strategies.

By implementing these strategies, you can ensure that your data lake remains efficient and cost-effective. The insights and methods shared here are based on practical experience and thorough research, aimed at helping you manage your data lake more effectively.

Efficient data management not only improves performance but also significantly reduces costs.

With these tools and techniques at your disposal, you are well-equipped to tackle the small file problem in your data lake.