Thread
#prefect-community
    Piotr Karnasiewicz

    1 year ago
    Hey. We are evaluating Prefect for our needs and so far it looks really awesome. However, we have run into a strange issue. Let’s say we have registered 2 flows which share the same task. Now we receive an error:
    KeyError: 'Task slug []-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
    This error only ever occurs for the flow that was registered first. Any help?
    Marko Herkaliuk

    1 year ago
    What do your code and flow structure look like? How do you define the shared task?
    Piotr Karnasiewicz

    1 year ago
    get_campaign_ids = GetIdsTask("PIN_Attribute_Campaign_day")
    save_ad_groups = SaveObjectsTask("PIN_Attribute_AdGroup_day")
    
    
    with Flow("load-ad-groups-flow") as flow:
    
        ids = get_campaign_ids.run()
        ad_groups = get_campaign_ad_groups.map(ids)
        save_ad_groups.map(ad_groups)
    
    flow.executor = LocalDaskExecutor()
    
    get_advertiser_ids = GetIdsTask("PIN_Attribute_Advertiser_day")
    save_campaigns = SaveObjectsTask("PIN_Attribute_Campaign_day")
    
    with Flow("load-campaigns-flow") as flow:
        ids = get_advertiser_ids.run()
        campaigns = get_advertiser_campaigns.map(ids)
        save_campaigns.map(campaigns)
    
    class GetIdsTask(Task):
        def __init__(self, dataset: str):
            ...
    
        def run(self) -> list[int]:
            ...
            return [r[0] for r in ids][:3]
    Marko Herkaliuk

    1 year ago
    That's strange. I just created a new flow, used the same task definition, registered and ran it, and everything was OK. What kind of storage/agent do you use? How do you register the flows?
    Piotr Karnasiewicz

    1 year ago
    We registered both from Python and via the CLI. We use a local agent. Could it be related to the fact that we install the tasks via setup.py?
    Marko Herkaliuk

    1 year ago
    I don't think so, but maybe the Prefect guys could say more. Why are you using the run method here:
    ids = get_advertiser_ids.run()
    Piotr Karnasiewicz

    1 year ago
    This is a class-based task with an overridden run method.
    Marko Herkaliuk

    1 year ago
    I use tasks the same way, but without calling run in the flow definition, though I don't think that makes a difference.
    get_advertiser_ids = GetIdsTask("PIN_Attribute_Advertiser_day")
    save_campaigns = SaveObjectsTask("PIN_Attribute_Campaign_day")
    
    with Flow("load-campaigns-flow") as flow:
        ids = get_advertiser_ids.run()
        campaigns = get_advertiser_campaigns.map(ids)
        save_campaigns.map(campaigns)
    Here you use 3 tasks (get_advertiser_ids, get_advertiser_campaigns, save_campaigns), but I can see only 2 of them defined above the flow definition.
    Piotr Karnasiewicz

    1 year ago
    The second is used with map, so no instance is required.
    Kevin Kho

    1 year ago
    Hi @Piotr Karnasiewicz, I’ve seen this once before. Do you have two flows in a script? Can you try naming the second one something other than flow? Or, in general, try giving them different variable names?
    Piotr Karnasiewicz

    1 year ago
    Thanks! We don’t have two flows in a single script. We replaced classes with functions and it works.
    It seems we shouldn’t call run directly (for some reason).
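A minimal sketch of what that last message is getting at. ToyFlow and ToyTask are hypothetical stand-ins written for this illustration; they are not Prefect's classes or its implementation. The point they model: calling run() directly inside the with block executes the task body immediately, while the script is being loaded, so the flow never contains the task and is instead built from whatever data came back. In Prefect 1.x the usual form is to call the task instance (ids = get_campaign_ids()), which adds the task to the flow and defers execution until the flow runs.

```python
# Toy stand-ins for illustration only; these are NOT Prefect's classes.
class ToyFlow:
    _active = None  # the flow currently being built inside a `with` block

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        ToyFlow._active = self
        return self

    def __exit__(self, *exc):
        ToyFlow._active = None
        return False


class ToyTask:
    def __init__(self, name):
        self.name = name

    def run(self):
        # The task body. In the thread this reads ids from a dataset;
        # here it returns fixed placeholder values.
        return [1, 2, 3]

    def __call__(self):
        # Calling the instance only records the task in the active flow.
        # The body is meant to run later, when the flow is executed.
        flow = ToyFlow._active
        if flow is None:
            raise RuntimeError("task called outside of a flow context")
        flow.tasks.append(self)
        return self


get_ids = ToyTask("get_ids")

with ToyFlow("direct-run") as direct_flow:
    direct_result = get_ids.run()  # executes now; the flow never sees the task

with ToyFlow("deferred") as deferred_flow:
    deferred_result = get_ids()    # registers the task; nothing executes yet

print(direct_result, [t.name for t in direct_flow.tasks])
print(type(deferred_result).__name__, [t.name for t in deferred_flow.tasks])
```

If the data returned by run() differs between registration and execution, the flow rebuilt at load time could plausibly differ from the registered one, which would be consistent with the slug mismatch error above. The thread itself does not confirm this as the cause.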