Soham
05/30/2024, 6:53 AMMarvin
05/30/2024, 6:53 AMDaskTaskRunnerpython
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def sleep(t: int):
    print(f"Going to sleep for {t} seconds...")
    time.sleep(t)
    print("...and I'm awake!")
@flow(task_runner=DaskTaskRunner)
def my_flow():
    """Flows run submitted tasks concurrently by default with DaskTaskRunner"""
    sleep(3)  # Block the flow for 3 seconds (not concurrently)
    future = sleep.submit(2)  # Submit a task run to the task runner, doesn't block the flow
    future.wait()  # Block the flow until the submitted task run is complete
    sleep.map([5] * 10)  # Submit 10 task runs at once, each sleeps for 5 seconds, don't block
    # Even though the flow is done, we won't exit until all the task runs are complete
if __name__ == "__main__":
    my_flow()DaskTaskRunner.submit().wait().map()Soham
05/30/2024, 6:54 AMMarvin
05/30/2024, 6:54 AMtime.sleeppython
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def add(x, y):
    return x + y
@flow(task_runner=DaskTaskRunner)
def concurrent_flow():
    results = []
    for i in range(5):
        results.append(add.submit(i, i + 1))
    
    results = [result.result() for result in results]  # Gather all the results
    print(f"Results: {results}")
if __name__ == "__main__":
    concurrent_flow()concurrencypython
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y
@flow
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)
if __name__ == "__main__":
    my_flow()python
import asyncio
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
@task
async def process_data(x, y):
    async with concurrency("database", occupy=1):
        return x + y
@flow
async def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        await process_data.submit(x, y)
if __name__ == "__main__":
    asyncio.run(my_flow())Soham
05/30/2024, 7:28 AMfrom prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner
@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y
@flow(task_runner=DaskTaskRunner())
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)Marvin
05/30/2024, 7:28 AM:Why do I keep getting this error? Code:
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner
@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y
@flow(task_runner=DaskTaskRunner())
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)Marvin
05/30/2024, 7:29 AMAttributeError: __enter__prefect.concurrency.asyncioasync withDaskTaskRunnerpython
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect_dask import DaskTaskRunner
@task
async def process_data(x, y):
    async with concurrency("database", occupy=1):
        return x + y
@flow(task_runner=DaskTaskRunner())
async def my_flow():
    tasks = [process_data.submit(x, y) for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]]
    results = await asyncio.gather(*[task.result() for task in tasks])
    print(f"Results: {results}")
if __name__ == "__main__":
    import asyncio
    asyncio.run(my_flow())process_dataasyncasync withasync with concurrencyprocess_datamy_flowasyncasyncio.run()async withasyncio.gather