Soham
05/30/2024, 6:53 AM

Marvin
05/30/2024, 6:53 AM
You can use the `DaskTaskRunner` for concurrency. You can see how tasks are submitted and run concurrently:
```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")


@flow(task_runner=DaskTaskRunner)
def my_flow():
    """Flows run submitted tasks concurrently by default with DaskTaskRunner"""
    sleep(3)  # Block the flow for 3 seconds (not concurrently)

    future = sleep.submit(2)  # Submit a task run to the task runner; doesn't block the flow
    future.wait()  # Block the flow until the submitted task run is complete

    sleep.map([5] * 10)  # Submit 10 task runs at once, each sleeps for 5 seconds; don't block
    # Even though the flow is done, we won't exit until all the task runs are complete


if __name__ == "__main__":
    my_flow()
```
This example demonstrates how you can leverage `DaskTaskRunner` to run tasks concurrently. Tasks are submitted with `.submit()` and can be waited on with `.wait()`. Using `.map()`, you can run multiple instances of a task in parallel.
For more detailed approaches and configurations, you can always refer to the official documentation.
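To make the `.map()` pattern concrete, here is a minimal sketch (not from the original thread) of collecting results from mapped task runs; the `double` task and `mapped_flow` flow are illustrative names only.
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def double(x: int) -> int:
    return x * 2


@flow(task_runner=DaskTaskRunner)
def mapped_flow():
    # .map() submits one task run per input and returns a list of futures
    futures = double.map([1, 2, 3, 4])
    # .result() blocks until each task run finishes and returns its value
    results = [future.result() for future in futures]
    print(results)  # [2, 4, 6, 8]


if __name__ == "__main__":
    mapped_flow()
```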
Hopefully, this provides a miserable modicum of assistance. So tedious, isn’t it?

Soham
05/30/2024, 6:54 AM

Marvin
05/30/2024, 6:54 AM
…`time.sleep`.
Here's an example of how you can achieve concurrency using `DaskTaskRunner`:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def add(x, y):
    return x + y


@flow(task_runner=DaskTaskRunner)
def concurrent_flow():
    results = []
    for i in range(5):
        results.append(add.submit(i, i + 1))
    results = [result.result() for result in results]  # Gather all the results
    print(f"Results: {results}")


if __name__ == "__main__":
    concurrent_flow()
```
### Utilizing Prefect's Concurrency Management
Prefect provides advanced concurrency management options through the `concurrency` context manager. Here's a synchronous example using global concurrency limits:
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency


@task
def process_data(x, y):
    # Occupy one slot of the "database" global concurrency limit while running
    with concurrency("database", occupy=1):
        return x + y


@flow
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)


if __name__ == "__main__":
    my_flow()
```
And if you prefer an asynchronous approach:
```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency


@task
async def process_data(x, y):
    # The asyncio variant must be entered with `async with`
    async with concurrency("database", occupy=1):
        return x + y


@flow
async def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        await process_data.submit(x, y)


if __name__ == "__main__":
    asyncio.run(my_flow())
```
In these examples, the `concurrency` context manager ensures that only one task is interacting with the "database" at a time.
For detailed information and further customizations, you can refer to the official documentation on task runners and global concurrency limits. Tedious as always, isn't it?
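As an aside, here is a minimal sketch (not from the original thread) of why the limit is identified by name: different tasks can occupy the same "database" limit to coordinate access across the whole flow. The task names are illustrative, and a global concurrency limit named "database" is assumed to be configured with a capacity of 1, as in the examples above.
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency


@task
def read_record(key):
    # Shares the "database" limit with write_record, so (with a capacity of 1)
    # reads and writes never overlap
    with concurrency("database", occupy=1):
        return f"value-for-{key}"


@task
def write_record(key, value):
    with concurrency("database", occupy=1):
        print(f"writing {key}={value}")


@flow
def sync_flow():
    future = read_record.submit("a")
    write_record.submit("b", future.result())


if __name__ == "__main__":
    sync_flow()
```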
Feel free to explore and adapt these examples to your needs!

Soham
05/30/2024, 7:28 AM
Why do I keep getting this error? Code:
```python
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner


@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y


@flow(task_runner=DaskTaskRunner())
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)
```
Traceback (excerpt):
```
File c:\Users\soham\anaconda3\envs\echodataflowmem\lib\site-packages\prefect\engine.py:293, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    286 retval = from_async.wait_for_call_in_loop_thread(
    287     begin_run,
    288 ...

      7 def process_data(x, y):
----> 8     with concurrency("database", occupy=1):
      9         return x + y

AttributeError: __enter__
```
Marvin
05/30/2024, 7:29 AM
The `AttributeError: __enter__` occurs because the `concurrency` context manager from `prefect.concurrency.asyncio` is designed to be used in an asynchronous context with `async with`.
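As background, here is a small illustrative sketch (not Prefect-specific) of the underlying Python behavior: an object that only implements the async context-manager protocol cannot be entered with a plain `with`, which is exactly the error surfaced here.
```python
import asyncio


class AsyncOnly:
    """Implements only the async context-manager protocol."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        return False


async def main():
    async with AsyncOnly():  # fine: uses __aenter__/__aexit__
        print("inside async with")


asyncio.run(main())

try:
    with AsyncOnly():  # fails: no __enter__ defined
        pass
except (AttributeError, TypeError) as exc:
    # Older Python versions raise AttributeError: __enter__; newer ones raise a TypeError
    print(f"plain `with` failed: {exc!r}")
```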
Here’s how to fix your example:
### Corrected Example
Since you're using `DaskTaskRunner`, and it's managing tasks in a potentially asynchronous environment, you'll want to ensure your concurrency context is used asynchronously.
```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner


@task
async def process_data(x, y):
    async with concurrency("database", occupy=1):
        return x + y


@flow(task_runner=DaskTaskRunner())
async def my_flow():
    # Submit the task runs, then gather their results
    futures = [await process_data.submit(x, y) for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]]
    results = await asyncio.gather(*[future.result() for future in futures])
    print(f"Results: {results}")


if __name__ == "__main__":
    asyncio.run(my_flow())
```
### Key Changes:
1. Change `process_data` to an `async` function: this allows it to use `async with`.
2. Use `async with concurrency(...)` inside `process_data`: ensures the concurrency context is entered correctly.
3. Change `my_flow` to an `async` function and use `asyncio.run()` to execute it: this adheres to the asynchronous context required by `async with`.
4. Use `asyncio.gather` for concurrent task execution and result retrieval: ensures all task runs are awaited correctly.
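Alternatively, as a minimal sketch rather than anything from the original thread: if you'd rather keep the task synchronous, the sync variant of the context manager (`prefect.concurrency.sync.concurrency`, shown earlier in this thread) supports a plain `with` and sidesteps the `__enter__` error, assuming the Dask workers can reach your Prefect API.
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency  # sync variant: works with a plain `with`
from prefect_dask import DaskTaskRunner


@task
def process_data(x, y):
    with concurrency("database", occupy=1):  # no async needed for the sync variant
        return x + y


@flow(task_runner=DaskTaskRunner())
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)


if __name__ == "__main__":
    my_flow()
```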
### Additional Resources
- Global Concurrency Limits Documentation: This provides more information on managing concurrency in both synchronous and asynchronous contexts.
Tedious, as always, but I hope this helps with your concurrency issues.