# prefect-community
j
Hello! The Orion beta is amazing. Thanks for putting together such an awesome package. Could I get some feedback on how I'm using the client with filters? I'm not sure if there's a better way to achieve what I'm doing:
```python
# Minimal example of my current use, where I need to build filter objects

import asyncio

from prefect.client import get_client
from prefect.orion.schemas.filters import FlowFilter, FlowRunFilter


async def main():
    async with get_client() as client:
        response = await client.read_flow_runs(
            flow_filter=FlowFilter(
                name={"any_": ["example-flow"]},
            ),
            flow_run_filter=FlowRunFilter(
                state={"name": {"any_": ["Completed", "Running"]}}
            ),
        )


asyncio.run(main())
```
Is there a way to query with the client like this instead:
```python
# Avoids the need to import and manually initialize FlowFilter+FlowRunFilter

from prefect.client import get_client

async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter={"name": {"any_": ["example-flow"]}},
        flow_run_filter={"state": {"any_": ["Completed", "Running"]}},
    )
```
Or even:
```python
# Allows exact matching without use of {"any_": [...]}

from prefect.client import get_client

async with get_client() as client:
    response = await client.read_flow_runs(
        flow_filter={"name": "example-flow"},
        flow_run_filter={"state": "Completed"},
    )
```
āœ… 1
šŸš€ 1
a
> I'm not sure if there's a better way to achieve what I'm doing
Could you explain what you're trying to achieve? What would you want to do with those completed or running states? Are you trying to pause execution based on some condition?
j
For now, I'm just doing simple queries and learning the API. Once I get that down, I'll do things like query for the number of flow runs in a particular state and then submit new flow runs based off that count.
I was hoping there was a cleaner way to filter with the Python client -- something like the query syntax of the REST API.
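For example, something like this rough sketch is what I have in mind (untested, and it just counts runs by reading them back, since I don't know of a dedicated count call in the client):
```python
import asyncio

from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter


async def count_flow_runs(state_names):
    # Read back every flow run currently in one of the given states
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state={"name": {"any_": state_names}}
            ),
        )
    return len(runs)


if __name__ == "__main__":
    n = asyncio.run(count_flow_runs(["Scheduled", "Running"]))
    print(f"{n} flow runs are scheduled or running")
```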
a
> query for the number of flow runs in a particular state and then submit new flow runs based off that count
why would you want to do that? to control how many runs are executed at the same time?
I'm just curious because there is a high chance that one of these is true:
1. The problem you're trying to solve can be solved in some other way.
2. There may be some feature (or a feature in the making) that can provide an easier way than manually querying the API.
btw thanks for the positive feedback
j
No, it's separate from controlling flow run concurrency. I submit flow runs in an ad-hoc way, where the submission of new workflows depends on:
1. the results of past flow runs (to know when to stop submitting new runs)
2. the number of scheduled + running flow runs (to ensure there is a specific number of runs scheduled+running)
In terms of the number of flows running, I control those with tags and concurrency limits.
So I guess I'm implementing a concurrency limit for "Scheduled+Running" flow runs, rather than just a limit for "Running".
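Roughly, my service would do something like this sketch (`submit_new_flow_run` is just a placeholder for my actual submission logic):
```python
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter

TARGET = 50  # desired number of scheduled+running flow runs


def submit_new_flow_run():
    ...  # placeholder: create and submit the next calculation


async def top_up():
    # Count how many flow runs are currently scheduled or running
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state={"name": {"any_": ["Scheduled", "Running"]}}
            ),
        )
    # Submit just enough new runs to get back up to the target
    for _ in range(max(0, TARGET - len(runs))):
        submit_new_flow_run()
```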
a
The only missing piece for any of this is the "why" behind those bullet points -- could you perhaps provide a bit more background? We are working on many features that may or may not address what you're trying to do, depending on the actual use case. E.g., is controlling this number of runs due to scaling issues? If we provided more guidance and recipes on how to make scaling the execution layer easier, would that eliminate the need for this extra work on your end?
j
Haha okay. I'm running an evolutionary search algorithm, which involves (i) submitting calculations of candidate materials, (ii) analyzing the results of those calculations to see which candidates are "best", and (iii) using the best candidates to create and submit new calculations. This process loops until you reach some stop condition (such as "stop when there hasn't been a new best candidate in the last 200 calculations"). Typically you'd submit candidates in batches, but I'm instead submitting a "steady state" of calculations. A quick google search gives me this nice page+diagram that explains more if you'd like. I'm using Prefect as the backend to submit and monitor my many runs (1 calculation can take anywhere from 1 min to 3 hrs; 1 full search can be >5k calculations, with 50 running at any given time).
So no, the control is just based on the analysis I'm running and it's separate from scaling issues.
Hope this helps!
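If it helps, here's a toy, sequential version of the loop (all names made up; the real thing keeps ~50 calculations in flight via Prefect):
```python
import random


def run_calculation(candidate):
    # Stand-in for a 1 min - 3 hr DFT calculation that scores a candidate
    return random.random()


def steady_state_search(stop_after_stale=200):
    best, stale = None, 0
    while stale < stop_after_stale:
        candidate = "some-new-material"  # stand-in for "mutate the best candidates"
        score = run_calculation(candidate)
        if best is None or score > best:
            best, stale = score, 0       # a new best candidate resets the counter
        else:
            stale += 1                   # no improvement from this calculation
    return best
```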
a
I believe an easier solution would be to:
• store those results in some data store/database,
• then add some logic to your runs to look that up and, based on the result, decide whether to run something downstream or end the flow.
If you only need to store some small pieces of metadata, you could use a JSON block for that -- you can create it from the UI and then use it like this:
```python
from prefect import flow, get_run_logger
from prefect.blocks.system import JSON


@flow
def ssga_flow():
    logger = get_run_logger()
    ssga = JSON.load("ssga")
    logger.info(ssga.value)
    if ssga.value["some_key"] < 42:
        raise ValueError("Something is not right")
    # do something else in your SSGA logic
    ssga.value = dict(some_key=4242)
    ssga.save("ssga", overwrite=True)


if __name__ == "__main__":
    ssga_flow()
```
And if you need to loop until some condition, then running your flow as a persistent service rather than as scheduled runs might be worth considering (it looks like that's the case based on your diagram?) -- if so, we will have some guides on that soon.
j
Thanks for this write-up! It helps a bunch. And yes, I do run this as a persistent service, and that service is what monitors and submits flows. I also store results in a separate SQL database, and my analysis accesses results through that -- not through the Prefect result. Your suggestion would be a great solution if all of my calculations worked (I could infer how many runs are still going based on which ones added results to my database). The frustrating thing is that I'm running DFT calculations that are very prone to error (roughly 10% fail). I could add a conditional task to my flows that fills my database with info on the failure, but it's easier to just query Prefect using the flow_run_id and check the state -- which prompted my question about queries using the Python client.
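i.e. something like this sketch (untested):
```python
from prefect.client import get_client


async def run_succeeded(flow_run_id):
    # Fetch a single flow run by id and check whether it reached Completed
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        return flow_run.state.is_completed()
```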
a
I see. The code you shared looks good; I just think there is, or at least should be, a better way: run the flow and tasks, then check the state and rerun anything that failed. Not sure if you have seen this, but starting from the latest release we actually support not only task-level retries but also flow- and subflow-level retries. And if you combine that with a service that continuously runs your flows, there seems to be no reason to query the DB:
```python
if __name__ == "__main__":
    while True:
        ssga_flow()
```
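Flow-level retries are just arguments on the decorator -- a sketch, with arbitrary retry settings:
```python
from prefect import flow


# If a flow run fails, Prefect reruns the whole flow up to `retries` times
@flow(retries=2, retry_delay_seconds=60)
def ssga_flow():
    ...
```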
j
Yes, that's an awesome addition! Thank you for the tips. I'm already working to implement your suggestions šŸ˜„
šŸ™Œ 1