Keling Chen
02/02/2024, 7:56 PM
Marvin
02/02/2024, 7:57 PM
`@flow` decorated function. Here's a simplified example of how you might conditionally trigger a follow-up task based on the completion of a previous task:
```python
from prefect import flow, task


@task
def some_task():
    # Perform some actions here, then return a value the flow can branch on
    return True


@task
def follow_up_task():
    # This task runs only when the flow's condition on some_task's result holds
    pass


@flow
def main_flow():
    result = some_task()
    if result:  # Check the condition you want to trigger on based on the task's result
        follow_up_task()


main_flow()
```
In this example, `follow_up_task` will only run if `some_task` completes successfully and the condition you've set is met.
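For more complex triggering logic, note that Prefect 2.x does not have the task-level triggers of Prefect 1.x. Instead, you express the condition in ordinary Python: call a task with `return_state=True` to get its final state back instead of raising on failure, then branch on checks such as `state.is_completed()` or `state.is_failed()`. To make one task wait on another without passing data, pass `wait_for=[...]` with the upstream task's future (obtained from `.submit()`).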
Remember to adapt the `if` condition based on what your `some_task` returns or the state that you want to check.
If you are looking to orchestrate entire flows based on the completion of tasks, consider Prefect's subflow pattern, where one flow calls another flow like a regular function, effectively chaining them together. If the follow-up flow has its own deployment, `run_deployment` from `prefect.deployments` can trigger it from within a flow or task instead. That route needs more architectural care, because the triggered run executes on that deployment's infrastructure.
For specific code adaptations or more complex use cases, you may want to check Prefect's official documentation or GitHub issues for additional guidance.
Keling Chen
02/02/2024, 8:01 PM
Marvin
02/02/2024, 8:04 PM
`@flow` decorator just like you would for any other flow.
2. In your main flow, define all your tasks and the subflow.
3. Let the order of calls inside the main flow orchestrate execution. Prefect 2.x has no `.set_downstream()` or `.set_upstream()`; those belong to the Prefect 1.x imperative API.
4. After calling your upstream task, call the subflow. A direct task call blocks until the task finishes, so the subflow starts only once the upstream task is done. Pass the task's result to the subflow if it needs the data.
Here's a minimal example to illustrate:
```python
from prefect import flow, task


@task
def upstream_task():
    # Your task logic here; the return value is a placeholder
    return "some_result"


@flow
def subflow(upstream_result):
    # Subflow logic here
    pass


@flow
def main_flow():
    # A direct task call blocks until the task finishes and returns its result
    result = upstream_task()
    # The subflow therefore starts only after upstream_task has completed
    subflow(result)


main_flow()
```
In this example, once `upstream_task` has successfully completed, `subflow` will be triggered. Depending on your specific requirements, you might need to pass results from the upstream task to the subflow or orchestrate more complex dependencies.
Remember that Prefect offers a lot of flexibility on how you can set up flows and subflows, so the actual implementation details may vary based on your use case.
Keling Chen
02/13/2024, 5:38 PM
Marvin
02/13/2024, 5:39 PM
`@flow` decorators.
```python
from prefect import flow


@flow
def upstream_subflow():
    # Logic for upstream subflow
    pass


@flow
def downstream_subflow():
    # Logic for downstream subflow
    pass
```
2. In your main orchestrator flow, call the upstream subflow.
3. After the upstream subflow finishes execution, call the downstream subflow.
```python
@flow
def main_flow():
    # Call the upstream subflow and wait for it to complete
    upstream_subflow()
    # Once the upstream subflow is done, call the downstream subflow
    downstream_subflow()


main_flow()
```
In Prefect 2.x, a flow called from within another flow runs as a subflow, and Prefect tracks it as its own flow run linked to the parent. A synchronous subflow call blocks until the subflow finishes, so `downstream_subflow()` starts only after `upstream_subflow()` has completed.
Keep in mind that Prefect 2.x simplifies many usage patterns compared to Prefect 1.x, and this includes handling subflows. Depending on your specific requirements, you might also consider Prefect features such as scheduling or concurrency limits to control how subflows execute.
Keling Chen
02/13/2024, 6:20 PM
Marvin
02/13/2024, 6:20 PM
```python
import asyncio

from prefect import flow


@flow
async def subflow_one():
    # Your subflow one logic here
    pass


@flow
async def subflow_two():
    # Your subflow two logic here
    pass


@flow
async def main_flow():
    # Calling an async flow returns a coroutine; gather schedules both
    # subflows on the event loop and waits for both to finish
    await asyncio.gather(subflow_one(), subflow_two())


asyncio.run(main_flow())
```
Because the subflows are declared with `async def`, calling them returns coroutines rather than running them immediately. `asyncio.gather` schedules both coroutines and waits for both to complete, so the two subflows run concurrently. Note that a synchronous subflow call blocks until the subflow finishes, and `return_state=True` only changes the return value to a `State` object; it does not produce a future that can be awaited.
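The overlap comes from asyncio rather than from anything Prefect-specific. As a minimal sketch, assuming plain coroutines stand in for the subflows (`fake_subflow` and the `events` list are hypothetical names, not Prefect API), the following shows both coroutines starting before either one finishes:

```python
import asyncio


async def fake_subflow(name, seconds, events):
    # Hypothetical stand-in for a subflow: record start, wait, record end
    events.append(f"start {name}")
    await asyncio.sleep(seconds)
    events.append(f"end {name}")
    return name


async def run_both():
    events = []
    # gather schedules both coroutines and waits for both to finish
    results = await asyncio.gather(
        fake_subflow("one", 0.05, events),
        fake_subflow("two", 0.10, events),
    )
    return results, events


results, events = asyncio.run(run_both())
print(results)
print(events)
```

The results come back in the order the coroutines were passed to `asyncio.gather`, regardless of which one finishes first.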
Remember that when these subflows are invoked from within another flow (`main_flow` in this case), they run in the same process as the parent flow, so they overlap only while awaiting I/O. If you need more control over the concurrency, such as running the flows on different machines, you will need to handle that in your infrastructure setup, for example by giving each flow its own deployment.
Keling Chen
02/13/2024, 6:24 PM
`asyncio.gather` function, two subflows run in parallel in Prefect 2.x. However, when one subflow was cancelled or crashed or failed, the other subflow in the `asyncio.gather` function crashed as well. How can I allow them to be handled independently?
Marvin
02/13/2024, 6:24 PM
`asyncio.gather`. By providing the `return_exceptions=True` argument to `asyncio.gather`, you can ensure that exceptions are returned as results rather than propagating and interrupting the other tasks or subflows.
An alternative that gives finer control is `asyncio.wait`. Here's how you would modify the previous example with it:
```python
import asyncio

from prefect import flow


@flow
async def subflow_one():
    # Your subflow one logic here
    pass


@flow
async def subflow_two():
    # Your subflow two logic here
    pass


@flow
async def main_flow():
    # Wrap each subflow coroutine in a Task so asyncio.wait can track it
    futures = [
        asyncio.create_task(subflow_one()),
        asyncio.create_task(subflow_two()),
    ]
    # Wait for both subflows to finish, whether they succeed or fail
    done, pending = await asyncio.wait(
        futures,
        return_when=asyncio.ALL_COMPLETED,
    )
    for future in done:
        if future.cancelled():
            # Handle cancellation; exception() would raise CancelledError here
            continue
        exception = future.exception()
        if exception:
            # Handle the exception of the subflow
            # e.g., log the exception, retry subflow, etc.
            pass
        else:
            # Handle successful completion
            pass


asyncio.run(main_flow())
```
With `asyncio.wait`, you have more fine-grained control over the running tasks (since Python 3.11 it accepts only tasks or futures, not bare coroutines). It returns two sets, `done` and `pending`, which allow you to inspect the completed tasks for exceptions and handle them accordingly without affecting each other.
Note that `future.exception()` returns the exception raised by the task if it failed, or `None` if it succeeded. You can log this exception or perform some cleanup or retry logic as per your requirements.
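For comparison, the `return_exceptions=True` option of `asyncio.gather` can be tried without Prefect. This is a minimal sketch, assuming plain coroutines in place of subflows; `ok_job` and `failing_job` are hypothetical names used only for illustration:

```python
import asyncio


async def ok_job():
    # Hypothetical stand-in for a subflow that succeeds
    await asyncio.sleep(0.01)
    return "ok"


async def failing_job():
    # Hypothetical stand-in for a subflow that fails
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def run_independently():
    # With return_exceptions=True, a failure is returned as a result
    # instead of being raised, so every job is awaited to the end
    return await asyncio.gather(ok_job(), failing_job(), return_exceptions=True)


outcomes = asyncio.run(run_independently())
for outcome in outcomes:
    if isinstance(outcome, Exception):
        print(f"failed: {outcome!r}")
    else:
        print(f"succeeded: {outcome!r}")
```

Outcomes are returned in the order the awaitables were passed, so each one can be matched back to its job.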
Using this approach, if one subflow fails or is canceled, the other can continue to completion or be handled according to your specific error handling strategy.
Keling Chen
02/13/2024, 6:31 PM
Marvin
02/13/2024, 6:32 PM
`subflow_one` and `subflow_two` to include a retry policy:
```python
from prefect import flow


# Retry this subflow up to 3 times, waiting 10 seconds between attempts
@flow(retries=3, retry_delay_seconds=10)
async def subflow_one():
    # Your subflow one logic here
    pass


# Retry this subflow up to 2 times, waiting 5 seconds between attempts
@flow(retries=2, retry_delay_seconds=5)
async def subflow_two():
    # Your subflow two logic here
    pass
```
The `retries` and `retry_delay_seconds` arguments used here tell Prefect to retry the subflow a maximum of 3 or 2 times (as specified), with a delay of 10 or 5 seconds between retries, if the subflow fails. The subflows are declared with `async def` so that they can still be awaited together from an async parent flow.
This retry policy will be implicitly applied to exceptions that occur within these subflows. Each time a subflow fails, it will be retried according to this policy before ultimately being considered failed if all retries are exhausted.
Keep in mind that this goes into the definitions of `subflow_one` and `subflow_two`, rather than the `main_flow` where they are invoked. The `main_flow` could remain the same as in the previous examples with the `asyncio` concurrency control.
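To make the retry semantics concrete, here is a plain-Python sketch of what "retry up to N times with a delay" means. It is only an illustration and not how Prefect implements retries; `run_with_retries` and `flaky` are hypothetical names:

```python
import time


def run_with_retries(func, max_retries, delay_seconds):
    # One initial attempt plus up to max_retries further attempts
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                # All retries exhausted: let the last failure propagate
                raise
            time.sleep(delay_seconds)


calls = {"count": 0}


def flaky():
    # Hypothetical job that fails twice and then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "done"


result = run_with_retries(flaky, max_retries=3, delay_seconds=0)
print(result, calls["count"])
```

Here the job succeeds on its third attempt, so one of the three allowed retries goes unused.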