Martin T
05/31/2022, 12:10 PM
`flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW)`, which I execute with `flow_run.map(parameters=parameters, run_name=run_name)`. (Here `parameters` and `run_name` are generated by other `map()` tasks and have the same length.)
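For readers new to mapping: `map()` runs the task once per index, pairing the i-th element of every mapped argument, so the i-th `parameters` dict goes with the i-th `run_name`, while `unmapped(...)` values are passed unchanged to every call. Below is a toy model of that pairing in plain Python. `toy_map` is a hypothetical helper for illustration, not Prefect's implementation, and it simply rejects mapped lists of unequal length without claiming that Prefect does the same.

```python
from typing import Any, Callable, Dict, List, Optional


def toy_map(
    fn: Callable[..., Any],
    mapped: Dict[str, List[Any]],
    unmapped: Optional[Dict[str, Any]] = None,
) -> List[Any]:
    """Call fn once per index i with the i-th item of every mapped list.

    Values in `unmapped` are passed unchanged to every call. This toy
    rejects mapped lists of different lengths.
    """
    unmapped = unmapped or {}
    lengths = {len(values) for values in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    (n,) = lengths
    return [
        fn(**{name: values[i] for name, values in mapped.items()}, **unmapped)
        for i in range(n)
    ]


# Mirrors the thread: one child call per (parameters, run_name) pair.
children = toy_map(
    lambda parameters, run_name, project_name: (project_name, run_name, parameters),
    mapped={"parameters": [{"myid": 1}, {"myid": 2}], "run_name": ["myid: 1", "myid: 2"]},
    unmapped={"project_name": "MYPROJECT"},
)
```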
When I run the wrapper flow locally, targeting the sub-flow in Prefect Cloud, I get:
```
└── 12:48:22 | DEBUG | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:22 | DEBUG | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:23 | INFO | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
└── 12:48:23 | DEBUG | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:23 | DEBUG | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:24 | INFO | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
```
Note the identical GUIDs! When we visit the Cloud UI, only the first mapped flow run has been created.
If the wrapper flow also runs in Cloud, it works as expected.
I'm not sure how to create a minimal reproducible example for this, since `StartFlowRun` can't trigger a local flow (I don't use Prefect Server, only Prefect Cloud).
Kevin Kho
05/31/2022, 2:25 PM
`StartFlowRun` takes an idempotency key in its `run` method.
Martin T
05/31/2022, 4:05 PM
Kevin Kho
05/31/2022, 4:08 PM
`StartFlowRun` is an older task, but I think `create_flow_run` will work off the shelf here because it adds the map index. `create_flow_run` is the newer version of the task.
Martin T
06/01/2022, 11:53 AM
`idempotency_key`. That makes sense, and it's also what the docs describe.
But if I don't want my flows to be idempotent (new runs created every time), then I also have to provide an `idempotency_key`? That can't be true. The docs don't describe a mandatory `idempotency_key` for non-idempotent flows, and we rarely (if ever?) see idempotency keys in official examples. So it should be pretty clear that, by default, flows are always non-idempotent, right?
In reality, that is also the case: local and cloud runs are non-idempotent. Mapped `StartFlowRun` tasks are also non-idempotent... well, only in the cloud. When executed locally, they behave the opposite way.
On the implementation side:
`StartFlowRun` sets `idempotency_key = prefect.context.get("task_run_id", None)` by default, then calls `create_flow_run`. Is the `map_index` concatenated with the `task_run_id`? I couldn't locate the code for this.
Kevin Kho
06/01/2022, 2:40 PM
`StartFlowRun` task. This task adds the map index by default here. `StartFlowRun` calls `client.create_flow_run`, not the `create_flow_run` task.
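The difference under discussion can be modelled in a few lines of plain Python. Both functions below are hypothetical illustrations, not Prefect source: one uses the task run id alone (the `StartFlowRun` default Martin describes), the other appends the map index (the behaviour Kevin attributes to the `create_flow_run` task). If the mapped children see the same `task_run_id`, as the identical GUIDs in the local logs suggest, only the second yields one key per child.

```python
from typing import Optional


def key_from_task_run_id(task_run_id: Optional[str], map_index: Optional[int]) -> Optional[str]:
    """Hypothetical: the key is the task run id; the map index is ignored."""
    return task_run_id


def key_with_map_index(task_run_id: Optional[str], map_index: Optional[int]) -> Optional[str]:
    """Hypothetical: append the map index so each mapped child gets its own key."""
    if task_run_id is None or map_index is None:
        # No task run id, or not a mapped child: return the task run id unchanged.
        return task_run_id
    return f"{task_run_id}-{map_index}"


# Two mapped children that (by assumption) share one task run id.
shared_id = "task-run-1"
print([key_from_task_run_id(shared_id, i) for i in range(2)])  # ['task-run-1', 'task-run-1']
print([key_with_map_index(shared_id, i) for i in range(2)])    # ['task-run-1-0', 'task-run-1-1']
```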
If you want to use `StartFlowRun`, you just need to make a list of distinct keys and pass them to your `map` call.
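```python
import datetime

from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun


@task
def get_idempotency_keys():
    items = ["a", "b", "c"]
    res = []
    for item in items:
        # Timestamp + item: distinct per item, and different on every parent run
        res.append(str(datetime.datetime.now()) + item)
    return res


start = StartFlowRun(...)  # arguments elided in the original message
with Flow(...) as flow:  # arguments elided in the original message
    keys = get_idempotency_keys()
    start.map(..., idempotency_key=keys)
```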
Something like that. This will generate new runs.
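Why distinct keys produce new runs, and why one shared key collapses mapped children into a single run, can be shown with a toy in-memory backend. `ToyBackend` is hypothetical and assumes only the behaviour discussed in this thread: creating a flow run with an idempotency key that was already used returns the existing run instead of a new one. `flow_name` is accepted only to mirror the call shape; the toy ignores it.

```python
import uuid
from typing import Dict, Optional


class ToyBackend:
    """In-memory stand-in for idempotent flow-run creation (not Prefect code)."""

    def __init__(self) -> None:
        self._run_for_key: Dict[str, str] = {}
        self.created = 0  # number of genuinely new runs

    def create_flow_run(self, flow_name: str, idempotency_key: Optional[str] = None) -> str:
        if idempotency_key is not None and idempotency_key in self._run_for_key:
            # Key seen before: return the existing run and create nothing.
            return self._run_for_key[idempotency_key]
        run_id = str(uuid.uuid4())
        self.created += 1
        if idempotency_key is not None:
            self._run_for_key[idempotency_key] = run_id
        return run_id


backend = ToyBackend()
# Two mapped children sending the same key: only one run exists afterwards.
a = backend.create_flow_run("MY_FLOW", idempotency_key="task-run-1")
b = backend.create_flow_run("MY_FLOW", idempotency_key="task-run-1")
# Distinct keys (e.g. one per item, as in get_idempotency_keys): one run each.
c = backend.create_flow_run("MY_FLOW", idempotency_key="key-a")
d = backend.create_flow_run("MY_FLOW", idempotency_key="key-b")
```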
I don't think the discussion of whether flows are idempotent by default is helpful. It differs between a normal scheduled flow run and the `StartFlowRun` and `create_flow_run` tasks. For those tasks, the assumption is that a retry should trigger the same flow run, so an idempotency key is added by default.
Martin T
06/01/2022, 3:40 PM
`get_idempotency_keys()` and combining the `task_run_id`+`map_index`, but it felt like I was going off-road...
I'm open to switching to the `create_flow_run` task, but:
1) It's not clear from the docs what the key difference from `StartFlowRun` is. An example is missing in the docs.
2) I get `Could not infer an active Flow context while creating edge` when replacing the task 1-to-1. Can `create_flow_run` be used together with `map()`?
Kevin Kho
06/01/2022, 4:30 PM
`create_flow_run` can be used with `map`. `StartFlowRun` was not user-friendly because its return type changed if `wait=True`. The inconsistent return type made it hard to work with, so in 0.15.0 it was decoupled into two tasks: `create_flow_run` and `wait_for_flow_run`. `create_flow_run` is equivalent to `StartFlowRun(wait=False)`. Could you show me the usage that gives you the error?
Martin T
06/01/2022, 5:44 PM
`myid`, `myparam`) including the alternative with `create_flow_run`.
(Should I note: we use `prefect==0.15.11`, since our agents are not upgraded to 1.0 yet...)
```python
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import StartFlowRun
from prefect.tasks.prefect.flow_run import create_flow_run


@task
def get_myid():
    return [1, 2]


@task
def get_parameters(myid, myparam):
    return {"myid": myid, "myparam": myparam}


@task
def get_run_name(myid):
    return f"myid: {myid}"


PREFECT_PROJECT = "MYPROJECT"
SUB_FLOW = "MYFLOW"

flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW, wait=False)
# flow_run = create_flow_run(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW)
# ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>.
# This often means you called a task outside a `with Flow(...)` block.
# If you're trying to run this task outside of a Flow context, you need to call `create_flow_run.run(...)`

with Flow("flow-spawner") as flow:
    myparam = Parameter("myparam", default="foo")
    myids = get_myid()
    parameters = get_parameters.map(myids, unmapped(myparam))
    run_name = get_run_name.map(myids)
    flow_run.map(parameters=parameters, run_name=run_name)

if __name__ == "__main__":
    flow.run()
```
Kevin Kho
06/01/2022, 6:19 PM
`create_flow_run` was defined like this:
```python
@task
def create_flow_run():
    ...  # real parameters and body omitted here
```
So you don't need to create an instance of it outside the flow. You can just use it directly inside the `Flow`.
Martin T
06/01/2022, 10:24 PM
`create_flow_run` with `map()`. Still, the mapped runs are created with the same GUID. It seems the `map_index` is not appended?
```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

with Flow("main") as flow:
    parameters = [{"my_id": 0}, {"my_id": 1}]
    run_id = create_flow_run.map(
        project_name=unmapped("MY_PROJECT"),
        flow_name=unmapped("MY_FLOW"),
        parameters=parameters,
    )

if __name__ == "__main__":
    flow.run()

'''
Retrieving local flow... Done
Running flow locally...
└── 00:17:34 | INFO | Beginning Flow run for 'main'
└── 00:17:34 | INFO | Task 'create_flow_run': Starting task run...
└── 00:17:34 | INFO | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
└── 00:17:34 | INFO | Task 'create_flow_run[0]': Starting task run...
└── 00:17:34 | INFO | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
└── 00:17:36 | INFO | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
└── 00:17:36 | INFO | Task 'create_flow_run[0]': Finished task run for task with final state: 'Success'
└── 00:17:36 | INFO | Task 'create_flow_run[1]': Starting task run...
└── 00:17:36 | INFO | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
└── 00:17:37 | INFO | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
└── 00:17:37 | INFO | Task 'create_flow_run[1]': Finished task run for task with final state: 'Success'
└── 00:17:37 | INFO | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
'''
```
Kevin Kho
06/01/2022, 10:27 PM
Martin T
06/01/2022, 10:29 PM
Kevin Kho
06/01/2022, 10:32 PM
Martin T
06/01/2022, 10:40 PM
Kevin Kho
06/01/2022, 10:42 PM
Martin T
06/01/2022, 10:54 PM
Kevin Kho
06/01/2022, 10:58 PM