Hey. We are investigating Prefect to our needs and...
# ask-community
p
Hey. We are evaluating Prefect for our needs and so far it looks really awesome. However, we have encountered a strange issue. Let’s say we have registered 2 flows which share the same task. Now we receive an error:
KeyError: 'Task slug []-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
And this error always occurs only for the flow that was registered first. Any help?
m
What do your code and flow structure look like? How do you define the shared task?
p
get_campaign_ids = GetIdsTask("PIN_Attribute_Campaign_day")
save_ad_groups = SaveObjectsTask("PIN_Attribute_AdGroup_day")


with Flow("load-ad-groups-flow") as flow:

    ids = get_campaign_ids.run()
    ad_groups = get_campaign_ad_groups.map(ids)
    save_ad_groups.map(ad_groups)

flow.executor = LocalDaskExecutor()
Copy code
get_advertiser_ids = GetIdsTask("PIN_Attribute_Advertiser_day")
save_campaigns = SaveObjectsTask("PIN_Attribute_Campaign_day")

with Flow("load-campaigns-flow") as flow:
    ids = get_advertiser_ids.run()
    campaigns = get_advertiser_campaigns.map(ids)
    save_campaigns.map(campaigns)
Copy code
class GetIdsTask(Task):
    def __init__(self, dataset: str):
        ...

    def run(self) -> list[int]:
        ...
        return [r[0] for r in ids][:3]
m
That's strange. I just created a new flow using the same task definition, registered it and ran it, and everything was OK. What kind of storage and agent do you use? How do you register the flows?
p
We registered both from Python and from the CLI. We use a local agent. Could it be related to the fact that we install the tasks via setup.py?
m
I don't think so, but maybe the Prefect guys could say more. Why do you call the run method here?
ids = get_advertiser_ids.run()
p
This is a class-based task with an overridden run method.
m
I use the same kind of task without calling run in the flow definition, but I think it makes no difference.
get_advertiser_ids = GetIdsTask("PIN_Attribute_Advertiser_day")
save_campaigns = SaveObjectsTask("PIN_Attribute_Campaign_day")

with Flow("load-campaigns-flow") as flow:
    ids = get_advertiser_ids.run()
    campaigns = get_advertiser_campaigns.map(ids)
    save_campaigns.map(campaigns)
Here you use 3 tasks (get_advertiser_ids, get_advertiser_campaigns, save_campaigns), but I can see only 2 defined above the flow definition.
p
The second is used with map, so no instance is required.
k
Hi @Piotr Karnasiewicz, I’ve seen this once before. Do you have two flows in a script? Can you try naming the second one something other than flow? Or in general try giving them different variable names?
p
Thanks! We don’t have two flows in a single script. We replaced the classes with functions and it works.
It seems we shouldn’t call run directly (for some reason).
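A possible explanation for why calling run directly caused trouble, sketched as a toy model. Everything below (ToyFlow, ToyTask, GetIds, the source list) is hypothetical and is not Prefect's code. It only illustrates the general idea that calling a task instance inside a flow block records the task for later, while calling run executes it immediately, so the flow never learns about the task and ends up with whatever plain value run returned at definition time.

```python
# Toy model only: hypothetical stand-ins, NOT Prefect's API or implementation.
class ToyFlow:
    current = None  # the flow being built inside a `with` block

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        ToyFlow.current = self
        return self

    def __exit__(self, *exc):
        ToyFlow.current = None
        return False


class ToyTask:
    def __call__(self):
        # Calling the instance only records the task in the active flow;
        # nothing is computed until the flow is run later.
        ToyFlow.current.tasks.append(self)
        return self

    def run(self):
        raise NotImplementedError


class GetIds(ToyTask):
    def __init__(self, source):
        self.source = source  # hypothetical stand-in for a database query

    def run(self):
        return list(self.source)


source = [1, 2, 3]
get_ids = GetIds(source)

with ToyFlow("direct-run") as eager:
    ids_eager = get_ids.run()  # executes now; the flow never sees the task

with ToyFlow("deferred") as deferred:
    ids_deferred = get_ids()   # records the task; no query has run yet

print(len(eager.tasks), len(deferred.tasks))
```

If the same reasoning applies to the flows in this thread, the equivalent change would be writing ids = get_campaign_ids() instead of ids = get_campaign_ids.run() inside the with Flow(...) block, so that the ids are fetched when the flow runs rather than when the script is loaded. Whether this fully explains the KeyError above was not confirmed in the thread.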