James Barkley
04/14/2024, 4:53 PM
Marvin
04/14/2024, 4:54 PM
`submit` method, which includes an optional `wait_for` argument that accepts a list of futures (the values returned by `.submit()`) from other tasks. This instructs the flow to wait until those upstream tasks have finished before running the task.
Here's a brief code example from the Prefect documentation:
```python
from prefect import flow, task


@task
def upstream():
    pass


@task
def downstream_1():
    pass


@task
def downstream_2():
    pass


@task
def mapped_task(input):
    pass


@task
def final_task():
    pass


@flow(log_prints=True)
def flow_of_tasks():
    upstream_result = upstream.submit()
    downstream_1_result = downstream_1.submit(wait_for=[upstream_result])
    downstream_2_result = downstream_2.submit(wait_for=[upstream_result])
    mapped_task_results = mapped_task.map([1, 2], wait_for=[downstream_1_result, downstream_2_result])
    final_task(wait_for=mapped_task_results)
```
In this example:
- `downstream_1` and `downstream_2` wait for `upstream` to complete before they run.
- `mapped_task` waits for both `downstream_1` and `downstream_2` to complete before running.
- `final_task` waits for all of the `mapped_task` runs to complete before running.
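To make the ordering above concrete, here is a minimal stand-alone sketch of the same dependency graph using only Python's standard library (`concurrent.futures`), not Prefect. The helper `submit_after` and the inner `step` function are hypothetical names invented for this illustration, and blocking in the caller before submitting is a simplification. It is not a description of how Prefect implements `wait_for` internally.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def submit_after(pool, fn, *args, wait_for=()):
    """Hypothetical helper: submit fn only once every future in wait_for is done."""
    if wait_for:
        wait(list(wait_for))  # blocks the caller until all upstream futures finish
    return pool.submit(fn, *args)


def run_pipeline():
    order = []               # names of the steps, in the order they ran
    lock = threading.Lock()  # guards `order` across worker threads

    def step(name):
        with lock:
            order.append(name)

    with ThreadPoolExecutor(max_workers=4) as pool:
        upstream = submit_after(pool, step, "upstream")
        d1 = submit_after(pool, step, "downstream_1", wait_for=[upstream])
        d2 = submit_after(pool, step, "downstream_2", wait_for=[upstream])
        mapped = [
            submit_after(pool, step, f"mapped_task_{i}", wait_for=[d1, d2])
            for i in (1, 2)
        ]
        submit_after(pool, step, "final_task", wait_for=mapped).result()
    return order


if __name__ == "__main__":
    print(run_pipeline())
```

The two downstream steps may run in either order, as may the two mapped steps, but `upstream` always runs first and `final_task` always runs last.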
For more detailed information on how to specify dependencies correctly, including with subflows, you can refer to the official Prefect documentation on specifying dependencies.
James Barkley
04/14/2024, 4:56 PM
`submit` method
Marvin
04/14/2024, 4:56 PM
`submit` method, which makes the dependency pattern slightly different.
While the Prefect documentation doesn't provide a direct example for specifying dependencies between subflows, you typically trigger a subflow by calling it within the main flow, and any tasks or subflows that depend on the outcome of that subflow will need to wait for it to complete.
Here is a conceptual example to manage dependencies with subflows:
```python
from prefect import flow, task


@task
def some_task():
    pass


@flow
def subflow():
    # Subflow logic
    pass


@flow
def parent_flow():
    # Call subflow and wait for it to complete
    subflow()
    # Now you can perform actions that depend on the completion of subflow
    some_task()
```
In this example, calling `subflow()` within `parent_flow()` means that the execution of `parent_flow` will not proceed until `subflow` has completed its execution. If you need to pass results between subflows or manage more complex dependencies, you would typically use the return values of subflows or tasks, managing them just like you would manage task dependencies.
For further details, you should review the Flows section of the Prefect documentation, which discusses how subflows are linked to their parent flow runs. Unfortunately, it appears the current documentation may not cover all aspects of subflow dependencies explicitly, so you might consider reaching out to the Prefect community or support for more complex scenarios.
James Barkley
04/14/2024, 5:25 PM
Marvin
04/14/2024, 5:25 PM
`wait_for` in the `submit` method. You may expect to do something similar for subflows, but the process is somewhat different because subflows are just flow calls within other flows.
For orchestrating multiple subflows where a subflow has dependencies on two other subflows, you would have to structure your code such that the dependent subflow is only called after the upstream subflows have been completed.
Here is a conceptual pattern on how to do this:
```python
from prefect import flow


@flow
def upstream_subflow_1():
    # Some logic here
    ...


@flow
def upstream_subflow_2():
    # Some logic here
    ...


@flow
def dependent_subflow():
    # This subflow depends on the completion of upstream_subflow_1 and upstream_subflow_2
    ...


@flow
def parent_flow():
    # Call upstream subflows
    upstream_subflow_1()  # Assume this calls and waits for its completion
    upstream_subflow_2()  # Assume this calls and waits for its completion
    # Once both upstream subflows are complete, execute the dependent subflow
    dependent_subflow()
```
In the above example, `parent_flow` orchestrates the execution of `upstream_subflow_1` and `upstream_subflow_2`, and only after both have completed does it call `dependent_subflow`, ensuring that `dependent_subflow` has both `upstream_subflow_1` and `upstream_subflow_2` as upstream dependencies.
The actual synchronization and passing of completion status between subflows can be done through their return values or through the use of some orchestration mechanism provided by Prefect's state management system.
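The sequential calls above run the two upstream subflows one after the other. When the upstream subflows are independent, the same dependency can be kept while letting them overlap, with the results handed to the dependent step as arguments. The following is a minimal sketch of that shape in plain `asyncio`, with no Prefect decorators; the return values and the `rows` key are made up for illustration. Prefect 2 supports async flows, so decorating these coroutines with `@flow` is the natural next step, but that is an assumption not exercised here.

```python
import asyncio


async def upstream_subflow_1():
    await asyncio.sleep(0.01)  # stand-in for real work
    return {"rows": 3}         # illustrative result only


async def upstream_subflow_2():
    await asyncio.sleep(0.01)  # stand-in for real work
    return {"rows": 5}         # illustrative result only


async def dependent_subflow(first, second):
    # Takes both upstream results as arguments, so it cannot start before they exist.
    return first["rows"] + second["rows"]


async def parent_flow():
    # Run both upstream coroutines concurrently and wait for both to finish.
    first, second = await asyncio.gather(upstream_subflow_1(), upstream_subflow_2())
    return await dependent_subflow(first, second)


if __name__ == "__main__":
    print(asyncio.run(parent_flow()))  # prints 8
```

Passing the upstream return values into `dependent_subflow` makes the dependency explicit in the call itself instead of relying only on statement order.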
I would recommend checking out more advanced patterns in the Prefect documentation or reaching out to the Prefect community for specific examples if your use case is more complex.
James Barkley
04/15/2024, 2:06 AM
Marvin
04/15/2024, 2:06 AM
James Barkley
04/15/2024, 2:10 AM
`wait_for` parameter? It's not listed in the API docs but does show up in the git repository in the file `src/prefect/flows.py`.
Marvin
04/15/2024, 2:10 AM
`wait_for` parameter in flows. However, the mentions of the `wait_for` parameter in the `src/prefect/flows.py` file and in GitHub issues suggest that `wait_for` may be used internally within Prefect's flow engine.
From the issues found:
- One issue includes a reference to a `wait_for` parameter when creating and beginning a subflow run, indicating that there could be internal mechanics in Prefect 2.x that use such a parameter, possibly to wait for certain dependencies.
- Another issue involves running two different flows in parallel, but does not explicitly mention `wait_for`.
- The third issue does not seem to address `wait_for` directly, as it's about using the `task` decorator on a method.
Given that the `wait_for` parameter is not documented in the official Prefect API documentation, it should be treated with caution. If mentioned in the codebase, it may indicate an experimental feature or something meant for internal use that is not part of the public API. Therefore, relying on such a feature in production code could be risky, as it may change or be removed without notice.
For clear guidance and best practices, it's recommended to follow the public API documentation or to inquire with the Prefect Community for clarification on any undocumented features you come across in the codebase. If you are interested in understanding the usage or status of the `wait_for` parameter, reaching out directly to Prefect through a GitHub issue or via their community Slack might provide the most accurate and up-to-date information.