Adri
10/23/2024, 2:30 PM
Marvin
10/23/2024, 2:30 PM
Adri
10/23/2024, 2:30 PM
Marvin
10/23/2024, 2:31 PM
`pause_flow_run` and `suspend_flow_run`. Here's a way to implement this:
1. Pause a Flow: This allows you to pause the flow run and resume it later. You can use `pause_flow_run` if you want the flow to remain in a paused state until it is manually resumed, either via the Prefect UI or by calling `resume_flow_run`.
2. Suspend a Flow: If you prefer to stop the flow completely and tear down any running infrastructure, you can use `suspend_flow_run`. This command immediately stops the flow execution, and when resumed, the flow will start execution from the beginning.
Here's an illustrative example of pausing a flow, based on a task's result:
```python
from prefect import flow, pause_flow_run, task


@task
def task_one():
    # Perform your task logic here
    result = ...  # Determine some condition
    return result


@flow(log_prints=True)
def my_flow():
    # Submit the task and wait for its final state
    terminal_state = task_one.submit(return_state=True)
    # result() must be called; the bare attribute is a method and always truthy
    if terminal_state.result():
        print("Task condition met! Pausing flow run...")
        pause_flow_run(timeout=3600)  # 1-hour timeout
    else:
        print("Task condition not met. Continuing flow...")


# Resume the flow run using the Prefect UI or `resume_flow_run(FLOW_RUN_ID)`
```
This setup will pause your flow if a condition inside `task_one` is met, and it will remain paused until resumed manually or through client code. Note that if using `suspend_flow_run`, the flow will start from the beginning upon resumption, so task caching might be helpful to prevent recomputation.
For more detailed information, you can check the Prefect 3.x Docs.
Adri
10/23/2024, 2:31 PM
Marvin
10/23/2024, 2:32 PM
`receive_input` function. Unlike pausing the entire flow, this method lets a running flow receive data that other code sends to it with `send_input`, without pausing or suspending the flow run. This functionality is particularly useful for applications requiring interactive inputs, such as chatbots or dynamic workflows.
Here's how you can set up a task to wait for input:
```python
from prefect import flow, task
from prefect.input import receive_input
from prefect.logging import get_run_logger


@task
def input_task():
    # Wait for a string sent to this flow run with `send_input`;
    # iteration ends without a value if the 1-hour timeout expires
    for user_input in receive_input(str, timeout=3600):
        return user_input
    return "anonymous"  # Fall back to a default name


@flow
def interactive_flow():
    logger = get_run_logger()
    name = input_task()
    logger.info(f"Hello, {name}!")


# The flow can be executed and will wait for input as specified.
interactive_flow()
```
This example uses `receive_input` within a task to wait for a user's name, falling back to a default value if nothing arrives before the timeout. The name is sent from other code by calling `send_input` with this flow run's ID. It's ideal for cases where you need to dynamically adjust to inputs while keeping the flow running.
For more in-depth details, you can refer to the Prefect 3.x Input Handling Documentation.
Adri
10/23/2024, 2:53 PM
Marvin
10/23/2024, 2:54 PM
`pause_flow_run` function with the `wait_for_input` argument. This allows you to pause the flow execution until the required input is provided via the UI.
Here’s how you can utilize this functionality:
1. Define a Model for Input: Use a Pydantic model to define the schema of the input you're expecting.
2. Pause the Flow for Input: Use `pause_flow_run(wait_for_input=YourInputModel)` to pause the flow until the required input is provided through the Prefect UI.
Here’s an illustrative example:
```python
from prefect import flow
from prefect.flow_runs import pause_flow_run
from prefect.input import RunInput
from prefect.logging import get_run_logger
from pydantic import ValidationError


class UserInput(RunInput):
    name: str
    age: int


@flow
def get_user_input_flow():
    logger = get_run_logger()
    user_input = None
    while user_input is None:
        try:
            # This pauses the flow and waits for user input through the UI
            user_input = pause_flow_run(wait_for_input=UserInput)
        except ValidationError as exc:
            # Invalid input: log the error and pause again
            logger.error(f"Invalid input: {exc}")
    logger.info(f"Received input - Name: {user_input.name}, Age: {user_input.age}")


# The flow will pause and wait for input defined by the UserInput class schema
```
In this example, the flow pauses and waits for input defined by the `UserInput` schema when it hits the `pause_flow_run` function. Users can then provide the required input via the Prefect UI, resuming workflow execution once the input is correctly supplied.
For further details, you can refer to Prefect 3.x Input Handling Documentation.