Isabel
06/20/2024, 1:24 PM
Marvin
06/20/2024, 1:24 PM
pause_flow_run for testing that the flow pauses and resumes after user input, you can leverage the wait_for_input parameter. Here is a step-by-step approach:
1. Define a Pydantic Model Subclass: Create a subclass of prefect.input.RunInput for the input you expect from the user.
2. Pause and Resume Flow: Pause the flow with pause_flow_run and resume it with resume_flow_run.
### Example
Here’s an example of how to implement this:
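```python
import asyncio

from prefect import flow, get_client, pause_flow_run
from prefect.input import RunInput
from prefect.runtime import flow_run


class UserNameInput(RunInput):
    name: str


async def provide_input(flow_run_id):
    # Simulate providing input programmatically: wait until the run is
    # paused, then resume it with the input fields as a dictionary
    async with get_client() as client:
        while not (await client.read_flow_run(flow_run_id)).state.is_paused():
            await asyncio.sleep(1)
        await client.resume_flow_run(flow_run_id, run_input={"name": "Marvin"})


@flow
async def greet_user():
    # For testing, start the simulated user first, because the pause below
    # blocks until the run is resumed or the timeout expires
    resumer = asyncio.create_task(provide_input(flow_run.id))
    # Pause the flow and wait for input
    user_input = await pause_flow_run(wait_for_input=UserNameInput, timeout=60)
    await resumer
    print(f"Hello, {user_input.name}!")


# Execute the flow
if __name__ == "__main__":
    asyncio.run(greet_user())
```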
### Key Points:
- Pause Flow: The pause_flow_run function is used to pause the flow and wait for user input.
- Programmatic Input: The provide_input function simulates the user input programmatically and resumes the flow.
- Resume Flow: When the flow is resumed, it continues from where it was paused and utilizes the user input.
### Documentation Links:
- Flow Concepts in Prefect
- Creating Interactive Workflows Guide – For more in-depth information on receiving input from users.
This setup allows you to test the behavior of pause_flow_run by simulating user input programmatically.
Isabel
06/20/2024, 1:26 PM
pause_flow_run(wait_for_input=str, timeout=60), so do I still have to use resume_flow_run?
Marvin
06/20/2024, 1:26 PM
resume_flow_run function to resume the flow after it has been paused using pause_flow_run. Even though pause_flow_run has a timeout parameter, that parameter only defines how long the flow will wait before timing out if no input is provided. To resume the flow and provide the necessary input from code, you must explicitly call resume_flow_run.
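As a rough illustration of how the timeout and the resume call relate, here is a minimal standard-library sketch. It does not use Prefect: wait_for_input, resume, and the pending dictionary are hypothetical stand-ins for pause_flow_run, resume_flow_run, and the server-side state. The only point is that the waiting side returns once input is delivered and gives up once the timeout expires.

```python
import asyncio

# Hypothetical stand-in for paused runs awaiting input; not Prefect's API.
pending: dict[str, asyncio.Future] = {}


async def wait_for_input(run_id: str, timeout: float) -> str:
    """Wait until input is delivered for run_id, or raise on timeout."""
    future = asyncio.get_running_loop().create_future()
    pending[run_id] = future
    try:
        return await asyncio.wait_for(future, timeout)
    finally:
        pending.pop(run_id, None)


def resume(run_id: str, value: str) -> None:
    """Deliver input to a waiting run (the analogue of resuming it)."""
    pending[run_id].set_result(value)


async def demo() -> tuple[str, str]:
    # Case 1: input arrives before the timeout.
    waiter = asyncio.create_task(wait_for_input("run-1", timeout=1.0))
    while "run-1" not in pending:
        await asyncio.sleep(0)  # let the waiter register itself
    resume("run-1", "Marvin")
    resumed = await waiter

    # Case 2: nobody resumes the run, so the wait times out.
    try:
        await wait_for_input("run-2", timeout=0.05)
        timed_out = "no"
    except asyncio.TimeoutError:
        timed_out = "yes"
    return resumed, timed_out


result = asyncio.run(demo())
print(result)
```

In the real API the same division of labor applies: the timeout bounds the wait, and something outside the paused flow (the UI or a resume call) has to supply the input.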
Here's how you can implement and test it:
### Example
#### Define the Flow
```python
import asyncio

from prefect import flow, pause_flow_run
from prefect.runtime import flow_run


@flow(log_prints=True)
async def example_flow():
    # Log the flow run ID so that the run can be resumed from another process
    print(f"Flow run {flow_run.id} is running...")
    user_input = await pause_flow_run(wait_for_input=str, timeout=60)
    print(f"Flow resumed with input: {user_input}")


# Execute the flow; this call does not return while the run is paused
if __name__ == "__main__":
    asyncio.run(example_flow())
```
#### Programmatically Resume the Flow
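```python
import asyncio
import sys

from prefect import get_client


async def provide_input_and_resume(flow_run_id):
    # With wait_for_input=str, the input is passed under the "value" key
    async with get_client() as client:
        await client.resume_flow_run(flow_run_id, run_input={"value": "Marvin"})


# Provide input and resume the flow: run this from a second process while
# the flow is paused, passing the ID of the paused flow run as the argument
asyncio.run(provide_input_and_resume(sys.argv[1]))
```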
### Key Points
- pause_flow_run: Pauses the flow and waits for input.
- resume_flow_run: Resumes the flow by providing the awaited input.
This will allow you to test the pausing and resuming of the flow with the simulated user input.
### Documentation Links
- Pausing or Suspending a Flow Run - Prefect Docs
- pause_flow_run
- resume_flow_run
With this setup, you ensure that your flow correctly pauses, awaits user input, and resumes appropriately. The provide_input_and_resume function can be called programmatically to mimic user input.
Isabel
06/20/2024, 1:27 PM
Marvin
06/20/2024, 1:28 PM
pause_flow_run, then indeed the flow resumes automatically upon receiving the input.
For automated testing of this behavior without manually interacting with the UI, you still need to programmatically simulate the input. Here’s a refined approach to test this entirely programmatically:
### Define Input Model and Flow
```python
from prefect import flow, pause_flow_run
from prefect.input import RunInput


class UserNameInput(RunInput):
    name: str


@flow(log_prints=True)
async def example_flow():
    print("Flow is running...")
    user_input = await pause_flow_run(wait_for_input=UserNameInput, timeout=60)
    print(f"Flow resumed with input: {user_input}")
```
### Simulate User Input and Resume the Flow
To programmatically simulate the input and resume the flow, you can use Prefect’s client within an async function. Here is how to set it up:
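```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import StateType


# This runs concurrently with the flow, simulating the user input.
# It assumes no other flow runs are paused on the same API.
async def provide_input_and_resume():
    paused = FlowRunFilter(state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.PAUSED])))
    async with get_client() as client:
        # Wait for the paused flow run to appear, then resume it with the input
        while not (runs := await client.read_flow_runs(flow_run_filter=paused)):
            await asyncio.sleep(1)
        await client.resume_flow_run(runs[0].id, run_input={"name": "Marvin"})


# Execute and test the flow: the flow call does not return while the run is
# paused, so the input has to be provided concurrently
async def main():
    await asyncio.gather(example_flow(), provide_input_and_resume())


asyncio.run(main())
```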
### Important Points
1. pause_flow_run with Timeout: This pauses the flow and waits for user input.
2. Simulated User Input: Programmatically provide the input to resume the flow as if a user did it through the UI.
3. Async Testing: Ensure that the testing environment supports async execution since both pause_flow_run and resume_flow_run are async operations.
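The points above depend on the paused flow and the code that resumes it running at the same time. The following standard-library sketch contrasts the two orderings. It involves no Prefect calls: fake_flow and fake_user are hypothetical stand-ins for a flow that pauses and for the test code that provides input.

```python
import asyncio


async def fake_flow(resumed: asyncio.Event, inbox: dict) -> str:
    """Stand-in for a flow that pauses until input is provided."""
    await asyncio.wait_for(resumed.wait(), timeout=0.1)
    return f"Hello, {inbox['name']}!"


async def fake_user(resumed: asyncio.Event, inbox: dict) -> None:
    """Stand-in for the test code that provides input and resumes."""
    inbox["name"] = "Marvin"
    resumed.set()


async def sequential() -> str:
    # Awaiting the flow first never reaches the resume step, so it times out.
    resumed, inbox = asyncio.Event(), {}
    try:
        greeting = await fake_flow(resumed, inbox)
        await fake_user(resumed, inbox)
        return greeting
    except asyncio.TimeoutError:
        return "timed out"


async def concurrent() -> str:
    # Running both together lets the input arrive while the flow waits.
    resumed, inbox = asyncio.Event(), {}
    greeting, _ = await asyncio.gather(
        fake_flow(resumed, inbox), fake_user(resumed, inbox)
    )
    return greeting


outcomes = (asyncio.run(sequential()), asyncio.run(concurrent()))
print(outcomes)
```

The same reasoning suggests that a test should start the resuming side before, or alongside, the call that runs the flow.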
### Summary
Even if the flow can resume from the UI automatically, for testing or automation purposes, you need to simulate this behavior programmatically using Prefect client methods.
### Documentation Links
- Pausing or Suspending a Flow Run
- Prefect Client
This way, you can fully automate the input and resumption process, eliminating the need for manual UI interaction during testing.
Isabel
06/20/2024, 1:31 PM
Marvin
06/20/2024, 1:31 PM
async to test the pause_flow_run behavior. You can call the flow within an async testing setup to handle the pause_flow_run and resume_flow_run functions. Here is how you can achieve this.
### Synchronous Flow Definition
Define your flow synchronously; pause_flow_run can be called without await inside a synchronous flow:
```python
from prefect import flow, pause_flow_run
from prefect.input import RunInput


class UserNameInput(RunInput):
    name: str


@flow(log_prints=True)
def example_flow():
    print("Flow is running...")
    user_input = pause_flow_run(wait_for_input=UserNameInput, timeout=60)
    print(f"Flow resumed with input: {user_input}")
```
### Async Testing Setup
Use an async context to call the synchronous flow and handle the resume_flow_run function:
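```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
from prefect.client.schemas.objects import StateType


# Assumes no other flow runs are paused on the same API
async def provide_input_and_resume():
    paused = FlowRunFilter(state=FlowRunFilterState(type=FlowRunFilterStateType(any_=[StateType.PAUSED])))
    async with get_client() as client:
        # Wait for the paused flow run to appear, then resume it with the input
        while not (runs := await client.read_flow_runs(flow_run_filter=paused)):
            await asyncio.sleep(1)
        await client.resume_flow_run(runs[0].id, run_input={"name": "Marvin"})


async def main():
    # Run the blocking synchronous flow in a worker thread so that the
    # simulated user input can be provided while the run is paused
    await asyncio.gather(
        asyncio.to_thread(example_flow, return_state=True),
        provide_input_and_resume(),
    )


# Run the testing setup
asyncio.run(main())
```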
### Key Points
- Synchronous Flow: Define the flow as usual without async/await for ease of implementation.
- Async Testing: Use an async function to handle the async operations like pause_flow_run and resume_flow_run.
- Testing Environment: Ensure your testing environment supports asynchronous operations via asyncio.run.
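Because a synchronous flow call blocks the calling thread while it is paused, the simulated input has to come from another thread or process. Below is a minimal standard-library sketch of that idea. It is not Prefect code: fake_sync_flow and fake_user are hypothetical stand-ins, and threading.Timer plays the role of a user who responds a little later.

```python
import threading


def fake_sync_flow(resumed: threading.Event, inbox: dict) -> str:
    """Stand-in for a synchronous flow that blocks while paused."""
    if not resumed.wait(timeout=2.0):
        raise TimeoutError("no input was provided in time")
    return f"Flow resumed with input: {inbox['name']}"


def fake_user(resumed: threading.Event, inbox: dict) -> None:
    """Stand-in for the code that provides input and resumes the flow."""
    inbox["name"] = "Marvin"
    resumed.set()


resumed, inbox = threading.Event(), {}
# Schedule the simulated user first: the flow call below blocks this thread.
user = threading.Timer(0.05, fake_user, args=(resumed, inbox))
user.start()
message = fake_sync_flow(resumed, inbox)
user.join()
print(message)
```

Starting the resuming side before the blocking call is the important design choice here; reversing the order would leave the flow waiting until its timeout.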
### Summary
The flow itself can remain synchronously defined. The testing logic should employ asynchronous methods to handle the pausing and resuming of the flow to simulate user input.
### Documentation Links
- Pausing or Suspending a Flow Run
- Prefect Client Guide
This setup effectively isolates your synchronous flow logic from asynchronous testing logic, allowing for a clear separation between flow definition and test execution.
Isabel
06/20/2024, 1:32 PM
Marvin
06/20/2024, 1:32 PM