
Setting Up and Deploying Prefect Server: A Comprehensive Guide

0. Intro

In this post I’ll explain how to:

  1. Set up your Prefect Server.
  2. Create Deployments.
  3. Connect to the Server.

If you are not familiar with Prefect, I suggest you read Prefect Essentials first.

1. Setting up a Prefect Server

This section is based on the Hosting a Prefect server instance guide.

The simplest configuration that can work in production consists of three main components:

  1. A database for the server.
  2. The prefect server.
  3. At least one prefect worker.

For the database, I opted for a postgres instance hosted on AWS.

Prefect is designed to scale well beyond this basic configuration (more information in Deploying Flows to Work Pools and Workers), but for the purposes of this post, I’ll focus on the fundamental setup.

This basic configuration suffices for my needs, as I primarily use prefect workers to trigger various AWS jobs and tasks using boto3, including:

  • Lambdas
  • ECS tasks with Fargate
  • EMR Serverless tasks

Hence, a single worker adequately serves this setup.
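
As a taste of what those triggers look like, here is a minimal boto3 sketch for launching an ECS task on Fargate. The cluster, task definition, and subnets are placeholder assumptions (section 5.2 shows the Lambda case):

import boto3


def run_ecs_task(cluster: str, task_definition: str, subnets: list[str]) -> str:
    """Launch an ECS task on Fargate and return its ARN."""
    client = boto3.client("ecs")
    response = client.run_task(
        cluster=cluster,
        taskDefinition=task_definition,
        launchType="FARGATE",
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": subnets,
                "assignPublicIp": "ENABLED",
            }
        },
    )
    return response["tasks"][0]["taskArn"]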

1.1. Creating the Prefect Server

To create the Prefect Server, we need a simple Docker container with Prefect installed. While there is no official image dedicated to the server, we can use the official Prefect image, the same one workers use.

The setup process involves providing the correct configuration for the database:

/Dockerfile.server

# Prefect version must match Dockerfile.worker and pyproject.toml
FROM prefecthq/prefect:2.16-python3.10 as base

# Set connection (format: `postgresql+asyncpg://{user}:{password}@{url}:{port}/prefect`)
ARG DB_URL
RUN prefect config set PREFECT_API_DATABASE_CONNECTION_URL="${DB_URL}"
RUN prefect config set PREFECT_SERVER_API_HOST=0.0.0.0

CMD ["prefect", "server", "start"]

If you attempt to run this with the database inside a Docker container, you may encounter networking issues. It’s strongly recommended to look at the Prefect example with Docker compose to see how to handle them.
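
For local testing, a minimal Docker Compose sketch could look like the following (service names, credentials, and the Postgres version are assumptions, not the official example):

/docker-compose.yaml

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: prefect
      POSTGRES_DB: prefect
  server:
    build:
      context: .
      dockerfile: Dockerfile.server
      args:
        # Inside Compose, the host is the service name, not localhost
        DB_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
    ports:
      - "4200:4200"
    depends_on:
      - postgres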

To create the Docker image, execute the following command, replacing XXX with the database URL. For a Postgres database, it should look like postgresql+asyncpg://{user}:{password}@{url}:{port}/prefect:

docker build --build-arg DB_URL=XXX -f Dockerfile.server -t northius/prefect/server .

Once the Prefect Server is set up, you can access the web UI. If it’s running locally, you can find it at localhost:4200. However, the server won’t be able to run any flow without a worker.
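
To try it locally, you can run the image with something like this (detached, exposing the default UI port):

docker run -d -p 4200:4200 northius/prefect/server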

To deploy the server somewhere, like AWS ECS running as a Service with Fargate, make sure to provision a public URL so that other components can connect to it.

Make sure to secure the server behind a VPN to prevent unauthorized access.

1.2. Creating a Prefect Worker

To create a Prefect Worker, we first need to create a Work Pool for the Worker to join.

We will use a local subprocess work pool. Given that we will only use it to trigger other services with boto3, it should be more than enough. If needed, we can always change it to an ECS task and/or add more workers.

You can do this using the Prefect UI:

[Image: creating a Work Pool in the Prefect UI]
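
Alternatively, you can create it from the CLI (assuming a recent Prefect 2 version; the pool name matches the one used below):

prefect work-pool create "subprocess-pool" --type process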

Once you have the Work Pool, we will create a Worker with Docker, following a similar approach to the Server:

/Dockerfile.worker

# Prefect version must match Dockerfile.server and pyproject.toml
FROM prefecthq/prefect:2.16-python3.10 as base

# Connect to Prefect
ARG PREFECT_API_URL
ENV PREFECT_API_URL=${PREFECT_API_URL}

# Allow to config pool_name
ARG POOL_NAME=subprocess-pool
ENV POOL_NAME=${POOL_NAME}

ENTRYPOINT prefect worker start --pool ${POOL_NAME}

Make sure that the Prefect versions match in both Dockerfiles.

Similarly to before, we can build the image with:

docker build --build-arg PREFECT_API_URL=XXX -f Dockerfile.worker -t northius/prefect/worker .

Replace XXX with the Prefect API URL. It should look like https://{server_url}/api.

As before, you will need to run this docker container somewhere. In my case, I’ll also run it as another ECS service using Fargate.
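
As with the server, you can test the worker locally with something like:

docker run -d northius/prefect/worker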

2. Deployments

As the official documentation states in Prefect Deployments:

Prefect Deployments are server-side representations of flows. They store the crucial metadata needed for remote orchestration including when, where, and how a workflow should run. Deployments elevate workflows from functions that you must call manually to API-managed entities that can be triggered remotely.

In my case, I’ll use them to define:

  • The different flows I want to run
  • From where they will get the code (a GitHub repo)
  • When the flow should be run (schedule)

2.1. Getting the code

The first thing you need to do is create a Prefect Block (more info at Prefect Essentials | Using prefect blocks) to store the GitHub credentials if the repo is not public.

Once done, we can define how to get the code and the dependencies with:

/prefect.yaml

pull:
- prefect.deployments.steps.git_clone:
    id: clone-step
    repository: https://github.com/github_user/repo.git
    branch: main
    credentials: "{{ prefect.blocks.github.github }}"
- prefect.deployments.steps.pip_install_requirements:
    requirements_file: requirements.txt
    directory: "{{ clone-step.directory }}"
    stream_output: False

This will only work if there is a GitHub block called github with a token that allows reading the repo. This is only needed if the repo is not public.

2.2. Creating the first Deployment

Now that we’ve defined how to fetch the code, we will create the first Deployment. Add the following code to the prefect.yaml you just created.

/prefect.yaml

deployments:
- name: lambdas.athena_history
  description: Export Athena history with a lambda
  entrypoint: src/lambdas/lambdas.py:athena_history
  version: 0.1.0
  work_pool:
    name: subprocess-pool
  tags: ["type:prefect", "group:lambda", "job:athena_history"]

Here is what we are setting:

variable | description
--- | ---
name | The name you want for the deployment
description | Description of what the deployment does
entrypoint | Where the flow is located, as path/to/filename.py:flow_name
version | The version of the flow
work_pool | Which work pool should run the deployment; it references the one we created before
tags | Any tags you want to add to all flow runs that come from this deployment

Now you can deploy this to the prefect server with:

export PREFECT_API_URL=XXX
prefect deploy --all

Replace XXX with the Prefect API URL. It should look like https://{server_url}/api.

This will deploy all Deployments in prefect.yaml to the server.

Now you can head to the server and trigger a run of the Deployment from there.
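
You can also trigger a run from the CLI with prefect deployment run, passing flow_name/deployment_name (the flow name here assumes the one defined in section 5.3):

prefect deployment run 'prefect.lambdas.athena_history/lambdas.athena_history' --param env=pro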

2.3. Advanced Usage of Deployments

What we’ve created works fine, but once you start adding multiple Deployments, you will probably encounter a lot of repetition. To avoid this, you can define some defaults that will be later used in each Deployment with:

/prefect.yaml

definitions:
  work_pools:
    subprocess-pool: &subprocess-pool
      name: subprocess-pool

  version: &version
    version: 0.1.0

  # Define defaults (that can be overwritten)
  defaults: &defaults
    <<: *version
    work_pool: *subprocess-pool
    tags: ["type:prefect"]
    schedule: {}

You can read more about YAML alias nodes.

With that, you can reuse the defaults like so:

/prefect.yaml

- name: lambdas.athena_history
  <<: *defaults
  description: Export Athena history with a lambda
  entrypoint: src/lambdas/lambdas.py:athena_history
  tags: ["type:prefect", "group:lambda", "job:athena_history"]

- name: lambdas.emr_history
  <<: *defaults
  description: Export EMR history with a lambda
  entrypoint: src/lambdas/lambdas.py:emr_history
  tags: ["type:prefect", "group:lambda", "job:emr_history"]
  schedule:
    cron: "0 3 * * *"

Notice that you can overwrite the defaults. For example, here we are overwriting tags (and, in the second Deployment, the schedule).

2.4. Deploying deployments

We’ve seen how to manually deploy the Deployments, but it’s better to do it with Continuous Deployment (CD).

2.4.1. Deploying with Continuous Deployment (CD)

Here, you will only need a GitHub Action (or any CD equivalent) that:

  1. Detects changes to prefect.yaml on commits to the main branch
  2. Deploys all flows to Prefect

The only change you will need to make is to add --no-prompt so that it doesn’t expect user confirmations:

prefect --no-prompt deploy --all

You can redeploy an already existing Deployment without losing any data. It will simply update whatever has changed.
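
As an illustration, a minimal GitHub Actions workflow could look like this (file name, Python version, and secret name are assumptions):

/.github/workflows/deploy_prefect.yaml

name: Deploy Prefect flows
on:
  push:
    branches: [main]
    paths: [prefect.yaml]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.10"
      # Prefect version must match the Dockerfiles
      - run: pip install "prefect==2.16.*"
      - run: prefect --no-prompt deploy --all
        env:
          PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}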

2.4.2. Manual deploy

One really cool thing you can do is to manually deploy a Deployment while you are creating a new one. Here’s what I do: change the branch reference in prefect.yaml to the branch where I am developing:

/prefect.yaml

prefect.deployments.steps.git_clone:
    id: clone-step
    repository: https://github.com/github_user/repo.git
-    branch: main
+    branch: your_current_branch
    credentials: "{{ prefect.blocks.github.github }}"

And then you can deploy the Deployment you are working with:

prefect --no-prompt deploy --name lambdas.athena_history

It is important to specify only the Deployment you are developing so that you don’t interfere with the other Deployments you might have running in production.

Once you finish your work and commit to main, the CD will overwrite the branch reference so that it points to main again. Notice that any Deployment run will fetch the code from the branch you set. This implies that if you are only changing your code, you do not need to redeploy at each commit. You only need to deploy if you change:

  • The reference branch
  • The flow configuration (name, parameters, tags etc.)

3. Connect to Prefect Server

Now that we have a working server, we want to connect everything that runs flows to it so that we can track all runs.

To connect to the server, we only need the following environment variable:

PREFECT_API_URL=XXX

Remember, you need to replace XXX with the Prefect API URL. It should look like https://{server_url}/api.

3.1. Connect Docker

For anything that runs in a Docker container (such as DBT in my case), you can follow the same approach we did in the Prefect worker. We need to set an argument for the API URL and then store it as an environment variable with:

ARG PREFECT_API_URL
ENV PREFECT_API_URL=${PREFECT_API_URL}

And then when we build it, we need to add the following argument:

--build-arg PREFECT_API_URL=XXX

3.2. Connect EMR

In EMR, we need to set the environment variable at the driver. We can do that by adding the following configuration:

--conf spark.emr-serverless.driverEnv.PREFECT_API_URL=XXX

This is for EMR Serverless, but it is very similar for non-serverless EMR.

4. Handling Multiple Environments

You probably have multiple environments such as pre, pro, and maybe also sandbox.

With the Prefect Server, you have two options:

  1. Create a server for each environment.
  2. Have a central server that can trigger runs in any environment.

With a dedicated server per environment you get better isolation, but it is more work to maintain and costs more. In my case, I wanted simplicity and lower costs, so I went with a central server. In that case, the only important bit is that you need to pass the environment to each flow as a parameter, and you need some code to assume a different role per environment, as sketched below.
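
For reference, here is a minimal sketch of what src/common/session.py (imported in section 5.2) could look like; the role ARNs and account IDs are placeholder assumptions:

/src/common/session.py

import boto3

# Map each environment to the role the central server should assume
# (placeholder ARNs, replace with your own)
ROLE_ARNS = {
    "pro": "arn:aws:iam::111111111111:role/prefect-runner",
    "snd": "arn:aws:iam::222222222222:role/prefect-runner",
}


def get_session(env):
    """Return a boto3 Session with credentials for the given environment."""
    response = boto3.client("sts").assume_role(
        RoleArn=ROLE_ARNS[env],
        RoleSessionName=f"prefect-{env}",
    )
    credentials = response["Credentials"]
    return boto3.Session(
        aws_access_key_id=credentials["AccessKeyId"],
        aws_secret_access_key=credentials["SecretAccessKey"],
        aws_session_token=credentials["SessionToken"],
    )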

5. Useful Code Snippets

Here you have some snippets that can help you get started.

5.1. Updating Tags Dynamically

One really cool thing you can do with Prefect is to dynamically edit the tags of each flow at runtime. In my case, it’s very useful for tagging the environment where the flow is running (such as env:pro or env:pre).

You can do it with:

/src/common/prefect_utils.py

import asyncio

from prefect import get_run_logger
from prefect.client import get_client
from prefect.context import get_run_context


def update_tags(tags):
    logger = get_run_logger()
    if not tags:
        logger.warning("No tags passed to 'update_tags', nothing to do")
        return True

    logger.info(f"Adding {tags=} to current flow_run")

    # Read the current flow run and merge its existing tags
    flow_run = get_run_context().flow_run
    tags = list(tags) + list(flow_run.tags)

    client = get_client()
    asyncio.run(client.update_flow_run(flow_run.id, tags=set(tags)))
    return True

This example only works when called from a flow. If you want to run it from a task, you will need to fetch the task instead.
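
For example, from a flow (hypothetical flow shown):

from prefect import flow

from src.common.prefect_utils import update_tags


@flow(name="prefect.example")
def example_flow(env: str):
    # Tag the run with the environment it is executing in
    update_tags([f"env:{env}"])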

5.2. Running a Lambda

Here you have the simplest example I have for running external jobs from Prefect. This calls any lambda you might want:

/src/lambdas/base.py

from prefect import get_run_logger
from prefect import task

from src.common.session import get_session


@task(name="prefect.lambdas.run")
def _run_lambda(client, name):
    logger = get_run_logger()

    response = client.invoke(
        FunctionName=name,
        InvocationType="RequestResponse",
        LogType="None",
    )

    if (status_code := response["StatusCode"]) == 200:
        logger.info(f"Lambda {name=} completed successfully")
        return True

    logger.error(f"Lambda {name=} failed with {status_code=}")


def run_lambda(env, name):
    logger = get_run_logger()
    logger.info(f"Running lambda {name=} in {env=}")

    client = get_session(env).client("lambda")
    _run_lambda(client, name)

5.3. Defining a Flow for Each Possible Lambda

With the base.py code for running lambdas, you can define a flow for each lambda you have. As an example, we could have:

/src/lambdas/lambdas.py

from typing import Literal

from prefect import flow

from src.lambdas.base import run_lambda


@flow(name="prefect.lambdas.athena_history")
def athena_history(env: Literal["snd", "pro"]):
    run_lambda(env, "nt-lambda-function-athena-history")


@flow(name="prefect.lambdas.emr_history")
def emr_history(env: Literal["snd", "pro"]):
    run_lambda(env, "nt-lambda-function-emr-history")

Remember to declare those flows in the prefect.yaml.