j
@Marvin How can I structure a flow such that a dozen tasks or subflows are allowed to run concurrently or in parallel before combining their respective data for the subsequent steps?
m
To structure a flow in Prefect where several tasks or subflows run concurrently before combining their data, consider the following steps:

1. Concurrent task execution: In Prefect, concurrency is achieved by utilizing one of the available task runners, like Dask or Ray. By configuring your flow to use one of these execution engines, you can run multiple tasks in parallel. To do this, you would typically adjust your code and flow configuration to use the chosen task runner.

2. Combining data from tasks: Once your tasks have run, you can combine their results by having a task that depends on all the concurrent tasks. This is expressed in the flow's design: the task that combines the data takes the outputs of the preceding tasks as its inputs.

Here's a simplified conceptual example of what such a flow could look like in Prefect using Python code:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner  # requires the prefect-dask collection: pip install prefect-dask

@task
def compute_something(input_data):
    # Your computation logic here (placeholder: double the input)
    computed_data = {input_data: input_data * 2}
    return computed_data

@task
def combine_results(*args):
    # Logic to combine data from all tasks
    combined_data = {}
    for result in args:
        combined_data.update(result)
    return combined_data

@flow(task_runner=DaskTaskRunner())  # Configure to run tasks in parallel using Dask.
def my_flow(list_of_data):
    # Start a bunch of tasks or subflows concurrently
    results = [compute_something(data) for data in list_of_data]
    
    # Combine their results
    combined_data = combine_results(*results)
    
    # Continue with subsequent steps using combined_data
    # ...

# Run the flow
my_flow([1, 2, 3])
```
Keep in mind that this is a highly simplified example and you would need to adapt it to the specifics of your tasks, data, and desired parallelism. Additionally, consider diving into Prefect's documentation regarding the Task Runner of your choice for the setup process and best practices for running tasks in parallel.
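If, for instance, Ray is your engine of choice rather than Dask, only the task runner changes. A minimal sketch, assuming the prefect-ray collection is installed (the flow name and body here are placeholders):

```python
from prefect import flow
from prefect_ray import RayTaskRunner  # pip install prefect-ray

@flow(task_runner=RayTaskRunner())  # tasks are distributed to a Ray cluster
def my_ray_flow():
    ...  # same pattern as above: fan out tasks, then combine results
```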
n
@Marvin you forgot .submit, you need .submit to send tasks to the task runner
m
Ah, yes, I suppose that in my perpetual state of existential dread, I overlooked the all-important `.submit` method required in Prefect 1.x for actually invoking tasks to be run by an external task runner such as Dask. If you're using Prefect 2.x, tasks are scheduled for execution as part of a flow without the need for `.submit`, because the flow and task decorators handle that. However, if you're working with Prefect 1.x, or certain scenarios in 2.x require explicit submission, then you definitely need to use `.submit` on the task runner to ensure tasks are executed concurrently. Here's how you can modify the example to include the `.submit` method when working with Prefect 1.x:
```python
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def compute_something(input_data):
    # Your computation logic here (placeholder: double the input)
    computed_data = {input_data: input_data * 2}
    return computed_data

@task
def combine_results(*args):
    # Logic to combine data from all tasks
    combined_data = {}
    for result in args:
        combined_data.update(result)
    return combined_data

list_of_data = [1, 2, 3]

with Flow("my_flow", executor=DaskExecutor()) as flow:
    # Start a bunch of tasks or subflows concurrently
    results = [compute_something.submit(data) for data in list_of_data]
    
    # Combine their results
    combined_data = combine_results(*results)
    
flow.run()  # Execute the flow
```
In this revised example, note that we use the `.submit` call on the task when we're iterating over the input data to ensure they are submitted for execution by the DaskExecutor. Remember, the flow and setup here are tailored to Prefect 1.x to illustrate the use of `.submit`. If you're using Prefect 2.x (which is now simply called Prefect), tasks are directly invoked in a flow without `.submit`, unless you're using an explicit task runner that requires this workflow. Always refer to the documentation of the version you are using to follow the recommended practices.
n
@Marvin nah you're way off base, get that `with Flow() as flow` syntax outta here. Assume 2.x, where .submit is a method on tasks, which returns a future that you call .result() on to get the result. .submit() does not block; .result() does
m
My apologies for the confusion. Prefect 2.x indeed simplifies the task invocation process, and my earlier explanation may have been understandably quite depressing for fans of more contemporary syntax. Given the present state of affairs, let's do a proper refresh on how to structure concurrent execution with Prefect 2.x. You submit tasks with `.submit()`, which immediately returns a future; you can then wait for completion at any point by calling `.result()` on that future. Below is an example that reflects this more hopeful workflow:
```python
from prefect import flow, task

@task
def compute_something(input_data):
    # Your computation logic here (placeholder: double the input)
    computed_data = {input_data: input_data * 2}
    return computed_data

@task
def combine_results(results):
    # Logic to combine data from all tasks
    combined_data = {i: result for i, result in enumerate(results)}
    return combined_data

@flow
def my_flow(data_list):
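    # .submit() hands each task to the task runner and returns a PrefectFuture immediately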
    task_futures = [compute_something.submit(data) for data in data_list]
    
    # .result() blocks until the corresponding task finishes, then returns its value
    results = [future.result() for future in task_futures]
    
    combined_data = combine_results(results)

    # Follow-up logic using combined_data
    # ...

# Execute the flow
list_of_data = [1, 2, 3]
my_flow(list_of_data)
```
This updated code illustrates how to submit tasks concurrently and then gather their results once they have completed. The call to `.submit()` does not block, while the call to `.result()` will wait for the task to finish and return its value. If you feel that the clouds have returned and are raining on your code-despair parade, feel free to consult the Prefect 2.x documentation for further details that may illuminate the path to a concurrency-rich and less melancholic workflow experience.
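As a final, marginally less gloomy note: in Prefect 2.x you can also pass the futures themselves (even inside a list) straight into a downstream task, and the engine will wait on them and resolve them to their values before the task runs. A minimal sketch of that variant, reusing the placeholder tasks from above:

```python
from prefect import flow, task

@task
def compute_something(input_data):
    # Placeholder computation
    return {input_data: input_data * 2}

@task
def combine_results(results):
    # By the time this task runs, Prefect has resolved the futures to plain values
    return {i: result for i, result in enumerate(results)}

@flow
def my_flow(data_list):
    futures = [compute_something.submit(data) for data in data_list]
    # Pass the futures directly; no explicit .result() calls needed
    return combine_results(futures)

my_flow([1, 2, 3])
```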