Andreas Nord
04/17/2024, 3:33 PM
Marvin
04/17/2024, 3:34 PM
submit, you'll need to utilize a task runner that supports asynchronous execution like Dask. Here is an example from the Prefect documentation showing how a task is submitted for execution:
python
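from prefect import task, flow
from prefect_dask import DaskTaskRunner  # shipped in the separate prefect-dask package in Prefect 2.x


@task
def hello():
    return "Hello from Dask!"


@flow(task_runner=DaskTaskRunner())
def hello_dask_flow():
    # .submit() hands the task to the Dask task runner and returns a PrefectFuture
    future = hello.submit()
    return future.result()


if __name__ == "__main__":
    # The guard matters in scripts because DaskTaskRunner uses multiprocessing
    result = hello_dask_flow()
    print(result)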
This example illustrates how to create a simple task and flow with a DaskTaskRunner, which will execute tasks asynchronously. When you use .submit() to submit a task to a task runner, it creates a PrefectFuture for accessing the state and result of the task. You can access the result using the result() method on the future.
Please note that PrefectFuture provides access to a computation that may be happening on a remote system, which is typical for distributed task runners like Dask.
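The submit-then-result pattern described here has the same shape as futures in Python's standard library. The following is a minimal sketch by analogy only: it uses `concurrent.futures.ThreadPoolExecutor` rather than Prefect, and the names `hello` and `run_hello` are hypothetical stand-ins for a task and a flow.

```python
from concurrent.futures import ThreadPoolExecutor


def hello(name: str) -> str:
    """Stand-in for a task body; not a Prefect task."""
    return f"Hello from {name}!"


def run_hello() -> str:
    # submit() returns a Future right away, much as task.submit() returns
    # a PrefectFuture; result() blocks until the value is available.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(hello, "a worker thread")
        return future.result()


if __name__ == "__main__":
    print(run_hello())
```

With a distributed runner such as Dask, the work behind the future may run in another process or on another machine, but the calling code keeps this same structure.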
For more detailed guidance, please refer to:
- Using a DaskTaskRunner in Prefect
- Prefect tasks documentation for further reference
Keep in mind that code examples should be tailored to your specific use case, infrastructure, and requirements.
Andreas Nord
04/17/2024, 3:34 PM
Marvin
04/17/2024, 3:35 PM
ConcurrentTaskRunner to run tasks concurrently, which allows tasks to switch when blocking on I/O. This is beneficial when your tasks do a lot of waiting, such as for web requests or reading from and writing to disk. When using ConcurrentTaskRunner, tasks are submitted to a thread pool maintained by anyio.
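To make the benefit for I/O-bound work concrete, here is a rough sketch of waits overlapping in a thread pool. It is an analogy built on the standard library's `ThreadPoolExecutor`, not Prefect's own implementation; `fake_io_call` and `run_concurrently` are hypothetical names, and `time.sleep` stands in for a web request or disk read.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(item: int) -> int:
    """Hypothetical I/O-bound task: the sleep stands in for waiting on a request."""
    time.sleep(0.2)
    return item * 2


def run_concurrently(items: list[int]) -> tuple[list[int], float]:
    """Run fake_io_call on every item in a thread pool; return results and elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        # Submit everything first so the waits overlap, then collect in order.
        futures = [pool.submit(fake_io_call, item) for item in items]
        results = [future.result() for future in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_concurrently([1, 2, 3, 4])
    print(results, f"{elapsed:.2f}s")
```

Run one after another, the four calls would take about 0.8 seconds in total. Because the threads wait at the same time, the whole batch should finish in roughly the time of a single call.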
To use ConcurrentTaskRunner in your flow, import it and pass it as the task_runner argument to the @flow decorator, as shown below:
python
from prefect import task, flow
from prefect.task_runners import ConcurrentTaskRunner


@task
def my_task():
    # task implementation here
    pass


@flow(task_runner=ConcurrentTaskRunner())
def my_concurrent_flow():
    future = my_task.submit()
    # possibly gather more futures
    result = future.result()
    # process results


my_concurrent_flow()
Remember to call .submit() on your tasks when using a ConcurrentTaskRunner so that they are sent to the task runner instead of running as ordinary blocking calls.
For more detailed information on task runners, you can refer to:
- Prefect Concepts - Task Runners