@Marvin Can I use set_flow_run_state() to set...
# ask-marvin
g
@Marvin Can I use set_flow_run_state() to set a flow run that was in the state Crashed to the state Scheduled, and if so will the flow actually re-try? I can find no examples of this.
m
The excerpts from the documentation do not specifically answer whether
set_flow_run_state()
can transition a flow run from a Crashed state to a Scheduled state, and if such an action would result in the flow being retried. However, we can infer some things: States are rich objects containing information about the status of a flow run. The flow run state affects how the workflow execution proceeds. The Prefect Scheduler service creates new flow runs and places them in a Scheduled state, but changing the state alone does not guarantee execution. It simply sets up for the potential execution if conditions for the scheduler are met. Given that, using
set_flow_run_state()
to transition a flow run from Crashed to Scheduled could theoretically place the flow run in a state that makes it eligible for retry according to the scheduler's conditions. However, it's not explicitly stated that this mechanism would automatically result in the flow run being retried. The behavior might depend on Prefect's internal mechanisms and the specific configuration of the scheduler and executor. To find more targeted examples or clarify this behavior, it may be helpful to search Prefect's GitHub issues for similar use cases or questions. Would you like me to perform such a search?
g
Yes please, let's see if you find anything.
@Marvin can you give an example of using set_flow_run_state() to change a state to scheduled?
m
To change the state of a flow run to 'Scheduled', you would need to use the
set_flow_run_state()
function with the 'Scheduled' state from the Prefect client. Unfortunately, the excerpts I retrieved do not provide a direct example of using
set_flow_run_state()
for this purpose. The typical process might involve these steps: 1. Initialize a Prefect client instance, 2. Call the
set_flow_run_state()
function with the appropriate flow run ID and the 'Scheduled' state. This example assumes you already have a Prefect flow run that you want to schedule:
python
from prefect import Client
from prefect.engine.state import Scheduled

client = Client()

# Substitute 'YOUR_FLOW_RUN_ID' with your actual flow run ID
flow_run_id = 'YOUR_FLOW_RUN_ID'

# Set the flow run state to Scheduled
client.set_flow_run_state(flow_run_id=flow_run_id, state=Scheduled())
For more detailed or version-specific instructions, I would recommend consulting the Prefect documentation or examples provided by the community. If you need further assistance with this or would like me to search GitHub issues for potential examples, let me know!
g
@Marvin when I import engine.state on the line
from prefect.engine.state import Scheduled
, I get an error: ModuleNotFoundError: No module named 'prefect.engine.state'; 'prefect.engine' is not a package
m
It appears that there currently aren't any open GitHub issues with the bug label concerning the
No module named 'prefect.engine.state'
error message in the Prefect repository. The error suggests there might be a misunderstanding in the import paths due to changes in the Prefect library organization, which might occur between different versions. If you're following an example or documentation, it could be for a different version of Prefect than the one you are using. In Prefect 2.0+, for instance, the import path for states has changed. Instead of importing
Scheduled
from
prefect.engine.state
, you would import states from
prefect.states
. Therefore, the correct way to import the
Scheduled
state would be something along these lines:
python
from prefect import flow, task
from prefect.states import Scheduled
Make sure you are using the correct version of the documentation or examples to match the version of Prefect you're working with. If the issue persists, I would be quite unsurprised—it seems like the kind of day where nothing goes right. But in a more determined effort, you might consider checking the Prefect documentation for the correct import paths or reaching out to their community for support.
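Since the import path depends on the installed Prefect version, a quick check of that version can save some guessing. The following is only a minimal sketch: the helper name `prefect_states_import_path` is hypothetical, and it assumes the split described above (`prefect.engine.state` before 2.0, `prefect.states` from 2.0 onward).

```python
from importlib.metadata import PackageNotFoundError, version


def prefect_states_import_path():
    """Return the module path that should hold Scheduled, or None if Prefect is not installed.

    Hypothetical helper: assumes states live in prefect.engine.state before 2.0
    and in prefect.states from 2.0 onward, as described in the thread.
    """
    try:
        # Take the major version from a string such as "2.14.3"
        major = int(version("prefect").split(".")[0])
    except PackageNotFoundError:
        return None
    return "prefect.engine.state" if major < 2 else "prefect.states"


print(prefect_states_import_path())
```

If this prints `prefect.states`, then `from prefect.states import Scheduled` is the import to try.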
g
For anyone looking back on this in the future, I did get it to work. Setting a flow run to Scheduled does indeed prompt Prefect to reschedule the flow run.
My good deed for the day (
from prefect.states import Scheduled
was indeed correct for 2.x)
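from prefect import get_client
from prefect.states import Scheduled

# Method on my own class, hence `self`
async def set_flow_run_to_scheduled(self, flow_run_id: str, flow_run_name: str = None):
    async with get_client() as client:
        # force=True asks the API to accept the new state regardless of orchestration rules
        result = await client.set_flow_run_state(flow_run_id=flow_run_id, state=Scheduled(), force=True)
        print(f"set_flow_run_to_scheduled() complete for {flow_run_name}")
        return result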
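Going back to the original question (Crashed to Scheduled), a guarded variant might only touch runs that are actually in a Crashed state. This is a rough sketch, not a tested recipe: the function name `reschedule_if_crashed` and its parameters are hypothetical, and it assumes the Prefect 2.x client exposes `read_flow_run()` and that state objects have an `is_crashed()` method. The client and the state factory are passed in, so nothing here imports Prefect directly.

```python
async def reschedule_if_crashed(client, flow_run_id, make_scheduled_state):
    """Force a flow run back to Scheduled, but only if it is currently Crashed.

    Hypothetical helper. `client` is assumed to behave like the Prefect 2.x
    client; `make_scheduled_state` is a callable such as prefect.states.Scheduled.
    Returns the result of set_flow_run_state(), or None if nothing was changed.
    """
    flow_run = await client.read_flow_run(flow_run_id)

    # Leave runs alone unless they are in a Crashed state
    if flow_run.state is None or not flow_run.state.is_crashed():
        return None

    # force=True mirrors the call that worked in the snippet above
    return await client.set_flow_run_state(
        flow_run_id=flow_run_id,
        state=make_scheduled_state(),
        force=True,
    )
```

With Prefect 2.x this would presumably be called as `await reschedule_if_crashed(client, flow_run_id, Scheduled)` inside `async with get_client() as client:`.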