Hey, I have a flow that launch multiple deployment...
# ask-community
p
Hey, I have a flow that launch multiple deployments, using
run_deployment()
. Is there a way to override the
timeout_seconds
param of
@flow
decorator, so that I can have a custom timeout for each run_deployment? P.S: The
timeout
param of run_deployment doesn't serve the purpose, as I don't want to wait for the flows, but just launch them, I want a mechanism to prevent them from running indefinetly. Thank you in advance
n
hi @prewarning - yep if you set
timeout_seconds
in the flow decorator, that will control the timeout for that flow whereas, as you mentioned,
timeout
for
run_deployment
is just how long to wait for the run you kick off (from the caller's perspective)
p
So there's no way to dynamically override
timeout_seconds
? Cause I'd like to have a variable for each run_deployment I call, not a fixed one on the decorator
n
if the flow you're talking about is the deployment entrypoint then this is correct > So there's no way to dynamically override
timeout_seconds
? however I'll point out a couple things • flows and tasks have a
@classmethod
called
with_options
where you can create a version of a flow/task like
new_task = old.with_options(timeout_seconds=42)
so that is a way to dynamically set config for child calls of your deployment entrypoint • you could also probably just use a
Variable
as the value (
.set
it from the parent if needed)
Copy code
In [1]: from prefect import flow

In [2]: from prefect.variables import Variable

In [3]: from time import sleep

In [4]: @flow(timeout_seconds=Variable.get("some_timeout", 2))
   ...: def f(): sleep(3)

In [5]: f()
11:41:32.832 | INFO    | prefect.engine - Created flow run 'encouraging-chamois' for flow 'f'
11:41:35.090 | ERROR   | Flow run 'encouraging-chamois' - Flow run exceeded timeout of 2.0 second(s)
11:41:35.503 | ERROR   | Flow run 'encouraging-chamois' - Finished in state TimedOut('Flow run exceeded timeout of 2.0 second(s)', type=FAILED)
šŸ™Œ 1
p
Thank you very much! I think I will then implement a custom logic to stop flows that runs for too long. I have created a function that retrieves the running flows for a deployment name: given that I have the time_limit and and I know the execution time to create a check condition, is there a way to force the stop of a running flow from code? The main idea is:
Copy code
for every running flow of example_deployment:
    if now() - start_time > timeout_limit:
        stop_the_run()
n
so you definitely could write some timing logic and then make API calls to force state changes if you find that some flows have exceeded their timeout, but if not going the variable route, I might recommend this instead
Copy code
# your deployment
@flow
def deployment_wrapper(timeout_seconds: int | float, **kwargs):
   your_previous_deployments_flow.with_options(timeout_seconds=timeout_seconds)(**kwargs)

# the caller
run_deployment("wrapperflow/wrapperdeploy", parameters=dict(timeout_seconds=42) | other_kwargs)
šŸ™Œ 1
p
This looks very nice, thank you very much!
n
catjam
p
@Nate I used your example and worked perfectly fine, now I've extended the wrapper to take an additional param "flow_fn" of type Flow, so that it can be used as a generic interface for multiple flows, without having to specify a wrapper for each one of them. When I want to launch manually a flow using the wrapper from the Prefect UI, how should I fill the form of the param? I tried using Flow and a custom Enum class:
Copy code
class StyleFlows(Flow, Enum):
    """Enum for the different flows in the style module."""

    TRAIN = train_flow
where
train_flow
is a flow. How do I specify this kind of type in form UI when launching a custom run?