# prefect-community
j
How do I tell Prefect to run exactly one copy of a Flow, starting immediately, until the Flow ends (variable amount of time), then immediately start another single copy, and so forth? ... by "copy" I just mean I wouldn't want the Flow runs to overlap
perhaps I'm asking if a Flow can begin on a "schedule" of a Flow ending... then after starting the Flow manually it would never end?
k
Hey @John Muehlhausen, you can do this with a flow-level state handler. It will look something like this:
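from prefect import Flow
from prefect.client import Client

def mystatehandler(obj, old_state, new_state):
    # When this flow run reaches a finished state (success or failure),
    # ask the backend to start the next run
    if new_state.is_finished():
        client = Client()
        client.create_flow_run(...)
    return new_state

with Flow(..., state_handlers=[mystatehandler]) as flow:
    ...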
This will also be a first-class feature in Orion (Prefect 2.0).
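To make the handler logic concrete without a Prefect backend, here is a minimal, self-contained sketch. `FakeState`, `make_state_handler`, and the `trigger_next_run` callback are hypothetical stand-ins for Prefect's state objects and `client.create_flow_run(...)`; the point is only that the next run is requested once, on the transition into a finished state, so runs never overlap.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state; only `name` matters here."""
    name: str

    def is_finished(self) -> bool:
        # Treat terminal states as "finished", mirroring the idea of is_finished()
        return self.name in {"Success", "Failed"}


def make_state_handler(trigger_next_run: Callable[[], None]):
    """Build a flow-level state handler that requests the next run on finish."""

    def handler(obj, old_state: Optional[FakeState], new_state: FakeState) -> FakeState:
        # Only act on the transition into a finished state
        if new_state.is_finished():
            trigger_next_run()  # stands in for client.create_flow_run(...)
        return new_state  # hand the state back unchanged

    return handler


triggered: List[str] = []
handler = make_state_handler(lambda: triggered.append("next run"))

# Simulate one flow run moving through its lifecycle
states = [FakeState("Pending"), FakeState("Running"), FakeState("Success")]
for old, new in zip([None] + states[:-1], states):
    handler(None, old, new)

print(triggered)  # ['next run']
```

Because the trigger fires only from inside the finishing run, a new run can start only after the previous one has ended, which is the non-overlapping behavior asked about.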
j
create_flow_run() appears to want a flow_id ... how does my flow know its own flow_id?
z
Your flow needs to be registered so that the API can create runs of it. You can pass a flow name and project name combination to uniquely identify it instead.
j
prefect.client.client.Client.create_flow_run(flow_id=None, context=None, parameters=None, run_config=None, labels=None, scheduled_start_time=None, idempotency_key=None, run_name=None, version_group_id=None)
z
Oh, my apologies. I was referring to the `create_flow_run` task.
j
I don't see project/flow name?
You can use that task and just call `.run()`.
Or you can query for the flow and use the client method.
The source code of the `create_flow_run` task may be helpful here.
j
from prefect.tasks.prefect import create_flow_run
create_flow_run(flow_name="")
z
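# Abridged excerpt from the create_flow_run task source
from prefect.backend import FlowView
from prefect.client import Client

if flow_name:
    flow = FlowView.from_flow_name(flow_name, project_name=project_name)

...

client = Client()
flow_run_id = client.create_flow_run(
    flow_id=flow.flow_id,
    ...
)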
j
This told me I have to run in a Flow context
z
You’d have to call `create_flow_run.run(…)` to trigger the task code outside of a `with Flow(...)` block.
I’d recommend instantiating a `FlowView` as I shared above to get the flow ID instead; it’s simpler.
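The distinction between calling a task inside a `with Flow(...)` block and calling its `.run()` method directly can be modeled with a toy example. `ToyFlow` and `ToyTask` are hypothetical classes written only to illustrate the idea (they are not Prefect's implementation): calling the task records it in the open flow and executes nothing, while `.run()` executes the underlying function immediately and needs no flow context.

```python
from typing import Any, Callable, List, Optional


class ToyFlow:
    """Hypothetical stand-in for `with Flow(...)`: records task calls, runs nothing."""

    active: Optional["ToyFlow"] = None  # the flow whose with-block is currently open

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: List[str] = []

    def __enter__(self) -> "ToyFlow":
        ToyFlow.active = self
        return self

    def __exit__(self, *exc: Any) -> None:
        ToyFlow.active = None


class ToyTask:
    """Hypothetical task: calling it defers work, .run() executes it now."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def run(self, *args: Any, **kwargs: Any) -> Any:
        # Execute the task logic immediately; no flow context is needed
        return self.fn(*args, **kwargs)

    def __call__(self, *args: Any, **kwargs: Any) -> str:
        # Calling the task only records it in the open flow's graph
        if ToyFlow.active is None:
            raise ValueError("no active flow context: use .run() outside a flow")
        ToyFlow.active.calls.append(self.fn.__name__)
        return f"<deferred {self.fn.__name__}>"


def add(x: int, y: int) -> int:
    return x + y


add_task = ToyTask(add)
immediate = add_task.run(1, 2)  # executes right away

with ToyFlow("demo") as flow:
    deferred = add_task(1, 2)  # only recorded in flow.calls, not executed

try:
    add_task(1, 2)  # called outside any with-block
    error_message = ""
except ValueError as err:
    error_message = str(err)

print(immediate, flow.calls, deferred)  # 3 ['add'] <deferred add>
```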
a
@John Muehlhausen here is one example you could use: https://gist.github.com/0dca1181d56e797f38390ad8850c54d7
In this example, any time your flow run ends successfully, a new flow run gets triggered. If you want it to be triggered even on failure, i.e. as long as the run is finished, use this instead, as Kevin pointed out:
if new_state.is_finished():
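The practical difference between a success-only check and `is_finished()` can be sketched with a toy state model. `ToyState` and `should_restart` are hypothetical names for illustration, not Prefect classes; the sketch assumes, as described above, that "finished" covers both success and failure. With a success-only condition, one failed run breaks the chain; with `is_finished()`, the next run starts either way.

```python
from enum import Enum


class ToyState(Enum):
    """Hypothetical run states for illustration only."""
    RUNNING = "Running"
    SUCCESS = "Success"
    FAILED = "Failed"

    def is_finished(self) -> bool:
        # Any terminal state counts as finished
        return self in (ToyState.SUCCESS, ToyState.FAILED)

    def is_successful(self) -> bool:
        # Only a successful terminal state counts
        return self is ToyState.SUCCESS


def should_restart(state: ToyState, on_failure_too: bool) -> bool:
    """Decide whether to trigger the next flow run for this new state."""
    return state.is_finished() if on_failure_too else state.is_successful()


for state in ToyState:
    print(state.value, should_restart(state, False), should_restart(state, True))
# Running False False
# Success True True
# Failed False True
```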
j
@Anna Geller thank you! Was thinking about dropping in at the event tonight but parent-teacher conference had priority 🙂
Any chance this would make flows too hard to kill? I deleted some flows (in the Prefect Cloud UI) that insert into a database. The most recently installed one does indeed insert on the condition of interest, but 5 other versions of the same record, created at nearly the same time, show up in the db. I've logged every insert statement being executed to Prefect, and there is only one of them... so I'm wondering if I have some rogue flows that are not visible on the Prefect Cloud dashboard. Ever heard of anything like that?
a
I’m wondering if I have some rogue flows that are not visible on the Prefect Cloud dashboard
This shouldn’t happen.
Any chance this would make flows too hard to kill?
In general, cancellation is a best-effort attempt to kill execution initiated on your infrastructure. The most reliable way to handle this would be to use MERGE INTO or an upsert instead of a plain insert, so that duplicate runs cannot create duplicate records. Would it make sense to apply that in your use case?
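A minimal illustration of why an upsert tolerates duplicate runs, using Python's built-in `sqlite3`. SQLite spells the upsert `INSERT ... ON CONFLICT ... DO UPDATE` (available since SQLite 3.24); other databases use `MERGE INTO` or their own syntax. The `readings` table, its columns, and the key value are hypothetical: the natural key `record_key` carries a uniqueness constraint, so five overlapping runs writing the same record leave one row instead of five.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table: record_key is the natural key that identifies "the same record"
conn.execute(
    "CREATE TABLE readings (record_key TEXT PRIMARY KEY, value REAL NOT NULL)"
)

UPSERT = """
INSERT INTO readings (record_key, value) VALUES (?, ?)
ON CONFLICT(record_key) DO UPDATE SET value = excluded.value
"""

# Simulate five overlapping flow runs all writing the same record
for run in range(5):
    conn.execute(UPSERT, ("sensor-42@2022-01-01T00:00", 1.0 + run))
conn.commit()

rows = conn.execute("SELECT record_key, value FROM readings").fetchall()
print(rows)  # [('sensor-42@2022-01-01T00:00', 5.0)]
```

The design choice is to make the write idempotent at the database level, so correctness no longer depends on every stray run being cancelled.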
j
cancellation is a best effort attempt to kill execution initiated on your infrastructure
Yes, for some reason Cloud lost control of an agent that was still executing. Hmm.
k
So if the flow has compute happening on a different machine (a Kubernetes pod or Dask), it is very hard to cancel those processes as well. For Docker/Kubernetes, you may still see work being run if the agent process dies.
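The same best-effort limitation shows up in plain Python, which may help build intuition. `concurrent.futures.Future.cancel()` can only stop work that has not started yet; once a worker has picked the job up, `cancel()` returns `False` and the work keeps going. This is an analogy using the standard library, not a description of how Prefect Cloud or the agent cancels runs; `long_running_job` is a hypothetical stand-in.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def long_running_job() -> str:
    """Hypothetical stand-in for work already executing on some worker."""
    started.set()            # signal that the worker has picked the job up
    release.wait(timeout=5)  # keep "working" until released (or 5 s pass)
    return "job finished anyway"


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(long_running_job)
    started.wait(timeout=5)      # make sure the job is running before cancelling
    cancelled = future.cancel()  # best effort: returns False for a running job
    release.set()                # let the job complete
    result = future.result()

print(cancelled, result)  # False job finished anyway
```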