Effortless EMR: A Guide to Seamlessly Running PySpark Code

0. Intro

Tired of wrestling with complex setups when trying to get PySpark up and running on EMR? You’re not alone! This guide was born out of frustration with overly convoluted configurations, and I’m here to show you a simpler way. No more tangled messes, just clear, straightforward steps and easy traceability. Whether you’re new to PySpark or just tired of the headache, I’ve got your back. Let’s make PySpark on EMR hassle-free together!

This post breaks down multiple concepts step by step. If you prefer to see the final result, head to the last section where you can find everything consolidated.

1. Running a script in EMR

The simplest way to execute PySpark code on EMR is by storing a Python file in S3. For example, let’s suppose we have a main.py file in a bucket named your-bucket:

s3://your-bucket/main.py

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Create a DataFrame
data = [("Alice", 34), ("Bob", 45), ("Charlie", 23)]
sdf = spark.createDataFrame(data, ["Name", "Age"])

# Show the DataFrame
sdf.show()

# Stop the SparkSession
spark.stop()

First, you need to create or start your EMR cluster, which is beyond the scope of this post.

Next, you submit a job using the specified file (s3://your-bucket/main.py) as the entrypoint.
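For example, on EMR Serverless (which is what the configuration keys shown later in this post target), the submission can be done with the AWS CLI. This is just a sketch: the application ID and execution role ARN are placeholders you would replace with your own values.

aws emr-serverless start-job-run \
  --application-id <your-application-id> \
  --execution-role-arn <your-job-role-arn> \
  --name pyspark-example \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://your-bucket/main.py"
    }
  }'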

This method works well for single-file jobs, but for more complex jobs with dependencies, it’s not very practical.

A common solution is to create a Python package so you can import code from other files. Let’s explore how to do this next.

2. Using packages

In general, you’ll need to use two different types of packages: Java packages and Python packages.

2.1. Java packages

Adding Java packages is quite straightforward:

  1. Download the package as a JAR file, for example, spark-3.3-bigquery-0.31.1.jar
  2. Add this file somewhere in S3 where you have reading access, for example, s3://your-bucket/emr/jars/
  3. Set up the cluster to include that package by adding the following configuration:
--jars s3://your-bucket/emr/jars/spark-3.3-bigquery-0.31.1.jar

With that, you should be able to use that package.
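As a quick sanity check that the JAR is actually picked up, you could read a table through the connector from PySpark. This is only a sketch: the project, dataset and table names are placeholders, and depending on your setup you may also need to configure BigQuery credentials.

# Read a BigQuery table using the connector added via --jars
sdf = (
    spark.read.format("bigquery")
    .option("table", "your-project.your_dataset.your_table")
    .load()
)
sdf.show()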

2.2. Python packages

The simplest way of adding a Python package is by setting up a bootstrap action (as recommended in QA: EMR install python libraries).

The main problem with that approach is that you download the package at runtime. You could upload the package (an egg or a wheel) to S3 and install it from there, but that adds complexity.

The way to overcome those problems is to create a Docker image.

3. Creating an environment with Docker

According to EMR spark docker by AWS, you can create a Docker image and later use it to run EMR jobs. However, I think it’s better to create a virtual environment using Docker and export it as a tar.gz file.

With that, you can simply upload the file to S3 and use it later.

3.1. Add external packages

The overall idea is to follow EMR serverless samples. In this example I’m using the Poetry Python package manager since I think it’s better than regular pip.

The steps I follow are:

  1. Get pip
  2. Install poetry
  3. Copy the needed files in order to install the poetry dependencies
  4. Create the virtual environment
  5. Create a tar.gz with the virtual environment
  6. Export the tar.gz

Dockerfile

FROM --platform=linux/amd64 amazonlinux:2023 AS base

RUN dnf install python3-pip -y

# Update and install python packages
RUN pip install poetry --no-cache-dir

# Copy needed files to install the project
COPY pyproject.toml .
COPY README.md .
COPY my_package my_package

# Set up poetry and install dependencies
RUN poetry config virtualenvs.in-project true && \
    poetry install

# Create a '.tar.gz' with the virtual environment
RUN mkdir /output && \
    poetry run venv-pack -o /output/my_package-latest.tar.gz

# Export the '.tar.gz'
FROM scratch AS export
COPY --from=base /output/*.tar.gz /

For this to work you will need to have venv-pack as a Poetry dependency so that you can export the virtual environment. You can add it with poetry add venv-pack.

Here I’m using amazonlinux:2023 as the base image since I’m using EMR 7, which is based on that version. For older versions, replace it with amazonlinux:2.

To create the tar.gz simply run:

docker build --output . .

Once the tar.gz is created, you will need to upload it to S3. Then you can configure the EMR job to use that environment with:

--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.archives=s3://your-bucket/venv/my_package-latest.tar.gz#environment

Make sure to adapt the path (s3://your-bucket/venv/my_package-latest.tar.gz in the example) to the file you created.
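To make it concrete, here is a sketch of uploading the archive and passing those confs when submitting a job. The bucket path, application ID and role ARN are placeholders.

# Upload the packed virtual environment
aws s3 cp my_package-latest.tar.gz s3://your-bucket/venv/my_package-latest.tar.gz

# Submit a job that runs on top of that environment
aws emr-serverless start-job-run \
  --application-id <your-application-id> \
  --execution-role-arn <your-job-role-arn> \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://your-bucket/main.py",
      "sparkSubmitParameters": "--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.archives=s3://your-bucket/venv/my_package-latest.tar.gz#environment"
    }
  }'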

3.2. Create a ‘Package’ for Adding Code

Now that we know how to add external packages, we can reuse the same approach to import our own code. As explained in How to run a Python project (package) on AWS EMR serverless?, the idea is to create a Python package and then install it.

In my case, it would be done with Docker by:

# Install 'my_package' as a package
RUN poetry build && \
    poetry run pip install dist/*.whl

Another thing I added is the ability to pass the version as an argument. The final Dockerfile code would be:

Dockerfile

FROM --platform=linux/amd64 amazonlinux:2023 AS base

ARG PACKAGE_VERSION

# Should match CI config
ARG POETRY_VERSION=1.6.1 

RUN dnf install python3-pip -y

# Update and install python packages
RUN pip install \
  "poetry==${POETRY_VERSION}" \
  boto3 \
  loguru \
  toml \
  --no-cache-dir

# Copy needed files to install the project
COPY pyproject.toml .
COPY README.md .
COPY my_package my_package

# Set up poetry and install dependencies
RUN poetry config virtualenvs.in-project true && \
    poetry install

# Install 'my_package' as a package
RUN poetry build && \
    poetry run pip install dist/*.whl

# Create a '.tar.gz' with the virtual environment
RUN mkdir /output && \
    poetry run venv-pack -o /output/my_package-${PACKAGE_VERSION}.tar.gz

# Export the '.tar.gz'
FROM scratch AS export
COPY --from=base /output/*.tar.gz /

And it is compiled with:

# Set a version that is unique and will not collide with other people
docker build --output . . --build-arg PACKAGE_VERSION=2.1.7 # Any version you want

3.3. Export the environment with CD

Here, the idea is to have a GitHub action that uploads the virtual environment whenever there is a commit to main.

The general steps are:

  1. Fetch the code
  2. Create the tar.gz as seen in the previous section
  3. Upload it to S3

One thing that I find very useful is to always export the real version, such as 2.1.7, and then also create one called latest. With this, I can easily point to the latest.tar.gz in production and/or point to a specific version if I need to.
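A minimal sketch of those steps as shell commands (the bucket path and the version are placeholders, and in a real GitHub Action you would also need AWS credentials configured):

VERSION=2.1.7  # e.g. read from pyproject.toml

# 1. Build the packed virtual environment
docker build --output . . --build-arg PACKAGE_VERSION=${VERSION}

# 2. Upload the versioned archive
aws s3 cp my_package-${VERSION}.tar.gz s3://your-bucket/venv/my_package-${VERSION}.tar.gz

# 3. Also publish it as 'latest'
aws s3 cp my_package-${VERSION}.tar.gz s3://your-bucket/venv/my_package-latest.tar.gz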

You might want to read Managing package versions with Poetry for more information about how to manage versions.

4. Generic entrypoint

You’ve seen how to add external packages and how to install your own code as a package. However, you still need an entrypoint for running EMR jobs.

Given that any Python file in your codebase could be an entrypoint, you would be forced to upload all of those files to S3.

Luckily, there is a better way to handle this. The idea is to have a generic entrypoint.py that runs code from other packages.

Let’s imagine we have:

- my_package
  ├── salesforce
  │   ├── utils.py
  │   ├── query.py
  │   └── main.py # This is an entrypoint
  ├── big_query
  │   ├── constants.py
  │   └── run.py # This is an entrypoint
  └── ...

Some of those files will be prepared to act as entrypoints. The only requirement is that each one has a main function.

my_package/salesforce/main.py

# Other code

def main():
    # This is what will be called
    print(1)

With that, we can pass an argument to the generic entrypoint that tells it which module to import so that its main function can be run. It would be similar to:

from my_package.salesforce import main
main.main()

However, we will do the import dynamically so that the module can be selected from a string path. This is done with:

import importlib
module = importlib.import_module("my_package.salesforce.main")
module.main()

This does exactly the same as before.
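Putting it together, a first bare-bones version of the generic entrypoint could look like this. The argument handling is deliberately naive here; the next section replaces it with something nicer.

entrypoint.py

import importlib
import sys

if __name__ == "__main__":
    # e.g. spark-submit entrypoint.py my_package.salesforce.main
    module = importlib.import_module(sys.argv[1])
    module.main()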

5. Handling Parameters

Now that you have a single entrypoint, you could use sys.argv or argparse to catch the entrypoint from the arguments, as explained in How do I access command line arguments?. One way of doing that would be to always use the first parameter for the entrypoint and the rest of the parameters as the actual script parameters.

But there is a better way with click.

5.1. Catching Generic Parameters with click

The idea here is to define the entrypoint as a parameter in click and then catch the rest of the parameters as explained in Add unspecified options to cli command using python-click.

The basic code for that would be:

entrypoint.py

import importlib

import click
from loguru import logger

@click.command(
    name="entrypoint",
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.pass_context
@click.option(
    "--entrypoint",
    "-e",
    required=True,
    type=str,
    help="File used as an entrypoint. Must have a 'main' function",
)
def main(ctx, entrypoint):
    logger.debug("Extracting parameters from context")
    # https://stackoverflow.com/a/32946412/3488853
    kwargs = {ctx.args[i][2:]: ctx.args[i + 1] for i in range(0, len(ctx.args), 2)}
    kwargs.update({"entrypoint": entrypoint})

    # Replace dashes for underscores in param names to ensure compatibility
    kwargs = {k.replace("-", "_"): v for k, v in kwargs.items()}

    logger.info(f"Importing '{entrypoint}'")
    module = importlib.import_module(entrypoint)
    module.main(**kwargs)


if __name__ == "__main__":
    print("Starting EMR job")
    main()

Then you can define the parameters in each file as usual:

my_package/salesforce/main.py

# Other code

def main(name, num):
    # This is what will be called
    print(f"Hello {name} with {num=}")

Notice that all parameters arrive as strings.
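For example, assuming the package is installed in the environment, a local run could look like this (on EMR the same arguments would go into the job’s entry point arguments):

python entrypoint.py --entrypoint my_package.salesforce.main --name Alice --num 3

Here both name and num reach main as the strings "Alice" and "3".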

5.2. Define Script Parameters with pydantic

One way to improve the previous code is to define the parameters with Pydantic. This way you don’t need to handle the casting yourself and can use the powerful validators from pydantic.

You would do that with:

my_package/salesforce/main.py

from pydantic import BaseModel

class MyParams(BaseModel):
    entrypoint: str # This is needed
    name: str
    num: int

def main(**kwargs):
    args = MyParams(**kwargs)
    print(f"Hello {args.name} with {args.num=}")

6. Track Executions with Prefect

A really good way of getting better observability on EMR jobs is to add Prefect. If you are not familiar with Prefect, I suggest you read Prefect Essentials first.

Also make sure you connect EMR to Prefect by following Connect EMR | Setting Up and Deploying Prefect Server.

6.1. Jobs as Flows

The idea here is to create a flow at the entrypoint so that you can see each run. This can be done dynamically with flow(name="flow_name")(function)(*args, **kwargs).

So the modified lines look as follows:

entrypoint.py

from prefect import flow

def main(ctx, entrypoint):
    # ... kwargs are extracted from 'ctx' as shown in the previous section ...

    logger.info(f"Importing '{entrypoint}'")
    module = importlib.import_module(entrypoint)
    flow(name=entrypoint)(module.main)(**kwargs)

6.2. Adding Tasks and/or Subflows

You can follow the same approach to add different tasks and/or subflows. For example, let’s imagine you are extracting multiple tables from somewhere. You could create a Prefect task for each table, which gives you much better observability. It would be done with something like:

from prefect import task

for table in tables:
    name = f"{params.entrypoint}.{table.name}"
    task(name=name)(process_one_table)(name=table.name, **any_other_arguments)

7. Putting it All Together

We have seen multiple code snippets explaining the different parts. Here, I’m adding the final code so that it’s easier to see the final outcome.

7.1. Entrypoint

This takes care of:

  • Calling the main function of other files
  • Catching parameters as a dict
  • Adding prefect.tags

entrypoint.py

import importlib
import sys

import click
from loguru import logger
from prefect import tags


PACKAGE_NAME = "my_package"

logger.configure(handlers=[{"sink": sys.stdout, "level": "INFO"}])
logger.enable(PACKAGE_NAME)


def get_run_tags(entrypoint):
    tags = [
        "type:emr",
        "entrypoint:" + entrypoint.replace("my_package.", ""),
        # Feel free to add any other tags
    ]
    logger.info(f"Running with {tags=}")
    return tags


@click.command(
    name="entrypoint",
    context_settings=dict(
        ignore_unknown_options=True,
        allow_extra_args=True,
    ),
)
@click.pass_context
@click.option(
    "--entrypoint",
    "-e",
    required=True,
    type=str,
    help="File used as an entrypoint. Must have a 'main' function",
)
def main(ctx, entrypoint):
    if not entrypoint.startswith("my_package."):
        entrypoint = f"my_package.{entrypoint}"

    logger.debug("Extracting parameters from context")
    # https://stackoverflow.com/a/32946412/3488853
    kwargs = {ctx.args[i][2:]: ctx.args[i + 1] for i in range(0, len(ctx.args), 2)}
    kwargs.update({"entrypoint": entrypoint})

    # Replace dashes for underscores in param names to ensure compatibility
    kwargs = {k.replace("-", "_"): v for k, v in kwargs.items()}

    logger.info(f"Importing '{entrypoint}'")
    module = importlib.import_module(entrypoint)

    logger.info(f"Running '{entrypoint}.main()' with kwargs='{kwargs}'")
    with tags(*get_run_tags(entrypoint)):
        module.main(**kwargs)


if __name__ == "__main__":
    print("Starting EMR job")
    main()

7.2. Arguments code

Here we define:

  • BaseParams to be extended in each module
  • A decorator that catches the Params and creates a prefect.flow

my_package/common/parameters.py

from time import time

from pydantic import BaseModel
from prefect import flow, get_run_logger

class BaseParams(BaseModel):
    """
    Parent class for the Params meant to be extended.
    It adds 'entrypoint' as a forced parameter.
    """

    entrypoint: str

def catch_params(ParamsClass):
    """
    This is used to easily parse params. It takes a class that inherits from 'BaseParams'.

    For example:
        class Params(params.BaseParams):
            schema_out: str = "nt_bronze__big_query"
            files_per_partition: int = 1


        @params.catch_params(Params)
        def main(params):
            do_stuff
    """

    def decorator(func):
        def inner(**kwargs):
            params = ParamsClass(**kwargs)
            entrypoint = params.entrypoint

            @flow(name=entrypoint)
            def prefect_flow(params):
                t0 = time()
                logger = get_run_logger()

                logger.info(f"Running with {params=}")

                # Make sure to log the outcome of the process
                try:
                    func(params)

                except Exception as ex:
                    logger.exception(
                        f"Job '{entrypoint}.main()' failed after {(time() - t0)/ 60:.2f} minutes"
                    )
                    raise ex
                else:
                    logger.info(
                        f"Job '{entrypoint}.main()' finished successfully "
                        f"after {(time() - t0)/ 60:.2f} minutes"
                    )

            return prefect_flow(params)

        return inner

    return decorator

7.3. Sample module

Here is an example of how to define a job:

my_package/salesforce/main.py

from datetime import datetime
from typing import Optional

from my_package.common import BaseParams, catch_params

# Here we define all params
class Params(BaseParams):
    n_jobs: int = 10
    start_time: Optional[datetime] = None

@catch_params(Params)
def main(params):
    print(f"Starting {params.n_jobs} jobs with start_time={params.start_time}")

Notice that defining parameters is as simple as declaring a pydantic class, and retrieving a value is just a matter of accessing params.param_name.
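Finally, as an end-to-end sketch, running this sample module on EMR Serverless could look roughly like the command below. The application ID, role ARN and bucket paths are placeholders, and entrypoint.py is assumed to have been uploaded to S3.

aws emr-serverless start-job-run \
  --application-id <your-application-id> \
  --execution-role-arn <your-job-role-arn> \
  --name salesforce-main \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://your-bucket/emr/entrypoint.py",
      "entryPointArguments": ["--entrypoint", "salesforce.main", "--n-jobs", "20", "--start-time", "2024-01-01T00:00:00"],
      "sparkSubmitParameters": "--conf spark.archives=s3://your-bucket/venv/my_package-latest.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
    }
  }'

The entrypoint script adds the my_package. prefix, replaces dashes with underscores, and the pydantic Params class then casts n_jobs to an int and start_time to a datetime.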