Jack Goslin
12/06/2023, 5:45 PM
Marvin
12/06/2023, 5:46 PM
from prefect import flow, task
from asyncio import gather

@task
def task1():
    # perform some operation
    return "result from task 1"

@task
def task2():
    # perform some operation
    return "result from task 2"

@flow
def sub_flow():
    # async calling multiple tasks within the sub-flow
    result1 = task1.submit()
    result2 = task2.submit()
    return gather(result1, result2)

@flow
def main_flow():
    # Running sub-flows asynchronously
    sub_flow_1_future = sub_flow.submit()
    sub_flow_2_future = sub_flow.submit()
    # Optionally, you can await their results together, if needed
    results = gather(sub_flow_1_future, sub_flow_2_future)
    return results

# Call main flow to run everything
main_flow()
In this example:
- We have two simple tasks, task1 and task2, which return some results.
- The sub_flow is defined as a flow where task1 and task2 are called asynchronously using the submit method on tasks. We use gather to run them concurrently within the sub-flow.
- The main_flow represents the primary flow, where multiple sub_flow instances are launched asynchronously with the submit method on the flow itself. To wait on all results, we again use gather.
This is a highly simplified example of how you might structure your flows and tasks to operate asynchronously, which can be especially useful when performing parallel data processing tasks. You can adapt this pattern to more complex scenarios as needed.
Kevin Grismore
12/06/2023, 5:57 PM
submit method, but you can write an async def flow_func and gather the results of multiple async subflow calls.
from prefect import flow
from anyio import sleep, run
from asyncio import gather

@flow
async def hello():
    results = await gather(subflow1(), subflow2())
    print(results)
    print("both subflows finished")

@flow
async def subflow1():
    await sleep(3)
    print("hello from subflow 1")
    return 1

@flow
async def subflow2():
    await sleep(6)
    print("hello from subflow 2")
    return 2

if __name__ == "__main__":
    run(hello)
12:55:40.642 | INFO | prefect.engine - Created flow run 'papaya-ladybug' for flow 'hello'
12:55:41.279 | INFO | Flow run 'papaya-ladybug' - Created subflow run 'proficient-alpaca' for flow 'subflow1'
12:55:41.315 | INFO | Flow run 'papaya-ladybug' - Created subflow run 'brown-boar' for flow 'subflow2'
hello from subflow 1
12:55:45.132 | INFO | Flow run 'proficient-alpaca' - Finished in state Completed()
hello from subflow 2
12:55:47.704 | INFO | Flow run 'brown-boar' - Finished in state Completed()
[1, 2]
both subflows finished
12:55:47.866 | INFO | Flow run 'papaya-ladybug' - Finished in state Completed('All states completed.')
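The version above overlaps the two sub-flows because each one spends its time suspended in `await sleep(...)`, which hands control back to the event loop. A minimal plain-asyncio sketch (no Prefect involved; `subflow` and `main` here are hypothetical stand-ins) shows the same effect on a smaller scale: the total time tracks the longest wait rather than the sum, much as in the log above, where the second sub-flow finishes roughly 6 seconds after it was created rather than 9.

```python
import asyncio
import time


async def subflow(name: str, delay: float) -> str:
    # Awaiting sleep suspends this coroutine and hands control back to the
    # event loop, so gather() can make progress on the other one meanwhile.
    await asyncio.sleep(delay)
    return name


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather() returns results in argument order, not completion order.
    results = await asyncio.gather(subflow("one", 0.3), subflow("two", 0.6))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)  # ['one', 'two']
    print(f"took about {elapsed:.1f}s")  # close to 0.6s (longest wait), not 0.9s
```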
Jack Goslin
12/06/2023, 6:07 PM
@flow(log_prints=True, name='HQ - Locations')
async def locations_flow(creds, snowflake_session):
    # these are both sync tasks but must be called sequentially
    locations = get_locations(creds)
    load_locations(locations, snowflake_session)

@flow(log_prints=True, name="HQ - Vehicles")
async def vehicles_flow(creds, snowflake_session):
    # these are both sync tasks but must be called sequentially
    vehicles = get_vehicles(creds)
    load_vehicles(vehicles, snowflake_session)

@flow(log_prints=True, name='HQ Integration')
async def main():
    # these two are not tasks, and they must be called sequentially
    # so they can be passed to the 2 sub-flows
    creds = get_private_keys()
    s = snow.Session.builder.configs(
        {
            ... (creds is used here)
        }
    ).create()
    await asyncio.gather(locations_flow(creds, s), vehicles_flow(creds, s))
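The comments above note that the steps inside each sub-flow are synchronous. In plain asyncio, `gather()` only overlaps coroutines while they are suspended at an `await`; a coroutine that calls blocking code never yields, so the two pipelines run one after the other. This sketch models that with `time.sleep` standing in for the sync tasks. `blocking_step` and `pipeline` are hypothetical names, and Prefect's own handling of sync tasks inside async flows may differ from this bare-asyncio model.

```python
import asyncio
import time

events: list[str] = []


def blocking_step(label: str) -> None:
    # Stand-in for a sync task: time.sleep() blocks the thread, and with it
    # the event loop, until it returns.
    events.append(f"start {label}")
    time.sleep(0.2)
    events.append(f"end {label}")


async def pipeline(name: str) -> None:
    # There is no await in here, so once this coroutine starts it runs to
    # completion before the event loop can switch to anything else.
    blocking_step(f"{name}: get")
    blocking_step(f"{name}: load")


async def main() -> float:
    start = time.perf_counter()
    await asyncio.gather(pipeline("locations"), pipeline("vehicles"))
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = asyncio.run(main())
    for event in events:
        print(event)
    print(f"took about {elapsed:.1f}s")  # close to 0.8s: all four steps back to back
```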
Kevin Grismore
12/06/2023, 6:17 PM
from prefect import flow, task
from anyio import run
from asyncio import gather

@flow
async def hello():
    print("this should be first")
    results = await gather(subflow1(), subflow2())
    print(results)
    print("both subflows finished")

@flow
async def subflow1():
    print("hello from subflow 1")
    howdy()
    whatup()
    return 1

@flow
async def subflow2():
    print("hello from subflow 2")
    whatup()
    howdy()
    return 2

@task
def whatup():
    print("whatup")

@task
def howdy():
    print("howdy")

if __name__ == "__main__":
    run(hello)
Kevin Grismore
12/06/2023, 6:18 PM
Kevin Grismore
12/06/2023, 6:21 PM
Jack Goslin
12/06/2023, 6:24 PM
creds object relies on getting an AWS block.
Kevin Grismore
12/06/2023, 6:25 PM
await your block loading
Kevin Grismore
12/06/2023, 6:26 PM
Jack Goslin
12/06/2023, 6:28 PM
Kevin Grismore
12/06/2023, 6:30 PM
Kevin Grismore
12/06/2023, 6:31 PM
Jack Goslin
12/06/2023, 6:33 PM
Jack Goslin
12/06/2023, 6:34 PM
@flow(log_prints=True, name='HQ Integration')
def main():
    creds = get_private_keys()
    s = snow.Session.builder.configs(
        {
            creds
        }
    ).create()
    locations = get_locations(creds)
    load_locations(locations, s)
    vehicles = get_vehicles(creds)
    load_vehicles(vehicles, s)
    reservations = get_reservations(creds, last_resv_date)
    load_reservations(reservations, s)
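The synchronous `main` above runs every step in order. If the goal is for the locations and vehicles pipelines to overlap while each keeps its own get-then-load order, one option in plain asyncio is to await the shared setup first (the same idea as Kevin's "await your block loading") and hand each blocking step to a worker thread with `asyncio.to_thread` (Python 3.9+). This is a sketch outside Prefect: `load_credentials`, `get_records`, `load_records` and `pipeline` are hypothetical stand-ins, and whether one Snowflake session can safely be used from two threads at once is not settled in this thread, so check that before applying the idea.

```python
import asyncio
import time


async def load_credentials() -> dict[str, str]:
    # Hypothetical stand-in for an awaited block load.
    await asyncio.sleep(0.05)
    return {"user": "example"}


def get_records(kind: str, creds: dict[str, str]) -> list[str]:
    # Hypothetical blocking "get" step (an API call in the real flow).
    time.sleep(0.2)
    return [f"{kind} for {creds['user']}"]


def load_records(records: list[str]) -> int:
    # Hypothetical blocking "load" step (a warehouse write in the real flow).
    time.sleep(0.2)
    return len(records)


async def pipeline(kind: str, creds: dict[str, str]) -> int:
    # Each step is awaited before the next starts, so order within one
    # pipeline is preserved; the blocking work runs in a worker thread.
    records = await asyncio.to_thread(get_records, kind, creds)
    return await asyncio.to_thread(load_records, records)


async def main() -> tuple[list[int], float]:
    # Shared setup is awaited first, so both pipelines receive finished creds.
    creds = await load_credentials()
    start = time.perf_counter()
    counts = await asyncio.gather(
        pipeline("locations", creds),
        pipeline("vehicles", creds),
    )
    return counts, time.perf_counter() - start


if __name__ == "__main__":
    counts, elapsed = asyncio.run(main())
    print(counts)  # [1, 1]
    print(f"took about {elapsed:.1f}s")  # close to 0.4s, versus about 0.8s sequentially
```

Because the event loop stays free while the threads sleep, the two pipelines take roughly as long as one of them; the trade-off is that the helper functions must be safe to call from a thread.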
Jack Goslin
12/06/2023, 6:36 PM
Jack Goslin
12/06/2023, 6:51 PM
Jack Goslin
12/06/2023, 6:51 PM
sync and I'm not awaiting anything in the sub-flows. I just assumed that each sub-flow and its tasks would run totally independently of any others. I'll keep playing with it.
Jack Goslin
12/06/2023, 8:49 PM
Kevin Grismore
12/06/2023, 8:50 PM