Denver H
09/04/2024, 3:33 PM

Marvin
09/04/2024, 3:33 PM

Denver H
09/04/2024, 3:33 PM

Nate
09/04/2024, 3:34 PM

Marvin
09/04/2024, 3:34 PM
1. Use the `.submit` method to run tasks (or sub-flows) concurrently.
2. Use the `wait_for` argument to ensure the main flow waits until all concurrent tasks (or sub-flows) finish.
Here's a very simple conceptual example based on your requirements:
```python
from prefect import flow, task


@task
def sub_flow_call(sub_flow_param):
    # What you want to do in each sub-flow
    pass


@flow
def main_flow():
    sub_flow_1 = sub_flow_call.submit("param_for_first_call")
    sub_flow_2 = sub_flow_call.submit("param_for_second_call")

    # Wait for both sub_flows to complete
    sub_flow_1_result = sub_flow_1.result()
    sub_flow_2_result = sub_flow_2.result()

    # Continue with rest of the flow only after both sub_flows are done
    continue_with_main_flow()


def continue_with_main_flow():
    # The remaining tasks in your main flow
    pass


if __name__ == "__main__":
    main_flow()
```
This way, both instances of `sub_flow_call` run concurrently, and the flow only proceeds once both calls are complete.
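For the `wait_for` variant mentioned in point 2, here is a minimal sketch; the `finalize` task is purely illustrative, and this assumes the `wait_for=` keyword on `.submit` behaves as in current Prefect releases. The downstream task only starts once every future it waits on has finished.
```python
from prefect import flow, task


@task
def sub_flow_call(sub_flow_param):
    # What you want to do in each sub-flow
    return sub_flow_param


@task
def finalize():
    # Illustrative downstream step that should only run after both calls
    print("both calls are done")


@flow
def main_flow():
    first = sub_flow_call.submit("param_for_first_call")
    second = sub_flow_call.submit("param_for_second_call")

    # wait_for makes this task depend on both futures reaching a final state
    done = finalize.submit(wait_for=[first, second])
    done.wait()  # block the flow until the downstream task has finished


if __name__ == "__main__":
    main_flow()
```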
Feel free to peruse Prefect 3.x API and Concurrency Limits for more detailed info.
Good luck taming those flows.

Denver H
09/04/2024, 3:35 PM

Marvin
09/04/2024, 3:35 PM
You can't use `.submit` for flows directly. Instead, you need to orchestrate the sub-flows using tasks that can run asynchronously.
Here's a more accurate approach using Prefect 3.x capabilities:
1. Call each sub-flow from within a task.
2. Use `anyio` or Prefect's built-in async capabilities to run them concurrently.
Here’s a better-tailored example:
```python
import asyncio

from prefect import flow, task


@task
async def call_subflow(name):
    # Calling the flow from inside a task creates a nested flow run
    result = await subflow(name)
    return result


@flow
async def subflow(name):
    # Replace this with the actual subflow work
    print(f"Subflow {name} is running")
    return f"Result of {name}"


@flow
async def main_flow():
    # Launch sub-flows concurrently
    results = await asyncio.gather(
        call_subflow("First call"),
        call_subflow("Second call"),
    )

    # Process your results after all sub-flows complete
    for result in results:
        print(result)

    continue_with_main_flow()  # Rest of your main flow tasks


def continue_with_main_flow():
    print("Continuing with the main flow after sub-flows are complete")


if __name__ == "__main__":
    asyncio.run(main_flow())
```
This example uses `asyncio.gather` to run the `call_subflow` tasks concurrently and waits for all of them to finish before proceeding.
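As an aside on point 2 above: `anyio` itself doesn't expose a `gather` call; its concurrency primitive is the task group. A minimal, Prefect-free sketch of that pattern (the function names here are illustrative):
```python
import anyio


async def do_work(name: str, results: list):
    # Stand-in for a sub-flow call; records its output for later use
    results.append(f"Result of {name}")


async def main():
    results = []
    # The task group waits for every task started inside the block
    # before the `async with` exits
    async with anyio.create_task_group() as tg:
        tg.start_soon(do_work, "First call", results)
        tg.start_soon(do_work, "Second call", results)
    print(results)


if __name__ == "__main__":
    anyio.run(main)
```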
Enjoy navigating the labyrinth; here's the link to the Prefect 3.x docs where you can delve even deeper into more delightful nuances. Good luck!

Nate
09/04/2024, 3:36 PM

Nate
09/04/2024, 3:40 PM
```
In [1]: from prefect import flow

In [2]: import asyncio

In [3]: @flow
   ...: async def b(): ...

In [4]: @flow(log_prints=True)
   ...: async def a():
   ...:     await asyncio.gather(b(), b())
   ...:     print('stuff after')
   ...:

In [5]: await a()
10:40:07.484 | INFO | prefect.engine - Created flow run 'capable-yak' for flow 'a'
10:40:07.487 | INFO | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/3ee7bd12-7dbf-43e0-97f7-025380da5cc3
10:40:07.841 | INFO | Flow run 'capable-yak' - Created subflow run 'pretty-heron' for flow 'b'
10:40:07.844 | INFO | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/02cc238d-dd3c-4e23-98ca-e1e46402d768
10:40:07.916 | INFO | Flow run 'pretty-heron' - Finished in state Completed()
10:40:07.957 | INFO | Flow run 'capable-yak' - Created subflow run 'hungry-capybara' for flow 'b'
10:40:07.958 | INFO | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/68879220-2e4c-47d1-97a3-4138c438731e
10:40:08.017 | INFO | Flow run 'hungry-capybara' - Finished in state Completed()
10:40:08.017 | INFO | Flow run 'capable-yak' - stuff after
10:40:08.042 | INFO | Flow run 'capable-yak' - Finished in state Completed()
```
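Outside an interactive session (where top-level `await` isn't available), the same pattern needs an explicit event-loop entrypoint; a minimal sketch of the script form, with the flow names kept from the example above:
```python
import asyncio

from prefect import flow


@flow
async def b():
    # Stand-in for the real sub-flow work
    ...


@flow(log_prints=True)
async def a():
    # Start both sub-flow runs together and wait for the pair to finish
    await asyncio.gather(b(), b())
    print('stuff after')


if __name__ == "__main__":
    asyncio.run(a())
```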
Denver H
09/04/2024, 3:41 PM

Denver H
09/04/2024, 3:42 PM

Nate
09/04/2024, 3:45 PM
```
In [1]: from prefect import flow, task

In [2]: @task
   ...: def b(): ...

In [3]: @flow(log_prints=True)
   ...: def a():
   ...:     futures = []
   ...:     futures.append(b.submit())
   ...:     futures.append(b.submit())
   ...:     print([future.result() for future in futures])
   ...:     print('stuff after')
   ...:

In [4]: a()
10:44:40.270 | INFO | prefect.engine - Created flow run 'exuberant-quail' for flow 'a'
10:44:40.272 | INFO | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/5f12fb98-98af-48f7-b2b0-272a751134dc
10:44:40.309 | INFO | Flow run 'exuberant-quail' - Submitting task b to thread pool executor...
10:44:40.311 | INFO | Flow run 'exuberant-quail' - Submitting task b to thread pool executor...
10:44:40.332 | INFO | Task run 'b-9e0' - Created task run 'b-9e0' for task 'b'
10:44:40.333 | INFO | Task run 'b-d6e' - Created task run 'b-d6e' for task 'b'
10:44:40.368 | INFO | Task run 'b-9e0' - Finished in state Completed()
10:44:40.373 | INFO | Task run 'b-d6e' - Finished in state Completed()
10:44:40.384 | INFO | Flow run 'exuberant-quail' - [None, None]
10:44:40.384 | INFO | Flow run 'exuberant-quail' - stuff after
10:44:40.412 | INFO | Flow run 'exuberant-quail' - Finished in state Completed()
```
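Applied to a whole collection of inputs, the same submit-and-collect pattern looks roughly like this; the `process_entry` task and `entries` argument are illustrative, and the `wait` helper is assumed to come from `prefect.futures` in 3.x:
```python
from prefect import flow, task
from prefect.futures import wait  # assumed home of the wait helper in Prefect 3.x


@task
def process_entry(entry):
    # Per-entry work goes here
    return entry


@flow(log_prints=True)
def process_all(entries):
    # One task run per entry; they run concurrently on the task runner
    futures = [process_entry.submit(entry) for entry in entries]

    # Block until every submitted run reaches a final state
    wait(futures)
    print([future.result() for future in futures])
    print('stuff after')


if __name__ == "__main__":
    process_all({"a", "b", "c"})
```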
Denver H
09/04/2024, 3:51 PM

Denver H
09/04/2024, 3:52 PM

Nate
09/04/2024, 3:55 PM
> only 1 of the list of flows completes and the rest are set to running but nothing is actually happening in the container
If there's a discrepancy between how your flow runs locally / elsewhere, it might be a version or config issue.
Denver H
09/06/2024, 10:35 AM
`collect_js_`:
```python
for entry in url_set:
    futures.append(collect_js_.submit(entry))
wait(futures)
```
This results in only the first entry from the set running correctly along with its nested tasks (this is also clear from the visual showing the nested tasks and their state). The rest appear to start at the same time according to the Cloud dashboard, but they do not actually run, nor do their nested tasks show up.
Running the same thing with `task_runner=DaskTaskRunner()` gives an `AssertionError: daemonic processes are not allowed to have children`, which may relate to how Prefect processes work in V3?
Any ideas?

Nate
09/06/2024, 2:49 PM

Denver H
09/06/2024, 2:50 PM