
Moss Ebeling

01/20/2022, 6:33 AM
Hi there, We've recently begun using Prefect Core and Prefect Server and have had positive initial results with writing flows, but we've hit some pain points when it comes to deploying flows out to Prefect. When writing our flows, we make quite heavy use of shared functions and tasks between different flows, for example helpers that fetch some common datasets or utilities for data munging. These are imported from a package that contains the common tasks and the flow definitions.
from prefect import task

from common_etl.data_sources import fetch_sales
from common_etl.util import enrich_with_user_details

# in flows/agg_sales.py
@task
def aggregate_sales():
    sales = fetch_sales()
    return enrich_with_user_details(sales)

# in flows/publish_sales_report.py
@task
def publish_to_wiki():
    sales = fetch_sales()
    sales.to_wiki()
We are using Docker storage when deploying flows; we copy the relevant package into the Docker image and pip install it using the final commands option. Doing this means we're able to deploy and run using a LocalExecutor that has the correct version of our package in its execution environment. However, since our jobs are large and can exhaust the resources on a single host, we're trying to use the DaskExecutor instead, pointed at a multi-host cluster. At this point we see failures because the Dask workers in that cluster do not have this package installed in their execution environments. Installing it in their environments would require stopping the Dask workers, installing the package, and then restarting them. It would also seem to restrict all deployed flows to the exact same release of the common package (since the Dask workers can only have one version installed?). Has anyone had similar experience dealing with Docker storage and Dask executors, or suggestions on this design?
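As a minimal sketch of the Docker storage setup described above, assuming Prefect 1.x's prefect.storage.Docker with its files and extra_dockerfile_commands options (the registry URL and paths are placeholders, and whether files accepts a whole directory depends on your Prefect version):

from prefect import Flow, task
from prefect.storage import Docker

@task
def aggregate_sales():
    ...  # uses the shared common_etl helpers, as in the snippet above

# Build an image that contains the common_etl package alongside the flow.
storage = Docker(
    registry_url="registry.example.com/etl",                       # placeholder registry
    files={"/repo/common_etl": "/opt/common_etl"},                  # copy the package into the image
    extra_dockerfile_commands=["RUN pip install /opt/common_etl"],  # install it at build time
)

with Flow("agg-sales", storage=storage) as flow:
    aggregate_sales()

With a LocalExecutor this image is the execution environment, which is why the package resolves there but not on external Dask workers.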

Kevin Kho

01/20/2022, 6:57 AM
Hi @Moss Ebeling, I think Prefect users normally use ephemeral clusters with their DaskExecutor, so this problem is less common. I have an idea, but just note you are right that it'll be easy to hit version conflicts for different jobs. You can use a worker plugin to pip install libraries; you can access the Dask client like the code snippet here. The only difficulty is installing those custom modules, so in this case maybe you try pip installing directly from a git repo? Note that you can also use the UploadFile plugin, so maybe you can install a wheel that way?
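A rough sketch of the worker-plugin idea, assuming the long-running cluster is reachable via a Dask Client and using distributed's built-in PipInstall and UploadFile plugins (the scheduler address, git URL, and wheel path are placeholders):

from distributed import Client
from distributed.diagnostics.plugin import PipInstall, UploadFile

client = Client("tcp://dask-scheduler:8786")  # placeholder address of the existing cluster

# Option 1: pip install the shared package on every worker, e.g. straight from a git repo.
client.register_worker_plugin(
    PipInstall(packages=["git+https://git.example.com/common_etl.git@v1.2.0"])
)

# Option 2: ship a pre-built wheel to each worker's local directory.
# UploadFile only copies the file; installing it is still a separate step.
client.register_worker_plugin(UploadFile("dist/common_etl-1.2.0-py3-none-any.whl"))

Because the plugin installs into the workers' shared environment, two flows pinning different releases of common_etl would still conflict on the same long-running workers, which is the version-conflict caveat above.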

Moss Ebeling

01/20/2022, 8:25 AM
Thanks Kevin, I'll have a look through some of those ideas. Re-reading the docs, this option to specify images for the workers also looks like it might help. When you say ephemeral clusters, are you still talking about spanning multiple machines?
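One way the option to specify an image for the workers can look, assuming Prefect 1.x's DaskExecutor spinning up a temporary dask_cloudprovider Fargate cluster (the image name and worker count are placeholders; the image could be the same one Docker storage builds, so it already contains common_etl):

from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "registry.example.com/etl/agg-sales:latest",  # placeholder image with common_etl baked in
        "n_workers": 5,
    },
)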

Kevin Kho

01/20/2022, 2:36 PM
Yes, think of something like a Kubernetes cluster that creates a couple of pods from an image for the duration of the flow run and then spins down.
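A sketch of that ephemeral pattern, assuming dask-kubernetes' classic KubeCluster and reusing the flow's own storage image so each flow run gets workers with its own release of common_etl (the image name and adapt bounds are placeholders):

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(image="registry.example.com/etl/agg-sales:latest")
    ),
    adapt_kwargs={"minimum": 2, "maximum": 10},  # worker pods exist only for the flow run
)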

Moss Ebeling

01/20/2022, 10:33 PM
Thanks Kevin, I'll have a look at that also 🙂