John Muehlhausen
11/18/2021, 11:08 PMJohn Muehlhausen
11/18/2021, 11:17 PMKevin Kho
def mystatehandler(obj, old_state, new_state):
if new_state.is_finished():
client.create_flow_run(...)
return new_state
with Flow(..., state_handlers = [mystatehandler]) as flow:
...
This will also be a first class feature in Orion (Prefect 2.0)John Muehlhausen
11/18/2021, 11:56 PMZanie
John Muehlhausen
11/18/2021, 11:58 PMprefect.client.client.Client.create_flow_run(flow_id=None, context=None, parameters=None, run_config=None, labels=None, scheduled_start_time=None, idempotency_key=None, run_name=None, version_group_id=None)
Zanie
create_flow_run
taskJohn Muehlhausen
11/18/2021, 11:58 PMZanie
Zanie
.run()
Zanie
Zanie
create_flow_run
task may be helpful hereJohn Muehlhausen
11/18/2021, 11:59 PMfrom prefect.tasks.prefect import create_flow_run
John Muehlhausen
11/19/2021, 12:00 AMcreate_flow_run(flow_name="")
Zanie
if flow_name:
flow = FlowView.from_flow_name(flow_name, project_name=project_name)
...
client = Client()
flow_run_id = client.create_flow_run(
flow_id=flow.flow_id,
...
John Muehlhausen
11/19/2021, 12:00 AMZanie
create_flow_run.run(…)
to trigger the task code outside of a with Flow..
blockZanie
FlowView
as I shared above to get the flow id instead, it’s simpler.Anna Geller
if new_state.is_finished():
John Muehlhausen
11/19/2021, 1:51 AMJohn Muehlhausen
11/19/2021, 5:16 AMAnna Geller
I’m wondering if I have some rogue flows that are not visible on the Prefect Cloud dashboardThis shouldn’t happen.
Any chance this would make flows too hard to kill?In general, cancellation is a best effort attempt to kill execution initiated on your infrastructure. The best way to tackle it would be to use MERGE INTO or upsert instead of insert. Would it make sense to apply that in your use case?
John Muehlhausen
11/19/2021, 2:41 PMcancellation is a best effort attempt to kill execution initiated on your infrastructure
John Muehlhausen
11/19/2021, 2:41 PMKevin Kho