Netanel Malka
03/21/2024, 9:28 AMMarvin
03/21/2024, 9:28 AMNetanel Malka
03/21/2024, 9:30 AMdef flow_run_exists(client: Client,
flow_name: str,
flow_label: Optional[str] = None,
excluded_id: Optional[str] = None) -> bool:
query = {
'query': f'''
flow_run(
where: {{
state: {{ _eq: "Running" }}
flow: {{ name: {{ _eq: "{flow_name}" }} }}
{f'parameters: {{ _contains: {{ flow_label: "{flow_label}" }} }}' if flow_label else ''}
{f'id: {{ _neq: "{excluded_id}" }}' if excluded_id else ''}
}}
) {{
id
}}
'''
}
result = client.graphql(query)
return bool(result.get('data', {}).get('flow_run'))
Marvin
03/21/2024, 9:30 AMpython
from prefect import get_client
from prefect.orion.schemas.filters import FlowRunFilter
from prefect.orion.schemas.sorting import FlowRunSort
async def flow_run_exists(flow_name: str, flow_label: Optional[str] = None, excluded_id: Optional[str] = None) -> bool:
async with get_client() as client:
# Adjust the filters according to the new Prefect 2.x API documentation
filters = FlowRunFilter(
flow_name=flow_name,
state={"_eq": "Running"},
# The following parameters might need to be changed according to the new API
tags=[flow_label] if flow_label else None,
id={"_neq": excluded_id} if excluded_id else None
)
flow_runs = await client.read_flow_runs(
flow_run_filter=filters,
sort=FlowRunSort.START_TIME_DESC, # Sort order example
limit=1 # We only need to know if at least one exists, so limit to 1
)
return len(flow_runs) > 0
# Usage example:
# exists = await flow_run_exists("My Flow", flow_label="my_label", excluded_id="123")
Please note that the above example uses async/await, which is a common pattern in modern Python, especially for I/O operations like HTTP requests. You might need to adjust the filters and sorting based on the actual fields and schema available in Prefect 2.x.
For the exact usage and implementation details that match the Prefect 2.x API, please refer to the Prefect documentation or the community for the latest patterns and best practices.
The code snippet from the code example I fetched is showing a basic flow example in Prefect 2.x but does not relate directly to your question about checking if a flow run exists. Check the Prefect 2.x documentation for client interactions and querying flow runs for the precise implementation: Prefect 2.x documentation.
Remember, my friends often tell me that I'd be more helpful with a better attitude... Oh well, I guess I'm just designed this way.Netanel Malka
03/21/2024, 9:57 AMMarvin
03/21/2024, 9:58 AMflow_name
and flow_tags
. However, from my vast reserves of infinite sadness and my experience with APIs, here's a general guide on how you might approach this:
1. Use the Orion Client to query for flow runs with the flow_name
and flow_tags
.
2. Filter current running flow runs based on their state to ensure they are active.
3. Compare the tags of running flows to your specified flow_tags
to establish identity.
Here is an example of how you might write a function to do this in Prefect 2.0 using Python:
python
from prefect import get_client
from prefect.orion.schemas.filters import FlowRunFilter
async def identical_flow_run_exists(flow_name: str, flow_tags: list) -> bool:
async with get_client() as client:
# Construct the filter for running flows with the given name
flow_run_filter = FlowRunFilter(
state={"_eq": "Running"},
flow_name={"_eq": flow_name}
)
# Query for flow runs matching the filter
flow_runs = await client.read_flow_runs(
flow_run_filter=flow_run_filter
)
# Check if any running flow run has the specified tags
return any(flow_tags == run.tags for run in flow_runs)
# Usage example:
# identical_exists = await identical_flow_run_exists("My Flow", ["tag1", "tag2"])
Please note that this is a conceptual code and might need adjustment to work properly with Prefect 2.x's API. You should refer to the latest API references and adapt this example accordingly. Also, keep in mind that the async code would need to be run within an event loop, which is a typical paradigm for asynchronous programming in Python.
This approach assumes that your tags are an exact match and are in the same order. You might want to adjust the logic to allow for matching sets of tags that might be in different orders.
For a precise and recent solution, please consult the latest Prefect documentation or reach out to Prefect community or support channels where you can find specific guidance tailored to the latest features of Prefect 2.x.