
Chohang Ng

06/03/2021, 10:07 PM
Failed: "Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'task\'")\nThis may be due to a missing Python module in your current environment
I followed this answer to debug it (https://stackoverflow.com/questions/63881231/prefect-modulenotfounderror-when-running-from-ui), but occasionally it still gives me this error message. I want to confirm whether this is indeed the problem.

nicholas

06/03/2021, 10:09 PM
Hi @Chohang Ng - can you give a little more information on your flow, like which storage, agent, and executor you're using?
Feel free to post the flow code as well.

Chohang Ng

06/03/2021, 10:10 PM
with Flow("parent-flow", schedule=weekday_schedule,
          executor=LocalExecutor(),
          run_config=LocalRun()) as flow:

    ads_step_1 = oAdvertisementProducts_flow_b(upstream_tasks=[advertisement_fee_a()])
    ads_step_2 = aadvfeesbyorderitem_flow_c(upstream_tasks=[ads_step_1, aadvfeesbychannel_flow_e(), aDTC_ads_flow_f()])
    ads_step_3 = AdvertisementFee_flow_d(upstream_tasks=[ads_step_2])

    upload_all_fee = all_fee_upload_flow(upstream_tasks=[
        invoice_adjust_flow(), channelfees_flow(), ShippingFees_flow(),
        ShippingMaterialFees_flow(), StorageFees_flow(), laborfee_flow(),
        cogs_flow(), customer_fee_flow(), commission_rebate_flow(),
        transfer_fee_flow(), ads_step_3, promo_flow(),
    ])

    upload_revenue = revenue_upload_flow(upstream_tasks=[upload_all_fee])

    abi_profitability = abi_profit_flow(upstream_tasks=[upload_revenue])

    delete_data = delete_fee_flow(upstream_tasks=[abi_profitability])
flow.register(project_name=project_name)
I run this in the background: nohup prefect agent start -p <PATH OF THE DIRECTORY WHERE THE FLOWS ARE STORED>
I am running the flows on Prefect Cloud.

nicholas

06/03/2021, 10:15 PM
Ok thanks @Chohang Ng - you mentioned that it occasionally gives the error but sometimes the same flow version succeeds as expected?

Chohang Ng

06/03/2021, 10:15 PM
yea
task.py is where I define task classes like this:
from prefect import Task

class Create_tmp(Task):
    def run(self, script, datestr):
        ...  # body not shown

nicholas

06/03/2021, 10:17 PM
Got it - do you have more than 1 agent running that could pick up the flow? Also, if you're importing that file, are you sure there are no import collisions with the `task` that Prefect exports?

Chohang Ng

06/03/2021, 10:19 PM
I am pretty sure only one agent is running.
from task import Extract, Load, Create_tmp, Drop_tmp, Load_revenue_fee, Load_profit_dash, Delete_data
This is the import I have in flow.py, where all flows are registered.
Could you elaborate on what you mean by import collisions?

nicholas

06/03/2021, 10:20 PM
Hm ok, that import seems fine (so no need to dig into it any further).
In the Prefect UI on the dashboard, do you see just a single agent polling?
My guess is that there's a chance you `nohup`ed another agent without the import path, and that agent could be submitting the flow.

Chohang Ng

06/03/2021, 10:22 PM

nicholas

06/03/2021, 10:23 PM
Could you try stopping your `nohup`ed agent and starting it with a special label applied?

Chohang Ng

06/03/2021, 10:23 PM
with Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
          run_config=LocalRun()) as advertisement_fee:

    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql', datestr)
    Load()(df, 'oAdvertisements', upstream_tasks=[df])
advertisement_fee.register(project_name=project_name)
This is an example of a flow.
I can kill it and start again.

nicholas

06/03/2021, 10:24 PM
Basically what I'm trying to figure out is whether you have 2 otherwise identical agents registering themselves with Cloud; Cloud will only see them as 1 agent since they're self-declaring
So adding a label will tell us if you have more than 1; note: to run your flow you'll also need to add that special label to your flow
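To make the label check concrete, here is a simplified model of the matching rule it relies on, assuming Prefect 0.x semantics where an agent only picks up a flow run if every one of the flow's labels is also one of the agent's labels. The agent names and the `dev` label are illustrative, and `can_pick_up` / `eligible_agents` are hypothetical helpers, not Prefect APIs.

```python
from typing import Dict, Iterable, List, Set


def can_pick_up(agent_labels: Iterable[str], flow_labels: Iterable[str]) -> bool:
    """Simplified rule (assumed): every flow label must also be an agent label."""
    return set(flow_labels) <= set(agent_labels)


def eligible_agents(agents: Dict[str, Set[str]], flow_labels: Iterable[str]) -> List[str]:
    """Names of the agents that could submit a run for a flow with these labels."""
    flow_labels = list(flow_labels)  # materialize once so any iterable can be reused per agent
    return sorted(name for name, labels in agents.items() if can_pick_up(labels, flow_labels))


# Hypothetical setup: a stray unlabeled agent plus the agent restarted with `-l dev`.
agents = {
    "stray-agent": set(),
    "labeled-agent": {"dev"},
}

print(eligible_agents(agents, []))       # ['labeled-agent', 'stray-agent']
print(eligible_agents(agents, ["dev"]))  # ['labeled-agent']
```

With no labels on the flow, either agent qualifies, so a misconfigured agent could grab the run; once the flow carries `dev`, only the agent started with `-l dev` can.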

Chohang Ng

06/03/2021, 10:24 PM
How do I add a special label?

nicholas

06/03/2021, 10:24 PM
(You can add the label to your flow from the flow page of the UI.)
Something like this:
nohup prefect agent start -p <PATH OF THE DIRECTORY WHERE THE FLOWS ARE STORED> -l dev
And then from the UI you can use this button to add the same label to your flow:

Chohang Ng

06/03/2021, 10:28 PM
done

nicholas

06/03/2021, 10:28 PM
Ok great - you should see 2 agents on your agent tile now (if you click details there you'll see them more explicitly)

Chohang Ng

06/03/2021, 10:28 PM
yup

nicholas

06/03/2021, 10:30 PM
Ok great - so now we can be reasonably sure that the wrong agent isn't picking up your flow. From the screenshot you sent me, it looks like that flow hasn't succeeded ever though, is that correct?

Chohang Ng

06/03/2021, 10:31 PM
Looks like it. I'm wondering whether it has something to do with my flow, because something felt off after I made the changes.
with Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
          run_config=LocalRun()) as advertisement_fee:

    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql', datestr)
    Load()(df, 'oAdvertisements', upstream_tasks=[df])
I am passing df as an argument to the run method and also as an upstream task.

nicholas

06/03/2021, 10:32 PM
Yeah, that's not a problem (though it isn't necessary, since passing an upstream task as a parameter already creates that data dependency)
Is this flow creating other flows?
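A toy model in plain Python (not Prefect's actual graph classes) of why the extra `upstream_tasks=[df]` is redundant: passing a task's result as an argument already makes the downstream task wait for it, so listing the same task again leaves the set of prerequisites unchanged. `TaskRef`, `ToyFlow`, and `prerequisites_of` are hypothetical names; the sketch models ordering only and says nothing about how Prefect stores edges internally.

```python
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass(frozen=True)
class TaskRef:
    """Stand-in for the result object a task call returns inside a flow."""
    name: str


class ToyFlow:
    def __init__(self) -> None:
        # For each task, the tasks that must finish before it can start.
        self.prerequisites: Dict[TaskRef, Set[TaskRef]] = {}

    def call(self, name: str, *args, upstream_tasks=()) -> TaskRef:
        ref = TaskRef(name)
        deps = self.prerequisites.setdefault(ref, set())
        # A task result passed as an argument is a data dependency.
        deps.update(arg for arg in args if isinstance(arg, TaskRef))
        # upstream_tasks adds ordering-only dependencies.
        deps.update(upstream_tasks)
        return ref


def prerequisites_of(flow: ToyFlow, name: str) -> List[str]:
    return sorted(ref.name for ref in flow.prerequisites[TaskRef(name)])


with_both = ToyFlow()
df = with_both.call("Extract", "Advertisements_SQL/Get_oAdvertisements.sql", 1)
with_both.call("Load", df, "oAdvertisements", upstream_tasks=[df])

data_only = ToyFlow()
df = data_only.call("Extract", "Advertisements_SQL/Get_oAdvertisements.sql", 1)
data_only.call("Load", df, "oAdvertisements")

print(prerequisites_of(with_both, "Load"))  # ['Extract']
print(prerequisites_of(data_only, "Load"))  # ['Extract']
```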

Chohang Ng

06/03/2021, 10:32 PM
It is a subflow.
it is part of a parent flow where I am doing flows of flows

nicholas

06/03/2021, 10:34 PM
Aha! I think that might be part of the problem - maybe the solution here is to make sure that module is importable from your Python path (e.g. via PYTHONPATH)

Chohang Ng

06/03/2021, 10:34 PM
from prefect.tasks.prefect import StartFlowRun  ## flow of flows

advertisement_fee_a = StartFlowRun(flow_name='1_oAdvertisements_flow', project_name=project_name, wait=True)
with Flow("parent-flow", schedule=weekday_schedule,
          executor=LocalExecutor(),
          run_config=LocalRun()) as flow:

    ads_step_1 = oAdvertisementProducts_flow_b(upstream_tasks=[advertisement_fee_a()])
What do you mean?

nicholas

06/03/2021, 10:35 PM
If this is failing in one of the subflows (and not the parent flow), that means the subflow, which executes on its own, doesn't have access to whatever module you're importing
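A minimal, self-contained sketch of this failure mode, using the standard library's `pickle` rather than cloudpickle (assumption: both store a class that lives in an importable module as a reference to `module.ClassName`, not as code). An object is pickled while its module's directory is importable, then loaded in a state where it is not, which produces the same kind of `ModuleNotFoundError` as the original error. The module name `my_tasks_demo` and the helper function are hypothetical.

```python
import importlib
import os
import pickle
import sys
import tempfile

MODULE_SOURCE = (
    "class Create_tmp:\n"
    "    def run(self, script, datestr):\n"
    "        return script, datestr\n"
)


def demo_missing_module_on_unpickle(mod_name: str = "my_tasks_demo") -> str:
    """Pickle an object while its module is importable, then load it when it isn't."""
    module_dir = tempfile.mkdtemp()
    with open(os.path.join(module_dir, mod_name + ".py"), "w") as f:
        f.write(MODULE_SOURCE)

    # Registration side: the directory holding the module is on the import path.
    sys.path.insert(0, module_dir)
    importlib.invalidate_caches()
    module = importlib.import_module(mod_name)
    payload = pickle.dumps(module.Create_tmp())  # the class is recorded by module and class name

    # Execution side: same bytes, but the directory is no longer importable.
    sys.path.remove(module_dir)
    del sys.modules[mod_name]
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return "unpickled successfully"


print(demo_missing_module_on_unpickle())  # No module named 'my_tasks_demo'
```

In this model the remedy matches the one suggested in the thread: make the module's directory importable in whatever process does the loading.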

Chohang Ng

06/03/2021, 10:36 PM
When I say subflow, I mean a flow that is part of another (parent) flow that manages multiple flows. Just to make sure we are on the same page.
If the flows and the flow of flows are all in the same script as the task import, why wouldn't they have access to it?
VS Code doesn't show me any import errors when they all call those methods.

nicholas

06/03/2021, 10:40 PM
It sounds like they were potentially registered differently, meaning they could have different import paths. When you use the `StartFlowRun` task, you're explicitly creating a run of a different flow, which executes completely on its own and reports its state back to the parent flow

Chohang Ng

06/03/2021, 10:41 PM
Even though they are all registered under the same project? Okay, what is a potential solution for that?
Should I separate the child flows from the parent flow into different scripts?

nicholas

06/03/2021, 10:44 PM
I think that would help isolate the problem for sure; given how complex this is, it's a little tough to make recommendations.

Chohang Ng

06/03/2021, 10:46 PM
I can try. There are more than 10 child flows in the parent flow. Should they all live in the same directory as task.py?

nicholas

06/03/2021, 10:50 PM
Maybe before separating all of them, can you figure out which one is failing to unpickle from your logs/run schematic?

Chohang Ng

06/03/2021, 10:51 PM
It's all of them right now, after I made those changes to my flow.

nicholas

06/03/2021, 10:52 PM
You should have runs from before those changes though, right?

Chohang Ng

06/03/2021, 10:54 PM
I added upstream_tasks to all my child flows, and then everything just failed.

nicholas

06/03/2021, 11:00 PM
Hm ok, it sounds like you have 2 issues (from what I can tell):
1. Some of your subflows are attempting to import a module that's not available; this can be fixed by making sure the module is available to the flow as if it were running on its own.
2. Your orchestrator flow isn't running as you'd expect; once you can confirm your child flows can be run independently, this will be easier to debug.
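One way to check item 1 without involving the agent is to ask a brand-new Python process, which shares none of the calling process's `sys.path` tweaks or the editor's environment, whether a module imports with a given set of extra directories. The sketch assumes that the directory passed to the agent with `-p` ends up on the flow run's `PYTHONPATH`; `importable_in_fresh_process` and the `etl_demo` module are hypothetical, not Prefect APIs.

```python
import os
import subprocess
import sys
import tempfile
from typing import Iterable


def importable_in_fresh_process(module_name: str, extra_paths: Iterable[str] = ()) -> bool:
    """True if `module_name` can be imported by a brand-new interpreter.

    PYTHONPATH is set to exactly `extra_paths` (any inherited value is dropped)
    and the child runs from the temp directory, so nothing from the calling
    process or its working directory leaks in.
    """
    env = {key: value for key, value in os.environ.items() if key != "PYTHONPATH"}
    extra_paths = list(extra_paths)
    if extra_paths:
        env["PYTHONPATH"] = os.pathsep.join(extra_paths)
    result = subprocess.run(
        [sys.executable, "-c", "import importlib, sys; importlib.import_module(sys.argv[1])", module_name],
        env=env,
        cwd=tempfile.gettempdir(),
        capture_output=True,
    )
    return result.returncode == 0


# Hypothetical stand-in for task.py / etl.py, written to its own directory.
module_dir = tempfile.mkdtemp()
with open(os.path.join(module_dir, "etl_demo.py"), "w") as f:
    f.write("class Extract:\n    pass\n")

print(importable_in_fresh_process("etl_demo"))                # False: directory not on the path
print(importable_in_fresh_process("etl_demo", [module_dir]))  # True: as if the directory were passed with -p
```

Running this for each module a child flow imports, with the same directory you give the agent, tests whether that flow could import its dependencies when run on its own.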

Chohang Ng

06/03/2021, 11:08 PM
Yeah, I tested individual subflows and they are failing. But separating the child flows into different scripts might make them hard to manage in the future. It'd be nice to have a solution where everything is kept in one script. I'll try it, though, and see.

nicholas

06/03/2021, 11:09 PM
Hm ok - unfortunately I can't provide much more help on this but I'd be happy to direct you to one of our partners that provides dedicated support for getting up and running with Prefect. If you'd be interested in that let me know and I'll set you up 🙂

Chohang Ng

06/03/2021, 11:13 PM
from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalExecutor
from prefect import task, Flow, Parameter
import prefect
import os, sys
from task import Extract, Load, Create_tmp, Drop_tmp, Load_revenue_fee, Load_profit_dash, Delete_data
import db  ## credential

with Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
          run_config=LocalRun()) as advertisement_fee:

    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql', 1)
    Load()(df, 'oAdvertisements', upstream_tasks=[df])
advertisement_fee.register(project_name='profit_test')
I created a separate .py file for one child flow and tried running it from the Prefect UI, but it fails.
But it is still pending. It looks like the agent hasn't picked it up?
Oh, but it is in a different directory; let me redo it.
Still failing even though I moved it to a separate directory.

nicholas

06/03/2021, 11:29 PM
@Chohang Ng it looks like you're importing `task` from `prefect` as well as calling `from task import ...` - it's usually not a good idea to name your Python files the same as other things you're importing, since serializers like `cloudpickle` won't be able to differentiate the 2
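A minimal sketch of the general hazard behind this advice, with two modules that share a name, using the standard library's `pickle` as a stand-in (assumption: cloudpickle likewise records a class from an importable module only by module name and class name). An object is pickled while one `task_clash_demo.py` wins the import lookup, then loaded while a different file with the same name wins; the loader cannot tell them apart and fails. The module name and both helper functions are hypothetical.

```python
import importlib
import os
import pickle
import sys
import tempfile


def write_module(directory: str, name: str, source: str) -> None:
    with open(os.path.join(directory, name + ".py"), "w") as f:
        f.write(source)


def demo_name_collision(name: str = "task_clash_demo") -> str:
    """Pickle with one module named `name`, then load while a different one wins the lookup."""
    mine, other = tempfile.mkdtemp(), tempfile.mkdtemp()
    write_module(mine, name, "class Extract:\n    pass\n")
    write_module(other, name, "# an unrelated module that happens to share the name\n")

    sys.path.insert(0, mine)
    importlib.invalidate_caches()
    payload = pickle.dumps(importlib.import_module(name).Extract())  # records only the module name and "Extract"

    # Loading side: a different directory now supplies the module with that name.
    sys.path.remove(mine)
    del sys.modules[name]
    sys.path.insert(0, other)
    importlib.invalidate_caches()
    try:
        pickle.loads(payload)
        return "loaded"
    except AttributeError as exc:
        return f"AttributeError: {exc}"
    finally:
        sys.path.remove(other)
        sys.modules.pop(name, None)


print(demo_name_collision())
```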

Chohang Ng

06/04/2021, 12:29 AM
from prefect import task, Flow, Parameter
import prefect
import os, sys
from etl import Extract, Load, Create_tmp, Drop_tmp, Load_revenue_fee, Load_profit_dash, Delete_data
import db  ## credential
import pendulum  ## for crons timestamp
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun  ## flow of flows
import datetime
from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalExecutor


with Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
          run_config=LocalRun()) as advertisement_fee:

    df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql', 1)
    Load()(df, 'oAdvertisements', upstream_tasks=[df])
advertisement_fee.register(project_name='profit_test')
I changed it to etl, but it's still failing.

nicholas

06/04/2021, 2:35 PM
Hi @Chohang Ng - have you managed to make some progress on this?

Chohang Ng

06/04/2021, 3:26 PM
No, still no luck. I don't know why the agent is not picking it up.

nicholas

06/04/2021, 3:28 PM
Can you provide more information on how it's failing?

Chohang Ng

06/04/2021, 5:40 PM
We'd prefer not to break flows down into different scripts if possible. It'd be nice to set up a meeting with one of your partners that provides dedicated support for getting up and running with Prefect.

nicholas

06/04/2021, 5:45 PM
Sounds good @Chohang Ng - I'm tagging @George Coyne here, who can make that happen. Best of luck!

Chohang Ng

06/04/2021, 5:50 PM
@Michael Brown

George Coyne

06/04/2021, 6:04 PM
Hey @Chohang Ng, sending you a DM!