Jack Sundberg
07/09/2022, 7:09 PM
# Minimal example of my current use, where I need to build filter objects
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowFilter, FlowRunFilter

async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter=FlowFilter(
            name={"any_": ["example-flow"]},
        ),
        flow_run_filter=FlowRunFilter(
            state={"name": {"any_": ["Completed", "Running"]}}
        ),
    )
Is there a way to query with the client like this instead:
# Avoids the need to import and manually initialize FlowFilter+FlowRunFilter
from prefect.client import get_client
async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter={"name": {"any_": ["example-flow"]}},
        flow_run_filter={"state": {"any_": ["Completed", "Running"]}},
    )
Or even:
# Allows exact matching without use of {"any_": [...]}
from prefect.client import get_client
async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter={"name": "example-flow"},
        flow_run_filter={"state": "Completed"},
    )
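A minimal sketch of how the shorthand above could be expanded on the caller's side, without any change to the client. The helper name `expand_filter` and the `OPERATOR_KEYS` set are hypothetical and not part of Prefect; the sketch only knows the `any_` operator shown in this thread and works on plain dictionaries.

```python
from typing import Any, Dict

# Hypothetical: only the operator that appears in this thread is recognized.
OPERATOR_KEYS = {"any_"}


def expand_filter(spec: Dict[str, Any]) -> Dict[str, Any]:
    """Expand bare values into the {"any_": [...]} form (hypothetical helper)."""
    expanded: Dict[str, Any] = {}
    for field, value in spec.items():
        if isinstance(value, dict):
            if any(key in OPERATOR_KEYS for key in value):
                # Already written in operator form, keep as-is.
                expanded[field] = value
            else:
                # Nested filter such as {"state": {"name": ...}}; recurse.
                expanded[field] = expand_filter(value)
        elif isinstance(value, (list, tuple)):
            # A sequence of allowed values becomes one "any_" match.
            expanded[field] = {"any_": list(value)}
        else:
            # A single value becomes an exact match via a one-item "any_".
            expanded[field] = {"any_": [value]}
    return expanded
```

Assuming the filter classes accept the same keyword arguments as in the first example, the result could presumably be passed along as `FlowFilter(**expand_filter({"name": "example-flow"}))` and `FlowRunFilter(**expand_filter({"state": {"name": "Completed"}}))`. Note that the state filter still needs the nested `name` key, as in the working example.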
Anna Geller
07/09/2022, 7:15 PM
"I'm not sure if there's a better way to achieve what I'm doing"
Could you explain what you're trying to achieve? What would you want to do with those completed or running states? Are you trying to pause execution based on some condition?
Jack Sundberg
07/09/2022, 7:18 PM
Anna Geller
07/09/2022, 7:25 PM
"query for the number of flow runs in a particular state and then submit new flow runs based off that count"
Why would you want to do that? To control how many runs are executed at the same time?
Jack Sundberg
07/09/2022, 7:33 PM
Anna Geller
07/09/2022, 7:57 PM
Jack Sundberg
07/09/2022, 8:07 PM
Anna Geller
07/09/2022, 8:27 PM
from prefect import flow, get_run_logger
from prefect.blocks.system import JSON

@flow
def ssga_flow():
    logger = get_run_logger()
    ssga = JSON.load("ssga")
    logger.info(ssga.value)
    if ssga.value["some_key"] < 42:
        raise ValueError("Something is not right")
    # do something else in your SSGA logic
    ssga.value = dict(some_key=4242)
    ssga.save("ssga", overwrite=True)

if __name__ == "__main__":
    ssga_flow()
And if you need to loop until some condition is met, running your flow as a persistent service rather than as scheduled runs might be worth considering (it looks that way based on your diagram?). If so, we will have some guides on that soon.
Jack Sundberg
07/09/2022, 8:40 PM
Anna Geller
07/09/2022, 9:10 PM
if __name__ == "__main__":
    while True:
        ssga_flow()
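A hedged variation on the loop above, for the case where the service should stop once some condition holds rather than run forever. The names `run_as_service`, `should_stop`, `pause_seconds`, and `max_iterations` are hypothetical and not taken from Prefect; this is plain Python that wraps any callable.

```python
import time
from typing import Callable, Optional


def run_as_service(
    run_once: Callable[[], object],
    should_stop: Callable[[], bool],
    pause_seconds: float = 0.0,
    max_iterations: Optional[int] = None,
) -> int:
    """Call run_once repeatedly until should_stop() is true; return the call count."""
    iterations = 0
    while not should_stop():
        # Optional safety cap so the loop cannot spin indefinitely.
        if max_iterations is not None and iterations >= max_iterations:
            break
        run_once()
        iterations += 1
        # Pause between runs to avoid hammering the API.
        time.sleep(pause_seconds)
    return iterations
```

With the flow from earlier in the thread, this might be called as `run_as_service(ssga_flow, should_stop=lambda: False, pause_seconds=60)`, which behaves like the `while True` loop with a one-minute pause between runs.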
Jack Sundberg
07/09/2022, 9:13 PM