Moss Ebeling
01/20/2022, 6:33 AM

from prefect import task

from common_etl.data_sources import fetch_sales
from common_etl.util import enrich_with_user_details

# in flows/agg_sales.py
@task
def aggregate_sales():
    sales = fetch_sales()
    enriched_sales = enrich_with_user_details(sales)
    return enriched_sales

# in flows/publish_sales_report.py
@task
def publish_to_wiki():
    sales = fetch_sales()
    sales.to_wiki()
We are using Docker storage when deploying flows: we copy the relevant package into the Docker image and pip install it using the final commands option. Doing this means we're able to deploy and run with a LocalExecutor that has the correct version of our package in its execution environment.
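Roughly, the storage config looks like this (a minimal sketch, assuming Prefect 1.x Docker storage and that the "final commands" option is extra_dockerfile_commands; the registry URL, image name, and paths are placeholders):

from prefect.storage import Docker

storage = Docker(
    registry_url="registry.example.com/etl",   # placeholder registry
    image_name="agg-sales",                    # placeholder image name
    # copy the shared package source into the image...
    files={"/repo/common_etl": "/opt/common_etl"},
    # ...and pip install it as one of the final Dockerfile commands
    extra_dockerfile_commands=["RUN pip install /opt/common_etl"],
)

# the flow is then deployed with: flow.storage = storage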
However, since our jobs are large and can exhaust the resources of a single host, we're trying to use the DaskExecutor instead, pointed at a multi-host cluster. At this point we see failures because the Dask workers in that cluster do not have the package installed in their execution environments. Installing it there would mean stopping the Dask workers, installing the package into their environments, and restarting them. It would also seem to restrict all deployed flows to using the exact same release of the common package (since the Dask workers can only have one version installed?).
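The executor switch we're attempting, as a sketch (assuming Prefect 1.x APIs; the scheduler address is a placeholder for the multi-host cluster):

from prefect import Flow
from prefect.executors import DaskExecutor, LocalExecutor

from flows.agg_sales import aggregate_sales  # the task shown above (module path assumed)

with Flow("agg_sales") as flow:
    aggregate_sales()

# works today: the whole flow runs inside its Docker image,
# where common_etl is already installed
flow.executor = LocalExecutor()

# what we're moving to: tasks get shipped to an existing multi-host cluster,
# and fail because the workers' environments don't have common_etl
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")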
Has anyone had a similar experience dealing with Docker storage and Dask executors, or suggestions on this design?

Kevin Kho
01/20/2022, 6:57 AM

I don't think many people combine Docker storage with an external DaskExecutor cluster, so this problem is less common. I have an idea, but just note you are right: it'll be easy to hit version conflicts between different jobs.
You can use a worker plugin to pip install libraries. You can access the Dask client like the code snippet here. The only difficulty is installing those custom modules. In this case, maybe you try a pip install directly from a git repo? Note though that you can also use the UploadFile plugin, and maybe you can install a wheel like that?
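Something like this, as a rough sketch rather than tested code (the scheduler address, git URL, and wheel path are placeholders):

from distributed import Client
from distributed.diagnostics.plugin import PipInstall, UploadFile

client = Client("tcp://dask-scheduler:8786")  # placeholder scheduler address

# Option 1: have every worker pip install the package straight from a git repo,
# which also lets each flow pin the release it needs
client.register_worker_plugin(
    PipInstall(packages=["git+https://git.example.com/common_etl.git@v1.2.3"])
)

# Option 2: ship a built wheel to each worker's local directory; UploadFile only
# copies the file, so installing it would still need a follow-up step (for example
# a custom WorkerPlugin that runs pip on the uploaded path)
client.register_worker_plugin(UploadFile("dist/common_etl-1.2.3-py3-none-any.whl"))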
Moss Ebeling
01/20/2022, 8:25 AM

Kevin Kho
01/20/2022, 2:36 PM

Moss Ebeling
01/20/2022, 10:33 PM