<@ULVA73B9P> Can you point me to where the source ...
# ask-marvin
m
@Marvin Can you point me to where the source code for setting up the flow environment is? Things like querying for the flow object
m
If you're looking to explore the source code for setting up the flow environment in Prefect, including querying for flow objects, you can check the
flows.py
file in the Prefect GitHub repository. This file contains the implementation details for creating and managing flow objects. You can view it here: flows.py on GitHub. For setting up the environment and infrastructure for running flows, including work pools and workers, you might find the Prefect 3.x documentation useful. It provides guidance on creating work pools and deploying flows. If you have specific tasks in mind, like querying flow objects, you might want to explore the
flow
function in the
flows.py
file, which is responsible for creating a new flow object. If you have further questions or need more specific guidance, feel free to ask!
n
hi @Matthew Scanlon - marvin might struggle since I haven't indexed all the source code so much are you asking about retrieving the flow code at runtime?
m
No. I am having issues with my kubernetes based deployments. Long story short the only thing i see in the logs are
Copy code
Flow run 'ultramarine-muskox' - Changing working directory to '/opt/prefect'
19:37:46.468 | DEBUG   | Flow run 'ultramarine-muskox' - Importing flow code from 'flows.equities.sample.sample:sample'
19:37:46.471 | DEBUG   | prefect.runner - Stopping runner...
Traceback (most recent call last):

...

File "/usr/local/lib/python3.10/dist-packages/prefect/flows.py", line 1732, in load_flow_from_entrypoint
    |     flow = import_object(entrypoint)
    |   File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/importtools.py", line 216, in import_object
    |     module = load_module(module_name)
    |   File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/importtools.py", line 187, in load_module
    |     return importlib.import_module(module_name)
    |   File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
    |     return _bootstrap._gcd_import(name[level:], package, level)
    |   File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
    |   File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
    |   File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
    |   File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
    |   File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
    |   File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
    |   File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
    |   File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
    |   File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
    |   File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
    |   File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
    | ModuleNotFoundError: No module named 'flows.equities'
The traceback before this just takes me through a bunch of coroutines that do not seem to do anything. The curious part is that if i spin up a pod with this image, and run python, then execute the set_working_directory function as the pull step should (since we use LocalStorage) and then execute
Copy code
load_flow_from_entrypoint('flows.equities.sample.sample:sample')
it works. So i am confused why it is not working here and why it is cutting the entrypoint short
n
yeah sorry this is what i meant by retrieving flow code at runtime. we need the flow object to submit it to the engine so we call
load_flow_from_entrypoint
and it seems
importlib
cant find this object
flows.equities.sample.sample:sample
are you doing some
pip install
in a dockerfile or in a
pull
step at all? seems like your module is not importable when it fails
So i am confused why it is not working here and why it is cutting the entrypoint short
its bc it goes left to right, as soon as it cant resolve the module it fails
m
Yes. There is a requirements file that gets pip installed. Is that relevant here? What would that have to do with the entrypoint not being resolveable? Im still a little confused as to why
load_flow_from_entrypoint
would work from inside a debug pod but not when the pull step is being executed on a worker
n
pull step is being executed on a worker
this isnt the case though, we send a
command
like
python -m prefect.engine
to the pod and then it enters the engine, where the pull steps are run https://github.com/prefecthq/prefect/blob/main/src/prefect/flow_engine.py#L130-L143
m
its also not clear to me why it would not be able to resolve the module
Copy code
>>> import flows
>>> import flows.equities
>>> import flows.equities.sample
>>> import flows.equities.sample.sample
>>> from flows.equities.sample.sample import sample
>>>
these all work and this doesnt raise an error
Copy code
>>> from prefect.utilities.importtools import load_module
>>> load_module('flows')
<module 'flows' (<_frozen_importlib_external._NamespaceLoader object at 0x7f842a01d120>)>
>>> load_module('flows.equities')
<module 'flows.equities' (<_frozen_importlib_external._NamespaceLoader object at 0x7f842a01d300>)>
>>> load_module('flows.equities.sample')
<module 'flows.equities.sample' (<_frozen_importlib_external._NamespaceLoader object at 0x7f842a145e40>)>
>>> load_module('flows.equities.sample.sample')
<module 'flows.equities.sample.sample' from '/opt/prefect/flows/equities/sample/sample.py'>
>>>
n
sorry, its hard for me to say why its not importable without knowing how / if you're doing python packaging
m
We basically bake a base image with the prefect entrypoint and a python env, and then copy our flows into
opt/prefect
The part that is tripping me up is this: If i have an image, say flow_image:my_tag, then how come if i run a flow through the prefect dashboard using this flow_image:my_tag the entrypoint cant be imported but if i run
Copy code
kubectl run -it --rm debug-pod --image=flow_image:my_tag --namespace=<NS> -- /bin/bash
and then
Copy code
cd /opt/prefect # as done by the pull step
python
import load_flow_from_entrypoint
load_flow_from_entrypoint(flow, flow.equities, flow.equities.sample (etc)) # these all work
n
is this example helpful? all from repo root: • i install the worker helm chart
Copy code
» helm install prefect-worker prefect/prefect-worker -f examples/run_a_prefect_worker/on_k8s/values.yaml
• create the deployment, which is defined here and uses this dockerfile
Copy code
» uv run prefect --no-prompt deploy -n network-speed-on-k8s
Running deployment build steps...
 > Running build_docker_image step...  
