
dmo

10/16/2019, 1:49 PM
Hello! I'm looking into Prefect and have a hard time finding a good way to handle shared code. Let's say I have a utility function in my_utility.py:
def get_answer_to_everything():
    return 42
And then I define my flow in one_of_many_flows.py:
import my_utility
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor


@task
def run_imported_function():
    return my_utility.get_answer_to_everything()


with Flow("Imported function") as flow1:
    run_imported_function()

# IP is the address of a running Dask scheduler
flow1.run()  # 42
flow1.run(executor=DaskExecutor(address=IP))  # ModuleNotFoundError: No module named 'my_utility'
I know this is due to how serialization works in Python. However, Dask provides a way to upload files to workers through Executor.upload_file: https://stackoverflow.com/questions/39295200/can-i-use-functions-imported-from-py-files-in-dask-distributed I have found no way to do anything similar through the Prefect API. Am I looking in the wrong place, or is this use case out of scope?

Chris White

10/16/2019, 1:53 PM
hi @dmo - you can just use Dask directly to push that file to your cluster; we assume you manage your cluster “externally” from Prefect
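For example, a minimal sketch of doing that with dask.distributed directly (Client is the current name for what that Stack Overflow answer calls Executor; IP is the same scheduler-address placeholder as above):

from distributed import Client

# connect to the same Dask scheduler the executor points at
client = Client(IP)
# ships my_utility.py to every worker so it becomes importable there
client.upload_file("my_utility.py")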

dmo

10/16/2019, 2:00 PM
Thanks for the reply! Ah okay, so there is no way for me to control the cluster environment itself through the Flow. So I need to have some environment handling around the flows? I guess that is partly what Prefect Cloud is about?

Chris White

10/16/2019, 2:12 PM
Yea, that’s exactly right! In Prefect Cloud there are first-class “Execution Environments” with “on_start” and “on_exit” hooks that you could use to broadcast this file, for example
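Roughly along these lines (an untested sketch - it assumes the environment constructor accepts a plain callable for on_start, and reuses the IP placeholder from above; RemoteEnvironment is just one environment type):

from distributed import Client
from prefect.environments import RemoteEnvironment


def upload_helpers():
    # runs before the flow starts; pushes the shared module to the cluster
    Client(IP).upload_file("my_utility.py")


flow1.environment = RemoteEnvironment(on_start=upload_helpers)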

dmo

10/16/2019, 3:07 PM
Okay! thanks 🙂

Maikel Penz

02/02/2020, 11:35 PM
Hey @Chris White - is there an example on Prefect Cloud of how to broadcast a file using on_start? I am using the DaskKubernetesEnvironment and I get an error that my helper file cannot be imported when running it

Chris White

02/02/2020, 11:36 PM
Hi @Maikel Penz! We don’t have any examples documented with on_start, but if you are using DaskKubernetesEnvironment I’d instead recommend including the files that your Flow relies on within the Docker container storing your Flows. Caveat: make sure that you also add the location of your helper files to your Docker image’s PATH

Maikel Penz

02/02/2020, 11:39 PM
Right - I'm using the files setting on DockerStorage to send my helpers along with the serialized version of my flow, so they are there. I believe the location could be missing from the PATH then...
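For context, my storage setup looks roughly like this (registry and paths are placeholders; the env_vars keyword is my guess at how to put the helpers on the import path):

from prefect.environments.storage import Docker

flow.storage = Docker(
    registry_url="my-registry.example.com",  # placeholder
    # copies the helper into the image next to the serialized flow
    files={"/local/path/my_utility.py": "/opt/helpers/my_utility.py"},
    # assumption: exposing the helper directory via PYTHONPATH makes it importable
    env_vars={"PYTHONPATH": "/opt/helpers"},
)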

Chris White

02/02/2020, 11:39 PM
Yea, the easiest thing to do here is to write your own Dockerfile so you can fully control how the image is built
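Something along these lines, for example (a sketch - the base image tag and paths are illustrative):

# custom Dockerfile; tag and paths are illustrative
FROM prefecthq/prefect:0.9.0
COPY my_utility.py /opt/helpers/my_utility.py
ENV PYTHONPATH="/opt/helpers:${PYTHONPATH}"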

Maikel Penz

02/02/2020, 11:40 PM
ok I'll give it a go
this can be achieved by setting
PREFECT__CONTEXT__FLOW_FILE_PATH
right ?

Chris White

02/02/2020, 11:46 PM
Hmm, no - I don’t recommend you change that variable; to add a file to your PATH you can do either one of the following within your Dockerfile, depending on whether you want it on your system path or your Python path only:
COPY /my/file.sh /my/file/in/docker/file.sh
ENV PATH="/my/file/in/docker:${PATH}"
or
COPY /my/file.py /my/file/in/docker/file.py
ENV PYTHONPATH="/my/file/in/docker:${PYTHONPATH}"
(note that the problem here is not related to Prefect but rather to your runtime environment, which in this case is a Docker container)

Maikel Penz

02/02/2020, 11:46 PM
Yeah, that makes sense - thanks @Chris White!

Chris White

02/02/2020, 11:46 PM
yup for sure, anytime!