James Barkley
04/14/2024, 4:53 PM
Marvin
04/14/2024, 4:54 PM
[...] submit method, which includes an optional wait_for argument that accepts a list of results from other tasks. This instructs the flow to wait until the specified task results are ready before submitting the task for execution.
Here's a brief code example from the Prefect documentation:
```python
from prefect import flow, task

@task
def upstream():
    pass

@task
def downstream_1():
    pass

@task
def downstream_2():
    pass

@task
def mapped_task(input):
    pass

@task
def final_task():
    pass

@flow(log_prints=True)
def flow_of_tasks():
    upstream_result = upstream.submit()
    downstream_1_result = downstream_1.submit(wait_for=[upstream_result])
    downstream_2_result = downstream_2.submit(wait_for=[upstream_result])
    mapped_task_results = mapped_task.map([1, 2], wait_for=[downstream_1_result, downstream_2_result])
    final_task(wait_for=mapped_task_results)
```
In this example:
- downstream_1 and downstream_2 wait for upstream to complete before they run.
- mapped_task waits for both downstream_1 and downstream_2 to complete before running.
- final_task waits for the mapped_task to complete before running.
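To make that ordering concrete without needing a Prefect install, here is a rough standard-library analogy. It is not how Prefect implements wait_for; it only mimics the idea that a submitted unit of work blocks on its upstream futures before running. The helper submit_after, the step function, and run_pipeline are hypothetical names made up for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor


def submit_after(pool, fn, *args, wait_for=()):
    """Hypothetical helper: run fn only after every future in wait_for has finished."""
    def run():
        for upstream_future in wait_for:
            # Blocks until the upstream work is done and re-raises its error, if any.
            upstream_future.result()
        return fn(*args)
    return pool.submit(run)


def run_pipeline():
    order = []  # records the order in which steps actually ran

    def step(name):
        order.append(name)
        return name

    # Work is submitted in dependency order, so a waiting step never starves
    # the upstream step it depends on.
    with ThreadPoolExecutor(max_workers=4) as pool:
        up = submit_after(pool, step, "upstream")
        d1 = submit_after(pool, step, "downstream_1", wait_for=[up])
        d2 = submit_after(pool, step, "downstream_2", wait_for=[up])
        mapped = [
            submit_after(pool, step, f"mapped_{i}", wait_for=[d1, d2])
            for i in (1, 2)
        ]
        final = submit_after(pool, step, "final", wait_for=mapped)
        final.result()
    return order
```

Calling run_pipeline() returns the recorded execution order: upstream always comes first, the two downstream steps next (in either order), then the two mapped steps, then final.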
For more detailed information on how to specify dependencies correctly, including with subflows, you can refer to the official Prefect documentation on specifying dependencies.
James Barkley
04/14/2024, 4:56 PM
[...] submit method
Marvin
04/14/2024, 4:56 PM
[...] submit method, which makes the dependency pattern slightly different.
While the Prefect documentation doesn't provide a direct example for specifying dependencies between subflows, you typically trigger a subflow by calling it within the main flow, and any tasks or subflows that depend on the outcome of that subflow will need to wait for it to complete.
Here is a conceptual example to manage dependencies with subflows:
```python
from prefect import flow, task

@task
def some_task():
    pass

@flow
def subflow():
    # Subflow logic
    pass

@flow
def parent_flow():
    # Call subflow and wait for it to complete
    subflow()
    # Now you can perform actions that depend on the completion of subflow
    some_task()
```
In this example, calling subflow() within parent_flow() means that the execution of parent_flow will not proceed until subflow has completed its execution. If you need to pass results between subflows or manage more complex dependencies, you would typically use the return values of subflows or tasks, managing them just like you would manage task dependencies.
For further details, you should review the Flows section of the Prefect documentation, which discusses how subflows are linked to their parent flow runs. Unfortunately, it appears the current documentation may not cover all aspects of subflow dependencies explicitly, so you might consider reaching out to the Prefect community or support for more complex scenarios.
James Barkley
04/14/2024, 5:25 PM
Marvin
04/14/2024, 5:25 PM
[...] wait_for in the submit method. You may expect to do something similar for subflows, but the process is somewhat different because subflows are just flow calls within other flows.
For orchestrating multiple subflows where a subflow has dependencies on two other subflows, you would have to structure your code such that the dependent subflow is only called after the upstream subflows have been completed.
Here is a conceptual pattern on how to do this:
```python
from prefect import flow

@flow
def upstream_subflow_1():
    # Some logic here
    ...

@flow
def upstream_subflow_2():
    # Some logic here
    ...

@flow
def dependent_subflow():
    # This subflow depends on the completion of upstream_subflow_1 and upstream_subflow_2
    ...

@flow
def parent_flow():
    # Call upstream subflows
    upstream_subflow_1()  # Assume this calls and waits for its completion
    upstream_subflow_2()  # Assume this calls and waits for its completion
    # Once both upstream subflows are complete, execute the dependent subflow
    dependent_subflow()
```
In the above example, parent_flow orchestrates the execution of upstream_subflow_1 and upstream_subflow_2, and only after both have completed does it call dependent_subflow, ensuring that dependent_subflow has both upstream_subflow_1 and upstream_subflow_2 as upstream dependencies.
The actual synchronization and passing of completion status between subflows can be done through their return values or through the use of some orchestration mechanism provided by Prefect's state management system.
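The pattern above runs the two upstream subflows one after the other. If they are independent and written as async functions, one option is to await them together and hand their return values to the dependent step. The sketch below uses plain coroutines and asyncio.gather from the standard library as stand-ins. Whether the same shape works once each function is decorated with @flow is an assumption you should verify against your Prefect version, and the sleep calls and return values are placeholders.

```python
import asyncio


async def upstream_subflow_1():
    await asyncio.sleep(0.01)  # placeholder for real work
    return "result_1"


async def upstream_subflow_2():
    await asyncio.sleep(0.01)  # placeholder for real work
    return "result_2"


async def dependent_subflow(first, second):
    # Receives both upstream results, so it cannot start before they exist.
    return f"{first}+{second}"


async def parent_flow():
    # gather waits for both upstream coroutines and returns results in call order.
    first, second = await asyncio.gather(
        upstream_subflow_1(),
        upstream_subflow_2(),
    )
    return await dependent_subflow(first, second)
```

Running asyncio.run(parent_flow()) drives the whole chain. Passing the upstream return values as arguments makes the dependency explicit in the code rather than relying on call order alone.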
I would recommend checking out more advanced patterns in the Prefect documentation or reaching out to the Prefect community for specific examples if your use case is more complex.
James Barkley
04/15/2024, 2:06 AM
Marvin
04/15/2024, 2:06 AM
James Barkley
04/15/2024, 2:10 AM
[...] wait_for parameter? It's not listed in the API docs but does show up in the git repository in the file src/prefect/flows.py
Marvin
04/15/2024, 2:10 AM
[...] wait_for parameter in flows. However, the mentions of the wait_for parameter in the src/prefect/flows.py file and in GitHub issues suggest that wait_for may be used internally within Prefect's flow engine.
From the issues found:
- One issue includes a reference to a wait_for parameter when creating and beginning a subflow run, indicating that there could be internal mechanics in Prefect 2.x that use such a parameter, possibly to wait for certain dependencies.
- Another issue involves running two different flows in parallel, but does not explicitly mention wait_for.
- The third issue does not seem to address wait_for directly, as it's about using the task decorator on a method.
Given that the wait_for parameter is not documented in the official Prefect API documentation, it should be treated with caution. If mentioned in the codebase, it may indicate an experimental feature or something meant for internal use that is not part of the public API. Therefore, relying on such a feature in production code could be risky, as it may change or be removed without notice.
For clear guidance and best practices, it's recommended to follow the public API documentation or to inquire with the Prefect Community for clarification on any undocumented features you come across in the codebase. If you are interested in understanding the usage or status of the wait_for parameter, reaching out directly to Prefect through a GitHub issue or via their community Slack might provide the most accurate and up-to-date information.