....
Running deployment push steps...
 > Running push_docker_image step...
The push refers to repository [<http://docker.io/***/prefect-pack|docker.io/***/prefect-pack>]                                                                                                                                                                                      
....
• create the flow run
Copy code
» prefect deployment run 'monitor-network/network-speed-on-k8s' --watch
Creating flow run for deployment 'monitor-network/network-speed-on-k8s'...
Created flow run 'radical-finch'.
└── UUID: 8ab6adcd-74a0-4307-9250-92eb1377a5d5
└── Parameters: {}
└── Job Variables: {}
└── Scheduled start time: 2025-02-27 15:37:35 CST (now)
└── URL: <https://app.prefect.cloud/account/xxx/workspace/xxx/runs/flow-run/8ab6adcd-74a0-4307-9250-92eb1377a5d5>
Watching flow run 'radical-finch'...
15:37:36.583 | INFO    | prefect - Flow run is in state 'Scheduled'
15:37:41.672 | INFO    | prefect - Flow run is in state 'Running'
15:37:46.980 | INFO    | prefect - Flow run is in state 'Running'
15:37:52.259 | INFO    | prefect - Flow run is in state 'Running'
15:37:57.843 | INFO    | prefect - Flow run is in state 'Running'
15:38:03.509 | INFO    | prefect - Flow run is in state 'Running'
15:38:08.641 | INFO    | prefect - Flow run is in state 'Running'
15:38:13.737 | INFO    | prefect - Flow run is in state 'Completed'
Flow run finished successfully in 'Completed'.
again the details of how you get the deps installed depends on how you write your dockerfile, what you're using to manage python deps, but I like
uv
so I
uv sync
all my deps at image build time and then I just tell the pod to start with `uv run -m prefect.engine`
m
Can you explain the "tell your pod to run" part? This code has always worked until very recently when upgrading to prefect 3 when we had to change a lot, so i am a little confused as to why i need to execute a command to run the prefect engine?
Copy code
COPY prefect_entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/entrypoint.sh

COPY ${FLOW_PATH} /opt/prefect/${FLOW_PATH}

RUN pip install -r /opt/prefect/${FLOW_PATH}/requirements.txt
is pretty much the extent of what we do in the dockerfile
n
you don't need to do exactly that, the default
command
(which is a job variable you can override) is
python -m prefect.engine
which is appropriate is most cases im saying, use whatever
python
executable
uv
gives me bc i already set it up in the image
what is
prefect_entrypoint.sh
doing?
Copy code
COPY prefect_entrypoint.sh /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/entrypoint.sh
m
Copy code
#!/bin/bash
set -e
if [ ! -z "$EXTRA_PIP_PACKAGES" ]; then
  echo "+pip install $EXTRA_PIP_PACKAGES"
  python -m pip install $EXTRA_PIP_PACKAGES
fi
if [ -z "$*" ]; then
  echo "\
            _____  _____  ______ ______ ______ _____ _______
           |  __ \|  __ \|  ____|  ____|  ____/ ____|__   __|
           | |__) | |__) | |__  | |__  | |__ | |       | |
           |  ___/|  _  /|  __| |  __| |  __|| |       | |
           | |    | | \ \| |____| |    | |___| |____   | |
           |_|    |_|  \_\______|_|    |______\_____|  |_|

Thanks for using Prefect!!!

This is the official docker image for Prefect Core, intended for executing
Prefect Flows. For more information, please see the docs:
<https://docs.prefect.io/core/getting_started/installation.html#docker>
"
  exec bash --login
else
  exec "$@"
fi
n
I would remove that, unless you have a reason to keep it
m
No we can remove it. We do use EXTRA_PIP_PACKAGES but i can package this in the dockerfile
👍 1
n
ok let me know if you hit any specific trouble doing that
m
None at all. It was a quick turn around. But still getting the same issue. We were passing this as an entrypoint to TINI. Do you think that is overkill. By the way, your comment about comnmand got me thinking, so i used it to pass
PYTHONPATH=$PYTHONPATH:$(pwd) && python -m prefect.engine
and this seems to of resolved the issue. Any idea what this could point too. I am still a little confused as to why if i am setting my working dir to opt/prefect and the only thing under that is the flows module, why it would not be able to resolve the imports from inside the prefect engine script but can if i just exec into the container and run the fns
n
hmm, im happy to dig into an example if you can make an MRE?
still a little confused as to why if i am setting my working dir to opt/prefect and the only thing under that is the flows module, why it would not be able to resolve the imports from inside the prefect engine script but can if i just exec into the container and run the fns
just because there are a lot of potential ways that one could build a docker image so its hard to say definitively why a given image is behaving without seeing exactly how it is being built
m
yeah ill work on something over the weekend and get back to you. Thanks!
👍 1
And thank you for all the help!
catjam 1