# prefect-community
s
Hi! Can I please get some guidance on how to start diagnosing why my registered flow won't run on Prefect Core Server? When I call flow.run() it executes successfully. I then call flow.register() on the same machine and it registers without any issues. I then open the Prefect Core Server UI, start a local agent, and try to run the registered flow, and I get the error below. 'wf' is the name of one of the folders in our structure, which uses relative imports:
Last State Message
[10:44am]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'wf'",)
I should note that I did have this working a couple of weeks ago, but after some recent code changes / environment changes / dependency changes, I can no longer reproduce a successful run. I can successfully get a simple example flow to run on Prefect Core Server, so I'm sure the problem is specific to our code base. But I'm not sure how to start diagnosing why the flow runs when I test it with flow.run() and then fails on Prefect Core Server, with everything still running on my local host. I've also tried calling flow.serialize(), which successfully prints out a big dictionary, but I don't know exactly what I'm looking for. An alternative is to start containerizing with Docker, which I'll likely need to do to move off my host machine anyway (as directed by Prefect developers). So my second question is: will manually creating this Dockerized container still work locally as well? I'm still in the initial functionality and testing phase, so I'm just trying to get some examples working with our code base. Thanks so much
n
Hi @Sanjay Patel! Sorry you're running into this issue; it appears there's a module being imported by your flow that the Agent isn't picking up. The first potential cause that comes to mind is that the Agent was started in a different environment from the one you're calling `flow.run()` in, which would explain why you're able to run successfully locally but can't once you register the flow with the server. Your Local Agent needs access to the same environment as the one you're calling `flow.run()` from. Running the flow locally will still work when you use Docker storage! For iterative development, you can always comment out the Docker storage and only add it back when you're ready to re-register the flow with your server. You also don't need to manually build a custom image for your flow; instead you can let Prefect know how you want it built and where to push it, as well as which Python dependencies to include, like so:
from prefect.environments.storage import Docker  # import path for this Prefect version

flow.storage = Docker(
    base_image="python:3.8",
    python_dependencies=["wf"],
    registry_url="//your registry url",
    image_name="//image name",
    image_tag="//image tag",
)
There's a lot of really good info in the execution and environments documentation, which goes more in-depth than what I posted; for more information on Docker storage in particular, we've got a section in storage options 😄
s
Thanks, I am running the agent from the same virtual environment that I was calling `flow.run()` in. I'm wondering if I just need to go systematically through all the modules and their dependencies to work out which one is causing the problem? Thanks for pointing me to the documentation that will help with Docker storage.
n
Yeah that's very odd! It seems like you've got everything else set up nicely; maybe a good sanity check would be to `pip install` each of your dependencies in that virtual environment (or `pip install .` if you've got a dependencies file) before restarting the agent.
s
just to be clear, 'wf' is our defined module and is not a python package. Does that change anything?
n
Ah that's a good point, let me check to see that I'm not missing anything for local imports that aren't pip installable
@Sanjay Patel sorry for the slowness on my part: this seems to be a `PATH` issue related to the agent. Since Python prioritizes the current working directory for imports, this can lead to a difference between a local run and a run picked up by the agent. The best way to prevent this is to make sure the imports are available to the agent, which you can do by providing the agent with a working path. You can accomplish this by using the `-p` flag when you start the agent to add the directories in which the imports are available:
prefect agent start -p /full/path/to/where/flow/was/working
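A minimal sketch of the import behaviour described above, using only the standard library. The package name `wf_demo` and the temporary directory are hypothetical stand-ins for the real `wf` folder; the only point is that an import succeeds when the directory containing the package is on `sys.path` and fails otherwise. It says nothing about how the agent itself applies `-p`.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def can_import(module_name):
    """Return True if `module_name` can be imported with the current sys.path."""
    importlib.invalidate_caches()        # notice files created after startup
    sys.modules.pop(module_name, None)   # force a fresh lookup
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


with tempfile.TemporaryDirectory() as project_dir:
    # Hypothetical local package standing in for the 'wf' folder.
    package_dir = Path(project_dir) / "wf_demo"
    package_dir.mkdir()
    (package_dir / "__init__.py").write_text("VALUE = 42\n")

    # The project directory is not on sys.path yet.
    found_before = can_import("wf_demo")

    # Adding the directory that CONTAINS the package makes it importable.
    sys.path.insert(0, project_dir)
    found_after = can_import("wf_demo")

    # Clean up so the demo leaves the interpreter as it found it.
    sys.path.remove(project_dir)
    sys.modules.pop("wf_demo", None)

print(found_before, found_after)
```

Note that the directory added to the path is the parent of the package folder, not the package folder itself.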
s
Unfortunately that doesn't seem to be working either. I think the path is definitely part of the problem. But it's reporting that module 'wf' cannot be found when 'wf' is not even a module, it's a folder/package. Maybe the relative imports that we have in these files are not being picked up correctly. I tried many combinations of -p but couldn't tell whether it was being picked up, as nothing changed in the reporting. I didn't know if the path should be absolute, relative to the virtual environment, relative to where the flow is registered, or relative to where I'm calling the function to register the flow; I think I tried every combination. -f was working, as it showed the log lines. Sorry, but any other suggestions?
n
No worries at all! Are you able to share any portion of the code? Perhaps a minimum reproducible example would let us debug a bit better
s
Simple example that reproduces my issue
Thanks @nicholas. I've created a single file without any internal dependencies (it just needs SQLAlchemy). Producing the minimum reproducible code was actually very helpful, as I think I've narrowed it down to Prefect not being able to pickle weakref objects. The 'not being able to find a module' error I was previously getting was a cryptic message. Now I've narrowed the problem to "TypeError: can't pickle weakref objects", and if the one function is moved to another module, I get the module-not-found error again. I've uploaded the code, but I have a feeling I'm going to have to find an alternative solution, which may require restructuring the Prefect task decorators and flow specifications so that DB sessions are not needed within them. If you have any further guidance on the matter, it would be appreciated (or even if you are able to reproduce it). Thanks
n
This is really helpful @Sanjay Patel, thank you - let me tag @Chris White because he might have some better insights into this than I do; I know being able to maintain/share db connections among tasks is a roadmap item but I don't think this is quite that issue.
c
Hmmm I’m very surprised that using the `-p` option on the agent didn’t work. For example, if you run `import my_file` in the same directory as a file called `my_file.py`, Python will treat that import as a standard module import (even though it’s not a “real” module), and then using `-p /path/to/directory/containing_my_file/` should allow `import my_file` to work for flow runs submitted through that agent.
I also don’t immediately see any weak references being created in the script you provided, and I was able to serialize all of your functions 🧐
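The `import my_file` example can be tried outside Prefect with a fresh interpreter started from different working directories. This is only a sketch: it uses the `PYTHONPATH` environment variable as a stand-in for an extra import path, on the assumption that a path passed with `-p` has a comparable effect on the process that runs the flow. The helper `child_can_import` and the temporary directories are hypothetical.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path


def child_can_import(module_name, extra_path=None, cwd=None):
    """Start a fresh interpreter and report whether it can import `module_name`."""
    env = dict(os.environ)
    # Start from a clean import configuration for the child process.
    env.pop("PYTHONPATH", None)
    env.pop("PYTHONSAFEPATH", None)
    if extra_path is not None:
        env["PYTHONPATH"] = str(extra_path)
    result = subprocess.run(
        [sys.executable, "-c", f"import {module_name}"],
        env=env,
        cwd=cwd,
        capture_output=True,
    )
    return result.returncode == 0


with tempfile.TemporaryDirectory() as code_dir, tempfile.TemporaryDirectory() as other_dir:
    (Path(code_dir) / "my_file.py").write_text("x = 1\n")

    # Working directory contains my_file.py: the import resolves.
    in_same_dir = child_can_import("my_file", cwd=code_dir)
    # Started somewhere else: the import fails.
    elsewhere = child_can_import("my_file", cwd=other_dir)
    # Started somewhere else, but with the code directory on the import path.
    elsewhere_with_path = child_can_import("my_file", extra_path=code_dir, cwd=other_dir)

print(in_same_dir, elsewhere, elsewhere_with_path)
```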
s
Hi @Chris White, thanks for helping out. This may not be what you mean, but I can also call `flow.serialize()` (in the same location as where the flow is being built) and it successfully outputs a dictionary. I tried building a clean virtual environment with just this one .py file and the attached piplock file. It runs successfully with `flow.run()`, and `flow.register()` doesn't give any issues. When I then run it from the UI, starting the server and the agent from the same virtual environment, I get the log printout from the agent shown below:
[2020-04-30 14:51:14,573] INFO - agent | Starting LocalAgent with labels ['DESKTOP-IEAE9KE', 'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage']
[2020-04-30 14:51:14,573] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
[2020-04-30 14:51:14,573] INFO - agent | Agent connecting to the Prefect API at <http://localhost:4200>
[2020-04-30 14:51:14,673] INFO - agent | Waiting for flow runs...
[2020-04-30 14:51:14,967] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-04-30 14:51:15,016] INFO - agent | Deploying flow run 66eb81ac-fe2a-4f3b-9ad5-d5ed005024b2
[2020-04-30 14:51:16] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'testing_prefect_with_wf'
[2020-04-30 14:51:16] INFO - prefect.CloudFlowRunner | Starting flow run.
[2020-04-30 14:51:16] DEBUG - prefect.CloudFlowRunner | Flow 'testing_prefect_with_wf': Handling state change from Scheduled to Running
[2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'temp_function': Starting task run...
[2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'temp_function': Handling state change from Pending to Running
[2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'temp_function': Calling task.run() method...
[2020-04-30 14:51:16] DEBUG - prefect.LocalResultHandler | Starting to upload result to C:\Users\SanjayPatel\.prefect\results\prefect-result-2020-04-30t14-51-16-353680-00-00...
[2020-04-30 14:51:16] ERROR - prefect.CloudTaskRunner | Unexpected error: TypeError("can't pickle weakref objects")
Traceback (most recent call last):
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\task_runner.py", line 932, in get_task_run_state
state._result.store_safe_value()
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\result\base.py", line 126, in store_safe_value
value = self.result_handler.write(self.value)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\result_handlers\local_result_handler.py", line 81, in write
f.write(cloudpickle.dumps(result))
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\cloudpickle\cloudpickle.py", line 1148, in dumps
cp.dump(obj)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\cloudpickle\cloudpickle.py", line 491, in dump
return Pickler.dump(self, obj)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 437, in dump
self.save(obj)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 662, in save_reduce
save(state)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 885, in _batch_setitems
save(v)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 662, in save_reduce
save(state)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 885, in _batch_setitems
save(v)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 524, in save
rv = reduce(self.proto)
TypeError: can't pickle weakref objects
[2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'temp_function': Handling state change from Running to Failed
[2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'temp_function': finished task run for task with final state: 'Failed'
[2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'test': Starting task run...
[2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'test': TRIGGERFAIL signal raised during execution.
[2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'test': Handling state change from Pending to TriggerFailed
[2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'test': finished task run for task with final state: 'TriggerFailed'
[2020-04-30 14:51:16] INFO - prefect.CloudFlowRunner | Flow run FAILED: some reference tasks failed.
[2020-04-30 14:51:16] DEBUG - prefect.CloudFlowRunner | Flow 'testing_prefect_with_wf': Handling state change from Running to Failed
c
Oh thank you, this is super helpful - I understand the issue now! All data that is returned by Prefect Tasks needs to be cloudpickle-able. This is to ensure that you can recover the data in the event of a flow failure; database connections are not serializable (they cannot be shared across processes)
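A small check along these lines can show which return values would survive serialization. This is a sketch using the standard library `pickle` module as a stand-in for cloudpickle (which builds on it), and an in-memory `sqlite3` connection as a stand-in for a SQLAlchemy session; `Record` and `is_picklable` are hypothetical names.

```python
import pickle
import sqlite3
import weakref


class Record:
    """Hypothetical object that something else keeps a weak reference to."""


def is_picklable(value):
    """Return True if `value` survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(value)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


record = Record()
connection = sqlite3.connect(":memory:")

plain_rows = is_picklable([(1, "a"), (2, "b")])               # plain data
weak_reference = is_picklable({"ref": weakref.ref(record)})   # holds a weakref
db_connection = is_picklable(connection)                      # live DB handle

connection.close()
print(plain_rows, weak_reference, db_connection)
```

Plain rows pass the check, while the weak reference and the live connection do not, which matches the `TypeError` in the traceback above.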
s
ok thanks, so i connect to a database within the flow but just can't pass the db connections around through tasks?
c
If you set `@task(checkpoint=False)` on that `temp_function` you should be OK for this particular flow, but just be warned that if you switch to a Dask executor or need to recover from failure you’ll need to refactor your flow.
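One possible shape for such a refactor, sketched with plain functions and `sqlite3` so it runs without Prefect or SQLAlchemy installed: each step opens its own connection, closes it, and returns only plain data. In a real flow these functions would presumably carry the `@task` decorator; the function names, table, and database path here are all hypothetical.

```python
import pickle
import sqlite3
import tempfile
from pathlib import Path


def create_table(db_path):
    """Open a connection, do the work, close it; return only the path."""
    connection = sqlite3.connect(db_path)
    try:
        connection.execute("CREATE TABLE items (id INTEGER, name TEXT)")
        connection.execute("INSERT INTO items VALUES (1, 'example')")
        connection.commit()
    finally:
        connection.close()
    return db_path


def read_items(db_path):
    """Open a new connection and return plain rows, not the connection."""
    connection = sqlite3.connect(db_path)
    try:
        return connection.execute("SELECT id, name FROM items").fetchall()
    finally:
        connection.close()


with tempfile.TemporaryDirectory() as tmp:
    db_path = str(Path(tmp) / "demo.db")
    rows = read_items(create_table(db_path))

# Everything passed between the steps (a string path, a list of tuples)
# can be serialized, unlike a live connection or session.
serialized = pickle.dumps(rows)
print(rows)
```

The trade-off is a new connection per step, in exchange for results that can be checkpointed or sent to another process.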
s
@Chris White and @nicholas, thanks for all the help so far. I feel I'm getting closer to being set up with the example. I feel all this effort to get it going with Core Server will be useful for scaling in the near future, which is why I'm being a little persistent, sorry. Now that I've cleared up the DB connection issue I'm getting the actual 'module not found' error and would like a little more guidance on `-p`. I feel this isn't doing anything, and I don't know whether that's because I need quotation marks, a relative path (to what?), an absolute path, or whether it has to do with the virtual environment. When I navigate to the virtual environment directory containing the module that creates the flow and start the agent there, I can run the flow through the UI after the flow is registered. I believe `-p` will address this issue but, as above, I can't get it working on this simple example (even though it runs with `flow.run()`). I've attached a screenshot of the command I am using to start the agent, along with the simple .py files and structure that I can't get to work. If you have any guidance, it would again be appreciated. Thanks again
c
Hi @Sanjay Patel! No worries about being persistent, we’re here to help you get things up and running. Before I dive into a full explanation, could you try running your agent as follows:
prefect agent start -f -p C:\path\subfolder1\ -p C:\path\subfolder2
(note there are two `-p` options provided). If this works, I can provide an explanation and some recommendations.
s
@Chris White Unfortunately that still didn't work. Does it make a difference that I'm in the virtual environment?
(prefect_clean_test-V9oAIINt) C:\Users\SanjayPatel\Documents\GitRepositories\prefect_clean_test>prefect agent start -f -p C:\Users\SanjayPatel\Documents\GitRepositories\prefect_clean_test\subfolder1\ -p C:\Users\SanjayPatel\Documents\GitRepositories\prefect_clean_test\subfolder1\subfolder2
 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/
[2020-05-01 01:41:04,195] INFO - agent | Starting LocalAgent with labels ['DESKTOP-IEAE9KE', 'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage']
[2020-05-01 01:41:04,195] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
[2020-05-01 01:41:04,195] INFO - agent | Agent connecting to the Prefect API at <http://localhost:4200>
[2020-05-01 01:41:04,291] INFO - agent | Waiting for flow runs...
[2020-05-01 01:41:40,267] INFO - agent | Found 1 flow run(s) to submit for execution.
[2020-05-01 01:41:40,297] INFO - agent | Deploying flow run ef839f5d-a3da-4b37-9742-1c485c88895e
No module named 'subfolder2'
Traceback (most recent call last):
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\scripts\prefect.exe\__main__.py", line 7, in <module>
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 782, in main
rv = self.invoke(ctx)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 610, in invoke
return callback(*args, **kwargs)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\cli\execute.py", line 78, in cloud_flow
raise exc
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\cli\execute.py", line 65, in cloud_flow
flow = storage.get_flow(storage.flows[flow_data.name])
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\environments\storage\local.py", line 71, in get_flow
return prefect.core.flow.Flow.load(flow_location)
File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\core\flow.py", line 1267, in load
return cloudpickle.load(f)
ModuleNotFoundError: No module named 'subfolder2'
[2020-05-01 01:41:42,068] INFO - agent | Process PID 33548 returned non-zero exit code
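The traceback ends in `cloudpickle.load(f)`, which suggests the error is raised while the stored flow is being unpickled. A sketch of why that step can need the original module: the standard `pickle` module stores a function from an importable module by reference (module name plus function name), so loading it requires that module to be importable again. cloudpickle is assumed here to behave the same way for functions that live in importable modules rather than `__main__`. The module `helper_demo` is a hypothetical stand-in for `subfolder2`.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as project_dir:
    # Hypothetical helper module, standing in for 'subfolder2'.
    (Path(project_dir) / "helper_demo.py").write_text(
        "def double(x):\n    return 2 * x\n"
    )

    # "Registration" side: the directory is importable, so pickling works.
    sys.path.insert(0, project_dir)
    importlib.invalidate_caches()
    helper_demo = importlib.import_module("helper_demo")
    payload = pickle.dumps(helper_demo.double)

    # The payload names the module instead of containing the function body.
    stores_module_name = b"helper_demo" in payload

    # "Agent" side: same bytes, but the module can no longer be found.
    sys.path.remove(project_dir)
    del sys.modules["helper_demo"]
    importlib.invalidate_caches()
    try:
        pickle.loads(payload)
        load_error = None
    except ModuleNotFoundError as exc:
        load_error = str(exc)

print(stores_module_name, load_error)
```

Registration succeeding therefore does not show that the loading process can find the same modules.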
@Chris White and @nicholas - So I think I have narrowed down what may be causing this issue, and I'm wondering if you have suggestions on how to get around it. My folder structure is as follows:
parent
 - subfolder1
   - subfolder3
     - subfolder4
       - script_to_create_flow.py
 - dependencies
   - dependencies.py
My script for creating the Prefect `Flow` is `script_to_create_flow.py`, and I run it in my Windows terminal after navigating to subfolder4 from my virtual environment. `script_to_create_flow.py` references dependencies in `dependencies.py` through an import statement. The script registers the flow in Prefect Cloud successfully (it also executes without problems through `flow.run()`). I then start an agent from another terminal, also after navigating to subfolder4 from my virtual environment. I execute my flow in Prefect Cloud and it returns an error saying `module not found. can't find module dependencies`.
My local environment requires a PYTHONPATH = /path/to/parent environment variable for this to execute, so that it knows where to search for the dependencies folder. I then tested this same setup but moved `script_to_create_flow.py` to the parent directory, and I was able to register and execute the flow through Prefect Cloud. I think it can only search downwards, and that the problem arises because the agent (although running on my local machine) doesn't have access to this same PYTHONPATH environment variable. This is my guess at what is happening. I tried playing around with `--env` arguments when running `prefect agent start`, as per the local agent documentation, but that still didn't seem to work. Two questions please:
1. I haven't been able to verify that CLI arguments are actually passed to the agent (i.e. neither -p above nor --env). Is there any way to check logs on my machine to make sure?
2. Apart from rearranging my code so that the calling .py file is in the parent location, is there anything else I can do to get this working?
Thank you
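One way to see what a given Python process can actually import is to print its working directory, `PYTHONPATH`, and `sys.path`. The sketch below is not a Prefect feature; `import_diagnostics` is a hypothetical helper. It could be run from the terminal used for `flow.run()`, and the same function could be called from code the agent executes, to compare the two environments.

```python
import importlib.util
import os
import sys


def import_diagnostics(module_name):
    """Collect the settings that decide whether `module_name` can be imported."""
    # find_spec returns None for a top-level module that cannot be located.
    spec = importlib.util.find_spec(module_name)
    return {
        "cwd": os.getcwd(),
        "PYTHONPATH": os.environ.get("PYTHONPATH"),
        "sys.path": list(sys.path),
        "module_found": spec is not None,
        "module_origin": getattr(spec, "origin", None),
    }


if __name__ == "__main__":
    # 'dependencies' is the top-level folder from the structure above.
    for key, value in import_diagnostics("dependencies").items():
        print(f"{key}: {value}")
```

If `module_found` is true in one process and false in the other, the difference in `cwd`, `PYTHONPATH`, or `sys.path` between the two outputs shows where they diverge.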
n
Hi @Sanjay Patel, thanks for the level of detail you've provided:
1. You can pass `--verbose` when starting the agent, which should give you more detail in the output, though afaik it doesn't relay the passed `PATH`; I can see how this would be useful, so I'll open a ticket for it on the Core side.
2. I'm not as familiar with Python pathing in a Windows environment, but if you start the agent in the `parent` directory and leave your other directories as you intended, can your agent pick up the required modules?
s
Thanks @nicholas. For #2: no, I think I already tried every combination of registering the flow and starting the agent with different starting directories while in the virtual environment, but nothing worked. For #1: I was using -f while starting the agent, which was printing out more information. It seems to give more information about the flow, rather than the scheduling, which --verbose seems to cover. Is --env the correct argument for passing it an environment variable? What's the difference between the --env and -p arguments?