<@ULVA73B9P> I have a deployment that I'm passing ...
# ask-marvin
k
@Marvin I have a deployment that I'm passing variables to. Each variable executes a series of tasks (extract, load, and transform). They're currently executing sequentially, but I'd like them to execute simultaneously per variable.
m
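To run the extract, load, and transform chain for each variable at the same time, submit the tasks with `.submit()` so that Prefect's task runner can schedule them concurrently. Steps stay ordered within a variable, while the chains for different variables overlap. Here's how you can structure your flow: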
python
from prefect import flow, task

@task
def extract(variable):
    # Extraction logic here; return the raw data for this variable
    return variable

@task
def load(data):
    # Loading logic here; return whatever transform needs
    return data

@task
def transform(data):
    # Transformation logic here
    return data

@flow
def parallel_task_execution(variables):
    futures = []
    for variable in variables:
        # .submit() returns a future immediately instead of blocking,
        # so the chains for different variables can run concurrently.
        extracted = extract.submit(variable)
        # Passing a future downstream makes that task wait only for
        # its own upstream step, not for the other variables.
        loaded = load.submit(extracted)
        futures.append(transform.submit(loaded))

    # Block until every chain has finished before the flow returns.
    return [future.result() for future in futures]

if __name__ == "__main__":
    variables = [...]  # List of variables
    parallel_task_execution(variables)
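To see the scheduling shape without a Prefect server, here is a minimal standard-library sketch of the same idea: each variable gets its own extract, load, transform chain, and the chains run concurrently in a thread pool. The names `run_pipeline` and `run_all`, the `max_workers` default, and the placeholder step bodies are assumptions for illustration and are not part of Prefect's API.

```python
from concurrent.futures import ThreadPoolExecutor

def extract(variable):
    # Placeholder: pretend to fetch raw rows for this variable
    return {"variable": variable, "rows": [1, 2, 3]}

def load(data):
    # Placeholder: pretend to stage the extracted data
    return {**data, "loaded": True}

def transform(data):
    # Placeholder: pretend to aggregate the staged rows
    return {"variable": data["variable"], "total": sum(data["rows"])}

def run_pipeline(variable):
    # Steps stay sequential within a single variable
    return transform(load(extract(variable)))

def run_all(variables, max_workers=4):
    # Chains for different variables run concurrently in worker threads;
    # pool.map returns results in the same order as the inputs.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_pipeline, variables))
```

In a Prefect flow, `.submit()` plays a similar role to the pool here: it hands the work to the task runner and returns a future, so only the final `.result()` calls block.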
Reference: 1. https://docs.prefect.io/latest/concepts/automations/ 2. https://docs.prefect.io/latest/guides/global-concurrency-limits/ 3. Example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/basic.py