Thread
#prefect-community
    s

    Sanjay Patel

    2 years ago
    Hi! can I please get some guidance as to how to start diagnosing why my flow won't register() on prefect core server. When I call flow.run() it successfully executes. I then call flow.register() on the same machine and it doesn't have any issues registering it. I then open up Prefect Core Server UI, start a local agent and try to run the registered flow and I'm getting an error below. 'wf' is the name of one our folders in the structure, using relative imports.:
    'Last State Message
    [10:44am]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'wf'",)
    I should note that i did have this working a couple of weeks ago, but with some recent code changes / environment changes / dependencies perhaps, I can no longer reproduce a successful run after executing. I can successfully get a simple example flow to run on Prefect Core Server so my problem here is specific to our code base I'm sure. But I'm not sure how to start diagnosing why it runs when I test it flow.run() and then has a problem with prefect core server with everything still running on my local host. I've also tried calling flow.serialize() and it successfully prints out a big dictionary but don't know exactly what I'm looking for. An alternate option is to start trying to containerize with Docker - which I'll likely need to do to move off my host machine anyway (as directed by prefect developers). Therefore my second question would be, will manually creating this dockerized container approach still work locally as well? Still in initial functionality and testing phase so just trying to get some examples working with our code base. Thanks so much
    nicholas

    nicholas

    2 years ago
    Hi @Sanjay Patel! Sorry you're running into this issue; it appears there's a module being imported by your flow that the Agent isn't picking up. The first potential cause that comes to mind is perhaps the Agent was started in a different environment from the one you're calling
    flow.run()
    , which would explain why you're able to successfully run locally but can't when you register it with the server. Your Local Agent needs to have access to the same environment as the one you're calling
    flow.run()
    from. Running the flow locally will still work when you use Docker storage! For iterative development, you can always comment out the docker storage and only add it back when you're ready to re-register the flow with your server. You also don't need to manually build a custom image for your flow, but can instead let Prefect know how you want it built and where to push it, as well as what python dependencies to include like such:
    flow.storage = Docker(
        base_image="python:3.8",
        python_dependencies=["wf"],
        registry_url="//your registry url",
        image_name="//image name",
        image_tag="//image tag",
    )
    There's a lot of really good info in the execution and environments documentation, which goes more in-depth than what I posted; for more information on Docker storage in particular, we've got a section in storage options 😄
    s

    Sanjay Patel

    2 years ago
    thanks, I am running the agent from the same virtual environment that I was calling
    flow.run()
    . Wondering if I just systematically need to go through all the modules and their dependencies to work out which one is causing the problem? Thanks for pointing me in the direction of the documentation that will help with docker storage
    nicholas

    nicholas

    2 years ago
    Yeah that's very odd! It seems like you've got everything else set up nicely, maybe a good sanity check would be to
    pip install
    each of your dependencies in that virtual environment (or
    pip install .
    if you've got a dependencies file) before restarting the agent
    s

    Sanjay Patel

    2 years ago
    just to be clear, 'wf' is our defined module and is not a python package. Does that change anything?
    nicholas

    nicholas

    2 years ago
    Ah that's a good point, let me check to see that I'm not missing anything for local imports that aren't pip installable
    @Sanjay Patel sorry for the slowness on my part: this seems to be a
    PATH
    issue related to the agent. Since Python prioritizes the current working directory for imports, this can lead to difference between a local run and a run picked up by the agent. The best way to prevent this is to make sure the imports are available to the agent, which you can do by providing the agent with a working path.You can accomplish this by using the 
    -p
     flag to add paths to the directory in which imports would be available when you start the agent:
    prefect agent start -p /full/path/to/where/flow/was/working
    s

    Sanjay Patel

    2 years ago
    unfortunately that doesn't seem to be working either. I think the path issue is definitely something that is causing an issue. But it's reporting back module 'wf' cannot be found when 'wf' is not even a module - its a folder /package. Maybe the relative imports that we have in these files is not being picked up correctly. I tried so many combinations of -p but didn't know whether it was even being picked up correctly as nothing changed in the reporting. I didn't know if it was absolute, relative to the virtual environment, where the flow is registered or where I'm calling the function to register the flow - i think i tried every combination. -f was working as it showed the log lines. Sorry, but any other suggestions?
    nicholas

    nicholas

    2 years ago
    No worries at all! Are you able to share any portion of the code? Perhaps a minimum reproducible example would let us debug a bit better
    s

    Sanjay Patel

    2 years ago
    Simple example that reproduces my issue
    Thanks @nicholas. I've created a single file without any internal dependencies (just needs SQLAlchemy). Producing the minimum reproducible code was actually very helpful as I think I narrowed it down to Prefect not being able to pickle weakref objects. The
    'not being able to find a module'
    error I was previously getting was a cryptic message. Now I've narrowed the problem to
    "TypeError: can't pickle weakref objects"
    and if the one function is moved to another module, I get the module not found again. I've uploaded the code but I have a feeling this is something I'm going to have to find an alternate solution which may require a restructure of the Prefect task decorators and flow specifications so DB sessions are not needed within. If you have any further guidance on the matter, would be appreciated (or even if you are able to reproduce). Thanks
    nicholas

    nicholas

    2 years ago
    This is really helpful @Sanjay Patel, thank you - let me tag @Chris White because he might have some better insights into this than I do; I know being able to maintain/share db connections among tasks is a roadmap item but I don't think this is quite that issue.
    Chris White

    Chris White

    2 years ago
    Hmmm I’m very surprised that using the
    -p
    option on the agent didn’t work. For example, if you run
    import my_file
    in the same directory as a file called
    my_file.py
    , Python will treat that import as a standard module import (even though it’s not a “real” module), and then using
    -p /path/to/directory/containing_my_file/
    should allow
    import my_file
    to work for flow runs submitted through that agent
    I also don’t immediately see any weak references being created in the script you provided, and I was able to serialize all of your functions 🧐
    s

    Sanjay Patel

    2 years ago
    Hi @Chris White, thanks for helping out. This may not be what you mean but I can also call
    flow.serialize()
    (in the same location as where the flow is being built) and it successfully outputs a dictionary. I tried buliding a clean virtual environment with just this one .py file and the piplock file attached. It successfully runs with
    flow.run()
    and
    flow.register()
    doesn't give any issues. When I then run it from the UI starting the server and the agent from the same virtual environment, I get the log print out from the agent as shown below
    [2020-04-30 14:51:14,573] INFO - agent | Starting LocalAgent with labels ['DESKTOP-IEAE9KE', 'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage']
    [2020-04-30 14:51:14,573] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
    [2020-04-30 14:51:14,573] INFO - agent | Agent connecting to the Prefect API at <http://localhost:4200>
    [2020-04-30 14:51:14,673] INFO - agent | Waiting for flow runs...
    [2020-04-30 14:51:14,967] INFO - agent | Found 1 flow run(s) to submit for execution.
    [2020-04-30 14:51:15,016] INFO - agent | Deploying flow run 66eb81ac-fe2a-4f3b-9ad5-d5ed005024b2
    [2020-04-30 14:51:16] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'testing_prefect_with_wf'
    [2020-04-30 14:51:16] INFO - prefect.CloudFlowRunner | Starting flow run.
    [2020-04-30 14:51:16] DEBUG - prefect.CloudFlowRunner | Flow 'testing_prefect_with_wf': Handling state change from Scheduled to Running
    [2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'temp_function': Starting task run...
    [2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'temp_function': Handling state change from Pending to Running
    [2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'temp_function': Calling task.run() method...
    [2020-04-30 14:51:16] DEBUG - prefect.LocalResultHandler | Starting to upload result to C:\Users\SanjayPatel\.prefect\results\prefect-result-2020-04-30t14-51-16-353680-00-00...
    [2020-04-30 14:51:16] ERROR - prefect.CloudTaskRunner | Unexpected error: TypeError("can't pickle weakref objects")
    Traceback (most recent call last):
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\task_runner.py", line 932, in get_task_run_state
    state._result.store_safe_value()
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\result\base.py", line 126, in store_safe_value
    value = self.result_handler.write(self.value)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\engine\result_handlers\local_result_handler.py", line 81, in write
    f.write(cloudpickle.dumps(result))
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\cloudpickle\cloudpickle.py", line 1148, in dumps
    cp.dump(obj)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\cloudpickle\cloudpickle.py", line 491, in dump
    return Pickler.dump(self, obj)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 437, in dump
    self.save(obj)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 662, in save_reduce
    save(state)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 885, in _batch_setitems
    save(v)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 662, in save_reduce
    save(state)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 885, in _batch_setitems
    save(v)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\pickle.py", line 524, in save
    rv = reduce(self.proto)
    TypeError: can't pickle weakref objects
    [2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'temp_function': Handling state change from Running to Failed
    [2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'temp_function': finished task run for task with final state: 'Failed'
    [2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'test': Starting task run...
    [2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'test': TRIGGERFAIL signal raised during execution.
    [2020-04-30 14:51:16] DEBUG - prefect.CloudTaskRunner | Task 'test': Handling state change from Pending to TriggerFailed
    [2020-04-30 14:51:16] INFO - prefect.CloudTaskRunner | Task 'test': finished task run for task with final state: 'TriggerFailed'
    [2020-04-30 14:51:16] INFO - prefect.CloudFlowRunner | Flow run FAILED: some reference tasks failed.
    [2020-04-30 14:51:16] DEBUG - prefect.CloudFlowRunner | Flow 'testing_prefect_with_wf': Handling state change from Running to Failed
    Chris White

    Chris White

    2 years ago
    Oh thank you, this is super helpful - I understand the issue now! All data that is returned by Prefect Tasks needs to be cloudpickle-able. This is to ensure that you can recover the data in the event of a flow failure; database connections are not serializable (they cannot be shared across processes)
    s

    Sanjay Patel

    2 years ago
    ok thanks, so i connect to a database within the flow but just can't pass the db connections around through tasks?
    Chris White

    Chris White

    2 years ago
    If you set
    @task(checkpoint=False)
    on that
    temp_function
    you should be OK for this particular flow, but just be warned that if you switch to a Dask executor or need to recover from failure you’ll need to refactor your flow
    s

    Sanjay Patel

    2 years ago
    @Chris White and @nicholas. Thanks for all the help so far. I feel I'm getting closer to be set up with the example. I feel all this effort to get it going with core server will be useful for scaling in the near future which is why I'm being a little persistent sorry. Now that I've cleared up the DB connection issue I'm getting the actual 'module not found error' and would like a little more guidance on the
    -p
    . I feel this isnt' doing anything and don't know whether it's because I need quotations, relative path (to what?), absolute path or that it's to do with the virtual environment. When I navigate to the virtual environment directory containing the module that creates the flow and start the agent, I can then run the flow through the UI after the flow is registered. I believe the
    -p
    will address this issue but as above, I can't get it working on this simple example (when it runs with
    flow.run()
    ). I've attached a screenshot of the command I am using to start the agent and simple .py files and structure that I can't get to work. If you have any guidance again would be appreciated. Thanks again
    Chris White

    Chris White

    2 years ago
    Hi @Sanjay Patel! No worries about being persistent, we’re here to help you get things up and running. Before I dive into a full explanation, could you try running your agent as follows:
    prefect agent start -f -p C:\path\subfolder1\ -p C:\path\subfolder2
    (note there are two
    -p
    options provided) If this works, I can provide an explanation and some recommendations
    s

    Sanjay Patel

    2 years ago
    @Chris White Unfortunately that still didn't work. Does it make a difference that I'm in the virtual environment?
    (prefect_clean_test-V9oAIINt) C:\Users\SanjayPatel\Documents\GitRepositories\prefect_clean_test>prefect agent start -f -p C:\Users\SanjayPatel\Documents\GitRepositories\prefect_clean_test\subfolder1\ -p C:\Users\SanjayPatel\Documents\GitRepositories\prefect_clean_test\subfolder1\subfolder2
    ____            __           _        _                    _
    |  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
    | |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _
    |/ _ \ '_ | __|`
    |  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
    |_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
    |___/
    [2020-05-01 01:41:04,195] INFO - agent | Starting LocalAgent with labels ['DESKTOP-IEAE9KE', 'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage']
    [2020-05-01 01:41:04,195] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
    [2020-05-01 01:41:04,195] INFO - agent | Agent connecting to the Prefect API at <http://localhost:4200>
    [2020-05-01 01:41:04,291] INFO - agent | Waiting for flow runs...
    [2020-05-01 01:41:40,267] INFO - agent | Found 1 flow run(s) to submit for execution.
    [2020-05-01 01:41:40,297] INFO - agent | Deploying flow run ef839f5d-a3da-4b37-9742-1c485c88895e
    No module named 'subfolder2'
    Traceback (most recent call last):
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
    File "c:\users\sanjaypatel\appdata\local\programs\python\python37\Lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\scripts\prefect.exe\__main__.py", line 7, in <module>
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 829, in __call__
    return self.main(*args, **kwargs)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 782, in main
    rv = self.invoke(ctx)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\click\core.py", line 610, in invoke
    return callback(*args, **kwargs)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\cli\execute.py", line 78, in cloud_flow
    raise exc
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\cli\execute.py", line 65, in cloud_flow
    flow = storage.get_flow(storage.flows[flow_data.name])
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\environments\storage\local.py", line 71, in get_flow
    return prefect.core.flow.Flow.load(flow_location)
    File "c:\users\sanjaypatel\.virtualenvs\prefect_clean_test-v9oaiint\lib\site-packages\prefect\core\flow.py", line 1267, in load
    return cloudpickle.load(f)
    ModuleNotFoundError: No module named 'subfolder2'
    [2020-05-01 01:41:42,068] INFO - agent | Process PID 33548 returned non-zero exit code
    @Chris White and @nicholas - So think I have narrowed down what may be causing this issue and wondering if you had suggestions on how to get around. my folder structure is as such
    parent
     - subfolder1
       - subfolder3
         - subfolder4
            -script_to_create_flow.py
     - dependencies
       - dependencies.py
    my script for creating the prefect
    Flow
    is
    script_to_create_flow.py
    and I run this script in my windows terminal after navigating to subfolder4 from my virtual environment.
    script_to_create_flow.py
    references dependencies in
    dependencies.py
    through an import statement. The
    script_to_create_flow.py
    registers the flow in PrefectCloud successfully (it also executes without problems through
    flow.run()
    ). I then start an agent from another terminal also after navigating to subfolder4 from my virtual environment. I execute my flow in PrefectCloud and it returns an error saying
    module not found. can't find module dependencies
    My local environment requires my environment variables to have a PYTHONPATH = /path/to/parent for this execute so it knows the relative location to search for the dependency folder. I then tested this same setup but moved the
    script_to_create_flow.py
    to the parent directory and I was successfully able to register and execute the flow through PrefectCloud. I think it can only search down I think the problem arises because the agent (although running on my local) doesn't have access to this same PYTHONPATH environment variable. This is my guess on what is happening. I tried playing around with
    --env
    arguments when running
    prefect agent start
    as per the documentation here local agent but that still didn't seem to work. 2 questions please:1. I haven't successfully been able to verify that CLI arguments are getting successfully passed to the agent (i.e. -p above nor --env). Is there anyway to check logs on my machine to make sure? 2. Apart from rearranging my code so that the calling .py file is in the parent location, is there anything else I can be doing to get this working? Thank you
    nicholas

    nicholas

    2 years ago
    Hi @Sanjay Patel, thanks for the level of detail you've provided:1. You can pass
    --verbose
    when starting the agent, which should give you more detail on the output, though afaik it doesn't relay the passed PATH; I can see how this would be useful, I'll open a ticket for it on the Core side 2. I'm not as familiar with Python pathing in a Windows environment, but if you start the agent in the
    parent
    directory but leave your other directories as you intended, can your agent pick up the required modules?
    s

    Sanjay Patel

    2 years ago
    Thanks @nicholas . For #2 - No, I think I already tried every combination of registering the flow and starting the agent with different starting directories while in the virtual environment but nothing worked. For #1 - I was using -f while starting the agent which was printing out more information. It seems to give more information about the flow rather than scheduling which —verbose seems to be doing. is --env the correct argument for passing it an environment variable? what's the difference between --env and -p arguments that are passed