Rudolf Hofland
12/11/2023, 11:02 PM
import asyncio
import random
from prefect import task, flow
from prefect.runtime import flow_run
def flow_run_name() -> str:
    """Return the current flow run's ``flow_index`` parameter as a string.

    Passed as the ``flow_run_name`` callable of ``subflow`` below, so each
    subflow run is named after the index it was called with.

    Raises:
        KeyError: if the running flow has no ``flow_index`` parameter.
    """
    return f"{flow_run.parameters['flow_index']}"
@task(name='extract')
async def extract() -> str:
    """Wait two seconds without blocking the event loop, then return ``'extracted'``.

    NOTE(review): looks like a placeholder for real extraction work.
    """
    await asyncio.sleep(2)
    return 'extracted'
@task(name='transform')
async def transform(extracted) -> str:
    """Wait two seconds without blocking the event loop, then return ``'transformed'``.

    Args:
        extracted: Result of the ``extract`` task. It is accepted but not
            read; the return value is always the same literal.
    """
    await asyncio.sleep(2)
    return 'transformed'
@task(name='load')
async def load(transformed) -> str:
    """Wait two seconds without blocking the event loop, then return ``'loaded'``.

    Args:
        transformed: Result of the ``transform`` task. It is accepted but
            not read; the return value is always the same literal.
    """
    await asyncio.sleep(2)
    return 'loaded'
@flow(name='subflow', flow_run_name=flow_run_name, log_prints=True)
async def subflow(flow_index):
    """Run extract -> transform -> load in sequence and print a summary line.

    Each task is awaited before the next one starts, so the three steps of
    a single subflow run one after another.

    Args:
        flow_index: Identifier for this run. It is read by
            ``flow_run_name`` to name the run and is echoed in the
            printed summary.
    """
    extracted = await extract()
    transformed = await transform(extracted=extracted)
    loaded = await load(transformed=transformed)
    # Same text as the original concatenation, including its ' +' / '+'
    # separators; ``!s`` keeps the explicit ``str()`` conversion.
    print(f'flow index: {flow_index!s} {extracted} +{transformed}+{loaded}')
@flow(name='main_flow')
async def main_flow():
    """Start four ``subflow`` runs (indices 1 through 4) and await them together."""
    # Calling the async subflow without ``await`` yields awaitables that
    # ``asyncio.gather`` then drives concurrently.
    subflows = [subflow(index) for index in range(1, 5)]
    await asyncio.gather(*subflows)
# Script entry point: run the parent flow only when executed directly.
if __name__ == '__main__':
    asyncio.run(main_flow())