Pav Staz
07/01/2025, 2:00 PM
Marvin
07/01/2025, 2:00 PM
Marvin
07/01/2025, 2:01 PM
python
@flow
def your_flow_function():
    # Your flow logic here
    pass

# Instead of .submit().result()
future = your_flow_function.submit()
result = future.result(raise_on_failure=True)
The key differences are:
1. With flows, you use .submit() directly on the flow (not on a returned object)
2. The .submit() call returns a PrefectFlowRunFuture object that has a .result() method
3. The syntax is slightly different but achieves the same goal: running asynchronously and waiting for the result
If you have tasks within your flow, this is actually the correct approach, as flows are meant to orchestrate tasks and provide additional features like:
- Automatic retry handling
- State tracking
- Logging and observability
- Proper task dependency management
This pattern will work whether your flow contains tasks or not, and it's the recommended way to handle asynchronous flow execution from synchronous code in Prefect 3.x.
Let me know if you need any clarification or have questions about other aspects of flow execution!
Pav Staz
07/01/2025, 2:05 PM
future = execute_dbt_run.submit()
result = future.result(raise_on_failure=True)
Pav Staz
07/01/2025, 2:06 PM
future = execute_dbt_run.submit()
result = future.result(raise_on_failure=True)
And still get the same error
File "/opt/project/daily_pipeline.py", line 24, in daily_flow
deployer-1 | future = execute_dbt_run.submit()
deployer-1 | ^^^^^^^^^^^^^^^^^^^^^^
deployer-1 | AttributeError: 'Flow' object has no attribute 'submit'
Marvin
07/01/2025, 2:06 PM
Marvin
07/01/2025, 2:07 PM
with_options(). Here's how you should modify your code:
python
@flow
def execute_dbt_run():
    # Your flow logic here
    pass

# Use with_options() to create an async variant of your flow
async_flow = execute_dbt_run.with_options(name="async-dbt-run")
future = async_flow()
result = future.result(raise_on_failure=True)
The key differences are:
1. Use with_options() to create an async-capable variant of your flow
2. Call the flow directly to get a future object
3. Use .result() on the future to wait for completion
This approach will work with flows that contain tasks and provides all the benefits of flow-level orchestration.
If you're working in an async context, you can also use the async/await syntax:
python
result = await execute_dbt_run()
Let me know if you need any clarification or run into other issues!
Pav Staz
07/01/2025, 2:09 PM
Marvin
07/01/2025, 2:09 PM
Marvin
07/01/2025, 2:09 PM
execute_dbt_run is already an async function decorated with @flow, and you're calling it from a synchronous flow, here's the correct approach:
python
from prefect import flow
from prefect.utilities.asyncio import sync_compatible

@flow
async def execute_dbt_run():
    # Your async flow logic with tasks
    pass

@flow
def parent_flow():
    # Use sync_compatible to properly handle the async flow in sync context
    return sync_compatible(execute_dbt_run)()
The sync_compatible utility from Prefect handles the async-to-sync bridge properly. This is the recommended way to call async flows from synchronous flows in Prefect 3.x.
This approach:
1. Maintains proper flow execution context
2. Handles all the async/sync conversion automatically
3. Preserves proper error propagation
4. Works with nested tasks within your async flow
The sync_compatible utility is specifically designed for this use case where you need to call async flows from synchronous code within the Prefect ecosystem.
Let me know if you need any clarification or have questions about handling specific scenarios with this pattern!
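For comparison, the general Python mechanism for running a coroutine from synchronous code, independent of any Prefect helper, is asyncio.run. The sketch below uses only the standard library. The function names are hypothetical stand-ins with no Prefect decorators applied, and whether this pattern is suitable inside a particular Prefect flow run is an assumption to check against the Prefect documentation, because asyncio.run raises RuntimeError when an event loop is already running in the current thread.

```python
import asyncio


async def execute_dbt_run_stub() -> str:
    # Hypothetical stand-in for an async function; real code would do async work here.
    await asyncio.sleep(0)
    return "dbt run finished"


def parent_stub() -> str:
    # Synchronous caller: asyncio.run starts an event loop, runs the coroutine
    # to completion, and returns its result. It raises RuntimeError if the
    # current thread already has a running event loop.
    return asyncio.run(execute_dbt_run_stub())


async def async_parent_stub() -> str:
    # From async code, the coroutine is simply awaited.
    return await execute_dbt_run_stub()


if __name__ == "__main__":
    print(parent_stub())
```

The synchronous and asynchronous callers return the same value; the only difference is who owns the event loop.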