Jonathan
01/16/2024, 4:42 PM

Marvin
01/16/2024, 4:49 PM
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner  # DaskTaskRunner ships in the prefect-dask package

# Placeholder input; replace with your own data
list_of_data = [1, 2, 3]


@task
def compute_something(input_data):
    # Your computation logic here (placeholder: map each input to its square)
    computed_data = {input_data: input_data ** 2}
    return computed_data


@task
def combine_results(*args):
    # Logic to combine data from all tasks: merge the per-task dicts
    combined_data = {}
    for result in args:
        combined_data.update(result)
    return combined_data


@flow(task_runner=DaskTaskRunner())  # Configure to run tasks in parallel using Dask.
def my_flow():
    # Start a bunch of tasks or subflows concurrently
    results = [compute_something(data) for data in list_of_data]
    # Combine their results
    combined_data = combine_results(*results)
    # Continue with subsequent steps using combined_data
    # ...


# Run the flow
if __name__ == "__main__":
    my_flow()
```
Keep in mind that this is a highly simplified example; you would need to adapt it to the specifics of your tasks, data, and desired parallelism. Also consult Prefect's documentation for your chosen task runner, which covers its setup and best practices for running tasks in parallel.

Nate
01/16/2024, 4:57 PM

Marvin
01/16/2024, 4:57 PM
`.submit` method required in Prefect 1.x for actually invoking tasks to be run by an external task runner such as Dask.
If you're using Prefect 2.x, tasks are scheduled for execution as part of a flow without the need for `.submit`, because the flow and task decorators handle that. However, if you're working with Prefect 1.x, or in a scenario in 2.x that requires explicit submission, then you definitely need to use `.submit` on the task runner to ensure tasks are executed concurrently.
Here's how you can modify the example to include the `.submit` method when working with Prefect 1.x:
```python
from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def compute_something(input_data):
    # Your computation logic here
    return computed_data


@task
def combine_results(*args):
    # Logic to combine data from all tasks
    combined_data = {}
    for result in args:
        combined_data.update(result)
    return combined_data


with Flow("my_flow", executor=DaskExecutor()) as flow:
    # Start a bunch of tasks or subflows concurrently
    results = [compute_something.submit(data) for data in list_of_data]
    # Combine their results
    combined_data = combine_results(*results)

flow.run()  # Execute the flow
```
In this revised example, note that we call `.submit` on the task while iterating over the input data, to ensure each call is submitted for execution by the `DaskExecutor`.
Remember, the flow and setup here are tailored to Prefect 1.x to illustrate the use of `.submit`. If you're using Prefect 2.x (which is now simply called Prefect), tasks are invoked directly in a flow without `.submit`, unless you're using an explicit task runner that requires this workflow. Always refer to the documentation for the version you are using to follow its recommended practices.

Nate
01/16/2024, 4:59 PM
`with Flow() as flow` syntax outta here, assume 2.x, where `.submit` is a method on tasks, which returns a future that you call `.result()` on to get the result. `.submit()` does not block, `.result()` does

Marvin
01/16/2024, 4:59 PM
`.submit()`, which returns a future. The completion of these tasks can be awaited at any point using `.result()`. Below is an example that reflects this more hopeful workflow:
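```python
from prefect import flow, task


@task
def compute_something(input_data):
    # Your computation logic here (placeholder: square the input)
    computed_data = input_data ** 2
    return computed_data


@task
def combine_results(results):
    # Logic to combine data from all tasks: key each result by its position
    combined_data = {i: result for i, result in enumerate(results)}
    return combined_data


@flow
def my_flow(data_list):
    # .submit() returns a future immediately, so these tasks can run concurrently
    task_futures = [compute_something.submit(data) for data in data_list]
    # To get the results, use .result() on the futures (this blocks until each finishes)
    results = [future.result() for future in task_futures]
    combined_data = combine_results(results)
    # Follow-up logic using combined_data
    # ...
    return combined_data


# Execute the flow
if __name__ == "__main__":
    list_of_data = [1, 2, 3]  # Placeholder input; replace with your own data
    my_flow(list_of_data)
```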
This updated code illustrates how to submit tasks concurrently and then gather their results once they have completed. The call to `.submit()` does not block, while the call to `.result()` waits for the task to finish and returns its result.
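The same non-blocking `.submit()` / blocking `.result()` pattern can be seen in isolation with Python's standard-library `concurrent.futures`, used here as a stand-in for Prefect so the sketch runs without Prefect installed. `ThreadPoolExecutor.submit()` and `Future.result()` play the roles of the Prefect task's `.submit()` and the future's `.result()`; `slow_square`, `run`, and the 0.2-second sleep are hypothetical placeholders for real work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Hypothetical stand-in for compute_something: sleep to simulate work
    time.sleep(0.2)
    return x * x


def run(data_list):
    # A thread pool stands in for Prefect's task runner in this sketch
    with ThreadPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        # submit() returns a Future immediately; the work runs in the background
        futures = [pool.submit(slow_square, x) for x in data_list]
        submit_elapsed = time.perf_counter() - start

        # result() blocks until each Future has finished
        results = [f.result() for f in futures]
        total_elapsed = time.perf_counter() - start
    return results, submit_elapsed, total_elapsed


if __name__ == "__main__":
    results, submit_elapsed, total_elapsed = run([1, 2, 3, 4])
    print(results)  # [1, 4, 9, 16]
    print(f"submitting took {submit_elapsed:.3f}s, waiting took {total_elapsed:.3f}s")
```

With four worker threads the four calls overlap, so the total wait should be roughly 0.2 s rather than the 0.8 s a sequential loop would take. In Prefect 2.x the flow's task runner, rather than an explicit thread pool, decides where submitted tasks run.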
If you feel that the clouds have returned and are raining on your code-despair parade, feel free to consult the Prefect 2.x documentation for further details that may illuminate the path to a concurrency-rich and less melancholic workflow experience.