Kaleb King
07/15/2024, 8:14 PM
Marvin
07/15/2024, 8:15 PM
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task
def extract(variable):
    """Extract the raw data for a single *variable*.

    Placeholder: no extraction logic has been written yet, so this task
    currently produces ``None``.
    """
    return None
@task
def load(data):
    """Load *data* into its destination.

    Placeholder: no loading logic has been written yet, so this task
    currently produces ``None``.
    """
    return None
@task
def transform(data):
    """Transform *data*.

    Placeholder: no transformation logic has been written yet, so this
    task currently produces ``None``.
    """
    return None
@flow
def parallel_task_execution(variables):
    """Run extract -> load -> transform once for every item in *variables*.

    Each variable gets its own chain of submitted tasks. Passing one task's
    future into the next ``submit`` call makes Prefect wait for the upstream
    result before starting the downstream task. Chains for different
    variables may run concurrently, depending on the flow's task runner.

    Args:
        variables: Iterable of inputs, one extract/load/transform chain each.

    Raises:
        The exception of any failed chain, re-raised by ``future.result()``,
        so a failure in any chain fails the flow.
    """
    # Materialise once: the count is needed for the slot request, and the
    # caller may hand us a one-shot iterator.
    variables = list(variables)
    if not variables:
        # Nothing to process; don't ask the concurrency limit for zero slots.
        return

    # NOTE(review): this reserves one slot per variable in a single
    # acquisition and holds them until every chain finishes. If the
    # "variable-processing" limit has fewer slots than len(variables), the
    # request presumably cannot be satisfied -- confirm, and consider taking
    # `concurrency(..., occupy=1)` inside each task instead.
    with concurrency("variable-processing", occupy=len(variables)):
        futures = []
        for variable in variables:
            extracted = extract.submit(variable)
            loaded = load.submit(extracted)
            futures.append(transform.submit(loaded))
        # Wait inside the `with` so the slots stay held while the tasks
        # actually execute, not only while they are being submitted.
        for future in futures:
            future.result()
if __name__ == "__main__":
    # Guarded so that importing this module does not trigger a flow run.
    # Placeholder input: replace the Ellipsis with the real list of variables.
    variables = [...]
    parallel_task_execution(variables)
References:
1. https://docs.prefect.io/latest/concepts/automations/
2. https://docs.prefect.io/latest/guides/global-concurrency-limits/
3. Example: https://raw.githubusercontent.com/zzstoatzz/prefect-code-examples/main/examples/tasks/basic.py