#prefect-community

Martin T

05/31/2022, 12:10 PM
Hi :-) StartFlowRun() with map() is not creating unique runs. I have a wrapper flow that creates sub-flow runs:
flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW)
, that I execute with
flow_run.map(parameters=parameters, run_name=run_name)
. (Here
parameters
and
run_name
are generated by other
map()
tasks, and are of the same length.) When I run the wrapper flow locally, targeting the sub-flow in Prefect Cloud, I get:
└── 12:48:22 | DEBUG   | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:22 | DEBUG   | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:23 | INFO    | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
└── 12:48:23 | DEBUG   | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:23 | DEBUG   | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:24 | INFO    | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
Note the identical GUIDs! When we visit the Cloud UI, only the first mapped flow run has been created. If the wrapper flow is also running in Cloud, it works as expected. I'm not sure how to create a minimal reproducible example for this, since
StartFlowRun
can't trigger a local flow (I don't use prefect server, only prefect cloud).

Kevin Kho

05/31/2022, 2:25 PM
You need to pass idempotency keys to StartFlowRun. Requests with the same idempotency key create only one flow run, and the idempotency key used by default is the task run ID.
StartFlowRun
takes in an idempotency key in the run method
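To make the idempotency behaviour concrete, here is a minimal toy model in plain Python. `ToyBackend` is a hypothetical stand-in invented for illustration; it is not Prefect's client or API, and it only assumes the rule described above: a request that repeats an already-seen idempotency key returns the existing flow run instead of creating a new one.

```python
import uuid
from typing import Dict, List, Optional


class ToyBackend:
    """Hypothetical stand-in for an orchestrator backend (not Prefect's client)."""

    def __init__(self) -> None:
        self._runs_by_key: Dict[str, str] = {}
        self.created = 0  # number of flow runs actually created

    def create_flow_run(self, idempotency_key: Optional[str] = None) -> str:
        # A repeated key returns the existing run instead of creating a new one.
        if idempotency_key is not None and idempotency_key in self._runs_by_key:
            return self._runs_by_key[idempotency_key]
        run_id = str(uuid.uuid4())
        self.created += 1
        if idempotency_key is not None:
            self._runs_by_key[idempotency_key] = run_id
        return run_id


backend = ToyBackend()

# Three mapped children that all send the same key: only one run is created.
same_key: List[str] = [backend.create_flow_run("task-run-1") for _ in range(3)]

# Three mapped children that each send a distinct key: three runs are created.
distinct: List[str] = [backend.create_flow_run(f"task-run-1-{i}") for i in range(3)]

print(len(set(same_key)), len(set(distinct)))
```

This mirrors the symptom in the logs above: every mapped child reports the same flow run ID because every child sent the same key.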

Martin T

05/31/2022, 4:05 PM
Thanks @Kevin Kho, I will try that. Can you explain why this is not an issue in Prefect cloud??

Kevin Kho

05/31/2022, 4:08 PM
Idempotency is an intentional feature of orchestrators. You just need to provide different keys to be clear you want different runs.
StartFlowRun
is an older task but I think
create_flow_run
will work off the shelf here because it adds the
map-index
.
create_flow_run
is the newer version of the task
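As a rough sketch of what "adds the map index" could mean, the helper below derives a default key from a context mapping. The function name `default_idempotency_key` and the `-` separator are assumptions made for illustration, not Prefect's source; only the context keys `task_run_id` and `map_index` come from this thread.

```python
from typing import List, Mapping, Optional


def default_idempotency_key(context: Mapping[str, object]) -> Optional[str]:
    """Hypothetical helper: task run ID, plus the map index when one is present."""
    task_run_id = context.get("task_run_id")
    if task_run_id is None:
        # No task run ID in context: no default key, so every call creates a run.
        return None
    key = str(task_run_id)
    map_index = context.get("map_index")
    if map_index is not None:
        # Mapped children share a parent ID here, so the index tells them apart.
        key += f"-{map_index}"
    return key


# Two mapped children that share the same task run ID (illustrative values).
children = [{"task_run_id": "abc", "map_index": i} for i in range(2)]
keys: List[Optional[str]] = [default_idempotency_key(c) for c in children]
print(keys)
```

Without the index, both children would send the key `abc` and collapse into a single flow run.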

Martin T

06/01/2022, 11:53 AM
I still don't get this. If you want the flow to be idempotent, provide a non-changing
idempotency_key
. That makes sense, and is also what the docs describe. But if I don't want my flows to be idempotent (create new runs every time), then I also have to provide an
idempotency_key
??? That can't be true. The docs don't describe a mandatory idempotency_key for non-idempotent flows, and we rarely (ever?) see idempotency keys in official examples. So it should be pretty clear that, by default, flows are always non-idempotent, right? In reality, that is also the case. Local and cloud runs are non-idempotent. Mapped tasks for
StartFlowRun
are also non-idempotent... well, only in the cloud. When executed locally, it behaves the opposite way. On the implementation side:
StartFlowRun
sets
idempotency_key = prefect.context.get("task_run_id", None)
by default, then calls
create_flow_run
. Is the
map_index
concatenated with the
task_run_id
? I couldn't locate the code for this?

Kevin Kho

06/01/2022, 2:40 PM
On the implementation side, I said above that the create_flow_run task is the newer version of the
StartFlowRun
task. This task adds the map index by default here.
StartFlowRun
calls
client.create_flow_run
, not the
create_flow_run
task. If you want to use
StartFlowRun
, you just need to make a list of distinct keys and pass them to your map call.
import datetime

@task
def get_idempotency_keys():
    items = ["a", "b", "c"]
    res = []
    for item in items:
        # Timestamp plus a per-item suffix keeps each mapped child's key distinct
        res.append(str(datetime.datetime.now()) + item)
    return res

start = StartFlowRun(..)
with Flow(..) as flow:
    keys = get_idempotency_keys()
    start.map(.., idempotency_key = keys)
Something like that. This will generate new runs. I don’t think discussing whether flows are idempotent by default is helpful. It’s different between a normal scheduled flow run and the
StartFlowRun
and
create_flow_run
tasks. For those tasks, the assumption is that a retry should trigger the same flow run so an idempotency key is added by default.
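One way to reconcile the two goals in this message (distinct runs per mapped child, but the same run on a retry) is to build keys from a per-invocation batch ID plus a per-item suffix. The sketch below is plain Python; `make_idempotency_keys` and `batch_id` are hypothetical names introduced for illustration and are not part of Prefect.

```python
import uuid
from typing import Iterable, List, Optional


def make_idempotency_keys(items: Iterable[str], batch_id: Optional[str] = None) -> List[str]:
    """Hypothetical helper: one key per item, prefixed with a shared batch ID."""
    # A fresh batch ID yields brand-new runs; reusing a batch ID (for example
    # on a retry) reproduces the same keys and therefore the same runs.
    batch = batch_id if batch_id is not None else uuid.uuid4().hex
    return [f"{batch}-{item}" for item in items]


# Retry-safe: the same batch ID always yields the same keys.
first_attempt = make_idempotency_keys(["a", "b", "c"], batch_id="run-1")
retry = make_idempotency_keys(["a", "b", "c"], batch_id="run-1")

# New invocation: a random batch ID yields keys that differ from earlier ones.
fresh = make_idempotency_keys(["a", "b", "c"])

print(first_attempt == retry, fresh != first_attempt)
```

The resulting list could then be passed as the `idempotency_key` argument of the mapped call, in the same way as `keys` in the snippet above.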

Martin T

06/01/2022, 3:40 PM
@Kevin Kho thanks 🙂 I thought of doing something like your
get_idempotency_keys()
and combining the
task_run_id+map_index
, but it felt like I was going off-road... I'm open to switching to the
create_flow_run
task, but 1) It's not so clear from the docs what the key difference is from
StartFlowRun
. An example is missing in the docs. 2) I get
Could not infer an active Flow context while creating edge
when replacing the task 1-to-1. Can
create_flow_run
be used together with
map()
?

Kevin Kho

06/01/2022, 4:30 PM
Yes
create_flow_run
can be used with map.
StartFlowRun
was not friendly to users because the return type changed if
wait=True
. The inconsistent return made it hard to work with. So it was decoupled into two tasks in 0.15.0 to have
create_flow_run
and
wait_for_flow_run
.
create_flow_run
is equivalent to
StartFlowRun(wait=False)
. Could you show me the usage that gives you the error?

Martin T

06/01/2022, 5:44 PM
@Kevin Kho This is a runnable example (provided you point it to a cloud registered flow that takes parameters
myid
,
myparam
) including the alternative with
create_flow_run
. (I should note that we use prefect==0.15.11, since our agents are not upgraded to 1.0 yet.)
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import StartFlowRun
from prefect.tasks.prefect.flow_run import create_flow_run


@task
def get_myid():
    return [1, 2]


@task
def get_parameters(myid, myparam):
    return {"myid": myid, "myparam": myparam}


@task
def get_run_name(myid):
    return f"myid: {myid}"


PREFECT_PROJECT = "MYPROJECT"
SUB_FLOW = "MYFLOW"

flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW, wait=False)

# flow_run = create_flow_run(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW)
#   ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>.
#   This often means you called a task outside a `with Flow(...)` block.
#   If you're trying to run this task outside of a Flow context, you need to call `create_flow_run.run(...)`

with Flow("flow-spawner") as flow:
    myparam = Parameter("myparam", default="foo")
    myids = get_myid()
    parameters = get_parameters.map(myids, unmapped(myparam))
    run_name = get_run_name.map(myids)
    flow_run.map(parameters=parameters, run_name=run_name)

if __name__ == "__main__":
    flow.run()

Kevin Kho

06/01/2022, 6:19 PM
Ah I understand.
create_flow_run
was defined like this:
@task
def create_flow_run():
So you don’t need to instantiate it outside the flow. It is already a task, so you can call it directly inside the Flow block.

Martin T

06/01/2022, 10:24 PM
@Kevin Kho finally migrated to
create_flow_run
with
map()
. Still, the mapped runs are created with the same GUID. Seems the
map_index
is not appended?
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

with Flow("main") as flow:
    parameters = [{"my_id": 0}, {"my_id": 1}]
    run_id = create_flow_run.map(project_name=unmapped("MY_PROJECT"), flow_name=unmapped("MY_FLOW"), parameters=parameters)

if __name__ == "__main__":
    flow.run()

'''
Retrieving local flow... Done
Running flow locally...
└── 00:17:34 | INFO    | Beginning Flow run for 'main'
└── 00:17:34 | INFO    | Task 'create_flow_run': Starting task run...
└── 00:17:34 | INFO    | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
└── 00:17:34 | INFO    | Task 'create_flow_run[0]': Starting task run...
└── 00:17:34 | INFO    | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
└── 00:17:36 | INFO    | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
└── 00:17:36 | INFO    | Task 'create_flow_run[0]': Finished task run for task with final state: 'Success'
└── 00:17:36 | INFO    | Task 'create_flow_run[1]': Starting task run...
└── 00:17:36 | INFO    | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
└── 00:17:37 | INFO    | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
└── 00:17:37 | INFO    | Task 'create_flow_run[1]': Finished task run for task with final state: 'Success'
└── 00:17:37 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
'''

Kevin Kho

06/01/2022, 10:27 PM
What is your Prefect version?

Martin T

06/01/2022, 10:29 PM
0.15.11

Kevin Kho

06/01/2022, 10:32 PM
Are you using flow.run or a Cloud run? The flow.run behavior was fixed in 1.1. Changelog

Martin T

06/01/2022, 10:40 PM
flow.run locally. Okay, we need to upgrade agents and flows 🙂 The coupling between Prefect flow versions and agent versions has held us back from updating to >=1.0, because we have to upgrade everything at once or be careful not to mix flow/agent versions. But that's for another discussion.

Kevin Kho

06/01/2022, 10:42 PM
Well, the agent version just needs to be greater than the Flow version (mostly). There are rarely breaking changes that make an agent fail to deploy a flow with a lower version.

Martin T

06/01/2022, 10:54 PM
Okay 🙂 Will start with agents. https://github.com/PrefectHQ/prefect/pull/5443 describes exactly my issue, and also why only local runs are affected.

Kevin Kho

06/01/2022, 10:58 PM
Because the rest spin up other containers to run the Flow, so as long as the container is also spun up, it should run fine.