# prefect-community
Sébastien
If I `time.sleep(x)` right before `with Flow():`, will the run respect that sleep before spinning up the cluster, or does a run work differently (e.g. only runs code inside the flow's task graph)?
nicholas
Hi @Sébastien - are you running on Server/Cloud or Core-only? The `with Flow():` block only defines the flow; it doesn't run it. To run the flow, you would either call `flow.run()` (to run locally with Core-only) or make a `create_flow_run` call to the API (using either GraphQL or the UI).
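The define-vs-run split can be sketched with a toy stand-in (this is not prefect's real implementation, just an illustration of the mechanics): entering the `with` block only records work, and nothing executes until `.run()` is called later.

```python
class Flow:
    # Toy stand-in for prefect's Flow, only to show the define-vs-run
    # split; the real class lives in `prefect` and does far more.
    def __init__(self):
        self.tasks = []
        self.results = []

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

    def add_task(self, fn):
        self.tasks.append(fn)  # definition time: only record the call

    def run(self):
        for fn in self.tasks:  # execution time: actually invoke it
            self.results.append(fn())


with Flow() as flow:
    flow.add_task(lambda: "did work")  # nothing runs inside the block

print(flow.results)  # [] -- the flow is still only defined
flow.run()
print(flow.results)  # ['did work'] -- work happened only on .run()
```

Anything sitting at module level next to the `with` block (like a `time.sleep`) runs when the defining script runs, not when the flow is executed by an agent.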
Sébastien
Cloud with LocalRun and DaskExecutor via SSHCluster, scheduled with CronSchedule or by manually triggering a run on the Cloud.
I'm trying to figure out if I can just `time.sleep(x)` before the flow's context manager, or if I need to update the Schedule to include an `adjustments=` entry for those seconds of sleep it would've had.
nicholas
You'll need to go with the latter @Sébastien - however, depending on your objective it might be easier to include some sort of sleep task at the start of your flow that calls `time.sleep`, since that'll probably produce more reliable results.
Sébastien
That won't work, since the `@task` would run inside the flow, right? And I need the sleep to happen before it constructs the flow.
nicholas
I'm curious what the use case is here? Perhaps `adjustments` is the way to go if you're just looking to offset the schedule. Unfortunately that won't work for ad-hoc runs, since it's applied only to the clock.
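For the scheduled-run case, the offset idea can be sketched in plain Python. This is a toy model of what a clock adjustment does (the assumed semantics, not prefect's actual `adjustments` code): each time the clock emits gets shifted by a fixed interval.

```python
from datetime import datetime, timedelta


def add(interval):
    """Toy version of an `adjustments`-style hook: shift every time
    the clock emits by a fixed interval (assumed semantics)."""
    def adjust(dt):
        return dt + interval
    return adjust


# A clock emitting on the hour; the adjustment delays each run by 90 s.
clock_times = [datetime(2021, 5, 1, h) for h in (9, 10, 11)]
adjust = add(timedelta(seconds=90))
adjusted = [adjust(t) for t in clock_times]

print(adjusted[0])  # 2021-05-01 09:01:30
```

Because the shift is applied to the clock's emitted times, an ad-hoc run triggered from the UI never passes through it, which is the limitation discussed here.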
Sébastien
I'm using SSHCluster for ephemeral Dask clusters without investing time right now in setting up more robust infrastructure. Since SSHCluster uses `idle_timeout`/`death_timeout` to determine shutdown, it may happen that two separate flows start one right after the other. When that happens, the timeout may not have passed and the SSHCluster will fail to start because the previous cluster is still running.
The offset on the clock is a simple way to avoid adding complexity and keep using SSHCluster without risking overlap.
(At least as long as only a single flow can run at any time - which is the case for me, since I'm on the free plan.)
nicholas
Hm, got it - I don't have a great answer for you there, but let me check with the team to see if they have any ideas.
Sébastien
I think the Schedule adjustment is a good way to do it; the only issue is that it won't work for manually triggered runs.
nicholas
Yeah, you'll need to make sure your cluster has spun down before doing that, but otherwise that's probably a fine interim approach.
Sébastien
Just realized that won't work, since a previous flow may not be finished yet when the Schedule with the adjustment queues the next run. In that case, both runs will still execute one directly after the other.
And I don't know of a way for Prefect to ensure, inside of a flow, that the cluster has spun down, or to wait a couple of seconds.
nicholas
That depends - if the schedule is fairly aggressive, you're correct. It sounds like you're expecting some amount of run collision with the schedule?
Sébastien
Yes, I use flows very freely, to keep code light and easy to maintain.
So during the whole day there are several back-end tasks that need to do work, with overlapping schedules. At most there'll be a minute or so of delay, and that's perfectly OK (I'm working on timelines of 10h+), but that introduces the issue.
And yes, I'm looking into setting up Kubernetes with Dask Gateway, but that's a lot of overhead to deal with at once for something that could be fixed with a `time.sleep`. Long-term it's the more robust solution, but there should be a middle option.
nicholas
Got it @Sébastien - if you've got a concrete proposal for how that could work, that'd be a great issue or even PR! I'm not sure I've seen this particular edge case before, since a lot of the burden of this falls on infrastructure management rather than the pipeline.
Sébastien
@nicholas I don't think I know enough about Prefect/flows to know why `time.sleep` wouldn't work right before the flow (does it pickle only once during registration?), so I'm not sure I can be of much help.
nicholas
You're correct that it's pickled only once (at build/registration time) - so the `time.sleep` will only run at that point, not before execution. You might try extending a resource manager and using the sleep there? Otherwise I'd suggest using a local Dask executor to avoid hitting the timeouts on your current platform.
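The resource-manager idea could look roughly like this - a plain-Python sketch of the setup/cleanup shape (all names here are illustrative, not prefect's actual `resource_manager` API), where setup waits out the previous cluster's timeout window before connecting:

```python
import time
from contextlib import contextmanager


@contextmanager
def throttled_cluster(min_gap, last_shutdown, clock=time.monotonic):
    """Sketch only: `min_gap` is assumed to cover the previous
    cluster's idle_timeout/death_timeout window."""
    remaining = min_gap - (clock() - last_shutdown)
    if remaining > 0:
        time.sleep(remaining)        # wait out the previous cluster
    cluster = {"status": "running"}  # stand-in for SSHCluster(...)
    try:
        yield cluster
    finally:
        cluster["status"] = "closed"  # stand-in for cluster.close()


# Usage: the previous cluster shut down 60 s ago, so no sleep is needed.
with throttled_cluster(min_gap=5, last_shutdown=time.monotonic() - 60) as c:
    assert c["status"] == "running"
```

The point of putting the wait in setup rather than at module level is that setup runs at execution time on the agent, whereas module-level code runs only once when the flow is built and registered.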
Sébastien
@nicholas Maybe a new `adjustments=` entry to say "at least X time after the last run" (for all flows vs. the current flow only)? The schedule would stay the same, and the adjustment could adjust itself based on the last run.
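That proposed adjustment could be sketched like this (entirely hypothetical - no such adjustment exists in prefect): given the previous run's end time, push the next scheduled start back so it keeps a minimum gap.

```python
from datetime import datetime, timedelta


def at_least_after_last_run(gap):
    """Hypothetical adjustment: start no sooner than `gap` after the
    previous run finished; otherwise keep the scheduled time."""
    def adjust(scheduled, last_run_end):
        earliest = last_run_end + gap
        return max(scheduled, earliest)
    return adjust


adjust = at_least_after_last_run(timedelta(minutes=1))

# Previous run overran its slot: the next start gets pushed back.
print(adjust(datetime(2021, 5, 1, 10, 0),
             datetime(2021, 5, 1, 10, 0, 30)))  # 2021-05-01 10:01:30

# Previous run finished long ago: the scheduled time is unchanged.
print(adjust(datetime(2021, 5, 1, 10, 0),
             datetime(2021, 5, 1, 8, 0)))       # 2021-05-01 10:00:00
```

As noted below, an adjustment like this would need care with schedules shorter than the flow's runtime, since every delayed run would delay the next one in turn.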
nicholas
I can imagine that being tricky with a flow on a schedule that's equal to or less than the time it takes to run - it could really easily lead to a large backlog of runs. However, you're welcome to open a GitHub discussion or propose that in a ticket!