Failed: "Failed to load and execute Flow's environ...
# prefect-ui
c
Failed: "Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'task\'")\nThis may be due to a missing Python module in your current environment
I followed this to debug: https://stackoverflow.com/questions/63881231/prefect-modulenotfounderror-when-running-from-ui. Occasionally it still gives me this error message. I want to confirm whether this is indeed the problem.
n
Hi @Chohang Ng - can you give a little more information on your flow, like what storage you're using, your agent and executor?
Feel free to post the flow code as well
c
with Flow("parent-flow", schedule=weekday_schedule,
          executor=LocalExecutor(),
          run_config=LocalRun()) as flow:

    ads_step_1 = oAdvertisementProducts_flow_b(upstream_tasks = [advertisement_fee_a()])
    ads_step_2 = aadvfeesbyorderitem_flow_c(upstream_tasks = [ads_step_1, aadvfeesbychannel_flow_e(),aDTC_ads_flow_f()])  
    ads_step_3 = AdvertisementFee_flow_d(upstream_tasks = [ads_step_2])
    

    upload_all_fee = all_fee_upload_flow(upstream_tasks = [invoice_adjust_flow(), channelfees_flow(),ShippingFees_flow(),ShippingMaterialFees_flow(),StorageFees_flow(),laborfee_flow(),cogs_flow(),customer_fee_flow(),commission_rebate_flow(),transfer_fee_flow(),ads_step_3,promo_flow()])
    
    upload_revenue = revenue_upload_flow(upstream_tasks = [upload_all_fee])

    abi_profitability = abi_profit_flow(upstream_tasks = [upload_revenue])
    
    delete_data = delete_fee_flow(upstream_tasks= [abi_profitability])
flow.register(project_name=project_name)
I run this in the background: nohup prefect agent start -p <PATH OF THE DIRECTORY WHERE THE FLOWS ARE STORED>
I am running the flows on cloud
n
Ok thanks @Chohang Ng - you mentioned that it occasionally gives the error but sometimes the same flow version succeeds as expected?
c
yea
task.py is where I have methods like this
class Create_tmp(Task):
    def run(self,script,datestr):
n
Got it - do you have more than 1 agent running that could pick up the flow? Also if you're importing that file, are you sure there are no import collisions with the `task` that Prefect exports?
c
I am pretty sure only one agent is running.
from task import Extract,Load,Create_tmp,Drop_tmp,Load_revenue_fee,Load_profit_dash,Delete_data
This is what I have in flow.py, where all the flows are registered.
Could you elaborate on what you meant by import collisions?
n
Hm ok that import seems fine (so no need to go into that one more)
In the Prefect UI on the dashboard, do you see just a single agent polling?
My guess is that there's a chance you `nohup`ed another agent without the import path that could be submitting the flow
c
n
Could you try stopping your nohuped agent and starting it with a special label applied?
c
with Flow("1_oAdvertisements_flow", executor=LocalExecutor(), 
            run_config=LocalRun()) as advertisement_fee:
    
    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',datestr)
    Load()(df,'oAdvertisements',upstream_tasks= [df])
advertisement_fee.register(project_name = project_name)
This is an example of a flow
I can kill it and start again
n
Basically what I'm trying to figure out is whether you have 2 otherwise identical agents registering themselves with Cloud; Cloud will only see them as 1 agent since they're self-declaring
So adding a label will tell us if you have more than 1; note: to run your flow you'll also need to add that special label to your flow
c
how do I add a special label
n
Which you can do from the flow page of the UI
Something like this:
nohup prefect agent start -p <PATH OF THE DIRECTORY WHERE THE FLOWS ARE STORED> -l dev
And then from the UI you can use this button to add the same label to your flow:
c
done
n
Ok great - you should see 2 agents on your agent tile now (if you click details there you'll see them more explicitly)
c
yup
n
Ok great - so now we can be reasonably sure that the wrong agent isn't picking up your flow. From the screenshot you sent me, it looks like that flow hasn't succeeded ever though, is that correct?
c
looks like so. I am wondering if it has something to do with my flow as I feel weird after making the changes.
with Flow("1_oAdvertisements_flow", executor=LocalExecutor(), 
            run_config=LocalRun()) as advertisement_fee:
    
    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',datestr)
    Load()(df,'oAdvertisements',upstream_tasks= [df])
I am passing df as a parameter to the run method but also in upstream_tasks
n
Yeah that's not a problem (though isn't necessary, since passing an upstream task as a parameter creates that data dependency already)
Is this flow creating other flows?
c
It is a subflow.
it is part of a parent flow where I am doing flows of flows
n
Aha! I think that might be part of the problem - maybe the solution here is to make sure that the module you're importing is available from your system PATH
c
advertisement_fee_a = StartFlowRun(flow_name='1_oAdvertisements_flow',project_name=project_name,wait = True)
with Flow("parent-flow", schedule=weekday_schedule,
          executor=LocalExecutor(),
          run_config=LocalRun()) as flow:

    ads_step_1 = oAdvertisementProducts_flow_b(upstream_tasks = [advertisement_fee_a()])
what do you mean?
n
If this is failing in one of the subflows (and not the parent flow), that means the subflow, which executes on its own, doesn't have access to whatever module you're importing
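The failure mode described here can be reproduced with the standard library alone. The sketch below is not Prefect code: it uses plain `pickle` and a hypothetical module name, `demo_flow_tasks`, standing in for the thread's `task.py`. It relies on the fact that `pickle` stores a class by reference (module name plus class name), so loading fails when that module is not importable; the assumption is that the serializer Prefect uses behaves the same way for classes defined in an importable module.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical module name, standing in for the thread's task.py.
MODULE_NAME = "demo_flow_tasks"

with tempfile.TemporaryDirectory() as module_dir:
    # Write a tiny module defining one class, like the Task subclasses above.
    with open(os.path.join(module_dir, MODULE_NAME + ".py"), "w") as f:
        f.write("class Extract:\n    def run(self):\n        return 'rows'\n")

    # "Registration" side: the module is importable, so pickling works.
    sys.path.insert(0, module_dir)
    importlib.invalidate_caches()
    module = importlib.import_module(MODULE_NAME)
    payload = pickle.dumps(module.Extract())

    # "Execution" side without the import path: forget the module entirely.
    sys.path.remove(module_dir)
    del sys.modules[MODULE_NAME]
    importlib.invalidate_caches()
    try:
        pickle.loads(payload)
        error_without_path = None
    except ModuleNotFoundError as exc:
        error_without_path = str(exc)

    # "Execution" side with the import path restored: unpickling succeeds.
    sys.path.insert(0, module_dir)
    importlib.invalidate_caches()
    restored = pickle.loads(payload)
    result_with_path = restored.run()

    # Clean up so the demo leaves the interpreter as it found it.
    sys.path.remove(module_dir)
    sys.modules.pop(MODULE_NAME, None)

print(error_without_path)
print(result_with_path)
```

The pickled bytes never contain the class body, only its name, which is why the process that loads the flow needs the same module on its import path as the process that registered it.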
c
When I say subflow, I mean a flow that is part of another (parent) flow that manages multiple flows. Just to make sure we are on the same page.
if the flows and the flows of flows are all in the same script where the task import is, why wouldn't they?
VS Code doesn't throw any import errors when they are all calling those methods
n
It sounds like they were potentially registered differently, meaning they could have different import paths. When you use the `StartFlowRun` task, you're explicitly creating a run of a different flow, which executes completely on its own and reports its state back to the parent flow
c
even though they are all registered under the same project? okay, what is a potential solution for that?
should I separate the child flows from the parent flows into different scripts?
n
I think that would help isolate the problem for sure; given how complex this is it's a little tough to make recommendations
c
I can try. There are >10 child flows in a parent flow. Do they all fall within the same directory where the task.py is called?
n
Maybe before separating all of them, can you figure out which one is failing to unpickle from your logs/run schematic?
c
it is all of them right now, after I made those changes to my flow
n
You should have runs from before those changes though, right?
c
I added upstream_tasks to all my child flows, then everything just failed
n
Hm ok it sounds like you have 2 issues (from what I can tell):
1. Some of your subflows are attempting to import a module that's not available; this can be fixed by making sure the module is available to the flow as if it were running on its own
2. Your orchestrator flow isn't running as you'd expect; once you can confirm your child flows can be run independently this will be easier to debug
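One way to test whether a module is available to a flow "as if it were running on its own" is to try the import from a fresh Python process started in a different directory. The sketch below is an illustration under assumptions, not Prefect functionality: `can_import` is a hypothetical helper, `demo_etl_tasks` is a made-up module name, and setting `PYTHONPATH` stands in for whatever mechanism supplies the import path to the agent (which appears to be the role of `-p` in the agent command earlier in the thread).

```python
import os
import subprocess
import sys
import tempfile


def can_import(module_name, cwd, import_path=None):
    """Return True if a fresh Python process started in cwd can import module_name.

    Hypothetical helper for illustration; import_path, if given, is exposed
    to the child process through PYTHONPATH.
    """
    env = dict(os.environ)
    env.pop("PYTHONPATH", None)  # start from a clean import path
    if import_path is not None:
        env["PYTHONPATH"] = import_path
    result = subprocess.run(
        [
            sys.executable,
            "-c",
            "import importlib, sys; importlib.import_module(sys.argv[1])",
            module_name,
        ],
        cwd=cwd,
        env=env,
        capture_output=True,
    )
    return result.returncode == 0


with tempfile.TemporaryDirectory() as flows_dir, tempfile.TemporaryDirectory() as other_dir:
    # A stand-in for the directory that holds the flow script and its task module.
    with open(os.path.join(flows_dir, "demo_etl_tasks.py"), "w") as f:
        f.write("class Extract:\n    pass\n")

    # Started elsewhere with no import path: the module cannot be found.
    importable_elsewhere = can_import("demo_etl_tasks", cwd=other_dir)
    # Started elsewhere, but with the flows directory on the import path.
    importable_with_path = can_import(
        "demo_etl_tasks", cwd=other_dir, import_path=flows_dir
    )

print(importable_elsewhere, importable_with_path)
```

Running a check like this with the real module name, from the directory and environment the agent actually uses, would show whether each child flow can find its imports independently of the parent script.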
c
yea, I tested individual subflows and they are failing. But separating the child flows into different scripts might make them hard to manage in the future. It'd be nice to have a solution where everything is kept in one script. I'll try it though and see
n
Hm ok - unfortunately I can't provide much more help on this but I'd be happy to direct you to one of our partners that provides dedicated support for getting up and running with Prefect. If you'd be interested in that let me know and I'll set you up 🙂
c
from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalExecutor
from prefect import task, Flow, Parameter
import prefect
import os,sys
from task import Extract,Load,Create_tmp,Drop_tmp,Load_revenue_fee,Load_profit_dash,Delete_data
import db as db ## credential

with Flow("1_oAdvertisements_flow", executor=LocalExecutor(), 
            run_config=LocalRun()) as advertisement_fee:
    
    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',1)
    Load()(df,'oAdvertisements',upstream_tasks= [df])
advertisement_fee.register(project_name = 'profit_test')
I created a separate .py file for a child flow and tried running it from the Prefect UI, but it fails
But it is still pending. It looks like the agent hasn't picked it up?
oh but it is in a different directory let me redo it.
still failing even though I separate it from a different directory.
n
@Chohang Ng it looks like you're importing `task` from `prefect` as well as calling `from task import ...` - it's usually not a good idea to name your python files the same as other things you're importing, since serializers like `cloudpickle` won't be able to differentiate the 2
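A quick way to see which file a module name actually resolves to in a given environment is to ask the import system directly. This sketch uses only the standard library; `resolve_module_file` is a hypothetical helper and `demo_task_module` a made-up name. It does not confirm or rule out a naming collision by itself, it only shows where a name such as `task` or `etl` would be loaded from.

```python
import importlib
import importlib.util
import os
import sys
import tempfile


def resolve_module_file(module_name):
    """Return the file a top-level module name would be imported from, or None."""
    spec = importlib.util.find_spec(module_name)
    return None if spec is None else spec.origin


with tempfile.TemporaryDirectory() as module_dir:
    # A stand-in for a local file sitting next to the flow script.
    with open(os.path.join(module_dir, "demo_task_module.py"), "w") as f:
        f.write("class Extract:\n    pass\n")

    sys.path.insert(0, module_dir)
    importlib.invalidate_caches()

    resolved = resolve_module_file("demo_task_module")
    # True when the name resolves to the file in our temporary directory.
    resolved_in_demo_dir = os.path.samefile(os.path.dirname(resolved), module_dir)

    sys.path.remove(module_dir)

# A name that is not importable anywhere resolves to None.
missing = resolve_module_file("demo_module_that_does_not_exist")

print(os.path.basename(resolved), resolved_in_demo_dir, missing)
```

Calling the helper with the real module name from inside the environment where the agent runs would show whether the name points at the local file, at something else, or at nothing.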
c
from prefect import task, Flow, Parameter
import prefect
import os,sys
from etl import Extract,Load,Create_tmp,Drop_tmp,Load_revenue_fee,Load_profit_dash,Delete_data
import db as db ## credential
import pendulum ## for crons timestamp
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun ## flow of flows
import datetime
from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalExecutor


with Flow("1_oAdvertisements_flow", executor=LocalExecutor(), 
            run_config=LocalRun()) as advertisement_fee:
    
    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',1)
    Load()(df,'oAdvertisements',upstream_tasks= [df])
advertisement_fee.register(project_name = 'profit_test')
changed it to etl. But still failing
n
Hi @Chohang Ng - have you managed to make some progress on this?
c
no. Still got no luck. I don't know why the agent is not picking it up
n
Can you provide more information on how it's failing?
c
We prefer not to break down flows into different scripts if possible. It'd be nice to set up a meeting with one of your partners that provides dedicated support for getting up and running with Prefect
n
Sounds good @Chohang Ng - I'm tagging @George Coyne here, who can make that happen. Best of luck!
c
@Michael Brown
g
Hey @Chohang Ng, sending you a DM!