Mark Reyes
10/27/2024, 4:51 PM

Marvin
10/27/2024, 4:51 PM

You can tag the task and then create a tag-based concurrency limit for that tag:

```python
from prefect import flow, task
# Define a task with a tag for concurrency limiting
@task(tags=["my-task-tag"])
def my_task(i):
    return i
@flow
def my_flow():
    for i in range(10):
        my_task.submit(i)
# The limit itself lives on the Prefect server; create it ahead of time,
# for example with the CLI: prefect concurrency-limit create "my-task-tag" 4
if __name__ == "__main__":
    my_flow()
```

Any task tagged with `"my-task-tag"` will then be capped at 4 concurrent runs; additional runs wait until a slot frees up.
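The same limit can also be created from Python via the orchestration client; a minimal sketch, assuming a reachable Prefect API:

```python
import asyncio
from prefect.client.orchestration import get_client

async def set_limit():
    async with get_client() as client:
        # Create the tag-based limit: at most 4 concurrent
        # task runs carrying the "my-task-tag" tag
        await client.create_concurrency_limit(
            tag="my-task-tag", concurrency_limit=4
        )

if __name__ == "__main__":
    asyncio.run(set_limit())
```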
Mark Reyes
10/27/2024, 4:52 PM

Marvin
10/27/2024, 4:52 PM

If you want to throttle how quickly the `task.submit()` calls themselves happen, `rate_limit` from `prefect.concurrency.sync` can pace the loop:

```python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit
@task
def my_task(i):
    # Task logic here
    print(f"Processing task {i}")
@flow
def my_flow():
    for i in range(10):
        # Block until a slot is free on the pre-created
        # "task_submission_limiter" global concurrency limit
        rate_limit("task_submission_limiter")
        my_task.submit(i)
if __name__ == "__main__":
    my_flow()
```

`rate_limit` waits for a slot on the named global concurrency limit, which must already exist on the server (with a slot decay configured) for the throttling to take effect.
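If the goal is to bound how many tasks hold a resource at once, rather than pace submissions, the same module also provides a `concurrency` context manager. A sketch, assuming a pre-created global concurrency limit with the hypothetical name `database`:

```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def write_batch(batch):
    # Hold one slot on the "database" limit while writing;
    # the slot is released when the block exits
    with concurrency("database", occupy=1):
        print(f"Writing {batch}")

@flow
def ingest(batches):
    for batch in batches:
        write_batch.submit(batch)

if __name__ == "__main__":
    ingest(["a", "b", "c"])
```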
Mark Reyes
10/27/2024, 7:58 PM

Marvin
10/27/2024, 7:58 PM

With `asyncio`, an `asyncio.Semaphore` can cap how many tasks are in flight while the work fans out:

```python
import asyncio
from prefect import flow, task
# Define a task that processes each S3 object
@task
async def process_s3_object(s3_object):
    # Simulate task processing
    print(f"Processing {s3_object}")
    await asyncio.sleep(1)  # Simulate work by sleeping
# Define the main flow that submits tasks
@flow
async def my_s3_flow(s3_objects):
    semaphore = asyncio.Semaphore(4)  # Limit to 4 concurrent tasks
    async def submit_task(s3_obj):
        async with semaphore:
            await process_s3_object(s3_obj)
    tasks = [submit_task(obj) for obj in s3_objects]
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Example list of S3 object strings
    s3_objects = [f"<s3://bucket/object_{i}>" for i in range(10)]
    asyncio.run(my_s3_flow(s3_objects))
```

Because the flow is `async`, everything runs on the `asyncio` event loop, and `asyncio.gather` keeps the wrapped coroutines running concurrently while the semaphore admits at most four at a time.
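The semaphore could also be replaced with a server-side limit via the awaitable `rate_limit` from `prefect.concurrency.asyncio`. A sketch, assuming a pre-created global concurrency limit with the hypothetical name `s3-submissions` and a slot decay configured:

```python
import asyncio
from prefect import flow, task
from prefect.concurrency.asyncio import rate_limit

@task
async def process_s3_object(s3_object):
    print(f"Processing {s3_object}")
    await asyncio.sleep(1)  # Simulate work

@flow
async def my_s3_flow(s3_objects):
    async def throttled(obj):
        # Each coroutine waits for a slot before its task starts
        await rate_limit("s3-submissions")
        await process_s3_object(obj)

    await asyncio.gather(*(throttled(obj) for obj in s3_objects))

if __name__ == "__main__":
    asyncio.run(my_s3_flow([f"s3://bucket/object_{i}" for i in range(10)]))
```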
Mark Reyes
10/27/2024, 7:59 PM

Marvin
10/27/2024, 7:59 PM

Here is the same flow with a Dask task runner attached:

```python
import asyncio
from prefect import flow, task
from prefect_dask import DaskTaskRunner
# Define a task that processes each S3 object
@task
async def process_s3_object(s3_object):
    # Simulate task processing
    print(f"Processing {s3_object}")
    await asyncio.sleep(1)  # Simulate work by sleeping
# Define the main flow that submits tasks
@flow(task_runner=DaskTaskRunner())
async def my_s3_flow(s3_objects):
    semaphore = asyncio.Semaphore(4)  # Limit to 4 concurrent tasks
    async def submit_task(s3_obj):
        async with semaphore:
            await process_s3_object(s3_obj)
    tasks = [submit_task(obj) for obj in s3_objects]
    await asyncio.gather(*tasks)
if __name__ == "__main__":
    # Example list of S3 object strings
    s3_objects = [f"<s3://bucket/object_{i}>" for i in range(10)]
    asyncio.run(my_s3_flow(s3_objects))
```

The `task_runner` argument on `@flow` swaps in the `DaskTaskRunner`, and the flow still drives everything through `asyncio`. Keep in mind that only tasks started with `.submit()` (or `.map()`) go through the task runner; awaiting a task directly runs it in the flow process.
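To actually run the tasks on the Dask workers, they need to be submitted through the task runner rather than awaited; a sketch of that variant, where the concurrency cap comes from the cluster size instead of a semaphore:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def process_s3_object(s3_object):
    print(f"Processing {s3_object}")

# Four single-threaded workers: at most four tasks run at a time
@flow(task_runner=DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}
))
def my_s3_flow(s3_objects):
    futures = [process_s3_object.submit(obj) for obj in s3_objects]
    return [future.result() for future in futures]

if __name__ == "__main__":
    my_s3_flow([f"s3://bucket/object_{i}" for i in range(10)])
```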
Mark Reyes
10/28/2024, 4:53 AM

Marvin
10/28/2024, 4:53 AM

You can size the cluster the `DaskTaskRunner` spins up through its `cluster_kwargs`:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def my_task(data):
    print(f"Processing {data}")
    # Simulate some work
    return data * 2
@flow(task_runner=DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})
)
def my_flow(data):
    results = my_task.map(data)
    return results
if __name__ == "__main__":
    # Pass your data and runtime configurations here
    data = [1, 2, 3, 4, 5]
    my_flow(data)
```

`n_workers` sets how many worker processes the cluster starts and `threads_per_worker` how many threads each worker runs; both are passed straight through to the temporary Dask cluster the `DaskTaskRunner` creates.
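If a Dask cluster is already running, the runner can attach to it instead of creating a temporary one. A sketch, with a placeholder scheduler address:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task(data):
    return data * 2

# Connect to an existing Dask scheduler rather than spinning up
# a temporary cluster; the address below is a placeholder
@flow(task_runner=DaskTaskRunner(address="tcp://127.0.0.1:8786"))
def my_flow(data):
    return my_task.map(data)
```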
Mark Reyes
10/28/2024, 4:56 AM

Marvin
10/28/2024, 4:56 AM

Because the `task_runner` is fixed when the flow is defined, the way to vary the worker count at runtime is to bind a freshly configured `DaskTaskRunner` with `with_options` before calling the flow:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task(data):
    print(f"Processing {data}")
    return data * 2  # Simulate some work

@flow
def my_flow(data):
    return my_task.map(data)

if __name__ == "__main__":
    # Example data
    data = [1, 2, 3, 4, 5]
    # User-defined number of workers
    num_workers = 4
    # Bind a DaskTaskRunner sized with the runtime worker count
    dynamic_flow = my_flow.with_options(
        task_runner=DaskTaskRunner(
            cluster_kwargs={"n_workers": num_workers, "threads_per_worker": 1}
        )
    )
    dynamic_flow(data)
```

The user-supplied `num_workers` feeds `n_workers` in the `cluster_kwargs`, so each call can spin up a differently sized cluster.
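One property worth noting: `with_options` returns a new flow object and leaves the original unchanged, so several differently sized bindings of the same flow can coexist. Continuing the example above:

```python
# Two bindings of the same flow, each with its own cluster size
small_flow = my_flow.with_options(
    task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 2})
)
large_flow = my_flow.with_options(
    task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 16})
)
```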
Mark Reyes
10/28/2024, 5:15 AM

Marvin
10/28/2024, 5:15 AM

If you'd rather not pick a number at all, give the `DaskTaskRunner` `adapt_kwargs` and the cluster will scale itself between the bounds you set:

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task(data):
    print(f"Processing {data}")
    return data ** 2  # Simulate some work

# Set up a DaskTaskRunner with adaptive scaling
@flow(task_runner=DaskTaskRunner(
    cluster_kwargs={"n_workers": 1},             # Initial number of workers
    adapt_kwargs={"minimum": 1, "maximum": 10},  # Adaptive scaling range
))
def my_flow(data):
    return my_task.map(data)

if __name__ == "__main__":
    # Example data
    data = list(range(10))
    my_flow(data)
```

`minimum` and `maximum` in `adapt_kwargs` bound the adaptive scaling, so Dask grows and shrinks the cluster with the task load without ever leaving that range.