Nick
06/06/2023, 4:05 PM
Marvin
06/06/2023, 4:05 PM
Nate
06/06/2023, 4:06 PM
Nate
06/06/2023, 4:07 PM
SequentialTaskRunner to run multiple flows at the same time
why do you want to use SequentialTaskRunner? wouldn't you want to use ConcurrentTaskRunner or asyncio.gather if you want them to run at the same time?
Marvin
06/06/2023, 4:08 PM
~/.prefect/prefect.db) [^1^]. You could try reducing the number of logs to avoid overloading the database or setting concurrency limits on some tasks [^1^]. Another possible solution is to upgrade the httpcore library by running pip install -U httpx to update to version 0.17.1, which includes a fix for a similar issue [^2^]. Alternatively, you can disable HTTP/2 by setting the environment variable PREFECT_API_ENABLE_HTTP2=False or adding it to your Prefect profile [^2^].
Now, for the second part of your question, here's an example of how to use the SequentialTaskRunner to run multiple tasks from one flow [^3^]:
```python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner  # Prefect 2.x import path


@task
def subflow_1():
    print("Subflow 1 started!")


@task
def subflow_2():
    print("Subflow 2 started!")


@task
def subflow_3():
    print("Subflow 3 started!")


@task
def subflow_4():
    print("Subflow 4 started!")


# The task runner is set on the flow; it is not wrapped around individual tasks.
@flow(task_runner=SequentialTaskRunner())
def main_flow():
    # With SequentialTaskRunner, each submitted task finishes before the next starts.
    subflow_1.submit()
    subflow_2.submit()
    subflow_3.submit()
    subflow_4.submit()


if __name__ == "__main__":
    main_flow()
```
This example submits four tasks through a single SequentialTaskRunner [^3^]. Note that this runner executes them one at a time, in submission order, so they do not run in parallel; to run them concurrently, use ConcurrentTaskRunner instead.
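For comparison, here is a minimal sketch of the asyncio.gather approach raised earlier in the thread for running things at the same time. It uses only the standard library: the `subflow` coroutine and its `delay` parameter are hypothetical stand-ins, and the assumption (not shown here) is that in Prefect 2 the coroutines would be async @flow-decorated functions.

```python
import asyncio
import time


async def subflow(name: str, delay: float = 0.1) -> str:
    # Hypothetical stand-in for an async subflow; asyncio.sleep simulates I/O-bound work.
    print(f"{name} started!")
    await asyncio.sleep(delay)
    return name


async def main_flow() -> list:
    # gather starts all four coroutines together and returns results in argument order.
    return await asyncio.gather(*(subflow(f"Subflow {i}") for i in range(1, 5)))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(main_flow())
    print(results, f"elapsed: {time.perf_counter() - start:.2f}s")
```

Because the four coroutines wait concurrently, the total elapsed time should be close to one delay rather than four.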
I hope this helps you navigate the treacherous waters of flow scheduling and server errors. Good luck, human!
Nate
06/06/2023, 4:09 PM
Marvin
06/06/2023, 4:10 PM
@flow decorated functions:
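```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
def main_flow():
    # Each dict holds the keyword arguments for one run_deployment call.
    # The `...` is a placeholder: replace it with more dicts before running.
    for deployment_kwargs in [{"name": "your-flow/your-deployment"}, ...]:
        # By default run_deployment waits for the triggered run to finish,
        # so the deployments run one after another.
        flow_run = run_deployment(**deployment_kwargs)
        print(f"ran {flow_run.name}")


if __name__ == "__main__":
    main_flow()
```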
This example demonstrates how to use Prefect 2 syntax with @flow decorated functions to run many deployments sequentially.
This flow can now be deployed on a schedule to run at 12pm, which will then trigger a run of each deployment in the list of deployment_kwargs.
Nate
06/06/2023, 4:13 PM
Nick
06/06/2023, 4:29 PM
Nick
06/07/2023, 3:41 AM
Nate
06/07/2023, 1:28 PM