https://prefect.io logo
Title
n

Naga Sravika Bodapati

05/27/2022, 12:02 PM
any way to programmatically kill a flow that is running for more than a certain period of time? I am aware of SLA automations but want to do it code way. Please suggest. Thanks!
v

Vadym Dytyniak

05/27/2022, 12:04 PM
client = Client()

where = {
    'name': {'_like': 'flow_name_example%'},
    'status': {}
}

flow_run_query = {
    'query': {
        with_args('flow_run', {'where': where}): {
            'id': True,
        },
    },
}

result = client.graphql(flow_run_query)
flow_runs = result['data']['flow_run']

for flow_run in flow_runs:
    client.cancel_flow_run(flow_run.id)
just modify query to get flow that runs more than expected
n

Naga Sravika Bodapati

05/27/2022, 12:05 PM
can this be done from with in the flow itself?
or there has to be a poller doing this on a regular basis?
v

Vadym Dytyniak

05/27/2022, 12:07 PM
If you want to kill flow itself, probably it is better to use task timeout
n

Naga Sravika Bodapati

05/27/2022, 12:08 PM
we are using a version 0.14.10 which doesnt allow that option
n

Naga Sravika Bodapati

05/27/2022, 12:11 PM
we are using a version 0.14.10 which doesnt allow that option
how to get runtime of a flow?
use python current time and start time from flow_run?
v

Vadym Dytyniak

05/27/2022, 12:12 PM
It is code for 0.14.10
n

Naga Sravika Bodapati

05/27/2022, 12:13 PM
i tried timeout
but it gave option not availabl
so not using it
v

Vadym Dytyniak

05/27/2022, 12:16 PM
It is strange
in that case you can use Prefect Client
I used created as start_time field is null, I don't know why
n

Naga Sravika Bodapati

05/27/2022, 12:17 PM
`ok thanks! will try out and get back in case of any issues!
v

Vadym Dytyniak

05/27/2022, 12:18 PM
check Task class if timeout is there
it should be more elegant solution
n

Naga Sravika Bodapati

05/27/2022, 12:18 PM
ok sure
k

Kevin Kho

05/27/2022, 1:53 PM
timeout is the more elegant solution, but not guaranteed when the flow is running somewhere else (Dask). Dask itself has no mechanism to cancel work already happening on the cluster. It’s very hard in Python to cancel a process happening, especially is the process is on another machine. you can also wrap the code Vadym proposed inside a state handler to cancel the Flow from within itself based on some condition of a task. There is no built-in Prefect Signal to end a run immediately
🙌 1
n

Naga Sravika Bodapati

05/27/2022, 1:55 PM
yes, i just tried it. The tasks wouldnt even start running with the timeout parameter. Working on scavenger script to kill flows using the logic mentioned by vadym above!
k

Kevin Kho

05/27/2022, 1:57 PM
You can do:
client.cancel_flow_run(prefect.context.flow_run_id)
n

Naga Sravika Bodapati

05/27/2022, 2:01 PM
out flows seem to be getting stuck in one task and not getting out
so not sure how to handle with state handler?
k

Kevin Kho

05/27/2022, 2:02 PM
Ahh for that automation is really the best way because it’s driven by Cloud
n

Naga Sravika Bodapati

05/27/2022, 2:03 PM
yeah, there are like 150 flows!
🙂
hi, how to get the name of flow which is registered to a project using graphql?
name seems to be the run name - i want to use the name of the flow as it is registered
v

Vadym Dytyniak

05/27/2022, 2:51 PM
flow_run has flow, flow has name
query {
      flow_run {
        id,
        created,
        flow {
          name
        }
      }
    }
n

Naga Sravika Bodapati

05/27/2022, 2:52 PM
ok..