Moss Ebeling01/20/2022, 6:33 AM
We are using Docker storage when deploying flows, copying the relevant package into the Docker image and pip installing it using the final commands options. Doing this means that we're able to deploy and run using a `LocalDaskExecutor`
```
from common_etl.data_sources import fetch_sales
from common_etl.util import enrich_with_user_details

# in flows/agg_sales.py
@task
def aggregate_sales():
    sales = fetch_sales()
    enriched_sales = enrich_with_user_details(sales)

# in flows/publish_sales_report.py
@task
def publish_to_wiki():
    sales = fetch_sales()
    sales.to_wiki()
```
that has the correct version of our package in its execution environment. However, since our jobs are large and can exhaust the resources on a single host, we're trying to use the `DaskExecutor`
instead, pointed at a multi-host cluster. At this point we see failures because the Dask workers in that cluster do not have this package installed in their execution environments. Installing it there would mean stopping the Dask workers, installing the package in their environment, then restarting them. It would also seem to restrict all deployed flows to using the exact same release of the common package (since the Dask workers can only have one version installed?). Has anyone had a similar experience dealing with Docker storage and Dask executors, or suggestions on this design?
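[Editor's sketch of the Docker-storage setup described above, assuming Prefect 1.x `Docker` storage; the registry, image name, and paths are placeholders, not values from the thread. The shared package is copied into the image and pip installed at build time, so each flow's image pins its own `common_etl` release.]

```python
# Placeholder kwargs for prefect.storage.Docker (Prefect 1.x); every
# value here is an assumption for illustration, not from the thread.
docker_storage_kwargs = {
    "registry_url": "registry.example.com/etl",        # where images are pushed
    "image_name": "agg-sales-flow",                    # one image per flow
    # copy the shared package into the image...
    "files": {"/repo/common_etl": "/opt/common_etl"},
    # ...and install it as one of the final Dockerfile commands
    "extra_dockerfile_commands": ["RUN pip install /opt/common_etl"],
}
# storage = prefect.storage.Docker(**docker_storage_kwargs)
```

[Because the install happens at image build time, two flows can pin different releases; the conflict only appears once execution moves onto shared Dask workers.]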
Kevin Kho01/20/2022, 6:57 AM
So this problem is less common. I have an idea, but just note you are right: it'll be easy to hit version conflicts for different jobs. You can use a worker plugin to pip install libraries. You can access the Dask client like the code snippet here. The only difficulty is installing those custom modules; in this case, maybe you can try pip installing directly from a git repo? Note though you can also use the `UploadFile`
worker plugin, and maybe you can install a wheel like that?
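[One way to sketch the suggestion above, assuming Dask's `PipInstall` worker plugin and a pip install straight from a git tag; the git URL and scheduler address are placeholders, not values from the thread.]

```python
# Hedged sketch: have every Dask worker pip install a pinned release of
# the shared package via the PipInstall worker plugin, instead of
# stopping and restarting workers by hand.

def common_etl_requirement(release_tag):
    # pip requirement string pinning common_etl to one git tag
    # (the example.com URL is a placeholder)
    return f"git+https://example.com/org/common_etl.git@{release_tag}"

def install_on_workers(scheduler_address, release_tag):
    # Imported here so the sketch only needs dask when actually run
    # against a cluster.
    from dask.distributed import Client, PipInstall

    client = Client(scheduler_address)
    plugin = PipInstall(packages=[common_etl_requirement(release_tag)])
    # Runs pip install on every current worker (and on new workers
    # that later join the cluster).
    client.register_worker_plugin(plugin)
    return client

# Example, against a hypothetical cluster:
# install_on_workers("tcp://dask-scheduler:8786", "v1.2.0")
```

[The version-conflict caveat still applies: the workers end up with whichever release was registered last, so flows that need different `common_etl` versions on the same cluster will still conflict.]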
Moss Ebeling01/20/2022, 8:25 AM
Kevin Kho01/20/2022, 2:36 PM
Moss Ebeling01/20/2022, 10:33 PM