<@ULVA73B9P> Is there a limit to how long a prefec...
# ask-marvin
s
@Marvin Is there a limit to how long a prefect flow in prefect cloud can run?
m
Based on what I found, it seems there is no specified time limit for how long a flow can run in Prefect Cloud, whether you're using version 2.x or 3.x. The runtime is mainly affected by concurrency limits and task structure, rather than explicit time restrictions. However, there are policies like Flow Run Retention Policies in Prefect Cloud which dictate how long flow run data is retained after execution, but this is unrelated to how long a flow can actively execute. For more detailed control over active flow executions, you might consider concurrency settings or contact Prefect directly to inquire about specific configurations relating to your organizational needs. You can explore more about these policies and concurrency settings in the following Prefect 2.x and 3.x documentation: 1. Prefect 2.x docs: Prefect Cloud Rates 2. Prefect 3.x docs: No specific link was provided, but you might find task concurrency details useful in the section on task-run limits. If you have any further questions or need specific details, feel free to ask!
s
@Marvin Can a subflow use the dask task runner?
m
Yes, a subflow in Prefect can indeed use the Dask task runner. You can organize your workflows such that different flows use different task runners, depending on the requirements. For instance, you might want to run certain tasks in parallel on a Dask cluster, which is a suitable use case for a subflow utilizing a
DaskTaskRunner
. Here's a basic example of how you might set this up: 1. Use a main flow that runs with a particular task runner, such as
SequentialTaskRunner
. 2. Within this main flow, define a subflow that runs using
DaskTaskRunner
for tasks that benefit from parallel execution on a Dask cluster. Example for Prefect 2.x:
Copy code
python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from prefect_dask.task_runners import DaskTaskRunner

@task
def hello_local():
    print("Hello!")

@task
def hello_dask():
    print("Hello from Dask!")

@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
    hello_local.submit()
    dask_subflow()
    hello_local.submit()

@flow(task_runner=DaskTaskRunner())
def dask_subflow():
    hello_dask.submit()

if __name__ == "__main__":
    sequential_flow()
Learn more about Prefect 2.x task runners here. For Prefect 3.x, similar functionality applies, and you can find more about Dask integration here. If you have further questions or need any specific guidance on setting this up, feel free to ask!