Vamsi Reddy

02/08/2022, 12:08 AM
Hi all, is it possible to get the status of a flow run with a particular run_name? I want to query flow runs using Python. We want to check whether a flow is currently running; if it is not, we will create a run for it.

Kevin Kho

02/08/2022, 1:40 AM
You want something like this. Or you can pass an idempotency_key when you create a flow run, because a call with a key that has already been used will not trigger a new flow run.
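
For the first option (checking whether a run with a given name is active), a minimal sketch might look like the following. It assumes a Prefect 1.x style client whose graphql() method accepts a query string and returns a dict-like result, and it assumes the flow_run table exposes name and state fields. The helper names build_flow_run_query and find_flow_runs are hypothetical and not part of Prefect.

```python
import json


def build_flow_run_query(run_name, states=("Running",)):
    """Build a GraphQL query for flow runs with this name in the given states."""
    # json.dumps yields a quoted, escaped literal that is also valid in GraphQL.
    name_literal = json.dumps(run_name)
    states_literal = json.dumps(list(states))
    return (
        "query { flow_run(where: {"
        f"name: {{_eq: {name_literal}}}, "
        f"state: {{_in: {states_literal}}}"
        "}) { id name state } }"
    )


def find_flow_runs(client, run_name, states=("Running",)):
    """Return the matching flow runs as a list of dict-like records.

    `client` is assumed to behave like prefect.Client: any object with a
    graphql(query) method that returns {"data": {"flow_run": [...]}}.
    """
    result = client.graphql(build_flow_run_query(run_name, states))
    return list(result["data"]["flow_run"])
```

With Prefect installed, this would presumably be called as find_flow_runs(prefect.Client(), "my-run-name"), and a new run would be created only when the returned list is empty. Two callers can still both see an empty list at the same moment, which is the race the idempotency_key is meant to close.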

Vamsi Reddy

02/08/2022, 3:26 PM
How do I use the idempotency_key? Should I just pass it to create_flow_run()? Also, will this skip creating a new flow run only when a flow run with that key is already running? Otherwise I would want to trigger the flow.

Kevin Kho

02/08/2022, 3:28 PM
Yes, you can check the docstring. You can use something like a stringified datetime.

Vamsi Reddy

02/08/2022, 3:33 PM
So my question is this: say I use idempotency_key = ‘do-not-create-multiple-runs’ for flow B. Suppose two flow runs of flow A are currently running and they trigger flow B based on certain conditions. If both of them try to trigger flow B at exactly the same time, I want only 1 flow run instead of 2. Also, once flow B has finished, will I need to provide a new idempotency_key in order to trigger future flow runs for flow B, or can it remain constant?

Kevin Kho

02/08/2022, 3:38 PM
Yes, you need a new idempotency key the next time around.
The default is the id of the task run that calls create_flow_run.

Vamsi Reddy

02/08/2022, 3:54 PM
So would it be possible to cancel a flow run based on its flow_run_name? I can give each flow run a unique name and would like to cancel duplicate flow runs of that flow with that run_name. The idempotency_key above would only be useful for one run and would never trigger again with that key. The other method suggested above would cancel all other instances of the flow run. Our flows are set up to process data from multiple sources with the same logic, so it is essentially multiple runs.

Kevin Kho

02/08/2022, 3:59 PM
Not by name, but with the flow run id you can use the Client. You can probably query for the id with the flow run name. Is there some schedule to these runs? Maybe you can use an idempotency key rounded to the nearest hour? That way, at the next hour, you get a new idempotency key. You can also use caching so that it doesn’t re-run tasks that have already run.
If the tasks are cached, the duplicate flow run won’t run them (assuming they already finished).
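
As a rough sketch of the "cancel by id with the Client" suggestion: the helper below keeps the first flow run id in a list and cancels the rest. It assumes the ids were already looked up (for example by querying on the flow run name) and that the client exposes a cancel_flow_run(flow_run_id) method as the Prefect 1.x Client is understood to. The helper name cancel_duplicates and the "keep the first id" policy are assumptions made for illustration.

```python
def cancel_duplicates(client, flow_run_ids):
    """Keep the first flow run id and cancel every other id in the list.

    `client` is assumed to behave like prefect.Client, i.e. to provide
    cancel_flow_run(flow_run_id). Returns the ids that were cancelled.
    """
    cancelled = []
    for flow_run_id in list(flow_run_ids)[1:]:
        client.cancel_flow_run(flow_run_id)
        cancelled.append(flow_run_id)
    return cancelled
```

Which run counts as "first" depends on how the ids are ordered by the caller, so sorting by start time before calling this is probably worth doing.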

Vamsi Reddy

02/08/2022, 4:09 PM
So our flows are not scheduled; they are triggered by a file drop on S3. The next flow gets triggered once the S3-triggered flow finishes, based on certain logic. I guess using the idempotency_key but rounding it to the nearest minute would work, since there may be other flows that run during that hour. Thanks for the suggestion @Kevin Kho, I will give this a try. Also, the probability of two flows creating a flow run at the same time is low, but I wanted to address this edge case.
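
A time-bucketed key of the kind discussed here could be sketched as below. The function name bucketed_idempotency_key, the prefix argument, and the exact format strings are assumptions for illustration. Note that strftime truncates to the start of the hour or minute rather than rounding to the nearest one.

```python
import datetime


def bucketed_idempotency_key(prefix, now=None, granularity="minute"):
    """Build a key that stays constant within one hour or one minute.

    Calls made inside the same bucket produce the same key, so only the
    first of them would create a flow run.
    """
    formats = {
        "hour": "%m-%d-%Y_%H",
        "minute": "%m-%d-%Y_%H-%M",
    }
    if granularity not in formats:
        raise ValueError(f"unsupported granularity: {granularity!r}")
    if now is None:
        now = datetime.datetime.now()
    # strftime drops the finer-grained fields, which truncates the timestamp.
    return f"{prefix}_{now.strftime(formats[granularity])}"
```

Two triggers that land on either side of a bucket boundary (for example 16:09:59 and 16:10:00) still get different keys, so this narrows the duplicate window but does not remove it.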
@Kevin Kho when I try to pass the idempotency_key argument to create_flow_run I get this error:
TypeError: got an unexpected keyword argument 'idempotency_key'
File "/Users/vamsi/git_repo/automation/Automation/flows/SIE_flow.py", line 536, in <module>
    idempotency_key=f'SPE_flow_{company_bucket}' + datetime.datetime.now().strftime("_%m-%d-%Y_%H")
  File "/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/core/task.py", line 634, in __call__
    *args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/core/task.py", line 674, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/inspect.py", line 3006, in _bind
    arg=next(iter(kwargs))))
TypeError: got an unexpected keyword argument 'idempotency_key'

Kevin Kho

02/10/2022, 2:31 PM
What? The docstring has it. What version are you on?

Vamsi Reddy

02/10/2022, 2:32 PM
I am on Prefect version 0.15.6.

Kevin Kho

02/10/2022, 2:34 PM
It is in 0.15.8
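
A small guard along these lines could catch the mismatch before the call fails. It is only a sketch: the 0.15.8 threshold is taken from the reply above, and the helper names parse_version and supports_idempotency_key are hypothetical.

```python
import re

# Threshold taken from the reply above, not verified against a changelog.
MIN_VERSION = (0, 15, 8)


def parse_version(version):
    """Extract the leading major.minor.patch numbers from a version string."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def supports_idempotency_key(version):
    """Return True if this version is at least MIN_VERSION."""
    return parse_version(version) >= MIN_VERSION
```

In a real environment the argument would presumably be prefect.__version__.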

Vamsi Reddy

02/10/2022, 2:36 PM
OK, I will try to update it.