Thread
#prefect-community
    Martin T

    3 months ago
    Hi 😃 StartFlowRun() with map() is not creating unique runs. I have a wrapper flow that creates sub-flow runs:
    flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW)
    which I execute with:
    flow_run.map(parameters=parameters, run_name=run_name)
    (Here parameters and run_name are generated by other map() tasks and have the same length.) When I run the wrapper flow locally, targeting the sub-flow in Prefect Cloud, I get:
    └── 12:48:22 | DEBUG   | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
    └── 12:48:22 | DEBUG   | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
    └── 12:48:23 | INFO    | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
    ...
    └── 12:48:23 | DEBUG   | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
    └── 12:48:23 | DEBUG   | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
    └── 12:48:24 | INFO    | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
    ...
    Note the same GUIDs!!! When we visit the Cloud UI, only the first mapped run has been created! If the wrapper flow is also running in Cloud, it works as expected. I'm not sure how to create a minimal reproducible example for this, since StartFlowRun can't trigger a local flow (I don't use Prefect Server, only Prefect Cloud).
    Kevin Kho

    3 months ago
    You need to supply idempotency keys to StartFlowRun. Flow runs with the same idempotency key only trigger one flow run, and the idempotency key used by default is the task run id. StartFlowRun takes an idempotency key in its run method.
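    To make the deduplication rule concrete, here is a toy, in-memory stand-in for the backend. It is not Prefect code: FakeBackend and its create_flow_run method are hypothetical names, and the only behavior assumed is the one described above, namely that a request with an already-seen idempotency key returns the existing run instead of creating a new one.

```python
import uuid
from typing import Dict, List, Optional


class FakeBackend:
    """Hypothetical backend that honors idempotency keys (not Prefect's API)."""

    def __init__(self) -> None:
        self._runs_by_key: Dict[str, str] = {}
        self.created: List[str] = []

    def create_flow_run(self, idempotency_key: Optional[str] = None) -> str:
        # A key seen before returns the run created the first time.
        if idempotency_key is not None and idempotency_key in self._runs_by_key:
            return self._runs_by_key[idempotency_key]
        run_id = str(uuid.uuid4())
        self.created.append(run_id)
        if idempotency_key is not None:
            self._runs_by_key[idempotency_key] = run_id
        return run_id


backend = FakeBackend()

# Two mapped children that fall back to the same default key.
shared = [backend.create_flow_run(idempotency_key="task-run-123") for _ in range(2)]

# Two mapped children with distinct keys.
distinct = [backend.create_flow_run(idempotency_key=f"task-run-123-{i}") for i in range(2)]

print(shared[0] == shared[1])      # True: only one run was created
print(distinct[0] != distinct[1])  # True: two separate runs
print(len(backend.created))        # 3
```

    The shared-key case reproduces the symptom in the log above (two "created" messages, one flow run id); the distinct-key case is what supplying per-child keys buys you.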
    Martin T

    3 months ago
    Thanks @Kevin Kho, I will try that. Can you explain why this is not an issue in Prefect Cloud?
    Kevin Kho

    3 months ago
    Idempotency is an intentional feature of orchestrators. You just need to provide different keys to make it clear that you want different runs. StartFlowRun is an older task, but I think create_flow_run will work off the shelf here because it adds the map_index. create_flow_run is the newer version of the task.
    Martin T

    3 months ago
    I still don't get this. If you want the flow to be idempotent, provide a non-changing idempotency_key. That makes sense, and it's also what the docs describe. But if I don't want my flows to be idempotent (create new runs every time), then I also have to provide an idempotency_key??? That can't be true. The docs don't describe a mandatory idempotency_key for non-idempotent flows, and we rarely (ever?) see idempotency keys in official examples. So it should be pretty clear that by default, flows are always non-idempotent, right? In practice, that is also the case: local and Cloud runs are non-idempotent. Mapped StartFlowRun tasks are also non-idempotent... but only in Cloud. When executed locally, it behaves the opposite way. On the implementation side, StartFlowRun sets
    idempotency_key = prefect.context.get("task_run_id", None)
    by default, then calls create_flow_run. Is the map_index concatenated with the task_run_id? I couldn't locate the code for this.
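    The two key strategies in question can be sketched as plain functions. These are hypothetical helpers, not Prefect's source: default_key mirrors the StartFlowRun default quoted above (task run id alone), and key_with_map_index appends the map index, as Kevin says the newer create_flow_run task does. The exact "{task_run_id}-{map_index}" format is an assumption.

```python
from typing import List, Optional


def default_key(task_run_id: Optional[str]) -> Optional[str]:
    # StartFlowRun-style default: the task run id alone.
    return task_run_id


def key_with_map_index(task_run_id: Optional[str], map_index: Optional[int]) -> Optional[str]:
    # Append the map index for mapped children (separator format is an assumption).
    if task_run_id is None or map_index is None:
        return task_run_id
    return f"{task_run_id}-{map_index}"


# Assume every mapped child sees the same task run id.
children: List[int] = [0, 1]
shared_id = "7951f47c"

plain = [default_key(shared_id) for _ in children]
indexed = [key_with_map_index(shared_id, i) for i in children]

print(plain)    # ['7951f47c', '7951f47c']
print(indexed)  # ['7951f47c-0', '7951f47c-1']
```

    Under that assumption the plain default collides for every child, while the indexed variant stays unique per child and still stable for a given child.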
    Kevin Kho

    3 months ago
    On the implementation side, I said above that the create_flow_run task is the newer version of the StartFlowRun task. This task adds the map index by default here. StartFlowRun calls client.create_flow_run, not the create_flow_run task. If you want to use StartFlowRun, you just need to make a list of distinct keys and pass them to your map call.
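    import datetime
    
    from prefect import Flow, task
    from prefect.tasks.prefect import StartFlowRun
    
    
    @task
    def get_idempotency_keys():
        items = ["a", "b", "c"]
        res = []
        for item in items:
            # current timestamp plus a per-item suffix gives every mapped run its own key
            res.append(str(datetime.datetime.now()) + item)
        return res
    
    
    start = StartFlowRun(...)  # arguments elided
    with Flow(...) as flow:  # arguments elided
        keys = get_idempotency_keys()
        # the keys list must be as long as the other mapped inputs (also elided here)
        start.map(..., idempotency_key=keys)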
    Something like that. This will generate new runs. I don't think the discussion of whether flows are idempotent by default is helpful. The behavior differs between a normal scheduled flow run and the StartFlowRun and create_flow_run tasks. For those tasks, the assumption is that a retry should trigger the same flow run, so an idempotency key is added by default.
    Martin T

    3 months ago
    @Kevin Kho thanks 🙂 I thought of doing something like your get_idempotency_keys() and combining task_run_id + map_index, but it felt like I was going off-road... I'm open to switching to the create_flow_run task, but:
    1) It's not so clear from the docs what the key difference from StartFlowRun is. An example is missing in the docs.
    2) I get "Could not infer an active Flow context while creating edge" when replacing the task 1-to-1.
    Can create_flow_run be used together with map()?
    Kevin Kho

    3 months ago
    Yes, create_flow_run can be used with map. StartFlowRun was not user-friendly because its return type changed if wait=True. The inconsistent return made it hard to work with, so in 0.15.0 it was decoupled into two tasks, create_flow_run and wait_for_flow_run. create_flow_run is equivalent to StartFlowRun(wait=False). Could you show me the usage that gives you the error?
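    The return-type problem can be sketched with toy functions (hypothetical names and signatures, not Prefect's). start_flow_run returns a run id or a final state depending on wait, so callers must branch on the type. The split version always returns a run id from the create step and a final state from the wait step, mirroring the create_flow_run / wait_for_flow_run decoupling.

```python
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class FinalState:
    run_id: str
    name: str


_runs: Dict[str, FinalState] = {}


def _launch() -> str:
    # Pretend to create a child run that finishes successfully.
    run_id = f"run-{len(_runs)}"
    _runs[run_id] = FinalState(run_id, "Success")
    return run_id


def start_flow_run(wait: bool = False) -> Union[str, FinalState]:
    """Old style: the return type depends on `wait`."""
    run_id = _launch()
    return _runs[run_id] if wait else run_id


def create_flow_run_sketch() -> str:
    """New style, step 1: always returns a run id."""
    return _launch()


def wait_for_flow_run_sketch(run_id: str) -> FinalState:
    """New style, step 2: always returns the final state of the given run."""
    return _runs[run_id]


# Old style: downstream code has to check which type it received.
result = start_flow_run(wait=True)
print(type(result).__name__)  # FinalState

# New style: each step has one return type, so the steps chain cleanly.
run_id = create_flow_run_sketch()
state = wait_for_flow_run_sketch(run_id)
print(run_id, state.name)  # run-1 Success
```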
    Martin T

    3 months ago
    @Kevin Kho This is a runnable example (provided you point it to a Cloud-registered flow that takes the parameters myid and myparam), including the alternative with create_flow_run. (Should I note: we use prefect==0.15.11, since our agents are not upgraded to 1.0 yet...)
    from prefect import Flow, Parameter, task, unmapped
    from prefect.tasks.prefect import StartFlowRun
    from prefect.tasks.prefect.flow_run import create_flow_run
    
    
    @task
    def get_myid():
        return [1, 2]
    
    
    @task
    def get_parameters(myid, myparam):
        return {"myid": myid, "myparam": myparam}
    
    
    @task
    def get_run_name(myid):
        return f"myid: {myid}"
    
    
    PREFECT_PROJECT = "MYPROJECT"
    SUB_FLOW = "MYFLOW"
    
    flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW, wait=False)
    
    # flow_run = create_flow_run(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW)
    #   ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>.
    #   This often means you called a task outside a `with Flow(...)` block.
    #   If you're trying to run this task outside of a Flow context, you need to call `create_flow_run.run(...)`
    
    with Flow("flow-spawner") as flow:
        myparam = Parameter("myparam", default="foo")
        myids = get_myid()
        parameters = get_parameters.map(myids, unmapped(myparam))
        run_name = get_run_name.map(myids)
        flow_run.map(parameters=parameters, run_name=run_name)
    
    if __name__ == "__main__":
        flow.run()
    Kevin Kho

    3 months ago
    Ah, I understand. create_flow_run is defined like this:
    @task
    def create_flow_run():
    so it is already a task. You don't need to instantiate it outside the flow; just call it directly inside the Flow block.
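    A toy model of why the 1-to-1 swap fails (hypothetical ToyFlow/ToyTask/toy_task names, not Prefect internals). A class-based task like StartFlowRun is constructed at module level and only called inside the flow. A decorated function is already a task object at definition time, so calling create_flow_run(...) at module level is a task call, and a task call needs an active flow to attach to. The error text below only imitates the one in the thread.

```python
from typing import Any, Callable, List, Optional


class ToyFlow:
    """Toy stand-in for a flow: records task calls made while it is active."""

    active: Optional["ToyFlow"] = None

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: List[str] = []

    def __enter__(self) -> "ToyFlow":
        ToyFlow.active = self
        return self

    def __exit__(self, *exc: Any) -> None:
        ToyFlow.active = None


class ToyTask:
    """Toy stand-in for a task object."""

    def __init__(self, name: str) -> None:
        # Constructing a task needs no flow context.
        self.name = name

    def __call__(self, *args: Any, **kwargs: Any) -> str:
        # Calling a task wires it into the active flow, so a flow must exist.
        if ToyFlow.active is None:
            raise ValueError(f"Could not infer an active Flow context for {self.name}")
        ToyFlow.active.calls.append(self.name)
        return self.name


def toy_task(fn: Callable[..., Any]) -> ToyTask:
    # Like a @task decorator: the function becomes a task object at definition time.
    return ToyTask(fn.__name__)


@toy_task
def create_flow_run(**kwargs: Any) -> None:
    pass


start = ToyTask("StartFlowRun")  # class style: instantiate at module level, call later

with ToyFlow("main") as flow:
    start()
    create_flow_run(flow_name="MY_FLOW")  # decorated: call it directly inside the flow

print(flow.calls)  # ['StartFlowRun', 'create_flow_run']

try:
    create_flow_run(flow_name="MY_FLOW")  # outside the flow: the failing pattern
except ValueError as err:
    print(err)  # Could not infer an active Flow context for create_flow_run
```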
    Martin T

    3 months ago
    @Kevin Kho I finally migrated to create_flow_run with map(). Still, the mapped runs are created with the same GUID. It seems the map_index is not appended?
    from prefect import Flow, unmapped
    from prefect.tasks.prefect import create_flow_run
    
    with Flow("main") as flow:
        parameters = [{"my_id": 0}, {"my_id": 1}]
        run_id = create_flow_run.map(project_name=unmapped("MY_PROJECT"), flow_name=unmapped("MY_FLOW"), parameters=parameters)
    
    if __name__ == "__main__":
        flow.run()
    
    '''
    Retrieving local flow... Done
    Running flow locally...
    └── 00:17:34 | INFO    | Beginning Flow run for 'main'
    └── 00:17:34 | INFO    | Task 'create_flow_run': Starting task run...
    └── 00:17:34 | INFO    | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
    └── 00:17:34 | INFO    | Task 'create_flow_run[0]': Starting task run...
    └── 00:17:34 | INFO    | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
    └── 00:17:36 | INFO    | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
    └── 00:17:36 | INFO    | Task 'create_flow_run[0]': Finished task run for task with final state: 'Success'
    └── 00:17:36 | INFO    | Task 'create_flow_run[1]': Starting task run...
    └── 00:17:36 | INFO    | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
    └── 00:17:37 | INFO    | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
    └── 00:17:37 | INFO    | Task 'create_flow_run[1]': Finished task run for task with final state: 'Success'
    └── 00:17:37 | INFO    | Flow run SUCCESS: all reference tasks succeeded
    Flow run succeeded!
    '''
    Kevin Kho

    3 months ago
    What is your Prefect version?
    Martin T

    3 months ago
    0.15.11
    Kevin Kho

    3 months ago
    Are you doing flow.run or a Cloud run? The flow.run behavior was fixed in 1.1 (see the Changelog).
    Martin T

    3 months ago
    flow.run locally. Okay, we need to upgrade agents and flows 🙂 The coupling between Prefect flow versions and agent versions has kept us from updating to >=1.0, because we have to upgrade everything at once, or be careful not to mix flow/agent versions. But that's for another discussion.
    Kevin Kho

    3 months ago
    Well, the agent version just needs to be at least the flow version (mostly). There are rarely breaking changes that make an agent fail to deploy a flow with a lower version.
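    If you script that rule of thumb, compare versions numerically rather than as strings: "0.15.11" >= "0.9.0" is False as a string comparison. A minimal sketch, assuming plain dotted numeric versions (no pre-release tags like rc1); agent_can_run and parse_version are hypothetical helpers.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    # Assumes plain dotted numeric versions such as "0.15.11" or "1.1.0".
    return tuple(int(part) for part in version.split("."))


def agent_can_run(agent_version: str, flow_version: str) -> bool:
    # Rule of thumb from the thread: agent version >= flow version (mostly).
    return parse_version(agent_version) >= parse_version(flow_version)


print(agent_can_run("1.1.0", "0.15.11"))  # True
print(agent_can_run("0.15.11", "1.1.0"))  # False
print("0.15.11" >= "0.9.0")               # False: string comparison gets this wrong
```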
    Martin T

    3 months ago
    Okay 🙂 Will start with the agents. https://github.com/PrefectHQ/prefect/pull/5443 describes exactly my issue, and also why only local runs are affected.
    Kevin Kho

    3 months ago
    That's because the other run types spin up a separate container to run the flow, so as long as that container is spun up too, it should run fine.