Effortless EMR: A Guide to Seamlessly Running PySpark Code
- 2024-05-07
- ☁️ Cloud/DevOps
- AWS
0. Intro
Tired of wrestling with complex setups when trying to get PySpark up and running on EMR? You’re not alone! This guide was created out of frustration with overly convoluted configurations. I’m here to show you a simpler way to get PySpark up and running on EMR. No more tangled messes—just clear, straightforward steps and easy traceability. Whether you’re new to PySpark or just tired of the headache, I’ve got your back. Let’s make PySpark on EMR hassle-free together!
This post breaks down multiple concepts step by step. If you prefer to see the final result, head to the last section where you can find everything consolidated.
1. Running a script in EMR
The simplest way to execute PySpark code on EMR is by storing a Python file in S3.
For example, let’s suppose we have a main.py file in a bucket named your-bucket:
s3://your-bucket/main.py
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()
# Create a DataFrame
data = [("Alice", 34), ("Bob", 45), ("Charlie", 23)]
sdf = spark.createDataFrame(data, ["Name", "Age"])
# Show the DataFrame
sdf.show()
# Stop the SparkSession
spark.stop()
First, you need to create or start your EMR cluster, which is beyond the scope of this post.
Next, you submit a job using the specified file (s3://your-bucket/main.py) as the entrypoint.
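For reference, here is a minimal sketch of submitting such a step with boto3 (the cluster ID and S3 path are placeholders you would replace):
import boto3
emr = boto3.client("emr")
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster id
    Steps=[
        {
            "Name": "pyspark-example",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", "--deploy-mode", "cluster", "s3://your-bucket/main.py"],
            },
        }
    ],
)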
This method works well for single-file jobs, but for more complex jobs with dependencies, it’s not very practical.
A common solution is to create a Python package so you can import code from other files. Let’s explore how to do this next.
2. Using packages
In general, you’ll need to use two different types of packages: Java packages and Python packages.
2.1. Java packages
Adding Java packages is quite straightforward:
- Download the package as a JAR file, for example, spark-3.3-bigquery-0.31.1.jar
- Add this file somewhere in S3 where you have read access, for example, s3://your-bucket/emr/jars/
- Set up the cluster to include that package by adding the following configuration:
--jars s3://your-bucket/emr/jars/spark-3.3-bigquery-0.31.1.jar
With that, you should be able to use that package.
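As a quick sanity check, here is a hedged example of using the connector from the jar above (the table name is illustrative, BigQuery credentials must be configured, and it assumes an active SparkSession named spark like the one from section 1):
# Read a public BigQuery table through the connector added with --jars
df = (
    spark.read.format("bigquery")
    .option("table", "bigquery-public-data.samples.shakespeare")
    .load()
)
df.show(5)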
2.2. Python packages
The simplest way of adding a Python package is by setting up a bootstrap action (as recommended in QA: EMR install python libraries).
The main problem with that approach is that you’ll be downloading the package at runtime. You could store the package (an egg or a wheel) in S3 and install it from there, but that adds complexity.
The way to overcome those problems is to create a Docker image.
3. Creating an environment with Docker
According to EMR spark docker by AWS, you can create a Docker image and later use it to run EMR jobs. However, I think it’s better to create a virtual environment using Docker and export it as a tar.gz file.
With that, you can simply upload the file to S3 and use it later.
3.1. Add external packages
The overall idea is to follow EMR serverless samples.
In this example I’m using the Poetry package manager since I think it’s better than regular pip.
The steps I follow are:
- Get pip
- Install poetry
- Copy the needed files in order to install the poetry dependencies
- Create the virtual environment
- Create a tar.gz with the virtual environment
- Export the tar.gz
Dockerfile
FROM --platform=linux/amd64 amazonlinux:2023 AS base
RUN dnf install python3-pip -y
# Update and install python packages
RUN pip install poetry --no-cache-dir
# Copy needed files to install the project
COPY pyproject.toml .
COPY README.md .
COPY my_package my_package
# Set up poetry and install dependencies
RUN poetry config virtualenvs.in-project true && \
poetry install
# Create a '.tar.gz' with the virtual environment
RUN mkdir /output && \
poetry run venv-pack -o /output/my_package-latest.tar.gz
# Export the '.tar.gz'
FROM scratch AS export
COPY --from=base /output/*.tar.gz /
For this to work you will need to have venv-pack as a poetry dependency so that you can export the virtual environment. You can add it with poetry add venv-pack
Here I’m using amazonlinux:2023 as the base image since I’m using EMR 7, which is based on that version. For older EMR versions, replace it with amazonlinux:2.
To create the tar.gz simply run:
docker build --output . .
Once the tar.gz is created, you will need to upload it to S3.
Then you can set up an EMR job to use that environment with:
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.archives=s3://your-bucket/venv/my_package-latest.tar.gz#environment
Make sure to adapt the path (s3://your-bucket/venv/my_package-latest.tar.gz in the example) to the file you created.
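If you submit jobs programmatically, here is a sketch of passing those flags with boto3 and EMR Serverless (the application ID, role ARN, and paths are placeholders):
import boto3
client = boto3.client("emr-serverless")
client.start_job_run(
    applicationId="00fxxxxxxxxxxxxx",  # placeholder application id
    executionRoleArn="arn:aws:iam::123456789012:role/emr-serverless-job-role",  # placeholder role
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://your-bucket/main.py",
            "sparkSubmitParameters": (
                "--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python "
                "--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python "
                "--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python "
                "--conf spark.archives=s3://your-bucket/venv/my_package-latest.tar.gz#environment"
            ),
        }
    },
)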
3.2. Create a ‘Package’ for Adding Code
Now that we know how to add external packages, we can reuse the same approach to import our own code. As stated in How to run a Python project (package) on AWS EMR serverless?, the idea is to create a Python package and then install it.
In my case, it would be done with Docker by:
# Install 'my_package' as a package
RUN poetry build && \
poetry run pip install dist/*.whl
Another thing I added is the ability to pass the version as an argument.
The final Dockerfile code would be:
Dockerfile
FROM --platform=linux/amd64 amazonlinux:2023 AS base
ARG PACKAGE_VERSION
# Should match CI config
ARG POETRY_VERSION=1.6.1
RUN dnf install python3-pip -y
# Update and install python packages
RUN pip install \
"poetry==${POETRY_VERSION}" \
boto3 \
loguru \
toml \
--no-cache-dir
# Copy needed files to install the project
COPY pyproject.toml .
COPY README.md .
COPY my_package my_package
# Set up poetry and install dependencies
RUN poetry config virtualenvs.in-project true && \
poetry install
# Install 'my_package' as a package
RUN poetry build && \
poetry run pip install dist/*.whl
# Create a '.tar.gz' with the virtual environment
RUN mkdir /output && \
poetry run venv-pack -o /output/my_package-${PACKAGE_VERSION}.tar.gz
# Export the '.tar.gz'
FROM scratch AS export
COPY --from=base /output/*.tar.gz /
And the image is built with:
# Set a version that is unique and will not collide with other people
docker build --output . . --build-arg PACKAGE_VERSION=2.1.7 # Any version you want
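If you want the build argument to follow the version declared in pyproject.toml, one option is to read it before invoking docker build. This is a sketch using the toml package and it assumes tool.poetry.version is set:
import subprocess
import toml
# Read the version declared in pyproject.toml (tool.poetry.version)
version = toml.load("pyproject.toml")["tool"]["poetry"]["version"]
# Build the image, exporting the versioned tar.gz to the current directory
subprocess.run(
    ["docker", "build", "--output", ".", ".", "--build-arg", f"PACKAGE_VERSION={version}"],
    check=True,
)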
3.3. Export the environment with CD
Here, the idea is to have a GitHub action that uploads the virtual environment whenever there is a commit to main.
The general steps are:
- Fetch the code
- Create the tar.gz as seen in the previous section
- Upload it to S3
One thing that I find very useful is to always export the real version, such as 2.1.7, and then also create one called latest. With this, I can easily point to the latest.tar.gz in production and/or point to a specific version if I need to.
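As a sketch, the upload step of that action could boil down to two boto3 calls (the bucket, keys, and version are hypothetical):
import boto3
s3 = boto3.client("s3")
version = "2.1.7"  # hypothetical version built in the previous step
s3.upload_file(f"my_package-{version}.tar.gz", "your-bucket", f"venv/my_package-{version}.tar.gz")
s3.upload_file(f"my_package-{version}.tar.gz", "your-bucket", "venv/my_package-latest.tar.gz")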
You might want to read Managing package versions with Poetry for more information about how to manage versions with Poetry.
4. Generic entrypoint
You’ve seen how to add external packages and how to install your own code as a package. However, you still need an entrypoint for running EMR jobs.
Given that any Python file you have in your code could be an entrypoint, you would be forced to upload all files to S3.
Luckily, there is a better way to handle this. The idea is to have a generic entrypoint.py that runs code from other packages.
Let’s imagine we have:
- my_package
├── salesforce
│ ├── utils.py
│ ├── query.py
│ └── main.py # This is an entrypoint
├── big_query
│ ├── constants.py
│ └── run.py # This is an entrypoint
└── ...
Some of those files are meant to be entrypoints. The only requirement is that they have a main function.
my_package/salesforce/main.py
# Other code
def main():
# This is what will be called
print(1)
With that, we can pass an argument to the generic entrypoint in a way that ends up importing the file we need and running the main function inside. It would be similar to:
from my_package import salesforce
salesforce.main()
However, we will be doing the import dynamically so that it can be done based on the path as a string. This is done with:
import importlib
module = importlib.import_module("my_package.salesforce")
module.main()
This does exactly the same as before, but the module path can now be provided as a string at runtime.
5. Handling Parameters
Now that you have a single entrypoint, you could use sys.argv or argparse to read the entrypoint from the command-line arguments, as explained in How do I access command line arguments?.
One way of doing that would be to always use the first parameter for the entrypoint and the rest of the parameters as the actual script parameters.
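A minimal sys.argv sketch of that convention (purely illustrative):
import sys
# First argument selects the entrypoint, the rest are forwarded to it
entrypoint, *script_args = sys.argv[1:]
print(entrypoint, script_args)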
But there is a better way with click.
5.1. Catching Generic Parameters with click
The idea here is to define the entrypoint as a parameter in click and then catch the rest of the parameters as explained in Add unspecified options to cli command using python-click.
The basic code for that would be:
entrypoint.py
import importlib
import click
from loguru import logger
@click.command(
name="entrypoint",
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.pass_context
@click.option(
"--entrypoint",
"-e",
required=True,
type=str,
help="File used as an entrypoint. Must have a 'main' function",
)
def main(ctx, entrypoint):
logger.debug("Extracting parameters from context")
# https://stackoverflow.com/a/32946412/3488853
kwargs = {ctx.args[i][2:]: ctx.args[i + 1] for i in range(0, len(ctx.args), 2)}
kwargs.update({"entrypoint": entrypoint})
# Replace dashes for underscores in param names to ensure compatibility
kwargs = {k.replace("-", "_"): v for k, v in kwargs.items()}
logger.info(f"Importing '{entrypoint}'")
module = importlib.import_module(entrypoint)
module.main(**kwargs)
if __name__ == "__main__":
print("Starting EMR job")
main()
Then you can define the parameters in each file as usual:
my_package/salesforce/main.py
# Other code
def main(name, num):
# This is what will be called
print(f"Hello {name} with {num=}")
Notice that all parameters are passed as strings.
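To make that explicit, here is what the pairing logic from entrypoint.py produces for a call with --name Alice --num 3 (the values are illustrative):
# Extra args as click collects them in ctx.args
ctx_args = ["--name", "Alice", "--num", "3"]
kwargs = {ctx_args[i][2:]: ctx_args[i + 1] for i in range(0, len(ctx_args), 2)}
print(kwargs)  # {'name': 'Alice', 'num': '3'} -> note that '3' is still a string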
5.2. Define Script Parameters with pydantic
One way to improve the previous code is to define the parameters with Pydantic. This way you don’t need to handle the casting yourself, and you can use pydantic’s powerful validators.
You would do that with:
my_package/salesforce/main.py
from pydantic import BaseModel
class MyParams(BaseModel):
entrypoint: str # This is needed
name: str
num: int
def main(**kwargs):
args = MyParams(**kwargs)
print(f"Hello {args.name} with {args.num=}")
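With that in place, pydantic takes care of the casting; a quick illustrative check:
args = MyParams(entrypoint="my_package.salesforce.main", name="Alice", num="3")
print(args.num, type(args.num))  # 3 <class 'int'> -> the string was coerced to an int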
6. Track Executions with Prefect
One really good way of having better observability on EMR jobs is to add Prefect. If you are not familiar with Prefect I suggest you read Prefect Essentials first.
Also make sure you connect EMR to Prefect by following Connect EMR | Setting Up and Deploying Prefect Server.
6.1. Jobs as Flows
The idea here is to create a flow at the entrypoint so that you can see each run. This is done by creating a flow dynamically with flow(name="flow_name")(function)(*args, **kwargs).
So the modified lines look as follows:
entrypoint.py
from prefect import flow
def main(ctx, entrypoint):
logger.info(f"Importing '{entrypoint}'")
module = importlib.import_module(entrypoint)
flow(name=entrypoint)(module.main)(**kwargs)
6.2. Adding Tasks and/or Subflows
You can follow the same approach to add different tasks and/or subflows. For example, let’s imagine you are extracting multiple tables from somewhere. Here you could create a Prefect task for each table. This way you will have a much better observability system. It would be done with something like:
from prefect import task
for table in tables:
name = f"{params.entrypoint}.{table.name}"
task(name=name)(process_one_table)(name=table.name, **any_other_arguments)
7. Putting it All Together
We have seen multiple code snippets explaining the different parts. Here, I’m adding the final code so that it’s easier to see the final outcome.
7.1. Entrypoint
This takes care of:
- Calling the main function of other files
- Catching parameters as a dict
- Adding prefect.tags
entrypoint.py
import importlib
import sys
import click
from loguru import logger
from prefect import tags
PACKAGE_NAME = "my_package"
logger.configure(handlers=[{"sink": sys.stdout, "level": "INFO"}])
logger.enable(PACKAGE_NAME)
def get_run_tags(entrypoint):
tags = [
"type:emr",
"entrypoint:" + entrypoint.replace("my_package.", ""),
# Feel free to add any other tags
]
logger.info(f"Running with {tags=}")
return tags
@click.command(
name="entrypoint",
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.pass_context
@click.option(
"--entrypoint",
"-e",
required=True,
type=str,
help="File used as an entrypoint. Must have a 'main' function",
)
def main(ctx, entrypoint):
if not entrypoint.startswith("my_package."):
entrypoint = f"my_package.{entrypoint}"
logger.debug("Extracting parameters from context")
# https://stackoverflow.com/a/32946412/3488853
kwargs = {ctx.args[i][2:]: ctx.args[i + 1] for i in range(0, len(ctx.args), 2)}
kwargs.update({"entrypoint": entrypoint})
# Replace dashes for underscores in param names to ensure compatibility
kwargs = {k.replace("-", "_"): v for k, v in kwargs.items()}
logger.info(f"Importing '{entrypoint}'")
module = importlib.import_module(entrypoint)
logger.info(f"Running '{entrypoint}.main()' with kwargs='{kwargs}'")
with tags(*get_run_tags(entrypoint)):
module.main(**kwargs)
if __name__ == "__main__":
print("Starting EMR job")
main()
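Before shipping it to EMR, you can smoke-test the entrypoint locally; this is a sketch using click’s test runner, and it assumes entrypoint.py is importable from the project root and that the target module follows the pattern shown in section 7.3:
from click.testing import CliRunner
from entrypoint import main
result = CliRunner().invoke(main, ["--entrypoint", "salesforce.main", "--n-jobs", "5"])
print(result.exit_code, result.output)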
7.2. Arguments code
Here we define:
- BaseParams to be extended in each module
- A decorator that catches the Params and creates a prefect.flow
my_package/common/parameters.py
from time import time
from pydantic import BaseModel
from prefect import flow, get_run_logger
class BaseParams(BaseModel):
"""
Parent class for the Params meant to be extended.
It adds 'entrypoint' as a forced parameter.
"""
entrypoint: str
def catch_params(ParamsClass):
"""
This is used to easily parse params. It takes a class that inherits from 'BaseParams'.
For example:
class Params(params.BaseParams):
schema_out: str = "nt_bronze__big_query"
files_per_partition: str = 1
@params.catch_params(Params)
def main(params):
do_stuff
"""
def decorator(func):
def inner(**kwargs):
params = ParamsClass(**kwargs)
entrypoint = params.entrypoint
@flow(name=entrypoint)
def prefect_flow(params):
t0 = time()
logger = get_run_logger()
logger.info(f"Running with {params=}")
# Make sure to log the outcome of the process
try:
func(params)
except Exception as ex:
logger.exception(
f"Job '{entrypoint}.main()' failed after {(time() - t0)/ 60:.2f} minutes"
)
raise ex
else:
logger.info(
f"Job '{entrypoint}.main()' finished successfully "
f"after {(time() - t0)/ 60:.2f} minutes"
)
return prefect_flow(params)
return inner
return decorator
7.3. Sample module
Here is an example of how to define a job:
my_package/salesforce/main.py
from datetime import datetime
from typing import Optional
from my_package.common import BaseParams, catch_params
# Here we define all params
class Params(BaseParams):
n_jobs: int = 10
    start_time: Optional[datetime] = None
@catch_params(Params)
def main(params):
print(f"Starting {params.n_jobs} jobs with start_time={params.start_time}")
Notice that using parameters is as simple as declaring a pydantic class. Then, to retrieve the values, simply access params.param_name.
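Finally, to tie everything together, here is a hedged sketch of launching this job on EMR Serverless with boto3, passing the entrypoint and its parameters as arguments (the IDs, ARN, and paths are placeholders, and the Python environment confs are the ones from section 3.1):
import boto3
client = boto3.client("emr-serverless")
client.start_job_run(
    applicationId="00fxxxxxxxxxxxxx",  # placeholder application id
    executionRoleArn="arn:aws:iam::123456789012:role/emr-serverless-job-role",  # placeholder role
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://your-bucket/entrypoint.py",
            "entryPointArguments": ["--entrypoint", "salesforce.main", "--n-jobs", "5"],
            # Add the PYSPARK_PYTHON confs from section 3.1 alongside spark.archives
            "sparkSubmitParameters": "--conf spark.archives=s3://your-bucket/venv/my_package-latest.tar.gz#environment",
        }
    },
)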