Gabe Villasana
11/15/2023, 4:54 PM
Subflow B returns a MyModel.
High level, this is how it works:
# Parent flow A
import asyncio

from prefect import deployments

# Run the subflow-B deployments in parallel and collect the FlowRun objects
deployments_results = await asyncio.gather(
    *[
        deployments.run_deployment(
            name="<subflow B deployment name>",
            timeout=PREFECT_FLOW_RUN_TIMEOUT_SECONDS,
            parameters={<parameters>},
        )
        for idx, (<argument_1>, <argument_2>) in enumerate(
            zip(<thing>, <thing>)
        )
    ]
)

# Each FlowRun's final state carries the subflow's (persisted) result
for a in deployments_results:
    print(f"result: {a.state.result()}")
# Subflow B
from prefect import flow
from prefect.filesystems import S3
from prefect.task_runners import ConcurrentTaskRunner

@flow(
    name="<subflow B deployment name>",
    log_prints=True,
    version="0.0.1",
    task_runner=ConcurrentTaskRunner(),
    timeout_seconds=PREFECT_FLOW_RUN_TIMEOUT_SECONDS,
    retries=PREFECT_FLOW_RUN_RETRIES,
    retry_delay_seconds=PREFECT_FLOW_RUN_RETRY_DELAY_SECONDS,
    persist_result=True,
    # Results are pickled and written to S3, which is why the caller
    # sees a storage reference instead of the value itself
    result_storage=S3(
        bucket_path=AWS_PREFECT_BUCKET_NAME,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    ),
)
async def subflow_b(
    <parameters>
) -> MyModel:
    ...
Right now, my print statement is returning:
result: type='reference' artifact_type=None artifact_description=None serializer_type='pickle' storage_block_id=UUID(<my UUID>) storage_key=<my storage key>
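
If the goal is the actual MyModel value rather than that reference, a minimal sketch of one way to resolve it (this assumes Prefect 2.x async semantics; fetch=True is documented behavior but does not appear in the thread):

# Sketch: resolve the persisted result instead of printing the reference.
# In an async context, fetch=True tells Prefect to pull the pickled value
# back from the S3 result storage and deserialize it.
for a in deployments_results:
    value = await a.state.result(fetch=True)
    print(f"result: {value}")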

Gabe Villasana
11/15/2023, 5:24 PM
result().get()! 😄
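
(For context, that works because state.result() without fetching hands back a PersistedResult reference, whose .get() coroutine downloads and deserializes the stored value. A minimal sketch, reusing the variable a from the snippet above:)

# Equivalent route to the value via the reference object.
ref = a.state.result()    # PersistedResult pointing at the S3 storage key
value = await ref.get()   # downloads and unpickles the MyModel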

Taylor Curran
11/15/2023, 6:01 PM
a.state.result() is right for run_deployment, but if it's for a task, this documentation should help.
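
(The thread's documentation link isn't preserved, but a hedged sketch of the usual task pattern: inside a flow, calling a task with return_state=True yields its final State, and state.result() on a task state resolves in-process with no fetch needed. build_model here is a hypothetical stand-in, not from the thread.)

from prefect import flow, task

@task
def build_model() -> dict:
    # Hypothetical task standing in for real work
    return {"answer": 42}

@flow
def parent():
    state = build_model(return_state=True)  # get the task's final State
    print(state.result())                   # task results resolve in-process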