<@ULVA73B9P>: Do you have any example of concurren...
# marvin-in-the-wild
s
@Marvin: Do you have any example of concurrency for a DaskTaskRunner flow?
m
Certainly! Below is an example of a Prefect flow using `DaskTaskRunner` for concurrency. You can see how tasks are submitted and run concurrently:
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")

@flow(task_runner=DaskTaskRunner)
def my_flow():
    """Flows run submitted tasks concurrently by default with DaskTaskRunner"""
    sleep(3)  # Block the flow for 3 seconds (not concurrently)

    future = sleep.submit(2)  # Submit a task run to the task runner, doesn't block the flow

    future.wait()  # Block the flow until the submitted task run is complete

    sleep.map([5] * 10)  # Submit 10 task runs at once, each sleeps for 5 seconds, don't block

    # Even though the flow is done, we won't exit until all the task runs are complete

if __name__ == "__main__":
    my_flow()
This example demonstrates how you can leverage `DaskTaskRunner` to run tasks concurrently. Tasks are submitted using `.submit()` and can be waited upon using `.wait()`. Using `.map()`, you can run multiple instances of a task in parallel. For more detailed approaches and configurations, you can always refer to the official documentation. Hopefully, this provides a miserable modicum of assistance. So tedious, isn’t it?
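For intuition only, the submit / wait / fan-out pattern in the flow above resembles the standard library's `concurrent.futures` API. The sketch below uses `ThreadPoolExecutor` instead of Prefect or Dask, so it is an analogy and not how `DaskTaskRunner` is implemented. The names `nap` and `run_demo` and the shortened sleep durations are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def nap(seconds: float) -> float:
    """Hypothetical stand-in for the `sleep` task: block, then return the duration."""
    time.sleep(seconds)
    return seconds


def run_demo() -> float:
    """Run the three call styles and return the total wall-clock time in seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=10) as pool:
        nap(0.1)                        # direct call: blocks the caller
        future = pool.submit(nap, 0.1)  # returns a Future immediately
        wait([future])                  # block until that one call is done
        for _ in range(10):             # fan out 10 calls that overlap in time
            pool.submit(nap, 0.2)
        # Leaving the `with` block waits for all outstanding calls.
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"elapsed: {run_demo():.2f}s")
```

Run one after another, the calls would take about 2.2 seconds. Because the ten fanned-out calls overlap, the total stays close to 0.4 seconds.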
s
@Marvin: I meant prefect's concurrency and not time.sleep
m
For handling concurrency in your Prefect DaskTaskRunner flow, you'll want to leverage Prefect's task and flow orchestration capabilities. The example below uses `DaskTaskRunner` for task concurrency without relying on `time.sleep`:
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def add(x, y):
    return x + y

@flow(task_runner=DaskTaskRunner)
def concurrent_flow():
    results = []
    for i in range(5):
        results.append(add.submit(i, i + 1))
    
    results = [result.result() for result in results]  # Gather all the results
    print(f"Results: {results}")

if __name__ == "__main__":
    concurrent_flow()
### Utilizing Prefect's Concurrency Management
Prefect provides advanced concurrency management options through the `concurrency` context manager. Here's a synchronous example using global concurrency limits:
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y

@flow
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)

if __name__ == "__main__":
    my_flow()
And if you prefer an asynchronous approach:
import asyncio
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def process_data(x, y):
    async with concurrency("database", occupy=1):
        return x + y

@flow
async def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        await process_data.submit(x, y)

if __name__ == "__main__":
    asyncio.run(my_flow())
In these examples, the `concurrency` context manager ensures that only one task is interacting with the "database" at a time, assuming the "database" global concurrency limit is configured with a limit of 1 (each task takes one slot via `occupy=1`). For detailed information and further customizations, you can refer to the official documentation on task runners and global concurrency limits. Tedious as always, isn't it? Feel free to explore and adapt these examples to your needs!
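To see the "one at a time" behavior without a Prefect server, here is a rough standard-library analogy. It uses `asyncio.Semaphore(1)` in place of a global concurrency limit, which is a swapped-in technique and not Prefect's mechanism. The `state` dictionary and the 0.01 second simulated work are assumptions added for illustration.

```python
import asyncio


async def process_data(x, y, limit, state):
    """Add x and y while holding the single slot of `limit`."""
    async with limit:  # only one coroutine may be inside this block at a time
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated database work (illustrative)
        state["active"] -= 1
        return x + y


async def main():
    limit = asyncio.Semaphore(1)  # stands in for a limit with one slot
    state = {"active": 0, "peak": 0}
    pairs = [(1, 2), (2, 3), (3, 4), (4, 5)]
    results = await asyncio.gather(
        *(process_data(x, y, limit, state) for x, y in pairs)
    )
    return results, state["peak"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All four coroutines are started together, yet the peak number inside the guarded block never exceeds one.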
s
@Marvin: Why do I keep getting this error? Code:
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner

@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y


@flow(task_runner=DaskTaskRunner())
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)
      7 def process_data(x, y):
----> 8     with concurrency("database", occupy=1):
      9         return x + y

AttributeError: __enter__
m
Your message was too long by 320 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Why do I keep getting this error? Code:
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner

@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y


@flow(task_runner=DaskTaskRunner())
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)
File c:\Users\soham\anaconda3\envs\echodataflowmem\lib\site-packages\prefect\engine.py:293, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    286 retval = from_async.wait_for_call_in_loop_thread(
    287     begin_run,
    288
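The error `AttributeError: __enter__` occurs because the `concurrency` context manager from `prefect.concurrency.asyncio` is designed to be used in an asynchronous context with `async with`, but your synchronous task enters it with a plain `with`. One fix is to keep the task synchronous and import `concurrency` from `prefect.concurrency.sync` instead, as in the synchronous example earlier. The other is to make the task and flow asynchronous. Here’s how to fix your example that way:
### Corrected Example
Since you're importing from `prefect.concurrency.asyncio`, you'll want to ensure your concurrency context is used asynchronously, inside an `async` task and flow. `DaskTaskRunner` itself does not require this.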
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner

@task
async def process_data(x, y):
    # The asyncio variant must be entered with `async with`
    async with concurrency("database", occupy=1):
        return x + y

@flow(task_runner=DaskTaskRunner())
async def my_flow():
    pairs = [(1, 2), (2, 3), (3, 4), (4, 5)]
    # Assumes Prefect 2.x, where submitting an async task from an async flow is awaited
    futures = [await process_data.submit(x, y) for x, y in pairs]
    # Each future's result is awaited; gather collects them in order
    results = await asyncio.gather(*[future.result() for future in futures])
    print(f"Results: {results}")

if __name__ == "__main__":
    asyncio.run(my_flow())
### Key Changes:
1. Change `process_data` to an `async` function: This allows it to use `async with`.
2. Use `async with concurrency` inside `process_data`: Ensures the concurrency context is used correctly.
3. Change `my_flow` to an `async` function and use `asyncio.run()` to execute it: This adheres to the asynchronous context required by `async with`.
4. Use `asyncio.gather` for concurrent task execution and results retrieval: Ensures all tasks are awaited correctly.
### Additional Resources
- Global Concurrency Limits Documentation: This provides more information on managing concurrency in both synchronous and asynchronous contexts.
Tedious, as always, but I hope this helps with your concurrency issues.
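The underlying failure can be reproduced without Prefect. The sketch below defines a hypothetical async-only context manager, `async_only_limit`, as a stand-in for the asyncio `concurrency` variant, and shows that a plain `with` is rejected while `async with` works. The exception type depends on the Python version: as far as I know, 3.10 and earlier raise `AttributeError: __enter__` (as in the traceback above), while newer versions raise a `TypeError`, so the code catches both.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def async_only_limit(name: str):
    """Hypothetical async-only context manager (not Prefect's implementation)."""
    yield name


def use_with_plain_with() -> str:
    """Try a plain `with` and report which exception type Python raises."""
    try:
        with async_only_limit("database"):
            return "entered"
    except (AttributeError, TypeError) as exc:
        return type(exc).__name__


async def use_with_async_with() -> str:
    """Enter the same context manager correctly with `async with`."""
    async with async_only_limit("database") as name:
        return name


if __name__ == "__main__":
    print(use_with_plain_with())
    print(asyncio.run(use_with_async_with()))
```

The object only defines `__aenter__` and `__aexit__`, which is why the synchronous `with` statement cannot enter it.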