Scott Walsh
11/17/2022, 12:15 AM
import asyncio
from prefect.client import get_client
from prefect import flow, task

@task
async def child_task1():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow1/sub_flow1')
        response = await client.create_flow_run_from_deployment(deployment.id)

@task
async def child_task2():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow2/sub_flow2')
        response = await client.create_flow_run_from_deployment(deployment.id)

@flow
async def parent_flow():
    await child_task1.submit()
    await child_task2.submit()

if __name__ == "__main__":
    asyncio.run(parent_flow())
Anna Geller
11/17/2022, 12:16 AM
Scott Walsh
11/17/2022, 7:21 PM
run_deployment, is there any way to get results back from the deployment? I see state in the output, but when I try to get .result() from it I see
MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Anna Geller
11/17/2022, 7:22 PM
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT='True'
Scott Walsh
11/17/2022, 7:32 PM
run_deployment?
Anna Geller
11/17/2022, 10:19 PM
Scott Walsh
11/18/2022, 8:13 PM
1. setting PREFECT_RESULTS_PERSIST_BY_DEFAULT='True' in the agent
2. creating a new block for result storage
3. pointing my flow to that storage @flow(result_storage='gcs/<my-block>')
a = run_deployment('<my-flow>/<my-deployment>')
a.state.result()
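A minimal sketch of the last two lines above as a reusable helper, assuming Prefect 2.x where run_deployment is importable from prefect.deployments and waits for the flow run to finish by default. The helper name fetch_deployment_result and its run parameter are hypothetical, introduced only so the logic can be exercised without a Prefect API; they are not part of Prefect. The deployment name is left as the same placeholder used in the message.

```python
def fetch_deployment_result(name, run=None):
    """Trigger a deployment by '<flow>/<deployment>' name and return its result.

    `run` is a hypothetical injection point (not a Prefect feature). When it is
    omitted, Prefect's own run_deployment is imported and used.
    """
    if run is None:
        # Imported lazily so the helper can be tried with a stand-in callable.
        from prefect.deployments import run_deployment
        run = run_deployment

    # Assumption: with default arguments, run_deployment blocks until the
    # flow run reaches a final state and returns the flow run object.
    flow_run = run(name=name)

    # Raises MissingResult (as quoted earlier in the thread) if the flow's
    # return value was not persisted to result storage.
    return flow_run.state.result()


if __name__ == "__main__":
    # Placeholder name from the thread; replace with a real deployment.
    print(fetch_deployment_result("<my-flow>/<my-deployment>"))
```

This only works once the three steps listed in the message are in place, since the result has to be persisted somewhere the caller can read it back from.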
Anna Geller
11/18/2022, 8:52 PM