Nick
06/06/2023, 4:05 PM
Marvin
06/06/2023, 4:05 PM
Nate
06/06/2023, 4:06 PM
Nate
06/06/2023, 4:07 PM
SequentialTaskRunner to run multiple flows at the same time
Why do you want to use SequentialTaskRunner? Wouldn't you want to use ConcurrentTaskRunner or asyncio.gather if you want them to run at the same time?
Marvin
06/06/2023, 4:08 PM
~/.prefect/prefect.db) [^1^]. You could try reducing the number of logs to avoid overloading the database or setting concurrency limits on some tasks [^1^]. Another possible solution is to upgrade the httpcore library by running pip install -U httpx to update to version 0.17.1, which includes a fix for a similar issue [^2^]. Alternatively, you can disable HTTP/2 by setting the environment variable PREFECT_API_ENABLE_HTTP2=False or adding it to your Prefect profile [^2^].
Now, for the second part of your question: SequentialTaskRunner runs submitted tasks one at a time, so it cannot run anything in parallel. To run several units of work at the same time in Prefect 2, set ConcurrentTaskRunner on the parent flow and submit each unit as a task [^3^]:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner  # Prefect 2.x import path


# Named subflow_* as in the question, but these are tasks, not flows
@task
def subflow_1():
    print("Subflow 1 started!")


@task
def subflow_2():
    print("Subflow 2 started!")


@task
def subflow_3():
    print("Subflow 3 started!")


@task
def subflow_4():
    print("Subflow 4 started!")


@flow(task_runner=ConcurrentTaskRunner())
def main_flow():
    # .submit() hands each task to the task runner and returns a future right away
    subflow_1.submit()
    subflow_2.submit()
    subflow_3.submit()
    subflow_4.submit()


if __name__ == "__main__":
    main_flow()
```
This example runs four tasks concurrently. Each .submit() call returns immediately, and the flow's ConcurrentTaskRunner runs the submitted tasks at the same time instead of one after another [^3^].
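The asyncio.gather route suggested earlier in the thread can be sketched with plain coroutines. This is a minimal standard-library illustration, not Prefect code: run_subflow is a hypothetical stand-in for an async function you would decorate with @flow, and the sleep durations are made up for the demo.

```python
import asyncio
import time


async def run_subflow(name: str, seconds: float) -> str:
    # Hypothetical stand-in for an async @flow function; sleeps instead of doing real work
    await asyncio.sleep(seconds)
    return f"{name} finished"


async def main() -> list[str]:
    # gather starts all four coroutines together and returns results in call order
    return await asyncio.gather(
        run_subflow("subflow_1", 0.2),
        run_subflow("subflow_2", 0.2),
        run_subflow("subflow_3", 0.2),
        run_subflow("subflow_4", 0.2),
    )


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Because the four coroutines overlap, the total time stays close to one sleep rather than four. In Prefect 2 the same pattern should carry over when both the parent and the subflows are async @flow functions, though that is an assumption to check against the version you run.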
I hope this helps you navigate the treacherous waters of flow scheduling and server errors. Good luck, human!
Nate
06/06/2023, 4:09 PM
Marvin
06/06/2023, 4:10 PM
@flow decorated functions:
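```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
def main_flow():
    # One dict of run_deployment keyword arguments per deployment to trigger
    for deployment_kwargs in [
        {"name": "your-flow/your-deployment"},
        # ... more deployments
    ]:
        # With the default timeout=None, run_deployment waits for the triggered
        # flow run to finish before returning, so deployments run one at a time
        flow_run = run_deployment(**deployment_kwargs)
        print(f"ran {flow_run.name}")


if __name__ == "__main__":
    main_flow()
```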
This example demonstrates how to use Prefect 2 syntax with @flow decorated functions to run many deployments sequentially.
This flow can now be deployed on a schedule to run at 12pm, which will then trigger a run of each deployment in the list of deployment_kwargs.
Nate
06/06/2023, 4:13 PM
Nick
06/06/2023, 4:29 PM
Nick
06/07/2023, 3:41 AM
Nate
06/07/2023, 1:28 PM