John Muehlhausen
11/18/2021, 11:08 PMKevin Kho
11/18/2021, 11:29 PMdef mystatehandler(obj, old_state, new_state):
if new_state.is_finished():
client.create_flow_run(...)
return new_state
with Flow(..., state_handlers = [mystatehandler]) as flow:
...
This will also be a first class feature in Orion (Prefect 2.0)John Muehlhausen
11/18/2021, 11:56 PMMichael Adkins
11/18/2021, 11:57 PMJohn Muehlhausen
11/18/2021, 11:58 PMprefect.client.client.Client.create_flow_run(flow_id=None, context=None, parameters=None, run_config=None, labels=None, scheduled_start_time=None, idempotency_key=None, run_name=None, version_group_id=None)
Michael Adkins
11/18/2021, 11:58 PMcreate_flow_run
taskJohn Muehlhausen
11/18/2021, 11:58 PMMichael Adkins
11/18/2021, 11:58 PM.run()
create_flow_run
task may be helpful hereJohn Muehlhausen
11/18/2021, 11:59 PMfrom prefect.tasks.prefect import create_flow_run
create_flow_run(flow_name="")
Michael Adkins
11/19/2021, 12:00 AMif flow_name:
flow = FlowView.from_flow_name(flow_name, project_name=project_name)
...
client = Client()
flow_run_id = client.create_flow_run(
flow_id=flow.flow_id,
...
John Muehlhausen
11/19/2021, 12:00 AMMichael Adkins
11/19/2021, 12:00 AMcreate_flow_run.run(…)
to trigger the task code outside of a with Flow..
blockFlowView
as I shared above to get the flow id instead, it’s simpler.Anna Geller
11/19/2021, 12:24 AMif new_state.is_finished():
John Muehlhausen
11/19/2021, 1:51 AMAnna Geller
11/19/2021, 10:02 AMI’m wondering if I have some rogue flows that are not visible on the Prefect Cloud dashboardThis shouldn’t happen.
Any chance this would make flows too hard to kill?In general, cancellation is a best effort attempt to kill execution initiated on your infrastructure. The best way to tackle it would be to use MERGE INTO or upsert instead of insert. Would it make sense to apply that in your use case?
John Muehlhausen
11/19/2021, 2:41 PMcancellation is a best effort attempt to kill execution initiated on your infrastructure
Kevin Kho
11/19/2021, 2:46 PM