Martin T
05/31/2022, 12:10 PM
flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW), that I execute with flow_run.map(parameters=parameters, run_name=run_name). (Here parameters and run_name are generated by other map() tasks and are of the same length.)
When I run the wrapper task locally, targeting the sub-task in Prefect Cloud, I get:
└── 12:48:22 | DEBUG | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:22 | DEBUG | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:23 | INFO | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
└── 12:48:23 | DEBUG | Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9 created.
└── 12:48:23 | DEBUG | Creating link artifact for Flow Run 7951f47c-1ddf-4e33-b913-a2698a89e4c9.
└── 12:48:24 | INFO | Flow Run: <https://cloud.prefect.io/.../flow-run/7951f47c-1ddf-4e33-b913-a2698a89e4c9>
...
Note the same GUIDs!!! When we visit the cloud UI, only the first mapped task is created!
If the wrapper task is also running in cloud, it works as expected.
I'm not sure how to create a minimal reproducible example for this, since StartFlowRun can't trigger a local flow (I don't use Prefect Server, only Prefect Cloud).
Kevin Kho
StartFlowRun takes an idempotency key in the run method.
Martin T
05/31/2022, 4:05 PM
Kevin Kho
StartFlowRun is an older task, but I think create_flow_run will work off the shelf here because it adds the map index. create_flow_run is the newer version of the task.
Martin T
06/01/2022, 11:53 AM
idempotency_key. That makes sense, and it is also what the docs describe.
But if I don't want my flows to be idempotent (create new runs every time), then I also have to provide an idempotency_key??? That can't be true. The docs don't describe a mandatory idempotency_key for non-idempotent flows, and we rarely (ever?) see idempotency keys in official examples. So it should be pretty clear that, by default, flows are always non-idempotent, right?
In reality, that is also the case. Local and cloud runs are non-idempotent. Mapped tasks for StartFlowRun are also non-idempotent... well, only in the cloud. When executed locally, it behaves the opposite way.
On the implementation side:
StartFlowRun sets idempotency_key = prefect.context.get("task_run_id", None) by default, then calls create_flow_run. Is the map_index concatenated with the task_run_id? I couldn't locate the code for this.
Kevin Kho
StartFlowRun task. This task adds the map index by default here. StartFlowRun calls client.create_flow_run, not the create_flow_run task.
If you want to use StartFlowRun, you just need to make a list of distinct keys and pass them to your map call.
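import datetime

from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

@task
def get_idempotency_keys():
    items = ["a", "b", "c"]
    res = []
    for item in items:
        # Timestamp plus a distinct suffix gives a new key for each mapped run
        res.append(str(datetime.datetime.now()) + item)
    return res

start = StartFlowRun(..)
with Flow(..) as flow:
    keys = get_idempotency_keys()
    start.map(.., idempotency_key=keys)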
Something like that. This will generate new runs.
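To illustrate why distinct keys matter, here is a minimal sketch that does not use Prefect at all. FakeBackend and make_key are hypothetical names invented for this example, and the assumption that every mapped child sees the same task run ID locally is only a guess at what the logs above show, not a description of Prefect's actual implementation.

```python
import uuid
from typing import Dict, Optional


class FakeBackend:
    """Hypothetical stand-in for a backend that deduplicates on idempotency key."""

    def __init__(self) -> None:
        self._runs_by_key: Dict[str, str] = {}

    def create_flow_run(self, idempotency_key: Optional[str] = None) -> str:
        # A repeated key returns the existing run instead of creating a new one.
        if idempotency_key is not None and idempotency_key in self._runs_by_key:
            return self._runs_by_key[idempotency_key]
        run_id = str(uuid.uuid4())
        if idempotency_key is not None:
            self._runs_by_key[idempotency_key] = run_id
        return run_id


def make_key(task_run_id: str, map_index: int) -> str:
    """Combine a shared task run ID with the map index (hypothetical helper)."""
    return f"{task_run_id}-{map_index}"


backend = FakeBackend()
shared_id = "local-task-run"  # assumption: all mapped children share this ID

# Same key for every mapped child: only one flow run is ever created.
same = [backend.create_flow_run(shared_id) for _ in range(2)]

# Key extended with the map index: one flow run per mapped child.
distinct = [backend.create_flow_run(make_key(shared_id, i)) for i in range(2)]
```

With a shared key, both entries of `same` hold one run ID, which matches the repeated GUID in the logs. Appending the map index yields a separate run for each child while a retry of the same child would still reuse its run.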
I don’t think the discussion of whether flows are idempotent by default is helpful. It’s different between a normal scheduled flow run and the StartFlowRun and create_flow_run tasks. For those tasks, the assumption is that a retry should trigger the same flow run, so an idempotency key is added by default.
Martin T
06/01/2022, 3:40 PM
get_idempotency_keys() and combining the task_run_id+map_index, but it felt like I was going off-road...
I'm open to switching to the create_flow_run task, but:
1) It's not so clear from the docs what the key difference is from StartFlowRun. An example is missing in the docs.
2) I get "Could not infer an active Flow context while creating edge" when replacing the task 1-to-1. Can create_flow_run be used together with map()?
Kevin Kho
create_flow_run can be used with map. StartFlowRun was not friendly to users because the return type changed if wait=True. The inconsistent return made it hard to work with. So in 0.15.0 it was decoupled into two tasks, create_flow_run and wait_for_flow_run.
Kevin Kho
create_flow_run is equivalent to StartFlowRun(wait=False). Could you show me the usage that gives you the error?
Martin T
06/01/2022, 5:44 PM
myid, myparam) including the alternative with create_flow_run.
(I should note that we use prefect==0.15.11, since our agents are not upgraded to 1.0 yet.)
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import StartFlowRun
from prefect.tasks.prefect.flow_run import create_flow_run

@task
def get_myid():
    return [1, 2]

@task
def get_parameters(myid, myparam):
    return {"myid": myid, "myparam": myparam}

@task
def get_run_name(myid):
    return f"myid: {myid}"

PREFECT_PROJECT = "MYPROJECT"
SUB_FLOW = "MYFLOW"

flow_run = StartFlowRun(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW, wait=False)
# flow_run = create_flow_run(project_name=PREFECT_PROJECT, flow_name=SUB_FLOW)
# ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>.
# This often means you called a task outside a `with Flow(...)` block.
# If you're trying to run this task outside of a Flow context, you need to call `create_flow_run.run(...)`

with Flow("flow-spawner") as flow:
    myparam = Parameter("myparam", default="foo")
    myids = get_myid()
    parameters = get_parameters.map(myids, unmapped(myparam))
    run_name = get_run_name.map(myids)
    flow_run.map(parameters=parameters, run_name=run_name)

if __name__ == "__main__":
    flow.run()
Kevin Kho
create_flow_run was defined like this:
@task
def create_flow_run():
So you don’t need to define it outside the flow. You can just use it directly inside the Flow.
Martin T
06/01/2022, 10:24 PM
create_flow_run with map(). Still, the mapped runs are created with the same GUID. Seems the map_index is not appended?
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

with Flow("main") as flow:
    parameters = [{"my_id": 0}, {"my_id": 1}]
    run_id = create_flow_run.map(project_name=unmapped("MY_PROJECT"), flow_name=unmapped("MY_FLOW"), parameters=parameters)

if __name__ == "__main__":
    flow.run()
'''
Retrieving local flow... Done
Running flow locally...
└── 00:17:34 | INFO | Beginning Flow run for 'main'
└── 00:17:34 | INFO | Task 'create_flow_run': Starting task run...
└── 00:17:34 | INFO | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
└── 00:17:34 | INFO | Task 'create_flow_run[0]': Starting task run...
└── 00:17:34 | INFO | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
└── 00:17:36 | INFO | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
└── 00:17:36 | INFO | Task 'create_flow_run[0]': Finished task run for task with final state: 'Success'
└── 00:17:36 | INFO | Task 'create_flow_run[1]': Starting task run...
└── 00:17:36 | INFO | Creating flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW' for flow 'MY_FLOW'...
└── 00:17:37 | INFO | Created flow run 'ef6ec97f-ec4f-446c-b869-aeea2a1e789f-MY_FLOW': <https://cloud.prefect.io/MY_ACCOUNT/flow-run/546d7d9e-c8df-417e-8d51-55ed78bfaa4a>
└── 00:17:37 | INFO | Task 'create_flow_run[1]': Finished task run for task with final state: 'Success'
└── 00:17:37 | INFO | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
'''
Kevin Kho
Martin T
06/01/2022, 10:29 PM
Kevin Kho
Martin T
06/01/2022, 10:40 PM
Kevin Kho
Martin T
06/01/2022, 10:54 PM
Kevin Kho