# ask-marvin
@Marvin I'm looking to trigger a flow programmatically after a task completes, on version 2.14.20
In Prefect 2.x (formerly code-named Orion), you can run follow-up work after a task completes using standard Python control flow inside your `@flow`-decorated function. Here's a simplified example that runs a follow-up task only if the previous task returns a truthy result:
```python
from prefect import flow, task

@task
def some_task() -> bool:
    # Perform some actions here and return a value the flow can branch on
    return True

@task
def follow_up_task():
    # Runs only when main_flow decides the condition is met
    print("follow-up task running")

@flow
def main_flow():
    # A direct task call inside a flow blocks until the task finishes and
    # returns its result; if the task fails, the exception is raised here
    result = some_task()
    if result:  # Branch on whatever your task returns
        follow_up_task()

if __name__ == "__main__":
    main_flow()
```
In this example, `follow_up_task` runs only if `some_task` completes successfully and returns a truthy value; if `some_task` raises, the exception propagates and `main_flow` stops there. Prefect 2.x has no equivalent of the task triggers from Prefect 1.x (such as `all_successful`); conditional logic is ordinary Python. For finer control, call a task with `return_state=True` to receive its final state instead of its result, then check it with methods such as `state.is_completed()` or `state.is_failed()`. Adapt the `if` condition to whatever `some_task` returns or the state you want to check.

If you want to run an entire flow after a task completes, call that flow from your parent flow after the task returns; it then runs as a subflow, chaining the two together. To start a separately deployed flow instead, you can call `run_deployment` from `prefect.deployments` within a flow or task. For more complex use cases, see Prefect's official documentation.
@Marvin I want to trigger a subflow after the completion of an upstream task.
To trigger a subflow after an upstream task completes in Prefect 2.x, call the subflow from your parent flow after the task call. Subflows can be used as components of another flow, much like tasks, and they help organize larger workflows, for example when a flow has many tasks or several code owners. Here's a step-by-step approach:
1. Define your subflow with the `@flow` decorator, just as you would any other flow.
2. In your main flow, call the upstream task. A direct task call blocks until the task finishes and returns its result.
3. Call the subflow on the next line, passing it the upstream result if it needs one. Because the task call has already returned, the subflow starts only after the task has completed.

Note that `.set_downstream()` and `.set_upstream()` are Prefect 1.x APIs and do not exist in Prefect 2.x; in 2.x, the order of calls in the flow function defines the execution order. Here's an example:
```python
from prefect import flow, task

@task
def upstream_task() -> int:
    # Your task logic here
    return 42

@flow
def subflow(value: int):
    # Subflow logic here; it receives the upstream task's result
    print(f"subflow received {value}")

@flow
def main_flow():
    # A direct task call blocks until upstream_task has finished
    result = upstream_task()

    # So this subflow run starts only after upstream_task has completed
    subflow(result)

if __name__ == "__main__":
    main_flow()
```
In this example, `subflow` runs only after `upstream_task` has completed successfully. If `upstream_task` fails, its exception propagates and `subflow` is never called. Depending on your requirements, you might pass more results from upstream tasks to the subflow or orchestrate more complex dependencies, so the actual implementation will vary with your use case.
@Marvin I want to trigger a subflow after the completion of an upstream subflow. I am using Prefect 2.0
To trigger a subflow after an upstream subflow completes in Prefect 2.0, call both subflows from a parent flow in the order you want them to run. Calling a flow from inside another flow creates a subflow run, and the call blocks until that subflow finishes, just like a direct task call. Here's a step-by-step approach:
1. Define both subflows with `@flow` decorators.
```python
from prefect import flow

@flow
def upstream_subflow():
    # Logic for the upstream subflow
    pass

@flow
def downstream_subflow():
    # Logic for the downstream subflow
    pass
```
2. In your main orchestrator flow, call the upstream subflow.
3. After the upstream subflow returns, call the downstream subflow.
```python
@flow
def main_flow():
    # Blocks until upstream_subflow has finished
    upstream_subflow()

    # Runs only after the upstream subflow is done
    downstream_subflow()

if __name__ == "__main__":
    main_flow()
```
Because each subflow call blocks, `downstream_subflow()` starts only after `upstream_subflow()` has completed; no extra dependency declaration is needed. If `upstream_subflow` fails, the exception propagates and `downstream_subflow` does not run. Call it as `upstream_subflow(return_state=True)` if you want to inspect its final state and decide whether to continue. Depending on your requirements, you might also use Prefect features such as scheduling or concurrency limits to control when the subflows run.
@Marvin How can I trigger and run two subflows in parallel in Prefect 2.x?
To run two subflows concurrently in Prefect 2.x, define them as async flows and start them together with `asyncio.gather` from an async parent flow. A synchronous subflow call blocks until the subflow finishes, so two sync subflow calls always run one after the other; `return_state=True` does not change that, because it returns the subflow's final state, not a future. Here's an example:
```python
import asyncio

from prefect import flow

@flow
async def subflow_one():
    # Your subflow one logic here
    await asyncio.sleep(1)

@flow
async def subflow_two():
    # Your subflow two logic here
    await asyncio.sleep(1)

@flow
async def main_flow():
    # Calling an async flow returns a coroutine; gather schedules both
    # on the event loop and waits until both subflows have finished
    await asyncio.gather(subflow_one(), subflow_two())

if __name__ == "__main__":
    asyncio.run(main_flow())
```
Calling an async subflow returns a coroutine, so nothing runs until it is awaited. `asyncio.gather` runs both coroutines on the same event loop and waits for both to finish, so the subflows overlap whenever one of them awaits, for example on network I/O or `asyncio.sleep`. CPU-bound or blocking synchronous code inside an async subflow will not overlap with the other subflow. If you need the flows to run on separate processes or machines, deploy them and start them as deployment runs, for example with `run_deployment`, and handle the rest in your infrastructure setup.
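To see why `asyncio.gather` overlaps the subflows only while they await, here is a minimal plain-`asyncio` model with no Prefect involved. `fake_subflow` is a hypothetical stand-in for an async subflow, and the sleep simulates I/O-bound work. Both coroutines start before either finishes, and the total time is roughly one sleep rather than two:

```python
import asyncio
import time

async def fake_subflow(name: str, seconds: float, log: list) -> str:
    # Hypothetical stand-in for an async subflow (plain coroutine, no Prefect)
    log.append(f"{name} started")
    await asyncio.sleep(seconds)  # Yields control, letting the other coroutine run
    log.append(f"{name} finished")
    return name

async def main() -> tuple:
    log: list = []
    start = time.perf_counter()
    # Both coroutines are scheduled together; gather waits for both
    results = await asyncio.gather(
        fake_subflow("one", 0.3, log),
        fake_subflow("two", 0.3, log),
    )
    elapsed = time.perf_counter() - start
    return results, log, elapsed

results, log, elapsed = asyncio.run(main())
print(results, log, f"{elapsed:.2f}s")
```

If `fake_subflow` used `time.sleep` instead of `await asyncio.sleep`, the two calls would run back to back, which is the same limitation that applies to blocking code inside async subflows.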
@Marvin With the `asyncio.gather` function, two subflows run in parallel in Prefect 2.x. However, when one subflow was cancelled, crashed, or failed, the other subflow in the `asyncio.gather` call crashed as well. How can I handle them independently?
To handle the subflows independently, so that the failure, crash, or cancellation of one does not take down the other, stop exceptions from escaping `asyncio.gather`. By default, `gather` raises the first exception it receives, which fails the parent flow while the other subflow may still be running; that subflow can then end up crashed or cancelled along with its parent. There are two complementary fixes: call each subflow with `return_state=True`, so a failed subflow returns a `Failed` state instead of raising, and pass `return_exceptions=True` to `asyncio.gather`, so any remaining exceptions are returned as results. Here's how you would modify the previous example:
```python
import asyncio

from prefect import flow, get_run_logger

@flow
async def subflow_one():
    # Your subflow one logic here (raising to demonstrate failure handling)
    raise ValueError("subflow one failed")

@flow
async def subflow_two():
    # Your subflow two logic here
    return "subflow two result"

@flow
async def main_flow():
    logger = get_run_logger()

    # return_state=True: each call resolves to the subflow's final State
    # instead of raising, so one failure does not interrupt the other.
    # return_exceptions=True: anything that still raises is returned as a value.
    outcomes = await asyncio.gather(
        subflow_one(return_state=True),
        subflow_two(return_state=True),
        return_exceptions=True,
    )

    for name, outcome in zip(["subflow_one", "subflow_two"], outcomes):
        if isinstance(outcome, BaseException):
            logger.error("%s raised %r", name, outcome)
        elif outcome.is_completed():
            logger.info("%s completed", name)
        else:
            # Failed, Crashed, or Cancelled: log, retry, clean up, etc.
            logger.warning("%s ended in state %s", name, outcome.type)

if __name__ == "__main__":
    asyncio.run(main_flow())
```
Because each subflow is called with `return_state=True`, `asyncio.gather` receives a `State` for each one instead of an exception, so a failure in one subflow does not stop the other from running to completion. You can then inspect each state with `is_completed()`, `is_failed()`, `is_crashed()`, or `is_cancelled()` and log, retry, or clean up as your error-handling strategy requires. `return_exceptions=True` acts as a safety net for anything that is still raised as an exception.
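The difference between the default behavior and `return_exceptions=True` can be sketched with plain `asyncio`, outside Prefect. `failing_subflow` and `healthy_subflow` are hypothetical stand-ins for the two subflows. By default the first exception escapes `gather` while the other coroutine is still pending (and `asyncio.run` cancels it on shutdown); with `return_exceptions=True`, both outcomes come back as values:

```python
import asyncio

async def failing_subflow() -> str:
    # Hypothetical stand-in for a subflow that fails
    await asyncio.sleep(0.05)
    raise RuntimeError("subflow one failed")

async def healthy_subflow() -> str:
    # Hypothetical stand-in for a subflow that succeeds, but finishes later
    await asyncio.sleep(0.1)
    return "subflow two completed"

async def main_default() -> str:
    try:
        await asyncio.gather(failing_subflow(), healthy_subflow())
    except RuntimeError as exc:
        # The first exception escapes gather; healthy_subflow is still pending
        # here and is cancelled when asyncio.run shuts the loop down
        return f"gather raised: {exc}"
    return "no exception"

async def main_independent() -> list:
    # Exceptions are returned in the results list instead of being raised
    return await asyncio.gather(
        failing_subflow(), healthy_subflow(), return_exceptions=True
    )

default_outcome = asyncio.run(main_default())
results = asyncio.run(main_independent())
for r in results:
    print("failed:" if isinstance(r, BaseException) else "ok:", r)
```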
@Marvin How can I set retries in the demo code you sent above?
In Prefect 2.x, you set retries for a flow or a task with the `retries` and `retry_delay_seconds` arguments of the `@flow` or `@task` decorator; there is no `RetryPolicy` class. Here's how you can modify `subflow_one` and `subflow_two`:
```python
from prefect import flow

@flow(retries=3, retry_delay_seconds=10)
async def subflow_one():
    # Your subflow one logic here
    pass

@flow(retries=2, retry_delay_seconds=5)
async def subflow_two():
    # Your subflow two logic here
    pass
```
These settings tell Prefect to retry `subflow_one` up to 3 times with 10 seconds between attempts, and `subflow_two` up to 2 times with 5 seconds between attempts, whenever an exception is raised inside the subflow. A subflow is marked as failed only after its retries are exhausted. The retry settings belong on the definitions of `subflow_one` and `subflow_two`, not on `main_flow`, where they are invoked; `main_flow` can stay the same as in the previous example, with `asyncio.gather` running the subflows concurrently.
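To illustrate what `retries` and `retry_delay_seconds` mean, here is a plain-Python model of the counting semantics only, not Prefect's implementation: `retries=3` allows one initial attempt plus up to three retries. `run_with_retries` and `flaky` are hypothetical names used just for this sketch:

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")

def run_with_retries(
    fn: Callable[[], T], retries: int, retry_delay_seconds: float
) -> Tuple[T, int]:
    """Hypothetical helper: call fn once, then retry up to `retries` more times."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > retries:
                raise  # Initial attempt plus all retries have failed
            time.sleep(retry_delay_seconds)  # Wait before the next attempt

calls = {"n": 0}

def flaky() -> str:
    # Hypothetical work that fails twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "done"

result, attempts = run_with_retries(flaky, retries=3, retry_delay_seconds=0.0)
print(result, attempts)  # done 3
```

With `retries=1`, the same `flaky` function would fail on both of its two attempts and the final exception would be raised, which mirrors a subflow ending as failed once its retries are exhausted.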